vestasync/device_status_checker.py
2023-06-01 00:34:47 +07:00

118 lines
4.1 KiB
Python

import paho.mqtt.client as mqtt
import time
import os
from collections import Counter
import argparse
parser = argparse.ArgumentParser(description="MQTT Device Error Status")
parser.add_argument("-a", "--wb", type=str, required=True, help="WB address")
args = parser.parse_args()
def get_modbus_devices():
devices = {}
def on_connect(client, userdata, flags, rc):
client.subscribe("/devices/+/meta/driver")
def on_message(client, userdata, msg):
if msg.payload.decode() == "wb-modbus":
device_name = msg.topic.split('/')[2]
devices[device_name] = {}
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(args.wb, 1883, 60)
client.loop_start()
time.sleep(3)
client.loop_stop()
client.unsubscribe("/devices/+/meta/driver")
client.disconnect()
return devices
def get_all_controls(devices):
def on_connect(client, userdata, flags, rc):
for device in devices.keys():
client.subscribe(f"/devices/{device}/controls/+")
def on_message(client, userdata, msg):
device = msg.topic.split('/')[2]
control = msg.topic.split('/')[-1]
devices[device][control] = "noerror"
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(args.wb, 1883, 60)
client.loop_start()
time.sleep(3)
client.loop_stop()
client.disconnect()
return devices
def get_all_controls_errors(devices):
def on_connect(client, userdata, flags, rc):
for device, controls in devices.items():
for control in controls.keys():
client.subscribe(f"/devices/{device}/controls/{control}/meta/error")
def on_message(client, userdata, msg):
error = msg.payload.decode()
device = msg.topic.split('/')[2]
control = msg.topic.split('/')[4]
if any(char in error for char in ['r', 'w']):
devices[device][control] = "readwriteerror"
elif error == "p":
devices[device][control] = "perioderror"
else:
devices[device][control] = "noerror"
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(args.wb, 1883, 60)
client.loop_start()
def sort_devices(devices):
sorted_devices = sorted(devices.items(), key=lambda item: item[0])
sorted_devices = sorted(sorted_devices, key=lambda item: sum(value == "readwriteerror" for value in item[1].values()), reverse=True)
return sorted_devices
print("Get devices...")
devices = get_modbus_devices()
print("Get controls...")
devices = get_all_controls(devices)
print("Subscribe errors...")
get_all_controls_errors(devices)
lines_printed = 0
while True:
time.sleep(1)
# Move the cursor up and clear the line for each line printed in the last iteration
for _ in range(lines_printed):
print("\033[F\033[K", end="")
lines_printed = 0
print("Device error status:")
lines_printed += 1
sorted_devices = sort_devices(devices)
max_device_name_length = max(len(device) for device, _ in sorted_devices)
print(f"+{'-'*(max_device_name_length+2)}+{'-'*7}+{'-'*7}+{'-'*8}+{'-'*8}+")
print(f"| {'Device':<{max_device_name_length}} | {'All':<5} | {'R/W':<5} | {'Period':<5} | {'Normal':<5} |")
print(f"+{'-'*(max_device_name_length+2)}+{'-'*7}+{'-'*7}+{'-'*8}+{'-'*8}+")
lines_printed += 3
for device, controls in sorted_devices:
error_counter = Counter(controls.values())
normal = len(controls) - error_counter['readwriteerror']
if normal != len(controls):
if normal == 0:
print("\033[31m", end="") # Start red color
print(f"| {device:<{max_device_name_length}} | {len(controls):<5} | {error_counter['readwriteerror']:<5} | {error_counter['perioderror']:<6} | {normal:<6} |")
lines_printed += 1
if normal == 0:
print("\033[0m", end="") # Reset color
print(f"+{'-'*(max_device_name_length+2)}+{'-'*7}+{'-'*7}+{'-'*8}+{'-'*8}+")
lines_printed += 1