vvzvlad 2025-07-27 02:17:32 +03:00
parent aa24574319
commit e7cca8d6c0
6 changed files with 964 additions and 5 deletions


@@ -0,0 +1,49 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAACFwAAAAdzc2gtcn
NhAAAAAwEAAQAAAgEAt3NLNdcEYfBqR541tQYitkWa1PVgngaaXoExE8G37OXSr+bh3tlr
Rmnemtbb+bgYciB7b0AR13+DKJy3UEFiIJNPOGHNhrHlf80WWvL1bJ63EEeqjlr9GtM7Wb
JxYCGBMZ/dI1FYEi+/Aqfwv1D3WdIHjouCzc0wriGJvCIVx7YonziuWNNHh4POWoa8EL8O
0hF3Mb0sG6fSIQsmyLCR0qoIDVHXC+1i8XfQBx+iAlglQL3ueKJAMSicr/1VLPN8X7pUgB
77QPdn5AmHVwikMADVQS0BjUpjE/BsScQkJ9xgr8pVcHobx4yAt/yL72Q/zRCj3p1F4dBW
Ah92QDHhBVpyiKvQizcLFUG9+INTv+HM60n/XJEEJIvKViNesBuiYI2LmLgIJaCOSsLRdC
MOtiff0A9pJh9a02QPhLrR1cDMGkYkWJ6HfZz6oL0js5xytpFv6SkwCVqzOCt43V8rHJ2l
XrGz7Rew1gSVDPXgEsNYIr1FzNX/bPIHy26Cxq4zaxy+7Er9q7+Y2CksJpIWVPJJ3O4kF4
uSnyiXcB9RNifc03dKVolizhQB7NoPMa0naT2LZJ9NXghyJhYruC3UKfCyFkhxcXQCLpUr
D1styqfqaLEG9Gq34+WhqM0QdixDA5lsp5qY3q7C47B1ZxyR1t5D5ndEjSP6HllYv7vMoh
sAAAdQSM/bbEjP22wAAAAHc3NoLXJzYQAAAgEAt3NLNdcEYfBqR541tQYitkWa1PVgngaa
XoExE8G37OXSr+bh3tlrRmnemtbb+bgYciB7b0AR13+DKJy3UEFiIJNPOGHNhrHlf80WWv
L1bJ63EEeqjlr9GtM7WbJxYCGBMZ/dI1FYEi+/Aqfwv1D3WdIHjouCzc0wriGJvCIVx7Yo
nziuWNNHh4POWoa8EL8O0hF3Mb0sG6fSIQsmyLCR0qoIDVHXC+1i8XfQBx+iAlglQL3ueK
JAMSicr/1VLPN8X7pUgB77QPdn5AmHVwikMADVQS0BjUpjE/BsScQkJ9xgr8pVcHobx4yA
t/yL72Q/zRCj3p1F4dBWAh92QDHhBVpyiKvQizcLFUG9+INTv+HM60n/XJEEJIvKViNesB
uiYI2LmLgIJaCOSsLRdCMOtiff0A9pJh9a02QPhLrR1cDMGkYkWJ6HfZz6oL0js5xytpFv
6SkwCVqzOCt43V8rHJ2lXrGz7Rew1gSVDPXgEsNYIr1FzNX/bPIHy26Cxq4zaxy+7Er9q7
+Y2CksJpIWVPJJ3O4kF4uSnyiXcB9RNifc03dKVolizhQB7NoPMa0naT2LZJ9NXghyJhYr
uC3UKfCyFkhxcXQCLpUrD1styqfqaLEG9Gq34+WhqM0QdixDA5lsp5qY3q7C47B1ZxyR1t
5D5ndEjSP6HllYv7vMohsAAAADAQABAAACAA0AkfMV80yRwqai0wGqlqk+k7PGVHu+0hAi
rfzNfSDARUeMYLPvywepl0p4Mg0n/CuSm80NyHXyprQpL2Dz0WWnqzS+0ddbIn4FZjE6CS
UStrzjp3YBgvD0yb8Yw6phlYuT3hOTv19CnRIuHwUgUve9yCVVRAccJPgijmWUMOD/yy9F
0C2hg+9Z6zVFWW0CbaV78WvIEalAIseOx8fvo9Y/kOSIyWoiACJHMKpglpX98138WDuanF
wfmcNrfC78bvNF/Jk8GOjI4EcsWbhUd3ajiHnfG74M6KrQHoy35ywgFYZAHAAl13Q0RCdG
MhDclR0Osd6kXQCdSItL+ZChOGaPAzmEst370KPmqlU1ZN5kp2ZNrHF1ucTyssAOCmR4su
hAL9BZJgyhBW01Xqm1owVaMqjQ6wzAhNqnfxx7GZhUluejiE+8EFx4dXDHWUjXBK+h+hV+
+WZUSYIi9X6Sg7/kj7F0QPihr8T9ll+p4W2DOlDOM5hyIA3XAYJmy60Ui6AiL5wUzyQEGY
2RqntCe+lKEQhf9cqWZ2SVB8q8AFS4sXVVfmj4u0FOkgBaZbNcTknBLRekNDaM93mvPOcT
523u8iybgSlyZpYBdRCnyHIw86lfvWRW2YUiG7bxDSpFsjtr16QKLprMUgiCajfRveDh/9
qFvpcYYSShiaGlE6uZAAABAFNMVfHaaqKBL76JIb4qjX3Dsx8Ig2PkXgzqmz0l8T82+Mdw
V1uHNHjaRO+x9QXD2c29OOrqUC0Pmcb3wD7u/EzX3yIyqhnyB94sl7mWA8fjHdIfEjWB3R
+j/QHBNtM8xNt2jPxguxXWo+8489i7/YjblTiNdtiWsnKXetlaJh9qya2bhbbws28kBfQ1
m1nZj/TCu9JySpwOQPtR8iX/qAkogHwmGq5+HXGJ4te+lmiBF99REeB/C0NgvWB1dGVZCK
9sJL0ztcCBoMvdPG4oBXEtd9xiU0VT8L17XuykcQvUd6i5r6AgLr/dOZA3SF6XNDW7F7mQ
/7BvIa576LFS61EAAAEBANn6TI43Eg4dRzeI8TuHTfB5QQYn5emal+LPbptP0KrByyfOtZ
mL/qiSpzNLRanp/DGm7GhyQUF2h61XObZ9BCs4RNE4adqVcae3xbxZ7yRneBl4A76tu4NA
BXKllz8141R3HtYTJ/VGAIdwGmkHdFYOCxBhxnLELFHC+4Qp/3Iq2Uu3c4fAWknxDPT+p/
Y7jRia2TLlUvQrF38VXIUIgh7OnPMXzhErXDvRG8+8DxqI4zg1rKEP86z+rx0/hYsSst+M
LOByEmHDAc0m4h6bAwJwL5SBxrWKU0/vTYOusCyfgzznSDfIi5YHgsXM9K9pyonRfjUST+
ehPsilDpPl778AAAEBANdzMLYhkJm1/bIfpsrogmSGQt5ymWvEcuWCxYnMDDALIjS9Ih2G
sOmmiCAl1u/tSIbyyOP7YFf9JGYaOumKzkRwKcFr1dVpOgrez+/dbrJE93JOrFDVAxILSW
cc7kYwSFCIlC8LshWNyxuhGnAgkyXROlAdCABUxY4sJ3kj7oSUFA8ZwSh4THx1RfvLV6VT
GtoNO8gh6eRl/M8hM2oOds2wWp45j9jRNTrTdZX6aprn0g0dPOjPF7ufMEc0tYoz1WZYzk
DTLGQysm2Sas5llq8hXOmp70BYm+dFud+dOPjU/IbmRI5DLmU89WvpvjWCSfP1YdUfEZZ/
ujdZaCWt5KUAAAAWdnZ6dmxhZEByaXR1YWwtZGVwbG9lcgECAwQF
-----END OPENSSH PRIVATE KEY-----

deployer/nexus_deployer.py Normal file

@@ -0,0 +1,879 @@
# flake8: noqa
# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name
# pylance: disable=reportMissingImports, reportMissingModuleSource, reportGeneralTypeIssues
# type: ignore
import subprocess
import time
import json
import logging
import sys
import random
import os
import re
import traceback
from datetime import datetime, timedelta, timezone
from collections import deque
import warnings
import concurrent.futures
import threading
import uuid
from ansible_parser.logs import Play as AnsibleLogParser
from grist_api import GristDocAPI
from proxmoxer import ProxmoxAPI, ResourceException
import yaml
import colorama
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter('ignore', InsecureRequestWarning)
last_deploy_times = deque(maxlen=5)
grist_lock = threading.Lock()
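# Console log formatter that colorizes messages by level (red/yellow/green) via colorama.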
class ColoredFormatter(logging.Formatter):
def __init__(self, fmt):
colorama.init(autoreset=True)
super().__init__(fmt)
def format(self, record):
if record.levelno == logging.ERROR:
record.msg = f"{colorama.Fore.RED}{record.msg}{colorama.Style.RESET_ALL}"
elif record.levelno == logging.WARNING:
record.msg = f"{colorama.Fore.YELLOW}{record.msg}{colorama.Style.RESET_ALL}"
elif record.levelno == logging.INFO:
record.msg = f"{colorama.Fore.GREEN}{record.msg}{colorama.Style.RESET_ALL}"
return super().format(record)
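# logging.Handler that strips ANSI color codes from records and forwards them to a Grist row via the supplied callback.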
class LogsToGrist(logging.Handler):
def __init__(self, callback=None):
super().__init__()
self.callback = callback
def emit(self, record):
log_entry = self.format(record)
ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
log_entry = ansi_escape.sub('', log_entry)
log_entry = log_entry.replace("Ansible task: ", "")
self.callback({"Status": log_entry})
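# Bundles a file handler, a colored console handler and an optional Grist handler under one named logger.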
class LoggerWrapper:
def __init__(self, name, log_file, grist_callback=None):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.DEBUG)
self.logger.propagate = False
# File handler
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
colored_formatter = ColoredFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(colored_formatter)
self.logger.addHandler(console_handler)
# grist handler
if grist_callback is not None:
grist_handler = LogsToGrist(callback=grist_callback)
grist_handler.setLevel(logging.INFO)
grist_handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(grist_handler)
def __del__(self):
handlers = self.logger.handlers[:]
for handler in handlers:
handler.close()
self.logger.removeHandler(handler)
def get_logger(self):
return self.logger
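# Renders a duration in seconds as a short human-readable string, e.g. "3 min 12 sec" or "2 d 5 h".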
def format_time(current_time):
if current_time < 5:
return f"{current_time:.1f} sec"
elif current_time < 60:
return f"{int(current_time)} sec"
elif current_time < 3600:
minutes = current_time // 60
seconds = current_time % 60
return f"{int(minutes)} min {int(seconds)} sec"
elif current_time < 86400:
hours = current_time // 3600
minutes = (current_time % 3600) // 60
seconds = current_time % 60
return f"{int(hours)} h {int(minutes)} min"
else:
days = current_time // 86400
hours = (current_time % 86400) // 3600
minutes = (current_time % 3600) // 60
seconds = current_time % 60
return f"{int(days)} d {int(hours)} h"
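# Sleeps for a fixed or uniformly random delay, logging the expected wake-up time first.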
def random_sleep(logger, min_delay=None, max_delay=None, delay=None):
if delay is None:
delay = random.uniform(min_delay, max_delay)
formatted_delay = format_time(delay)
end_time = datetime.now() + timedelta(seconds=delay)
end_time_str = end_time.strftime('%Y-%m-%d %H:%M:%S')
logger.debug(f"Waiting for {formatted_delay} (expected end time: {end_time_str})")
time.sleep(delay)
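# Minimal Telegram Bot API wrapper: posts topic-prefixed messages to one chat via sendMessage.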
class TelegramBot:
def __init__(self, token, topic, chat_id, logger):
self.token = token
self.topic = topic
self.logger = logger
self.chat_id = chat_id
self.base_url = f"https://api.telegram.org/bot{self.token}/"
def message(self, message):
url = self.base_url + "sendMessage"
payload = {
'chat_id': self.chat_id,
'text': f"[{self.topic}] {message}"
}
response = requests.post(url, json=payload, timeout=10)
response_data = response.json()
if not response_data.get('ok', True):
self.logger.error(f"Error sending message: {response_data}")
return response_data
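# Thin wrapper around GristDocAPI: spaces in table/column names become underscores,
# and naive datetimes are converted to Unix timestamps assuming UTC+3.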
class GRIST:
def __init__(self, server, doc_id, api_key, logger):
self.server = server
self.doc_id = doc_id
self.api_key = api_key
self.logger = logger
self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)
def table_name_convert(self, table_name):
return table_name.replace(" ", "_")
def to_timestamp(self, dtime: datetime) -> int:
if dtime.tzinfo is None:
dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
return int(dtime.timestamp())
def insert_row(self, data, table):
data = {key.replace(" ", "_"): value for key, value in data.items()}
row_id = self.grist.add_records(self.table_name_convert(table), [data])
return row_id
def update_column(self, row_id, column_name, value, table):
if isinstance(value, datetime):
value = self.to_timestamp(value)
column_name = column_name.replace(" ", "_")
self.grist.update_records(self.table_name_convert(table), [{ "id": row_id, column_name: value }])
def delete_row(self, row_id, table):
self.grist.delete_records(self.table_name_convert(table), [row_id])
def update(self, row_id, updates, table):
for column_name, value in updates.items():
if isinstance(value, datetime):
updates[column_name] = self.to_timestamp(value)
updates = {column_name.replace(" ", "_"): value for column_name, value in updates.items()}
self.grist.update_records(self.table_name_convert(table), [{"id": row_id, **updates}])
def fetch_table(self, table):
return self.grist.fetch_table(self.table_name_convert(table))
def find_record(self, id=None, state=None, name=None, table=None):
if table is None:
raise ValueError("Table is not specified")
table_data = self.grist.fetch_table(self.table_name_convert(table))
if id is not None:
record = [row for row in table_data if row.id == id]
return record
if state is not None and name is not None:
record = [row for row in table_data if row.State == state and row.Name == name]
return record
if state is not None:
record = [row for row in table_data if row.State == state]
return record
if name is not None:
record = [row for row in table_data if row.Name == name]
return record
#def find_data_by_key(self, key, table):
# data = getattr(self.fetch_table(self.table_name_convert(table))[0], key)
# return data
def find_settings(self, key, table):
table = self.fetch_table(self.table_name_convert(table))
for record in table:
if record.Setting == key:
if record.Value is None or record.Value == "":
raise ValueError(f"Setting {key} is blank")
return record.Value
raise ValueError(f"Setting {key} not found")
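# Proxmox VE helper built on proxmoxer: VM lookup, lifecycle commands, cloning, tagging and guest-agent IP discovery.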
class Prox:
def __init__(self, host, user, password, logger, timeout=10):
self.session = requests.Session()
self.logger = logger
self.session.request = lambda *args, **kwargs: requests.request(*args, timeout=timeout, **kwargs)
self.proxmox = ProxmoxAPI(host=host, user=user, password=password, verify_ssl=False, service='PVE', timeout=timeout)
def get_vms_data(self, vm_name=None, node_name=None, vm_id=None):
if node_name is not None: nodes = [{"node": node_name}]
else: nodes = self.proxmox.nodes.get()
vms_with_name = []
for node in nodes:
if vm_id is not None:
try: vms = [self.proxmox.nodes(node['node']).qemu(vm_id).status.current.get()]
except Exception: continue
else: vms = self.proxmox.nodes(node['node']).qemu.get()
for vm in vms:
if vm_name is None or vm.get('name') == vm_name or str(vm.get('vmid')) == str(vm_id):
cfg = self.proxmox.nodes(node['node']).qemu(vm.get('vmid')).config.get()
vm['ipconfig'] = cfg.get('ipconfig0', None)
vm['cpu'] = cfg.get('cpu', None)
vm['node'] = node['node']
vm['memory'] = cfg.get('memory', None)
vm['net'] = cfg.get('net0', None)
vm['template'] = int(vm.get('template', 0))
vm['disk'] = cfg.get(cfg.get('bootdisk', ''), None)
if vm['disk']:
vm['disk_storage'] = vm['disk'].split(':')[0]
else:
vm['disk_storage'] = None
vms_with_name.append(vm)
return vms_with_name
def search_vm(self, vm_name=None, node=None, vm_id=None):
vms = self.get_vms_data(vm_name=vm_name, node_name=node, vm_id=vm_id)
if len(vms) > 0:
return vms[0]
return None
def cmd_vm_by_name(self, vm_name, cmd):
nds = self.proxmox.nodes
nodes = nds.get()
for node in nodes:
for vm in nds(node['node']).qemu.get():
if vm['name'] == vm_name:
if cmd == "reboot":
upid = nds(node['node']).qemu(vm['vmid']).status.reboot.post()
elif cmd == "shutdown":
upid = nds(node['node']).qemu(vm['vmid']).status.shutdown.post()
elif cmd == "start":
upid = nds(node['node']).qemu(vm['vmid']).status.start.post()
elif cmd == "stop":
upid = nds(node['node']).qemu(vm['vmid']).status.stop.post()
elif cmd == "reset":
upid = nds(node['node']).qemu(vm['vmid']).status.reset.post()
elif cmd == "destroy":
upid = nds(node['node']).qemu(vm['vmid']).status.stop.post()
while nds(node['node']).tasks(upid).status.get()['status'] != 'stopped': time.sleep(1)
upid = nds(node['node']).qemu(vm['vmid']).delete()
while nds(node['node']).tasks(upid).status.get()['status'] != 'stopped': time.sleep(1)
def id_by_name(self, vm_name):
vms = self.get_vms_data(vm_name=vm_name)
if len(vms) > 0:
return vms[0]['vmid']
return None
def name_by_id(self, vm_id):
vms = self.get_vms_data(vm_id=vm_id)
if len(vms) > 0:
return vms[0]['name']
return None
def wait_for_ip(self, node, vmid, max_iterations=10, period=10):
self.logger.info(f"Wait boot VM {vmid} on node {node}")
vm_ip_v4 = ""
iterations = 0
while not vm_ip_v4 and iterations < max_iterations:
time.sleep(period)
self.logger.info(f"Get IP attempt {iterations} of {max_iterations}")
try:
vm_status = self.proxmox.nodes(node).qemu(vmid).agent('network-get-interfaces').get()
if 'result' in vm_status:
for interface in vm_status['result']:
if interface['name'].startswith(('lo', 'br-', 'veth', 'docker')): continue
for ip in interface.get('ip-addresses', []):
if ip['ip-address-type'] == 'ipv4' and not vm_ip_v4:
vm_ip_v4 = ip['ip-address']
if vm_ip_v4:
self.logger.info(f"VM {vmid} on node {node} received IP: {vm_ip_v4}")
break
except ResourceException as e:
if "QEMU guest agent is not running" in str(e):
pass
#print(f".", end="", flush=True)
except Exception as e:
self.logger.info(f"Error retrieving IP for VM {vmid} on node {node}: {e}")
iterations += 1
if not vm_ip_v4:
#print("")
return False
return vm_ip_v4
def check_vm(self, vm_id):
try:
vm = self.search_vm(vm_id=vm_id)
self.logger.info(f"VM {vm_id} status: {vm}")
return True
except ResourceException as e:
self.logger.info(f"Failed to check VM {vm_id} status: {e}")
return False
def clone_vm(self, template_id, new_vm_name):
try:
new_vm_id = self.proxmox.cluster.nextid.get()
template = self.search_vm(vm_id=template_id)
if template is None:
raise ValueError(f"Template VM with ID {template_id} not found")
node = template['node']
self.logger.info(f"Clone VM {template_id}->{new_vm_id}: {new_vm_name}...")
upid = self.proxmox.nodes(node).qemu(template_id).clone.post(newid=new_vm_id, name=new_vm_name)
while self.proxmox.nodes(node).tasks(upid).status.get()['status'] != 'stopped':
print(f"{new_vm_name} clone state: {self.proxmox.nodes(node).tasks(upid).status.get()['status']}")
time.sleep(10)
except ResourceException as e:
self.logger.error(f"Failed to clone VM {template_id}: {e}")
raise ValueError(f"Failed to clone VM {template_id}/{new_vm_name}") from e
return new_vm_id, node
def get_proxmox_nodes(self):
nodes = []
raw_nodes = self.proxmox.nodes.get()
for node in raw_nodes:
nodes.append({
"cpu": node.get('cpu'),
"name": node.get('node'),
"status": node.get('status'),
"uptime": node.get('uptime')
})
return nodes
def create_vm(self, vm_name, template_id=None):
existing_vm_id = self.id_by_name(vm_name)
if existing_vm_id is not None:
old_vm_data = self.search_vm(vm_id=existing_vm_id)
self.logger.info(f"Virtual machine {vm_name} already exists")
#self.logger.info(f"Find probabilistic template...")
#all_vms = self.get_vms_data(node_name=old_vm_data['node'])
#templates_vms = [vm for vm in all_vms if vm['template'] == 1]
#probabilistic_template = [vm for vm in templates_vms if vm['disk_storage'] == old_vm_data['disk_storage']][0]
#self.logger.info(f"Find {old_vm_data['name']}({old_vm_data['vmid']}) probabilistic template: {probabilistic_template['name']}({probabilistic_template['vmid']})/{probabilistic_template['node']}")
#template_id = probabilistic_template['vmid']
self.logger.info(f"Deleting VM ID {existing_vm_id}/{old_vm_data['node']}")
self.cmd_vm_by_name(vm_name, "destroy")
else:
self.logger.info(f"Virtual machine with name {vm_name} not found, creating new VM...")
if template_id is None:
self.logger.error(f"Template ID not provided")
raise ValueError("Template ID not provided")
new_vm_id, node = self.clone_vm(template_id, vm_name)
self.proxmox.nodes(node).qemu(new_vm_id).config.post(tags="")
self.logger.info(f"Starting VM {new_vm_id} on node {node}")
self.cmd_vm_by_name(vm_name, "start")
vm_ip = self.wait_for_ip(node, new_vm_id)
if not vm_ip:
self.logger.info(f"Failed to get IP address for VM {new_vm_id}.")
raise ValueError(f"Failed to get IP address for VM {new_vm_id}.")
self.logger.info(f"Virtual machine {vm_name} with ID {new_vm_id} has IP address: {vm_ip}")
return vm_ip, node, new_vm_id
def check_duplicate_vms(self, vms):
vm_node_mapping = {}
for vm_info in vms:
vm = vm_info['name']
node = vm_info['node']
if vm in vm_node_mapping:
vm_node_mapping[vm].append(node)
else:
vm_node_mapping[vm] = [node]
duplicates = {name: nodes for name, nodes in vm_node_mapping.items() if len(nodes) > 1}
return duplicates
def modify_tag(self, vm_id, tag, action="add"):
tag = tag.lower()
vm = self.search_vm(vm_id=vm_id)
if vm is None:
self.logger.error(f"VM {vm_id} not found")
return False
current_config = self.proxmox.nodes(vm['node']).qemu(vm_id).config.get()
tags_list = current_config.get('tags', '').split(';')
if action == "add":
if tag not in tags_list:
tags_list.append(tag)
updated_tags = ';'.join(filter(None, tags_list))
self.proxmox.nodes(vm['node']).qemu(vm_id).config.post(tags=updated_tags)
self.logger.info(f"Added tag '{tag}' to VM {vm_id}")
else:
self.logger.info(f"Tag '{tag}' already exists in VM {vm_id}")
elif action == "remove":
if tag in tags_list:
tags_list.remove(tag)
updated_tags = ';'.join(filter(None, tags_list))
self.proxmox.nodes(vm['node']).qemu(vm_id).config.post(tags=updated_tags)
self.logger.info(f"Removed tag '{tag}' from VM {vm_id}")
else:
self.logger.info(f"Tag '{tag}' not found in VM {vm_id}")
else:
self.logger.error(f"Invalid action: {action}")
return False
return True
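# Scans the last TASK block of a raw ansible log for known transient failures
# (Docker Hub 429, gateway 504) and returns (task_name, error_code).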
def parse_ansible_log(ansible_log):
task_blocks = re.split(r"(?=\nTASK \[)", ansible_log)
last_task_block = task_blocks[-1]
if "failed:" in last_task_block or "fatal:" in last_task_block:
try:
task_name = re.search(r"TASK \[(.*?)\]", last_task_block).group(1)
failed_json_str = re.search(r'=> ({.*?})', last_task_block, re.DOTALL).group(1)
failed_json = json.loads(failed_json_str)
stdout_lines = failed_json.get("stdout_lines", [])
stderr_lines = failed_json.get("stderr_lines", [])
msg = failed_json.get("msg", "")
print(f"Stderr lines: {stderr_lines}", flush=True)
for line in stdout_lines[-10:] + stderr_lines[-10:] + [msg]:
if "429 Too Many Requests" in line and "https://www.docker.com/increase-rate-limit" in line:
print("429 Too Many Requests detected in stdout_lines.", flush=True)
return task_name, "429TOOMANYREQUESTS"
if "HTTP Error 504: Gateway Time-out" in line:
print("HTTP Error 504: Gateway Time-out detected in stdout_lines.", flush=True)
return task_name, "504GATEWAYTIMEOUT"
for line in stdout_lines[-5:]:
print(f"Stdout: {line}", flush=True)
return task_name, None
for line in stderr_lines[-5:]:
print(f"Stderr: {line}", flush=True)
return task_name, None
print("No error message found in stdout or stderr.", flush=True)
return task_name, None
except (json.JSONDecodeError, AttributeError):
print("Error extracting or decoding JSON.", flush=True)
return None, None
else:
print("The last TASK did not end with ASYNC FAILED and fatal.", flush=True)
return None, None
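# Downloads the playbook for the requested git version, writes a per-run inventory and streams
# ansible-playbook output to a log file; on a non-zero exit, raises with the name of the failed task.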
def deploy_node(name, ansible_host, node_id, proxy, proxy_jump, private_key_file,
docker_username, docker_password, git_version, git_base_url, logger_instance, grist_server, grist_doc_id, grist_api_key):
if not os.path.exists(private_key_file):
raise Exception(f"SSH private key file not found: {private_key_file}")
ansible_dir = "ansible"
if not os.path.exists(ansible_dir):
os.makedirs(ansible_dir)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
inventory_filename = f"{ansible_dir}/inventory_{name}_{timestamp}.yml"
log_output_filename = f"{ansible_dir}/log_{name}_{timestamp}.log"
playbook_filename = f"{ansible_dir}/playbook_{name}_{timestamp}.yml"
playbook_url = f"{git_base_url}/raw/branch/{git_version}/playbook.yml"
response = requests.get(playbook_url, timeout=10)
if response.status_code == 200:
with open(playbook_filename, 'w', encoding='utf-8') as f:
f.write(response.text)
else:
logger_instance.error(f"Failed to download playbook at {playbook_url}. Status code: {response.status_code}")
raise Exception(f"Failed to download playbook. Status code: {response.status_code}")
inventory = {'all': {'vars': {}, 'hosts': {}}}
inventory['all']['vars']['ansible_user'] = 'root'
inventory['all']['vars']['ansible_ssh_private_key_file'] = private_key_file
if proxy_jump: inventory['all']['vars']['ansible_ssh_common_args'] = f'-J {proxy_jump}'
inventory['all']['hosts'][name] = {}
inventory['all']['hosts'][name]['ansible_host'] = ansible_host
inventory['all']['hosts'][name]['serverid'] = name
inventory['all']['hosts'][name]['id'] = node_id
inventory['all']['hosts'][name]['proxy'] = proxy
inventory['all']['hosts'][name]['grist_server'] = grist_server
inventory['all']['hosts'][name]['grist_doc_id'] = grist_doc_id
inventory['all']['hosts'][name]['grist_api_key'] = grist_api_key
inventory['all']['hosts'][name]['git_version'] = git_version
inventory['all']['hosts'][name]['docker_username'] = docker_username
inventory['all']['hosts'][name]['docker_password'] = docker_password
with open(inventory_filename, 'w', encoding='utf-8') as file:
yaml.dump(inventory, file, default_flow_style=False)
logger_instance.info("Running nexus deploy playbook...")
log_data = ""
with open(log_output_filename, 'w', encoding='utf-8') as deploy_log:
process = subprocess.Popen( ['ansible-playbook', '-i', inventory_filename, playbook_filename, '-v'],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1 )
for line in iter(process.stdout.readline, ''):
if line.startswith("TASK ["): logger_instance.info(f"Ansible task: {line.replace('*', '').replace('TASK ', '').strip()}")
deploy_log.write(line)
log_data += line
process.stdout.close()
return_code = process.wait()
deploy_log.flush()
if return_code != 0:
ansible_failed_task = ""
ansible = AnsibleLogParser(play_output=log_data)
plays = ansible.plays()
for _, tasks in plays.items():
for task_name, task_object in tasks.items():
for result in task_object.results:
if result.get('status') == 'failed' or result.get('status') == 'fatal':
#print(f"Failed task: {result}")
ansible_failed_task = task_name
logger_instance.error(f"Deploy failed. Failed stage: {ansible_failed_task} Check log file: {log_output_filename}, inventory file: {inventory_filename}")
raise Exception(f"Ansible error on {ansible_failed_task}")
return log_data
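# Normalizes the Nodes table: blank State -> Dirty, blank Version -> nv1,
# stale WiP rows (older than 2 h) -> Dirty, blank Retries -> 0/4.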
def nodes_table_preprocessing(grist_instance, logger):
current_time = grist_instance.to_timestamp(datetime.now())
max_wip = 60*60*2
logger.info("Run grist processing NoneState -> Dirty and NoneVersion -> nv1")
for row in grist_instance.fetch_table("Nodes"):
if row.State == "": grist_instance.update_column(row.id, "State", "Dirty", "Nodes")
if row.Version == "": grist_instance.update_column(row.id, "Version", "nv1", "Nodes" )
logger.info(f"Run grist processing WiP and >max_wip old -> Dirty")
for row in grist_instance.fetch_table("Nodes"):
if row.Deploy_date is not None:
vm_old_age = current_time - row.Deploy_date
if row.State == "WiP" and vm_old_age > max_wip:
grist_instance.update_column(row.id, "State", "Dirty", "Nodes")
grist_instance.update_column(row.id, "Status", "Set Dirty by WiP Timeout", "Nodes")
logger.info(f"Run grist processing NoneRetries -> 0/4")
for row in grist_instance.fetch_table("Nodes"):
if row.Retries is None or row.Retries == "":
grist_instance.update_column(row.id, "Retries", "0/4", "Nodes")
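# Builds a sticky-session proxy login with a random country code from a fixed list
# and verifies it against ipinfo.io; returns the socks5 URL or None on failure.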
def generate_proxy(login, password, proxy_ip, proxy_port, logger):
countries = [ "AT", "AM", "BE", "HU", "DE", "IL", "IE", "ES", "IT",
"LU", "NL", "NO", "PL", "PT", "RS", "FI", "FR", "SE", "CH", "CZ", "EE" ]
country = random.choice(countries)
id_hex = ''.join(random.choices('0123456789abcdef', k=10))
login_id = f"{login}_s_{id_hex}_c_{country}"
proxy_socks5 = f"socks5://{login_id}:{password}@{proxy_ip}:{proxy_port}"
proxy_http = f"http://{login_id}:{password}@{proxy_ip}:{proxy_port}"
ipinfo_url = "http://ipinfo.io"
headers = {"User-Agent": "curl/7.68.0"}
try:
response = requests.get(ipinfo_url, headers=headers, proxies={"http": proxy_http, "https": proxy_http}, timeout=10)
except requests.exceptions.RequestException as e:
logger.error(f"[proxy test] Error while requesting ipinfo.io: {e}")
return None
if response.status_code == 200:
logger.info(f"[proxy test] Successfully retrieved IP information: {response.json().get('ip')}")
return proxy_socks5
else:
logger.error(f"[proxy test] Error while requesting ipinfo.io: {response.status_code}")
return None
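# One worker iteration: lock a Dirty node, bump its retry counter, ensure it has a proxy,
# clone and start a VM from the latest template, run the ansible deploy, and update Grist state throughout.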
def repeat_deploy(grist_instance, logger, grist_env):
result = None
generated_address = False
wait_period = int(grist_instance.find_settings("Wait period in minutes", "Settings"))*60
name_node = grist_instance.find_settings("Node name", "Settings")
sticker_node = grist_instance.find_settings("Node sticker", "Settings")
multiplier = 1.5
diff = round(wait_period/multiplier)
min_wait = round(wait_period - diff)
max_wait = round(wait_period + diff)
with grist_lock:
nodes_table_preprocessing(grist_instance, logger)
dirty_vms = grist_instance.find_record(state="Dirty", table="Nodes")
if len(dirty_vms) == 0:
logger.info("No dirty nodes found")
result = False
return result
dirty_vms.sort(key=lambda vm: (int(vm.Retries.split('/')[0])))
#dirty_vms = sorted(dirty_vms, key=lambda row: 0 if row.Impact == "" else 1)
dirty_vm = dirty_vms[0]
def grist_callback(msg): grist_instance.update(dirty_vm.id, msg, "Nodes")
grist_instance.update(dirty_vm.id, { "State": "WiP", "Status": "Locked", "Deploy date": datetime.now() }, "Nodes")
if dirty_vm.Retries.count('/') == 1:
try:
attempt = int(dirty_vm.Retries.split('/')[0])
max_retries = int(dirty_vm.Retries.split('/')[1])
if attempt >= max_retries:
grist_instance.update(dirty_vm.id, { "State": "Error", "Status": f"{dirty_vm.Status} (Max retries)" }, "Nodes")
result = False
return result
attempt += 1
grist_instance.update(dirty_vm.id, { "Retries": f"{attempt}/{max_retries}" }, "Nodes")
except ValueError:
grist_instance.update(dirty_vm.id, { "State": "Error", "Status": "Error retries" }, "Nodes")
result = False
return result
if dirty_vm.Proxy is None or dirty_vm.Proxy == "":
grist_callback( { "Status": "Generate proxy" })
login = grist_instance.find_settings("Proxy generator login", "Settings")
password = grist_instance.find_settings("Proxy generator password", "Settings")
proxy_ip = grist_instance.find_settings("Proxy generator ip", "Settings")
proxy_port = grist_instance.find_settings("Proxy generator port", "Settings")
for _ in range(5):
proxy_socks5 = generate_proxy(login, password, proxy_ip, proxy_port, logger)
if proxy_socks5 is not None:
break
logger.error("[proxy] Failed to generate proxy, retrying...")
time.sleep(1)
grist_instance.update(dirty_vm.id, { "Proxy": proxy_socks5 }, "Nodes")
dirty_vm = grist_instance.find_record(id=dirty_vm.id, table="Nodes")[0]
vm_name = f"{name_node}-{dirty_vm.NodeID[8:16]}"
logger_instance = LoggerWrapper(name=f"{vm_name}", log_file=f"logs/{name_node}_creation_{vm_name}.log", grist_callback=grist_callback)
logger = logger_instance.get_logger()
grist_callback( { "Name": vm_name, "State": "WiP", "Status": "Run proxmox worker.." })
required_fields = ["NodeID", "Proxy", "Server", "Storage", "Retries"]
for field in required_fields:
if not getattr(dirty_vm, field.replace(" ", "_")):
grist_callback( { "State": "Error", "Status": f"No {field.lower()}" })
result = False
return result
prox_host = grist_instance.find_settings("Proxmox host", "Settings")
prox_user = grist_instance.find_settings("Proxmox user", "Settings")
prox_password = grist_instance.find_settings("Proxmox password", "Settings")
prox = Prox(prox_host, prox_user, prox_password, logger)
tg_token = grist_instance.find_settings("Telegram token", "Settings")
tg_chat_id = grist_instance.find_settings("Telegram chat id", "Settings")
tg_log = TelegramBot(tg_token, f"{sticker_node} {name_node}", tg_chat_id, logger)
current_version = grist_instance.find_settings("Current version", "Settings")
logger.info(f"Create-run-deploy node {vm_name} on attempt {attempt}")
deploy_start_time = datetime.now()
try:
grist_callback( { "IP": "", "Deploy date": datetime.now()})
all_vms = prox.get_vms_data(node_name=dirty_vm.Server)
vm_prox_node = dirty_vm.Server
templates_vms = [vm for vm in all_vms if vm['template'] == 1]
target_storage_templates = [vm for vm in templates_vms if vm['disk_storage'] == dirty_vm.Storage]
latest_templates = [vm for vm in target_storage_templates if 'tags' in vm and 'latest' in vm['tags'].split(';')]
if len(latest_templates) == 0:
raise ResourceException(status_code=404, status_message="No latest template", content="No latest template")
template_id = latest_templates[0]['vmid']
try:
vm_ip, vm_prox_node, vm_id = prox.create_vm(vm_name, template_id=template_id)
except Exception as e:
raise ResourceException(status_code=500, status_message="Failed to create VM", content=str(e)) from e
prox.modify_tag(vm_id=vm_id, tag="WiP", action="add")
grist_callback( { "IP": vm_ip })
if generated_address:
delay = random.uniform(min_wait/2, max_wait/2)
grist_callback( { "Status": f"Wait {format_time(delay)} before ansible.." })
random_sleep(logger, delay=delay)
grist_callback( { "Status": "Run ansible worker.." })
deploy_node(name=vm_name, ansible_host=vm_ip, node_id=dirty_vm.NodeID, proxy=dirty_vm.Proxy,
git_version=grist_instance.find_settings("Git version", "Settings"),
git_base_url = grist_instance.find_settings("Git base URL", "Settings"),
proxy_jump=grist_instance.find_settings("Ansible SSH Jump host", "Settings"),
private_key_file=grist_instance.find_settings("Ansible SSH keyfile", "Settings"),
docker_username=grist_instance.find_settings("Docker username", "Settings"),
docker_password=grist_instance.find_settings("Docker password", "Settings"),
grist_server=grist_env["GRIST_SERVER"],
grist_doc_id=grist_env["GRIST_DOC_ID"],
grist_api_key=grist_env["GRIST_API_KEY"],
logger_instance=logger)
prox.modify_tag(vm_id=vm_id, tag="WiP", action="remove")
prox.modify_tag(vm_id=vm_id, tag=current_version, action="add")
clear_vms = grist_instance.find_record(state="Clean", table="Nodes")
dirty_vms = grist_instance.find_record(state="Dirty", table="Nodes")
wip_vms = grist_instance.find_record(state="WiP", table="Nodes")
clear_keys = len(clear_vms)
dirty_keys = len(dirty_vms) + len(wip_vms)
deploy_elapsed_time = (datetime.now() - deploy_start_time).total_seconds()
if vm_prox_node is None: vm_prox_node = "None"
tg_log.message(f"✅ Deployed vm {vm_name} in '{vm_prox_node}' (took {format_time(deploy_elapsed_time)}, vm {clear_keys} out of {dirty_keys+clear_keys} in batch, left {dirty_keys} nodes)")
logger.info(f"Deployed node {vm_name}/{vm_ip} in {vm_prox_node} ({format_time(deploy_elapsed_time)} elapsed install)")
delay = random.uniform(min_wait/2, max_wait/2)
format_delay = format_time(delay)
grist_callback( { "Status": f"Deployed, wait {format_delay}.." })
grist_callback( {"State": "Clean", "Version": current_version, "Deploy date": datetime.now()} )
grist_callback( {"Deploy time": f"{format_time(deploy_elapsed_time)}"} )
random_sleep(logger, delay=delay)
grist_callback( { "Status": f"Deployed ok on attempt {attempt}/{max_retries}", "Retries": f"{0}/{4}" })
result = True
return result
except ResourceException as e:
attempt += 0.5
logger.error(f"Fail: {e}\n{traceback.format_exc()}")
logger.error(f"Failed to deploy node for {vm_name} on attempt {attempt}/{max_retries}: Proxmox Resource Exception: {e}")
error_message = str(e).replace('\n', '')
grist_callback( {"State": "Dirty", "Version": "", "Deploy date": "", "Status": f"Failed: {error_message}"})
tg_log.message(f"❌ Failed to deploy attempt {attempt}/{max_retries} node {vm_name} on '{vm_prox_node}': {e}")
time.sleep(30)
result = False
return result
except Exception as e:
attempt += 1
logger.error(f"Fail: {e}\n{traceback.format_exc()}")
logger.error(f"Failed to deploy node for {vm_name} on attempt {attempt}/{max_retries}: {e}")
tg_log.message(f"❌ Failed to deploy attempt {attempt}/{max_retries} node {vm_name} on '{vm_prox_node}': {e}")
error_message = str(e).replace('\n', '')
grist_callback( {"State": "Dirty", "Version": "", "Deploy date": "", "Status": f"Failed: {error_message}"})
time.sleep(30)
result = False
return result
finally:
if result is None: grist_callback( {"State": "Dirty", "Status": "Ended by interrupt"} )
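# Entry point: set up logging and Grist, then keep a pool of deploy workers running,
# reaping finished tasks and cancelling ones stuck past 3 hours.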
def main():
log_dir = "logs"
log_file = "nexus_deployer.log"
log_path = os.path.join(log_dir, log_file)
if not os.path.exists(log_dir): os.makedirs(log_dir)
if not os.path.exists(log_path): open(log_path, 'a', encoding='utf-8').close()
logger_instance = LoggerWrapper(name="nexus", log_file=log_path)
logger = logger_instance.get_logger()
grist_env = {}
grist_env["GRIST_SERVER"] = os.getenv("GRIST_SERVER")
grist_env["GRIST_DOC_ID"] = os.getenv("GRIST_DOC_ID")
grist_env["GRIST_API_KEY"] = os.getenv("GRIST_API_KEY")
grist_instance = GRIST(grist_env["GRIST_SERVER"], grist_env["GRIST_DOC_ID"], grist_env["GRIST_API_KEY"], logger)
logger.info(f"Starting nexus deployer...")
deployer_key = grist_instance.find_settings("Ansible SSH keyfile", "Settings")
if not os.path.exists(deployer_key):
logger.error(f"SSH private key file not found: {deployer_key}")
raise FileNotFoundError(f"Required SSH private key file {deployer_key} is missing")
logger.info(f"Run on-start grist processing WiP -> Dirty")
#repeats = grist_instance.find_settings("Repeats", "Settings")
#for row in grist_instance.fetch_table("Nodes"):
# if row.State == "WiP":
# grist_instance.update(row.id, {"State": "Dirty", "Settings"})
# grist_instance.update(row.id, {"Status": "Set Dirty by restart deployer", "Settings"})
#logger.info(f"Run on-start grist processing Dirty -> 0/{repeats}")
#for row in grist_instance.fetch_table("Nodes"):
# if row.State == "Dirty" and row.Retries is not None and row.Retries != f"0/{repeats}":
# grist_instance.update(row.id, {"Retries": f"0/{repeats}"}, "Nodes")
#while True:
#repeat_deploy(grist_instance, logger, grist_env)
#logger.info("Restart deployer")
#os.execv(sys.executable, [sys.executable] + sys.argv)
#time.sleep(10)
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
futures = []
start_times = {}
#while True:
max_threads = int(grist_instance.find_settings("Workers", "Settings"))
if max_threads > 20: max_threads = 20
while len(futures) < max_threads:
future = executor.submit(repeat_deploy, grist_instance, logger, grist_env)
futures.append(future)
start_times[future] = time.time()
logger.info(f"Spawned new task. Total active tasks: {len(futures)}, max threads: {max_threads}")
time.sleep(120)
max_threads = int(grist_instance.find_settings("Workers", "Settings"))
current_time = time.time()
for future in futures.copy():
if future.done():
try:
result = future.result()
logger.info(f"Task completed: {result}")
except Exception as e:
logger.error(f"Task failed with error: {e}")
logger.error(f"Fail: {e}\n{traceback.format_exc()}")
futures.remove(future)
start_times.pop(future, None)
elif current_time - start_times[future] > 60*60*3:
future.cancel()
logger.info("Cancelled a task due to timeout.")
futures.remove(future)
start_times.pop(future, None)
#time.sleep(10)
except KeyboardInterrupt:
logger.info("Program interrupted, shutting down executor...")
finally:
executor.shutdown(wait=True)
logger.info("Resources released, executor shutdown completed.")
if __name__ == "__main__":
if main() is False:
sys.exit(1)
sys.exit(0)

deployer/requirements.txt Normal file

@@ -0,0 +1,11 @@
ansible-output-parser==0.1.0
certifi==2024.7.4
charset-normalizer==3.3.2
colorama==0.4.6
future==1.0.0
grist-api==0.1.0
idna==3.8
proxmoxer==2.1.0
PyYAML==6.0.2
requests==2.32.3
urllib3==2.2.2


@@ -326,15 +326,12 @@
WantedBy=multi-user.target
mode: '0644'
- name: Reload systemd
ansible.builtin.systemd:
daemon_reload: yes
- name: Enable and start node-checker service
ansible.builtin.systemd:
name: node-checker
enabled: yes
state: started
daemon_reload: yes
- name: Remove docker login credentials
ansible.builtin.file:


@@ -1 +0,0 @@
###ID###

ws.code-workspace Normal file

@@ -0,0 +1,24 @@
{
"folders": [
{
"path": "."
}
],
"settings": {
"workbench.colorCustomizations": {
"activityBar.activeBackground": "#44a003",
"activityBar.background": "#44a003",
"activityBar.foreground": "#e7e7e7",
"activityBar.inactiveForeground": "#e7e7e799",
"activityBarBadge.background": "#0450bc",
"activityBarBadge.foreground": "#e7e7e7",
"commandCenter.border": "#e7e7e799",
"sash.hoverBorder": "#44a003",
"titleBar.activeBackground": "#2f6e02",
"titleBar.activeForeground": "#e7e7e7",
"titleBar.inactiveBackground": "#2f6e0299",
"titleBar.inactiveForeground": "#e7e7e799"
},
"peacock.color": "#2f6e02"
}
}