# ha w1k init
"""
Support for W1000 energy portal
Thanks to https://github.com/amargo/ for the login session ideas
"""
import logging
import re
import unicodedata
from datetime import datetime, timedelta

import aiohttp
import voluptuous as vol
import yaml
from bs4 import BeautifulSoup

import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
from homeassistant.components.recorder.models import StatisticData, StatisticMetaData
from homeassistant.components.recorder.statistics import async_import_statistics
from homeassistant.helpers import discovery
from homeassistant.helpers.event import async_track_utc_time_change

_LOGGER = logging.getLogger(__name__)
DOMAIN = "w1000-energy-monitor"
CONF_ENDPOINT = "url"
CONF_USERNAME = "login_user"
CONF_PASSWORD = "login_pass"
CONF_REPORTS = "reports"
CONF_INTERVAL = "scan_interval"

CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                vol.Required(CONF_USERNAME): cv.string,
                vol.Required(CONF_PASSWORD): cv.string,
                vol.Required(CONF_REPORTS): cv.string,
                vol.Optional(CONF_INTERVAL, default=60): cv.positive_int,  # minutes
                vol.Optional(CONF_ENDPOINT, default="https://energia.eon-hungaria.hu/W1000"): cv.string,
            }
        )
    },
    extra=vol.ALLOW_EXTRA,
)
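
# A minimal configuration.yaml sketch matching the schema above; the report
# names are hypothetical examples and must match window names in your W1000
# workareas:
#
#   w1000-energy-monitor:
#     login_user: me@example.com
#     login_pass: secret
#     reports: "consumption, production"
#     scan_interval: 60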

async def async_setup(hass, config):
    scan_interval = config[DOMAIN][CONF_INTERVAL]
    monitor = w1k_Portal(
        hass,
        config[DOMAIN][CONF_USERNAME],
        config[DOMAIN][CONF_PASSWORD],
        config[DOMAIN][CONF_ENDPOINT],
        config[DOMAIN][CONF_REPORTS],
    )
    hass.data[DOMAIN] = monitor
    now = dt_util.utcnow()
    async_track_utc_time_change(
        hass,
        monitor.update,
        minute=range(now.minute % scan_interval, 60, scan_interval),
        second=now.second,
    )
    hass.async_create_task(
        discovery.async_load_platform(hass, "sensor", DOMAIN, {}, config)
    )
    return True

class w1k_API:
    def __init__(self, username, password, endpoint, reports):
        self.username = username
        self.password = password
        self.account_url = endpoint + "/Account/Login"
        self.profile_data_url = endpoint + "/ProfileData/ProfileData"
        self.lastlogin = None
        self.reports = [x.strip() for x in reports.split(",")]
        self.session = None
        self.start_values = {'consumption': None, 'production': None}

    async def request_data(self, ssl=True):
        ret = {}
        for report in self.reports:
            _LOGGER.debug("reading report " + report)
            retitem = await self.read_reportname(report)
            ret[report] = retitem[0]
        return ret

    def mysession(self):
        # reuse a single cookie-aware session across requests
        if self.session:
            return self.session
        jar = aiohttp.CookieJar(unsafe=True)
        self.session = aiohttp.ClientSession(cookie_jar=jar)
        return self.session

    async def login(self, ssl=False):
        try:
            session = self.mysession()
            # fetch the login page to pick up the anti-forgery token
            async with session.get(
                url=self.account_url, ssl=ssl
            ) as resp:
                content = (await resp.content.read()).decode("utf8")
                status = resp.status
            index_content = BeautifulSoup(content, "html.parser")
            dome = index_content.select('#pg-login input[name=__RequestVerificationToken]')[0]
            self.request_verification_token = dome.get("value")
            payload = {
                "__RequestVerificationToken": self.request_verification_token,
                "UserName": self.username,
                "Password": self.password,
            }
            header = {}
            # header["User-Agent"] = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36"
            # header["Referer"] = self.account_url
            async with session.post(
                url=self.account_url, data=payload, ssl=ssl
            ) as resp:
                content = (await resp.content.read()).decode("utf8")
                status = resp.status
            _LOGGER.debug("resp status http " + str(status))
            match = re.findall(r'W1000.start\((.+)sessionTimeout', content.replace("\n", " "))
            if status != 200:
                _LOGGER.error("Login failed.")
                _LOGGER.debug("HTML page: " + content)
                _LOGGER.debug("Index page was: " + str(index_content))
                return False
            if not match:
                _LOGGER.error("could not find session data. invalid or locked account?")
                _LOGGER.debug("HTML page: " + content)
                return False
            # the captured JS object literal plus a closing "}" parses as YAML
            respob = yaml.safe_load(match[0] + "}")
            self.currentUser = respob['currentUser']
            self.workareas = respob['workareas']
            self.lastlogin = datetime.utcnow()
            for workarea in self.workareas:
                for window in workarea['windows']:
                    _LOGGER.debug("found report " + window['name'] + " in workarea " + workarea['name'])
            return True
        except Exception as ex:
            _LOGGER.error("exception at login, could not retrieve data: %s", str(ex))
            return False
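
    # The scraper above assumes the portal page embeds a call shaped roughly like
    # the following (hypothetical sample, reconstructed from the fields read above,
    # not confirmed against the live portal):
    #
    #   W1000.start({currentUser: "me@example.com",
    #                workareas: [{name: "home", windows: [{name: "consumption",
    #                                                      reportid: 123, ...}]}],
    #                ..., sessionTimeout: ...})
    #
    # read_reportname() resolves a window name to its reportid from this structure.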

    async def read_reportname(self, reportname: str):
        loginerror = False
        if not self.lastlogin or self.lastlogin + timedelta(minutes=10) < datetime.utcnow():
            loginerror = not await self.login()
        if loginerror:
            return [None]
        for workarea in self.workareas:
            for window in workarea['windows']:
                if window['name'] == reportname:
                    return await self.read_reportid(int(window['reportid']), reportname)
        _LOGGER.error("report " + reportname + " not found")
        return [None]

    async def read_reportid(self, reportid: int, reportname: str, ssl=True):
        now = datetime.utcnow()
        loginerror = False
        if not self.lastlogin or self.lastlogin + timedelta(hours=1) < datetime.utcnow():
            loginerror = not await self.login()
        if loginerror:
            return [None]
        since = (now + timedelta(days=-2)).strftime("%Y-%m-%dT23:59:59")
        until = now.strftime("%Y-%m-%dT%H:00:00")
        params = {
            "page": 1,
            "perPage": 96 * 3,  # three days of 15-minute samples
            "reportId": reportid,
            "since": since,
            "until": until,
            # cache-buster; note that %s (epoch seconds) is a platform-dependent strftime code
            "_": (now - timedelta(hours=3)).strftime("%s557"),
        }
        session = self.mysession()
        async with session.get(
            url=self.profile_data_url, params=params, ssl=ssl
        ) as resp:
            jsonResponse = await resp.json()
            status = resp.status
        if status == 200:
            ret = []
            # strip accents so the statistic id is plain ASCII
            statistic_id = 'sensor.w1000_' + ''.join(
                ch for ch in unicodedata.normalize('NFKD', reportname) if not unicodedata.combining(ch)
            )
            hourly = {}
            # collect hourly sums ("...A" curves) and meter readings ("x.8.x" curves)
            for curve in jsonResponse:
                _LOGGER.debug(f"curve: {curve['name']}")
                name = curve['name']
                for data in curve['data']:
                    if data['status'] > 0:
                        idx = data['time'][:13]  # hourly bucket: "YYYY-MM-DDTHH"
                        if idx not in hourly:
                            hourly[idx] = {'sum': 0, 'state': 0}
                        if name.endswith("A"):
                            hourly[idx]['sum'] += data['value']
                        if '.8.' in name:
                            hourly[idx]['state'] = data['value']
            state = 0
            statistics = []
            sumsum = 0
            for idx in hourly:
                # skip unknown state at the beginning
                if state + hourly[idx]['state'] == 0:
                    continue
                # create a statistic entry for this hour
                timestamp = idx + ":00:00+02:00"  # TODO: needs to calculate DST
                if hourly[idx]['state'] > 0:
                    state = hourly[idx]['state']
                else:
                    state += hourly[idx]['sum']
                sumsum += hourly[idx]['sum']
                if hourly[idx]['sum'] > 0:  # TODO: not sure if we can skip an hour when sum is zero.
                    statistics.append(
                        StatisticData(
                            start=datetime.fromisoformat(timestamp).astimezone(),
                            state=round(state, 1),
                            sum=sumsum,
                        )
                    )
            ret.append({'curve': curve['name'], 'last_value': state, 'unit': curve['unit'], 'last_time': timestamp})
            metadata = StatisticMetaData(
                has_mean=False,
                has_sum=True,
                name="w1000 " + reportname,
                source='recorder',
                statistic_id=statistic_id,
                unit_of_measurement=curve['unit'],
            )
            # _LOGGER.debug(metadata)
            _LOGGER.debug("import statistics: " + statistic_id + " count: " + str(len(statistics)))
            try:
                async_import_statistics(self._hass, metadata, statistics)
            except Exception as ex:
                _LOGGER.warning("exception at async_import_statistics '" + statistic_id + "': " + str(ex))
        else:
            _LOGGER.warning("error reading report: http " + str(status))
            _LOGGER.debug(jsonResponse)
            ret = [None]  # keep the return shape consistent on errors
        return ret
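
# For reference, the parser in read_reportid() assumes a response shaped roughly
# like this (field names taken from the code above; the sample values are
# hypothetical):
#
#   [{"name": "...1.8.0", "unit": "kWh",
#     "data": [{"time": "2022-11-28T13:15:00", "value": 1234.5, "status": 1}, ...]},
#    ...]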

class w1k_Portal(w1k_API):
    def __init__(self, hass, username, password, endpoint, reports):
        super().__init__(username, password, endpoint, reports)
        self._hass = hass
        self._data = {}
        self._update_listeners = []

    def get_data(self, name):
        return self._data.get(name)

    async def update(self, *args):
        json = await self.request_data()
        self._data = self._prepare_data(json)
        self._notify_listeners()

    def _prepare_data(self, json):
        out = {}
        for report in json:
            dta = json[report]
            if dta and 'curve' in dta:
                # cumulative meter registers ("x.8.x") are totals; the rest are momentary readings
                if '.8.' in dta['curve']:
                    state_class = 'total_increasing'
                else:
                    state_class = 'measurement'
                out[report] = {
                    'state': dta['last_value'],
                    'unit': dta['unit'],
                    'attributes': {
                        'curve': dta['curve'],
                        'generated': dta['last_time'],
                        'state_class': state_class,
                    },
                }
                if dta['unit'].endswith('W') or dta['unit'].endswith('Var'):
                    out[report]['attributes']['device_class'] = 'power'
                if dta['unit'].endswith('Wh') or dta['unit'].endswith('Varh'):
                    out[report]['attributes']['device_class'] = 'energy'
        return out

    def add_update_listener(self, listener):
        self._update_listeners.append(listener)
        _LOGGER.debug(f"registered sensor: {listener.entity_id}")
        listener.update_callback()

    def _notify_listeners(self):
        for listener in self._update_listeners:
            listener.update_callback()
        _LOGGER.debug("Notifying all listeners")