#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# flake8: noqa
# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name
# pylint: disable=multiple-statements, logging-fstring-interpolation, trailing-whitespace, line-too-long
# pylint: disable=broad-exception-caught, missing-function-docstring, missing-class-docstring
# pylint: disable=f-string-without-interpolation, wrong-import-position
# pylance: disable=reportMissingImports, reportMissingModuleSource
#
#curl https://gitea.vvzvlad.xyz/vvzvlad/nexus/raw/branch/main/rotate/grist.json?token=7b58d730732f07e90a88169a63531d96bfc0fbbd -o grist.json;
#curl https://gitea.vvzvlad.xyz/vvzvlad/nexus/raw/branch/main/rotate/requirements.txt?token=7b58d730732f07e90a88169a63531d96bfc0fbbd -o requirements.txt;
#pip3 install -r requirements.txt --break-system-packages;
#docker pull nexusxyz/nexus-cli:latest
# curl https://gitea.vvzvlad.xyz/vvzvlad/nexus/raw/branch/main/rotate/rotate.py?token=7b58d730732f07e90a88169a63531d96bfc0fbbd -o rotate.py; python3 rotate.py;

import re
from datetime import datetime, timedelta, timezone
import subprocess
import os
import time
import random
import sys
import json
from grist_api import GristDocAPI
import colorama
import logging
import socket


class GRIST:
    def __init__(self, server, doc_id, api_key, logger):
        self.server = server
        self.doc_id = doc_id
        self.api_key = api_key
        self.logger = logger
        self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)

    def table_name_convert(self, table_name):
        return table_name.replace(" ", "_")

    def to_timestamp(self, dtime: datetime) -> int:
        if dtime.tzinfo is None:
            dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
        return int(dtime.timestamp())

    def insert_row(self, data, table):
        data = {key.replace(" ", "_"): value for key, value in data.items()}
        row_id = self.grist.add_records(self.table_name_convert(table), [data])
        return row_id

    def update_column(self, row_id, column_name, value, table):
        if isinstance(value, datetime):
            value = self.to_timestamp(value)
        column_name = column_name.replace(" ", "_")
        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, column_name: value}])

    def delete_row(self, row_id, table):
        self.grist.delete_records(self.table_name_convert(table), [row_id])

    def update(self, row_id, updates, table):
        for column_name, value in updates.items():
            if isinstance(value, datetime):
                updates[column_name] = self.to_timestamp(value)
        updates = {column_name.replace(" ", "_"): value for column_name, value in updates.items()}
        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, **updates}])

    def fetch_table(self, table):
        return self.grist.fetch_table(self.table_name_convert(table))

    def find_record(self, id=None, state=None, name=None, table=None):
        if table is None:
            raise ValueError("Table is not specified")
        table_data = self.grist.fetch_table(self.table_name_convert(table))
        if id is not None:
            record = [row for row in table_data if row.id == id]
            return record
        if state is not None and name is not None:
            record = [row for row in table_data if row.State == state and row.name == name]
            return record
        if state is not None:
            record = [row for row in table_data if row.State == state]
            return record
        if name is not None:
            record = [row for row in table_data if row.Name == name]
            return record
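    # Note: find_settings() below treats the target table as a key/value store and expects
    # "Setting" and "Value" columns; this is inferred from the attribute accesses in the method.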
{key} blank") return record.Value raise ValueError(f"Setting {key} not found") def run_docker_command(command, logger): """Execute docker command and return success status""" try: result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=30) if result.returncode == 0: logger.info(f"Docker command successful: {command}") if result.stdout.strip(): logger.info(f"Docker output: {result.stdout.strip()}") return True else: logger.error(f"Docker command failed: {command}, error: {result.stderr.strip()}") return False except subprocess.TimeoutExpired: logger.error(f"Docker command timed out: {command}") return False except Exception as e: logger.error(f"Docker command exception: {command}, error: {str(e)}") return False def stop_and_remove_container(container_name, logger): """Stop and remove docker container""" logger.info(f"Stopping container: {container_name}") run_docker_command(f"docker stop {container_name}", logger) logger.info(f"Removing container: {container_name}") run_docker_command(f"docker rm {container_name}", logger) def check_container_status(container_name, logger): """Check if container is running""" try: result = subprocess.run(f"docker ps --filter name={container_name} --format '{{{{.Status}}}}'", shell=True, capture_output=True, text=True, timeout=10) if result.returncode == 0 and result.stdout.strip(): status = result.stdout.strip() if "Up" in status: return True else: logger.warning(f"Container {container_name} status: {status}") return False else: logger.warning(f"Container {container_name} not found in running containers") return False except Exception as e: logger.error(f"Failed to check container {container_name} status: {str(e)}") return False def start_container(container_name, node_id, logger): """Start nexus container with given node_id""" docker_command = f"docker run -td --init --name {container_name} nexusxyz/nexus-cli:latest start --node-id {node_id}" logger.info(f"Starting container with node-id: {node_id}") return run_docker_command(docker_command, logger) def get_next_node(grist, logger): """Get node with lowest hours""" try: nodes = grist.fetch_table(table="Nodes") nodes = [row for row in nodes if row.NodeID != "1"] if not nodes: logger.error("No available nodes found in table") return None nodes.sort(key=lambda node: int(node.Hours)) selected_node = nodes[0] logger.info(f"Selected node: ID={selected_node.NodeID}, Hours={selected_node.Hours}") return selected_node except Exception as e: logger.error(f"Failed to get next node: {str(e)}") return None def check_and_update_docker_image(image_name, logger): """Check for new docker image and update if available""" try: logger.info(f"Checking for updates for image: {image_name}") # Get local image digest local_digest_cmd = f"docker images --digests --format '{{{{.Digest}}}}' {image_name}" local_result = subprocess.run(local_digest_cmd, shell=True, capture_output=True, text=True, timeout=30) local_digest = local_result.stdout.strip() if local_result.returncode == 0 else "" # Get remote image digest without pulling remote_digest_cmd = f"docker manifest inspect {image_name} --verbose | grep -m1 '\"digest\"' | cut -d'\"' -f4" remote_result = subprocess.run(remote_digest_cmd, shell=True, capture_output=True, text=True, timeout=60) if remote_result.returncode != 0: logger.warning(f"Failed to get remote digest for {image_name}, proceeding with pull check") # Fallback to pull and check output pull_cmd = f"docker pull {image_name}" pull_result = subprocess.run(pull_cmd, shell=True, 
def check_and_update_docker_image(image_name, logger):
    """Check for new docker image and update if available"""
    try:
        logger.info(f"Checking for updates for image: {image_name}")

        # Get local image digest
        local_digest_cmd = f"docker images --digests --format '{{{{.Digest}}}}' {image_name}"
        local_result = subprocess.run(local_digest_cmd, shell=True, capture_output=True, text=True, timeout=30)
        local_digest = local_result.stdout.strip() if local_result.returncode == 0 else ""

        # Get remote image digest without pulling
        remote_digest_cmd = f"docker manifest inspect {image_name} --verbose | grep -m1 '\"digest\"' | cut -d'\"' -f4"
        remote_result = subprocess.run(remote_digest_cmd, shell=True, capture_output=True, text=True, timeout=60)
        if remote_result.returncode != 0:
            logger.warning(f"Failed to get remote digest for {image_name}, proceeding with pull check")
            # Fallback to pull and check output
            pull_cmd = f"docker pull {image_name}"
            pull_result = subprocess.run(pull_cmd, shell=True, capture_output=True, text=True, timeout=300)
            if pull_result.returncode == 0:
                if "Image is up to date" in pull_result.stderr or "up to date" in pull_result.stdout:
                    logger.info(f"Image {image_name} is already up to date")
                else:
                    logger.info(f"Image {image_name} updated successfully")
                return True
            else:
                logger.error(f"Failed to pull image {image_name}: {pull_result.stderr.strip()}")
                return False

        remote_digest = remote_result.stdout.strip()

        # Compare digests
        if local_digest and remote_digest:
            if local_digest == remote_digest:
                logger.info(f"Image {image_name} is already up to date (digest: {local_digest[:19]}...)")
                return True
            else:
                logger.info(f"New version available for {image_name}, downloading...")
                # Pull new image
                pull_cmd = f"docker pull {image_name}"
                pull_result = subprocess.run(pull_cmd, shell=True, capture_output=True, text=True, timeout=300)
                if pull_result.returncode == 0:
                    logger.info(f"Image {image_name} updated successfully (new digest: {remote_digest[:19]}...)")
                    return True
                else:
                    logger.error(f"Failed to pull updated image {image_name}: {pull_result.stderr.strip()}")
                    return False
        else:
            logger.info(f"Unable to compare digests, performing full pull for {image_name}")
            # Pull image
            pull_cmd = f"docker pull {image_name}"
            pull_result = subprocess.run(pull_cmd, shell=True, capture_output=True, text=True, timeout=300)
            if pull_result.returncode == 0:
                logger.info(f"Image {image_name} pulled successfully")
                return True
            else:
                logger.error(f"Failed to pull image {image_name}: {pull_result.stderr.strip()}")
                return False
    except subprocess.TimeoutExpired:
        logger.error(f"Timeout while checking/updating image {image_name}")
        return False
    except Exception as e:
        logger.error(f"Failed to check/update image {image_name}: {str(e)}")
        return False
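# main_rotation_cycle() reads its Grist credentials from grist.json in the working directory.
# The expected shape, inferred from the grist_data.get(...) calls below (values here are
# placeholders, not real credentials):
# {
#     "grist_server": "https://grist.example.com",
#     "grist_doc_id": "your-document-id",
#     "grist_api_key": "your-api-key"
# }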
def main_rotation_cycle():
    """Main rotation cycle for nexus nodes"""
    colorama.init(autoreset=True)
    logger = logging.getLogger("NexusRotator")
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(ch)

    # Check and update docker image
    image_name = "nexusxyz/nexus-cli:latest"
    if not check_and_update_docker_image(image_name, logger):
        logger.warning(f"Failed to update {image_name}, continuing with existing image")

    # Load grist configuration
    try:
        with open('grist.json', 'r', encoding='utf-8') as f:
            grist_data = json.loads(f.read())
    except Exception as e:
        logger.error(f"Failed to load grist.json: {str(e)}")
        return

    # Initialize Grist connection
    try:
        grist = GRIST(grist_data.get('grist_server'), grist_data.get('grist_doc_id'), grist_data.get('grist_api_key'), logger)
        logger.info("Connected to Grist successfully")
    except Exception as e:
        logger.error(f"Failed to connect to Grist: {str(e)}")
        return

    container_name = "nexus"
    cycle_count = 0

    logger.info("Starting nexus rotation cycle")

    while True:
        cycle_count += 1
        logger.info(f"=== Starting cycle #{cycle_count} ===")

        try:
            # Get next node
            node = get_next_node(grist, logger)
            if not node:
                logger.error("No node available, waiting 60 seconds before retry")
                time.sleep(60)
                continue

            node_id = node.NodeID
            current_hours = int(node.Hours)

            # Update hours (+1 before starting container)
            new_hours = current_hours + 1
            grist.update(node.id, {"Hours": new_hours}, "Nodes")
            logger.info(f"Updated node {node_id} hours: {current_hours} -> {new_hours}")

            # Remove any existing container with same name
            stop_and_remove_container(container_name, logger)

            # Start new container
            if not start_container(container_name, node_id, logger):
                logger.error(f"Failed to start container for node {node_id}")
                # Return the hour back since container didn't start
                grist.update(node.id, {"Hours": current_hours}, "Nodes")
                logger.info(f"Reverted node {node_id} hours back to: {current_hours}")
                time.sleep(60)
                continue

            logger.info(f"Container started successfully for node {node_id}")

            # Wait 5 hours with progress updates and health checks every 10 minutes
            wait_hours = 5
            total_minutes = wait_hours * 60
            interval_minutes = 10

            logger.info(f"Waiting {wait_hours} hours ({total_minutes} minutes) for node {node_id}")

            container_failed = False
            for elapsed_minutes in range(0, total_minutes, interval_minutes):
                remaining_minutes = total_minutes - elapsed_minutes
                remaining_hours = remaining_minutes // 60
                remaining_mins = remaining_minutes % 60

                if elapsed_minutes > 0:  # Skip first iteration log
                    logger.info(f"Node {node_id}: {remaining_hours}h {remaining_mins}m remaining")

                # Check container status before sleeping
                if not check_container_status(container_name, logger):
                    logger.error(f"Container {container_name} is not running, attempting restart")
                    stop_and_remove_container(container_name, logger)
                    if not start_container(container_name, node_id, logger):
                        logger.error(f"Failed to restart container for node {node_id}")
                        container_failed = True
                        break
                    else:
                        logger.info(f"Container restarted successfully for node {node_id}")

                time.sleep(interval_minutes * 60)  # Sleep 10 minutes

            # If container failed during the cycle, skip to next iteration
            if container_failed:
                logger.error(f"Container failed during cycle for node {node_id}, moving to next node")
                continue

            # Stop and remove container
            logger.info(f"5 hours completed for node {node_id}, stopping container")
            stop_and_remove_container(container_name, logger)

            # Update hours (+4 after completion)
            final_hours = new_hours + 4
            grist.update(node.id, {"Hours": final_hours}, "Nodes")
            logger.info(f"Updated node {node_id} final hours: {new_hours} -> {final_hours}")

            logger.info(f"=== Cycle #{cycle_count} completed for node {node_id} ===")

        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt, stopping rotation")
            stop_and_remove_container(container_name, logger)
            break
        except Exception as e:
            logger.error(f"Cycle #{cycle_count} failed with error: {str(e)}")
            stop_and_remove_container(container_name, logger)
            logger.info("Waiting 60 seconds before next attempt")
            time.sleep(60)

    logger.info("Nexus rotation cycle stopped")


if __name__ == "__main__":
    main_rotation_cycle()