VictronGX_Custom_Sensors/dbus-batrium-native/dbus-batrium-native.py

369 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Publishes a Batrium BMS, read over a CAN interface, as a battery service on the dbus
according to Victron standards, and keeps the paths of that service constantly updated.
"""
from gi.repository import GLib
import platform
import argparse
import logging
import sys
import os
import dbus
import itertools
import math
from time import time
from datetime import datetime
import struct
from argparse import ArgumentParser
from batrium import *
# Victron energy libraries
sys.path.insert(1, os.path.join(os.path.dirname(__file__), '/opt/victronenergy/dbus-systemcalc-py/ext/velib_python'))
from vedbus import VeDbusService
from ve_utils import exit_on_error
from settingsdevice import SettingsDevice
VERSION = '1.5'
def handle_changed_setting(setting, oldvalue, newvalue):
    """Log a settings change at debug level.

    Passed as ``eventCallback`` to ``SettingsDevice``; it only records the
    change and does not act on it.

    Args:
        setting: Name of the setting that changed.
        oldvalue: Value before the change.
        newvalue: Value after the change.
    """
    # Lazy %-args: the message is only formatted when debug logging is enabled.
    logging.debug('setting changed, setting: %s, old: %s, new: %s', setting, oldvalue, newvalue)
class DbusBatteryService:
    """Battery service on the dbus, fed from a ``BatriumBattery`` CAN listener.

    The constructor registers the service and all of its paths, loads the
    persisted history from ``SettingsDevice`` and schedules ``_update`` on the
    GLib main loop. ``_update`` then copies the latest values of the
    ``BatriumBattery`` object onto the dbus paths.
    """

    # (dbus path, BatriumBattery attribute) pairs. A truthy attribute is
    # published as 2, anything else as 0.
    _ALARM_SOURCES = (
        ('/Alarms/HighTemperature', 'alarm_high_temperature'),
        ('/Alarms/LowTemperature', 'alarm_low_temperature'),
        ('/Alarms/LowVoltage', 'alarm_low_voltage'),
        ('/Alarms/HighVoltage', 'alarm_high_voltage'),
        ('/Alarms/HighChargeCurrent', 'alarm_high_charge_current'),
        ('/Alarms/HighDischargeCurrent', 'alarm_high_discharge_current'),
    )

    # (settings key, dbus path) pairs written by _safe_history().
    _PERSISTED_PATHS = (
        ('AvgDischarge', '/History/AverageDischarge'),
        ('TotalAhDrawn', '/History/TotalAhDrawn'),
        ('MinCellVoltage', '/History/MinCellVoltage'),
        ('MaxCellVoltage', '/History/MaxCellVoltage'),
        ('TargetChargeVoltage', '/Info/MaxChargeVoltage'),
    )

    # Factor applied to the current in the once-a-minute update to get Ah.
    _AH_PER_AMP_MINUTE = 0.016667

    # TimeToGo values of 5 days (in seconds) or more are published as None.
    _MAX_TIME_TO_GO = 432000

    def __init__(self, servicename, deviceinstance, productname='Batrium BMS', connection='can0'):
        """Register the dbus service and start the periodic update.

        Args:
            servicename: Prefix of the dbus service name.
            deviceinstance: Value of ``/DeviceInstance``; also part of the
                service name.
            productname: Value of ``/ProductName``.
            connection: CAN interface name handed to ``BatriumBattery``.

        Raises:
            Exception: Whatever ``VeDbusService`` raised when the service
                could not be created (logged, then re-raised).
        """
        self.minUpdateDone = 0
        self.secUpdateDone = 0
        self.dailyResetDone = 0
        self.lastUpdated = 0
        self.cell_balanced = 0
        self.ChargedEnergy = 0
        self.DischargedEnergy = 0
        self.maxChargeVoltage = 0
        self._bat = BatriumBattery(connection=connection)
        self.notifier = can.Notifier(self._bat._ci, [self._bat])
        try:
            self._dbusservice = VeDbusService(servicename+'.socketcan_'+connection+'_di'+str(deviceinstance))
        except Exception:
            # The original `except: exit` never called exit(), so construction
            # went on and failed later with an unrelated AttributeError.
            logging.exception('Could not create dbus service for %s', connection)
            raise

        # (path, initial value, writeable) in registration order.
        # Management and mandatory objects follow the Venus OS dbus-api document
        # https://github.com/victronenergy/venus/wiki/dbus-api
        static_paths = (
            # Management objects
            ('/Mgmt/ProcessName', __file__, False),
            ('/Mgmt/ProcessVersion', VERSION + ' running on Python ' + platform.python_version(), False),
            ('/Mgmt/Connection', connection, False),
            # Mandatory objects
            ('/DeviceInstance', deviceinstance, False),
            ('/ProductId', 0xB038, False),
            ('/ProductName', productname, False),
            ('/FirmwareVersion', 'unknown', False),
            ('/HardwareVersion', 'unknown', False),
            ('/Connected', 0, False),
            # Battery specific objects
            ('/Status', 0, False),
            ('/Dc/0/Temperature', None, True),
            ('/Info/BatteryLowVoltage', None, True),
            ('/Alarms/CellImbalance', None, True),
            ('/Alarms/LowVoltage', None, True),
            ('/Alarms/HighVoltage', None, True),
            ('/Alarms/HighDischargeCurrent', None, True),
            ('/Alarms/HighChargeCurrent', None, True),
            ('/Alarms/LowSoc', None, True),
            ('/Alarms/LowTemperature', None, True),
            ('/Alarms/HighTemperature', None, True),
            ('/Balancing', None, True),
            ('/System/HasTemperature', 1, False),
            ('/System/NrOfBatteries', 1, False),
            ('/System/NrOfModulesOnline', None, True),
            ('/System/NrOfModulesOffline', 0, False),
            ('/System/NrOfModulesBlockingDischarge', 0, False),
            ('/System/NrOfModulesBlockingCharge', 0, False),
            ('/System/NrOfBatteriesBalancing', None, True),
            ('/System/BatteriesSeries', None, True),
            ('/System/MinVoltageCellId', None, True),
            ('/System/MaxVoltageCellId', None, True),
            ("/System/MinTemperatureCellId", None, True),
            ("/System/MaxTemperatureCellId", None, True),
        )
        for path, initial, writeable in static_paths:
            if writeable:
                self._dbusservice.add_path(path, initial, writeable=True)
            else:
                self._dbusservice.add_path(path, initial)

        # NOTE(review): the system bus is only chosen on 'armv7l'; confirm this
        # is still right for other Venus OS hardware.
        self._settings = SettingsDevice(
            bus=dbus.SystemBus() if (platform.machine() == 'armv7l') else dbus.SessionBus(),
            supportedSettings={
                'AvgDischarge': ['/Settings/Batrium/AvgerageDischarge', 0.0, 0, 0],
                'TotalAhDrawn': ['/Settings/Batrium/TotalAhDrawn', 0.0, 0, 0],
                'TimeLastFull': ['/Settings/Batrium/TimeLastFull', 0.0, 0, 0],
                'MinCellVoltage': ['/Settings/Batrium/MinCellVoltage', 0.0, 0.0, 0.0],
                'MaxCellVoltage': ['/Settings/Batrium/MaxCellVoltage', 0.0, 0.0, 0.0],
                'TargetChargeVoltage': ['/Settings/Batrium/TargetChargeVoltage', 0.0, 0.0, 0.0],
                'interval': ['/Settings/Batrium/Interval', 200, 200, 200]
            },
            eventCallback=handle_changed_setting)

        # Paths that get a formatted text representation through _gettext().
        self._summeditems = {
            '/System/MaxCellVoltage': {'gettext': '%.3F V'},
            '/System/MinCellVoltage': {'gettext': '%.3F V'},
            '/Dc/0/Voltage': {'gettext': '%.2F V'},
            '/Dc/0/Current': {'gettext': '%.2F A'},
            '/Dc/0/Power': {'gettext': '%.0F W'},
            '/Soc': {'gettext': '%.0F%%'},
            '/Soh': {'gettext': '%.0F%%'},
            '/History/TotalAhDrawn': {'gettext': '%.0F Ah'},
            '/History/DischargedEnergy': {'gettext': '%.2F kWh'},
            '/History/ChargedEnergy': {'gettext': '%.2F kWh'},
            '/History/AverageDischarge': {'gettext': '%.2F kWh'},
            '/TimeToGo': {'gettext': '%.0F s'},
            '/ConsumedAmphours': {'gettext': '%.1F Ah'},
            '/System/MinCellTemperature': {'gettext': '%.1F °C'},
            '/System/MaxCellTemperature': {'gettext': '%.1F °C'},
            '/Capacity': {'gettext': '%.1F Ah'},
            '/InstalledCapacity': {'gettext': '%.1F Ah'},
            '/Info/MaxChargeVoltage': {'gettext': '%.1F V'},
            '/Info/MaxChargeCurrent': {'gettext': '%.1F A'},
            '/Info/MaxDischargeCurrent': {'gettext': '%.1F A'}
        }
        for path in self._summeditems:
            self._dbusservice.add_path(path, value=None, gettextcallback=self._gettext)

        ## Load settings from Venus OS config xml ##
        self._dbusservice['/History/AverageDischarge'] = self._settings['AvgDischarge']
        self._dbusservice['/History/TotalAhDrawn'] = self._settings['TotalAhDrawn']
        self._dbusservice.add_path('/History/TimeSinceLastFullCharge', 0)
        self._dbusservice.add_path('/History/MinCellVoltage', self._settings['MinCellVoltage'])
        self._dbusservice.add_path('/History/MaxCellVoltage', self._settings['MaxCellVoltage'])

        ## Standard start values for paths that are modified later ##
        self._dbusservice['/Soh'] = 100
        self._dbusservice['/ConsumedAmphours'] = 0
        ## Temporary CCL of 1 A; _update() overwrites it with the Batrium value ##
        self._dbusservice['/Info/MaxChargeCurrent'] = 1
        ## Initial SoC from Batrium; anything above 99 is published as 100 ##
        if self._bat.soc <= 99:
            self._dbusservice['/Soc'] = self._bat.soc
        else:
            self._dbusservice['/Soc'] = 100

        ## Log values loaded from history/XML ##
        logging.debug("History cell voltage min: %.3f, max: %.3f, totalAhDrawn: %d", self._settings['MinCellVoltage'], self._settings['MaxCellVoltage'], self._settings['TotalAhDrawn'])
        GLib.timeout_add(self._settings['interval'], exit_on_error, self._update)

    def _gettext(self, path, value):
        """Return the text representation of ``value`` for ``path``.

        Paths listed in ``_summeditems`` use their format string, any other
        path falls back to ``str(value)``. ``None`` on a formatted path yields
        ``'---'`` instead of raising ``TypeError`` from the numeric format.
        """
        item = self._summeditems.get(path)
        if item is None:
            return str(value)
        if value is None:
            return '---'
        return item['gettext'] % value

    def __del__(self):
        """Save the history one last time and log the shutdown."""
        # __init__ may have failed before both objects existed.
        if hasattr(self, '_settings') and hasattr(self, '_dbusservice'):
            try:
                self._safe_history()
            except Exception:
                logging.exception('Could not save history to localsettings')
        logging.info('Stopping dbus-batrium-native integration')

    def _safe_history(self):
        """Copy the history paths and the current CVL into the settings."""
        logging.debug('Saving history to localsettings')
        for key, path in self._PERSISTED_PATHS:
            value = self._dbusservice[path]
            # A path that was never filled is still None; keep the stored value.
            if value is not None:
                self._settings[key] = value

    def _daily_stats(self):
        """Fold today's discharged energy into the rolling average and reset the counters.

        Does nothing (and does not mark the day as done) while the discharged
        energy is 0.
        """
        if (self._dbusservice['/History/DischargedEnergy'] == 0):
            return
        logging.debug("Updating stats, SOC: %d, Discharged: %.2f, Charged: %.2f ", self._bat.soc, self._dbusservice['/History/DischargedEnergy'], self._dbusservice['/History/ChargedEnergy'])
        # Rolling week: 6/7 of the old average plus 1/7 of today.
        self._dbusservice['/History/AverageDischarge'] = (6*self._dbusservice['/History/AverageDischarge'] + self._dbusservice['/History/DischargedEnergy'])/7
        self._dbusservice['/History/ChargedEnergy'] = 0
        self.ChargedEnergy = 0
        self._dbusservice['/History/DischargedEnergy'] = 0
        self.DischargedEnergy = 0
        self.dailyResetDone = datetime.now().day

    def _update(self):
        """Copy the current Batrium values to the dbus.

        Scheduled through ``GLib.timeout_add``; always returns ``True`` so the
        timer keeps firing.
        """
        # Named `now`, not `time`: the original local `time` shadowed the
        # imported time() and made the TimeLastFull update raise TypeError.
        now = datetime.now()
        self.lastUpdated = now.timestamp()

        ## Monitor communication: connected while the last update is under 10 s old ##
        # (The original extra clause `lastUpdated == 0` could never be true
        # because lastUpdated is assigned just above.)
        age = self.lastUpdated - self._bat.updated
        self._dbusservice['/Connected'] = 1 if age < 10 else 0

        ## Daily statistics at 06:00, once per day ##
        if now.hour == 6 and now.minute == 0 and now.day != self.dailyResetDone:
            self._daily_stats()

        self._update_alarms()
        self._update_soc(now)

        ## Cells are balancing or not ##
        self._dbusservice['/Balancing'] = 1 if self._bat.NumberInBypass != 0 else 0

        ## Write Batrium values direct to Venus OS registers ##
        self._dbusservice['/Dc/0/Current'] = self._bat.current
        self._dbusservice['/Dc/0/Voltage'] = self._bat.voltage
        power = self._bat.voltage * self._bat.current
        self._dbusservice['/Dc/0/Power'] = power
        self._dbusservice['/Dc/0/Temperature'] = self._bat.maxCellTemperature
        self._dbusservice['/Info/MaxChargeCurrent'] = self._bat.maxChargeCurrent
        self._dbusservice['/Info/MaxDischargeCurrent'] = self._bat.maxDischargeCurrent
        self._dbusservice['/System/NrOfModulesOnline'] = self._bat.numberOfModules
        self._dbusservice['/System/NrOfBatteriesBalancing'] = self._bat.NumberInBypass
        self._dbusservice['/System/BatteriesSeries'] = self._bat.numberOfModules
        self._dbusservice['/InstalledCapacity'] = self._bat.capacity
        self._dbusservice['/Capacity'] = self._bat.capacity

        self._update_charge_voltage_limit()

        ## Calculation every minute ##
        if now.minute != self.minUpdateDone:
            self.minUpdateDone = now.minute
            self._update_minute()

        ## Calculation every second ##
        if now.second != self.secUpdateDone:
            self.secUpdateDone = now.second
            self._update_energy(power)
        return True

    def _update_alarms(self):
        """Publish every Batrium alarm flag as 2 (set) or 0 (clear)."""
        for path, attribute in self._ALARM_SOURCES:
            self._dbusservice[path] = 2 if getattr(self._bat, attribute) else 0

    def _update_soc(self, now):
        """Publish the state of charge and track the time of the last full charge.

        Up to 99 the Batrium SoC is passed through. Above 99 the published SoC
        is left unchanged until ``cell_balanced`` is set; then it becomes 100,
        the consumed Ah are reset and the current time is stored.
        """
        soc = self._bat.soc
        if soc <= 99:
            self._dbusservice['/Soc'] = soc
            since_full = now - datetime.fromtimestamp(float(self._settings['TimeLastFull']))
            self._dbusservice['/History/TimeSinceLastFullCharge'] = (since_full.seconds + since_full.days * 24 * 3600)
        elif self.cell_balanced:
            self._dbusservice['/Soc'] = 100
            self._dbusservice['/ConsumedAmphours'] = 0
            self._settings['TimeLastFull'] = time()

    def _update_charge_voltage_limit(self):
        """Publish the charge voltage limit (CVL).

        A non-zero Batrium value is used and remembered. When Batrium reports
        0, the last remembered value minus 0.1 V is used, or, if none was seen
        since start, the value saved in the settings.
        """
        bms_limit = self._bat.maxChargeVoltage
        if bms_limit != 0:
            self._dbusservice['/Info/MaxChargeVoltage'] = bms_limit
            self.maxChargeVoltage = bms_limit
        elif self.maxChargeVoltage != 0:
            self._dbusservice['/Info/MaxChargeVoltage'] = self.maxChargeVoltage - 0.1
        else:
            self._dbusservice['/Info/MaxChargeVoltage'] = self._settings['TargetChargeVoltage']

    def _update_minute(self):
        """Once-a-minute work: cell extremes, Ah counters, TimeToGo, history save."""
        ## Min and max cell temperatures ##
        self._dbusservice['/System/MinCellTemperature'] = self._bat.minCellTemperature
        self._dbusservice['/System/MaxCellTemperature'] = self._bat.maxCellTemperature
        self._dbusservice['/System/MinTemperatureCellId'] = self._bat.minCellTemperatureId
        self._dbusservice['/System/MaxTemperatureCellId'] = self._bat.maxCellTemperatureId

        ## Cell voltages and their all-time extremes ##
        highest = self._bat.maxCellVoltage
        self._dbusservice['/System/MaxCellVoltage'] = highest
        if highest > self._dbusservice['/History/MaxCellVoltage']:
            self._dbusservice['/History/MaxCellVoltage'] = highest
            logging.debug("New maximum cell voltage: %f", highest)

        lowest = self._bat.minCellVoltage
        self._dbusservice['/System/MinCellVoltage'] = lowest
        stored_lowest = self._dbusservice['/History/MinCellVoltage']
        # The stored minimum starts at 0.0; treat a non-positive value as
        # "nothing recorded yet", otherwise no reading could ever be below it.
        nothing_stored = stored_lowest is None or stored_lowest <= 0
        if lowest > 0 and (nothing_stored or lowest < stored_lowest):
            self._dbusservice['/History/MinCellVoltage'] = lowest
            logging.debug("New minimum cell voltage: %f", lowest)

        self._dbusservice['/System/MaxVoltageCellId'] = self._bat.maxCellVoltageId
        self._dbusservice['/System/MinVoltageCellId'] = self._bat.minCellVoltageId

        ## Time remaining; None below 1 A or when 5 days or more remain ##
        current = self._bat.current
        time_to_go = None
        if current > 1:
            # charging
            if self._bat.TimeToFull < self._MAX_TIME_TO_GO:
                time_to_go = self._bat.TimeToFull
        elif current < -1:
            # discharging
            drawn = abs(current * self._AH_PER_AMP_MINUTE)  # Ah
            self._dbusservice['/ConsumedAmphours'] += drawn
            self._dbusservice['/History/TotalAhDrawn'] += drawn
            if self._bat.TimeToEmpty < self._MAX_TIME_TO_GO:
                time_to_go = self._bat.TimeToEmpty
        self._dbusservice['/TimeToGo'] = time_to_go

        ## Balanced once every module is in bypass, cleared when none is ##
        if self._bat.NumberInBypass == self._bat.numberOfModules:
            self.cell_balanced = True
        elif self._bat.NumberInBypass == 0:
            self.cell_balanced = False

        ## Write history items to Venus OS config xml ##
        self._safe_history()

    def _update_energy(self, power):
        """Once-a-second work: accumulate charged and discharged energy from ``power``."""
        energy = (power / 3600.0) / 1000
        if power > 0:
            self.ChargedEnergy += energy
        if power < 0:
            # Accumulates as a negative number; published as absolute value.
            self.DischargedEnergy += energy
        self._dbusservice['/History/ChargedEnergy'] = abs(self.ChargedEnergy)
        self._dbusservice['/History/DischargedEnergy'] = abs(self.DischargedEnergy)
def main():
    """Parse the command line, set up logging and run the battery service.

    Creates a ``DbusBatteryService`` on the selected CAN interface (``can0``
    when none is given) and then blocks in the GLib main loop.
    """
    ## Cli parser to set the interface and for debugging purposes
    parser = ArgumentParser(description='dbus-batrium-native integration', add_help=True)
    parser.add_argument('-d', '--debug', help='enable debug logging', action='store_true')
    parser.add_argument('-i', '--interface', help='CAN interface')
    # Accepted but not used anywhere in this function.
    parser.add_argument('-p', '--print', help='print only')
    args = parser.parse_args()

    # The log file lives next to this script.
    log_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'current.log')
    logging.basicConfig(
        # %(msecs)03d: zero-padded, so 5 ms is written as ",005" and not ",5".
        format='%(asctime)s,%(msecs)03d %(name)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        level=(logging.DEBUG if args.debug else logging.INFO),
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(),
        ])

    if not args.interface:
        logging.info('No CAN interface specified, using default can0')
        args.interface = 'can0'
    logging.info('Starting dbus-batrium-native integration %s on %s ', VERSION, args.interface)

    from dbus.mainloop.glib import DBusGMainLoop
    # Have a mainloop, so we can send/receive asynchronous calls to and from dbus
    DBusGMainLoop(set_as_default=True)

    # Keep a reference for the lifetime of the main loop; the object saves its
    # history in __del__.
    battery_output = DbusBatteryService(
        servicename='com.victronenergy.battery',
        connection=args.interface,
        deviceinstance=0,
    )
    logging.debug('Connected to dbus, and switching over to GLib.MainLoop() (= event based)')
    mainloop = GLib.MainLoop()
    mainloop.run()


if __name__ == "__main__":
    main()