Compare commits


14 Commits

SHA1 Message Date
51d6fa568d refactor main_rotation_cycle to streamline node processing and container management 2025-08-14 01:17:20 +03:00
72a9804c69 add curl command to download and execute rotate.py in run.sh 2025-08-02 01:17:56 +03:00
ee59511e5d add docker image check and update functionality to rotate.py 2025-08-02 01:15:49 +03:00
3d07dc47ed refactor playbook for improved clarity and structure 2025-07-27 20:35:01 +03:00
e7cca8d6c0 v2 2025-07-27 02:17:32 +03:00
aa24574319 update playbook 2024-12-12 19:23:21 +03:00
20a8d423aa remove upgrade target 2024-12-12 15:41:25 +03:00
91914d9486 fix error 2024-12-12 03:35:36 +03:00
1f5ba5ff58 playbook update 2024-12-12 03:23:39 +03:00
3f4b1b51c0 update checker 2024-12-12 03:04:07 +03:00
f580f41130 change to 2h 2024-12-12 03:03:22 +03:00
5fa7675387 update compose 2024-12-12 03:01:47 +03:00
b5ac0b38f3 update playbook 2024-12-12 02:58:07 +03:00
3cc574a592 update 2024-12-12 02:32:36 +03:00
14 changed files with 1463 additions and 129 deletions


@@ -101,7 +101,7 @@ def check_logs(log_handler):
     proof_speeds = deque(maxlen=100)
     try:
-        logs = subprocess.run(['docker', 'compose', 'logs', '--since', '24h'], cwd='/root/node/', capture_output=True, text=True, check=True)
+        logs = subprocess.run(['docker', 'compose', 'logs', '--since', '2h'], cwd='/root/node/', capture_output=True, text=True, check=True)
         log_content = logs.stdout
     except subprocess.CalledProcessError as e:
         raise RuntimeError(f"Error running docker compose logs: {e}") from e
@@ -127,7 +127,7 @@ def check_logs(log_handler):
     data = {
         "errors": error_count,
         "proved_steps": proved_count/10,
-        "proof_speed": avg_proof_speed
+        "proof_speed": int(avg_proof_speed)
     }
     log_handler.info(f"Result: {data}")
     return data


@@ -0,0 +1,49 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAACFwAAAAdzc2gtcn
NhAAAAAwEAAQAAAgEAt3NLNdcEYfBqR541tQYitkWa1PVgngaaXoExE8G37OXSr+bh3tlr
Rmnemtbb+bgYciB7b0AR13+DKJy3UEFiIJNPOGHNhrHlf80WWvL1bJ63EEeqjlr9GtM7Wb
JxYCGBMZ/dI1FYEi+/Aqfwv1D3WdIHjouCzc0wriGJvCIVx7YonziuWNNHh4POWoa8EL8O
0hF3Mb0sG6fSIQsmyLCR0qoIDVHXC+1i8XfQBx+iAlglQL3ueKJAMSicr/1VLPN8X7pUgB
77QPdn5AmHVwikMADVQS0BjUpjE/BsScQkJ9xgr8pVcHobx4yAt/yL72Q/zRCj3p1F4dBW
Ah92QDHhBVpyiKvQizcLFUG9+INTv+HM60n/XJEEJIvKViNesBuiYI2LmLgIJaCOSsLRdC
MOtiff0A9pJh9a02QPhLrR1cDMGkYkWJ6HfZz6oL0js5xytpFv6SkwCVqzOCt43V8rHJ2l
XrGz7Rew1gSVDPXgEsNYIr1FzNX/bPIHy26Cxq4zaxy+7Er9q7+Y2CksJpIWVPJJ3O4kF4
uSnyiXcB9RNifc03dKVolizhQB7NoPMa0naT2LZJ9NXghyJhYruC3UKfCyFkhxcXQCLpUr
D1styqfqaLEG9Gq34+WhqM0QdixDA5lsp5qY3q7C47B1ZxyR1t5D5ndEjSP6HllYv7vMoh
sAAAdQSM/bbEjP22wAAAAHc3NoLXJzYQAAAgEAt3NLNdcEYfBqR541tQYitkWa1PVgngaa
XoExE8G37OXSr+bh3tlrRmnemtbb+bgYciB7b0AR13+DKJy3UEFiIJNPOGHNhrHlf80WWv
L1bJ63EEeqjlr9GtM7WbJxYCGBMZ/dI1FYEi+/Aqfwv1D3WdIHjouCzc0wriGJvCIVx7Yo
nziuWNNHh4POWoa8EL8O0hF3Mb0sG6fSIQsmyLCR0qoIDVHXC+1i8XfQBx+iAlglQL3ueK
JAMSicr/1VLPN8X7pUgB77QPdn5AmHVwikMADVQS0BjUpjE/BsScQkJ9xgr8pVcHobx4yA
t/yL72Q/zRCj3p1F4dBWAh92QDHhBVpyiKvQizcLFUG9+INTv+HM60n/XJEEJIvKViNesB
uiYI2LmLgIJaCOSsLRdCMOtiff0A9pJh9a02QPhLrR1cDMGkYkWJ6HfZz6oL0js5xytpFv
6SkwCVqzOCt43V8rHJ2lXrGz7Rew1gSVDPXgEsNYIr1FzNX/bPIHy26Cxq4zaxy+7Er9q7
+Y2CksJpIWVPJJ3O4kF4uSnyiXcB9RNifc03dKVolizhQB7NoPMa0naT2LZJ9NXghyJhYr
uC3UKfCyFkhxcXQCLpUrD1styqfqaLEG9Gq34+WhqM0QdixDA5lsp5qY3q7C47B1ZxyR1t
5D5ndEjSP6HllYv7vMohsAAAADAQABAAACAA0AkfMV80yRwqai0wGqlqk+k7PGVHu+0hAi
rfzNfSDARUeMYLPvywepl0p4Mg0n/CuSm80NyHXyprQpL2Dz0WWnqzS+0ddbIn4FZjE6CS
UStrzjp3YBgvD0yb8Yw6phlYuT3hOTv19CnRIuHwUgUve9yCVVRAccJPgijmWUMOD/yy9F
0C2hg+9Z6zVFWW0CbaV78WvIEalAIseOx8fvo9Y/kOSIyWoiACJHMKpglpX98138WDuanF
wfmcNrfC78bvNF/Jk8GOjI4EcsWbhUd3ajiHnfG74M6KrQHoy35ywgFYZAHAAl13Q0RCdG
MhDclR0Osd6kXQCdSItL+ZChOGaPAzmEst370KPmqlU1ZN5kp2ZNrHF1ucTyssAOCmR4su
hAL9BZJgyhBW01Xqm1owVaMqjQ6wzAhNqnfxx7GZhUluejiE+8EFx4dXDHWUjXBK+h+hV+
+WZUSYIi9X6Sg7/kj7F0QPihr8T9ll+p4W2DOlDOM5hyIA3XAYJmy60Ui6AiL5wUzyQEGY
2RqntCe+lKEQhf9cqWZ2SVB8q8AFS4sXVVfmj4u0FOkgBaZbNcTknBLRekNDaM93mvPOcT
523u8iybgSlyZpYBdRCnyHIw86lfvWRW2YUiG7bxDSpFsjtr16QKLprMUgiCajfRveDh/9
qFvpcYYSShiaGlE6uZAAABAFNMVfHaaqKBL76JIb4qjX3Dsx8Ig2PkXgzqmz0l8T82+Mdw
V1uHNHjaRO+x9QXD2c29OOrqUC0Pmcb3wD7u/EzX3yIyqhnyB94sl7mWA8fjHdIfEjWB3R
+j/QHBNtM8xNt2jPxguxXWo+8489i7/YjblTiNdtiWsnKXetlaJh9qya2bhbbws28kBfQ1
m1nZj/TCu9JySpwOQPtR8iX/qAkogHwmGq5+HXGJ4te+lmiBF99REeB/C0NgvWB1dGVZCK
9sJL0ztcCBoMvdPG4oBXEtd9xiU0VT8L17XuykcQvUd6i5r6AgLr/dOZA3SF6XNDW7F7mQ
/7BvIa576LFS61EAAAEBANn6TI43Eg4dRzeI8TuHTfB5QQYn5emal+LPbptP0KrByyfOtZ
mL/qiSpzNLRanp/DGm7GhyQUF2h61XObZ9BCs4RNE4adqVcae3xbxZ7yRneBl4A76tu4NA
BXKllz8141R3HtYTJ/VGAIdwGmkHdFYOCxBhxnLELFHC+4Qp/3Iq2Uu3c4fAWknxDPT+p/
Y7jRia2TLlUvQrF38VXIUIgh7OnPMXzhErXDvRG8+8DxqI4zg1rKEP86z+rx0/hYsSst+M
LOByEmHDAc0m4h6bAwJwL5SBxrWKU0/vTYOusCyfgzznSDfIi5YHgsXM9K9pyonRfjUST+
ehPsilDpPl778AAAEBANdzMLYhkJm1/bIfpsrogmSGQt5ymWvEcuWCxYnMDDALIjS9Ih2G
sOmmiCAl1u/tSIbyyOP7YFf9JGYaOumKzkRwKcFr1dVpOgrez+/dbrJE93JOrFDVAxILSW
cc7kYwSFCIlC8LshWNyxuhGnAgkyXROlAdCABUxY4sJ3kj7oSUFA8ZwSh4THx1RfvLV6VT
GtoNO8gh6eRl/M8hM2oOds2wWp45j9jRNTrTdZX6aprn0g0dPOjPF7ufMEc0tYoz1WZYzk
DTLGQysm2Sas5llq8hXOmp70BYm+dFud+dOPjU/IbmRI5DLmU89WvpvjWCSfP1YdUfEZZ/
ujdZaCWt5KUAAAAWdnZ6dmxhZEByaXR1YWwtZGVwbG9lcgECAwQF
-----END OPENSSH PRIVATE KEY-----

deployer/nexus_deployer.py (new file, 879 lines)

@@ -0,0 +1,879 @@
# flake8: noqa
# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name
# pylance: disable=reportMissingImports, reportMissingModuleSource, reportGeneralTypeIssues
# type: ignore
import subprocess
import time
import json
import logging
import sys
import random
import os
import re
import traceback
from datetime import datetime, timedelta, timezone
from collections import deque
import warnings
import concurrent.futures
import threading
import uuid
from ansible_parser.logs import Play as AnsibleLogParser
from grist_api import GristDocAPI
from proxmoxer import ProxmoxAPI, ResourceException
import yaml
import colorama
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
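# Module-level shared state: a rolling record of the five most recent deploy
# timestamps, and a lock serializing Grist table access across worker threads.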
last_deploy_times = deque(maxlen=5)
grist_lock = threading.Lock()
class ColoredFormatter(logging.Formatter):
    def __init__(self, fmt):
        colorama.init(autoreset=True)
        super().__init__(fmt)
    def format(self, record):
        if record.levelno == logging.ERROR:
            record.msg = f"{colorama.Fore.RED}{record.msg}{colorama.Style.RESET_ALL}"
        elif record.levelno == logging.WARNING:
            record.msg = f"{colorama.Fore.YELLOW}{record.msg}{colorama.Style.RESET_ALL}"
        elif record.levelno == logging.INFO:
            record.msg = f"{colorama.Fore.GREEN}{record.msg}{colorama.Style.RESET_ALL}"
        return super().format(record)
class LogsToGrist(logging.Handler):
    def __init__(self, callback=None):
        super().__init__()
        self.callback = callback
    def emit(self, record):
        log_entry = self.format(record)
        # Strip ANSI color codes (added by ColoredFormatter) before mirroring the record into Grist
        ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
        log_entry = ansi_escape.sub('', log_entry)
        log_entry = log_entry.replace("Ansible task: ", "")
        self.callback({"Status": log_entry})
class LoggerWrapper:
    def __init__(self, name, log_file, grist_callback=None):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        # File handler
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.DEBUG)
        colored_formatter = ColoredFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(colored_formatter)
        self.logger.addHandler(console_handler)
        # Grist handler
        if grist_callback is not None:
            grist_handler = LogsToGrist(callback=grist_callback)
            grist_handler.setLevel(logging.INFO)
            grist_handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(grist_handler)
    def __del__(self):
        handlers = self.logger.handlers[:]
        for handler in handlers:
            handler.close()
            self.logger.removeHandler(handler)
    def get_logger(self):
        return self.logger
def format_time(current_time):
    if current_time < 5:
        return f"{current_time:.1f} sec"
    elif current_time < 60:
        return f"{int(current_time)} sec"
    elif current_time < 3600:
        minutes = current_time // 60
        seconds = current_time % 60
        return f"{int(minutes)} min {int(seconds)} sec"
    elif current_time < 86400:
        hours = current_time // 3600
        minutes = (current_time % 3600) // 60
        seconds = current_time % 60
        return f"{int(hours)} h {int(minutes)} min"
    else:
        days = current_time // 86400
        hours = (current_time % 86400) // 3600
        minutes = (current_time % 3600) // 60
        seconds = current_time % 60
        return f"{int(days)} d {int(hours)} h"
def random_sleep(logger, min_delay=None, max_delay=None, delay=None):
    if delay is None:
        delay = random.uniform(min_delay, max_delay)
    formatted_delay = format_time(delay)
    end_time = datetime.now() + timedelta(seconds=delay)
    end_time_str = end_time.strftime('%Y-%m-%d %H:%M:%S')
    logger.debug(f"Waiting for {formatted_delay} (expected end time: {end_time_str})")
    time.sleep(delay)
class TelegramBot:
    def __init__(self, token, topic, chat_id, logger):
        self.token = token
        self.topic = topic
        self.logger = logger
        self.chat_id = chat_id
        self.base_url = f"https://api.telegram.org/bot{self.token}/"
    def message(self, message):
        url = self.base_url + "sendMessage"
        payload = {
            'chat_id': self.chat_id,
            'text': f"[{self.topic}] {message}"
        }
        response = requests.post(url, json=payload, timeout=10)
        response_data = response.json()
        if not response_data.get('ok', True):
            self.logger.error(f"Error sending message: {response_data}")
        return response_data
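# Minimal Grist client. Conventions used throughout: spaces in table and column
# names map to underscores, and naive datetimes are treated as UTC+3 when
# converted to timestamps.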
class GRIST:
    def __init__(self, server, doc_id, api_key, logger):
        self.server = server
        self.doc_id = doc_id
        self.api_key = api_key
        self.logger = logger
        self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)
    def table_name_convert(self, table_name):
        return table_name.replace(" ", "_")
    def to_timestamp(self, dtime: datetime) -> int:
        if dtime.tzinfo is None:
            dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
        return int(dtime.timestamp())
    def insert_row(self, data, table):
        data = {key.replace(" ", "_"): value for key, value in data.items()}
        row_id = self.grist.add_records(self.table_name_convert(table), [data])
        return row_id
    def update_column(self, row_id, column_name, value, table):
        if isinstance(value, datetime):
            value = self.to_timestamp(value)
        column_name = column_name.replace(" ", "_")
        self.grist.update_records(self.table_name_convert(table), [{ "id": row_id, column_name: value }])
    def delete_row(self, row_id, table):
        self.grist.delete_records(self.table_name_convert(table), [row_id])
    def update(self, row_id, updates, table):
        for column_name, value in updates.items():
            if isinstance(value, datetime):
                updates[column_name] = self.to_timestamp(value)
        updates = {column_name.replace(" ", "_"): value for column_name, value in updates.items()}
        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, **updates}])
    def fetch_table(self, table):
        return self.grist.fetch_table(self.table_name_convert(table))
    def find_record(self, id=None, state=None, name=None, table=None):
        if table is None:
            raise ValueError("Table is not specified")
        table_data = self.grist.fetch_table(self.table_name_convert(table))
        if id is not None:
            record = [row for row in table_data if row.id == id]
            return record
        if state is not None and name is not None:
            record = [row for row in table_data if row.State == state and row.name == name]
            return record
        if state is not None:
            record = [row for row in table_data if row.State == state]
            return record
        if name is not None:
            record = [row for row in table_data if row.Name == name]
            return record
    #def find_data_by_key(self, key, table):
    #    data = getattr(self.fetch_table(self.table_name_convert(table))[0], key)
    #    return data
    def find_settings(self, key, table):
        table = self.fetch_table(self.table_name_convert(table))
        for record in table:
            if record.Setting == key:
                if record.Value is None or record.Value == "":
                    raise ValueError(f"Setting {key} blank")
                return record.Value
        raise ValueError(f"Setting {key} not found")
class Prox:
    def __init__(self, host, user, password, logger, timeout=10):
        self.session = requests.Session()
        self.logger = logger
        self.session.request = lambda *args, **kwargs: requests.request(*args, timeout=timeout, **kwargs)
        self.proxmox = ProxmoxAPI(host=host, user=user, password=password, verify_ssl=False, service='PVE', timeout=timeout)
    def get_vms_data(self, vm_name=None, node_name=None, vm_id=None):
        if node_name is not None: nodes = [{"node": node_name}]
        else: nodes = self.proxmox.nodes.get()
        vms_with_name = []
        for node in nodes:
            if vm_id is not None:
                try: vms = [self.proxmox.nodes(node['node']).qemu(vm_id).status.current.get()]
                except Exception: continue
            else: vms = self.proxmox.nodes(node['node']).qemu.get()
            for vm in vms:
                if vm_name is None or vm.get('name') == vm_name or vm.get('vmid') == str(vm_id):
                    cfg = self.proxmox.nodes(node['node']).qemu(vm.get('vmid')).config.get()
                    vm['ipconfig'] = cfg.get('ipconfig0', None)
                    vm['cpu'] = cfg.get('cpu', None)
                    vm['node'] = node['node']
                    vm['memory'] = cfg.get('memory', None)
                    vm['net'] = cfg.get('net0', None)
                    vm['template'] = int(vm.get('template', 0))
                    vm['disk'] = cfg.get(cfg.get('bootdisk', ''), None)
                    if vm['disk']:
                        vm['disk_storage'] = vm['disk'].split(':')[0]
                    else:
                        vm['disk_storage'] = None
                    vms_with_name.append(vm)
        return vms_with_name
    def search_vm(self, vm_name=None, node=None, vm_id=None):
        vms = self.get_vms_data(vm_name=vm_name, node_name=node, vm_id=vm_id)
        if len(vms) > 0:
            return vms[0]
        return None
    def cmd_vm_by_name(self, vm_name, cmd):
        nds = self.proxmox.nodes
        nodes = nds.get()
        for node in nodes:
            for vm in nds(node['node']).qemu.get():
                if vm['name'] == vm_name:
                    if cmd == "reboot":
                        upid = nds(node['node']).qemu(vm['vmid']).status.reboot.post()
                    elif cmd == "shutdown":
                        upid = nds(node['node']).qemu(vm['vmid']).status.shutdown.post()
                    elif cmd == "start":
                        upid = nds(node['node']).qemu(vm['vmid']).status.start.post()
                    elif cmd == "stop":
                        upid = nds(node['node']).qemu(vm['vmid']).status.stop.post()
                    elif cmd == "reset":
                        upid = nds(node['node']).qemu(vm['vmid']).status.reset.post()
                    elif cmd == "destroy":
                        upid = nds(node['node']).qemu(vm['vmid']).status.stop.post()
                        while nds(node['node']).tasks(upid).status.get()['status'] != 'stopped': time.sleep(1)
                        upid = nds(node['node']).qemu(vm['vmid']).delete()
                        while nds(node['node']).tasks(upid).status.get()['status'] != 'stopped': time.sleep(1)
    def id_by_name(self, vm_name):
        vms = self.get_vms_data(vm_name=vm_name)
        if len(vms) > 0:
            return vms[0]['vmid']
        return None
    def name_by_id(self, vm_id):
        vms = self.get_vms_data(vm_id=vm_id)
        if len(vms) > 0:
            return vms[0]['name']
        return None
    def wait_for_ip(self, node, vmid, max_iterations=10, period=10):
        self.logger.info(f"Wait boot VM {vmid} on node {node}")
        vm_ip_v4 = ""
        iterations = 0
        while not vm_ip_v4 and iterations < max_iterations:
            time.sleep(period)
            self.logger.info(f"Get IP attempt {iterations} of {max_iterations}")
            try:
                vm_status = self.proxmox.nodes(node).qemu(vmid).agent('network-get-interfaces').get()
                if 'result' in vm_status:
                    for interface in vm_status['result']:
                        if interface['name'].startswith(('lo', 'br-', 'veth', 'docker')): continue
                        for ip in interface.get('ip-addresses', []):
                            if ip['ip-address-type'] == 'ipv4' and not vm_ip_v4:
                                vm_ip_v4 = ip['ip-address']
                    if vm_ip_v4:
                        self.logger.info(f"VM {vmid} on node {node} received IP: {vm_ip_v4}")
                        break
            except ResourceException as e:
                if "QEMU guest agent is not running" in str(e):
                    pass
                    #print(f".", end="", flush=True)
            except Exception as e:
                self.logger.info(f"Error retrieving IP for VM {vmid} on node {node}: {e}")
            iterations += 1
        if not vm_ip_v4:
            #print("")
            return False
        return vm_ip_v4
    def check_vm(self, vm_id):
        try:
            vm = self.search_vm(vm_id=vm_id)
            self.logger.info(f"VM {vm_id} status: {vm}")
            return True
        except ResourceException as e:
            self.logger.info(f"Failed to check VM {vm_id} status: {e}")
            return False
    def clone_vm(self, template_id, new_vm_name):
        try:
            new_vm_id = self.proxmox.cluster.nextid.get()
            template = self.search_vm(vm_id=template_id)
            if template is None:
                raise ValueError(f"Template VM with ID {template_id} not found")
            node = template['node']
            self.logger.info(f"Clone VM {template_id}->{new_vm_id}: {new_vm_name}...")
            upid = self.proxmox.nodes(node).qemu(template_id).clone.post(newid=new_vm_id, name=new_vm_name)
            while self.proxmox.nodes(node).tasks(upid).status.get()['status'] != 'stopped':
                print(f"{new_vm_name} clone state: {self.proxmox.nodes(node).tasks(upid).status.get()['status']}")
                time.sleep(10)
        except ResourceException as e:
            self.logger.error(f"Failed to clone VM {template_id}: {e}")
            raise ValueError(f"Failed to clone VM {template_id}/{new_vm_name}") from e
        return new_vm_id, node
    def get_proxmox_nodes(self):
        nodes = []
        raw_nodes = self.proxmox.nodes.get()
        for node in raw_nodes:
            nodes.append({
                "cpu": node.get('cpu'),
                "name": node.get('node'),
                "status": node.get('status'),
                "uptime": node.get('uptime')
            })
        return nodes
    def create_vm(self, vm_name, template_id=None):
        existing_vm_id = self.id_by_name(vm_name)
        old_vm_data = self.search_vm(vm_id=existing_vm_id)
        if existing_vm_id is not None:
            self.logger.info(f"Virtual machine {vm_name} already exists")
            #self.logger.info(f"Find probabilistic template...")
            #all_vms = self.get_vms_data(node_name=old_vm_data['node'])
            #templates_vms = [vm for vm in all_vms if vm['template'] == 1]
            #probabilistic_template = [vm for vm in templates_vms if vm['disk_storage'] == old_vm_data['disk_storage']][0]
            #self.logger.info(f"Find {old_vm_data['name']}({old_vm_data['vmid']}) probabilistic template: {probabilistic_template['name']}({probabilistic_template['vmid']})/{probabilistic_template['node']}")
            #template_id = probabilistic_template['vmid']
            self.logger.info(f"Deleting VM ID {existing_vm_id}/{old_vm_data['node']}")
            self.cmd_vm_by_name(vm_name, "destroy")
        else:
            self.logger.info(f"Virtual machine with name {vm_name} not found, creating new VM...")
        if template_id is None:
            self.logger.error(f"Template ID not provided")
            raise ValueError("Template ID not provided")
        new_vm_id, node = self.clone_vm(template_id, vm_name)
        self.proxmox.nodes(node).qemu(new_vm_id).config.post(tags="")
        self.logger.info(f"Starting VM {new_vm_id} on node {node}")
        self.cmd_vm_by_name(vm_name, "start")
        vm_ip = self.wait_for_ip(node, new_vm_id)
        if vm_ip == False:
            self.logger.info(f"Failed to get IP address for VM {new_vm_id}.")
            raise ValueError(f"Failed to get IP address for VM {new_vm_id}.")
        if vm_ip != False and vm_ip is not None:
            self.logger.info(f"Virtual machine {vm_name} with ID {new_vm_id} has IP address: {vm_ip}")
            return vm_ip, node, new_vm_id
        raise ValueError(f"Unknown error")
    def check_duplicate_vms(self, vms):
        vm_node_mapping = {}
        for vm_info in vms:
            vm = vm_info['name']
            node = vm_info['node']
            if vm in vm_node_mapping:
                vm_node_mapping[vm].append(node)
            else:
                vm_node_mapping[vm] = [node]
        duplicates = {name: nodes for name, nodes in vm_node_mapping.items() if len(nodes) > 1}
        return duplicates
    def modify_tag(self, vm_id, tag, action="add"):
        tag = tag.lower()
        vm = self.search_vm(vm_id=vm_id)
        if vm is None:
            self.logger.error(f"VM {vm_id} not found")
            return False
        if vm is not None:
            current_config = self.proxmox.nodes(vm['node']).qemu(vm_id).config.get()
            tags_list = current_config.get('tags', '').split(';')
            if action == "add":
                if tag not in tags_list:
                    tags_list.append(tag)
                    updated_tags = ';'.join(filter(None, tags_list))
                    self.proxmox.nodes(vm['node']).qemu(vm_id).config.post(tags=updated_tags)
                    self.logger.info(f"Added tag '{tag}' to VM {vm_id}")
                else:
                    self.logger.info(f"Tag '{tag}' already exists in VM {vm_id}")
            elif action == "remove":
                if tag in tags_list:
                    tags_list.remove(tag)
                    updated_tags = ';'.join(filter(None, tags_list))
                    self.proxmox.nodes(vm['node']).qemu(vm_id).config.post(tags=updated_tags)
                    self.logger.info(f"Removed tag '{tag}' from VM {vm_id}")
                else:
                    self.logger.info(f"Tag '{tag}' not found in VM {vm_id}")
            else:
                self.logger.error(f"Invalid action: {action}")
                return False
        return True
def parse_ansible_log(ansible_log):
    task_blocks = re.split(r"(?=\nTASK \[)", ansible_log)
    last_task_block = task_blocks[-1]
    if "failed:" in last_task_block or "fatal:" in last_task_block:
        try:
            task_name = re.search(r"TASK \[(.*?)\]", last_task_block).group(1)
            failed_json_str = re.search(r'=> ({.*?})', last_task_block, re.DOTALL).group(1)
            failed_json = json.loads(failed_json_str)
            stdout_lines = failed_json.get("stdout_lines", [])
            stderr_lines = failed_json.get("stderr_lines", [])
            msg = failed_json.get("msg", "")
            print(f"Stdout lines: {stderr_lines}", flush=True)
            for line in stdout_lines[-10:] + stderr_lines[-10:] + [msg]:
                if "429 Too Many Requests" in line and "https://www.docker.com/increase-rate-limit" in line:
                    print("429 Too Many Requests detected in stdout_lines.", flush=True)
                    return task_name, "429TOOMANYREQUESTS"
                if "HTTP Error 504: Gateway Time-out" in line:
                    print("HTTP Error 504: Gateway Time-out detected in stdout_lines.", flush=True)
                    return task_name, "504GATEWAYTIMEOUT"
            # No known error signature: print the first available stdout/stderr line and stop
            for line in stdout_lines[-5:]:
                print(f"Stdout: {line}", flush=True)
                return task_name, None
            for line in stderr_lines[-5:]:
                print(f"Stderr: {line}", flush=True)
                return task_name, None
            print("No error message found in stdout or stderr.", flush=True)
            return task_name, None
        except (json.JSONDecodeError, AttributeError):
            print("Error extracting or decoding JSON.", flush=True)
            return None, None
    else:
        print("The last TASK did not end with ASYNC FAILED and fatal.", flush=True)
        return None, None
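# Example of the failure block shape handled above (Ansible output format assumed):
#   TASK [Download dockers images] ***
#   fatal: [host1]: FAILED! => {"msg": "... 429 Too Many Requests ... increase-rate-limit ...", ...}
# would yield ("Download dockers images", "429TOOMANYREQUESTS").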
def deploy_node(name, ansible_host, node_id, proxy, proxy_jump, private_key_file,
                docker_username, docker_password, git_version, git_base_url, logger_instance, grist_server, grist_doc_id, grist_api_key):
    if not os.path.exists(private_key_file):
        raise Exception(f"SSH private key file not found: {private_key_file}")
    ansible_dir = f"ansible"
    if not os.path.exists(ansible_dir):
        os.makedirs(ansible_dir)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    inventory_filename = f"{ansible_dir}/inventory_{name}_{timestamp}.yml"
    log_output_filename = f"{ansible_dir}/log_{name}_{timestamp}.log"
    playbook_filename = f"{ansible_dir}/playbook_{name}_{timestamp}.yml"
    playbook_url = f"{git_base_url}/raw/branch/{git_version}/playbook.yml"
    response = requests.get(playbook_url, timeout=10)
    if response.status_code == 200:
        with open(playbook_filename, 'w', encoding='utf-8') as f:
            f.write(response.text)
    else:
        logger_instance.error(f"Failed to download playbook at {playbook_url}. Status code: {response.status_code}")
        raise Exception(f"Failed to download playbook. Status code: {response.status_code}")
    inventory = {'all': {'vars': {}, 'hosts': {}}}
    inventory['all']['vars']['ansible_user'] = 'root'
    inventory['all']['vars']['ansible_ssh_private_key_file'] = private_key_file
    if proxy_jump: inventory['all']['vars']['ansible_ssh_common_args'] = f'-J {proxy_jump}'
    inventory['all']['hosts'][name] = {}
    inventory['all']['hosts'][name]['ansible_host'] = ansible_host
    inventory['all']['hosts'][name]['serverid'] = name
    inventory['all']['hosts'][name]['id'] = node_id
    inventory['all']['hosts'][name]['proxy'] = proxy
    inventory['all']['hosts'][name]['grist_server'] = grist_server
    inventory['all']['hosts'][name]['grist_doc_id'] = grist_doc_id
    inventory['all']['hosts'][name]['grist_api_key'] = grist_api_key
    inventory['all']['hosts'][name]['git_version'] = git_version
    inventory['all']['hosts'][name]['docker_username'] = docker_username
    inventory['all']['hosts'][name]['docker_password'] = docker_password
    with open(inventory_filename, 'w', encoding='utf-8') as file:
        yaml.dump(inventory, file, default_flow_style=False)
    logger_instance.info(f"Running deploy playbook...")
    log_data = ""
    with open(log_output_filename, 'w', encoding='utf-8') as deploy_log:
        process = subprocess.Popen(['ansible-playbook', '-i', inventory_filename, playbook_filename, '-v'],
                                   stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1)
        for line in iter(process.stdout.readline, ''):
            if line.startswith("TASK ["): logger_instance.info(f"Ansible task: {line.replace('*', '').replace('TASK ', '').strip()}")
            deploy_log.write(line)
            log_data += line
        process.stdout.close()
        return_code = process.wait()
        deploy_log.flush()
    if return_code != 0:
        ansible_failed_task = ""
        ansible = AnsibleLogParser(play_output=log_data)
        plays = ansible.plays()
        for _, tasks in plays.items():
            for task_name, task_object in tasks.items():
                for result in task_object.results:
                    if result.get('status') == 'failed' or result.get('status') == 'fatal':
                        #print(f"Failed task: {result}")
                        ansible_failed_task = task_name
        logger_instance.error(f"Deploy failed. Failed stage: {ansible_failed_task} Check log file: {log_output_filename}, inventory file: {inventory_filename}")
        raise Exception(f"Ansible error on {ansible_failed_task}")
    return log_data
def nodes_table_preprocessing(grist_instance, logger):
    current_time = grist_instance.to_timestamp(datetime.now())
    max_wip = 60*60*2
    logger.info(f"Run grist processing NoneState -> Dirty and NoneVersion -> nv1")
    for row in grist_instance.fetch_table("Nodes"):
        if row.State == "": grist_instance.update_column(row.id, "State", "Dirty", "Nodes")
        if row.Version == "": grist_instance.update_column(row.id, "Version", "nv1", "Nodes")
    logger.info(f"Run grist processing WiP and >max_wip old -> Dirty")
    for row in grist_instance.fetch_table("Nodes"):
        if row.Deploy_date is not None:
            vm_old_age = current_time - row.Deploy_date
            if row.State == "WiP" and vm_old_age > max_wip and row.State != "Dirty":
                grist_instance.update_column(row.id, "State", "Dirty", "Nodes")
                grist_instance.update_column(row.id, "Status", "Set Dirty by WiP Timeout", "Nodes")
    logger.info(f"Run grist processing NoneRetries -> 0/4")
    for row in grist_instance.fetch_table("Nodes"):
        if row.Retries is None or row.Retries == "":
            grist_instance.update_column(row.id, "Retries", "0/4", "Nodes")
def generate_proxy(login, password, proxy_ip, proxy_port, logger):
    countries = [ "AT", "AM", "BE", "HU", "DE", "IL", "IE", "ES", "IT",
                  "LU", "NL", "NO", "PL", "PT", "RS", "FI", "FR", "SE", "CH", "CZ", "EE" ]
    country = random.choice(countries)
    id_hex = ''.join(random.choices('0123456789abcdef', k=10))
    login_id = f"{login}_s_{id_hex}_c_{country}"
    proxy_socks5 = f"socks5://{login_id}:{password}@{proxy_ip}:{proxy_port}"
    proxy_http = f"http://{login_id}:{password}@{proxy_ip}:{proxy_port}"
    ipinfo_url = "http://ipinfo.io"
    headers = {"User-Agent": "curl/7.68.0"}
    try:
        response = requests.get(ipinfo_url, headers=headers, proxies={"http": proxy_http, "https": proxy_http}, timeout=10)
    except requests.exceptions.RequestException as e:
        logger.error(f"[proxy test] Error while requesting ipinfo.io: {e}")
        return None
    if response.status_code == 200:
        logger.info(f"[proxy test] Successfully retrieved IP information: {response.json().get('ip')}")
        return proxy_socks5
    else:
        logger.error(f"[proxy test] Error while requesting ipinfo.io: {response.status_code}")
        return None
def repeat_deploy(grist_instance, logger, grist_env):
    result = None
    generated_address = False
    wait_period = int(grist_instance.find_settings("Wait period in minutes", "Settings"))*60
    name_node = grist_instance.find_settings("Node name", "Settings")
    sticker_node = grist_instance.find_settings("Node sticker", "Settings")
    multiplier = 1.5
    diff = round(wait_period/multiplier)
    min_wait = round(wait_period - diff)
    max_wait = round(wait_period + diff)
    # Critical section: pick one Dirty node and mark it WiP before other workers can grab it
    with grist_lock:
        nodes_table_preprocessing(grist_instance, logger)
        dirty_vms = grist_instance.find_record(state="Dirty", table="Nodes")
        if len(dirty_vms) == 0:
            print(f"No dirty nodes found")
            result = False
            return result
        dirty_vms.sort(key=lambda vm: (int(vm.Retries.split('/')[0])))
        #dirty_vms = sorted(dirty_vms, key=lambda row: 0 if row.Impact == "" else 1)
        dirty_vm = dirty_vms[0]
        def grist_callback(msg): grist_instance.update(dirty_vm.id, msg, "Nodes")
        grist_instance.update(dirty_vm.id, { "State": "WiP", "Status": "Locked", "Deploy date": datetime.now() }, "Nodes")
    # Retries is stored as "attempt/max", e.g. "2/4"
    if dirty_vm.Retries.count('/') == 1:
        try:
            attempt = int(dirty_vm.Retries.split('/')[0])
            max_retries = int(dirty_vm.Retries.split('/')[1])
            if attempt >= max_retries:
                grist_instance.update(dirty_vm.id, { "State": "Error", "Status": f"{dirty_vm.Status} (Max retries)" }, "Nodes")
                result = False
                return result
            attempt += 1
            grist_instance.update(dirty_vm.id, { "Retries": f"{attempt}/{max_retries}" }, "Nodes")
        except ValueError:
            grist_instance.update(dirty_vm.id, { "State": "Error", "Status": "Error retries" }, "Nodes")
            result = False
            return result
    if dirty_vm.Proxy is None or dirty_vm.Proxy == "":
        grist_callback( { "Status": "Generate proxy" })
        login = grist_instance.find_settings("Proxy generator login", "Settings")
        password = grist_instance.find_settings("Proxy generator password", "Settings")
        proxy_ip = grist_instance.find_settings("Proxy generator ip", "Settings")
        proxy_port = grist_instance.find_settings("Proxy generator port", "Settings")
        for _ in range(5):
            proxy_socks5 = generate_proxy(login, password, proxy_ip, proxy_port, logger)
            if proxy_socks5 is not None:
                break
            logger.error("[proxy] Failed to generate proxy, retrying...")
            time.sleep(1)
        grist_instance.update(dirty_vm.id, { "Proxy": proxy_socks5 }, "Nodes")
        dirty_vm = grist_instance.find_record(id=dirty_vm.id, table="Nodes")[0]
    vm_name = f"{name_node}-{dirty_vm.NodeID[8:16]}"
    logger_instance = LoggerWrapper(name=f"{vm_name}", log_file=f"logs/{name_node}_creation_{vm_name}.log", grist_callback=grist_callback)
    logger = logger_instance.get_logger()
    grist_callback( { "Name": vm_name, "State": "WiP", "Status": "Run proxmox worker.." })
    required_fields = ["NodeID", "Proxy", "Server", "Storage", "Retries"]
    for field in required_fields:
        if not getattr(dirty_vm, field.replace(" ", "_")):
            grist_callback( { "State": "Error", "Status": f"No {field.lower()}" })
            result = False
            return result
    prox_host = grist_instance.find_settings("Proxmox host", "Settings")
    prox_user = grist_instance.find_settings("Proxmox user", "Settings")
    prox_password = grist_instance.find_settings("Proxmox password", "Settings")
    prox = Prox(prox_host, prox_user, prox_password, logger)
    tg_token = grist_instance.find_settings("Telegram token", "Settings")
    tg_chat_id = grist_instance.find_settings("Telegram chat id", "Settings")
    tg_log = TelegramBot(tg_token, f"{sticker_node} {name_node}", tg_chat_id, logger)
    current_version = grist_instance.find_settings("Current version", "Settings")
    logger.info(f"Create-run-deploy node {vm_name} on attempt {attempt}")
    deploy_start_time = datetime.now()
    try:
        grist_callback( { "IP": "", "Deploy date": datetime.now()})
        all_vms = prox.get_vms_data(node_name=dirty_vm.Server)
        vm_prox_node = dirty_vm.Server
        templates_vms = [vm for vm in all_vms if vm['template'] == 1]
        target_storage_templates = [vm for vm in templates_vms if vm['disk_storage'] == dirty_vm.Storage]
        latest_templates = [vm for vm in target_storage_templates if 'tags' in vm and 'latest' in vm['tags'].split(';')]
        if len(latest_templates) == 0:
            raise ResourceException(status_code=404, status_message="No latest template", content="No latest template")
        template_id = latest_templates[0]['vmid']
        try:
            vm_ip, vm_prox_node, vm_id = prox.create_vm(vm_name, template_id=template_id)
        except Exception as e:
            raise ResourceException(status_code=500, status_message="Failed to create VM", content=str(e)) from e
        prox.modify_tag(vm_id=vm_id, tag="WiP", action="add")
        grist_callback( { "IP": vm_ip })
        if generated_address:
            delay = random.uniform(min_wait/2, max_wait/2)
            grist_callback( { "Status": f"Wait {format_time(delay)} before ansible.." })
            random_sleep(logger, delay=delay)
        grist_callback( { "Status": "Run ansible worker.." })
        deploy_node(name=vm_name, ansible_host=vm_ip, node_id=dirty_vm.NodeID, proxy=dirty_vm.Proxy,
                    git_version=grist_instance.find_settings("Git version", "Settings"),
                    git_base_url=grist_instance.find_settings("Git base URL", "Settings"),
                    proxy_jump=grist_instance.find_settings("Ansible SSH Jump host", "Settings"),
                    private_key_file=grist_instance.find_settings("Ansible SSH keyfile", "Settings"),
                    docker_username=grist_instance.find_settings("Docker username", "Settings"),
                    docker_password=grist_instance.find_settings("Docker password", "Settings"),
                    grist_server=grist_env["GRIST_SERVER"],
                    grist_doc_id=grist_env["GRIST_DOC_ID"],
                    grist_api_key=grist_env["GRIST_API_KEY"],
                    logger_instance=logger)
        prox.modify_tag(vm_id=vm_id, tag="WiP", action="remove")
        prox.modify_tag(vm_id=vm_id, tag=current_version, action="add")
        clear_vms = grist_instance.find_record(state="Clean", table="Nodes")
        dirty_vms = grist_instance.find_record(state="Dirty", table="Nodes")
        wip_vms = grist_instance.find_record(state="WiP", table="Nodes")
        clear_keys = len(clear_vms)
        dirty_keys = len(dirty_vms) + len(wip_vms)
        deploy_elapsed_time = (datetime.now() - deploy_start_time).total_seconds()
        if vm_prox_node is None: vm_prox_node = "None"
        tg_log.message(f"✅ Deployed vm {vm_name} in '{vm_prox_node}' (took {format_time(deploy_elapsed_time)}, vm {clear_keys} out of {dirty_keys+clear_keys} in batch, left {dirty_keys} nodes)")
        logger.info(f"Deployed node {vm_name}/{vm_ip} in {vm_prox_node} ({format_time(deploy_elapsed_time)} elapsed install)")
        delay = random.uniform(min_wait/2, max_wait/2)
        format_delay = format_time(delay)
        grist_callback( { "Status": f"Deployed, wait {format_delay}.." })
        grist_callback( {"State": "Clean", "Version": current_version, "Deploy date": datetime.now()} )
        grist_callback( {"Deploy time": f"{format_time(deploy_elapsed_time)}"} )
        random_sleep(logger, delay=delay)
        grist_callback( { "Status": f"Deployed ok on attempt {attempt}/{max_retries}", "Retries": f"{0}/{4}" })
        result = True
        return result
    except ResourceException as e:
        attempt += 0.5
        logger.error(f"Fail: {e}\n{traceback.format_exc()}")
        logger.error(f"Failed to deploy node for {vm_name} on attempt {attempt}/{max_retries}: Proxmox Resource Exception: {e}")
        error_message = str(e).replace('\n', '')
        grist_callback( {"State": "Dirty", "Version": "", "Deploy date": "", "Status": f"Failed: {error_message}"})
        tg_log.message(f"❌ Failed to deploy attempt {attempt}/{max_retries} node {vm_name} on '{vm_prox_node}': {e}")
        time.sleep(30)
        result = False
        return result
    except Exception as e:
        attempt += 1
        logger.error(f"Fail: {e}\n{traceback.format_exc()}")
        logger.error(f"Failed to deploy node for {vm_name} on attempt {attempt}/{max_retries}: {e}")
        tg_log.message(f"❌ Failed to deploy attempt {attempt}/{max_retries} node {vm_name} on '{vm_prox_node}': {e}")
        error_message = str(e).replace('\n', '')
        grist_callback( {"State": "Dirty", "Version": "", "Deploy date": "", "Status": f"Failed: {error_message}"})
        time.sleep(30)
        result = False
        return result
    finally:
        if result == None: grist_callback( {"State": "Dirty", "Status": f"Ended by interrupt"} )
def main():
    log_dir = "logs"
    log_file = "nexus_deployer.log"
    log_path = os.path.join(log_dir, log_file)
    if not os.path.exists(log_dir): os.makedirs(log_dir)
    if not os.path.exists(log_path): open(log_path, 'a', encoding='utf-8').close()
    logger_instance = LoggerWrapper(name="nexus", log_file=log_path)
    logger = logger_instance.get_logger()
    grist_env = {}
    grist_env["GRIST_SERVER"] = os.getenv("GRIST_SERVER")
    grist_env["GRIST_DOC_ID"] = os.getenv("GRIST_DOC_ID")
    grist_env["GRIST_API_KEY"] = os.getenv("GRIST_API_KEY")
    grist_instance = GRIST(grist_env["GRIST_SERVER"], grist_env["GRIST_DOC_ID"], grist_env["GRIST_API_KEY"], logger)
    logger.info(f"Starting nexus deployer...")
    deployer_key = grist_instance.find_settings("Ansible SSH keyfile", "Settings")
    if not os.path.exists(deployer_key):
        logger.error(f"SSH private key file not found: {deployer_key}")
        raise FileNotFoundError(f"Required SSH private key file {deployer_key} is missing")
    logger.info(f"Run on-start grist processing WiP -> Dirty")
    #repeats = grist_instance.find_settings("Repeats", "Settings")
    #for row in grist_instance.fetch_table("Nodes"):
    #    if row.State == "WiP":
    #        grist_instance.update(row.id, {"State": "Dirty", "Settings"})
    #        grist_instance.update(row.id, {"Status": "Set Dirty by restart deployer", "Settings"})
    #logger.info(f"Run on-start grist processing Dirty -> 0/{repeats}")
    #for row in grist_instance.fetch_table("Nodes"):
    #    if row.State == "Dirty" and row.Retries is not None and row.Retries != f"0/{repeats}":
    #        grist_instance.update(row.id, {"Retries": f"0/{repeats}"}, "Nodes")
    #while True:
    #    repeat_deploy(grist_instance, logger, grist_env)
    #    logger.info("Restart deployer")
    #    os.execv(sys.executable, [sys.executable] + sys.argv)
    #    time.sleep(10)
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            futures = []
            start_times = {}
            #while True:
            max_threads = int(grist_instance.find_settings("Workers", "Settings"))
            if max_threads > 20: max_threads = 20
            while len(futures) < max_threads:
                future = executor.submit(repeat_deploy, grist_instance, logger, grist_env)
                futures.append(future)
                start_times[future] = time.time()
                logger.info(f"Spawned new task. Total active tasks: {len(futures)}, max threads: {max_threads}")
                time.sleep(120)
            max_threads = int(grist_instance.find_settings("Workers", "Settings"))
            current_time = time.time()
            for future in futures.copy():
                if future.done():
                    try:
                        result = future.result()
                        logger.info(f"Task completed: {result}")
                    except Exception as e:
                        logger.error(f"Task failed with error: {e}")
                        logger.error(f"Fail: {e}\n{traceback.format_exc()}")
                    futures.remove(future)
                    start_times.pop(future, None)
                elif current_time - start_times[future] > 60*60*3:
                    future.cancel()
                    logger.info("Cancelled a task due to timeout.")
                    futures.remove(future)
                    start_times.pop(future, None)
            #time.sleep(10)
    except KeyboardInterrupt:
        logger.info("Program interrupted, shutting down executor...")
    finally:
        executor.shutdown(wait=True)
        logger.info("Resources released, executor shutdown completed.")
if __name__ == "__main__":
    if main() == False:
        sys.exit(1)
    sys.exit(0)
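
A minimal sketch of how the deployer expects to be launched; the three environment variable names come from main() above, while the values here are placeholders:

import os, subprocess
env = dict(os.environ,
           GRIST_SERVER="https://grist.example",  # placeholder
           GRIST_DOC_ID="your-doc-id",            # placeholder
           GRIST_API_KEY="your-api-key")          # placeholder
subprocess.run(["python3", "deployer/nexus_deployer.py"], env=env, check=True)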

deployer/requirements.txt (new file, 11 lines)

@@ -0,0 +1,11 @@
ansible-output-parser==0.1.0
certifi==2024.7.4
charset-normalizer==3.3.2
colorama==0.4.6
future==1.0.0
grist-api==0.1.0
idna==3.8
proxmoxer==2.1.0
PyYAML==6.0.2
requests==2.32.3
urllib3==2.2.2


@@ -1,11 +1,12 @@
+#docker build -t nexus-image .
 services:
   nexus-prover:
     image: nexus-image
     container_name: nexus-prover
     restart: unless-stopped
     volumes:
-      - /root/prover-id:/root/.nexus/prover-id
+      - /root/node/prover-id:/root/.nexus/prover-id
     logging:
-      driver: "none"
+      driver: "json-file"
       options:
         max-file: 2
         max-size: 1m
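
A quick way to confirm the new log settings on the running container (a sketch; the container name comes from the compose file above):

import json, subprocess
raw = subprocess.run(
    ["docker", "inspect", "--format", "{{json .HostConfig.LogConfig}}", "nexus-prover"],
    capture_output=True, text=True, check=True,
).stdout
print(json.loads(raw))  # expected: {"Type": "json-file", "Config": {"max-file": "2", "max-size": "1m"}}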


@@ -1,4 +1,4 @@
-- name: Nillion deployment playbook
+- name: Deployment playbook
   hosts: all
   become: true
   vars:
@@ -11,7 +11,7 @@
       create: yes
       block: |
         #1724983098
-        cd /root/node/ ; docker compose logs -f
+        cd /root/node/ ; docker compose logs --since 3h -f
       marker: ""
     - name: Set locale to C.UTF-8
@@ -80,71 +80,68 @@
         echo "127.0.1.1 {{ serverid }}" >> /etc/hosts
       changed_when: false
-    - name: Update and upgrade apt
-      ansible.builtin.apt:
-        update_cache: true
-        upgrade: dist
-        force_apt_get: true
-        autoremove: true
-      register: apt_update_result
-      retries: 5
-      delay: 50
-      until: apt_update_result is succeeded
-      async: "{{ 60 * 20 }}"
+    #- name: Update and upgrade apt
+    #  ansible.builtin.apt:
+    #    update_cache: true
+    #    #upgrade: dist
+    #    #force_apt_get: true
+    #    #autoremove: true
+    #  register: apt_update_result
+    #  retries: 5
+    #  delay: 50
+    #  until: apt_update_result is succeeded
+    #  async: "{{ 60 * 20 }}"
+    #  poll: 30
+    #- name: Install packages
+    #  ansible.builtin.apt:
+    #    name:
+    #      - ca-certificates
+    #      - zlib1g-dev
+    #      - libncurses5-dev
+    #      - libgdbm-dev
+    #      - libnss3-dev
+    #      - curl
+    #      - jq
+    #      - git
+    #      - zip
+    #      - wget
+    #      - make
+    #      - python3
+    #      - python3-pip
+    #      - iftop
+    #    state: present
+    #    update_cache: true
+    #    async: "{{ 60 * 20 }}"
+    #    poll: 30
+    - name: Clone repository
+      ansible.builtin.git:
+        repo: https://gitea.vvzvlad.xyz/vvzvlad/nexus
+        dest: "{{ ansible_env.HOME }}/node"
+        version: "{{ git_version }}"
+        force: true
+      async: "{{ 60 * 15 }}"
+      poll: 30
-    - name: Install packages
-      ansible.builtin.apt:
-        name:
-          - ca-certificates
-          - zlib1g-dev
-          - libncurses5-dev
-          - libgdbm-dev
-          - libnss3-dev
-          - curl
-          - jq
-          - git
-          - zip
-          - wget
-          - make
-          - python3
-          - python3-pip
-          - iftop
-        state: present
-        update_cache: true
-      async: "{{ 60 * 20 }}"
-      poll: 30
+    #- name: Install grist-api and colorama (attempt 1)
+    #  ansible.builtin.command: pip3 install grist-api colorama --break-system-packages
+    #  args:
+    #    chdir: "{{ ansible_env.HOME }}/node"
+    #  changed_when: false
-    - name: Install Docker
-      ansible.builtin.shell: curl -fsSL https://get.docker.com | bash
-      changed_when: false
-      async: "{{ 60 * 5 }}"
-      poll: 30
+    #- name: Install Docker
+    #  ansible.builtin.shell: curl -fsSL https://get.docker.com | bash
+    #  changed_when: false
+    #  async: "{{ 60 * 5 }}"
+    #  poll: 30
-    - name: Update Docker daemon journald logging
-      ansible.builtin.copy:
-        dest: /etc/docker/daemon.json
-        content: |
-          { "log-driver": "journald" }
-        mode: '0644'
-    - name: Restart Docker
-      ansible.builtin.service:
-        name: docker
-        state: restarted
-    - name: Update journald log SystemMaxUse=2G configuration
-      ansible.builtin.lineinfile:
-        path: /etc/systemd/journald.conf
-        line: 'SystemMaxUse=2G'
-        insertafter: EOF
-        create: true
-        mode: '0644'
-    - name: Restart journald
-      ansible.builtin.service:
-        name: systemd-journald
-        state: restarted
+    #- name: Update Docker daemon journald logging
+    #  ansible.builtin.copy:
+    #    dest: /etc/docker/daemon.json
+    #    content: |
+    #      { "log-driver": "journald" }
+    #    mode: '0644'
     - name: Docker login
       ansible.builtin.shell: docker login -u "{{ docker_username }}" -p "{{ docker_password }}"
@@ -152,14 +149,23 @@
       changed_when: false
       failed_when: "'Login Succeeded' not in docker_login_result.stdout"
-    - name: Clone repository
-      ansible.builtin.git:
-        repo: https://gitea.vvzvlad.xyz/vvzvlad/nillion
-        dest: "{{ ansible_env.HOME }}/node"
-        version: "{{ git_version }}"
-        force: true
-      async: "{{ 60 * 15 }}"
-      poll: 30
+    #- name: Restart Docker
+    #  ansible.builtin.service:
+    #    name: docker
+    #    state: restarted
+    #- name: Update journald log SystemMaxUse=2G configuration
+    #  ansible.builtin.lineinfile:
+    #    path: /etc/systemd/journald.conf
+    #    line: 'SystemMaxUse=2G'
+    #    insertafter: EOF
+    #    create: true
+    #    mode: '0644'
+    #- name: Restart journald
+    #  ansible.builtin.service:
+    #    name: systemd-journald
+    #    state: restarted
     - name: Make update.sh executable
       ansible.builtin.shell: |
@@ -170,10 +176,7 @@
     - name: Update environment variables
       ansible.builtin.shell: |
         ./update.sh ADDRESS "{{ address }}"
-        ./update.sh PRIVATE "{{ private_key }}"
-        ./update.sh PUBLIC "{{ public_key }}"
-        ./update.sh RPC "{{ rpc_url }}"
-        ./update.sh ID "{{ id }}"
         ./update.sh GRIST_SERVER "{{ grist_server }}"
         ./update.sh GRIST_DOC_ID "{{ grist_doc_id }}"
         ./update.sh GRIST_API_KEY "{{ grist_api_key }}"
@@ -181,15 +184,11 @@
         chdir: "{{ ansible_env.HOME }}/node"
       changed_when: false
-    - name: Download dockers images
-      ansible.builtin.command: docker compose pull
+    - name: Build dockers images
+      ansible.builtin.command: docker build -t nexus-image .
       args:
         chdir: "{{ ansible_env.HOME }}/node"
-      environment:
-        COMPOSE_INTERACTIVE_NO_CLI: 'true'
       changed_when: false
-      async: "{{ 60 * 45 }}"
-      poll: "{{ 60 * 5 }}"
     - name: Check external IP before
       ansible.builtin.command: curl https://ifconfig.me
@@ -203,20 +202,20 @@
         fail_msg: "The returned value is not a valid IP address."
         success_msg: "The returned value is a valid IP address."
-    - name: Download tun2socks
-      ansible.builtin.get_url:
-        url: https://github.com/xjasonlyu/tun2socks/releases/download/v2.5.2/tun2socks-linux-amd64.zip
-        dest: /tmp/tun2socks-linux-amd64.zip
-        mode: '0644'
-      async: "{{ 60 * 5 }}"
-      poll: 30
+    #- name: Download tun2socks
+    #  ansible.builtin.get_url:
+    #    url: https://github.com/xjasonlyu/tun2socks/releases/download/v2.5.2/tun2socks-linux-amd64.zip
+    #    dest: /tmp/tun2socks-linux-amd64.zip
+    #    mode: '0644'
+    #  async: "{{ 60 * 5 }}"
+    #  poll: 30
-    - name: Unzip tun2socks
-      ansible.builtin.unarchive:
-        src: /tmp/tun2socks-linux-amd64.zip
-        dest: /usr/local/sbin/
-        remote_src: true
-        mode: '0755'
+    #- name: Unzip tun2socks
+    #  ansible.builtin.unarchive:
+    #    src: /tmp/tun2socks-linux-amd64.zip
+    #    dest: /usr/local/sbin/
+    #    remote_src: true
+    #    mode: '0755'
     - name: Create proxy file
       ansible.builtin.copy:
@@ -275,19 +274,6 @@
         name: tun2socks
         state: restarted
-    - name: Check API availability for RPC URL
-      ansible.builtin.uri:
-        url: "{{ rpc_url }}/health?"
-        method: GET
-        return_content: true
-        timeout: 30
-      register: rpc_url_response
-      retries: 3
-      delay: 60
-      failed_when:
-        - rpc_url_response.status != 200
-        - rpc_url_response.json is not none and rpc_url_response.json is not defined
     - name: Check external IP after
       ansible.builtin.command: curl https://ifconfig.me
       register: ip_after
@@ -319,18 +305,13 @@
       async: "{{ 60 * 80 }}"
       poll: "{{ 60 }}"
-    - name: Install grist-api and colorama
-      ansible.builtin.command: pip3 install grist-api colorama --break-system-packages
-      args:
-        chdir: "{{ ansible_env.HOME }}/node"
-      changed_when: false
     - name: Copy checker service file
       ansible.builtin.copy:
-        dest: /etc/systemd/system/nillion-checker.service
+        dest: /etc/systemd/system/node-checker.service
         content: |
           [Unit]
-          Description=Nillion Checker Service
+          Description=Node Checker Service
           After=network.target

           [Service]
@@ -345,15 +326,12 @@
           WantedBy=multi-user.target
         mode: '0644'
-    - name: Reload systemd
+    - name: Enable and start node-checker service
       ansible.builtin.systemd:
-        daemon_reload: yes
-    - name: Enable and start nillion-checker service
-      ansible.builtin.systemd:
-        name: nillion-checker
+        name: node-checker
         enabled: yes
         state: started
+        daemon_reload: yes
     - name: Remove docker login credentials
       ansible.builtin.file:


@@ -1 +0,0 @@
-###ID###

rotate/grist.json (new file, 8 lines)

@@ -0,0 +1,8 @@
{
    "grist_server": "https://grist.vvzvlad.xyz",
    "grist_doc_id": "inwaCJSGxZA1u64QJmQBEi",
    "grist_api_key": "6bbcce2a64e7c865fbb2e5ec4480f0e1328f317f"
}

rotate/requirements.txt (new file, 11 lines)

@@ -0,0 +1,11 @@
ansible-output-parser==0.1.0
certifi==2024.7.4
charset-normalizer==3.3.2
colorama==0.4.6
future==1.0.0
grist-api==0.1.0
idna==3.8
proxmoxer==2.1.0
PyYAML==6.0.2
requests==2.32.3
urllib3==2.2.2

rotate/rotate.py (new file, 364 lines)

@@ -0,0 +1,364 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# flake8: noqa
# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name
# pylint: disable=multiple-statements, logging-fstring-interpolation, trailing-whitespace, line-too-long
# pylint: disable=broad-exception-caught, missing-function-docstring, missing-class-docstring
# pylint: disable=f-string-without-interpolation, wrong-import-position
# pylance: disable=reportMissingImports, reportMissingModuleSource
#
#curl https://gitea.vvzvlad.xyz/vvzvlad/nexus/raw/branch/main/rotate/grist.json?token=7b58d730732f07e90a88169a63531d96bfc0fbbd -o grist.json;
#curl https://gitea.vvzvlad.xyz/vvzvlad/nexus/raw/branch/main/rotate/requirements.txt?token=7b58d730732f07e90a88169a63531d96bfc0fbbd -o requirements.txt;
#pip3 install -r requirements.txt --break-system-packages;
#docker pull nexusxyz/nexus-cli:latest
# curl https://gitea.vvzvlad.xyz/vvzvlad/nexus/raw/branch/main/rotate/rotate.py?token=7b58d730732f07e90a88169a63531d96bfc0fbbd -o rotate.py; python3 rotate.py;
import re
from datetime import datetime, timedelta, timezone
import subprocess
import os
import time
import random
import sys
import json
from grist_api import GristDocAPI
import colorama
import logging
import socket
class GRIST:
    def __init__(self, server, doc_id, api_key, logger):
        self.server = server
        self.doc_id = doc_id
        self.api_key = api_key
        self.logger = logger
        self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)
    def table_name_convert(self, table_name):
        return table_name.replace(" ", "_")
    def to_timestamp(self, dtime: datetime) -> int:
        if dtime.tzinfo is None:
            dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
        return int(dtime.timestamp())
    def insert_row(self, data, table):
        data = {key.replace(" ", "_"): value for key, value in data.items()}
        row_id = self.grist.add_records(self.table_name_convert(table), [data])
        return row_id
    def update_column(self, row_id, column_name, value, table):
        if isinstance(value, datetime):
            value = self.to_timestamp(value)
        column_name = column_name.replace(" ", "_")
        self.grist.update_records(self.table_name_convert(table), [{ "id": row_id, column_name: value }])
    def delete_row(self, row_id, table):
        self.grist.delete_records(self.table_name_convert(table), [row_id])
    def update(self, row_id, updates, table):
        for column_name, value in updates.items():
            if isinstance(value, datetime):
                updates[column_name] = self.to_timestamp(value)
        updates = {column_name.replace(" ", "_"): value for column_name, value in updates.items()}
        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, **updates}])
    def fetch_table(self, table):
        return self.grist.fetch_table(self.table_name_convert(table))
    def find_record(self, id=None, state=None, name=None, table=None):
        if table is None:
            raise ValueError("Table is not specified")
        table_data = self.grist.fetch_table(self.table_name_convert(table))
        if id is not None:
            record = [row for row in table_data if row.id == id]
            return record
        if state is not None and name is not None:
            record = [row for row in table_data if row.State == state and row.name == name]
            return record
        if state is not None:
            record = [row for row in table_data if row.State == state]
            return record
        if name is not None:
            record = [row for row in table_data if row.Name == name]
            return record
    def find_settings(self, key, table):
        table = self.fetch_table(self.table_name_convert(table))
        for record in table:
            if record.Setting == key:
                if record.Value is None or record.Value == "":
                    raise ValueError(f"Setting {key} blank")
                return record.Value
        raise ValueError(f"Setting {key} not found")
def run_docker_command(command, logger):
    """Execute docker command and return success status"""
    try:
        result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=30)
        if result.returncode == 0:
            logger.info(f"Docker command successful: {command}")
            if result.stdout.strip():
                logger.info(f"Docker output: {result.stdout.strip()}")
            return True
        else:
            logger.error(f"Docker command failed: {command}, error: {result.stderr.strip()}")
            return False
    except subprocess.TimeoutExpired:
        logger.error(f"Docker command timed out: {command}")
        return False
    except Exception as e:
        logger.error(f"Docker command exception: {command}, error: {str(e)}")
        return False
def stop_and_remove_container(container_name, logger):
    """Stop and remove docker container"""
    logger.info(f"Stopping container: {container_name}")
    run_docker_command(f"docker stop {container_name}", logger)
    logger.info(f"Removing container: {container_name}")
    run_docker_command(f"docker rm {container_name}", logger)
def check_container_status(container_name, logger):
    """Check if container is running"""
    try:
        result = subprocess.run(f"docker ps --filter name={container_name} --format '{{{{.Status}}}}'",
                                shell=True, capture_output=True, text=True, timeout=10)
        if result.returncode == 0 and result.stdout.strip():
            status = result.stdout.strip()
            if "Up" in status:
                return True
            else:
                logger.warning(f"Container {container_name} status: {status}")
                return False
        else:
            logger.warning(f"Container {container_name} not found in running containers")
            return False
    except Exception as e:
        logger.error(f"Failed to check container {container_name} status: {str(e)}")
        return False
def start_container(container_name, node_id, logger):
    """Start nexus container with given node_id"""
    docker_command = f"docker run -td --init --name {container_name} nexusxyz/nexus-cli:latest start --node-id {node_id}"
    logger.info(f"Starting container with node-id: {node_id}")
    return run_docker_command(docker_command, logger)
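# Rotation order: get_next_node below picks the node with the fewest accumulated
# Hours; rows with NodeID "1" are excluded from rotation.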
def get_next_node(grist, logger):
    """Get node with lowest hours"""
    try:
        nodes = grist.fetch_table(table="Nodes")
        nodes = [row for row in nodes if row.NodeID != "1"]
        if not nodes:
            logger.error("No available nodes found in table")
            return None
        nodes.sort(key=lambda node: int(node.Hours))
        selected_node = nodes[0]
        logger.info(f"Selected node: ID={selected_node.NodeID}, Hours={selected_node.Hours}")
        return selected_node
    except Exception as e:
        logger.error(f"Failed to get next node: {str(e)}")
        return None
def check_and_update_docker_image(image_name, logger):
    """Check for new docker image and update if available"""
    try:
        logger.info(f"Checking for updates for image: {image_name}")
        # Get local image digest
        local_digest_cmd = f"docker images --digests --format '{{{{.Digest}}}}' {image_name}"
        local_result = subprocess.run(local_digest_cmd, shell=True, capture_output=True, text=True, timeout=30)
        local_digest = local_result.stdout.strip() if local_result.returncode == 0 else ""
        # Get remote image digest without pulling
        remote_digest_cmd = f"docker manifest inspect {image_name} --verbose | grep -m1 '\"digest\"' | cut -d'\"' -f4"
        remote_result = subprocess.run(remote_digest_cmd, shell=True, capture_output=True, text=True, timeout=60)
        if remote_result.returncode != 0:
            logger.warning(f"Failed to get remote digest for {image_name}, proceeding with pull check")
            # Fallback to pull and check output
            pull_cmd = f"docker pull {image_name}"
            pull_result = subprocess.run(pull_cmd, shell=True, capture_output=True, text=True, timeout=300)
            if pull_result.returncode == 0:
                if "Image is up to date" in pull_result.stderr or "up to date" in pull_result.stdout:
                    logger.info(f"Image {image_name} is already up to date")
                else:
                    logger.info(f"Image {image_name} updated successfully")
                return True
            else:
                logger.error(f"Failed to pull image {image_name}: {pull_result.stderr.strip()}")
                return False
        remote_digest = remote_result.stdout.strip()
        # Compare digests
        if local_digest and remote_digest:
            if local_digest == remote_digest:
                logger.info(f"Image {image_name} is already up to date (digest: {local_digest[:19]}...)")
                return True
            else:
                logger.info(f"New version available for {image_name}, downloading...")
                # Pull new image
                pull_cmd = f"docker pull {image_name}"
                pull_result = subprocess.run(pull_cmd, shell=True, capture_output=True, text=True, timeout=300)
                if pull_result.returncode == 0:
                    logger.info(f"Image {image_name} updated successfully (new digest: {remote_digest[:19]}...)")
                    return True
                else:
                    logger.error(f"Failed to pull updated image {image_name}: {pull_result.stderr.strip()}")
                    return False
        else:
            logger.info(f"Unable to compare digests, performing full pull for {image_name}")
            # Pull image
            pull_cmd = f"docker pull {image_name}"
            pull_result = subprocess.run(pull_cmd, shell=True, capture_output=True, text=True, timeout=300)
            if pull_result.returncode == 0:
                logger.info(f"Image {image_name} pulled successfully")
                return True
            else:
                logger.error(f"Failed to pull image {image_name}: {pull_result.stderr.strip()}")
                return False
    except subprocess.TimeoutExpired:
        logger.error(f"Timeout while checking/updating image {image_name}")
        return False
    except Exception as e:
        logger.error(f"Failed to check/update image {image_name}: {str(e)}")
        return False
def main_rotation_cycle():
    """Main rotation cycle for nexus nodes"""
    colorama.init(autoreset=True)
    logger = logging.getLogger("NexusRotator")
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    # Check and update docker image
    image_name = "nexusxyz/nexus-cli:latest"
    if not check_and_update_docker_image(image_name, logger):
        logger.warning(f"Failed to update {image_name}, continuing with existing image")
    # Load grist configuration
    try:
        with open('grist.json', 'r', encoding='utf-8') as f:
            grist_data = json.loads(f.read())
    except Exception as e:
        logger.error(f"Failed to load grist.json: {str(e)}")
        return
    # Initialize Grist connection
    try:
        grist = GRIST(grist_data.get('grist_server'), grist_data.get('grist_doc_id'), grist_data.get('grist_api_key'), logger)
        logger.info("Connected to Grist successfully")
    except Exception as e:
        logger.error(f"Failed to connect to Grist: {str(e)}")
        return
    container_name = "nexus"
    cycle_count = 0
    logger.info("Starting nexus rotation cycle")
    # Get next node
    node = get_next_node(grist, logger)
    if not node:
        logger.error("No node available, waiting 60 seconds before retry")
        time.sleep(60)
        return
    node_id = node.NodeID
    current_hours = int(node.Hours)
    # Update hours (+1 before starting container)
    new_hours = current_hours + 1
    grist.update(node.id, {"Hours": new_hours}, "Nodes")
    logger.info(f"Updated node {node_id} hours: {current_hours} -> {new_hours}")
    # Remove any existing container with same name
    stop_and_remove_container(container_name, logger)
    # Start new container
    if not start_container(container_name, node_id, logger):
        logger.error(f"Failed to start container for node {node_id}")
        # Return the hour back since container didn't start
        grist.update(node.id, {"Hours": current_hours}, "Nodes")
        logger.info(f"Reverted node {node_id} hours back to: {current_hours}")
        time.sleep(60)
        return
    logger.info(f"Container started successfully for node {node_id}")
    # Wait 5 hours with progress updates and health checks every 10 minutes
    wait_hours = 5
    total_minutes = wait_hours * 60
    interval_minutes = 10
    logger.info(f"Waiting {wait_hours} hours ({total_minutes} minutes) for node {node_id}")
    container_failed = False
    for elapsed_minutes in range(0, total_minutes, interval_minutes):
        remaining_minutes = total_minutes - elapsed_minutes
        remaining_hours = remaining_minutes // 60
        remaining_mins = remaining_minutes % 60
        if elapsed_minutes > 0:  # Skip first iteration log
            logger.info(f"Node {node_id}: {remaining_hours}h {remaining_mins}m remaining")
        # Check container status before sleeping
        if not check_container_status(container_name, logger):
            logger.error(f"Container {container_name} is not running, attempting restart")
            stop_and_remove_container(container_name, logger)
            if not start_container(container_name, node_id, logger):
                logger.error(f"Failed to restart container for node {node_id}")
                container_failed = True
                break
            else:
                logger.info(f"Container restarted successfully for node {node_id}")
        time.sleep(interval_minutes * 60)  # Sleep 10 minutes
    # If container failed during the cycle, skip to next iteration
    if container_failed:
        logger.error(f"Container failed during cycle for node {node_id}, moving to next node")
        return
    # Stop and remove container
    logger.info(f"5 hours completed for node {node_id}, stopping container")
    stop_and_remove_container(container_name, logger)
    # Update hours (+4 after completion)
    final_hours = new_hours + 4
    grist.update(node.id, {"Hours": final_hours}, "Nodes")
    logger.info(f"Updated node {node_id} final hours: {new_hours} -> {final_hours}")
    logger.info(f"=== Cycle #{cycle_count} completed for node {node_id} ===")
if __name__ == "__main__":
    main_rotation_cycle()
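
The script performs a single five-hour cycle per invocation; run.sh below re-launches it externally. If a continuous in-process loop were preferred instead, a minimal sketch:

while True:
    main_rotation_cycle()
    time.sleep(60)  # pause between cycles; external restart via run.sh is the shipped approach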

rotate/run.sh (new file, 10 lines)

@@ -0,0 +1,10 @@
curl https://gitea.vvzvlad.xyz/vvzvlad/nexus/raw/branch/main/rotate/rotate.py?token=7b58d730732f07e90a88169a63531d96bfc0fbbd -o rotate.py;
curl https://gitea.vvzvlad.xyz/vvzvlad/nexus/raw/branch/main/rotate/grist.json?token=7b58d730732f07e90a88169a63531d96bfc0fbbd -o grist.json;
curl https://gitea.vvzvlad.xyz/vvzvlad/nexus/raw/branch/main/rotate/requirements.txt?token=7b58d730732f07e90a88169a63531d96bfc0fbbd -o requirements.txt;
pip3 install -r requirements.txt --break-system-packages;
docker pull nexusxyz/nexus-cli:latest
screen -S rotate -m python3 rotate.py
curl https://gitea.vvzvlad.xyz/vvzvlad/nexus/raw/branch/main/rotate/rotate.py?token=7b58d730732f07e90a88169a63531d96bfc0fbbd -o rotate.py; python3 rotate.py


@@ -1,4 +1,4 @@
 #!/bin/bash
-curl -o /root/node/checker.py https://gitea.vvzvlad.xyz/vvzvlad/nillion/raw/branch/main/checker.py
+curl -o /root/node/checker.py https://gitea.vvzvlad.xyz/vvzvlad/nexus/raw/branch/main/checker.py
 python3 /root/node/checker.py


@@ -10,7 +10,7 @@ NEW_VALUE=$2
 # File list
 FILES=(
   "credentials.json"
-  "prover-id"
+  "docker-compose.yml"
   "grist.json"
 )

ws.code-workspace (new file, 24 lines)

@@ -0,0 +1,24 @@
{
    "folders": [
        {
            "path": "."
        }
    ],
    "settings": {
        "workbench.colorCustomizations": {
            "activityBar.activeBackground": "#44a003",
            "activityBar.background": "#44a003",
            "activityBar.foreground": "#e7e7e7",
            "activityBar.inactiveForeground": "#e7e7e799",
            "activityBarBadge.background": "#0450bc",
            "activityBarBadge.foreground": "#e7e7e7",
            "commandCenter.border": "#e7e7e799",
            "sash.hoverBorder": "#44a003",
            "titleBar.activeBackground": "#2f6e02",
            "titleBar.activeForeground": "#e7e7e7",
            "titleBar.inactiveBackground": "#2f6e0299",
            "titleBar.inactiveForeground": "#e7e7e799"
        },
        "peacock.color": "#2f6e02"
    }
}