#!/usr/bin/env python3 import sys import os import logging import itertools from time import time from datetime import datetime import can import struct from argparse import ArgumentParser VERSION = '1.0' class BatriumBattery(can.Listener): def __init__(self, connection): self.capacity = 2.2 self.maxChargeVoltage = 0 self.numberOfModules = 0 self.chargeComplete = 0 self.soc = 0 self.voltage = 0 self.current = 0 self.temperature = 0 self.maxCellTemperature = 0 self.maxCellTemperatureId = 0 self.minCellTemperature = 0 self.minCellTemperatureId = 0 self.cellVoltages_min = dict() self.cellVoltages_max = dict() self.cellTemperatures = dict() self.cellBypassTemperatures = dict() self.cellBypassPWM = dict() self.maxCellVoltage = 0 self.maxCellVoltageId = 0 self.minCellVoltage = 0 self.minCellVoltageId = 0 self.maxChargeCurrent = 0 self.maxDischargeCurrent = 0 self.updated = -1 self.cyclicModeTask = None self.TimeToEmpty = 0 self.TimeToFull = 0 self.AhToEmpty = 0 self.AhToFull = 0 self._ci = can.interface.Bus(channel=connection, bustype='socketcan') # check connection and that reported system voltage roughly matches configuration found = False msg = None while True: try: msg = self._ci.recv(timeout=10) except can.CanError: logging.error("Canbus error") if msg == None: #timeout no system connected logging.error("No messages on canbus %s received. Check connection and speed." % connection) break; elif msg.arbitration_id == 0x00111500: logging.info("Found Batrium BMS on %s" % connection) self.soc = (msg.data[0] * 0.5) - 5 self.voltage = ((msg.data[3] * 256) + msg.data[2]) * 0.01 self.current = (struct.unpack('>f', bytearray([msg.data[7], msg.data[6], msg.data[5], msg.data[4]])))[0] * 0.001 self.temperature = msg.data[1] - 40 logging.debug("SOC: %2d, I: %2.2fA, U: %2.2fV, T:%2.1fC" % (self.soc, self.current, self.voltage, self.temperature)) found = True break; def on_message_received(self, msg): self.updated = msg.timestamp if msg.arbitration_id == 0x00111500: self.soc = (msg.data[0] * 0.5) - 5 self.voltage = ((msg.data[3] * 256) + msg.data[2]) * 0.01 self.current = round((struct.unpack('>f', bytearray([msg.data[7], msg.data[6], msg.data[5], msg.data[4]])))[0] * 0.001, 2) self.temperature = msg.data[1] - 40 elif msg.arbitration_id == 0x00111900: logging.debug("Ah 0x00111900 received") self.AhToFull = round((struct.unpack('>f', bytearray([msg.data[3], msg.data[2], msg.data[1], msg.data[0]])))[0] * 0.001, 2) self.AhToEmpty = round((struct.unpack('>f', bytearray([msg.data[7], msg.data[6], msg.data[5], msg.data[4]])))[0] * 0.001, 2) self.capacity = self.AhToEmpty + self.AhToFull logging.debug("Ah to Full: %2.2f", self.AhToFull) logging.debug("Ah to Empty: %2.2f", self.AhToEmpty) logging.debug("Capacity: %2.2f", self.capacity) elif msg.arbitration_id == 0x00111800: self.TimeToEmpty = ((msg.data[5] * 256) + msg.data[4]) logging.debug("Time to empty: %dmin", self.TimeToEmpty) elif msg.arbitration_id == 0x00111700: self.TimeToFull = ((msg.data[5] * 256) + msg.data[4]) logging.debug("Time to full: %dmin", self.TimeToFull) elif msg.arbitration_id == 0x00140400: self.maxDischargeCurrent = ((msg.data[3] * 256) + msg.data[2]) * 0.01 logging.debug("MaxDischargeCurrent (DCL) %2.2fA", self.maxDischargeCurrent) logging.debug("I: %2.2fA U: %2.2fV",self.current, self.voltage) elif msg.arbitration_id == 0x00140300: self.maxChargeVoltage = ((msg.data[7] * 256) + msg.data[6]) * 0.01 self.maxChargeCurrent = ((msg.data[3] * 256) + msg.data[2]) * 0.01 logging.debug("MaxChargeCurrent (CCL) %2.2fA, MaxChargeVoltage (CVL) %2.2fV", self.maxChargeCurrent, self.maxChargeVoltage) logging.debug("I: %2.2fA U: %2.2fV",self.current, self.voltage) elif msg.arbitration_id == 0x00111200: self.minCellTemperature = msg.data[0] - 40 self.maxCellTemperature = msg.data[1] - 40 self.minCellTemperatureId = msg.data[4] self.maxCellTemperatureId = msg.data[5] logging.debug("MinCellTemp: %d (ID: %d), MaxCellTemp: %d (ID: %d)", self.minCellTemperature, self.minCellTemperatureId, self.maxCellTemperature, self.maxCellTemperatureId) elif msg.arbitration_id == 0x00111100: self.minCellVoltage = ((msg.data[1] * 256) + msg.data[0]) * 0.001 self.maxCellVoltage = ((msg.data[3] * 256) + msg.data[2]) * 0.001 self.minCellVoltageId = msg.data[4] self.maxCellVoltageId = msg.data[5] logging.debug("MinCellVolt %1.3fV (ID: %d), MaxCellVolt %1.3fV (ID: %d)", self.minCellVoltage, self.minCellVoltageId, self.maxCellVoltage, self.maxCellVoltageId) elif ((msg.arbitration_id >= 0x001D1101) and (msg.arbitration_id <= 0x001D11F9)): cell_id = msg.arbitration_id & 0xff #logging.debug("Cell ID: %d, Voltage_min: %2.2f, Voltage_max: %2.2f, Temperature: %d, Bypass_Temperature: %d, BypassPWM: %d", cell_id, ((msg.data[1] * 256) + msg.data[0]) * 0.001, ((msg.data[3] * 256) + msg.data[2]) * 0.001, msg.data[4] - 40, msg.data[5] - 40, msg.data[6]) self.cellVoltages_min[cell_id] = ((msg.data[1] * 256) + msg.data[0]) * 0.001 self.cellVoltages_max[cell_id] = ((msg.data[3] * 256) + msg.data[2]) * 0.001 self.cellTemperatures[cell_id] = msg.data[4] - 40 self.cellBypassTemperatures[cell_id] = msg.data[5] - 40 self.cellBypassPWM[cell_id] = msg.data[6] def prnt(self): print("SOC: %2d, I: %2.2fA, U: %2.2fV, T:%2.1fC" % (self.soc, self.current, self.voltage, self.maxCellTemperature)) print("MinCellVolt: %1.2f, MaxCellVolt: %1.2f CellVoltDelta: %1.2f" % (self.minCellVoltage, self.maxCellVoltage, self.maxCellVoltage-self.minCellVoltage)) print("MaxChargeCurrent (CCL): %4.1f A MaxChargeVoltage (CVL): %2.2f V" % (self.maxChargeCurrent, self.maxChargeVoltage)) self.numberOfModules = len(self.cellVoltages_min.keys()) print("Number of modules found: %1.0f" % (self.numberOfModules)) # === All code below is to simply run it from the commandline for debugging purposes === def main(): logging.basicConfig( format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG, stream=sys.stdout, ) logging.info('Starting dbus_batrium_native listener') #logger = can.Logger('logfile.asc') bat = BatriumBattery(connection='can0') listeners = [ #logger, # Regular Listener object bat ] notifier = can.Notifier(bat._ci, listeners) for msg in bat._ci: if msg.arbitration_id == 0x00140100: bat.prnt() # Clean-up notifier.stop() bat._ci.shutdown() if __name__ == "__main__": main()