VictronGX_Custom_Sensors/dbus_batrium_native/batrium.py

159 lines
6.4 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import itertools
import logging
import os
import struct
from argparse import ArgumentParser
from datetime import datetime
from time import time

import can
VERSION = '1.0'
class BatriumBattery(can.Listener):
    """CAN listener that mirrors values broadcast by a Batrium BMS as attributes.

    Constructing an instance opens a socketcan bus on ``connection`` and waits
    for the first pack-status frame so the instance starts with real readings.
    Afterwards, register the instance with a ``can.Notifier`` on ``self._ci``;
    every received frame is decoded in :meth:`on_message_received`.

    Args:
        capacity: Stored verbatim in ``self.capacity``; not otherwise used by
            this class.
        connection: socketcan channel name, e.g. ``'can0'``.
    """

    # Arbitration IDs handled by this listener, named after the fields that
    # are read from each frame below.
    _ID_STATUS = 0x00111500            # soc, temperature, voltage, current
    _ID_TIME_TO_EMPTY = 0x00111800
    _ID_TIME_TO_FULL = 0x00111700
    _ID_DISCHARGE_LIMIT = 0x00140400   # maxDischargeCurrent
    _ID_CHARGE_LIMIT = 0x00140300      # maxChargeCurrent, maxChargeVoltage
    _ID_CELL_TEMP_RANGE = 0x00161700   # min/max cell temperature
    _ID_CELL_VOLT_RANGE = 0x00161100   # min/max cell voltage and their ids
    _ID_CELL_FIRST = 0x001D1101        # per-cell frames, cell id in low byte
    _ID_CELL_LAST = 0x001D11F9

    # Number of payload bytes each decoder indexes.  A shorter frame would
    # raise IndexError inside the notifier callback, so it is skipped instead.
    _MIN_PAYLOAD = {
        _ID_STATUS: 8,
        _ID_TIME_TO_EMPTY: 3,
        _ID_TIME_TO_FULL: 3,
        _ID_DISCHARGE_LIMIT: 4,
        _ID_CHARGE_LIMIT: 8,
        _ID_CELL_TEMP_RANGE: 2,
        _ID_CELL_VOLT_RANGE: 6,
    }

    def __init__(self, capacity, connection):
        super().__init__()
        self.capacity = capacity
        self.maxChargeVoltage = 0
        self.numberOfModules = 3
        self.chargeComplete = 0
        self.soc = 0
        self.voltage = 0
        self.current = 0
        self.temperature = 0
        self.maxCellTemperature = 0
        self.minCellTemperature = 0
        # Per-module placeholders; nothing in this class fills them yet (the
        # per-cell decoding in on_message_received is disabled).
        self.cellVoltages_min = [(0, 0, 0, 0) for _ in range(self.numberOfModules)]
        self.cellVoltages_max = [(0, 0, 0, 0) for _ in range(self.numberOfModules)]
        self.cellTemperatures = [(0, 0, 0, 0) for _ in range(self.numberOfModules)]
        self.cellBypassTemperatures = [(0, 0, 0, 0) for _ in range(self.numberOfModules)]
        self.cellBypassPWM = [(0, 0, 0, 0) for _ in range(self.numberOfModules)]
        self.maxCellVoltage = 0
        self.maxCellVoltageId = 0
        self.minCellVoltage = 0
        self.minCellVoltageId = 0
        self.maxChargeCurrent = 0
        self.maxDischargeCurrent = 0
        self.firmwareVersion = '2.17.39'
        self.updated = -1  # timestamp of the last received frame, -1 until one arrives
        self.cyclicModeTask = None
        self.TimeToEmpty = 0
        self.TimeToFull = 0
        self._ci = can.interface.Bus(channel=connection, bustype='socketcan')

        self._wait_for_bms(connection)

    @staticmethod
    def _u16(data, offset):
        """Return the little-endian unsigned 16-bit value at ``data[offset:offset + 2]``."""
        return (data[offset + 1] * 256) + data[offset]

    @classmethod
    def _decode_status(cls, data):
        """Decode a pack-status payload into ``(soc, voltage, current, temperature)``.

        ``data`` must hold at least 8 bytes.  Bytes 4..7 are read as a
        little-endian 32-bit float and scaled by 0.001; the value is returned
        unrounded so each caller can decide on the precision.
        """
        soc = (data[0] * 0.5) - 5
        temperature = data[1] - 40
        voltage = cls._u16(data, 2) * 0.01
        current = struct.unpack('<f', bytes(data[4:8]))[0] * 0.001
        return soc, voltage, current, temperature

    def _wait_for_bms(self, connection):
        """Block until a pack-status frame is seen on the bus.

        Gives up (after logging an error) when ``recv`` times out after 10 s
        or raises ``can.CanError``.  Returns True when a status frame was
        decoded, False otherwise.

        NOTE(review): while unrelated traffic keeps arriving but no status
        frame does, this keeps waiting indefinitely, as the original code did.
        """
        while True:
            try:
                msg = self._ci.recv(timeout=10)
            except can.CanError:
                logging.error("Canbus error")
                # Treat a bus error like "nothing received" on every pass.
                # Without this reset a frame left over from the previous pass
                # would be re-examined and the loop could spin on the error.
                msg = None

            if msg is None:
                # timeout: no system connected
                logging.error("No messages on canbus %s received. Check connection and speed." % connection)
                return False

            if msg.arbitration_id != self._ID_STATUS:
                continue
            if len(msg.data) < self._MIN_PAYLOAD[self._ID_STATUS]:
                logging.warning("Ignoring short status frame (%d bytes)", len(msg.data))
                continue

            logging.info("Found Batrium BMS on %s" % connection)
            self.soc, self.voltage, self.current, self.temperature = self._decode_status(msg.data)
            logging.debug("SOC: %2d, I: %2.2fA, U: %2.2fV, T:%2.1fC" % (self.soc, self.current, self.voltage, self.temperature))
            return True

    def on_message_received(self, msg):
        """Decode one received CAN frame and update the matching attributes.

        ``self.updated`` is set to the frame timestamp for every frame, known
        or not.  Frames with an unhandled arbitration id are ignored; frames
        with a known id but too few payload bytes are logged and skipped.
        """
        self.updated = msg.timestamp
        logging.debug("CAN message arrived")

        frame_id = msg.arbitration_id
        data = msg.data

        required = self._MIN_PAYLOAD.get(frame_id)
        if required is not None and len(data) < required:
            logging.warning("Ignoring short frame 0x%08X (%d bytes, need %d)", frame_id, len(data), required)
            return

        if frame_id == self._ID_STATUS:
            soc, voltage, current, temperature = self._decode_status(data)
            self.soc = soc
            self.voltage = voltage
            self.current = round(current, 2)
            self.temperature = temperature
        elif frame_id == self._ID_TIME_TO_EMPTY:
            # Raw 16-bit value; unit not established here -- TODO confirm.
            self.TimeToEmpty = self._u16(data, 1)
        elif frame_id == self._ID_TIME_TO_FULL:
            self.TimeToFull = self._u16(data, 1)
        elif frame_id == self._ID_DISCHARGE_LIMIT:
            self.maxDischargeCurrent = self._u16(data, 2) * 0.1
            logging.debug("Idmax %dA", self.maxDischargeCurrent)
            logging.debug("I: %dA U: %dV",self.current, self.voltage)
        elif frame_id == self._ID_CHARGE_LIMIT:
            self.maxChargeVoltage = self._u16(data, 6) * 0.01
            self.maxChargeCurrent = self._u16(data, 2) * 0.01
            # The second value is the charge voltage limit, so label it in volts.
            logging.debug("Icmax %dA, Ucmax %dV", self.maxChargeCurrent, self.maxChargeVoltage)
            logging.debug("I: %dA U: %dV",self.current, self.voltage)
        elif frame_id == self._ID_CELL_TEMP_RANGE:
            self.minCellTemperature = data[0] - 40
            self.maxCellTemperature = data[1] - 40
            logging.debug("Min Cell Temp: %d, Max Cell Temp: %d", self.minCellTemperature, self.maxCellTemperature)
        elif frame_id == self._ID_CELL_VOLT_RANGE:
            self.minCellVoltage = self._u16(data, 0) * 0.001
            self.maxCellVoltage = self._u16(data, 2) * 0.001
            self.minCellVoltageId = data[4]
            self.maxCellVoltageId = data[5]
            logging.debug("Umin %1.3fV Umax %1.3fV", self.minCellVoltage, self.maxCellVoltage)
        elif self._ID_CELL_FIRST <= frame_id <= self._ID_CELL_LAST:
            cell_id = frame_id & 0xff
            logging.debug("Cell ID: %d", cell_id)
            # TODO: per-cell decoding is disabled.  Presumably because cell_id
            # can exceed the numberOfModules-sized lists -- confirm before
            # re-enabling.
            #self.cellVoltages_min[cell_id] = ((msg.data[1] * 256) + msg.data[0]) * 0.001
            #self.cellVoltages_max[cell_id] = ((msg.data[3] * 256) + msg.data[2]) * 0.001
            #self.cellTemperatures[cell_id] = msg.data[4] - 40
            #self.cellBypassTemperatures[cell_id] = msg.data[5] - 40
            #self.cellBypassPWM[cell_id] = msg.data[6]

    def prnt(self):
        """Print a three-line summary of the current readings to stdout.

        Note that the ``T:`` field shows ``maxCellTemperature``, not
        ``temperature``.
        """
        print("SOC: %2d, I: %2.2fA, U: %2.2fV, T:%2.1fC" % (self.soc, self.current, self.voltage, self.maxCellTemperature))
        print("Umin: %1.2f, Umax: %1.2f Udelta: %1.2f" % (self.minCellVoltage, self.maxCellVoltage, self.maxCellVoltage-self.minCellVoltage))
        print("CCL: %4.1f A CVL: %2.2f V" % (self.maxChargeCurrent, self.maxChargeVoltage))
# === All code below exists only to run this module from the command line for debugging purposes ===
def main():
    """Run the listener standalone on ``can0`` for debugging.

    Creates a ``BatriumBattery`` on channel ``can0``, attaches it to a
    ``can.Notifier`` and prints a summary via ``bat.prnt()`` every time a
    frame with arbitration id 0x00140100 is received.  Runs until interrupted
    with Ctrl-C, then stops the notifier and shuts the bus down.
    """
    # Local import: the module only imports ``time.time`` at file level.
    from time import sleep

    logging.info('Starting dbus_batrium_native listener')
    bat = BatriumBattery(capacity=300, connection='can0')

    def print_on_trigger(msg):
        """Print the battery summary when the trigger frame arrives."""
        if msg.arbitration_id == 0x00140100:
            bat.prnt()

    # Both consumers are registered with the one notifier.  Reading the bus
    # directly here as well (``for msg in bat._ci``) would compete with the
    # notifier thread for frames, so each frame would reach only one of them.
    # To record traffic, add ``can.Logger('logfile.asc')`` to this list.
    listeners = [
        bat,
        print_on_trigger,
    ]
    notifier = can.Notifier(bat._ci, listeners)
    try:
        while True:
            sleep(1)
    except KeyboardInterrupt:
        logging.info('Interrupted, shutting down')
    finally:
        # Clean-up must run even when the loop is left via an exception.
        notifier.stop()
        bat._ci.shutdown()
# Configure root logging when the module is loaded: DEBUG level, written both
# to ``current.log`` next to this file and to the default stream handler.
# Relies on ``os`` being imported at the top of the file; without that import
# this statement raises NameError as soon as the module is loaded.
logging.basicConfig(
    format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.DEBUG,
    handlers=[
        logging.FileHandler("%s/current.log" % (os.path.dirname(os.path.realpath(__file__)))),
        logging.StreamHandler(),
    ],
)

# Run the debugging entry point only when executed as a script.
if __name__ == "__main__":
    main()