This update introduces a new `clean_ansi` function to remove ANSI escape sequences from log output, enhancing readability. The `check_logs` function has been modified to utilize this new function, ensuring that the logs retrieved from Docker are plain text, which improves clarity for subsequent log analysis.
186 lines
7.0 KiB
Python
186 lines
7.0 KiB
Python
# flake8: noqa
|
|
# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name
|
|
# pylance: disable=reportMissingImports, reportMissingModuleSource, reportGeneralTypeIssues
|
|
# type: ignore
|
|
|
|
import warnings
|
|
warnings.filterwarnings("ignore", category=Warning)
|
|
|
|
import re
|
|
from datetime import datetime, timedelta, timezone
|
|
import subprocess
|
|
import os
|
|
import time
|
|
import random
|
|
import sys
|
|
import pkg_resources
|
|
import requests
|
|
import json
|
|
from collections import deque
|
|
|
|
# Third-party distributions this script needs, mapped to the version it
# expects. 'latest' means "any installed version is acceptable"; anything
# else is an exact pin that is (re)installed when the installed version differs.
required_packages = {
    'grist-api': 'latest',
    'colorama': 'latest',
    'requests': '2.31.0',
    'urllib3': '2.0.7',
    'charset-normalizer': '3.3.2'
}

# Snapshot of the current environment: distribution key -> installed version.
installed_packages = {pkg.key: pkg.version for pkg in pkg_resources.working_set}

for package, version in required_packages.items():
    # Already satisfied: present, and either unpinned or at the pinned version.
    if package in installed_packages and (version == 'latest' or installed_packages[package] == version):
        continue
    subprocess.check_call([
        sys.executable, '-m', 'pip', 'install',
        package if version == 'latest' else f"{package}=={version}",
        '--break-system-packages',
    ])
|
|
|
|
from grist_api import GristDocAPI
|
|
import colorama
|
|
|
|
import logging
|
|
import socket
|
|
|
|
def self_update(logger):
    """Replace this script with the published version and restart if it changed.

    Downloads the script from a fixed URL and compares it with the file on
    disk. When the contents differ, the file is overwritten and the process
    re-executes itself via ``os.execv`` (so this call does not return in that
    case). The download is rejected when it is empty or is not syntactically
    valid Python, so a truncated or broken response cannot overwrite a working
    script.

    Updating is best-effort: every failure is logged through ``logger.error``
    and swallowed so the caller can continue.

    Args:
        logger: Logger used for progress and error messages.
    """
    logger.info("Checking for updates..")
    script_path = os.path.abspath(__file__)
    update_url = "https://gitea.vvzvlad.xyz/vvzvlad/ritual/raw/branch/main-22aug/checker.py"
    try:
        response = requests.get(update_url, timeout=10)
        if response.status_code != 200:
            logger.error(f"Failed to download update, status code: {response.status_code}")
            return

        new_content = response.text
        # SECURITY NOTE(review): the downloaded code is executed without any
        # signature or checksum verification; only its shape is checked below.
        if not new_content.strip():
            logger.error("Update error: downloaded script is empty")
            return
        # Syntax-check only (nothing is executed). A SyntaxError/ValueError
        # lands in the handler below and leaves the file on disk untouched.
        compile(new_content, script_path, 'exec', dont_inherit=True)

        with open(script_path, 'r', encoding='utf-8') as f:
            current_content = f.read()

        if current_content == new_content:
            logger.info("Script is up to date")
            return

        with open(script_path, 'w', encoding='utf-8') as f:
            f.write(new_content)
        logger.info("Script updated successfully, restarting")
        # Replace the current process with a fresh interpreter running the new code.
        os.execv(sys.executable, ['python3'] + sys.argv)
    except Exception as e:
        logger.error(f"Update error: {str(e)}")
|
|
|
|
class GRIST:
    """Thin convenience wrapper around ``GristDocAPI`` for one Grist document.

    Table and column names passed to the methods may contain spaces; they are
    converted to the underscore form before being sent to the API.
    """

    def __init__(self, server, doc_id, api_key, logger):
        """Store the connection settings and create the ``GristDocAPI`` client."""
        self.server = server
        self.doc_id = doc_id
        self.api_key = api_key
        self.logger = logger
        self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)

    def table_name_convert(self, table_name):
        """Return ``table_name`` with spaces replaced by underscores."""
        return table_name.replace(" ", "_")

    def to_timestamp(self, dtime: datetime) -> int:
        """Convert ``dtime`` to integer Unix seconds.

        A naive datetime is interpreted as being in the fixed UTC+3 zone.
        """
        if dtime.tzinfo is None:
            dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
        return int(dtime.timestamp())

    def insert_row(self, data, table):
        """Add one record built from ``data`` to ``table``; return the API's result."""
        data = {key.replace(" ", "_"): value for key, value in data.items()}
        row_id = self.grist.add_records(self.table_name_convert(table), [data])
        return row_id

    def update_column(self, row_id, column_name, value, table):
        """Set a single column of row ``row_id`` in ``table``.

        Equivalent to ``update`` with a one-entry mapping, so datetime values
        and column names are normalised the same way.
        """
        self.update(row_id, {column_name: value}, table)

    def delete_row(self, row_id, table):
        """Delete row ``row_id`` from ``table``."""
        self.grist.delete_records(self.table_name_convert(table), [row_id])

    def update(self, row_id, updates, table):
        """Write several columns of row ``row_id`` in ``table`` in one request.

        Args:
            row_id: Id of the record to update.
            updates: Mapping of column name to new value. ``datetime`` values
                are converted with ``to_timestamp``. The mapping itself is
                not modified.
            table: Table name (spaces allowed).
        """
        # Build a fresh record instead of rewriting the caller's dict in place.
        record = {"id": row_id}
        for column_name, value in updates.items():
            if isinstance(value, datetime):
                value = self.to_timestamp(value)
            record[column_name.replace(" ", "_")] = value
        self.grist.update_records(self.table_name_convert(table), [record])

    def fetch_table(self, table):
        """Return all records of ``table`` as provided by the API client."""
        return self.grist.fetch_table(self.table_name_convert(table))

    def find_record(self, id=None, state=None, name=None, table=None):
        """Return the rows of ``table`` matching the given filter.

        Filters are checked in this order and only the first applicable one is
        used: ``id``; ``state`` and ``name`` together; ``state``; ``name``.

        Returns:
            A list of matching rows (possibly empty), or ``None`` when no
            filter argument was supplied.

        Raises:
            ValueError: If ``table`` is not given.
        """
        if table is None:
            raise ValueError("Table is not specified")
        table_data = self.grist.fetch_table(self.table_name_convert(table))
        if id is not None:
            return [row for row in table_data if row.id == id]
        if state is not None and name is not None:
            # Uses the ``Name`` column, matching the name-only branch below.
            return [row for row in table_data if row.State == state and row.Name == name]
        if state is not None:
            return [row for row in table_data if row.State == state]
        if name is not None:
            return [row for row in table_data if row.Name == name]
        return None

    def find_settings(self, key, table):
        """Return the ``Value`` of the first row whose ``Setting`` equals ``key``.

        Raises:
            ValueError: If the matching value is ``None`` or empty, or if no
                row has that setting.
        """
        for record in self.fetch_table(table):
            if record.Setting == key:
                if record.Value is None or record.Value == "":
                    raise ValueError(f"Setting {key} blank")
                return record.Value
        raise ValueError(f"Setting {key} not found")
|
|
|
|
# ESC followed by either a single character in the ranges @-Z / \-_ or a
# "[" sequence: parameter characters (0-?), intermediate characters
# (space-/) and one final character (@-~).
_ANSI_ESCAPE_RE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')


def clean_ansi(text):
    """Return ``text`` with every ANSI escape sequence removed."""
    return _ANSI_ESCAPE_RE.sub('', text)
|
|
|
|
def check_logs(log_handler):
    """Print the recent ``infernet-node`` container logs and report a status.

    Runs ``docker logs --since 10m infernet-node``, strips ANSI escape
    sequences from the captured stdout and prints it line by line.

    Args:
        log_handler: Currently unused; accepted for interface compatibility.

    Returns:
        The fixed mapping ``{"status": "Idle"}``.

    Raises:
        RuntimeError: If the ``docker logs`` command exits with a non-zero
            status. The original ``CalledProcessError`` is attached as the
            cause.
    """
    try:
        logs = subprocess.run(['docker', 'logs', '--since', '10m', 'infernet-node'], capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        # Chain explicitly so the failing command and exit code stay visible.
        raise RuntimeError(f"Error running docker logs: {e}") from e

    # NOTE(review): only the captured stdout is used; stderr is captured but
    # ignored — confirm the container does not log to stderr.
    log_content = clean_ansi(logs.stdout)

    for line in log_content.splitlines():
        print(line)

    return {"status": "Idle"}
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: configure logging, self-update, then run the log
    # check (with retries) and publish the result to this host's Grist row.
    colorama.init(autoreset=True)
    logger = logging.getLogger("Checker")
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(ch)

    logger.info("Checker started")
    self_update(logger)
    # Startup jitter, currently disabled:
    #random_sleep = random.randint(1, 60)
    #logger.info(f"Sleeping for {random_sleep} seconds")
    #time.sleep(random_sleep)

    # Grist connection settings (server, document id, API key).
    with open('/root/node/grist.json', 'r', encoding='utf-8') as f:
        grist_data = json.load(f)

    GRIST_ROW_NAME = socket.gethostname()  # this host's row is looked up by hostname
    NODES_TABLE = "Nodes"
    MAX_ATTEMPTS = 3
    RETRY_DELAY_SECONDS = 5
    grist = GRIST(grist_data.get('grist_server'), grist_data.get('grist_doc_id'), grist_data.get('grist_api_key'), logger)

    # Fail with a clear message instead of an opaque IndexError/TypeError
    # when the table has no row for this host.
    matching_rows = grist.find_record(name=GRIST_ROW_NAME, table=NODES_TABLE)
    if not matching_rows:
        logger.error(f"No row named {GRIST_ROW_NAME!r} found in Grist table {NODES_TABLE!r}")
        sys.exit(1)
    current_vm = matching_rows[0]

    def grist_callback(msg):
        """Write the column updates in ``msg`` to this host's row."""
        grist.update(current_vm.id, msg, NODES_TABLE)

    for attempt in range(MAX_ATTEMPTS):
        try:
            result = check_logs(logger)
            grist_callback({"Health": result["status"]})
            logger.info(f"Status: {result['status']}")
            break
        except Exception as e:
            logger.error(f"Error on attempt {attempt+1}/{MAX_ATTEMPTS}: {e}")
            if attempt == MAX_ATTEMPTS - 1:
                # Out of retries: publish the failure itself as the health value.
                grist_callback({"Health": f"Error: {e}"})
            else:
                time.sleep(RETRY_DELAY_SECONDS)