refactor playbook for improved clarity and structure
This commit is contained in:
parent
e7cca8d6c0
commit
3d07dc47ed
8
rotate/grist.json
Normal file
8
rotate/grist.json
Normal file
@ -0,0 +1,8 @@
|
||||
{
|
||||
"grist_server": "https://grist.vvzvlad.xyz",
|
||||
"grist_doc_id": "inwaCJSGxZA1u64QJmQBEi",
|
||||
"grist_api_key": "6bbcce2a64e7c865fbb2e5ec4480f0e1328f317f"
|
||||
}
|
||||
|
||||
|
||||
|
11
rotate/requirements.txt
Normal file
11
rotate/requirements.txt
Normal file
@ -0,0 +1,11 @@
|
||||
ansible-output-parser==0.1.0
|
||||
certifi==2024.7.4
|
||||
charset-normalizer==3.3.2
|
||||
colorama==0.4.6
|
||||
future==1.0.0
|
||||
grist-api==0.1.0
|
||||
idna==3.8
|
||||
proxmoxer==2.1.0
|
||||
PyYAML==6.0.2
|
||||
requests==2.32.3
|
||||
urllib3==2.2.2
|
290
rotate/rotate.py
Normal file
290
rotate/rotate.py
Normal file
@ -0,0 +1,290 @@
|
||||
# flake8: noqa
|
||||
# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name
|
||||
# pylance: disable=reportMissingImports, reportMissingModuleSource, reportGeneralTypeIssues
|
||||
# type: ignore
|
||||
|
||||
import re
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import subprocess
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
import sys
|
||||
import json
|
||||
|
||||
from grist_api import GristDocAPI
|
||||
import colorama
|
||||
|
||||
import logging
|
||||
import socket
|
||||
|
||||
|
||||
class GRIST:
    """Convenience wrapper around ``GristDocAPI`` for a single Grist document.

    Table and column names handed to the methods may contain spaces; they are
    rewritten with underscores before being passed to the API client.
    """

    def __init__(self, server, doc_id, api_key, logger):
        """Remember the connection settings and create the ``GristDocAPI`` client.

        Args:
            server: Base URL of the Grist server.
            doc_id: Identifier of the Grist document.
            api_key: API key passed to ``GristDocAPI``.
            logger: Logger kept on the instance as ``self.logger``.
        """
        self.server = server
        self.doc_id = doc_id
        self.api_key = api_key
        self.logger = logger
        self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)

    def table_name_convert(self, table_name):
        """Return ``table_name`` with every space replaced by an underscore."""
        return table_name.replace(" ", "_")

    def to_timestamp(self, dtime: datetime) -> int:
        """Convert ``dtime`` to whole seconds since the Unix epoch.

        A naive datetime (no ``tzinfo``) is interpreted as UTC+3.
        """
        if dtime.tzinfo is None:
            dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
        return int(dtime.timestamp())

    def insert_row(self, data, table):
        """Add one record built from ``data`` to ``table``.

        Spaces in the keys of ``data`` are replaced by underscores.

        Returns:
            Whatever ``GristDocAPI.add_records`` returns for the single record.
        """
        record = {key.replace(" ", "_"): value for key, value in data.items()}
        return self.grist.add_records(self.table_name_convert(table), [record])

    def update_column(self, row_id, column_name, value, table):
        """Set a single column of row ``row_id`` in ``table``.

        Delegates to :meth:`update` so both entry points normalise column
        names and datetime values in exactly the same way.
        """
        self.update(row_id, {column_name: value}, table)

    def delete_row(self, row_id, table):
        """Delete the row with id ``row_id`` from ``table``."""
        self.grist.delete_records(self.table_name_convert(table), [row_id])

    def update(self, row_id, updates, table):
        """Update several columns of row ``row_id`` in ``table``.

        Args:
            row_id: Id of the row to update.
            updates: Mapping of column name to new value. Datetime values are
                converted with :meth:`to_timestamp`; spaces in column names
                become underscores. The mapping itself is left untouched.
            table: Table name (spaces allowed).
        """
        # Build a new dict instead of rewriting the caller's mapping in place.
        payload = {
            column_name.replace(" ", "_"): (
                self.to_timestamp(value) if isinstance(value, datetime) else value
            )
            for column_name, value in updates.items()
        }
        self.grist.update_records(
            self.table_name_convert(table), [{"id": row_id, **payload}]
        )

    def fetch_table(self, table):
        """Return all records of ``table`` as delivered by ``GristDocAPI.fetch_table``."""
        return self.grist.fetch_table(self.table_name_convert(table))

    def find_record(self, id=None, state=None, name=None, table=None):
        """Return the rows of ``table`` matching the first applicable filter.

        Filters are tried in this order: ``id``; ``state`` and ``name``
        together; ``state`` alone; ``name`` alone. Rows are compared on their
        ``id``, ``State`` and ``Name`` attributes.

        Returns:
            A list of matching rows (possibly empty), or ``None`` when no
            filter argument was supplied.

        Raises:
            ValueError: If ``table`` is not given.
        """
        if table is None:
            raise ValueError("Table is not specified")
        table_data = self.fetch_table(table)
        if id is not None:
            return [row for row in table_data if row.id == id]
        if state is not None and name is not None:
            # The column is ``Name`` (as in the name-only filter below); the
            # previous lower-case ``row.name`` did not agree with it.
            return [
                row for row in table_data
                if row.State == state and row.Name == name
            ]
        if state is not None:
            return [row for row in table_data if row.State == state]
        if name is not None:
            return [row for row in table_data if row.Name == name]
        return None

    def find_settings(self, key, table):
        """Return the ``Value`` of the record in ``table`` whose ``Setting`` equals ``key``.

        Raises:
            ValueError: If the setting exists but its value is ``None`` or an
                empty string, or if no record carries that setting.
        """
        # fetch_table already normalises the table name.
        for record in self.fetch_table(table):
            if record.Setting == key:
                if record.Value is None or record.Value == "":
                    raise ValueError(f"Setting {key} blank")
                return record.Value
        raise ValueError(f"Setting {key} not found")
|
||||
|
||||
|
||||
def run_docker_command(command, logger, timeout=30):
    """Run ``command`` through the shell and report whether it succeeded.

    Args:
        command: Command line to execute (a single string, run with
            ``shell=True``).
        logger: Logger receiving the success / failure messages.
        timeout: Seconds to wait before giving up. Defaults to 30; callers
            running slow operations can pass a larger value.

    Returns:
        ``True`` when the command exits with status 0, ``False`` on a
        non-zero exit status, on timeout, or when launching it raises.
    """
    try:
        result = subprocess.run(
            command, shell=True, capture_output=True, text=True, timeout=timeout
        )
    except subprocess.TimeoutExpired:
        logger.error(f"Docker command timed out: {command}")
        return False
    except Exception as e:
        logger.error(f"Docker command exception: {command}, error: {str(e)}")
        return False

    if result.returncode != 0:
        logger.error(f"Docker command failed: {command}, error: {result.stderr.strip()}")
        return False

    logger.info(f"Docker command successful: {command}")
    output = result.stdout.strip()
    if output:
        logger.info(f"Docker output: {output}")
    return True
|
||||
|
||||
|
||||
def stop_and_remove_container(container_name, logger):
    """Issue ``docker stop`` followed by ``docker rm`` for ``container_name``.

    Each step is announced on ``logger`` first. The outcome of either command
    is not inspected here, so the function always returns ``None``.
    """
    steps = (
        (f"Stopping container: {container_name}", f"docker stop {container_name}"),
        (f"Removing container: {container_name}", f"docker rm {container_name}"),
    )
    for announcement, docker_command in steps:
        logger.info(announcement)
        run_docker_command(docker_command, logger)
|
||||
|
||||
|
||||
def check_container_status(container_name, logger):
    """Report whether ``docker ps`` lists ``container_name`` with an "Up" status.

    Returns:
        ``True`` if the status text printed by ``docker ps`` contains "Up".
        ``False`` if the command fails, prints nothing, prints a status
        without "Up", or raises (including the 10 second timeout).
    """
    try:
        proc = subprocess.run(
            f"docker ps --filter name={container_name} --format '{{{{.Status}}}}'",
            shell=True,
            capture_output=True,
            text=True,
            timeout=10,
        )
        # Output only counts when the command itself succeeded.
        status = proc.stdout.strip() if proc.returncode == 0 else ""
        if not status:
            logger.warning(f"Container {container_name} not found in running containers")
            return False
        if "Up" not in status:
            logger.warning(f"Container {container_name} status: {status}")
            return False
        return True
    except Exception as e:
        logger.error(f"Failed to check container {container_name} status: {str(e)}")
        return False
|
||||
|
||||
|
||||
def start_container(container_name, node_id, logger):
    """Start a detached ``nexusxyz/nexus-cli:latest`` container for ``node_id``.

    Args:
        container_name: Name given to the container (``--name``).
        node_id: Value passed to the CLI as ``--node-id``.
        logger: Logger for progress messages.

    Returns:
        The boolean result of ``run_docker_command`` for the ``docker run``
        invocation.
    """
    import shlex  # local import: only this function needs it

    # The command goes through a shell, and node_id is read from an external
    # table, so quote both values to keep them single, inert arguments.
    # Plain alphanumeric values are left unchanged by shlex.quote.
    safe_name = shlex.quote(str(container_name))
    safe_node_id = shlex.quote(str(node_id))
    docker_command = (
        f"docker run -td --init --name {safe_name} "
        f"nexusxyz/nexus-cli:latest start --node-id {safe_node_id}"
    )
    logger.info(f"Starting container with node-id: {node_id}")
    return run_docker_command(docker_command, logger)
|
||||
|
||||
|
||||
def get_next_node(grist, logger):
    """Pick the row of the "Nodes" table with the smallest ``Hours`` value.

    Rows whose ``NodeID`` equals the string "1" are never considered. When
    several rows share the smallest value, the one that comes first in the
    table wins.

    Returns:
        The selected row, or ``None`` when no candidate remains or when
        anything raises (for instance an ``Hours`` value ``int()`` rejects).
    """
    try:
        candidates = [
            row for row in grist.fetch_table(table="Nodes") if row.NodeID != "1"
        ]
        if candidates:
            chosen = min(candidates, key=lambda row: int(row.Hours))
            logger.info(f"Selected node: ID={chosen.NodeID}, Hours={chosen.Hours}")
            return chosen

        logger.error("No available nodes found in table")
        return None
    except Exception as e:
        logger.error(f"Failed to get next node: {str(e)}")
        return None
|
||||
|
||||
|
||||
def _wait_with_health_checks(container_name, node_id, wait_hours, interval_minutes, logger):
    """Keep the node's container running for ``wait_hours``, checking it every interval.

    Before each sleep the container status is checked; a container that is not
    running is removed and started again.

    Returns:
        ``True`` when the full waiting period elapsed, ``False`` as soon as a
        restart attempt fails.
    """
    total_minutes = wait_hours * 60
    logger.info(f"Waiting {wait_hours} hours ({total_minutes} minutes) for node {node_id}")

    for elapsed_minutes in range(0, total_minutes, interval_minutes):
        remaining_hours, remaining_mins = divmod(total_minutes - elapsed_minutes, 60)

        if elapsed_minutes > 0:  # Skip first iteration log
            logger.info(f"Node {node_id}: {remaining_hours}h {remaining_mins}m remaining")

        # Check container status before sleeping
        if not check_container_status(container_name, logger):
            logger.error(f"Container {container_name} is not running, attempting restart")
            stop_and_remove_container(container_name, logger)

            if not start_container(container_name, node_id, logger):
                logger.error(f"Failed to restart container for node {node_id}")
                return False
            logger.info(f"Container restarted successfully for node {node_id}")

        time.sleep(interval_minutes * 60)

    return True


def main_rotation_cycle():
    """Run the endless nexus node rotation loop.

    Reads the Grist connection settings from ``grist.json`` in the current
    working directory, then repeats: select the node with the fewest hours,
    add one hour to its counter, start its container, keep it running for
    ``wait_hours`` with periodic health checks, stop it and credit the
    remaining hours. Returns early if the configuration cannot be loaded or
    is incomplete; otherwise runs until a keyboard interrupt.
    """
    container_name = "nexus"
    wait_hours = 5
    interval_minutes = 10

    colorama.init(autoreset=True)
    logger = logging.getLogger("NexusRotator")
    logger.setLevel(logging.INFO)
    # Attach the stream handler only once so repeated calls in the same
    # process do not print every message multiple times.
    if not logger.handlers:
        ch = logging.StreamHandler()
        ch.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(ch)

    # Load grist configuration
    try:
        with open('grist.json', 'r', encoding='utf-8') as f:
            grist_data = json.load(f)
    except Exception as e:
        logger.error(f"Failed to load grist.json: {str(e)}")
        return

    # Refuse to start with an incomplete configuration: the loop below would
    # otherwise retry forever against a client built from missing values.
    required_keys = ("grist_server", "grist_doc_id", "grist_api_key")
    if not isinstance(grist_data, dict):
        logger.error("Failed to load grist.json: top-level value must be an object")
        return
    missing_keys = [key for key in required_keys if not grist_data.get(key)]
    if missing_keys:
        logger.error(f"Failed to load grist.json: missing settings: {', '.join(missing_keys)}")
        return

    # Initialize Grist connection
    try:
        grist = GRIST(
            grist_data.get('grist_server'),
            grist_data.get('grist_doc_id'),
            grist_data.get('grist_api_key'),
            logger,
        )
        logger.info("Connected to Grist successfully")
    except Exception as e:
        logger.error(f"Failed to connect to Grist: {str(e)}")
        return

    cycle_count = 0
    logger.info("Starting nexus rotation cycle")

    while True:
        cycle_count += 1
        logger.info(f"=== Starting cycle #{cycle_count} ===")

        try:
            # Get next node
            node = get_next_node(grist, logger)
            if not node:
                logger.error("No node available, waiting 60 seconds before retry")
                time.sleep(60)
                continue

            node_id = node.NodeID
            current_hours = int(node.Hours)

            # Update hours (+1 before starting container)
            new_hours = current_hours + 1
            grist.update(node.id, {"Hours": new_hours}, "Nodes")
            logger.info(f"Updated node {node_id} hours: {current_hours} -> {new_hours}")

            # Remove any existing container with same name
            stop_and_remove_container(container_name, logger)

            # Start new container
            if not start_container(container_name, node_id, logger):
                logger.error(f"Failed to start container for node {node_id}")
                # Return the hour back since container didn't start
                grist.update(node.id, {"Hours": current_hours}, "Nodes")
                logger.info(f"Reverted node {node_id} hours back to: {current_hours}")
                time.sleep(60)
                continue

            logger.info(f"Container started successfully for node {node_id}")

            # Wait with progress updates and health checks every interval
            completed = _wait_with_health_checks(
                container_name, node_id, wait_hours, interval_minutes, logger
            )

            # If container failed during the cycle, skip to next iteration
            # (the node keeps the single hour credited up front).
            if not completed:
                logger.error(f"Container failed during cycle for node {node_id}, moving to next node")
                continue

            # Stop and remove container
            logger.info(f"{wait_hours} hours completed for node {node_id}, stopping container")
            stop_and_remove_container(container_name, logger)

            # Credit the rest of the run: one hour was added before the
            # start, so the total for a full cycle is wait_hours.
            final_hours = current_hours + wait_hours
            grist.update(node.id, {"Hours": final_hours}, "Nodes")
            logger.info(f"Updated node {node_id} final hours: {new_hours} -> {final_hours}")

            logger.info(f"=== Cycle #{cycle_count} completed for node {node_id} ===")

        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt, stopping rotation")
            stop_and_remove_container(container_name, logger)
            break
        except Exception as e:
            logger.error(f"Cycle #{cycle_count} failed with error: {str(e)}")
            stop_and_remove_container(container_name, logger)
            logger.info("Waiting 60 seconds before next attempt")
            time.sleep(60)

    logger.info("Nexus rotation cycle stopped")
|
||||
|
||||
|
||||
# Start the rotation loop only when this file is executed as a script;
# importing the module must not launch it.
if __name__ == "__main__":
    main_rotation_cycle()
|
Loading…
Reference in New Issue
Block a user