2023-05-20 15:44:18 +00:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
2023-05-20 16:32:08 +00:00
|
|
|
import sys
|
|
|
|
import os
|
2023-05-20 15:44:18 +00:00
|
|
|
import logging
|
|
|
|
import itertools
|
|
|
|
|
|
|
|
from time import time
|
|
|
|
from datetime import datetime
|
|
|
|
import can
|
|
|
|
import struct
|
|
|
|
from argparse import ArgumentParser
|
|
|
|
|
|
|
|
VERSION = '1.0'
|
|
|
|
|
|
|
|
class BatriumBattery(can.Listener):
|
2023-05-20 22:36:37 +00:00
|
|
|
def __init__(self, connection):
|
|
|
|
self.capacity = 2.2
|
2023-05-20 15:44:18 +00:00
|
|
|
self.maxChargeVoltage = 0
|
2023-05-20 22:36:37 +00:00
|
|
|
self.numberOfModules = 0
|
2023-05-20 15:44:18 +00:00
|
|
|
self.chargeComplete = 0
|
|
|
|
self.soc = 0
|
|
|
|
self.voltage = 0
|
|
|
|
self.current = 0
|
|
|
|
self.temperature = 0
|
|
|
|
self.maxCellTemperature = 0
|
2023-05-20 22:36:37 +00:00
|
|
|
self.maxCellTemperatureId = 0
|
2023-05-20 15:44:18 +00:00
|
|
|
self.minCellTemperature = 0
|
2023-05-20 22:36:37 +00:00
|
|
|
self.minCellTemperatureId = 0
|
|
|
|
self.cellVoltages_min = dict()
|
|
|
|
self.cellVoltages_max = dict()
|
|
|
|
self.cellTemperatures = dict()
|
|
|
|
self.cellBypassTemperatures = dict()
|
|
|
|
self.cellBypassPWM = dict()
|
2023-05-20 15:44:18 +00:00
|
|
|
self.maxCellVoltage = 0
|
|
|
|
self.maxCellVoltageId = 0
|
|
|
|
self.minCellVoltage = 0
|
|
|
|
self.minCellVoltageId = 0
|
|
|
|
self.maxChargeCurrent = 0
|
|
|
|
self.maxDischargeCurrent = 0
|
|
|
|
self.updated = -1
|
|
|
|
self.cyclicModeTask = None
|
|
|
|
self.TimeToEmpty = 0
|
|
|
|
self.TimeToFull = 0
|
2023-05-20 22:36:37 +00:00
|
|
|
self.AhToEmpty = 0
|
|
|
|
self.AhToFull = 0
|
2023-05-21 02:01:39 +00:00
|
|
|
self.NumberInBypass = 0
|
|
|
|
self.alarm_high_temperature = 0
|
|
|
|
self.alarm_high_voltage = 0
|
|
|
|
self.alarm_low_temperature = 0
|
|
|
|
self.alarm_low_voltage = 0
|
|
|
|
self.alarm_low_soc = 0
|
|
|
|
self.alarm_high_charge_current = 0
|
|
|
|
self.alarm_high_discharge_current = 0
|
2023-05-20 15:44:18 +00:00
|
|
|
|
|
|
|
self._ci = can.interface.Bus(channel=connection, bustype='socketcan')
|
|
|
|
|
|
|
|
# check connection and that reported system voltage roughly matches configuration
|
|
|
|
found = False
|
|
|
|
msg = None
|
|
|
|
|
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
msg = self._ci.recv(timeout=10)
|
|
|
|
except can.CanError:
|
|
|
|
logging.error("Canbus error")
|
|
|
|
|
|
|
|
if msg == None:
|
|
|
|
#timeout no system connected
|
|
|
|
logging.error("No messages on canbus %s received. Check connection and speed." % connection)
|
|
|
|
break;
|
|
|
|
|
|
|
|
elif msg.arbitration_id == 0x00111500:
|
|
|
|
logging.info("Found Batrium BMS on %s" % connection)
|
|
|
|
found = True
|
|
|
|
break;
|
|
|
|
|
|
|
|
def on_message_received(self, msg):
|
|
|
|
self.updated = msg.timestamp
|
|
|
|
if msg.arbitration_id == 0x00111500:
|
|
|
|
self.soc = (msg.data[0] * 0.5) - 5
|
|
|
|
self.voltage = ((msg.data[3] * 256) + msg.data[2]) * 0.01
|
|
|
|
self.current = round((struct.unpack('>f', bytearray([msg.data[7], msg.data[6], msg.data[5], msg.data[4]])))[0] * 0.001, 2)
|
|
|
|
self.temperature = msg.data[1] - 40
|
|
|
|
|
2023-05-20 22:36:37 +00:00
|
|
|
elif msg.arbitration_id == 0x00111900:
|
|
|
|
logging.debug("Ah 0x00111900 received")
|
|
|
|
self.AhToFull = round((struct.unpack('>f', bytearray([msg.data[3], msg.data[2], msg.data[1], msg.data[0]])))[0] * 0.001, 2)
|
|
|
|
self.AhToEmpty = round((struct.unpack('>f', bytearray([msg.data[7], msg.data[6], msg.data[5], msg.data[4]])))[0] * 0.001, 2)
|
|
|
|
self.capacity = self.AhToEmpty + self.AhToFull
|
|
|
|
logging.debug("Ah to Full: %2.2f", self.AhToFull)
|
|
|
|
logging.debug("Ah to Empty: %2.2f", self.AhToEmpty)
|
|
|
|
logging.debug("Capacity: %2.2f", self.capacity)
|
|
|
|
|
2023-05-21 02:01:39 +00:00
|
|
|
elif msg.arbitration_id == 0x00140100:
|
|
|
|
self.alarm_high_temperature = (msg.data[2] >> 3) & 1
|
|
|
|
logging.debug("High Temp Alarm: %d", self.alarm_high_temperature)
|
|
|
|
self.alarm_high_voltage = (msg.data[2] >> 1) & 1
|
|
|
|
logging.debug("High Voltage Alarm: %d", self.alarm_high_voltage)
|
|
|
|
self.alarm_low_temperature = (msg.data[2] >> 2) & 1
|
|
|
|
logging.debug("Low Temperature Alarm: %d", self.alarm_low_temperature)
|
|
|
|
self.alarm_low_voltage = (msg.data[2] >> 0) & 1
|
|
|
|
logging.debug("Low Voltage Alarm: %d", self.alarm_low_voltage)
|
|
|
|
#self.alarm_low_soc = (msg.data[7] >> 2) & 1
|
|
|
|
logging.debug("Low SOC Alarm: %d", self.alarm_low_soc)
|
|
|
|
self.alarm_high_charge_current = (msg.data[3] >> 3) & 1
|
|
|
|
logging.debug("High Charge Current Alarm: %d", self.alarm_high_charge_current)
|
|
|
|
self.alarm_high_discharge_current = (msg.data[3] >> 4) & 1
|
|
|
|
logging.debug("High Discharge Current Alarm: %d", self.alarm_high_discharge_current)
|
|
|
|
|
|
|
|
elif msg.arbitration_id == 0x00111300:
|
|
|
|
self.NumberInBypass = msg.data[6]
|
|
|
|
logging.debug("Cell number in bypass: %d", self.NumberInBypass)
|
|
|
|
|
2023-05-20 15:44:18 +00:00
|
|
|
elif msg.arbitration_id == 0x00111800:
|
2023-05-20 22:36:37 +00:00
|
|
|
self.TimeToEmpty = ((msg.data[5] * 256) + msg.data[4])
|
|
|
|
logging.debug("Time to empty: %dmin", self.TimeToEmpty)
|
2023-05-20 15:44:18 +00:00
|
|
|
|
|
|
|
elif msg.arbitration_id == 0x00111700:
|
2023-05-20 22:36:37 +00:00
|
|
|
self.TimeToFull = ((msg.data[5] * 256) + msg.data[4])
|
|
|
|
logging.debug("Time to full: %dmin", self.TimeToFull)
|
2023-05-20 15:44:18 +00:00
|
|
|
|
|
|
|
elif msg.arbitration_id == 0x00140400:
|
2023-05-20 22:36:37 +00:00
|
|
|
self.maxDischargeCurrent = ((msg.data[3] * 256) + msg.data[2]) * 0.01
|
|
|
|
logging.debug("MaxDischargeCurrent (DCL) %2.2fA", self.maxDischargeCurrent)
|
|
|
|
logging.debug("I: %2.2fA U: %2.2fV",self.current, self.voltage)
|
2023-05-20 15:44:18 +00:00
|
|
|
|
|
|
|
elif msg.arbitration_id == 0x00140300:
|
|
|
|
self.maxChargeVoltage = ((msg.data[7] * 256) + msg.data[6]) * 0.01
|
|
|
|
self.maxChargeCurrent = ((msg.data[3] * 256) + msg.data[2]) * 0.01
|
2023-05-20 22:36:37 +00:00
|
|
|
logging.debug("MaxChargeCurrent (CCL) %2.2fA, MaxChargeVoltage (CVL) %2.2fV", self.maxChargeCurrent, self.maxChargeVoltage)
|
|
|
|
logging.debug("I: %2.2fA U: %2.2fV",self.current, self.voltage)
|
2023-05-20 15:44:18 +00:00
|
|
|
|
2023-05-20 22:36:37 +00:00
|
|
|
elif msg.arbitration_id == 0x00111200:
|
2023-05-20 15:44:18 +00:00
|
|
|
self.minCellTemperature = msg.data[0] - 40
|
|
|
|
self.maxCellTemperature = msg.data[1] - 40
|
2023-05-21 02:01:39 +00:00
|
|
|
self.minCellTemperatureId = msg.data[2]
|
|
|
|
self.maxCellTemperatureId = msg.data[3]
|
2023-05-20 22:36:37 +00:00
|
|
|
logging.debug("MinCellTemp: %d (ID: %d), MaxCellTemp: %d (ID: %d)", self.minCellTemperature, self.minCellTemperatureId, self.maxCellTemperature, self.maxCellTemperatureId)
|
2023-05-20 15:44:18 +00:00
|
|
|
|
2023-05-20 22:36:37 +00:00
|
|
|
elif msg.arbitration_id == 0x00111100:
|
2023-05-20 15:44:18 +00:00
|
|
|
self.minCellVoltage = ((msg.data[1] * 256) + msg.data[0]) * 0.001
|
|
|
|
self.maxCellVoltage = ((msg.data[3] * 256) + msg.data[2]) * 0.001
|
|
|
|
self.minCellVoltageId = msg.data[4]
|
|
|
|
self.maxCellVoltageId = msg.data[5]
|
2023-05-20 22:36:37 +00:00
|
|
|
logging.debug("MinCellVolt %1.3fV (ID: %d), MaxCellVolt %1.3fV (ID: %d)", self.minCellVoltage, self.minCellVoltageId, self.maxCellVoltage, self.maxCellVoltageId)
|
2023-05-20 15:44:18 +00:00
|
|
|
|
|
|
|
elif ((msg.arbitration_id >= 0x001D1101) and (msg.arbitration_id <= 0x001D11F9)):
|
|
|
|
cell_id = msg.arbitration_id & 0xff
|
2023-05-20 22:36:37 +00:00
|
|
|
#logging.debug("Cell ID: %d, Voltage_min: %2.2f, Voltage_max: %2.2f, Temperature: %d, Bypass_Temperature: %d, BypassPWM: %d", cell_id, ((msg.data[1] * 256) + msg.data[0]) * 0.001, ((msg.data[3] * 256) + msg.data[2]) * 0.001, msg.data[4] - 40, msg.data[5] - 40, msg.data[6])
|
|
|
|
self.cellVoltages_min[cell_id] = ((msg.data[1] * 256) + msg.data[0]) * 0.001
|
|
|
|
self.cellVoltages_max[cell_id] = ((msg.data[3] * 256) + msg.data[2]) * 0.001
|
|
|
|
self.cellTemperatures[cell_id] = msg.data[4] - 40
|
|
|
|
self.cellBypassTemperatures[cell_id] = msg.data[5] - 40
|
|
|
|
self.cellBypassPWM[cell_id] = msg.data[6]
|
2023-05-21 02:01:39 +00:00
|
|
|
self.numberOfModules = len(self.cellVoltages_min.keys())
|
2023-05-20 15:44:18 +00:00
|
|
|
|
|
|
|
def prnt(self):
|
|
|
|
print("SOC: %2d, I: %2.2fA, U: %2.2fV, T:%2.1fC" % (self.soc, self.current, self.voltage, self.maxCellTemperature))
|
2023-05-20 22:36:37 +00:00
|
|
|
print("MinCellVolt: %1.2f, MaxCellVolt: %1.2f CellVoltDelta: %1.2f" % (self.minCellVoltage, self.maxCellVoltage, self.maxCellVoltage-self.minCellVoltage))
|
|
|
|
print("MaxChargeCurrent (CCL): %4.1f A MaxChargeVoltage (CVL): %2.2f V" % (self.maxChargeCurrent, self.maxChargeVoltage))
|
|
|
|
print("Number of modules found: %1.0f" % (self.numberOfModules))
|
2023-05-20 15:44:18 +00:00
|
|
|
|
|
|
|
# === All code below is to simply run it from the commandline for debugging purposes ===
|
|
|
|
def main():
|
2023-05-20 16:32:08 +00:00
|
|
|
logging.basicConfig( format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
|
|
|
|
datefmt='%Y-%m-%d %H:%M:%S',
|
|
|
|
level=logging.DEBUG,
|
|
|
|
stream=sys.stdout,
|
|
|
|
)
|
|
|
|
|
2023-05-20 15:44:18 +00:00
|
|
|
logging.info('Starting dbus_batrium_native listener')
|
|
|
|
|
|
|
|
#logger = can.Logger('logfile.asc')
|
|
|
|
|
2023-05-20 22:36:37 +00:00
|
|
|
bat = BatriumBattery(connection='can0')
|
2023-05-20 15:44:18 +00:00
|
|
|
|
|
|
|
listeners = [
|
|
|
|
#logger, # Regular Listener object
|
|
|
|
bat
|
|
|
|
]
|
|
|
|
|
|
|
|
notifier = can.Notifier(bat._ci, listeners)
|
|
|
|
for msg in bat._ci:
|
|
|
|
if msg.arbitration_id == 0x00140100:
|
|
|
|
bat.prnt()
|
|
|
|
|
|
|
|
# Clean-up
|
|
|
|
notifier.stop()
|
|
|
|
bat._ci.shutdown()
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
main()
|