ritual/checker.py
vvzvlad 57c8b81c13 Add format_number function to checker.py for improved subscription ID formatting
This update introduces a new `format_number` function that renders subscription IDs in a more readable form (e.g., 1000 becomes '1k'). `check_logs` now applies this formatting to both the head subscription ID and the last subscription ID in its status messages, making subscription status reports easier to read.
2025-01-19 11:56:00 +03:00
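As a quick illustration of the formatting behavior described above (a minimal sketch; it assumes the format_number defined later in this file is in scope, and the IDs are made up):

>>> format_number("1000")
'1k'
>>> format_number("256")
'256'
>>> format_number("21500")
'21k'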


# flake8: noqa
# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name
# pylance: disable=reportMissingImports, reportMissingModuleSource, reportGeneralTypeIssues
# type: ignore
import warnings
warnings.filterwarnings("ignore", category=Warning)
import re
from datetime import datetime, timedelta, timezone
import subprocess
import os
import time
import random
import sys
import pkg_resources
import requests
import json
from collections import deque
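
# Self-bootstrap of Python dependencies: any package below that is missing or
# installed at a different version than the pin is installed via pip before the
# Grist-related imports further down are attempted.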
required_packages = {
    'grist-api': 'latest',
    'colorama': 'latest',
    'requests': '2.31.0',
    'urllib3': '2.0.7',
    'charset-normalizer': '3.3.2'
}
installed_packages = {pkg.key: pkg.version for pkg in pkg_resources.working_set}
for package, version in required_packages.items():
    if package not in installed_packages or (version != 'latest' and installed_packages[package] != version):
        if version == 'latest':
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package, '--break-system-packages'])
        else:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', f"{package}=={version}", '--break-system-packages'])
from grist_api import GristDocAPI
import colorama
import logging
import socket
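
# self_update() downloads the latest checker.py from the repository; if the remote
# content differs from the local copy, it overwrites this script and re-execs itself.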
def self_update(logger):
    logger.info("Checking for updates..")
    script_path = os.path.abspath(__file__)
    update_url = "https://gitea.vvzvlad.xyz/vvzvlad/ritual/raw/branch/main-22aug/checker.py"
    try:
        response = requests.get(update_url, timeout=10)
        if response.status_code == 200:
            current_content = ""
            with open(script_path, 'r', encoding='utf-8') as f:
                current_content = f.read()
            if current_content != response.text:
                with open(script_path, 'w', encoding='utf-8') as f:
                    f.write(response.text)
                logger.info("Script updated successfully, restarting")
                os.execv(sys.executable, ['python3'] + sys.argv)
            else:
                logger.info("Script is up to date")
        else:
            logger.error(f"Failed to download update, status code: {response.status_code}")
    except Exception as e:
        logger.error(f"Update error: {str(e)}")
class GRIST:
    def __init__(self, server, doc_id, api_key, logger):
        self.server = server
        self.doc_id = doc_id
        self.api_key = api_key
        self.logger = logger
        self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)

    def table_name_convert(self, table_name):
        return table_name.replace(" ", "_")

    def to_timestamp(self, dtime: datetime) -> int:
        if dtime.tzinfo is None:
            dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
        return int(dtime.timestamp())

    def insert_row(self, data, table):
        data = {key.replace(" ", "_"): value for key, value in data.items()}
        row_id = self.grist.add_records(self.table_name_convert(table), [data])
        return row_id

    def update_column(self, row_id, column_name, value, table):
        if isinstance(value, datetime):
            value = self.to_timestamp(value)
        column_name = column_name.replace(" ", "_")
        self.grist.update_records(self.table_name_convert(table), [{ "id": row_id, column_name: value }])

    def delete_row(self, row_id, table):
        self.grist.delete_records(self.table_name_convert(table), [row_id])

    def update(self, row_id, updates, table):
        for column_name, value in updates.items():
            if isinstance(value, datetime):
                updates[column_name] = self.to_timestamp(value)
        updates = {column_name.replace(" ", "_"): value for column_name, value in updates.items()}
        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, **updates}])

    def fetch_table(self, table):
        return self.grist.fetch_table(self.table_name_convert(table))

    def find_record(self, id=None, state=None, name=None, table=None):
        if table is None:
            raise ValueError("Table is not specified")
        table_data = self.grist.fetch_table(self.table_name_convert(table))
        if id is not None:
            record = [row for row in table_data if row.id == id]
            return record
        if state is not None and name is not None:
            record = [row for row in table_data if row.State == state and row.name == name]
            return record
        if state is not None:
            record = [row for row in table_data if row.State == state]
            return record
        if name is not None:
            record = [row for row in table_data if row.Name == name]
            return record

    def find_settings(self, key, table):
        table = self.fetch_table(self.table_name_convert(table))
        for record in table:
            if record.Setting == key:
                if record.Value is None or record.Value == "":
                    raise ValueError(f"Setting {key} blank")
                return record.Value
        raise ValueError(f"Setting {key} not found")
def clean_ansi(text):
    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    return ansi_escape.sub('', text)
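
# format_number() shortens large subscription IDs for the status column,
# e.g. "1000" -> "1k" (integer division drops the remainder; IDs below 1000
# are returned unchanged).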
def format_number(number_str):
    number = int(number_str)
    if number >= 1000:
        return f"{number//1000}k"
    return str(number)
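
# check_logs() scans the last 10 minutes of infernet-node docker logs and derives a
# health string: "OK: <id>" when a head sub id was seen, "Sync: <id>" when only
# ignored subscription creations were seen, and "Idle" when neither appears.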
def check_logs(logger):
    try:
        logs = subprocess.run(['docker', 'logs', '--since', '10m', 'infernet-node'], capture_output=True, text=True, check=True)
        log_content = clean_ansi(logs.stdout)
        last_subscription_id = None
        head_sub_id = None
        for line in log_content.splitlines():
            if "Ignored subscription creation" in line and "id=" in line:
                id_match = re.search(r'id=(\d+)', line)
                if id_match:
                    last_subscription_id = id_match.group(1)
            if "head sub id is:" in line:
                id_match = re.search(r'head sub id is:\s*(\d+)', line)
                if id_match:
                    head_sub_id = id_match.group(1)
        if head_sub_id:
            logger.info(f"Head sub id: {head_sub_id}")
            return {"status": f"OK: {format_number(head_sub_id)}"}
        if last_subscription_id:
            logger.info(f"Subscription: {last_subscription_id}")
            return {"status": f"Sync: {format_number(last_subscription_id)}"}
        logger.info("No subscription found")
        return {"status": "Idle"}
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Error running docker logs: {e}")
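
# Entry point: set up logging, self-update, read Grist credentials from
# /root/node/grist.json, locate this host's row in the Nodes table, and report
# the health status, retrying up to three times.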
if __name__ == "__main__":
    colorama.init(autoreset=True)
    logger = logging.getLogger("Checker")
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logger.info("Checker started")
    self_update(logger)
    #random_sleep = random.randint(1, 60)
    #logger.info(f"Sleeping for {random_sleep} seconds")
    #time.sleep(random_sleep)
    grist_data = {}
    with open('/root/node/grist.json', 'r', encoding='utf-8') as f:
        grist_data = json.loads(f.read())
    GRIST_ROW_NAME = socket.gethostname()
    NODES_TABLE = "Nodes"
    grist = GRIST(grist_data.get('grist_server'), grist_data.get('grist_doc_id'), grist_data.get('grist_api_key'), logger)
    current_vm = grist.find_record(name=GRIST_ROW_NAME, table=NODES_TABLE)[0]
    def grist_callback(msg): grist.update(current_vm.id, msg, NODES_TABLE)
    for attempt in range(3):
        try:
            result = check_logs(logger)
            grist_callback({ "Health": result["status"] })
            logger.info(f"Status: {result['status']}")
            break
        except Exception as e:
            logger.error(f"Error on attempt {attempt+1}/3: {e}")
            if attempt == 2:
                grist_callback({ "Health": f"Error: {e}" })
            if attempt < 2:
                time.sleep(5)