#!/usr/bin/env python3 import sys import os import logging import itertools from time import time from datetime import datetime import can import struct from argparse import ArgumentParser VERSION = '1.0' class BatriumBattery(can.Listener): def __init__(self, connection): self.hw_version = 0 self.sw_version = 0 self.capacity = 2.2 self.maxChargeVoltage = 0 self.numberOfModules = 3 self.soc = 0 self.soh = 0 self.voltage = 0 self.current = 0 self.temperature = 0 self.maxCellTemperature = 0 self.maxCellTemperatureId = 0 self.minCellTemperature = 0 self.minCellTemperatureId = 0 #self.cellVoltages_min = dict() #self.cellVoltages_max = dict() #self.cellTemperatures = dict() #self.cellBypassTemperatures = dict() #self.cellBypassPWM = dict() self.maxCellVoltage = 0 self.maxCellVoltageId = 0 self.minCellVoltage = 0 self.minCellVoltageId = 0 self.maxChargeCurrent = 0 self.maxDischargeCurrent = 0 self.updated = -1 #self.TimeToEmpty = 0 #self.TimeToFull = 0 self.AhToEmpty = 0 #self.AhToFull = 0 self.NumberInBypass = 0 #self.alarm_high_temperature = 0 #self.alarm_high_voltage = 0 #self.alarm_low_temperature = 0 #self.alarm_low_voltage = 0 #self.alarm_low_soc = 0 #self.alarm_high_charge_current = 0 #self.alarm_high_discharge_current = 0 self._ci = can.interface.Bus(channel=connection, bustype='socketcan') # check connection and that reported system voltage roughly matches configuration found = False msg = None while True: try: msg = self._ci.recv(timeout=10) except can.CanError: logging.error("Canbus error") if msg == None: #timeout no system connected logging.error("No messages on canbus %s received. Check connection and speed." % connection) break; elif msg.arbitration_id == 0x01: logging.info("Found Batrium BMS on %s" % connection) found = True break; def on_message_received(self, msg): self.updated = msg.timestamp if msg.arbitration_id == 0x00: self.hw_version = (((msg.data[1] * 256 + (1 << 8)) + msg.data[0] + (1 << 8)) ) self.sw_version = (((msg.data[3] * 256 + (1 << 8)) + msg.data[2] + (1 << 8)) ) self.serial = ((msg.data[7] * 256) + (msg.data[6] * 256) + (msg.data[5] * 256) + msg.data[4]) + (1 << 32) logging.debug("HW Version: %d, SW Version: %d, Serialnumber: %d", self.hw_version, self.sw_version, self.serial) elif msg.arbitration_id == 0x01: self.minCellVoltage = ((msg.data[1] * 256) + msg.data[0]) * 0.001 self.maxCellVoltage = ((msg.data[3] * 256) + msg.data[2]) * 0.001 #self.avgCellVoltage = ((msg.data[5] * 256) + msg.data[4]) * 0.01 self.minCellVoltageId = msg.data[6] self.maxCellVoltageId = msg.data[7] logging.debug("MinCellVolt %1.3fV (ID: %d), MaxCellVolt %1.3fV (ID: %d)", self.minCellVoltage, self.minCellVoltageId, self.maxCellVoltage, self.maxCellVoltageId) elif msg.arbitration_id == 0x02: self.minCellTemperature = msg.data[0] - 40 self.maxCellTemperature = msg.data[1] - 40 self.temperature = msg.data[2] - 40 self.minCellTemperatureId = msg.data[3] self.maxCellTemperatureId = msg.data[4] logging.debug("MinCellTemp: %d (ID: %d), MaxCellTemp: %d (ID: %d), AverageTemp: %d", self.minCellTemperature, self.minCellTemperatureId, self.maxCellTemperature, self.maxCellTemperatureId, self.temperature) elif msg.arbitration_id == 0x03: self.NumberInBypass = msg.data[0] logging.debug("Cell number in bypass: %d", self.NumberInBypass) if msg.arbitration_id == 0x04: logging.debug(msg) self.voltage = ((msg.data[1] * 256) + msg.data[0]) * 0.1 self.current = int(((msg.data[3] * 256) + msg.data[2]) * 0.001) self.power = int(((msg.data[5] * 256) + msg.data[4]) * 0.1) logging.debug("Voltage: %2.2fV, Current: %2.2fA, Power: %2.2fW", self.voltage, self.current, self.power) if msg.arbitration_id == 0x05: self.soc = (msg.data[1] * 256 + msg.data[0]) * 0.01 self.soh = (msg.data[3] * 256 + msg.data[2]) * 0.01 self.AhToEmpty = (msg.data[5] * 256 + msg.data[4]) * 0.1 self.capacity = (msg.data[7] * 256 + msg.data[6]) * 0.1 logging.debug("SOC: %2.2f%%, SOH: %2.2f%%, Ah Remain: %2.2fAh, Capacity: %2.2fAh", self.soc, self.soh, self.AhToEmpty, self.capacity) elif msg.arbitration_id == 0x06: self.maxChargeVoltage = ((msg.data[1] * 256) + msg.data[0]) * 0.1 self.maxChargeCurrent = ((msg.data[3] * 256) + msg.data[2]) * 0.1 #self.maxDischargeVoltage = ((msg.data[5] * 256) + msg.data[4]) * 0.01 self.maxDischargeCurrent = ((msg.data[7] * 256) + msg.data[6]) * 0.1 logging.debug("MaxChargeVoltage (CVL) %2.2fA, MaxChargeCurrent (CCL) %2.2fV, MaxDischargeCurrent %2.2fA", self.maxChargeCurrent, self.maxChargeVoltage, self.maxDischargeCurrent) #elif msg.arbitration_id == 0x00111900: # logging.debug("Ah 0x00111900 received") # message never arrived, batrium fw bug? #self.AhToFull = round((struct.unpack('>f', bytearray([msg.data[3], msg.data[2], msg.data[1], msg.data[0]])))[0] * 0.001, 2) #self.AhToEmpty = round((struct.unpack('>f', bytearray([msg.data[7], msg.data[6], msg.data[5], msg.data[4]])))[0] * 0.001, 2) #self.capacity = self.AhToEmpty + self.AhToFull #logging.debug("Ah to Full: %2.2f", self.AhToFull) #logging.debug("Ah to Empty: %2.2f", self.AhToEmpty) #logging.debug("Capacity: %2.2f", self.capacity) #elif msg.arbitration_id == 0x00140100: # self.alarm_high_temperature = (msg.data[2] >> 3) & 1 # logging.debug("High Temp Alarm: %d", self.alarm_high_temperature) # self.alarm_high_voltage = (msg.data[2] >> 1) & 1 # logging.debug("High Voltage Alarm: %d", self.alarm_high_voltage) # self.alarm_low_temperature = (msg.data[2] >> 2) & 1 # logging.debug("Low Temperature Alarm: %d", self.alarm_low_temperature) # self.alarm_low_voltage = (msg.data[2] >> 0) & 1 # logging.debug("Low Voltage Alarm: %d", self.alarm_low_voltage) # self.alarm_high_charge_current = (msg.data[3] >> 3) & 1 # logging.debug("High Charge Current Alarm: %d", self.alarm_high_charge_current) # self.alarm_high_discharge_current = (msg.data[3] >> 4) & 1 # logging.debug("High Discharge Current Alarm: %d", self.alarm_high_discharge_current) #elif msg.arbitration_id == 0x00111800: # self.TimeToEmpty = ((msg.data[5] * 256) + msg.data[4] * 60) # logging.debug("Time to empty: %dmin", self.TimeToEmpty) #elif msg.arbitration_id == 0x00111700: # self.TimeToFull = ((msg.data[5] * 256) + msg.data[4] * 60) # logging.debug("Time to full: %dmin", self.TimeToFull) #elif ((msg.arbitration_id >= 0x001D1101) and (msg.arbitration_id <= 0x001D11F9)): # cell_id = msg.arbitration_id & 0xff #logging.debug("Cell ID: %d, Voltage_min: %2.2f, Voltage_max: %2.2f, Temperature: %d, Bypass_Temperature: %d, BypassPWM: %d", cell_id, ((msg.data[1] * 256) + msg.data[0]) * 0.001, ((msg.data[3] * 256) + msg.data[2]) * 0.001, msg.data[4] - 40, msg.data[5] - 40, msg.data[6]) # self.cellVoltages_min[cell_id] = ((msg.data[1] * 256) + msg.data[0]) * 0.001 # self.cellVoltages_max[cell_id] = ((msg.data[3] * 256) + msg.data[2]) * 0.001 # self.cellTemperatures[cell_id] = msg.data[4] - 40 # self.cellBypassTemperatures[cell_id] = msg.data[5] - 40 # self.cellBypassPWM[cell_id] = msg.data[6] # self.numberOfModules = len(self.cellVoltages_min.keys()) def prnt(self): print("SOC: %2d, I: %2.2fA, U: %2.2fV, T:%2.1fC" % (self.soc, self.current, self.voltage, self.maxCellTemperature)) print("MinCellVolt: %1.2f, MaxCellVolt: %1.2f CellVoltDelta: %1.2f" % (self.minCellVoltage, self.maxCellVoltage, self.maxCellVoltage-self.minCellVoltage)) print("MaxChargeCurrent (CCL): %4.1f A MaxChargeVoltage (CVL): %2.2f V" % (self.maxChargeCurrent, self.maxChargeVoltage)) print("Number of modules found: %1.0f" % (self.numberOfModules)) # === All code below is to simply run it from the commandline for debugging purposes === def main(): logging.basicConfig( format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG, stream=sys.stdout, ) logging.info('Starting dbus_batrium_native listener') logger = can.Logger('logfile.asc') bat = BatriumBattery(connection='can0') listeners = [ logger, # Regular Listener object bat ] notifier = can.Notifier(bat._ci, listeners) for msg in bat._ci: if msg.arbitration_id == 0x001: bat.prnt() # Clean-up notifier.stop() bat._ci.shutdown() if __name__ == "__main__": main()