import logging import telnetlib from datetime import timedelta, datetime, time from typing import Callable, Any import re from homeassistant.components.sensor import SensorEntity, SensorStateClass, SensorDeviceClass from homeassistant.const import UnitOfTemperature, UnitOfPower, UnitOfElectricPotential, UnitOfElectricCurrent, \ UnitOfEnergy, UnitOfFrequency, UnitOfTime from homeassistant.core import callback from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import dispatcher_send from homeassistant.helpers.event import async_track_time_interval SIGNAL = 'solareco' DOMAIN = 'solareco' _LOGGER = logging.getLogger('solareco') class SolarecoSensorConfig: def __init__(self, name, unit_of_measurement, device_class, data_transformation, icon, state_class=SensorStateClass.MEASUREMENT): self.name = name self.unit_of_measurement = unit_of_measurement self.device_class = device_class self.data_transformation = data_transformation self.icon = icon self.state_class = state_class SENSORS = [ SolarecoSensorConfig('relay', None, None, lambda data: re.findall('AC+\d', data)[0][2:] if re.search('AC+\d',data) else re.findall('R:+\d', data)[0][2:],""), SolarecoSensorConfig('fan', None, None, lambda data: re.findall('F:+\d', data)[0][2:] if re.search('F:+\d',data) else None,"mdi:fan"), SolarecoSensorConfig('required_voltage', UnitOfElectricPotential.VOLT, None, lambda data: re.findall('\d+U', data)[0][:-1],"mdi:alpha-v-circle-outline"), SolarecoSensorConfig('voltage', UnitOfElectricPotential.VOLT, SensorDeviceClass.VOLTAGE, lambda data: re.findall('\d+V', data)[0][:-1],"mdi:alpha-v-circle-outline"), SolarecoSensorConfig('current', UnitOfElectricCurrent.MILLIAMPERE, SensorDeviceClass.CURRENT, lambda data: re.findall('\d+mA', data)[0][:-2],"mdi:current-dc"), SolarecoSensorConfig('power', UnitOfPower.WATT, SensorDeviceClass.POWER, lambda data: re.findall('\d+W', data)[0][:-1],"mdi:alpha-w-circle-outline"), SolarecoSensorConfig('frequency', UnitOfFrequency.HERTZ, SensorDeviceClass.FREQUENCY, lambda data: re.findall('\d+Hz', data)[0][:-2] if re.search('\d+Hz',data) else None,""), SolarecoSensorConfig('cooler_temperature', UnitOfTemperature.CELSIUS, SensorDeviceClass.TEMPERATURE, lambda data: re.findall('\d+C', data)[0][:-1],""), SolarecoSensorConfig('boiler_temperature', UnitOfTemperature.CELSIUS, SensorDeviceClass.TEMPERATURE, lambda data: re.findall('(\d+):\d+C',data)[0],""), SolarecoSensorConfig('pulse_width', UnitOfTime.MICROSECONDS, None, lambda data: re.findall('\d+us', data)[0][:-2],""), SolarecoSensorConfig('total_energy', UnitOfEnergy.WATT_HOUR, SensorDeviceClass.ENERGY, lambda data: re.findall('\d+kWh\s\d+Wh', data)[0].replace(' ','').replace('kWh','').replace('Wh','') if re.search('\d+kWh\s\d+Wh', data) else None,"mdi:alpha-w-circle-outline"), SolarecoSensorConfig('day_energy', UnitOfEnergy.WATT_HOUR, SensorDeviceClass.ENERGY, lambda data: re.findall('\d+Wh', data)[0][:-2],"mdi:alpha-w-circle-outline" ,SensorStateClass.TOTAL_INCREASING), ] async def async_setup_platform(hass, config, async_add_entities, discovery_info=None) -> True: _LOGGER.info(str(config)) sensor_connector = SensorConnector(hass, config['host'], config['port']) # Do first update await hass.async_add_executor_job(sensor_connector.update) # Poll for updates in the background async_track_time_interval( hass, lambda now: sensor_connector.update(), timedelta(seconds=int(config['poll_interval_seconds'])), ) entities: list[SensorEntity] = [] entities.extend([SolarecoSensor(sensor_connector, sensor_config) for sensor_config in SENSORS]) async_add_entities(entities, True) hass.data.setdefault(DOMAIN, {}) class SolarecoSensor(SensorEntity): def __init__(self, sensor_connector, sensor_config: SolarecoSensorConfig): super().__init__() self.sensor_connector = sensor_connector self.sensor_config = sensor_config self._attr_has_entity_name = True self._attr_translation_key = self.sensor_config.name self._state = None self._state_attributes = None self._attr_icon = self.sensor_config.icon async def async_added_to_hass(self) -> None: self.async_on_remove( async_dispatcher_connect(self.hass, SIGNAL, self._async_update_callback) ) @callback def _async_update_callback(self): self._async_update_data() self.async_write_ha_state() @property def unique_id(self): return f"{'solareco'} {self.sensor_config.name}" # @property # def name(self): # return f"{'solareco'} {self.sensor_config.name}" @property def native_value(self): return self._state @property def state_class(self): return self.sensor_config.state_class @property def device_class(self): return self.sensor_config.device_class @property def native_unit_of_measurement(self): return self.sensor_config.unit_of_measurement @callback def _async_update_data(self): self._state = self.sensor_connector.data[self.sensor_config.name] class SensorConnector: def __init__(self, hass, solareco_host, solareco_port): self.hass = hass self.solareco_host = solareco_host self.solareco_port = solareco_port self.data = {SENSORS[i]: None for i in range(0, len(SENSORS))} def update(self): try: with telnetlib.Telnet(self.solareco_host, self.solareco_port) as tn: line = tn.read_until(b'\n').decode('ascii') for i in range(0, len(SENSORS)): sensor = SENSORS[i] self.data[sensor.name] = sensor.data_transformation(line) dispatcher_send(self.hass, SIGNAL) except: _LOGGER.error("Can't connect to SolarEco")