# VictronGX_Custom_Sensors/dbus-batrium-native/dbus-batrium-native.py
# (repository listing: 346 lines, 16 KiB, Python)

#!/usr/bin/env python3
"""
A class to put a battery service on the dbus, according to victron standards, with constantly updating
paths.
"""
from gi.repository import GLib
import platform
import argparse
import logging
import sys
import os
import dbus
import itertools
import math
from time import time
from datetime import datetime
import struct
from argparse import ArgumentParser
from batrium import *
# our own packages
sys.path.insert(1, os.path.join(os.path.dirname(__file__), '/opt/victronenergy/dbus-systemcalc-py/ext/velib_python'))
from vedbus import VeDbusService
from ve_utils import exit_on_error
from settingsdevice import SettingsDevice
# Version of this driver; published on /Mgmt/ProcessVersion and logged at start-up.
VERSION = '1.0'
def handle_changed_setting(setting, oldvalue, newvalue):
    """Log a change of one of the persisted settings at debug level.

    Passed as ``eventCallback`` to ``SettingsDevice``; it only logs and
    returns ``None``.

    Args:
        setting: Name of the setting that changed.
        oldvalue: Value before the change.
        newvalue: Value after the change.
    """
    # Hand the arguments to logging instead of %-formatting eagerly, so the
    # message is only built when debug logging is enabled.
    logging.debug('setting changed, setting: %s, old: %s, new: %s',
                  setting, oldvalue, newvalue)
class DbusBatteryService:
    """Expose a Batrium BMS as a ``battery`` service on D-Bus.

    Values are read from a ``BatriumBattery`` object and copied onto the
    paths of a ``VeDbusService`` by :meth:`_update`, which is scheduled
    through ``GLib.timeout_add``.  A handful of history values are persisted
    through ``SettingsDevice`` so that they survive a restart.

    NOTE(review): the indentation of this file was lost before review; the
    nesting of the once-per-minute and once-per-second sections in
    ``_update`` was reconstructed from the statement order - confirm against
    the upstream file.
    """

    # (attribute on the BatriumBattery object, D-Bus alarm path), in the
    # order in which the alarms are published.
    _ALARM_PATHS = (
        ('alarm_high_temperature', '/Alarms/HighTemperature'),
        ('alarm_low_temperature', '/Alarms/LowTemperature'),
        ('alarm_low_voltage', '/Alarms/LowVoltage'),
        ('alarm_high_voltage', '/Alarms/HighVoltage'),
        ('alarm_high_charge_current', '/Alarms/HighChargeCurrent'),
        ('alarm_high_discharge_current', '/Alarms/HighDischargeCurrent'),
    )

    def __init__(self, servicename, deviceinstance, productname='Batrium BMS', connection='can0'):
        """Create the D-Bus service, load persisted history and start polling.

        Args:
            servicename: D-Bus service name prefix; the CAN interface name
                and the device instance are appended to it.
            deviceinstance: Value published on ``/DeviceInstance``.
            productname: Value published on ``/ProductName``.
            connection: CAN interface name handed to ``BatriumBattery``.

        Raises:
            Exception: whatever ``VeDbusService`` raises when the service
                cannot be created; it is logged and re-raised.
        """
        # Bookkeeping for the once-per-minute / once-per-second / once-per-day
        # sections of _update().
        self.minUpdateDone = 0
        self.secUpdateDone = 0
        self.dailyResetDone = 0
        self.lastUpdated = 0
        self.cell_balanced = 0
        # Energy accumulated since the last daily reset (kWh); the discharged
        # counter is kept negative and published as an absolute value.
        self.ChargedEnergy = 0
        self.DischargedEnergy = 0

        self._bat = BatriumBattery(connection=connection)
        # presumably python-can's Notifier, feeding received frames to the
        # battery object - TODO confirm (``can`` comes in via ``batrium``).
        self.notifier = can.Notifier(self._bat._ci, [self._bat])

        try:
            self._dbusservice = VeDbusService(
                servicename + '.socketcan_' + connection + '_di' + str(deviceinstance))
        except Exception:
            # The previous ``except: exit`` merely evaluated the name ``exit``
            # and carried on, so the failure surfaced one line later as an
            # unrelated AttributeError.  Report the real cause instead.
            logging.exception('Could not create the D-Bus service')
            raise

        add_path = self._dbusservice.add_path

        # Create the management objects, as specified in the ccgx dbus-api document
        add_path('/Mgmt/ProcessName', __file__)
        add_path('/Mgmt/ProcessVersion', VERSION + ' running on Python ' + platform.python_version())
        add_path('/Mgmt/Connection', connection)

        # Create the mandatory objects
        add_path('/DeviceInstance', deviceinstance)
        add_path('/ProductId', 0xB038)
        add_path('/ProductName', productname)
        add_path('/FirmwareVersion', 'unknown')
        add_path('/HardwareVersion', 'unknown')
        add_path('/Connected', 0)

        # Create battery specific objects
        for path in (
            '/Status',
            '/Dc/0/Temperature',
            '/Info/BatteryLowVoltage',
            '/Alarms/CellImbalance',
            '/Alarms/LowVoltage',
            '/Alarms/HighVoltage',
            '/Alarms/HighDischargeCurrent',
            '/Alarms/HighChargeCurrent',
            '/Alarms/LowSoc',
            '/Alarms/LowTemperature',
            '/Alarms/HighTemperature',
            '/Balancing',
        ):
            add_path(path, None, writeable=True)
        add_path('/System/HasTemperature', 1)
        add_path('/System/NrOfBatteries', 1)
        add_path('/System/NrOfModulesOnline', None, writeable=True)
        add_path('/System/NrOfModulesOffline', 0)
        add_path('/System/NrOfModulesBlockingDischarge', 0)
        add_path('/System/NrOfModulesBlockingCharge', 0)
        for path in (
            '/System/NrOfBatteriesBalancing',
            '/System/BatteriesSeries',
            '/System/MinVoltageCellId',
            '/System/MaxVoltageCellId',
            '/System/MinTemperatureCellId',
            '/System/MaxTemperatureCellId',
        ):
            add_path(path, None, writeable=True)

        # Persisted values: [settings path, default, minimum, maximum].
        self._settings = SettingsDevice(
            bus=dbus.SystemBus() if (platform.machine() == 'armv7l') else dbus.SessionBus(),
            supportedSettings={
                'AvgDischarge': ['/Settings/Batrium/AvgerageDischarge', 0.0, 0, 0],
                'TotalAhDrawn': ['/Settings/Batrium/TotalAhDrawn', 0.0, 0, 0],
                'TimeLastFull': ['/Settings/Batrium/TimeLastFull', 0.0, 0, 0],
                'MinCellVoltage': ['/Settings/Batrium/MinCellVoltage', 0.0, 0.0, 0.0],
                'MaxCellVoltage': ['/Settings/Batrium/MaxCellVoltage', 0.0, 0.0, 0.0],
                'interval': ['/Settings/Batrium/Interval', 200, 200, 200]
            },
            eventCallback=handle_changed_setting)

        # Paths whose text representation is produced by _gettext().
        self._summeditems = {
            '/System/MaxCellVoltage': {'gettext': '%.3F V'},
            '/System/MinCellVoltage': {'gettext': '%.3F V'},
            '/Dc/0/Voltage': {'gettext': '%.2F V'},
            '/Dc/0/Current': {'gettext': '%.2F A'},
            '/Dc/0/Power': {'gettext': '%.0F W'},
            '/Soc': {'gettext': '%.0F%%'},
            '/Soh': {'gettext': '%.0F%%'},
            '/History/TotalAhDrawn': {'gettext': '%.0F Ah'},
            '/History/DischargedEnergy': {'gettext': '%.2F kWh'},
            '/History/ChargedEnergy': {'gettext': '%.2F kWh'},
            '/History/AverageDischarge': {'gettext': '%.2F kWh'},
            '/TimeToGo': {'gettext': '%.0F s'},
            '/ConsumedAmphours': {'gettext': '%.1F Ah'},
            '/System/MinCellTemperature': {'gettext': '%.1F °C'},
            '/System/MaxCellTemperature': {'gettext': '%.1F °C'},
            '/Capacity': {'gettext': '%.1F Ah'},
            '/InstalledCapacity': {'gettext': '%.1F Ah'},
            '/Info/MaxChargeVoltage': {'gettext': '%.1F V'},
            '/Info/MaxChargeCurrent': {'gettext': '%.1F A'},
            '/Info/MaxDischargeCurrent': {'gettext': '%.1F A'}
        }
        for path in self._summeditems:
            add_path(path, value=None, gettextcallback=self._gettext)

        # Seed the history paths from the persisted settings.
        self._dbusservice['/History/AverageDischarge'] = self._settings['AvgDischarge']
        self._dbusservice['/History/TotalAhDrawn'] = self._settings['TotalAhDrawn']
        add_path('/History/TimeSinceLastFullCharge', 0)
        add_path('/History/MinCellVoltage', self._settings['MinCellVoltage'])
        add_path('/History/MaxCellVoltage', self._settings['MaxCellVoltage'])
        self._dbusservice['/Soh'] = 100
        self._dbusservice['/Status'] = 0
        logging.info("History cell voltage min: %.3f, max: %.3f, totalAhDrawn: %d",
                     self._settings['MinCellVoltage'], self._settings['MaxCellVoltage'],
                     self._settings['TotalAhDrawn'])

        # A reading in (99, 100] deliberately leaves /Soc unset here;
        # _update() only publishes 100 once the pack has balanced.
        if self._bat.soc <= 99:
            self._dbusservice['/Soc'] = self._bat.soc
        elif self._bat.soc > 100:
            self._dbusservice['/Soc'] = 100

        GLib.timeout_add(self._settings['interval'], exit_on_error, self._update)

    def _gettext(self, path, value):
        """Return the display text for ``value`` published on ``path``.

        Paths listed in ``self._summeditems`` use their %-format template;
        any other path falls back to ``str(value)``.
        """
        item = self._summeditems.get(path)
        if item is None:
            return str(value)
        if value is None:
            # A None value cannot be %-formatted; return a placeholder for
            # "no value" instead of raising TypeError.
            return '---'
        return item['gettext'] % value

    def __del__(self):
        """Persist the history one last time when the object is destroyed."""
        # __init__ may have raised before these attributes were created.
        if hasattr(self, '_settings') and hasattr(self, '_dbusservice'):
            self._safe_history()
        logging.info('Stopping dbus_Batrium')

    def _safe_history(self):
        """Copy the history values from the D-Bus paths into the settings."""
        logging.debug('Saving history to localsettings')
        for key, path in (
            ('AvgDischarge', '/History/AverageDischarge'),
            ('TotalAhDrawn', '/History/TotalAhDrawn'),
            ('MinCellVoltage', '/History/MinCellVoltage'),
            ('MaxCellVoltage', '/History/MaxCellVoltage'),
        ):
            self._settings[key] = self._dbusservice[path]

    def _daily_stats(self):
        """Fold the day's discharged energy into the average and reset counters.

        Does nothing while no discharged energy has been recorded (the path
        is still unset or zero); in that case ``dailyResetDone`` is left
        untouched, so the caller retries on its next tick.
        """
        svc = self._dbusservice
        discharged = svc['/History/DischargedEnergy']
        # ``not`` covers both 0 and a path that was never written (None);
        # the latter used to fall through into the arithmetic below.
        if not discharged:
            return
        logging.info("Updating stats, SOC: %d, Discharged: %.2f, Charged: %.2f ",
                     self._bat.soc, discharged, svc['/History/ChargedEnergy'])
        # rolling week: six parts previous average, one part today
        svc['/History/AverageDischarge'] = (6 * svc['/History/AverageDischarge'] + discharged) / 7
        svc['/History/ChargedEnergy'] = 0
        self.ChargedEnergy = 0
        svc['/History/DischargedEnergy'] = 0
        self.DischargedEnergy = 0
        self.dailyResetDone = datetime.now().day

    def _publish_alarms(self):
        """Publish each BMS alarm flag as 2 (set) or 0 (clear)."""
        # presumably 2 is the 'alarm' level of the Victron D-Bus API - TODO confirm
        for attribute, path in self._ALARM_PATHS:
            self._dbusservice[path] = 2 if getattr(self._bat, attribute) else 0

    def _publish_minute_values(self):
        """Publish the cell extremes and time-to-go, then persist the history."""
        bat = self._bat
        svc = self._dbusservice

        svc['/System/MinCellTemperature'] = bat.minCellTemperature
        svc['/System/MaxCellTemperature'] = bat.maxCellTemperature
        svc['/System/MinTemperatureCellId'] = bat.minCellTemperatureId
        svc['/System/MaxTemperatureCellId'] = bat.maxCellTemperatureId

        svc['/System/MaxCellVoltage'] = bat.maxCellVoltage
        if bat.maxCellVoltage > svc['/History/MaxCellVoltage']:
            svc['/History/MaxCellVoltage'] = bat.maxCellVoltage
            logging.info("New maximum cell voltage: %f", bat.maxCellVoltage)

        svc['/System/MinCellVoltage'] = bat.minCellVoltage
        recorded_min = svc['/History/MinCellVoltage']
        # The persisted default is 0.0, which no positive reading can
        # undercut; treat an unset or non-positive record as "none yet" so
        # that the first valid reading is stored.
        no_record = recorded_min is None or recorded_min <= 0
        if bat.minCellVoltage > 0 and (no_record or bat.minCellVoltage < recorded_min):
            svc['/History/MinCellVoltage'] = bat.minCellVoltage
            logging.info("New minimum cell voltage: %f", bat.minCellVoltage)

        svc['/System/MaxVoltageCellId'] = bat.maxCellVoltageId
        svc['/System/MinVoltageCellId'] = bat.minCellVoltageId

        # NOTE(review): the 80000 limit is applied to NumberInBypass, exactly
        # as in the original; it looks like it may have been meant for the
        # time estimate - confirm.
        if bat.NumberInBypass < 80000:
            # positive current = charging, otherwise discharging
            svc['/TimeToGo'] = bat.TimeToFull if bat.current > 0 else bat.TimeToEmpty
        else:
            svc['/TimeToGo'] = None

        # Latch "balanced" once every module is in bypass; release it only
        # when none is.  Anything in between keeps the previous state.
        if bat.NumberInBypass == bat.numberOfModules:
            self.cell_balanced = True
        elif bat.NumberInBypass == 0:
            self.cell_balanced = False

        self._safe_history()

    def _accumulate_energy(self, power):
        """Add one tick of ``power`` (W) to the charged/discharged counters."""
        # Workaround because of missing messages of Batrium BMS.
        # Called at most once per clock second, so the sample is counted as
        # one second of energy: W / 3600 / 1000 -> kWh.
        if power > 0:
            self.ChargedEnergy += (power / 3600.0) / 1000
        if power < 0:
            self.DischargedEnergy += (power / 3600.0) / 1000
        self._dbusservice['/History/ChargedEnergy'] = abs(self.ChargedEnergy)
        self._dbusservice['/History/DischargedEnergy'] = abs(self.DischargedEnergy)

    def _update(self):
        """Copy the current BMS values onto the D-Bus paths.

        Alarms, connection state, SoC and the DC values are refreshed on
        every call.  Cell extremes, time-to-go and the persisted history are
        refreshed when the clock minute changes, the energy counters when
        the clock second changes, and the daily statistics once a day at
        06:00.

        Returns:
            Always ``True`` (presumably so that the GLib timeout keeps
            firing - TODO confirm against ``exit_on_error``).
        """
        # One timestamp for the whole tick, so that the minute/second/day
        # checks below cannot straddle a clock boundary.
        now = datetime.now()
        self.lastUpdated = now.timestamp()
        bat = self._bat
        svc = self._dbusservice

        self._publish_alarms()

        # Connected while the BMS data is less than 10 s old.  (The former
        # extra ``lastUpdated == 0`` clause could never be true, because
        # lastUpdated is assigned just above.)
        svc['/Connected'] = 1 if (self.lastUpdated - bat.updated) < 10 else 0

        # Above 99 the published SoC is held at its previous value until the
        # cells have balanced; only then is 100 reported.
        if bat.soc <= 99:
            svc['/Soc'] = bat.soc
        elif bat.soc > 99 and self.cell_balanced:
            svc['/Soc'] = 100

        svc['/Balancing'] = 1 if bat.NumberInBypass != 0 else 0

        # bat.AhToFull is not used here: Batrium does not transmit it.
        published_soc = svc['/Soc']
        if published_soc is None:
            # /Soc has never been written yet (SoC above 99 before the pack
            # balanced); there is nothing to derive the value from.
            svc['/ConsumedAmphours'] = None
        else:
            svc['/ConsumedAmphours'] = ((100 - published_soc) / 100) * bat.capacity

        svc['/InstalledCapacity'] = bat.capacity
        svc['/Capacity'] = bat.capacity
        svc['/Dc/0/Current'] = bat.current
        svc['/Dc/0/Voltage'] = bat.voltage
        power = bat.voltage * bat.current
        svc['/Dc/0/Power'] = power
        svc['/Dc/0/Temperature'] = bat.maxCellTemperature
        svc['/Info/MaxChargeCurrent'] = bat.maxChargeCurrent
        svc['/Info/MaxDischargeCurrent'] = bat.maxDischargeCurrent
        # A zero charge-voltage limit is not published; the previous value stays.
        if bat.maxChargeVoltage != 0:
            svc['/Info/MaxChargeVoltage'] = bat.maxChargeVoltage
        svc['/System/NrOfModulesOnline'] = bat.numberOfModules
        svc['/System/NrOfBatteriesBalancing'] = bat.NumberInBypass
        svc['/System/BatteriesSeries'] = bat.numberOfModules

        # update energy statistics daily at 6:00
        if now.hour == 6 and now.minute == 0 and now.day != self.dailyResetDone:
            self._daily_stats()

        if now.minute != self.minUpdateDone:
            self.minUpdateDone = now.minute
            self._publish_minute_values()

        if now.second != self.secUpdateDone:
            self.secUpdateDone = now.second
            self._accumulate_energy(power)

        return True
def main():
    """Parse the command line, set up logging and run the battery service.

    Logs to ``current.log`` next to this script and to the console, creates
    a ``DbusBatteryService`` on the selected CAN interface (``can0`` when
    none is given) and then blocks in the GLib main loop.
    """
    parser = ArgumentParser(description='dbus_batrium', add_help=True)
    parser.add_argument('-d', '--debug', help='enable debug logging',
                        action='store_true')
    parser.add_argument('-i', '--interface', help='CAN interface')
    # NOTE(review): parsed but never read anywhere in this function.
    parser.add_argument('-p', '--print', help='print only')
    args = parser.parse_args()

    script_dir = os.path.dirname(os.path.realpath(__file__))
    logging.basicConfig(
        format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        level=(logging.DEBUG if args.debug else logging.INFO),
        handlers=[
            logging.FileHandler("%s/current.log" % script_dir),
            logging.StreamHandler()
        ])

    if not args.interface:
        logging.info('No CAN interface specified, using default can0')
        args.interface = 'can0'

    # Lazy logging arguments instead of eager %-formatting.
    logging.info('Starting dbus_batrium %s on %s ', VERSION, args.interface)

    logging.debug('Setup main loop')
    from dbus.mainloop.glib import DBusGMainLoop
    # Have a mainloop, so we can send/receive asynchronous calls to and from dbus
    DBusGMainLoop(set_as_default=True)

    logging.debug('Setup dbus service')
    # Keep a reference to the service for as long as the main loop runs.
    battery_output = DbusBatteryService(
        servicename='com.victronenergy.battery',
        connection=args.interface,
        deviceinstance=0,
    )

    logging.debug('Connected to dbus, and switching over to GLib.MainLoop() (= event based)')
    mainloop = GLib.MainLoop()
    mainloop.run()
# Start the service only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()