diff --git a/rotate/grist.json b/rotate/grist.json
new file mode 100644
index 0000000..f4144ae
--- /dev/null
+++ b/rotate/grist.json
@@ -0,0 +1,8 @@
+{
+  "grist_server": "https://grist.vvzvlad.xyz",
+  "grist_doc_id": "inwaCJSGxZA1u64QJmQBEi",
+  "grist_api_key": "6bbcce2a64e7c865fbb2e5ec4480f0e1328f317f"
+}
+
+
+
diff --git a/rotate/requirements.txt b/rotate/requirements.txt
new file mode 100644
index 0000000..540ef79
--- /dev/null
+++ b/rotate/requirements.txt
@@ -0,0 +1,11 @@
+ansible-output-parser==0.1.0
+certifi==2024.7.4
+charset-normalizer==3.3.2
+colorama==0.4.6
+future==1.0.0
+grist-api==0.1.0
+idna==3.8
+proxmoxer==2.1.0
+PyYAML==6.0.2
+requests==2.32.3
+urllib3==2.2.2
\ No newline at end of file
diff --git a/rotate/rotate.py b/rotate/rotate.py
new file mode 100644
index 0000000..fa65f40
--- /dev/null
+++ b/rotate/rotate.py
@@ -0,0 +1,347 @@
+# flake8: noqa
+# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name
+# pylance: disable=reportMissingImports, reportMissingModuleSource, reportGeneralTypeIssues
+# type: ignore
+
+import re
+from datetime import datetime, timedelta, timezone
+import subprocess
+import os
+import time
+import random
+import sys
+import json
+import shlex
+
+from grist_api import GristDocAPI
+import colorama
+
+import logging
+import socket
+
+
+class GRIST:
+    """Convenience wrapper around ``GristDocAPI`` for a single Grist document.
+
+    Spaces in table and column names are replaced with underscores before
+    the names are passed to the API.
+    """
+
+    def __init__(self, server, doc_id, api_key, logger):
+        self.server = server
+        self.doc_id = doc_id
+        self.api_key = api_key
+        self.logger = logger
+        self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)
+
+    def table_name_convert(self, table_name):
+        """Return table_name with spaces replaced by underscores."""
+        return table_name.replace(" ", "_")
+
+    def to_timestamp(self, dtime: datetime) -> int:
+        """Convert dtime to a Unix timestamp in whole seconds.
+
+        A naive datetime is interpreted as UTC+3.
+        """
+        if dtime.tzinfo is None:
+            dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
+        return int(dtime.timestamp())
+
+    def insert_row(self, data, table):
+        """Add one record built from data and return the result of add_records."""
+        data = {key.replace(" ", "_"): value for key, value in data.items()}
+        return self.grist.add_records(self.table_name_convert(table), [data])
+
+    def update_column(self, row_id, column_name, value, table):
+        """Set one column of the record row_id; datetimes are sent as timestamps."""
+        if isinstance(value, datetime):
+            value = self.to_timestamp(value)
+        column_name = column_name.replace(" ", "_")
+        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, column_name: value}])
+
+    def delete_row(self, row_id, table):
+        """Delete the record row_id from table."""
+        self.grist.delete_records(self.table_name_convert(table), [row_id])
+
+    def update(self, row_id, updates, table):
+        """Update several columns of the record row_id in one request.
+
+        Datetime values are converted to timestamps. The converted values go
+        into a new dict, so the caller's updates mapping is left untouched.
+        """
+        converted = {
+            column_name.replace(" ", "_"): (self.to_timestamp(value) if isinstance(value, datetime) else value)
+            for column_name, value in updates.items()
+        }
+        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, **converted}])
+
+    def fetch_table(self, table):
+        """Return all records of table."""
+        return self.grist.fetch_table(self.table_name_convert(table))
+
+    def find_record(self, id=None, state=None, name=None, table=None):
+        """Return the records of table that match the first applicable filter.
+
+        Filters are tried in this order: id, state and name together, state
+        alone, name alone. Returns a list of matching records, or None when
+        no filter is given.
+
+        Raises:
+            ValueError: if table is not given.
+        """
+        if table is None:
+            raise ValueError("Table is not specified")
+        table_data = self.grist.fetch_table(self.table_name_convert(table))
+        if id is not None:
+            return [row for row in table_data if row.id == id]
+        if state is not None and name is not None:
+            # The column is "Name", as in the name-only branch below.
+            return [row for row in table_data if row.State == state and row.Name == name]
+        if state is not None:
+            return [row for row in table_data if row.State == state]
+        if name is not None:
+            return [row for row in table_data if row.Name == name]
+        return None
+
+    def find_settings(self, key, table):
+        """Return the Value of the record whose Setting equals key.
+
+        Raises:
+            ValueError: if the setting is missing, or its value is None or "".
+        """
+        for record in self.fetch_table(table):
+            if record.Setting == key:
+                if record.Value is None or record.Value == "":
+                    raise ValueError(f"Setting {key} blank")
+                return record.Value
+        raise ValueError(f"Setting {key} not found")
+
+
+def run_docker_command(command, logger, timeout=30):
+    """Execute docker command and return success status.
+
+    command may be an argument list or a single string; a string is split
+    with shlex.split. The command runs without a shell, so values
+    interpolated into it (container name, node id) are never parsed as
+    shell syntax.
+
+    Returns True when the process exits with status 0; False on a non-zero
+    exit status, a timeout or any other error.
+    """
+    if isinstance(command, str):
+        display = command
+    else:
+        display = " ".join(str(part) for part in command)
+    try:
+        args = shlex.split(command) if isinstance(command, str) else [str(part) for part in command]
+        result = subprocess.run(args, capture_output=True, text=True, timeout=timeout)
+        if result.returncode == 0:
+            logger.info(f"Docker command successful: {display}")
+            if result.stdout.strip():
+                logger.info(f"Docker output: {result.stdout.strip()}")
+            return True
+        logger.error(f"Docker command failed: {display}, error: {result.stderr.strip()}")
+        return False
+    except subprocess.TimeoutExpired:
+        logger.error(f"Docker command timed out: {display}")
+        return False
+    except Exception as e:
+        logger.error(f"Docker command exception: {display}, error: {str(e)}")
+        return False
+
+
+def stop_and_remove_container(container_name, logger):
+    """Stop and remove docker container"""
+    logger.info(f"Stopping container: {container_name}")
+    run_docker_command(["docker", "stop", container_name], logger)
+
+    logger.info(f"Removing container: {container_name}")
+    run_docker_command(["docker", "rm", container_name], logger)
+
+
+def check_container_status(container_name, logger):
+    """Check if container is running"""
+    try:
+        # Argument list instead of a shell string: the name is passed to docker verbatim.
+        result = subprocess.run(
+            ["docker", "ps", "--filter", f"name={container_name}", "--format", "{{.Status}}"],
+            capture_output=True, text=True, timeout=10)
+        status = result.stdout.strip()
+        if result.returncode != 0 or not status:
+            logger.warning(f"Container {container_name} not found in running containers")
+            return False
+        if "Up" in status:
+            return True
+        logger.warning(f"Container {container_name} status: {status}")
+        return False
+    except Exception as e:
+        logger.error(f"Failed to check container {container_name} status: {str(e)}")
+        return False
+
+
+def start_container(container_name, node_id, logger):
+    """Start nexus container with given node_id"""
+    docker_command = [
+        "docker", "run", "-td", "--init", "--name", container_name,
+        "nexusxyz/nexus-cli:latest", "start", "--node-id", str(node_id),
+    ]
+    logger.info(f"Starting container with node-id: {node_id}")
+    return run_docker_command(docker_command, logger)
+
+
+def get_next_node(grist, logger):
+    """Get node with lowest hours.
+
+    Rows whose NodeID equals "1" are skipped. Returns None when no row is
+    left or when reading the table fails.
+    """
+    try:
+        nodes = [row for row in grist.fetch_table(table="Nodes") if row.NodeID != "1"]
+
+        if not nodes:
+            logger.error("No available nodes found in table")
+            return None
+
+        # min() returns the first row with the smallest value, like a stable sort would.
+        selected_node = min(nodes, key=lambda node: int(node.Hours))
+
+        logger.info(f"Selected node: ID={selected_node.NodeID}, Hours={selected_node.Hours}")
+        return selected_node
+    except Exception as e:
+        logger.error(f"Failed to get next node: {str(e)}")
+        return None
+
+
+def main_rotation_cycle():
+    """Main rotation cycle for nexus nodes.
+
+    Loads grist.json from the current directory, then loops forever: pick the
+    node with the fewest hours, run its container for wait_hours while
+    checking it every interval_minutes, and record the hours in Grist.
+    Ctrl+C stops the container and ends the loop.
+    """
+    colorama.init(autoreset=True)
+    logger = logging.getLogger("NexusRotator")
+    logger.setLevel(logging.INFO)
+    # Attach the console handler once, so a repeated call does not duplicate every log line.
+    if not logger.handlers:
+        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+        ch = logging.StreamHandler()
+        ch.setFormatter(formatter)
+        logger.addHandler(ch)
+
+    # Load grist configuration
+    # NOTE(review): grist.json stores the API key in plain text - keep it out of version control.
+    try:
+        with open('grist.json', 'r', encoding='utf-8') as f:
+            grist_data = json.load(f)
+    except Exception as e:
+        logger.error(f"Failed to load grist.json: {str(e)}")
+        return
+
+    # Initialize Grist connection
+    try:
+        grist = GRIST(grist_data.get('grist_server'), grist_data.get('grist_doc_id'), grist_data.get('grist_api_key'), logger)
+        logger.info("Connected to Grist successfully")
+    except Exception as e:
+        logger.error(f"Failed to connect to Grist: {str(e)}")
+        return
+
+    container_name = "nexus"
+    cycle_count = 0
+
+    # Length of one node's run and the interval between health checks.
+    wait_hours = 5
+    interval_minutes = 10
+    total_minutes = wait_hours * 60
+
+    logger.info("Starting nexus rotation cycle")
+
+    while True:
+        cycle_count += 1
+        logger.info(f"=== Starting cycle #{cycle_count} ===")
+
+        try:
+            # Get next node
+            node = get_next_node(grist, logger)
+            if not node:
+                logger.error("No node available, waiting 60 seconds before retry")
+                time.sleep(60)
+                continue
+
+            node_id = node.NodeID
+            current_hours = int(node.Hours)
+
+            # Update hours (+1 before starting container)
+            new_hours = current_hours + 1
+            grist.update(node.id, {"Hours": new_hours}, "Nodes")
+            logger.info(f"Updated node {node_id} hours: {current_hours} -> {new_hours}")
+
+            # Remove any existing container with same name
+            stop_and_remove_container(container_name, logger)
+
+            # Start new container
+            if not start_container(container_name, node_id, logger):
+                logger.error(f"Failed to start container for node {node_id}")
+                # Return the hour back since container didn't start
+                grist.update(node.id, {"Hours": current_hours}, "Nodes")
+                logger.info(f"Reverted node {node_id} hours back to: {current_hours}")
+                time.sleep(60)
+                continue
+
+            logger.info(f"Container started successfully for node {node_id}")
+
+            # Wait with progress updates and health checks every interval
+            logger.info(f"Waiting {wait_hours} hours ({total_minutes} minutes) for node {node_id}")
+
+            container_failed = False
+            for elapsed_minutes in range(0, total_minutes, interval_minutes):
+                remaining_hours, remaining_mins = divmod(total_minutes - elapsed_minutes, 60)
+
+                if elapsed_minutes > 0:  # Skip first iteration log
+                    logger.info(f"Node {node_id}: {remaining_hours}h {remaining_mins}m remaining")
+
+                # Check container status before sleeping
+                if not check_container_status(container_name, logger):
+                    logger.error(f"Container {container_name} is not running, attempting restart")
+                    stop_and_remove_container(container_name, logger)
+
+                    if not start_container(container_name, node_id, logger):
+                        logger.error(f"Failed to restart container for node {node_id}")
+                        container_failed = True
+                        break
+                    logger.info(f"Container restarted successfully for node {node_id}")
+
+                time.sleep(interval_minutes * 60)
+
+            # If container failed during the cycle, skip to next iteration
+            if container_failed:
+                logger.error(f"Container failed during cycle for node {node_id}, moving to next node")
+                continue
+
+            # Stop and remove container
+            logger.info(f"{wait_hours} hours completed for node {node_id}, stopping container")
+            stop_and_remove_container(container_name, logger)
+
+            # Credit the rest of the run: one hour was already added before the start
+            final_hours = current_hours + wait_hours
+            grist.update(node.id, {"Hours": final_hours}, "Nodes")
+            logger.info(f"Updated node {node_id} final hours: {new_hours} -> {final_hours}")
+
+            logger.info(f"=== Cycle #{cycle_count} completed for node {node_id} ===")
+
+        except KeyboardInterrupt:
+            logger.info("Received keyboard interrupt, stopping rotation")
+            stop_and_remove_container(container_name, logger)
+            break
+        except Exception as e:
+            logger.error(f"Cycle #{cycle_count} failed with error: {str(e)}")
+            stop_and_remove_container(container_name, logger)
+            logger.info("Waiting 60 seconds before next attempt")
+            time.sleep(60)
+
+    logger.info("Nexus rotation cycle stopped")
+
+
+if __name__ == "__main__":
+    main_rotation_cycle()