emz_poll/emz_poll.py

85 lines
2.0 KiB
Python

import requests
import xml.dom.minidom
import time
import logging
import sys
from pymodbus.client.sync import ModbusTcpClient
from pymodbus.exceptions import ConnectionException
client = None
inputs = dict()
failure = True
logger = logging.getLogger()
logger.setLevel(logging.INFO)
#logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
def write_coil(nr, value):
while True:
try:
client.write_coil(nr, value, unit=1)
return
except ConnectionException:
print("Error writing to Modbus, retrying...")
time.sleep(5)
def poll():
global failure
try:
r = requests.get("http://192.168.43.100/input_emz.xml", timeout=5)
except Exception:
logger.debug("Could not get data from EMA. Sending error state!")
write_coil(501, False)
failure = True
return
if failure:
write_coil(501, True)
failure = False
with xml.dom.minidom.parseString(r.text) as dom:
for inp in dom.getElementsByTagName('Input'):
nr = inp.getAttribute('nr')
led = inp.getElementsByTagName('Led')
if len(led):
led = led[0].firstChild.nodeValue
value = False
if led == "RED_ON":
value = True
if nr not in inputs or inputs[nr] != value:
inputs[nr] = value
offset = 399+int(nr)
logger.info('EMZ-MG: ' + str(nr) + " --> Modbus Coil: " + str(offset) + " State: " + str(value))
write_coil(int(offset), value)
def loop():
global client
try:
logger.info("Starting EMZ Poller...")
client = ModbusTcpClient('192.168.43.1')
while True:
logger.debug("Polling...")
poll()
time.sleep(1)
except KeyboardInterrupt:
raise SystemExit(0)
finally:
if client is not None:
client.close()
if __name__ == "__main__":
loop()