This update introduces a new `format_number` function that formats subscription IDs into a more readable format (e.g., converting 1000 to '1k'). The `check_logs` function has been modified to utilize this new formatting for both head subscription ID and last subscription ID in the status messages, enhancing clarity in log analysis and improving the overall readability of subscription status reporting.
213 lines
8.0 KiB
Python
213 lines
8.0 KiB
Python
# flake8: noqa
|
|
# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name
|
|
# pylance: disable=reportMissingImports, reportMissingModuleSource, reportGeneralTypeIssues
|
|
# type: ignore
|
|
|
|
import warnings
|
|
warnings.filterwarnings("ignore", category=Warning)
|
|
|
|
import re
|
|
from datetime import datetime, timedelta, timezone
|
|
import subprocess
|
|
import os
|
|
import time
|
|
import random
|
|
import sys
|
|
import pkg_resources
|
|
import requests
|
|
import json
|
|
from collections import deque
|
|
|
|
# Third-party distributions this script needs, mapped to the version that
# must be installed. 'latest' means "any installed version is acceptable".
required_packages = {
    'grist-api': 'latest',
    'colorama': 'latest',
    'requests': '2.31.0',
    'urllib3': '2.0.7',
    'charset-normalizer': '3.3.2',
}

# Snapshot of the distributions visible to this interpreter: {key: version}.
installed_packages = {dist.key: dist.version for dist in pkg_resources.working_set}

# Install every package that is missing, and re-install pinned packages whose
# installed version differs from the pin.
for package, version in required_packages.items():
    # Satisfied when the package is present and either no pin is requested
    # or the installed version equals the pin.
    if package in installed_packages and version in ('latest', installed_packages[package]):
        continue
    subprocess.check_call([
        sys.executable, '-m', 'pip', 'install',
        package if version == 'latest' else f"{package}=={version}",
        '--break-system-packages',
    ])
|
|
|
|
from grist_api import GristDocAPI
|
|
import colorama
|
|
|
|
import logging
|
|
import socket
|
|
|
|
def self_update(logger):
    """Replace this script with the published version and restart if it changed.

    The script is downloaded from a fixed URL and compared with the file on
    disk. When the two differ, the file is overwritten and the process is
    re-executed with ``os.execv``, so after a successful update this function
    does not return. Every failure (network error, non-200 status, unusable
    download, file error) is logged and swallowed so the checker keeps running
    with the current code.

    Args:
        logger: Logger used for progress and error messages.
    """
    # NOTE(review): the downloaded text is executed without any signature or
    # hash verification; whoever controls the URL controls this host's script.
    logger.info("Checking for updates..")
    script_path = os.path.abspath(__file__)
    update_url = "https://gitea.vvzvlad.xyz/vvzvlad/ritual/raw/branch/main-22aug/checker.py"
    try:
        response = requests.get(update_url, timeout=10)
        if response.status_code != 200:
            logger.error(f"Failed to download update, status code: {response.status_code}")
            return

        new_content = response.text
        with open(script_path, 'r', encoding='utf-8') as f:
            current_content = f.read()

        if current_content == new_content:
            logger.info("Script is up to date")
            return

        # A 200 response can still carry an empty body or an HTML error page.
        # Writing that over the script would break every later run (including
        # the self-update itself), so refuse anything that is not Python.
        if not new_content.strip():
            logger.error("Update error: downloaded script is empty")
            return
        try:
            compile(new_content, script_path, "exec", dont_inherit=True)
        except (SyntaxError, ValueError) as e:
            logger.error(f"Update error: downloaded script is not valid Python: {e}")
            return

        with open(script_path, 'w', encoding='utf-8') as f:
            f.write(new_content)
        logger.info("Script updated successfully, restarting")
        # Replaces the current process image; nothing after this line runs.
        os.execv(sys.executable, ['python3'] + sys.argv)
    except Exception as e:
        # Best effort by design: a failed update must not stop the checker.
        logger.error(f"Update error: {str(e)}")
|
|
|
|
class GRIST:
    """Convenience wrapper around ``GristDocAPI`` for a single Grist document.

    Table and column names passed to the methods may contain spaces; they are
    converted to underscores before being handed to the API. ``datetime``
    values passed to the update methods are converted to integer Unix
    timestamps.
    """

    def __init__(self, server, doc_id, api_key, logger):
        """Store the connection settings and create the API client.

        Args:
            server: Grist server address passed to ``GristDocAPI``.
            doc_id: Identifier of the Grist document.
            api_key: API key passed to ``GristDocAPI``.
            logger: Logger kept on the instance for callers' use.
        """
        self.server = server
        self.doc_id = doc_id
        self.api_key = api_key
        self.logger = logger
        self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)

    def table_name_convert(self, table_name):
        """Return *table_name* with spaces replaced by underscores."""
        return table_name.replace(" ", "_")

    def to_timestamp(self, dtime: datetime) -> int:
        """Convert *dtime* to an integer Unix timestamp.

        A naive datetime (no ``tzinfo``) is interpreted as being in the fixed
        UTC+3 offset; an aware datetime keeps its own zone.
        """
        if dtime.tzinfo is None:
            dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
        return int(dtime.timestamp())

    def insert_row(self, data, table):
        """Add one record to *table* and return the API's ``add_records`` result.

        Args:
            data: Mapping of column name to value; spaces in column names are
                replaced by underscores. Values are passed through unchanged.
            table: Table name (spaces allowed).
        """
        data = {key.replace(" ", "_"): value for key, value in data.items()}
        row_id = self.grist.add_records(self.table_name_convert(table), [data])
        return row_id

    def update_column(self, row_id, column_name, value, table):
        """Set a single column of the record *row_id* in *table*.

        Equivalent to ``update(row_id, {column_name: value}, table)``.
        """
        self.update(row_id, {column_name: value}, table)

    def delete_row(self, row_id, table):
        """Delete the record *row_id* from *table*."""
        self.grist.delete_records(self.table_name_convert(table), [row_id])

    def update(self, row_id, updates, table):
        """Update several columns of the record *row_id* in *table*.

        Args:
            row_id: ``id`` of the record to update.
            updates: Mapping of column name to new value. ``datetime`` values
                are sent as Unix timestamps and spaces in column names become
                underscores. The mapping itself is not modified.
            table: Table name (spaces allowed).
        """
        # Build a fresh payload instead of rewriting the caller's dict, so a
        # dict reused across calls keeps its original datetime values.
        fields = {
            column_name.replace(" ", "_"): (
                self.to_timestamp(value) if isinstance(value, datetime) else value
            )
            for column_name, value in updates.items()
        }
        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, **fields}])

    def fetch_table(self, table):
        """Return all records of *table* as delivered by ``GristDocAPI.fetch_table``."""
        return self.grist.fetch_table(self.table_name_convert(table))

    def find_record(self, id=None, state=None, name=None, table=None):
        """Return the records of *table* matching the given filter.

        Filters are checked in this order and only the first one that applies
        is used: *id* alone; *state* and *name* together; *state*; *name*.
        Records are matched on their ``id``, ``State`` and ``Name`` attributes.

        Args:
            id: Record id to match.
            state: Value the ``State`` column must equal.
            name: Value the ``Name`` column must equal.
            table: Table name (spaces allowed); required.

        Returns:
            A list of matching records (possibly empty), or ``None`` when no
            filter argument was supplied.

        Raises:
            ValueError: If *table* is not given.
        """
        if table is None:
            raise ValueError("Table is not specified")
        table_data = self.grist.fetch_table(self.table_name_convert(table))
        if id is not None:
            return [row for row in table_data if row.id == id]
        if state is not None and name is not None:
            # Uses the ``Name`` column, the same one the name-only filter reads.
            return [row for row in table_data if row.State == state and row.Name == name]
        if state is not None:
            return [row for row in table_data if row.State == state]
        if name is not None:
            return [row for row in table_data if row.Name == name]
        return None

    def find_settings(self, key, table):
        """Return the ``Value`` of the record in *table* whose ``Setting`` equals *key*.

        Raises:
            ValueError: If the setting exists but its value is ``None`` or an
                empty string, or if no record has that setting name.
        """
        for record in self.fetch_table(table):
            if record.Setting == key:
                if record.Value is None or record.Value == "":
                    raise ValueError(f"Setting {key} blank")
                return record.Value
        raise ValueError(f"Setting {key} not found")
|
|
|
|
def clean_ansi(text):
    """Return *text* with ANSI escape sequences removed.

    Matches an ESC character followed either by a single byte in the ranges
    ``@-Z`` / ``\\-_`` or by a ``[``-introduced control sequence.
    """
    return re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', text)
|
|
|
|
def format_number(number_str):
    """Return a compact string for an integer given as text.

    Values of 1000 and above are shown as whole thousands with a ``k``
    suffix, dropping the remainder (``"1999"`` -> ``"1k"``); smaller values
    are returned as their plain decimal form.

    Args:
        number_str: Anything ``int()`` accepts, typically a string of digits.

    Raises:
        ValueError: If *number_str* cannot be parsed by ``int()``.
    """
    value = int(number_str)
    whole_thousands = value // 1000
    return f"{whole_thousands}k" if value >= 1000 else str(value)
|
|
|
|
# Patterns for the two log markers check_logs looks for, compiled once so the
# per-line loop does not repeat the pattern lookup.
_SUBSCRIPTION_ID_RE = re.compile(r'id=(\d+)')
_HEAD_SUB_ID_RE = re.compile(r'head sub id is:\s*(\d+)')


def _extract_subscription_ids(log_content):
    """Return ``(head_sub_id, last_subscription_id)`` parsed from *log_content*.

    Each value is the digit string from the last matching line, or ``None``
    when no line matched.
    """
    head_sub_id = None
    last_subscription_id = None
    for line in log_content.splitlines():
        if "Ignored subscription creation" in line:
            id_match = _SUBSCRIPTION_ID_RE.search(line)
            if id_match:
                last_subscription_id = id_match.group(1)
        # Checked independently: one line may carry both markers.
        if "head sub id is:" in line:
            id_match = _HEAD_SUB_ID_RE.search(line)
            if id_match:
                head_sub_id = id_match.group(1)
    return head_sub_id, last_subscription_id


def check_logs(logger):
    """Summarise the last 10 minutes of the ``infernet-node`` container's log.

    Runs ``docker logs --since 10m infernet-node``, strips ANSI escapes from
    its standard output and looks for subscription markers.

    Args:
        logger: Logger that receives the raw (unformatted) id that was found.

    Returns:
        A dict with a single ``"status"`` key:
        ``"OK: <n>"`` when a "head sub id is:" line was found (takes priority),
        ``"Sync: <n>"`` when only an "Ignored subscription creation" line with
        an id was found, and ``"Idle"`` otherwise. ``<n>`` is the id passed
        through ``format_number``.

    Raises:
        RuntimeError: If the ``docker logs`` command exits with a non-zero
            status; the original ``CalledProcessError`` is attached as the
            cause.
    """
    try:
        logs = subprocess.run(
            ['docker', 'logs', '--since', '10m', 'infernet-node'],
            capture_output=True, text=True, check=True,
        )
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Error running docker logs: {e}") from e

    head_sub_id, last_subscription_id = _extract_subscription_ids(clean_ansi(logs.stdout))

    if head_sub_id:
        logger.info(f"Head sub id: {head_sub_id}")
        return {"status": f"OK: {format_number(head_sub_id)}"}

    if last_subscription_id:
        logger.info(f"Subscription: {last_subscription_id}")
        return {"status": f"Sync: {format_number(last_subscription_id)}"}

    logger.info("Not found subscription")
    return {"status": "Idle"}
|
|
|
|
|
|
if __name__ == "__main__":
    # Entry point: set up logging, self-update, then report this host's node
    # status into the "Health" column of its row in the Grist "Nodes" table.
    colorama.init(autoreset=True)
    logger = logging.getLogger("Checker")
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(ch)

    logger.info("Checker started")
    # May replace this process with the updated script and never return.
    self_update(logger)
    # NOTE: the random start-up delay below is currently disabled.
    #random_sleep = random.randint(1, 60)
    #logger.info(f"Sleeping for {random_sleep} seconds")
    #time.sleep(random_sleep)

    # Grist connection settings are read from a fixed JSON file.
    with open('/root/node/grist.json', 'r', encoding='utf-8') as f:
        grist_data = json.load(f)

    # This host's row is the one whose Name equals the machine's hostname.
    GRIST_ROW_NAME = socket.gethostname()
    NODES_TABLE = "Nodes"
    grist = GRIST(grist_data.get('grist_server'), grist_data.get('grist_doc_id'), grist_data.get('grist_api_key'), logger)
    matching_rows = grist.find_record(name=GRIST_ROW_NAME, table=NODES_TABLE)
    if not matching_rows:
        # Fail with the actual cause instead of an IndexError on an empty list.
        raise RuntimeError(f"No row named {GRIST_ROW_NAME!r} found in Grist table {NODES_TABLE!r}")
    current_vm = matching_rows[0]

    def grist_callback(msg):
        """Write the columns in *msg* to this host's row."""
        grist.update(current_vm.id, msg, NODES_TABLE)

    MAX_ATTEMPTS = 3
    RETRY_DELAY_SECONDS = 5
    for attempt in range(MAX_ATTEMPTS):
        try:
            result = check_logs(logger)
            grist_callback({"Health": result["status"]})
            logger.info(f"Status: {result['status']}")
            break
        except Exception as e:
            logger.error(f"Error on attempt {attempt+1}/{MAX_ATTEMPTS}: {e}")
            if attempt == MAX_ATTEMPTS - 1:
                # Out of retries: surface the failure in Grist.
                grist_callback({"Health": f"Error: {e}"})
            else:
                time.sleep(RETRY_DELAY_SECONDS)