375 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			375 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# flake8: noqa
# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name
# pylance: disable=reportMissingImports, reportMissingModuleSource, reportGeneralTypeIssues
# type: ignore

import warnings
# Silence every warning category globally (e.g. pkg_resources deprecation noise).
warnings.filterwarnings("ignore", category=Warning)

import re
from datetime import datetime, timedelta, timezone
import subprocess
import os
import time
import random
import sys
import pkg_resources  # NOTE(review): deprecated in modern setuptools; importlib.metadata is the successor
import requests
import json
from collections import deque

# Runtime dependencies for this checker. 'latest' accepts any installed
# version; anything else is an exact pin that will be (re)installed on mismatch.
required_packages = {
    'grist-api': 'latest',
    'colorama': 'latest',
    'requests': '2.31.0',
    'urllib3': '2.0.7',
    'charset-normalizer': '3.3.2'
}

# Snapshot of what pip currently has installed, keyed by normalized name.
installed_packages = {pkg.key: pkg.version for pkg in pkg_resources.working_set}

# Self-bootstrap: install missing or version-mismatched packages before the
# imports below that depend on them. '--break-system-packages' bypasses the
# PEP 668 guard on distro-managed Pythons (this runs on a dedicated VM).
for package, version in required_packages.items():
    if package not in installed_packages or (version != 'latest' and installed_packages[package] != version):
        if version == 'latest':
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package, '--break-system-packages'])
        else:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', f"{package}=={version}", '--break-system-packages'])

# These imports are deliberately placed after the bootstrap loop above,
# since the packages may only exist once the loop has installed them.
from grist_api import GristDocAPI
import colorama

import logging
import socket
 | |
def self_update(logger):
    """Fetch the latest checker.py from the repository and replace this script.

    Downloads the canonical copy; if its content differs from the file on
    disk, overwrites the script and re-execs the interpreter so the new
    version takes over immediately. Any failure is logged and swallowed so
    the checker keeps running on the current version.
    """
    logger.info("Checking for updates..")
    script_path = os.path.abspath(__file__)
    update_url = "https://gitea.vvzvlad.xyz/vvzvlad/ritual/raw/branch/main-22aug/checker.py"
    try:
        response = requests.get(update_url, timeout=10)
        if response.status_code != 200:
            logger.error(f"Failed to download update, status code: {response.status_code}")
            return
        with open(script_path, 'r', encoding='utf-8') as f:
            current_content = f.read()
        if current_content == response.text:
            logger.info("Script is up to date")
            return
        # Content changed: persist the new version, then restart in-place.
        with open(script_path, 'w', encoding='utf-8') as f:
            f.write(response.text)
        logger.info("Script updated successfully, restarting")
        os.execv(sys.executable, ['python3'] + sys.argv)
    except Exception as e:
        logger.error(f"Update error: {str(e)}")
 | |
| 
 | |
class GRIST:
    """Thin convenience wrapper around GristDocAPI.

    Grist identifiers use underscores where humans use spaces; every method
    here converts table/column names ("My Table" -> "My_Table") before
    delegating to the underlying API client.
    """

    def __init__(self, server, doc_id, api_key, logger):
        self.server = server
        self.doc_id = doc_id
        self.api_key = api_key
        self.logger = logger
        self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)

    def table_name_convert(self, table_name):
        """Convert a human-readable table name to its Grist identifier."""
        return table_name.replace(" ", "_")

    def to_timestamp(self, dtime: datetime) -> int:
        """Return a Unix timestamp; naive datetimes are assumed to be UTC+3."""
        if dtime.tzinfo is None:
            dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
        return int(dtime.timestamp())

    def insert_row(self, data, table):
        """Insert one row (dict of column -> value); returns the new row id(s)."""
        data = {key.replace(" ", "_"): value for key, value in data.items()}
        row_id = self.grist.add_records(self.table_name_convert(table), [data])
        return row_id

    def update_column(self, row_id, column_name, value, table):
        """Set a single cell; datetime values are converted to Unix timestamps."""
        if isinstance(value, datetime):
            value = self.to_timestamp(value)
        column_name = column_name.replace(" ", "_")
        self.grist.update_records(self.table_name_convert(table), [{ "id": row_id, column_name: value }])

    def delete_row(self, row_id, table):
        """Delete one row by id."""
        self.grist.delete_records(self.table_name_convert(table), [row_id])

    def update(self, row_id, updates, table):
        """Apply a dict of column -> value updates to one row.

        datetime values are converted to Unix timestamps; column names have
        spaces replaced with underscores. NOTE: mutates the caller's dict
        in-place for datetime conversion (preserved from original behavior).
        """
        for column_name, value in updates.items():
            if isinstance(value, datetime):
                updates[column_name] = self.to_timestamp(value)
        updates = {column_name.replace(" ", "_"): value for column_name, value in updates.items()}
        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, **updates}])

    def fetch_table(self, table):
        """Fetch all rows of a table (name converted to Grist form)."""
        return self.grist.fetch_table(self.table_name_convert(table))

    def find_record(self, id=None, state=None, name=None, table=None):
        """Return the list of rows matching the given filters.

        Filter precedence: id, then state+name, then state, then name.
        Returns None when no filter besides table is supplied.

        Raises:
            ValueError: if table is not specified.
        """
        if table is None:
            raise ValueError("Table is not specified")
        table_data = self.grist.fetch_table(self.table_name_convert(table))
        if id is not None:
            return [row for row in table_data if row.id == id]
        if state is not None and name is not None:
            # Fixed: was `row.name`, but the column is `Name` — the name-only
            # branch below (the one exercised by __main__) uses row.Name.
            return [row for row in table_data if row.State == state and row.Name == name]
        if state is not None:
            return [row for row in table_data if row.State == state]
        if name is not None:
            return [row for row in table_data if row.Name == name]
        return None

    def find_settings(self, key, table):
        """Return the Value of the row whose Setting column equals `key`.

        Raises:
            ValueError: if the setting is missing or its value is blank.
        """
        # Fixed: fetch_table() already applies table_name_convert, so the
        # original double conversion (harmless but wrong) is removed.
        table = self.fetch_table(table)
        for record in table:
            if record.Setting == key:
                if record.Value is None or record.Value == "":
                    raise ValueError(f"Setting {key} blank")
                return record.Value
        raise ValueError(f"Setting {key} not found")
 | |
| 
 | |
def clean_ansi(text):
    """Strip ANSI escape sequences (colors, cursor movement) from *text*."""
    pattern = r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])'
    return re.sub(pattern, '', text)
 | |
| 
 | |
def format_number(number_str):
    """Render an integer-like value compactly: 1500 -> '1.5k', 999 -> '999'.

    Accepts anything int() can parse (str or int); returns "NaN" for
    unparseable input.
    """
    try:
        value = int(number_str)
    except (ValueError, TypeError):
        return "NaN" # Or some other indicator of invalid input
    if value < 1000:
        return str(value)
    # Up to three decimals, with trailing zeros (and a bare dot) trimmed.
    compact = f"{value / 1000.0:.3f}".rstrip('0').rstrip('.')
    return f"{compact}k"
 | |
| 
 | |
def check_logs(logger, initial_sync_count, previous_status):
    """
    Checks docker logs for node status (Syncing, OK, Idle) and updates sync count.

    Scans the last 10 minutes of the 'infernet-node' container logs and keeps
    the LAST matching entry of each kind, preferring "Checking subscriptions"
    lines over "Ignored subscription creation" over "head sub id" lines.

    Args:
        logger: The logger instance.
        initial_sync_count: The sync count read from Grist at the start.
        previous_status: The last known status read from Grist ('Sync', 'OK', 'Idle', or others).

    Returns:
        A dictionary containing:
        - status_message: A string describing the current status (e.g., "Sync: 123k").
        - current_status_type: The type of the current status ('Sync', 'OK', 'Idle', 'Error').
        - current_sync_count: The updated sync count (incremented only on a
          Sync -> OK transition; unchanged on error).
    """
    current_sync_count = initial_sync_count # Initialize with the value from Grist

    try:
        # check=True: a non-zero docker exit code raises CalledProcessError below.
        logs = subprocess.run(['docker', 'logs', '--since', '10m', 'infernet-node'], capture_output=True, text=True, check=True)
        log_content = clean_ansi(logs.stdout)

        # Last-seen match of each pattern; later log lines win.
        last_checking_info = None
        last_ignored_id = None
        last_head_sub_id = None

        # Regex patterns
        checking_pattern = re.compile(r'Checking subscriptions.*last_sub_id=(\d+).*head_sub_id=(\d+).*num_subs_to_sync=(\d+)')
        ignored_pattern = re.compile(r'Ignored subscription creation.*id=(\d+)')
        head_sub_pattern = re.compile(r'head sub id is:\s*(\d+)')

        # Single pass over all lines, remembering only the final match per
        # pattern (simpler than windowing with a deque).
        for line in log_content.splitlines():
            match = checking_pattern.search(line)
            if match:
                last_checking_info = {
                    "last_sub_id": match.group(1),
                    "head_sub_id": match.group(2),
                    "num_subs_to_sync": int(match.group(3))
                }
                continue # Prioritize checking_info

            match = ignored_pattern.search(line)
            if match:
                last_ignored_id = match.group(1)
                continue

            match = head_sub_pattern.search(line)
            if match:
                last_head_sub_id = match.group(1)
                # No continue here, allows checking_info from same timeframe to override

        current_status_type = "Idle"
        status_message = "Idle"
        
        if last_checking_info:
            formatted_id = format_number(last_checking_info["last_sub_id"])
            if last_checking_info["num_subs_to_sync"] > 0:
                current_status_type = "Sync"
                status_message = f"Sync: {formatted_id}" # sync count is reported separately via Grist 'Syncs'
                logger.info(f"Node is syncing. Last sub ID: {last_checking_info['last_sub_id']}, Num subs to sync: {last_checking_info['num_subs_to_sync']}")
            else:
                current_status_type = "OK"
                # Increment count only on transition from Sync to OK
                if previous_status == "Sync":
                    current_sync_count += 1 # Increment local count
                    logger.info(f"Sync completed. Sync count incremented to {current_sync_count}.")
                status_message = f"OK: {formatted_id}" # sync count is reported separately via Grist 'Syncs'
                logger.info(f"Node is OK. Last sub ID: {last_checking_info['last_sub_id']}")
        
        elif last_ignored_id:
            # Fallback to "Ignored" logs if "Checking" is missing
            formatted_id = format_number(last_ignored_id)
            current_status_type = "Sync" # Assume sync if we only see ignored creations recently
            status_message = f"Sync: {formatted_id}" # sync count is reported separately via Grist 'Syncs'
            logger.info(f"Node possibly syncing (based on ignored logs). Last ignored ID: {last_ignored_id}")

        elif last_head_sub_id:
            # Fallback to "head sub id" if others are missing
            formatted_id = format_number(last_head_sub_id)
            current_status_type = "OK" # Assume OK if this is the latest relevant info
            # Don't increment sync count here, only on Sync -> OK transition based on "Checking" logs
            status_message = f"OK: {formatted_id}" # sync count is reported separately via Grist 'Syncs'
            logger.info(f"Node status based on head sub id. Head sub ID: {last_head_sub_id}")
            
        else:
            logger.info("No relevant subscription log entries found in the last 10 minutes. Status: Idle.")
            status_message = "Idle"
            current_status_type = "Idle"

        # Return the results instead of writing to a file
        return {
            "status_message": status_message,
            "current_status_type": current_status_type,
            "current_sync_count": current_sync_count
        }

    except subprocess.CalledProcessError as e:
        error_msg = f"Error: Docker logs failed ({e.returncode})"
        logger.error(f"Error running docker logs command: {e.stderr or e.stdout or e}")
        # Return error status and original sync count
        return {
            "status_message": error_msg,
            "current_status_type": "Error",
            "current_sync_count": initial_sync_count # Return original count on error
        }
    except Exception as e:
        error_msg = "Error: Log processing failed"
        logger.error(f"Unexpected error processing logs: {e}", exc_info=True)
        # Return error status and original sync count
        return {
            "status_message": error_msg,
            "current_status_type": "Error",
            "current_sync_count": initial_sync_count # Return original count on error
        }
 | |
| 
 | |
if __name__ == "__main__":
    # --- Logging setup: INFO to stderr with timestamps -----------------------
    colorama.init(autoreset=True)
    logger = logging.getLogger("Checker")
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(ch)

    logger.info("Checker started")
    # May overwrite this file and re-exec the interpreter; code below only
    # runs when the script is already up to date (or the update failed).
    self_update(logger)
    #random_sleep = random.randint(1, 60)
    #logger.info(f"Sleeping for {random_sleep} seconds")
    #time.sleep(random_sleep)

    # Grist connection settings are provisioned on the VM as a JSON file with
    # keys: grist_server, grist_doc_id, grist_api_key.
    grist_data = {}
    with open('/root/node/grist.json', 'r', encoding='utf-8') as f:
        grist_data = json.loads(f.read())

    # Each VM reports into the row of the Nodes table whose Name matches its hostname.
    GRIST_ROW_NAME = socket.gethostname()
    NODES_TABLE = "Nodes"
    grist = GRIST(grist_data.get('grist_server'), grist_data.get('grist_doc_id'), grist_data.get('grist_api_key'), logger)
    # IndexError here (no matching row) is intentionally fatal: nowhere to report.
    current_vm = grist.find_record(name=GRIST_ROW_NAME, table=NODES_TABLE)[0]
    # Shorthand: push a dict of column updates to this VM's row.
    def grist_callback(msg): grist.update(current_vm.id, msg, NODES_TABLE)

    # Initialize updates dictionary
    initial_updates = {}
    # Check and prepare update for Syncs if it's None or empty
    if not current_vm.Syncs: # Handles None, empty string, potentially 0 if that's how Grist stores it
        initial_updates["Syncs"] = 0
    # Check and prepare update for Reboots if it's None or empty
    if not current_vm.Reboots: # Handles None, empty string, potentially 0
        initial_updates["Reboots"] = 0

    # If there are updates, send them to Grist
    if initial_updates:
        try:
            logger.info(f"Found empty initial values, updating Grist: {initial_updates}")
            grist.update(current_vm.id, initial_updates, NODES_TABLE)
            # Re-fetch the record to ensure subsequent logic uses the updated values
            current_vm = grist.find_record(name=GRIST_ROW_NAME, table=NODES_TABLE)[0]
            logger.info("Grist updated successfully with initial zeros.")
        except Exception as e:
            logger.error(f"Failed to update Grist with initial zeros: {e}")
            # Decide how to proceed: maybe exit, maybe continue with potentially incorrect defaults
            # For now, we'll log the error and continue using the potentially incorrect defaults from the first fetch

    # Get initial state from Grist (now potentially updated)
    initial_sync_count = int(current_vm.Syncs or 0) # 'or 0' still useful as fallback
    reboot_count = int(current_vm.Reboots or 0) # 'or 0' still useful as fallback
    # Determine previous status type based on Health string (simplified)
    previous_health_status = current_vm.Health or "Idle"
    previous_status_type = "Idle" # Default
    if previous_health_status.startswith("Sync"):
        previous_status_type = "Sync"
    elif previous_health_status.startswith("OK"):
        previous_status_type = "OK"
    elif previous_health_status.startswith("Error"):
        previous_status_type = "Error" # Consider error state

    logger.info(f"Initial state from Grist - Syncs: {initial_sync_count}, Health: {previous_health_status}, Reboots: {reboot_count}")

    # Main reporting cycle: up to 3 attempts, stop on the first success.
    for attempt in range(3):
        try:
            # Report the VM's primary IPv4 (assumes the interface is eth0).
            vm_ip = os.popen("ip -4 addr show eth0 | grep -oP '(?<=inet )[^/]+'").read()
            vm_ip = vm_ip.strip()
            if vm_ip == "":
                logger.error("Failed to get VM IP address")
            else:
                logger.info(f"VM IP address: {vm_ip}")
                grist_callback({"IP": f"{vm_ip}"})


            # Pass initial state to check_logs
            result = check_logs(logger, initial_sync_count, previous_status_type)

            grist_updates = {"Health": result["status_message"]}
            
            # Update Syncs count in Grist only if it changed
            if result["current_sync_count"] != initial_sync_count:
                grist_updates["Syncs"] = result["current_sync_count"]
                logger.info(f"Sync count changed from {initial_sync_count} to {result['current_sync_count']}")

            # Send updates to Grist
            grist_callback(grist_updates)
            logger.info(f"Status update sent: {grist_updates}")
            
            # Reboot logic (remains mostly the same, reads Reboots from current_vm)
            if result["current_status_type"] == "Idle": # Check type, not message
                uptime_seconds = os.popen("cat /proc/uptime | cut -d'.' -f1").read()
                uptime_seconds = int(uptime_seconds)
                if uptime_seconds > 60*60*4:
                    reboot_count = int(current_vm.Reboots or 0)
                    reboot_count += 1
                    # Include reboot count in the final Grist update before rebooting
                    grist_updates = { "Health": "Rebooting", "Reboots": reboot_count }
                    grist_callback(grist_updates)
                    logger.info(f"Idle detected for >4 hours (uptime: {uptime_seconds}s). Rebooting. Reboot count: {reboot_count}")
                    os.system("reboot")
            break # Exit loop on success
        except Exception as e:
            logger.error(f"Error in main loop, attempt {attempt+1}/3: {e}", exc_info=True)
            if attempt == 2:
                # Log final error to Grist on last attempt
                try:
                    grist_updates = { "Health": f"Error: Main loop failed - {e}" }
                    grist_callback(grist_updates)
                except Exception as grist_e:
                    logger.error(f"Failed to log final error to Grist: {grist_e}")
            time.sleep(5) # Wait before retrying
 |