Voici le code brut de fonderie ; ce n’est pas très propre.
La variable EMULATION est réglée sur True ; pour passer sur l’interface série raccordée au matériel, il faut la mettre à False.
import serial
from construct import *
from construct import Int16sl, Int16ul, Int32ul, Nibble, If, Computed
from PyQt5.QtWidgets import *
import sys
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from bitstring import BitStream
from typing import *
from datetime import datetime
from enum import Enum
from functools import reduce
# When True, ReadBus.read() treats the frame handed to it as the received data
# and no serial port is opened; set to False to read from the serial port.
EMULATION = True
# Device name given to serial.Serial() by ReadBus.init_serial().
port_serie = "COM3"
def _encode_current_date(obj, ctx):
    """Return today's date packed into the 16-bit word of the "date" field.

    Bit layout (as decoded by the "annee"/"mois"/"jour" fields of
    frameStruct_C2): year - 2016 from bit 9 upwards, month in bits 5-8,
    day in bits 0-4.

    The value supplied by the caller (``obj``) is ignored: the frame is always
    stamped with the current date.  The clock is sampled once so that year,
    month and day always belong to the same instant.
    """
    today = datetime.now()
    return (today.year - 2016) << 9 | (today.month << 5) | today.day


# Layout of the 0xC2 frame (37 bytes: "frame_length" 35 + 2).  Every field has a
# default so the frame can be built from a partial dict.  The numbers in the
# "byteN" names match the zero-based byte offset of the field in the frame.
frameStruct_C2 = Struct(
    "msg_id" / Default(Byte, 0xC2),
    "frame_length" / Default(Byte, 35),
    "consigne" / Default(Int16sl, 500),
    # One byte split into two 4-bit values.
    "plage_mode" / BitStruct(
        "plage" / Default(Nibble, 0b0000),
        "mode" / Default(Nibble, 0b0000),
    ),
    "byte5" / Default(Byte, 0),
    "legionelle" / Default(Byte, 0),
    "mode_secours" / Default(Byte, 0),
    "byte8" / Default(Byte, 0),
    "byte9" / Default(Byte, 0),
    "autorisation_elec" / Default(Byte, 0),
    "plage_chaufe_debut_1" / Default(Byte, 0),
    "plage_chaufe_duree_1" / Default(Byte, 0),
    "plage_chaufe_debut_2" / Default(Byte, 0),
    "plage_chaufe_duree_2" / Default(Byte, 0),
    "byte15" / Default(Byte, 0),
    "byte16" / Default(Byte, 0),
    # One byte of individual flags; only one of them has a descriptive name.
    "Defaut ?" / BitStruct(
        "byte17_1" / Default(Bit, 0),
        "Perte de l'heure" / Default(Bit, 0),
        "byte17_3" / Default(Bit, 0),
        "byte17_4" / Default(Bit, 0),
        "byte17_5" / Default(Bit, 0),
        "byte17_6" / Default(Bit, 0),
        "byte17_7" / Default(Bit, 0),
        "byte17_8" / Default(Bit, 0),
    ),
    "seconde" / Default(Byte, 56),
    # Parsed as a plain 16-bit word; when building, the word is always
    # replaced by the current date (see _encode_current_date).
    "date" / ExprAdapter(
        Default(Int16ul, 545),
        decoder=lambda obj, ctx: obj,
        encoder=_encode_current_date,
    ),
    # Convenience views on "date"; they occupy no bytes in the frame.
    # NOTE(review): the None guard presumably covers building without an
    # explicit "date" value, where ctx.date stays None - confirm against
    # construct's Adapter build behaviour.
    "annee" / Computed(lambda ctx: (ctx.date >> 9) + 2016 if ctx.date is not None else None),
    "mois" / Computed(lambda ctx: (ctx.date >> 5) & 0xF if ctx.date is not None else None),
    "jour" / Computed(lambda ctx: ctx.date & 0x1F if ctx.date is not None else None),
    "minute" / Default(Byte, 47),
    "heure" / Default(Byte, 19),
    "mode_test_bool" / BitStruct(
        "byte23_1" / Default(Bit, 0),
        "byte23_2" / Default(Bit, 0),
        "byte23_3" / Default(Bit, 0),
        "byte23_4" / Default(Bit, 0),
        "test_pac_froid_a_verif" / Default(Bit, 0),
        "test_pac_chaud_a_verif" / Default(Bit, 0),
        "test_mode_pac_elec_a_verif" / Default(Bit, 0),
        "byte23_8" / Default(Bit, 0),
    ),
    "byte24" / Default(Byte, 0),
    "byte25" / Default(Byte, 0),
    "byte26" / Default(Byte, 0),
    "byte27" / Default(Byte, 0x03),
    "byte28" / Default(Byte, 0),
    "byte29" / Default(Byte, 0),
    "byte30" / Default(Byte, 0),
    "byte31" / Default(Byte, 0),
    "byte32" / Default(Byte, 0),
    "byte33" / Default(Byte, 0xFF),
    "byte34" / Default(Byte, 0xFF),
    "byte35" / Default(Byte, 0xFF),
    # Last byte of the frame; defaults to 0 and is overwritten by the sender
    # once the checksum is known.
    "crc" / Default(Byte, 0x00),
)
# Layout of the 0x43 frame: 33 bytes in total, i.e. the 0x1F payload length
# registered in frames_config plus 2.  Multi-byte fields are little-endian
# unsigned integers.
frameStruct_43 = Struct(
    # Two-byte header: frame identifier, then length byte.
    "msg_id" / Byte,
    "frame_length" / Byte,
    # Five 16-bit words.
    "p_pac" / Int16ul,
    "p_elec" / Int16ul,
    "reserve_16bit_1" / Int16ul,
    "reserve_16bit_2" / Int16ul,
    "reserve_16bit_3" / Int16ul,
    # Five 32-bit words.
    "pac_time" / Int32ul,
    "elec_time" / Int32ul,
    "total_time" / Int32ul,
    "conso" / Int32ul,
    "reserve_32bit" / Int32ul,
    # Trailing checksum byte.
    "crc" / Byte,
)
# Layout of the 0xC1 frame: 39 bytes in total, i.e. the 0x25 payload length
# registered in frames_config plus 2.  All multi-byte fields are little-endian.
frameStruct_C1 = Struct(
    # Two-byte header: frame identifier, then length byte.
    "msg_id" / Byte,
    "frame_length" / Byte,
    # Eight signed 16-bit values.
    "temp_ballon" / Int16sl,
    "temp_condensseur" / Int16sl,
    "temp_haut" / Int16sl,
    "temp_reserve_1" / Int16sl,
    "temp_ext" / Int16sl,
    "temp_reserve_2" / Int16sl,
    "temp_reserve_3" / Int16sl,
    "temp_reserve_4" / Int16sl,
    # One byte (offset 18) decoded as eight individual flags.
    "etat_bool" / BitStruct(
        "def_sonde_1_2ballon_condens" / Bit,
        "def_sonde_2_2ballon" / Bit,
        "byte18_3" / Bit,
        "byte18_4" / Bit,
        "pac_froid" / Bit,
        "pac_ok" / Bit,
        "pac_marche" / Bit,
        "relais_elec" / Bit,
    ),
    "com_carte_regul" / Byte,
    "byte20" / Byte,
    "vitesse_compresseur" / Byte,
    # Two unsigned 32-bit words followed by four unsigned 16-bit words.
    "Int32ul_1" / Int32ul,
    "Int32ul_2" / Int32ul,
    "temp_consigne" / Int16ul,
    "Int16ul_1" / Int16ul,
    "Int16ul_2" / Int16ul,
    "Int16ul_3" / Int16ul,
    # Trailing checksum byte.
    "crc" / Byte,
)
# Frame-type constants: one dict per known frame.  "frame_marker" is the
# (frame_id, payload_length) pair formed by the first two bytes of a frame;
# "structure" is the construct Struct used to decode it.
frames_config: List[Dict[str, Union[int, bytes, Struct]]] = [
    {
        "frame_id": frame_id,
        "payload_length": payload_length,
        "frame_marker": (frame_id, payload_length),
        "structure": structure,
    }
    for frame_id, payload_length, structure in (
        (0xC2, 0x23, frameStruct_C2),
        (0x43, 0x1F, frameStruct_43),
        (0xC1, 0x25, frameStruct_C1),
    )
]
class consigne_enum(Enum):
    """Named setpoint values for the "consigne" field.

    BOOST has the same value as _3_DOUCHES, so Enum makes it an alias:
    ``consigne_enum.BOOST is consigne_enum._3_DOUCHES`` and iterating the enum
    does not yield BOOST separately.
    """

    _3_DOUCHES = 500
    _4_DOUCHES = 525
    _5_DOUCHES = 545
    BOOST = 500  # alias of _3_DOUCHES (same value)
    ABSENCE = 200
    LEGIONELLE = 620
    SECOURS = 650
class plages_enum(Enum):
    """4-bit codes for the "plage" nibble of the 0xC2 frame."""

    # 0
    PAC_24h_ELEC_24h = 0b0000
    # 1
    PAC_24h_ELEC_HC = 0b0001
    # 2 - not available
    PAC_HC_ELEC_24h = 0b0010
    # 3
    PAC_HC_ELEC_HC = 0b0011
    # 4
    PAC_Prog_ELEC_Prog = 0b0100
class mode_config_enum(Enum):
    """4-bit codes for the "mode" nibble of the 0xC2 frame."""

    # 0
    ABSENCE = 0b0000
    # 1 - the electric backup starts only on a heat-pump alarm or when the
    # heat pump is outside its temperature range.
    MANUEL_ECO = 0b0001
    # 2 - the electric backup starts if the heat pump does not heat fast enough
    # or when the heat pump is outside its temperature range.
    MANUEL = 0b0010
    # 3
    BOOST = 0b0011
    # 4 - the electric backup starts if the heat pump does not heat fast enough
    # or when the heat pump is outside its temperature range.
    AUTO = 0b0100
class config:
    """Holder for a time-range selection, a setpoint and a time value.

    No mode attribute is set: the assignment that would have done so is
    disabled in the original code.
    """

    def __init__(self):
        # The three attributes are independent of each other.
        self.time = 0
        self.consigne = consigne_enum.ABSENCE
        self.plage = plages_enum.PAC_24h_ELEC_24h

    def update(self):
        """Placeholder hook: does nothing and returns None."""
        return None
class Frame:
    """One raw bus frame together with its validation state and decoded form.

    The checks assume this layout: byte 0 is the frame type, byte 1 is the
    length (total length minus 2) and the last byte is the XOR of every byte
    between the type byte and the checksum itself.
    """

    # Shortest frame that still has a type byte, a length byte and a checksum.
    _MIN_LENGTH = 3

    def __init__(self, frames_config: List[Dict[str, Any]], rawFrame: bytes):
        """Store the frame table and, if ``rawFrame`` is non-empty, decode it.

        Args:
            frames_config: list of dicts; each needs a "frame_id" and a
                "structure" object exposing ``parse(bytes)``.
            rawFrame: complete frame bytes, or an empty value to start blank.
        """
        self.frames_config = frames_config
        self.rawFrame = b''
        self.rawLength = 0
        self.Type = 0
        self.dataLength = 0
        self.crc = 0x00
        self.isValid = False
        self.parsed = None
        if rawFrame:
            self.update(rawFrame)

    def update(self, rawFrame: bytes) -> Optional[dict]:
        """Replace the stored frame, validate it and decode it when valid.

        Returns:
            The decoded frame, or None when the frame is too short, fails the
            checksum/length check, or has a type missing from frames_config.
        """
        self.rawFrame = rawFrame
        self.rawLength = len(rawFrame)
        # Drop the result of any earlier frame so that an invalid frame can
        # never be reported with stale decoded content.
        self.parsed = None
        self.isValid = False

        if self.rawLength < self._MIN_LENGTH:
            # Too short to carry type, length and checksum: reject it instead
            # of failing on an out-of-range index.
            self.Type = rawFrame[0] if rawFrame else 0
            self.dataLength = 0
            self.crc = 0x00
            print("CRC invalide", self.Type)
            return None

        self.Type = rawFrame[0]
        self.dataLength = rawFrame[1]
        self.crc = rawFrame[-1]
        self.isValid = self.crc_compare() and self.rawLength == self.dataLength + 2
        if self.isValid:
            self.parse()
        else:
            print("CRC invalide", self.Type)
        return self.parsed

    @staticmethod
    def _xor_bytes(data) -> int:
        """Return the XOR of all byte values in ``data`` (0 when empty)."""
        return reduce(lambda acc, byte: acc ^ byte, data, 0)

    def crc_compare(self) -> bool:
        """Tell whether the stored frame's last byte matches its checksum.

        The checksum covers every byte except the first (type) and the last
        (checksum).  Frames shorter than 3 bytes never match.
        """
        if len(self.rawFrame) < self._MIN_LENGTH:
            return False
        return self._xor_bytes(self.rawFrame[1:-1]) == self.rawFrame[-1]

    def crc_calc(self, payload):
        """XOR all bytes of ``payload`` except the last, store and return it.

        Unlike crc_compare, the first byte of ``payload`` is included.
        An empty or one-byte payload yields 0.
        """
        self.crc = self._xor_bytes(payload[:-1])
        return self.crc

    def parse(self) -> None:
        """Decode rawFrame with the structure registered for its type.

        Sets ``self.parsed`` to the decoded result, or to None when no entry
        of frames_config has a matching "frame_id".
        """
        for frame_info in self.frames_config:
            if frame_info["frame_id"] == self.Type:
                self.parsed = frame_info["structure"].parse(self.rawFrame)
                break
        else:
            self.parsed = None
class ReadBus:
    """Accumulate bus bytes, find frame boundaries and queue decoded frames.

    Incoming bytes are appended to ``buffer``; frames are recognised by their
    first two bytes (frame id, length), which must match one of the
    "frame_marker" pairs of ``frames_config``.  Valid frames are queued in
    ``frame_list``, oldest first.
    """

    # Frame extraction is attempted only once this many bytes are buffered
    # (the longest frame in this file's frames_config is 0x25 + 2 = 39 bytes).
    _MIN_BUFFER_BEFORE_EXTRACT = 40

    def __init__(self, frames_config: List[Dict[str, Any]]):
        """Store the frame table and open the serial port unless emulating."""
        self.frames_config = frames_config
        self.frames_markers = [frame_info["frame_marker"] for frame_info in frames_config]
        # Number of bytes requested per serial read (despite the name).
        self.read_bits = 1
        self.buffer = bytearray()
        self.frame_list: List["Frame"] = []
        if not EMULATION:
            self.init_serial()

    def init_serial(self) -> None:
        """Open the serial port ``port_serie`` at 9600 baud with timeout=0."""
        self.serialBus = serial.Serial(port_serie, baudrate=9600, timeout=0)

    def read(self, frame) -> Optional[dict]:
        """Take in new bytes and return the oldest queued decoded frame.

        Args:
            frame: in emulation mode, the bytes to treat as received data.
                Otherwise the locally built frame used for the reply below;
                a falsy value disables the reply.

        Returns:
            ``parsed`` of the oldest queued frame, or None when nothing is
            queued or when a reply was just written.
        """
        if EMULATION:
            data = frame
        else:
            data = self.serialBus.read(self.read_bits)
        if data:
            self.buffer.extend(data)

        # Indexing a bytearray yields an int, so the byte has to be compared
        # with 0xC2 (comparing it with b'\xc2' is always False).  The reply is
        # sent only for a byte that arrived in this very call, so an idle bus
        # does not trigger the same reply over and over.
        # NOTE(review): the reply is the frame without its first and last
        # byte - confirm this is what the hardware expects.
        reply_requested = bool(data) and len(self.buffer) == 1 and self.buffer[0] == 0xC2
        if not EMULATION and frame and reply_requested:
            self.serialBus.write(frame[1:-1])
            return None

        self.alignBuffer()
        if len(self.buffer) >= self._MIN_BUFFER_BEFORE_EXTRACT:
            self.extract_frame()
        if self.frame_list:
            return self.frame_list[0].parsed
        return None

    def frame_list_len(self) -> int:
        """Return the number of queued frames."""
        return len(self.frame_list)

    def frame_begin_check(self, marker_1: Optional[int] = None, marker_2: Optional[int] = None) -> bool:
        """Tell whether a byte pair is a known frame marker.

        With both arguments given, the pair (marker_1, marker_2) is tested;
        otherwise the first two bytes of the buffer are tested (the buffer
        must then hold at least two bytes).
        """
        if marker_1 is not None and marker_2 is not None:
            return (marker_1, marker_2) in self.frames_markers
        return (self.buffer[0], self.buffer[1]) in self.frames_markers

    def alignBuffer(self) -> None:
        """Drop leading bytes until the buffer starts with a frame marker.

        Stops as soon as two bytes or fewer remain.
        """
        while len(self.buffer) > 2 and not self.frame_begin_check():
            del self.buffer[0]
        # Kept from the original code; read_bits is never set to anything
        # else in this class, so this has no visible effect.
        if len(self.buffer) >= 109:
            self.read_bits = 1

    def extract_frame(self) -> None:
        """Extract at most one frame from the buffer.

        The first frame marker in the buffer is located.  If the whole frame
        is available it is decoded: a valid frame is queued and removed from
        the buffer together with any bytes in front of it.  For an invalid
        frame only the bytes up to and including the first marker byte are
        dropped, so that a false marker cannot swallow the beginning of the
        frame that follows.  Nothing happens when no marker is found or the
        frame is still incomplete.
        """
        buff_len = len(self.buffer)
        for start in range(buff_len - 1):
            if self.frame_begin_check(self.buffer[start], self.buffer[start + 1]):
                break
        else:
            return  # no marker in the buffer (or fewer than two bytes)

        end = start + self.buffer[start + 1] + 2
        if end > buff_len:
            return  # frame not completely received yet

        self.ajouter_frame(bytes(self.buffer[start:end]))
        if self.frame_list[-1].isValid:
            del self.buffer[:end]
        else:
            self.supprimer_frame_la_plus_recente()
            del self.buffer[:start + 1]

    def ajouter_frame(self, data: bytes) -> None:
        """Build a Frame from ``data`` and append it to the queue."""
        self.frame_list.append(Frame(self.frames_config, data))

    def supprimer_frame_la_plus_ancienne(self) -> None:
        """Remove the oldest queued frame, or print a notice if none exists."""
        if self.frame_list:
            self.frame_list.pop(0)
        else:
            print("Aucune frame à supprimer, la liste est vide.")

    def supprimer_frame_la_plus_recente(self) -> None:
        """Remove the newest queued frame, or print a notice if none exists."""
        if self.frame_list:
            self.frame_list.pop()
        else:
            print("Aucune frame à supprimer, la liste est vide.")
class UI(QMainWindow):
    """Main window with a reception tab (one table per frame type) and a
    sending tab (setpoint field, mode selector, send button).

    A timer polls the bus reader and refreshes the table of whichever frame
    type was decoded.
    """

    # Column indexes of the reception tables.
    _COL_FIELD = 0
    _COL_INT = 1
    _COL_HEX = 2
    _COL_BIN = 3

    def __init__(self):
        super().__init__()
        self.init_ui()
        self.readBus = ReadBus(frames_config)
        # Per frame id: field name -> reference value used by data_changed().
        self.previous_data: Dict[int, dict] = {frame_info["frame_id"]: {} for frame_info in frames_config}
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.add_data_to_table)
        self.timer.start(1)

    def init_ui(self) -> None:
        """Create the tab widget, one reception table per frame type and the
        sending form."""
        # Mode picked in the combo box; copied to self.mode by send_data().
        self.send_data_fields = 0
        # Mode value placed in outgoing frames by serialize().
        self.mode = mode_config_enum.BOOST.value

        self.central_widget = QTabWidget()
        self.setCentralWidget(self.central_widget)
        self.tab_reception = QWidget()
        self.tab_envoi = QWidget()
        self.central_widget.addTab(self.tab_reception, "Réception de données")
        self.central_widget.addTab(self.tab_envoi, "Envoi de données")
        self.layout_reception = QHBoxLayout()
        self.layout_envoi = QHBoxLayout()
        self.tab_reception.setLayout(self.layout_reception)
        self.tab_envoi.setLayout(self.layout_envoi)

        self.frames_ids = [frame_info["frame_id"] for frame_info in frames_config]
        self.tables_list: Dict[int, QTableWidget] = {}
        for frame_info in frames_config:
            frame_id = frame_info["frame_id"]
            self.tables_list[frame_id] = self.create_table()
            self.layout_reception.addWidget(self.tables_list[frame_id])
        # Add the data-sending fields.
        self.layout_envoi.addWidget(self.create_send_data_widgets())

    def create_send_data_widgets(self):
        """Build and return the widget holding the sending form."""
        send_data_widget = QWidget()
        # A form layout puts each field next to its label.
        layout = QFormLayout()
        send_data_widget.setLayout(layout)

        # Setpoint input; its content is not read anywhere in this class.
        field_input1 = QLineEdit()
        layout.addRow(QLabel("consigne:"), field_input1)

        # One entry per mode, in enum order, so the combo index equals the
        # enum value that on_mode_selection() looks up.
        mode_combo = QComboBox()
        for mode in mode_config_enum:
            mode_combo.addItem(mode.name)
        mode_combo.currentIndexChanged.connect(self.on_mode_selection)
        layout.addRow(QLabel("mode:"), mode_combo)
        print(self.send_data_fields)

        # Send button, wired to send_data().
        send_button = QPushButton("Envoyer")
        send_button.clicked.connect(self.send_data)
        layout.addRow(send_button)
        return send_data_widget

    def on_mode_selection(self, index):
        """Remember the mode whose enum value equals the combo ``index``."""
        if index >= 0:
            choix_mode = mode_config_enum(index).value
            print("Mode sélectionné:", choix_mode)
            self.send_data_fields = choix_mode

    def send_data(self):
        """Make the selected mode the one used in outgoing frames."""
        self.mode = self.send_data_fields

    def create_table(self) -> QTableWidget:
        """Return a 4-column table with one placeholder row of captions."""
        table = QTableWidget()
        table.setColumnCount(4)
        table.setHorizontalHeaderLabels(["Frame Type", "Decimal Value", "Hex Value", "Binary Value"])
        font = table.font()
        font.setPointSize(8)
        table.setFont(font)
        table.verticalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
        table.verticalHeader().setDefaultSectionSize(20)
        table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
        table.horizontalHeader().setDefaultSectionSize(100)
        table.setRowCount(1)
        captions = ("Variable", "Decimal Value", "Hex Value", "Binary Value")
        for column, caption in enumerate(captions):
            table.setItem(0, column, QTableWidgetItem(caption))
        return table

    def add_data_to_table(self) -> None:
        """Poll the bus reader and display the decoded frame, if any.

        The first item of the decoded mapping is skipped.  Each remaining
        field gets one row; a nested container gets a summary row followed by
        one row per member (again skipping its first item).  Values for which
        data_changed() returns True are highlighted.
        """
        # The locally built frame is handed over in both modes; read() uses
        # it as the received data when emulating.
        data = self.readBus.read(self.serialize())
        if not data:
            return
        self.readBus.supprimer_frame_la_plus_ancienne()

        frame_type = data["msg_id"]
        table = self.tables_list.get(frame_type)
        if table is None:
            print(f"Table for frame_type {frame_type} not found")
            return

        row = 0
        for field, value in list(data.items())[1:]:
            if isinstance(value, Container):
                row = self._show_bit_group(table, row, frame_type, field, value)
            else:
                self._show_value_row(table, row, field, value)
                if self.data_changed(frame_type, field, value):
                    self.updateHighlight(table, row, self._COL_INT)
                row += 1

    def _show_value_row(self, table: QTableWidget, row: int, field: str, number: int) -> None:
        """Write name, decimal, hex and binary form of ``number`` into ``row``."""
        table.setRowCount(row + 1)
        table.setItem(row, self._COL_FIELD, QTableWidgetItem(field))
        table.setItem(row, self._COL_INT, QTableWidgetItem(str(number)))
        table.setItem(row, self._COL_HEX, QTableWidgetItem(self.hexPrint(number)))
        table.setItem(row, self._COL_BIN, QTableWidgetItem(self.binPrint(number)))

    def _show_bit_group(self, table: QTableWidget, row: int, frame_type: int, field: str, group: Container) -> int:
        """Show a nested container starting at ``row``; return the next free row.

        The summary row shows the members combined into one integer.  Member
        rows are shifted one column to the right: name in the decimal column,
        value in the hex column.
        """
        self._show_value_row(table, row, field, self.bitToInt(group))
        row += 1
        for member, member_value in list(group.items())[1:]:
            table.setRowCount(row + 1)
            table.setItem(row, self._COL_INT, QTableWidgetItem(str(member)))
            table.setItem(row, self._COL_HEX, QTableWidgetItem(str(member_value)))
            if self.data_changed(frame_type, member, member_value):
                self.updateHighlight(table, row, self._COL_HEX)
            row += 1
        return row

    def serialize(self):
        """Build the outgoing 0xC2 frame and return it as bytes.

        The frame carries the _4_DOUCHES setpoint, the current mode and the
        current time.  It is built once with a placeholder checksum; the last
        byte is then replaced by the XOR of all bytes except the first and
        the last.
        """
        maintenant = datetime.now()
        send_frame = frameStruct_C2.build(
            {
                "consigne": consigne_enum._4_DOUCHES.value,
                "plage_mode": {
                    "mode": self.mode
                },
                "heure": maintenant.hour,
                "minute": maintenant.minute,
                "seconde": maintenant.second
            }
        )
        crc = reduce(lambda x, y: x ^ y, send_frame[1:-1])
        # "crc" is the last field of the structure, i.e. the last byte.
        return send_frame[:-1] + bytes([crc])

    def binPrint(self, value: int) -> str:
        """Return ``value`` in binary, in space-separated groups of 4 bits.

        At least 16 bits are shown; wider values get as many whole groups as
        they need.  Negative values are shown as "-" followed by the
        magnitude.
        """
        sign = "-" if value < 0 else ""
        magnitude = abs(value)
        width = max(16, (magnitude.bit_length() + 3) // 4 * 4)
        bits = format(magnitude, f"0{width}b")
        return sign + ' '.join(bits[i:i + 4] for i in range(0, width, 4))

    def hexPrint(self, value: int) -> str:
        """Return ``value`` in upper-case hex, in space-separated byte pairs.

        The width is the number of whole bytes needed for the value; 0 is
        shown as "0".  Negative values are shown as "-" followed by the
        magnitude.
        """
        sign = "-" if value < 0 else ""
        magnitude = abs(value)
        hex_string = '{:0{width}X}'.format(magnitude, width=((magnitude.bit_length() + 7) // 8) * 2)
        return sign + ' '.join(hex_string[i:i + 2] for i in range(0, len(hex_string), 2))

    def bitToInt(self, bitTable: Container) -> int:
        """Combine the members of a nested container into one integer.

        The first item of the container is skipped.  A container of 9 items
        is read as eight 1-bit values, one of 3 items as two 4-bit values;
        the first member ends up in the most significant position.

        Raises:
            ValueError: for any other container size.
        """
        if len(bitTable) == 9:
            bits_per_member = 1
        elif len(bitTable) == 3:
            bits_per_member = 4
        else:
            raise ValueError(f"unsupported bit container with {len(bitTable)} items")
        combined = 0
        for member_value in list(bitTable.values())[1:]:
            combined = (combined << bits_per_member) | int(member_value)
        return combined

    def updateHighlight(self, table: QTableWidget, row: int, column: int) -> None:
        """Give the cell at (row, column) a red background."""
        table.item(row, column).setBackground(QColor(255, 0, 0))

    def data_changed(self, frame_type: int, field: str, value: int) -> bool:
        """Tell whether ``value`` differs from the stored reference value.

        Returns True when a reference exists for ``field``, differs from
        ``value`` and the field name does not end in "_bool".  In every other
        case ``value`` is stored as the new reference and False is returned.
        The reference is therefore left untouched while a difference is being
        reported.
        """
        previous_data = self.previous_data[frame_type]
        differs = field in previous_data and previous_data[field] != value
        if differs and not field.endswith("_bool"):
            return True
        previous_data[field] = value
        return False
# Script entry point: create the Qt application, show the main window and
# hand the event loop's return code to sys.exit().
if __name__ == "__main__":
    application = QApplication(sys.argv)
    main_window = UI()
    main_window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)