#!/usr/bin/env python3

"""
A class to put a battery service on the dbus, according to victron standards,
with constantly updating paths.
"""
from gi.repository import GLib
import platform
import argparse
import logging
import sys
import os
import dbus
import itertools
import math
from time import time
from datetime import datetime
import struct
from argparse import ArgumentParser

from batrium import *

# Victron energy libraries
sys.path.insert(1, os.path.join(os.path.dirname(__file__), '/opt/victronenergy/dbus-systemcalc-py/ext/velib_python'))
from vedbus import VeDbusService
from ve_utils import exit_on_error
from settingsdevice import SettingsDevice

VERSION = '1.0'

# Time-to-go values of 432000 s (5 days) or more are published as None.
MAX_TIME_TO_GO_S = 432000

# Fraction of an hour covered by one per-minute integration step (1/60 h).
HOURS_PER_MINUTE_STEP = 0.016667

# (dbus path, BatriumBattery attribute) pairs, in the order they are published.
ALARM_SOURCES = (
    ('/Alarms/HighTemperature', 'alarm_high_temperature'),
    ('/Alarms/LowTemperature', 'alarm_low_temperature'),
    ('/Alarms/LowVoltage', 'alarm_low_voltage'),
    ('/Alarms/HighVoltage', 'alarm_high_voltage'),
    ('/Alarms/HighChargeCurrent', 'alarm_high_charge_current'),
    ('/Alarms/HighDischargeCurrent', 'alarm_high_discharge_current'),
)


def handle_changed_setting(setting, oldvalue, newvalue):
    """Log a change reported by SettingsDevice; no other action is taken."""
    logging.debug('setting changed, setting: %s, old: %s, new: %s',
                  setting, oldvalue, newvalue)


class DbusBatteryService:
    """Publish Batrium BMS readings as a battery service on the dbus.

    The constructor registers the service and all of its paths, restores the
    persisted history values from the settings device and schedules
    ``_update`` on the GLib main loop.
    """

    def __init__(self, servicename, deviceinstance, productname='Batrium BMS', connection='can0'):
        """Create the service, its paths and the periodic update timer.

        Args:
            servicename: dbus service name prefix.
            deviceinstance: value published on ``/DeviceInstance`` and used in
                the service name suffix.
            productname: value published on ``/ProductName``.
            connection: CAN interface name handed to ``BatriumBattery``.

        Raises:
            SystemExit: if the dbus service cannot be registered.
        """
        # Bookkeeping for the once-per-minute / once-per-second / daily work.
        self.minUpdateDone = 0
        self.secUpdateDone = 0
        self.dailyResetDone = 0
        self.lastUpdated = 0
        self.cell_balanced = 0
        # Energy accumulators; published as absolute values.
        self.ChargedEnergy = 0
        self.DischargedEnergy = 0
        # Last non-zero charge voltage limit received from the BMS.
        self.maxChargeVoltage = 0

        self._bat = BatriumBattery(connection=connection)
        self.notifier = can.Notifier(self._bat._ci, [self._bat])

        try:
            self._dbusservice = VeDbusService(
                servicename + '.socketcan_' + connection + '_di' + str(deviceinstance))
        except Exception:
            # The original code evaluated the bare name `exit` here (a no-op)
            # and then crashed on the first use of self._dbusservice.
            logging.exception('Unable to register dbus service')
            sys.exit(1)

        # Create the management objects, as specified in the Venus OS dbus-api
        # document https://github.com/victronenergy/venus/wiki/dbus-api
        self._dbusservice.add_path('/Mgmt/ProcessName', __file__)
        self._dbusservice.add_path('/Mgmt/ProcessVersion', VERSION + ' running on Python ' + platform.python_version())
        self._dbusservice.add_path('/Mgmt/Connection', connection)

        # Create the mandatory objects
        self._dbusservice.add_path('/DeviceInstance', deviceinstance)
        self._dbusservice.add_path('/ProductId', 0xB038)
        self._dbusservice.add_path('/ProductName', productname)
        self._dbusservice.add_path('/FirmwareVersion', 'unknown')
        self._dbusservice.add_path('/HardwareVersion', 'unknown')
        self._dbusservice.add_path('/Connected', 0)

        # Create battery specific objects
        self._dbusservice.add_path('/Status', 0, writeable=False)
        self._dbusservice.add_path('/Dc/0/Temperature', None, writeable=True)
        self._dbusservice.add_path('/Info/BatteryLowVoltage', None, writeable=True)
        self._dbusservice.add_path('/Alarms/CellImbalance', None, writeable=True)
        self._dbusservice.add_path('/Alarms/LowVoltage', None, writeable=True)
        self._dbusservice.add_path('/Alarms/HighVoltage', None, writeable=True)
        self._dbusservice.add_path('/Alarms/HighDischargeCurrent', None, writeable=True)
        self._dbusservice.add_path('/Alarms/HighChargeCurrent', None, writeable=True)
        self._dbusservice.add_path('/Alarms/LowSoc', None, writeable=True)
        self._dbusservice.add_path('/Alarms/LowTemperature', None, writeable=True)
        self._dbusservice.add_path('/Alarms/HighTemperature', None, writeable=True)
        self._dbusservice.add_path('/Balancing', None, writeable=True)
        self._dbusservice.add_path('/System/HasTemperature', 1)
        self._dbusservice.add_path('/System/NrOfBatteries', 1)
        self._dbusservice.add_path('/System/NrOfModulesOnline', None, writeable=True)
        self._dbusservice.add_path('/System/NrOfModulesOffline', 0)
        self._dbusservice.add_path('/System/NrOfModulesBlockingDischarge', 0)
        self._dbusservice.add_path('/System/NrOfModulesBlockingCharge', 0)
        self._dbusservice.add_path('/System/NrOfBatteriesBalancing', None, writeable=True)
        self._dbusservice.add_path('/System/BatteriesSeries', None, writeable=True)
        self._dbusservice.add_path('/System/MinVoltageCellId', None, writeable=True)
        self._dbusservice.add_path('/System/MaxVoltageCellId', None, writeable=True)
        self._dbusservice.add_path("/System/MinTemperatureCellId", None, writeable=True)
        self._dbusservice.add_path("/System/MaxTemperatureCellId", None, writeable=True)

        # Persistent values; each entry is [path, default, min, max].
        self._settings = SettingsDevice(
            bus=dbus.SystemBus() if (platform.machine() == 'armv7l') else dbus.SessionBus(),
            supportedSettings={
                'AvgDischarge': ['/Settings/Batrium/AvgerageDischarge', 0.0, 0, 0],
                'TotalAhDrawn': ['/Settings/Batrium/TotalAhDrawn', 0.0, 0, 0],
                'TimeLastFull': ['/Settings/Batrium/TimeLastFull', 0.0, 0, 0],
                'MinCellVoltage': ['/Settings/Batrium/MinCellVoltage', 0.0, 0.0, 0.0],
                'MaxCellVoltage': ['/Settings/Batrium/MaxCellVoltage', 0.0, 0.0, 0.0],
                'TargetChargeVoltage': ['/Settings/Batrium/TargetChargeVoltage', 0.0, 0.0, 0.0],
                'interval': ['/Settings/Batrium/Interval', 200, 200, 200]
            },
            eventCallback=handle_changed_setting)

        # Paths whose text representation is produced by _gettext.
        self._summeditems = {
            '/System/MaxCellVoltage': {'gettext': '%.3F V'},
            '/System/MinCellVoltage': {'gettext': '%.3F V'},
            '/Dc/0/Voltage': {'gettext': '%.2F V'},
            '/Dc/0/Current': {'gettext': '%.2F A'},
            '/Dc/0/Power': {'gettext': '%.0F W'},
            '/Soc': {'gettext': '%.0F%%'},
            '/Soh': {'gettext': '%.0F%%'},
            '/History/TotalAhDrawn': {'gettext': '%.0F Ah'},
            '/History/DischargedEnergy': {'gettext': '%.2F kWh'},
            '/History/ChargedEnergy': {'gettext': '%.2F kWh'},
            '/History/AverageDischarge': {'gettext': '%.2F kWh'},
            '/TimeToGo': {'gettext': '%.0F s'},
            '/ConsumedAmphours': {'gettext': '%.1F Ah'},
            '/System/MinCellTemperature': {'gettext': '%.1F °C'},
            '/System/MaxCellTemperature': {'gettext': '%.1F °C'},
            '/Capacity': {'gettext': '%.1F Ah'},
            '/InstalledCapacity': {'gettext': '%.1F Ah'},
            '/Info/MaxChargeVoltage': {'gettext': '%.1F V'},
            '/Info/MaxChargeCurrent': {'gettext': '%.1F A'},
            '/Info/MaxDischargeCurrent': {'gettext': '%.1F A'}
        }
        for path in self._summeditems:
            self._dbusservice.add_path(path, value=None, gettextcallback=self._gettext)

        # Load settings from Venus OS config xml
        self._dbusservice['/History/AverageDischarge'] = self._settings['AvgDischarge']
        self._dbusservice['/History/TotalAhDrawn'] = self._settings['TotalAhDrawn']
        self._dbusservice.add_path('/History/TimeSinceLastFullCharge', 0)
        self._dbusservice.add_path('/History/MinCellVoltage', self._settings['MinCellVoltage'])
        self._dbusservice.add_path('/History/MaxCellVoltage', self._settings['MaxCellVoltage'])

        # Set variables used later to standard values
        self._dbusservice['/Soh'] = 100
        self._dbusservice['/ConsumedAmphours'] = 0

        # Set CCL to 1 A temporarily; it is overwritten by the first update.
        # Suppresses a "BMS missing" condition after a script crash.
        self._dbusservice['/Info/MaxChargeCurrent'] = 1

        # Initial SoC from Batrium, capped at 100 (Cerbo or script may be
        # restarted while the BMS reports more than 99).
        self._dbusservice['/Soc'] = self._bat.soc if self._bat.soc <= 99 else 100

        # Log values restored from history/XML
        logging.debug("History cell voltage min: %.3f, max: %.3f, totalAhDrawn: %d",
                      self._settings['MinCellVoltage'],
                      self._settings['MaxCellVoltage'],
                      self._settings['TotalAhDrawn'])

        GLib.timeout_add(self._settings['interval'], exit_on_error, self._update)

    def _gettext(self, path, value):
        """Return the text for ``value`` using the format registered for ``path``.

        Falls back to ``str(value)`` for paths without a registered format.
        """
        item = self._summeditems.get(path)
        if item is None:
            return str(value)
        return item['gettext'] % value

    def __del__(self):
        """Persist history on teardown, if construction got far enough."""
        # __init__ may have aborted before these attributes were created.
        if hasattr(self, '_settings') and hasattr(self, '_dbusservice'):
            self._safe_history()
        logging.info('Stopping dbus-batrium-native integration')

    def _safe_history(self):
        """Copy the history values and the current CVL into the settings device."""
        logging.debug('Saving history to localsettings')
        self._settings['AvgDischarge'] = self._dbusservice['/History/AverageDischarge']
        self._settings['TotalAhDrawn'] = self._dbusservice['/History/TotalAhDrawn']
        self._settings['MinCellVoltage'] = self._dbusservice['/History/MinCellVoltage']
        self._settings['MaxCellVoltage'] = self._dbusservice['/History/MaxCellVoltage']
        self._settings['TargetChargeVoltage'] = self._dbusservice['/Info/MaxChargeVoltage']

    def _daily_stats(self):
        """Fold today's discharged energy into the rolling average and reset counters.

        Does nothing (and does not mark the day as done) while no discharged
        energy has been recorded.
        """
        discharged = self._dbusservice['/History/DischargedEnergy']
        # The path starts out as None until the first per-second update, so
        # test for "nothing recorded" rather than for the literal 0.
        if not discharged:
            return

        logging.debug("Updating stats, SOC: %d, Discharged: %.2f, Charged: %.2f ",
                      self._bat.soc, discharged,
                      self._dbusservice['/History/ChargedEnergy'])

        # Rolling week: previous average weighted 6/7, today 1/7.
        self._dbusservice['/History/AverageDischarge'] = (
            6 * self._dbusservice['/History/AverageDischarge'] + discharged) / 7

        self._dbusservice['/History/ChargedEnergy'] = 0
        self.ChargedEnergy = 0
        self._dbusservice['/History/DischargedEnergy'] = 0
        self.DischargedEnergy = 0
        self.dailyResetDone = datetime.now().day

    def _update_alarms(self):
        """Publish every BMS alarm flag as 2 (set) or 0 (clear)."""
        for path, attribute in ALARM_SOURCES:
            self._dbusservice[path] = 2 if getattr(self._bat, attribute) else 0

    def _update_soc(self, now):
        """Publish the state of charge and the time since the last full charge.

        Batrium counts SoC above 100. At or below 99 the BMS value is passed
        through; above 99 the published SoC becomes 100 only once all cells
        are flagged as balanced, at which point the full-charge timestamp is
        stored. Above 99 without balance the published value is left alone.
        """
        if self._bat.soc <= 99:
            self._dbusservice['/Soc'] = self._bat.soc
            since_full = now - datetime.fromtimestamp(float(self._settings['TimeLastFull']))
            self._dbusservice['/History/TimeSinceLastFullCharge'] = (
                since_full.seconds + since_full.days * 24 * 3600)
        elif (self._bat.soc > 99) and (self.cell_balanced):
            self._dbusservice['/Soc'] = 100
            self._dbusservice['/ConsumedAmphours'] = 0
            # `time` is the function imported from the time module; the
            # original shadowed it with a local datetime and crashed here.
            self._settings['TimeLastFull'] = time()

    def _publish_measurements(self):
        """Write the Batrium values straight to the dbus and return the power."""
        self._dbusservice['/Balancing'] = 1 if self._bat.NumberInBypass != 0 else 0

        self._dbusservice['/Dc/0/Current'] = self._bat.current
        self._dbusservice['/Dc/0/Voltage'] = self._bat.voltage
        power = self._bat.voltage * self._bat.current
        self._dbusservice['/Dc/0/Power'] = power
        self._dbusservice['/Dc/0/Temperature'] = self._bat.maxCellTemperature
        self._dbusservice['/Info/MaxChargeCurrent'] = self._bat.maxChargeCurrent
        self._dbusservice['/Info/MaxDischargeCurrent'] = self._bat.maxDischargeCurrent
        self._dbusservice['/System/NrOfModulesOnline'] = self._bat.numberOfModules
        self._dbusservice['/System/NrOfBatteriesBalancing'] = self._bat.NumberInBypass
        self._dbusservice['/System/BatteriesSeries'] = self._bat.numberOfModules
        self._dbusservice['/InstalledCapacity'] = self._bat.capacity
        self._dbusservice['/Capacity'] = self._bat.capacity
        return power

    def _update_charge_voltage_limit(self):
        """Publish the charge voltage limit (CVL).

        Batrium sends 0.0 V while charging is disabled. In that case the last
        non-zero CVL seen in this run, lowered by 0.1 V, is published; if none
        has been seen yet, the value persisted in the settings is used so a
        restart with charging off still yields a limit.
        """
        if self._bat.maxChargeVoltage != 0:
            self._dbusservice['/Info/MaxChargeVoltage'] = self._bat.maxChargeVoltage
            self.maxChargeVoltage = self._bat.maxChargeVoltage
        elif self.maxChargeVoltage != 0:
            self._dbusservice['/Info/MaxChargeVoltage'] = self.maxChargeVoltage - 0.1
        elif self.maxChargeVoltage == 0 and self._bat.maxChargeVoltage == 0:
            self._dbusservice['/Info/MaxChargeVoltage'] = self._settings['TargetChargeVoltage']

    def _update_every_minute(self):
        """Refresh the slow-changing values and persist the history."""
        # Min and max cell temperatures only once a minute, for readability.
        self._dbusservice['/System/MinCellTemperature'] = self._bat.minCellTemperature
        self._dbusservice['/System/MaxCellTemperature'] = self._bat.maxCellTemperature
        self._dbusservice['/System/MinTemperatureCellId'] = self._bat.minCellTemperatureId
        self._dbusservice['/System/MaxTemperatureCellId'] = self._bat.maxCellTemperatureId

        # Same for cell voltages, also tracking the historic extremes.
        self._dbusservice['/System/MaxCellVoltage'] = self._bat.maxCellVoltage
        if self._bat.maxCellVoltage > self._dbusservice['/History/MaxCellVoltage']:
            self._dbusservice['/History/MaxCellVoltage'] = self._bat.maxCellVoltage
            logging.debug("New maximum cell voltage: %f", self._bat.maxCellVoltage)

        self._dbusservice['/System/MinCellVoltage'] = self._bat.minCellVoltage
        if 0 < self._bat.minCellVoltage < self._dbusservice['/History/MinCellVoltage']:
            self._dbusservice['/History/MinCellVoltage'] = self._bat.minCellVoltage
            logging.debug("New minimum cell voltage: %f", self._bat.minCellVoltage)

        self._dbusservice['/System/MaxVoltageCellId'] = self._bat.maxCellVoltageId
        self._dbusservice['/System/MinVoltageCellId'] = self._bat.minCellVoltageId

        # Time remaining to full / to empty. Published as None while the
        # current is within +/-1 A or the estimate is 5 days or more.
        time_to_go = None
        if self._bat.current > 1:  # charging
            if self._bat.TimeToFull < MAX_TIME_TO_GO_S:
                time_to_go = self._bat.TimeToFull
        elif self._bat.current < -1:  # discharging
            drawn_ah = abs(self._bat.current * HOURS_PER_MINUTE_STEP)
            self._dbusservice['/ConsumedAmphours'] += drawn_ah
            self._dbusservice['/History/TotalAhDrawn'] += drawn_ah
            if self._bat.TimeToEmpty < MAX_TIME_TO_GO_S:
                time_to_go = self._bat.TimeToEmpty
        self._dbusservice['/TimeToGo'] = time_to_go

        # All modules in bypass sets the balanced flag; none in bypass clears
        # it; anything in between leaves it unchanged.
        if self._bat.NumberInBypass == self._bat.numberOfModules:
            self.cell_balanced = True
        elif self._bat.NumberInBypass == 0:
            self.cell_balanced = False

        # Write history items to Venus OS config xml.
        self._safe_history()

    def _update_every_second(self, power):
        """Accumulate charged/discharged energy, because Batrium does not send it.

        Args:
            power: instantaneous power, positive while charging.
        """
        energy_step = (power / 3600.0) / 1000
        if power > 0:
            self.ChargedEnergy += energy_step
        if power < 0:
            self.DischargedEnergy += energy_step
        self._dbusservice['/History/ChargedEnergy'] = abs(self.ChargedEnergy)
        self._dbusservice['/History/DischargedEnergy'] = abs(self.DischargedEnergy)

    def _update(self):
        """Timer callback: refresh every published path from the BMS state.

        Returns:
            True, so the GLib timeout keeps firing.
        """
        now = datetime.now()
        self.lastUpdated = now.timestamp()

        # Monitor communication.
        # NOTE(review): lastUpdated was assigned just above, so the first
        # clause can never be true; kept as written, confirm the intent.
        if (self._bat.updated != -1 and self.lastUpdated == 0) or ((self.lastUpdated - self._bat.updated) < 10):
            self._dbusservice['/Connected'] = 1
        else:
            self._dbusservice['/Connected'] = 0

        # Daily statistics and energy counter reset, once at 06:00.
        if now.hour == 6 and now.minute == 0 and now.day != self.dailyResetDone:
            self._daily_stats()

        self._update_alarms()
        self._update_soc(now)
        power = self._publish_measurements()
        self._update_charge_voltage_limit()

        # Calculation every minute
        if now.minute != self.minUpdateDone:
            self.minUpdateDone = now.minute
            self._update_every_minute()

        # Calculation every second
        if now.second != self.secUpdateDone:
            self.secUpdateDone = now.second
            self._update_every_second(power)

        return True


def main():
    """Parse the command line, set up logging and run the GLib main loop."""
    # CLI parser to set the interface and for debugging purposes
    parser = ArgumentParser(description='dbus-batrium-native integration', add_help=True)
    parser.add_argument('-d', '--debug', help='enable debug logging', action='store_true')
    parser.add_argument('-i', '--interface', help='CAN interface')
    parser.add_argument('-p', '--print', help='print only')
    args = parser.parse_args()

    log_level = logging.DEBUG if args.debug else logging.INFO
    log_file = "%s/current.log" % (os.path.dirname(os.path.realpath(__file__)))
    logging.basicConfig(
        format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        level=log_level,
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ])

    if not args.interface:
        logging.info('No CAN interface specified, using default can0')
        args.interface = 'can0'

    logging.info('Starting dbus-batrium-native integration %s on %s ' % (VERSION, args.interface))

    from dbus.mainloop.glib import DBusGMainLoop
    # Have a mainloop, so we can send/receive asynchronous calls to and from dbus
    DBusGMainLoop(set_as_default=True)

    # Keep a reference for the lifetime of the main loop.
    battery_output = DbusBatteryService(
        servicename='com.victronenergy.battery',
        connection=args.interface,
        deviceinstance=0,
    )

    logging.debug('Connected to dbus, and switching over to GLib.MainLoop() (= event based)')
    mainloop = GLib.MainLoop()
    mainloop.run()


if __name__ == "__main__":
    main()