diff --git a/checker.py b/checker.py
new file mode 100644
index 0000000..19a5275
--- /dev/null
+++ b/checker.py
@@ -0,0 +1,193 @@
+# flake8: noqa
+# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name
+# pylance: disable=reportMissingImports, reportMissingModuleSource, reportGeneralTypeIssues
+# type: ignore
+
+import re
+from datetime import datetime, timedelta, timezone
+import subprocess
+import os
+import time
+import random
+import sys
+import pkg_resources
+import requests
+import json
+from collections import deque
+required_packages = ['grist-api', 'colorama']
+installed_packages = [pkg.key for pkg in pkg_resources.working_set]
+
+for package in required_packages:
+    if package not in installed_packages:
+        subprocess.check_call([sys.executable, '-m', 'pip', 'install', package, '--break-system-packages'])
+
+from grist_api import GristDocAPI
+import colorama
+
+import logging
+import socket
+
+def self_update():
+    script_path = os.path.abspath(__file__)
+    update_url = "https://gitea.vvzvlad.xyz/vvzvlad/ritual/raw/branch/main-22aug/checker.py"
+    try:
+        response = requests.get(update_url, timeout=10)
+        if response.status_code == 200:
+            current_content = ""
+            with open(script_path, 'r', encoding='utf-8') as f:
+                current_content = f.read()
+
+            if current_content != response.text:
+                with open(script_path, 'w', encoding='utf-8') as f:
+                    f.write(response.text)
+                logging.info("Script updated successfully, restarting")
+                os.execv(sys.executable, ['python3'] + sys.argv)
+            else:
+                logging.info("Script is up to date")
+        else:
+            logging.error(f"Failed to download update, status code: {response.status_code}")
+    except Exception as e:
+        logging.error(f"Update error: {str(e)}")
+
+class GRIST:
+    def __init__(self, server, doc_id, api_key, logger):
+        self.server = server
+        self.doc_id = doc_id
+        self.api_key = api_key
+        self.logger = logger
+        self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)
+
+    def table_name_convert(self, table_name):
+        return table_name.replace(" ", "_")
+
+    def to_timestamp(self, dtime: datetime) -> int:
+        if dtime.tzinfo is None:
+            dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
+        return int(dtime.timestamp())
+
+    def insert_row(self, data, table):
+        data = {key.replace(" ", "_"): value for key, value in data.items()}
+        row_id = self.grist.add_records(self.table_name_convert(table), [data])
+        return row_id
+
+    def update_column(self, row_id, column_name, value, table):
+        if isinstance(value, datetime):
+            value = self.to_timestamp(value)
+        column_name = column_name.replace(" ", "_")
+        self.grist.update_records(self.table_name_convert(table), [{ "id": row_id, column_name: value }])
+
+    def delete_row(self, row_id, table):
+        self.grist.delete_records(self.table_name_convert(table), [row_id])
+
+    def update(self, row_id, updates, table):
+        for column_name, value in updates.items():
+            if isinstance(value, datetime):
+                updates[column_name] = self.to_timestamp(value)
+        updates = {column_name.replace(" ", "_"): value for column_name, value in updates.items()}
+        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, **updates}])
+
+    def fetch_table(self, table):
+        return self.grist.fetch_table(self.table_name_convert(table))
+
+    def find_record(self, id=None, state=None, name=None, table=None):
+        if table is None:
+            raise ValueError("Table is not specified")
+        table_data = self.grist.fetch_table(self.table_name_convert(table))
+        if id is not None:
+            record = [row for row in table_data if row.id == id]
+            return record
+        if state is not None and name is not None:
+            record = [row for row in table_data if row.State == state and row.name == name]
+            return record
+        if state is not None:
+            record = [row for row in table_data if row.State == state]
+            return record
+        if name is not None:
+            record = [row for row in table_data if row.Name == name]
+            return record
+
+    def find_settings(self, key, table):
+        table = self.fetch_table(self.table_name_convert(table))
+        for record in table:
+            if record.Setting == key:
+                if record.Value is None or record.Value == "":
+                    raise ValueError(f"Setting {key} blank")
+                return record.Value
+        raise ValueError(f"Setting {key} not found")
+
+
+def check_logs(log_handler):
+    error_count = 0
+    proved_count = 0
+    proof_speeds = deque(maxlen=100)
+
+    try:
+        logs = subprocess.run(['docker', 'compose', 'logs', '--since', '2h'], cwd='/root/node/', capture_output=True, text=True, check=True)
+        log_content = logs.stdout
+    except subprocess.CalledProcessError as e:
+        raise RuntimeError(f"Error running docker compose logs: {e}") from e
+
+    for line in log_content.split('\n'):
+        if "error" in line.lower():
+            log_handler.error(f"Error: {line}")
+            error_count += 1
+        if "Proved step" in line:
+            proved_count += 1
+            log_handler.info(f"Proved step: {line}")
+
+            proof_speed_match = re.search(r'Proved step \d+ at (\d+\.\d+) proof cycles/sec', line)
+            if proof_speed_match:
+                current_speed = float(proof_speed_match.group(1))
+                proof_speeds.append(current_speed)
+                log_handler.info(f"Current proof speed: {current_speed} proof cycles/sec")
+
+    # Calculate average proof speed from the collected values
+    avg_proof_speed = sum(proof_speeds) / len(proof_speeds) if proof_speeds else 0
+    log_handler.info(f"Average proof speed (last {len(proof_speeds)} values): {avg_proof_speed:.2f} proof cycles/sec")
+
+    data = {
+        "errors": error_count,
+        "proved_steps": proved_count/10,
+        "proof_speed": int(avg_proof_speed)
+    }
+    log_handler.info(f"Result: {data}")
+    return data
+
+if __name__ == "__main__":
+    colorama.init(autoreset=True)
+    logger = logging.getLogger("Checker")
+    logger.setLevel(logging.INFO)
+    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+    ch = logging.StreamHandler()
+    ch.setFormatter(formatter)
+    logger.addHandler(ch)
+
+    logger.info("Checker started")
+    self_update()
+    random_sleep = random.randint(1, 600)
+    logger.info(f"Sleeping for {random_sleep} seconds")
+    time.sleep(random_sleep)
+
+    grist_data = {}
+    with open('/root/node/grist.json', 'r', encoding='utf-8') as f:
+        grist_data = json.loads(f.read())
+
+    GRIST_ROW_NAME = socket.gethostname()
+    NODES_TABLE = "Nodes"
+    grist = GRIST(grist_data.get('grist_server'), grist_data.get('grist_doc_id'), grist_data.get('grist_api_key'), logger)
+    current_vm = grist.find_record(name=GRIST_ROW_NAME, table=NODES_TABLE)[0]
+    def grist_callback(msg): grist.update(current_vm.id, msg, NODES_TABLE)
+
+    for attempt in range(3):
+        try:
+            result = check_logs(logger)
+            data = f"{result['proved_steps']}/{result['proof_speed']}/{result['errors']}" # proved/proof_speed/Errors
+            grist_callback({ "Health": data })
+            print(result)
+            break
+        except Exception as e:
+            logger.error(f"Error on attempt {attempt+1}/3: {e}")
+            if attempt == 2:
+                grist_callback({ "Health": f"Error: {e}" })
+            if attempt < 2:
+                time.sleep(5)
\ No newline at end of file
diff --git a/playbook.yml b/playbook.yml
index 40baddf..1bc36ae 100644
--- a/playbook.yml
+++ b/playbook.yml
@@ -264,5 +264,32 @@
 # docker update --restart unless-stopped infernet-anvil
 # docker update --restart unless-stopped deploy-fluentbit-1
 
+    - name: Copy checker service file
+      ansible.builtin.copy:
+        dest: /etc/systemd/system/node-checker.service
+        content: |
+          [Unit]
+          Description=Node Checker Service
+          After=network.target
+
+          [Service]
+          Type=simple
+          User=root
+          WorkingDirectory={{ ansible_env.HOME }}/node
+          ExecStart=/usr/bin/python3 {{ ansible_env.HOME }}/node/checker.py
+          Restart=always
+          RestartSec=1800
+
+          [Install]
+          WantedBy=multi-user.target
+        mode: '0644'
+
+    - name: Enable and start node-checker service
+      ansible.builtin.systemd:
+        name: node-checker
+        enabled: yes
+        state: started
+        daemon_reload: yes
+
     - name: Remove docker login credentials
       ansible.builtin.shell: rm -rf /root/.docker/config.json