New version : Oin Thermostat

This commit is contained in:
Edgar P. Burkhart 2024-12-07 15:22:08 +01:00
parent cb53e91a4d
commit a0e7ea91aa
Signed by: edpibu
GPG key ID: 9833D3C5A25BD227
8 changed files with 363 additions and 455 deletions

View file

@ -0,0 +1,15 @@
import logging
from .mqtt import HAClient
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
client = HAClient(
"climate.chaudiere",
["sensor.esptic_tempo", "sensor.rte_tempo_prochaine_couleur"],
)
client.connect()
client.loop()

118
oin_thermostat/mqtt.py Normal file
View file

@ -0,0 +1,118 @@
import json
import logging
import paho.mqtt.client as mqtt
from .screen import Screen
from .select import Selector
logger = logging.getLogger(__name__)
class HAClient:
def __init__(self, entity, secondary_entities=[]):
self.entity = entity
self.secondary_entities = secondary_entities
self.state_topic = "oin/state"
self.availability_topic = "oin/availability"
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
self.client.username_pw_set(username="oin", password="n+Bi58l7LxbH5nEJ")
self.screen = Screen()
self.selector = Selector(self.send_data)
@property
def ha_options(self):
return {
"dev": {
"ids": "oin",
"name": "Oin",
},
"o": {
"name": "Oin",
},
"availability_topic": self.availability_topic,
"state_topic": self.state_topic,
"cmps": self.selector.ha_options,
}
def connect(self):
logger.debug("Connecting to HA...")
self.client.will_set(self.availability_topic, "offline", retain=True)
self.client.connect("homeassistant.local", 1883, 60)
self.subscribe(entity_topic(self.entity), self.state_update)
for entity in self.secondary_entities:
self.subscribe(entity_topic(entity, "state"), self.secondary_state_update)
self.publish("homeassistant/device/oin/config", self.ha_options, retain=True)
self.client.publish(self.availability_topic, "online", retain=True)
def publish(self, topic, data, **kwargs):
logger.debug(f"Sending message on topic <{topic}>: {json.dumps(data)}")
self.client.publish(topic, json.dumps(data), **kwargs)
def subscribe(self, topic, callback):
logger.debug(f"Subscribe to <{topic}>")
self.client.subscribe(topic)
self.client.message_callback_add(topic, callback)
def unsubscribe(self, topic):
logger.debug(f"Unsubscribe from <{topic}>")
self.client.unsubscribe(topic)
def loop(self):
logger.info("Starting MQTT client loop")
self.client.loop_forever()
def state_update(self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage):
logger.debug(f"Message received on topic <{message.topic}>: {message.payload}.")
subtopic = message.topic.rsplit("/", maxsplit=1)[1]
match subtopic:
case "current_temperature":
self.screen.value = parse(message)
case "temperature":
if (value := parse(message)) != self.selector.temperature:
self.screen.tmp_value = value
self.selector.temperature = value
case "hvac_action":
self.screen.mode = parse(message)
case "preset_modes":
if (value := parse(message)) != self.selector.preset_modes:
self.selector.preset_modes = value
case "preset_mode":
if (value := parse(message)) != self.selector.mode:
self.selector.mode = value
case "state":
match message.payload.decode():
case "heat":
self.selector.switch = True
case "off":
self.selector.switch = False
def secondary_state_update(
self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage
):
logger.debug(f"Message received on topic <{message.topic}>: {message.payload}.")
_, grp, ent, subtopic = message.topic.split("/")
idx = self.secondary_entities.index(f"{grp}.{ent}")
if subtopic == "state":
self.screen.secondary |= {idx: message.payload.decode()}
def send_data(self, data):
self.publish(self.state_topic, data)
def parse(message):
return json.loads(message.payload.decode())
def entity_topic(entity, subtopic="#"):
topic = entity.replace(".", "/")
return f"homeassistant/{topic}/{subtopic}"

140
oin_thermostat/screen.py Normal file
View file

@ -0,0 +1,140 @@
import logging
import math
from threading import Timer
import bdfparser
from sense_hat import SenseHat
logger = logging.getLogger(__name__)
COLORS = {
"Bleu": [0, 0, 255],
"Blanc": [0, 0, 0],
"Rouge": [255, 0, 0],
"Verte": [0, 127, 31],
"Jaune": [255, 255, 0],
"heat": [255, 0, 0],
"heating": [255, 0, 0],
"idle": [127, 0, 255],
"off": [127, 127, 127],
"on_setting": [255, 255, 0],
"off_setting": [255, 255, 255],
None: [0, 0, 0],
}
class Screen:
def __init__(self):
self.sense = SenseHat()
self._value = ""
self._tmp = False
self._tmp_value = None
self._mode = None
self.font = bdfparser.Font("src/tom-thumb.bdf")
self._secondary = dict()
self._secondary_pixels = [[0, 0, 0]] * 8
self.timer = Timer(0, self.set_pixels)
self._held = False
self.sense.stick.direction_middle = self.stick_click
@property
def value(self):
return self._value
@value.setter
def value(self, value):
logger.debug(f"Updated value: <{value}>")
self._value = format_value(value)
if not self._tmp:
self.set_pixels()
@property
def color(self):
return COLORS.get(self.mode, [0, 0, 0])
@property
def mode(self):
return self._mode
@mode.setter
def mode(self, value):
self._mode = value
if not self._tmp:
self.set_pixels()
@property
def tmp_value(self):
return self._tmp_value
@tmp_value.setter
def tmp_value(self, value):
logger.debug(f"Show value: <{value}>")
self.timer.cancel()
self._tmp_value = format_value(value)
self.show_tmp()
def show_tmp(self):
self._tmp = True
self.set_pixels(
self.tmp_value,
color=COLORS.get("off_setting" if self.mode == "off" else "on_setting"),
)
self.timer = Timer(3, self.set_pixels)
self.timer.start()
def set_pixels(self, value=None, color=None, bg_color=[0, 0, 0]):
if value is None:
value = self.value
self._tmp = False
if color is None:
color = self.color
if value:
pixels = [
color if x else bg_color
for x in self.font.draw(value, mode=0).crop(8, 7).todata(3)
]
else:
pixels = 48 * [[0, 0, 0]]
pixels += self.secondary_pixels
self.sense.set_pixels(pixels)
@property
def secondary(self):
return self._secondary
@property
def secondary_pixels(self):
return self._secondary_pixels
@secondary.setter
def secondary(self, value):
self._secondary = value
for idx in range(2):
self._secondary_pixels[4 * idx : 4 * (idx + 1)] = [
COLORS.get(value.get(idx, None), [0, 0, 0])
] * 4
if not self._tmp:
self.set_pixels()
def stick_click(self, event):
match (event.action, self._held):
case ("held", False):
self._held = True
case ("released", True):
self._held = False
case ("released", False):
self.show_tmp()
def format_value(value):
v = math.trunc(value)
d = "." if (value - v) >= 0.5 else ""
return f"{v}{d}"

90
oin_thermostat/select.py Normal file
View file

@ -0,0 +1,90 @@
import logging
import math
from sense_hat import ACTION_HELD, ACTION_RELEASED, SenseHat
logger = logging.getLogger(__name__)
class Selector:
def __init__(self, send_data):
self.stick = SenseHat().stick
self.temperature = None
self.mode = None
self.switch = None
self.preset_modes = []
self.send_data = send_data
self.switch_held = False
self.default_data = {"temperature": None, "mode": None, "switch": None}
self.stick.direction_middle = self.toggle
self.stick.direction_up = self.increase_temperature
self.stick.direction_down = self.decrease_temperature
self.stick.direction_right = self.next_mode
self.stick.direction_left = self.prev_mode
@property
def ha_options(self):
return {
"temperature": {
"p": "sensor",
"value_template": "{{ value_json.temperature }}",
"unique_id": "oin_temp",
"device_class": "temperature",
"entity_category": "diagnostic",
"icon": "mdi:thermostat",
"unit_of_measurement": "°C",
},
"mode": {
"p": "sensor",
"name": "Mode",
"value_template": "{{ value_json.mode }}",
"unique_id": "oin_mode",
"entity_category": "diagnostic",
"icon": "mdi:thermostat-auto",
},
"switch": {
"p": "sensor",
"name": "Switch",
"value_template": "{{ value_json.switch }}",
"unique_id": "oin_switch",
"entity_category": "diagnostic",
"icon": "mdi:thermostat-auto",
},
}
def increase_temperature(self, event):
if event.action != ACTION_RELEASED and self.temperature is not None:
self.callback(temperature=math.floor(self.temperature * 2) / 2 + 0.5)
def decrease_temperature(self, event):
if event.action != ACTION_RELEASED and self.temperature is not None:
self.callback(temperature=math.ceil(self.temperature * 2) / 2 - 0.5)
def next_mode(self, event):
if event.action != ACTION_RELEASED and self.mode is not None:
self.callback(
mode=self.preset_modes[
(self.preset_modes.index(self.mode) + 1) % len(self.preset_modes)
]
)
def prev_mode(self, event):
if event.action != ACTION_RELEASED and self.mode is not None:
self.callback(
mode=self.preset_modes[
(self.preset_modes.index(self.mode) - 1) % len(self.preset_modes)
]
)
def toggle(self, event):
if not self.switch_held and event.action == ACTION_HELD:
self.switch_held = True
self.callback(switch="off" if self.switch else "heat")
elif self.switch_held and event.action == ACTION_RELEASED:
self.switch_held = False
def callback(self, **kwargs):
self.send_data(self.default_data | kwargs)

View file