# flake8: noqa
# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name
# pylance: disable=reportMissingImports, reportMissingModuleSource, reportGeneralTypeIssues
# type: ignore
import subprocess
import time
import json
import logging
import sys
import random
import os
import re
import traceback
from datetime import datetime, timedelta, timezone
from collections import deque
import warnings
import concurrent.futures
import threading
import uuid

from ansible_parser.logs import Play as AnsibleLogParser
from grist_api import GristDocAPI
from proxmoxer import ProxmoxAPI, ResourceException
import yaml
import colorama
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

last_deploy_times = deque(maxlen=5)
grist_lock = threading.Lock()


class ColoredFormatter(logging.Formatter):
    def __init__(self, fmt):
        colorama.init(autoreset=True)
        super().__init__(fmt)

    def format(self, record):
        if record.levelno == logging.ERROR:
            record.msg = f"{colorama.Fore.RED}{record.msg}{colorama.Style.RESET_ALL}"
        elif record.levelno == logging.WARNING:
            record.msg = f"{colorama.Fore.YELLOW}{record.msg}{colorama.Style.RESET_ALL}"
        elif record.levelno == logging.INFO:
            record.msg = f"{colorama.Fore.GREEN}{record.msg}{colorama.Style.RESET_ALL}"
        return super().format(record)


class LogsToGrist(logging.Handler):
    def __init__(self, callback=None):
        super().__init__()
        self.callback = callback

    def emit(self, record):
        log_entry = self.format(record)
        ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
        log_entry = ansi_escape.sub('', log_entry)
        log_entry = log_entry.replace("Ansible task: ", "")
        self.callback({"Status": log_entry})


class LoggerWrapper:
    def __init__(self, name, log_file, grist_callback=None):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        # File handler
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.DEBUG)
        colored_formatter = ColoredFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(colored_formatter)
        self.logger.addHandler(console_handler)
        # grist handler
        if grist_callback is not None:
            grist_handler = LogsToGrist(callback=grist_callback)
            grist_handler.setLevel(logging.INFO)
            grist_handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(grist_handler)

    def __del__(self):
        handlers = self.logger.handlers[:]
        for handler in handlers:
            handler.close()
            self.logger.removeHandler(handler)

    def get_logger(self):
        return self.logger


def format_time(current_time):
    if current_time < 5:
        return f"{current_time:.1f} sec"
    elif current_time < 60:
        return f"{int(current_time)} sec"
    elif current_time < 3600:
        minutes = current_time // 60
        seconds = current_time % 60
        return f"{int(minutes)} min {int(seconds)} sec"
    elif current_time < 86400:
        hours = current_time // 3600
        minutes = (current_time % 3600) // 60
        seconds = current_time % 60
        return f"{int(hours)} h {int(minutes)} min"
    else:
        days = current_time // 86400
        hours = (current_time % 86400) // 3600
        minutes = (current_time % 3600) // 60
        seconds = current_time % 60
        return f"{int(days)} d {int(hours)} h"


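# For reference, format_time() renders a duration in seconds as a short human-readable
# string, e.g. format_time(3.2) -> "3.2 sec", format_time(42) -> "42 sec",
# format_time(125) -> "2 min 5 sec", format_time(7325) -> "2 h 2 min",
# format_time(90000) -> "1 d 1 h".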
def random_sleep(logger, min_delay=None, max_delay=None, delay=None):
    if delay is None:
        delay = random.uniform(min_delay, max_delay)
    formatted_delay = format_time(delay)
    end_time = datetime.now() + timedelta(seconds=delay)
    end_time_str = end_time.strftime('%Y-%m-%d %H:%M:%S')
    logger.debug(f"Waiting for {formatted_delay} (expected end time: {end_time_str})")
    time.sleep(delay)


class TelegramBot:
    def __init__(self, token, topic, chat_id, logger):
        self.token = token
        self.topic = topic
        self.logger = logger
        self.chat_id = chat_id
        self.base_url = f"https://api.telegram.org/bot{self.token}/"

    def message(self, message):
        url = self.base_url + "sendMessage"
        payload = {
            'chat_id': self.chat_id,
            'text': f"[{self.topic}] {message}"
        }
        response = requests.post(url, json=payload, timeout=10)
        response_data = response.json()
        if not response_data.get('ok', True):
            self.logger.error(f"Error sending message: {response_data}")
        return response_data


class GRIST:
    def __init__(self, server, doc_id, api_key, logger):
        self.server = server
        self.doc_id = doc_id
        self.api_key = api_key
        self.logger = logger
        self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)

    def table_name_convert(self, table_name):
        return table_name.replace(" ", "_")

    def to_timestamp(self, dtime: datetime) -> int:
        if dtime.tzinfo is None:
            dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
        return int(dtime.timestamp())

    def insert_row(self, data, table):
        data = {key.replace(" ", "_"): value for key, value in data.items()}
        row_id = self.grist.add_records(self.table_name_convert(table), [data])
        return row_id

    def update_column(self, row_id, column_name, value, table):
        if isinstance(value, datetime):
            value = self.to_timestamp(value)
        column_name = column_name.replace(" ", "_")
        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, column_name: value}])

    def delete_row(self, row_id, table):
        self.grist.delete_records(self.table_name_convert(table), [row_id])

    def update(self, row_id, updates, table):
        for column_name, value in updates.items():
            if isinstance(value, datetime):
                updates[column_name] = self.to_timestamp(value)
        updates = {column_name.replace(" ", "_"): value for column_name, value in updates.items()}
        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, **updates}])

    def fetch_table(self, table):
        return self.grist.fetch_table(self.table_name_convert(table))

    def find_record(self, id=None, state=None, name=None, table=None):
        if table is None:
            raise ValueError("Table is not specified")
        table_data = self.grist.fetch_table(self.table_name_convert(table))
        if id is not None:
            record = [row for row in table_data if row.id == id]
            return record
        if state is not None and name is not None:
            record = [row for row in table_data if row.State == state and row.Name == name]
            return record
        if state is not None:
            record = [row for row in table_data if row.State == state]
            return record
        if name is not None:
            record = [row for row in table_data if row.Name == name]
            return record

    #def find_data_by_key(self, key, table):
    #    data = getattr(self.fetch_table(self.table_name_convert(table))[0], key)
    #    return data

    def find_settings(self, key, table):
        table = self.fetch_table(self.table_name_convert(table))
        for record in table:
            if record.Setting == key:
                if record.Value is None or record.Value == "":
                    raise ValueError(f"Setting {key} blank")
                return record.Value
        raise ValueError(f"Setting {key} not found")


class Prox:
    def __init__(self, host, user, password, logger, timeout=10):
        self.session = requests.Session()
        self.logger = logger
        self.session.request = lambda *args, **kwargs: requests.request(*args, timeout=timeout, **kwargs)
        self.proxmox = ProxmoxAPI(host=host, user=user, password=password, verify_ssl=False, service='PVE', timeout=timeout)

    def get_vms_data(self, vm_name=None, node_name=None, vm_id=None):
        if node_name is not None:
            nodes = [{"node": node_name}]
        else:
            nodes = self.proxmox.nodes.get()
        vms_with_name = []
        for node in nodes:
            if vm_id is not None:
                try:
                    vms = [self.proxmox.nodes(node['node']).qemu(vm_id).status.current.get()]
                except Exception:
                    continue
            else:
                vms = self.proxmox.nodes(node['node']).qemu.get()
            for vm in vms:
                if vm_name is None or vm.get('name') == vm_name or vm.get('vmid') == str(vm_id):
                    cfg = self.proxmox.nodes(node['node']).qemu(vm.get('vmid')).config.get()
                    vm['ipconfig'] = cfg.get('ipconfig0', None)
                    vm['cpu'] = cfg.get('cpu', None)
                    vm['node'] = node['node']
                    vm['memory'] = cfg.get('memory', None)
                    vm['net'] = cfg.get('net0', None)
                    vm['template'] = int(vm.get('template', 0))
                    vm['disk'] = cfg.get(cfg.get('bootdisk', ''), None)
                    if vm['disk']:
                        vm['disk_storage'] = vm['disk'].split(':')[0]
                    else:
                        vm['disk_storage'] = None
                    vms_with_name.append(vm)
        return vms_with_name

    def search_vm(self, vm_name=None, node=None, vm_id=None):
        vms = self.get_vms_data(vm_name=vm_name, node_name=node, vm_id=vm_id)
        if len(vms) > 0:
            return vms[0]
        return None

    def cmd_vm_by_name(self, vm_name, cmd):
        nds = self.proxmox.nodes
        nodes = nds.get()
        for node in nodes:
            for vm in nds(node['node']).qemu.get():
                if vm['name'] == vm_name:
                    if cmd == "reboot":
                        upid = nds(node['node']).qemu(vm['vmid']).status.reboot.post()
                    elif cmd == "shutdown":
                        upid = nds(node['node']).qemu(vm['vmid']).status.shutdown.post()
                    elif cmd == "start":
                        upid = nds(node['node']).qemu(vm['vmid']).status.start.post()
                    elif cmd == "stop":
                        upid = nds(node['node']).qemu(vm['vmid']).status.stop.post()
                    elif cmd == "reset":
                        upid = nds(node['node']).qemu(vm['vmid']).status.reset.post()
                    elif cmd == "destroy":
                        upid = nds(node['node']).qemu(vm['vmid']).status.stop.post()
                        while nds(node['node']).tasks(upid).status.get()['status'] != 'stopped':
                            time.sleep(1)
                        upid = nds(node['node']).qemu(vm['vmid']).delete()
                        while nds(node['node']).tasks(upid).status.get()['status'] != 'stopped':
                            time.sleep(1)

    def id_by_name(self, vm_name):
        vms = self.get_vms_data(vm_name=vm_name)
        if len(vms) > 0:
            return vms[0]['vmid']
        return None

    def name_by_id(self, vm_id):
        vms = self.get_vms_data(vm_id=vm_id)
        if len(vms) > 0:
            return vms[0]['name']
        return None

    def wait_for_ip(self, node, vmid, max_iterations=10, period=10):
        self.logger.info(f"Wait boot VM {vmid} on node {node}")
        vm_ip_v4 = ""
        iterations = 0
        while not vm_ip_v4 and iterations < max_iterations:
            time.sleep(period)
            self.logger.info(f"Get IP attempt {iterations} of {max_iterations}")
            try:
                vm_status = self.proxmox.nodes(node).qemu(vmid).agent('network-get-interfaces').get()
                if 'result' in vm_status:
                    for interface in vm_status['result']:
                        if interface['name'].startswith(('lo', 'br-', 'veth', 'docker')):
                            continue
                        for ip in interface.get('ip-addresses', []):
                            if ip['ip-address-type'] == 'ipv4' and not vm_ip_v4:
                                vm_ip_v4 = ip['ip-address']
                    if vm_ip_v4:
                        self.logger.info(f"VM {vmid} on node {node} received IP: {vm_ip_v4}")
                        break
            except ResourceException as e:
                if "QEMU guest agent is not running" in str(e):
                    pass  #print(f".", end="", flush=True)
            except Exception as e:
                self.logger.info(f"Error retrieving IP for VM {vmid} on node {node}: {e}")
            iterations += 1
        if not vm_ip_v4:
            #print("")
            return False
        return vm_ip_v4

    def check_vm(self, vm_id):
        try:
            vm = self.search_vm(vm_id=vm_id)
            self.logger.info(f"VM {vm_id} status: {vm}")
            return True
        except ResourceException as e:
            self.logger.info(f"Failed to check VM {vm_id} status: {e}")
            return False

    def clone_vm(self, template_id, new_vm_name):
        try:
            new_vm_id = self.proxmox.cluster.nextid.get()
            template = self.search_vm(vm_id=template_id)
            if template is None:
                raise ValueError(f"Template VM with ID {template_id} not found")
            node = template['node']
            self.logger.info(f"Clone VM {template_id}->{new_vm_id}: {new_vm_name}...")
            upid = self.proxmox.nodes(node).qemu(template_id).clone.post(newid=new_vm_id, name=new_vm_name)
            while self.proxmox.nodes(node).tasks(upid).status.get()['status'] != 'stopped':
                print(f"{new_vm_name} clone state: {self.proxmox.nodes(node).tasks(upid).status.get()['status']}")
                time.sleep(10)
        except ResourceException as e:
            self.logger.error(f"Failed to clone VM {template_id}: {e}")
            raise ValueError(f"Failed to clone VM {template_id}/{new_vm_name}") from e
        return new_vm_id, node

    def get_proxmox_nodes(self):
        nodes = []
        raw_nodes = self.proxmox.nodes.get()
        for node in raw_nodes:
            nodes.append({
                "cpu": node.get('cpu'),
                "name": node.get('node'),
                "status": node.get('status'),
                "uptime": node.get('uptime')
            })
        return nodes

    def create_vm(self, vm_name, template_id=None):
        existing_vm_id = self.id_by_name(vm_name)
        if existing_vm_id is not None:
            old_vm_data = self.search_vm(vm_id=existing_vm_id)
            self.logger.info(f"Virtual machine {vm_name} already exists")
            #self.logger.info(f"Find probabilistic template...")
            #all_vms = self.get_vms_data(node_name=old_vm_data['node'])
            #templates_vms = [vm for vm in all_vms if vm['template'] == 1]
            #probabilistic_template = [vm for vm in templates_vms if vm['disk_storage'] == old_vm_data['disk_storage']][0]
            #self.logger.info(f"Find {old_vm_data['name']}({old_vm_data['vmid']}) probabilistic template: {probabilistic_template['name']}({probabilistic_template['vmid']})/{probabilistic_template['node']}")
            #template_id = probabilistic_template['vmid']
            self.logger.info(f"Deleting VM ID {existing_vm_id}/{old_vm_data['node']}")
            self.cmd_vm_by_name(vm_name, "destroy")
        else:
            self.logger.info(f"Virtual machine with name {vm_name} not found, creating new VM...")
        if template_id is None:
            self.logger.error(f"Template ID not provided")
            raise ValueError("Template ID not provided")
        new_vm_id, node = self.clone_vm(template_id, vm_name)
        self.proxmox.nodes(node).qemu(new_vm_id).config.post(tags="")
        self.logger.info(f"Starting VM {new_vm_id} on node {node}")
        self.cmd_vm_by_name(vm_name, "start")
        vm_ip = self.wait_for_ip(node, new_vm_id)
        if vm_ip is False:
            self.logger.info(f"Failed to get IP address for VM {new_vm_id}.")
            raise ValueError(f"Failed to get IP address for VM {new_vm_id}.")
        if vm_ip is not False and vm_ip is not None:
            self.logger.info(f"Virtual machine {vm_name} with ID {new_vm_id} has IP address: {vm_ip}")
            return vm_ip, node, new_vm_id
        raise ValueError(f"Unknown error")

    def check_duplicate_vms(self, vms):
        vm_node_mapping = {}
        for vm_info in vms:
            vm = vm_info['name']
            node = vm_info['node']
            if vm in vm_node_mapping:
                vm_node_mapping[vm].append(node)
            else:
                vm_node_mapping[vm] = [node]
        duplicates = {name: nodes for name, nodes in vm_node_mapping.items() if len(nodes) > 1}
        return duplicates

    def modify_tag(self, vm_id, tag, action="add"):
        tag = tag.lower()
        vm = self.search_vm(vm_id=vm_id)
        if vm is None:
            self.logger.error(f"VM {vm_id} not found")
            return False
        current_config = self.proxmox.nodes(vm['node']).qemu(vm_id).config.get()
        tags_list = current_config.get('tags', '').split(';')
        if action == "add":
            if tag not in tags_list:
                tags_list.append(tag)
                updated_tags = ';'.join(filter(None, tags_list))
                self.proxmox.nodes(vm['node']).qemu(vm_id).config.post(tags=updated_tags)
                self.logger.info(f"Added tag '{tag}' to VM {vm_id}")
            else:
                self.logger.info(f"Tag '{tag}' already exists in VM {vm_id}")
        elif action == "remove":
            if tag in tags_list:
                tags_list.remove(tag)
                updated_tags = ';'.join(filter(None, tags_list))
                self.proxmox.nodes(vm['node']).qemu(vm_id).config.post(tags=updated_tags)
                self.logger.info(f"Removed tag '{tag}' from VM {vm_id}")
            else:
                self.logger.info(f"Tag '{tag}' not found in VM {vm_id}")
        else:
            self.logger.error(f"Invalid action: {action}")
            return False
        return True


def parse_ansible_log(ansible_log):
    task_blocks = re.split(r"(?=\nTASK \[)", ansible_log)
    last_task_block = task_blocks[-1]
    if "failed:" in last_task_block or "fatal:" in last_task_block:
        try:
            task_name = re.search(r"TASK \[(.*?)\]", last_task_block).group(1)
            failed_json_str = re.search(r'=> ({.*?})', last_task_block, re.DOTALL).group(1)
            failed_json = json.loads(failed_json_str)
            stdout_lines = failed_json.get("stdout_lines", [])
            stderr_lines = failed_json.get("stderr_lines", [])
            msg = failed_json.get("msg", "")
            print(f"Stderr lines: {stderr_lines}", flush=True)
            for line in stdout_lines[-10:] + stderr_lines[-10:] + [msg]:
                if "429 Too Many Requests" in line and "https://www.docker.com/increase-rate-limit" in line:
                    print("429 Too Many Requests detected in stdout_lines.", flush=True)
                    return task_name, "429TOOMANYREQUESTS"
                if "HTTP Error 504: Gateway Time-out" in line:
                    print("HTTP Error 504: Gateway Time-out detected in stdout_lines.", flush=True)
                    return task_name, "504GATEWAYTIMEOUT"
            for line in stdout_lines[-5:]:
                print(f"Stdout: {line}", flush=True)
                return task_name, None
            for line in stderr_lines[-5:]:
                print(f"Stderr: {line}", flush=True)
                return task_name, None
            print("No error message found in stdout or stderr.", flush=True)
            return task_name, None
        except (json.JSONDecodeError, AttributeError):
            print("Error extracting or decoding JSON.", flush=True)
            return None, None
    else:
        print("The last TASK did not end with ASYNC FAILED and fatal.", flush=True)
        return None, None


def deploy_node(name, ansible_host, node_id, proxy, proxy_jump, private_key_file, docker_username, docker_password,
                git_version, git_base_url, logger_instance, grist_server, grist_doc_id, grist_api_key):
    if not os.path.exists(private_key_file):
        raise Exception(f"SSH private key file not found: {private_key_file}")
    ansible_dir = "ansible"
    if not os.path.exists(ansible_dir):
        os.makedirs(ansible_dir)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    inventory_filename = f"{ansible_dir}/inventory_{name}_{timestamp}.yml"
    log_output_filename = f"{ansible_dir}/log_{name}_{timestamp}.log"
    playbook_filename = f"{ansible_dir}/playbook_{name}_{timestamp}.yml"
    playbook_url = f"{git_base_url}/raw/branch/{git_version}/playbook.yml"
    response = requests.get(playbook_url, timeout=10)
    if response.status_code == 200:
        with open(playbook_filename, 'w', encoding='utf-8') as f:
            f.write(response.text)
    else:
        logger_instance.error(f"Failed to download playbook at {playbook_url}. Status code: {response.status_code}")
        raise Exception(f"Failed to download playbook. Status code: {response.status_code}")
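    # For orientation, the inventory dumped below has roughly this YAML shape
    # (host name and values are illustrative, not taken from a real run):
    #
    # all:
    #   vars:
    #     ansible_user: root
    #     ansible_ssh_private_key_file: /path/to/key
    #     ansible_ssh_common_args: -J user@jumphost   # only when proxy_jump is set
    #   hosts:
    #     node-example:
    #       ansible_host: 192.0.2.10
    #       serverid: node-example
    #       id: <node_id>
    #       proxy: socks5://...
    #       grist_server: ...
    #       grist_doc_id: ...
    #       grist_api_key: ...
    #       git_version: ...
    #       docker_username: ...
    #       docker_password: ...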
    inventory = {'all': {'vars': {}, 'hosts': {}}}
    inventory['all']['vars']['ansible_user'] = 'root'
    inventory['all']['vars']['ansible_ssh_private_key_file'] = private_key_file
    if proxy_jump:
        inventory['all']['vars']['ansible_ssh_common_args'] = f'-J {proxy_jump}'
    inventory['all']['hosts'][name] = {}
    inventory['all']['hosts'][name]['ansible_host'] = ansible_host
    inventory['all']['hosts'][name]['serverid'] = name
    inventory['all']['hosts'][name]['id'] = node_id
    inventory['all']['hosts'][name]['proxy'] = proxy
    inventory['all']['hosts'][name]['grist_server'] = grist_server
    inventory['all']['hosts'][name]['grist_doc_id'] = grist_doc_id
    inventory['all']['hosts'][name]['grist_api_key'] = grist_api_key
    inventory['all']['hosts'][name]['git_version'] = git_version
    inventory['all']['hosts'][name]['docker_username'] = docker_username
    inventory['all']['hosts'][name]['docker_password'] = docker_password
    with open(inventory_filename, 'w', encoding='utf-8') as file:
        yaml.dump(inventory, file, default_flow_style=False)
    logger_instance.info(f"Running nillion deploy playbook...")
    log_data = ""
    with open(log_output_filename, 'w', encoding='utf-8') as deploy_log:
        process = subprocess.Popen(
            ['ansible-playbook', '-i', inventory_filename, playbook_filename, '-v'],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1
        )
        for line in iter(process.stdout.readline, ''):
            if line.startswith("TASK ["):
                logger_instance.info(f"Ansible task: {line.replace('*', '').replace('TASK ', '').strip()}")
            deploy_log.write(line)
            log_data += line
        process.stdout.close()
        return_code = process.wait()
        deploy_log.flush()
    if return_code != 0:
        ansible_failed_task = ""
        ansible = AnsibleLogParser(play_output=log_data)
        plays = ansible.plays()
        for _, tasks in plays.items():
            for task_name, task_object in tasks.items():
                for result in task_object.results:
                    if result.get('status') == 'failed' or result.get('status') == 'fatal':
                        #print(f"Failed task: {result}")
                        ansible_failed_task = task_name
        logger_instance.error(f"Deploy failed. Failed stage: {ansible_failed_task} Check log file: {log_output_filename}, inventory file: {inventory_filename}")
        raise Exception(f"Ansible error on {ansible_failed_task}")
    return log_data


def nodes_table_preprocessing(grist_instance, logger):
    current_time = grist_instance.to_timestamp(datetime.now())
    max_wip = 60*60*2
    logger.info(f"Run grist processing NoneState -> Dirty and NoneVersion -> nv1")
    for row in grist_instance.fetch_table("Nodes"):
        if row.State == "":
            grist_instance.update_column(row.id, "State", "Dirty", "Nodes")
        if row.Version == "":
            grist_instance.update_column(row.id, "Version", "nv1", "Nodes")
    logger.info(f"Run grist processing WiP and >max_wip old -> Dirty")
    for row in grist_instance.fetch_table("Nodes"):
        if row.Deploy_date is not None:
            vm_old_age = current_time - row.Deploy_date
            if row.State == "WiP" and vm_old_age > max_wip and row.State != "Dirty":
                grist_instance.update_column(row.id, "State", "Dirty", "Nodes")
                grist_instance.update_column(row.id, "Status", "Set Dirty by WiP Timeout", "Nodes")
    logger.info(f"Run grist processing NoneRetries -> 0/4")
    for row in grist_instance.fetch_table("Nodes"):
        if row.Retries is None or row.Retries == "":
            grist_instance.update_column(row.id, "Retries", "0/4", "Nodes")


def generate_proxy(login, password, proxy_ip, proxy_port, logger):
    countries = [
        "AT", "AM", "BE", "HU", "DE", "IL", "IE", "ES", "IT", "LU", "NL",
        "NO", "PL", "PT", "RS", "FI", "FR", "SE", "CH", "CZ", "EE"
    ]
    country = random.choice(countries)
    id_hex = ''.join(random.choices('0123456789abcdef', k=10))
    login_id = f"{login}_s_{id_hex}_c_{country}"
    proxy_socks5 = f"socks5://{login_id}:{password}@{proxy_ip}:{proxy_port}"
    proxy_http = f"http://{login_id}:{password}@{proxy_ip}:{proxy_port}"
    ipinfo_url = "http://ipinfo.io"
    headers = {"User-Agent": "curl/7.68.0"}
    try:
        response = requests.get(ipinfo_url, headers=headers, proxies={"http": proxy_http, "https": proxy_http}, timeout=10)
    except requests.exceptions.RequestException as e:
        logger.error(f"[proxy test] Error while requesting ipinfo.io: {e}")
        return None
    if response.status_code == 200:
        logger.info(f"[proxy test] Successfully retrieved IP information: {response.json().get('ip')}")
        return proxy_socks5
    else:
        logger.error(f"[proxy test] Error while requesting ipinfo.io: {response.status_code}")
        return None


def repeat_deploy(grist_instance, logger, grist_env):
    result = None
    generated_address = False
    wait_period = int(grist_instance.find_settings("Wait period in minutes", "Settings"))*60
    name_node = grist_instance.find_settings("Node name", "Settings")
    sticker_node = grist_instance.find_settings("Node sticker", "Settings")
    multiplier = 1.5
    diff = round(wait_period/multiplier)
    min_wait = round(wait_period - diff)
    max_wait = round(wait_period + diff)
    with grist_lock:
        nodes_table_preprocessing(grist_instance, logger)
        dirty_vms = grist_instance.find_record(state="Dirty", table="Nodes")
        if len(dirty_vms) == 0:
            print(f"No dirty nodes found")
            result = False
            return result
        dirty_vms.sort(key=lambda vm: (int(vm.Retries.split('/')[0])))
        #dirty_vms = sorted(dirty_vms, key=lambda row: 0 if row.Impact == "" else 1)
        dirty_vm = dirty_vms[0]

        def grist_callback(msg):
            grist_instance.update(dirty_vm.id, msg, "Nodes")

        grist_instance.update(dirty_vm.id, {
            "State": "WiP",
            "Status": "Locked",
            "Deploy date": datetime.now()
        }, "Nodes")
    # The selected record is now marked WiP/Locked, so the long-running deploy
    # below can proceed outside the table lock.
    if dirty_vm.Retries.count('/') == 1:
        try:
            attempt = int(dirty_vm.Retries.split('/')[0])
            max_retries = int(dirty_vm.Retries.split('/')[1])
            if attempt >= max_retries:
                grist_instance.update(dirty_vm.id, {
                    "State": "Error",
                    "Status": f"{dirty_vm.Status} (Max retries)"
                }, "Nodes")
                result = False
                return result
            attempt += 1
            grist_instance.update(dirty_vm.id, {"Retries": f"{attempt}/{max_retries}"}, "Nodes")
        except ValueError:
            grist_instance.update(dirty_vm.id, {"State": "Error", "Status": "Error retries"}, "Nodes")
            result = False
            return result
    if dirty_vm.Proxy is None or dirty_vm.Proxy == "":
        grist_callback({"Status": "Generate proxy"})
        login = grist_instance.find_settings("Proxy generator login", "Settings")
        password = grist_instance.find_settings("Proxy generator password", "Settings")
        proxy_ip = grist_instance.find_settings("Proxy generator ip", "Settings")
        proxy_port = grist_instance.find_settings("Proxy generator port", "Settings")
        for _ in range(5):
            proxy_socks5 = generate_proxy(login, password, proxy_ip, proxy_port, logger)
            if proxy_socks5 is not None:
                break
            logger.error("[proxy] Failed to generate proxy, retrying...")
            time.sleep(1)
        grist_instance.update(dirty_vm.id, {"Proxy": proxy_socks5}, "Nodes")
        dirty_vm = grist_instance.find_record(id=dirty_vm.id, table="Nodes")[0]
    vm_name = f"{name_node}-{dirty_vm.NodeID[8:16]}"
    logger_instance = LoggerWrapper(name=f"{vm_name}", log_file=f"logs/{name_node}_creation_{vm_name}.log", grist_callback=grist_callback)
    logger = logger_instance.get_logger()
    grist_callback({"Name": vm_name, "State": "WiP", "Status": "Run proxmox worker.."})
    required_fields = ["NodeID", "Proxy", "Server", "Storage", "Retries"]
    for field in required_fields:
        if not getattr(dirty_vm, field.replace(" ", "_")):
            grist_callback({"State": "Error", "Status": f"No {field.lower()}"})
            result = False
            return result
    prox_host = grist_instance.find_settings("Proxmox host", "Settings")
    prox_user = grist_instance.find_settings("Proxmox user", "Settings")
    prox_password = grist_instance.find_settings("Proxmox password", "Settings")
    prox = Prox(prox_host, prox_user, prox_password, logger)
    tg_token = grist_instance.find_settings("Telegram token", "Settings")
    tg_chat_id = grist_instance.find_settings("Telegram chat id", "Settings")
    tg_log = TelegramBot(tg_token, f"{sticker_node} {name_node}", tg_chat_id, logger)
    current_version = grist_instance.find_settings("Current version", "Settings")
    logger.info(f"Create-run-deploy node {vm_name} on attempt {attempt}")
    deploy_start_time = datetime.now()
    try:
        grist_callback({"IP": "", "Deploy date": datetime.now()})
        all_vms = prox.get_vms_data(node_name=dirty_vm.Server)
        vm_prox_node = dirty_vm.Server
        templates_vms = [vm for vm in all_vms if vm['template'] == 1]
        target_storage_templates = [vm for vm in templates_vms if vm['disk_storage'] == dirty_vm.Storage]
        latest_templates = [vm for vm in target_storage_templates if 'tags' in vm and 'latest' in vm['tags'].split(';')]
        if len(latest_templates) == 0:
            raise ResourceException(status_code=404, status_message="No latest template", content="No latest template")
        template_id = latest_templates[0]['vmid']
        try:
            vm_ip, vm_prox_node, vm_id = prox.create_vm(vm_name, template_id=template_id)
        except Exception as e:
            raise ResourceException(status_code=500, status_message="Failed to create VM", content=str(e)) from e
        prox.modify_tag(vm_id=vm_id, tag="WiP", action="add")
        grist_callback({"IP": vm_ip})
        if generated_address:
            delay = random.uniform(min_wait/2, max_wait/2)
            grist_callback({"Status": f"Wait {format_time(delay)} before ansible.."})
            random_sleep(logger, delay=delay)
        grist_callback({"Status": "Run ansible worker.."})
        deploy_node(name=vm_name,
                    ansible_host=vm_ip,
                    node_id=dirty_vm.NodeID,
                    proxy=dirty_vm.Proxy,
                    git_version=grist_instance.find_settings("Git version", "Settings"),
                    git_base_url=grist_instance.find_settings("Git base URL", "Settings"),
                    proxy_jump=grist_instance.find_settings("Ansible SSH Jump host", "Settings"),
                    private_key_file=grist_instance.find_settings("Ansible SSH keyfile", "Settings"),
                    docker_username=grist_instance.find_settings("Docker username", "Settings"),
                    docker_password=grist_instance.find_settings("Docker password", "Settings"),
                    grist_server=grist_env["GRIST_SERVER"],
                    grist_doc_id=grist_env["GRIST_DOC_ID"],
                    grist_api_key=grist_env["GRIST_API_KEY"],
                    logger_instance=logger)
        prox.modify_tag(vm_id=vm_id, tag="WiP", action="remove")
        prox.modify_tag(vm_id=vm_id, tag=current_version, action="add")
        clear_vms = grist_instance.find_record(state="Clean", table="Nodes")
        dirty_vms = grist_instance.find_record(state="Dirty", table="Nodes")
        wip_vms = grist_instance.find_record(state="WiP", table="Nodes")
        clear_keys = len(clear_vms)
        dirty_keys = len(dirty_vms) + len(wip_vms)
        deploy_elapsed_time = (datetime.now() - deploy_start_time).total_seconds()
        if vm_prox_node is None:
            vm_prox_node = "None"
        tg_log.message(f"✅ Deployed vm {vm_name} in '{vm_prox_node}' (took {format_time(deploy_elapsed_time)}, vm {clear_keys} out of {dirty_keys+clear_keys} in batch, left {dirty_keys} nodes)")
        logger.info(f"Deployed node {vm_name}/{vm_ip} in {vm_prox_node} ({format_time(deploy_elapsed_time)} elapsed install)")
        delay = random.uniform(min_wait/2, max_wait/2)
        format_delay = format_time(delay)
        grist_callback({"Status": f"Deployed, wait {format_delay}.."})
        grist_callback({"State": "Clean", "Version": current_version, "Deploy date": datetime.now()})
        grist_callback({"Deploy time": f"{format_time(deploy_elapsed_time)}"})
        random_sleep(logger, delay=delay)
        grist_callback({"Status": f"Deployed ok on attempt {attempt}/{max_retries}", "Retries": f"{0}/{4}"})
        result = True
        return result
    except ResourceException as e:
        attempt += 0.5
        logger.error(f"Fail: {e}\n{traceback.format_exc()}")
        logger.error(f"Failed to deploy node for {vm_name} on attempt {attempt}/{max_retries}: Proxmox Resource Exception: {e}")
        error_message = str(e).replace('\n', '')
        grist_callback({"State": "Dirty", "Version": "", "Deploy date": "", "Status": f"Failed: {error_message}"})
        tg_log.message(f"❌ Failed to deploy attempt {attempt}/{max_retries} node {vm_name} on '{vm_prox_node}': {e}")
        time.sleep(30)
        result = False
        return result
    except Exception as e:
        attempt += 1
        logger.error(f"Fail: {e}\n{traceback.format_exc()}")
        logger.error(f"Failed to deploy node for {vm_name} on attempt {attempt}/{max_retries}: {e}")
        tg_log.message(f"❌ Failed to deploy attempt {attempt}/{max_retries} node {vm_name} on '{vm_prox_node}': {e}")
        error_message = str(e).replace('\n', '')
        grist_callback({"State": "Dirty", "Version": "", "Deploy date": "", "Status": f"Failed: {error_message}"})
        time.sleep(30)
        result = False
        return result
    finally:
        if result is None:
            grist_callback({"State": "Dirty", "Status": f"Ended by interrupt"})


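# main() reads the Grist connection from the environment (GRIST_SERVER, GRIST_DOC_ID,
# GRIST_API_KEY); everything else ("Workers", "Proxmox host", "Ansible SSH keyfile", ...)
# comes from the "Settings" table of that document. A minimal launch sketch, with an
# illustrative script name and placeholder values:
#
#   GRIST_SERVER=https://grist.example.com GRIST_DOC_ID=abc123 GRIST_API_KEY=xxxxx \
#       python deployer.py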
def main():
    log_dir = "logs"
    log_file = "nexus_deployer.log"
    log_path = os.path.join(log_dir, log_file)
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    if not os.path.exists(log_path):
        open(log_path, 'a', encoding='utf-8').close()
    logger_instance = LoggerWrapper(name="nexus", log_file=log_path)
    logger = logger_instance.get_logger()
    grist_env = {}
    grist_env["GRIST_SERVER"] = os.getenv("GRIST_SERVER")
    grist_env["GRIST_DOC_ID"] = os.getenv("GRIST_DOC_ID")
    grist_env["GRIST_API_KEY"] = os.getenv("GRIST_API_KEY")
    grist_instance = GRIST(grist_env["GRIST_SERVER"], grist_env["GRIST_DOC_ID"], grist_env["GRIST_API_KEY"], logger)
    logger.info(f"Starting nexus deployer...")
    deployer_key = grist_instance.find_settings("Ansible SSH keyfile", "Settings")
    if not os.path.exists(deployer_key):
        logger.error(f"SSH private key file not found: {deployer_key}")
        raise FileNotFoundError(f"Required SSH private key file {deployer_key} is missing")
    logger.info(f"Run on-start grist processing WiP -> Dirty")
    #repeats = grist_instance.find_settings("Repeats", "Settings")
    #for row in grist_instance.fetch_table("Nodes"):
    #    if row.State == "WiP":
    #        grist_instance.update(row.id, {"State": "Dirty", "Settings"})
    #        grist_instance.update(row.id, {"Status": "Set Dirty by restart deployer", "Settings"})
    #logger.info(f"Run on-start grist processing Dirty -> 0/{repeats}")
    #for row in grist_instance.fetch_table("Nodes"):
    #    if row.State == "Dirty" and row.Retries is not None and row.Retries != f"0/{repeats}":
    #        grist_instance.update(row.id, {"Retries": f"0/{repeats}"}, "Nodes")
    #while True:
    #    repeat_deploy(grist_instance, logger, grist_env)
    #    logger.info("Restart deployer")
    #    os.execv(sys.executable, [sys.executable] + sys.argv)
    #    time.sleep(10)
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            futures = []
            start_times = {}
            #while True:
            max_threads = int(grist_instance.find_settings("Workers", "Settings"))
            if max_threads > 20:
                max_threads = 20
            while len(futures) < max_threads:
                future = executor.submit(repeat_deploy, grist_instance, logger, grist_env)
                futures.append(future)
                start_times[future] = time.time()
                logger.info(f"Spawned new task. Total active tasks: {len(futures)}, max threads: {max_threads}")
                time.sleep(120)
                max_threads = int(grist_instance.find_settings("Workers", "Settings"))
                current_time = time.time()
                for future in futures.copy():
                    if future.done():
                        try:
                            result = future.result()
                            logger.info(f"Task completed: {result}")
                        except Exception as e:
                            logger.error(f"Task failed with error: {e}")
                            logger.error(f"Fail: {e}\n{traceback.format_exc()}")
                        futures.remove(future)
                        start_times.pop(future, None)
                    elif current_time - start_times[future] > 60*60*3:
                        future.cancel()
                        logger.info("Cancelled a task due to timeout.")
                        futures.remove(future)
                        start_times.pop(future, None)
            #time.sleep(10)
    except KeyboardInterrupt:
        logger.info("Program interrupted, shutting down executor...")
    finally:
        executor.shutdown(wait=True)
        logger.info("Resources released, executor shutdown completed.")


if __name__ == "__main__":
    if main() is False:
        sys.exit(1)
    sys.exit(0)