diff --git a/deployer/deploer_key.private b/deployer/deploer_key.private new file mode 100644 index 0000000..e61b6ac --- /dev/null +++ b/deployer/deploer_key.private @@ -0,0 +1,49 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAACFwAAAAdzc2gtcn +NhAAAAAwEAAQAAAgEAt3NLNdcEYfBqR541tQYitkWa1PVgngaaXoExE8G37OXSr+bh3tlr +Rmnemtbb+bgYciB7b0AR13+DKJy3UEFiIJNPOGHNhrHlf80WWvL1bJ63EEeqjlr9GtM7Wb +JxYCGBMZ/dI1FYEi+/Aqfwv1D3WdIHjouCzc0wriGJvCIVx7YonziuWNNHh4POWoa8EL8O +0hF3Mb0sG6fSIQsmyLCR0qoIDVHXC+1i8XfQBx+iAlglQL3ueKJAMSicr/1VLPN8X7pUgB +77QPdn5AmHVwikMADVQS0BjUpjE/BsScQkJ9xgr8pVcHobx4yAt/yL72Q/zRCj3p1F4dBW +Ah92QDHhBVpyiKvQizcLFUG9+INTv+HM60n/XJEEJIvKViNesBuiYI2LmLgIJaCOSsLRdC +MOtiff0A9pJh9a02QPhLrR1cDMGkYkWJ6HfZz6oL0js5xytpFv6SkwCVqzOCt43V8rHJ2l +XrGz7Rew1gSVDPXgEsNYIr1FzNX/bPIHy26Cxq4zaxy+7Er9q7+Y2CksJpIWVPJJ3O4kF4 +uSnyiXcB9RNifc03dKVolizhQB7NoPMa0naT2LZJ9NXghyJhYruC3UKfCyFkhxcXQCLpUr +D1styqfqaLEG9Gq34+WhqM0QdixDA5lsp5qY3q7C47B1ZxyR1t5D5ndEjSP6HllYv7vMoh +sAAAdQSM/bbEjP22wAAAAHc3NoLXJzYQAAAgEAt3NLNdcEYfBqR541tQYitkWa1PVgngaa +XoExE8G37OXSr+bh3tlrRmnemtbb+bgYciB7b0AR13+DKJy3UEFiIJNPOGHNhrHlf80WWv +L1bJ63EEeqjlr9GtM7WbJxYCGBMZ/dI1FYEi+/Aqfwv1D3WdIHjouCzc0wriGJvCIVx7Yo +nziuWNNHh4POWoa8EL8O0hF3Mb0sG6fSIQsmyLCR0qoIDVHXC+1i8XfQBx+iAlglQL3ueK +JAMSicr/1VLPN8X7pUgB77QPdn5AmHVwikMADVQS0BjUpjE/BsScQkJ9xgr8pVcHobx4yA +t/yL72Q/zRCj3p1F4dBWAh92QDHhBVpyiKvQizcLFUG9+INTv+HM60n/XJEEJIvKViNesB +uiYI2LmLgIJaCOSsLRdCMOtiff0A9pJh9a02QPhLrR1cDMGkYkWJ6HfZz6oL0js5xytpFv +6SkwCVqzOCt43V8rHJ2lXrGz7Rew1gSVDPXgEsNYIr1FzNX/bPIHy26Cxq4zaxy+7Er9q7 ++Y2CksJpIWVPJJ3O4kF4uSnyiXcB9RNifc03dKVolizhQB7NoPMa0naT2LZJ9NXghyJhYr +uC3UKfCyFkhxcXQCLpUrD1styqfqaLEG9Gq34+WhqM0QdixDA5lsp5qY3q7C47B1ZxyR1t +5D5ndEjSP6HllYv7vMohsAAAADAQABAAACAA0AkfMV80yRwqai0wGqlqk+k7PGVHu+0hAi +rfzNfSDARUeMYLPvywepl0p4Mg0n/CuSm80NyHXyprQpL2Dz0WWnqzS+0ddbIn4FZjE6CS +UStrzjp3YBgvD0yb8Yw6phlYuT3hOTv19CnRIuHwUgUve9yCVVRAccJPgijmWUMOD/yy9F 
+0C2hg+9Z6zVFWW0CbaV78WvIEalAIseOx8fvo9Y/kOSIyWoiACJHMKpglpX98138WDuanF +wfmcNrfC78bvNF/Jk8GOjI4EcsWbhUd3ajiHnfG74M6KrQHoy35ywgFYZAHAAl13Q0RCdG +MhDclR0Osd6kXQCdSItL+ZChOGaPAzmEst370KPmqlU1ZN5kp2ZNrHF1ucTyssAOCmR4su +hAL9BZJgyhBW01Xqm1owVaMqjQ6wzAhNqnfxx7GZhUluejiE+8EFx4dXDHWUjXBK+h+hV+ ++WZUSYIi9X6Sg7/kj7F0QPihr8T9ll+p4W2DOlDOM5hyIA3XAYJmy60Ui6AiL5wUzyQEGY +2RqntCe+lKEQhf9cqWZ2SVB8q8AFS4sXVVfmj4u0FOkgBaZbNcTknBLRekNDaM93mvPOcT +523u8iybgSlyZpYBdRCnyHIw86lfvWRW2YUiG7bxDSpFsjtr16QKLprMUgiCajfRveDh/9 +qFvpcYYSShiaGlE6uZAAABAFNMVfHaaqKBL76JIb4qjX3Dsx8Ig2PkXgzqmz0l8T82+Mdw +V1uHNHjaRO+x9QXD2c29OOrqUC0Pmcb3wD7u/EzX3yIyqhnyB94sl7mWA8fjHdIfEjWB3R ++j/QHBNtM8xNt2jPxguxXWo+8489i7/YjblTiNdtiWsnKXetlaJh9qya2bhbbws28kBfQ1 +m1nZj/TCu9JySpwOQPtR8iX/qAkogHwmGq5+HXGJ4te+lmiBF99REeB/C0NgvWB1dGVZCK +9sJL0ztcCBoMvdPG4oBXEtd9xiU0VT8L17XuykcQvUd6i5r6AgLr/dOZA3SF6XNDW7F7mQ +/7BvIa576LFS61EAAAEBANn6TI43Eg4dRzeI8TuHTfB5QQYn5emal+LPbptP0KrByyfOtZ +mL/qiSpzNLRanp/DGm7GhyQUF2h61XObZ9BCs4RNE4adqVcae3xbxZ7yRneBl4A76tu4NA +BXKllz8141R3HtYTJ/VGAIdwGmkHdFYOCxBhxnLELFHC+4Qp/3Iq2Uu3c4fAWknxDPT+p/ +Y7jRia2TLlUvQrF38VXIUIgh7OnPMXzhErXDvRG8+8DxqI4zg1rKEP86z+rx0/hYsSst+M +LOByEmHDAc0m4h6bAwJwL5SBxrWKU0/vTYOusCyfgzznSDfIi5YHgsXM9K9pyonRfjUST+ +ehPsilDpPl778AAAEBANdzMLYhkJm1/bIfpsrogmSGQt5ymWvEcuWCxYnMDDALIjS9Ih2G +sOmmiCAl1u/tSIbyyOP7YFf9JGYaOumKzkRwKcFr1dVpOgrez+/dbrJE93JOrFDVAxILSW +cc7kYwSFCIlC8LshWNyxuhGnAgkyXROlAdCABUxY4sJ3kj7oSUFA8ZwSh4THx1RfvLV6VT +GtoNO8gh6eRl/M8hM2oOds2wWp45j9jRNTrTdZX6aprn0g0dPOjPF7ufMEc0tYoz1WZYzk +DTLGQysm2Sas5llq8hXOmp70BYm+dFud+dOPjU/IbmRI5DLmU89WvpvjWCSfP1YdUfEZZ/ +ujdZaCWt5KUAAAAWdnZ6dmxhZEByaXR1YWwtZGVwbG9lcgECAwQF +-----END OPENSSH PRIVATE KEY----- diff --git a/deployer/nexus_deployer.py b/deployer/nexus_deployer.py new file mode 100644 index 0000000..706d9bb --- /dev/null +++ b/deployer/nexus_deployer.py @@ -0,0 +1,879 @@ +# flake8: noqa +# pylint: disable=broad-exception-raised, raise-missing-from, too-many-arguments, redefined-outer-name +# pylance: 
# Rolling window of recent deploy timestamps.
# NOTE(review): not referenced anywhere in the visible module — possibly used
# by an external importer; kept for backward compatibility. TODO confirm.
last_deploy_times = deque(maxlen=5)
# Serializes Grist select-and-mark critical sections across worker threads.
grist_lock = threading.Lock()


class ColoredFormatter(logging.Formatter):
    """Console formatter that colorizes rendered records by severity.

    Colors only its *own* rendered output instead of rewriting ``record.msg``
    in place: a LogRecord is shared by every handler attached to a logger, so
    mutating it leaks ANSI escape codes into other handlers (the Grist
    handler previously had to strip them back out).
    """

    def __init__(self, fmt):
        # autoreset=True makes colorama restore the terminal after each print.
        colorama.init(autoreset=True)
        super().__init__(fmt)

    def format(self, record):
        message = super().format(record)
        if record.levelno == logging.ERROR:
            color = colorama.Fore.RED
        elif record.levelno == logging.WARNING:
            color = colorama.Fore.YELLOW
        elif record.levelno == logging.INFO:
            color = colorama.Fore.GREEN
        else:
            # DEBUG and anything unlisted stay uncolored.
            return message
        return f"{color}{message}{colorama.Style.RESET_ALL}"


class LogsToGrist(logging.Handler):
    """Logging handler that mirrors each record into Grist via a callback.

    The callback receives ``{"Status": <cleaned message>}``; ANSI escape
    sequences and the "Ansible task: " prefix are stripped first.
    """

    def __init__(self, callback=None):
        super().__init__()
        self.callback = callback

    def emit(self, record):
        # Guard: a handler constructed without a callback is a no-op instead
        # of raising TypeError on the first emitted record.
        if self.callback is None:
            return
        log_entry = self.format(record)

        # Strip ANSI escape sequences (colorized messages from other
        # formatters) so Grist cells stay plain text.
        ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
        log_entry = ansi_escape.sub('', log_entry)
        log_entry = log_entry.replace("Ansible task: ", "")

        self.callback({"Status": log_entry})


class LoggerWrapper:
    """Bundles a file handler, colored console handler and optional Grist
    handler onto a single named logger.

    Parameters:
        name: logger name (``logging.getLogger(name)`` — cached per process).
        log_file: path for the DEBUG-level file handler.
        grist_callback: optional callable receiving ``{"Status": ...}`` dicts
            for each INFO+ record.
    """

    def __init__(self, name, log_file, grist_callback=None):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False

        # logging.getLogger() returns a cached instance, so building a second
        # LoggerWrapper for the same name would otherwise stack duplicate
        # handlers (duplicated log lines, stale Grist callbacks). Reset first.
        for stale_handler in self.logger.handlers[:]:
            stale_handler.close()
            self.logger.removeHandler(stale_handler)

        # File handler
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.DEBUG)
        colored_formatter = ColoredFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(colored_formatter)
        self.logger.addHandler(console_handler)

        # grist handler (INFO and above only)
        if grist_callback is not None:
            grist_handler = LogsToGrist(callback=grist_callback)
            grist_handler.setLevel(logging.INFO)
            grist_handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(grist_handler)

    def __del__(self):
        # Best-effort cleanup: close and detach handlers so file descriptors
        # are released when the wrapper is garbage-collected.
        handlers = self.logger.handlers[:]
        for handler in handlers:
            handler.close()
            self.logger.removeHandler(handler)

    def get_logger(self):
        return self.logger


def format_time(current_time):
    """Render a non-negative duration in seconds as a short human string.

    Buckets: "<x.x> sec" under 5s, "<x> sec" under 1 min, "X min Y sec"
    under 1 h, "X h Y min" under 1 day, "X d Y h" otherwise.
    """
    if current_time < 5:
        return f"{current_time:.1f} sec"
    if current_time < 60:
        return f"{int(current_time)} sec"
    if current_time < 3600:
        minutes, seconds = divmod(int(current_time), 60)
        return f"{minutes} min {seconds} sec"
    if current_time < 86400:
        hours, remainder = divmod(int(current_time), 3600)
        return f"{hours} h {remainder // 60} min"
    days, remainder = divmod(int(current_time), 86400)
    return f"{days} d {remainder // 3600} h"


def random_sleep(logger, min_delay=None, max_delay=None, delay=None):
    """Sleep for `delay` seconds, or a uniform random duration in
    [min_delay, max_delay] when `delay` is None.

    Note: when `delay` is None, both `min_delay` and `max_delay` must be
    provided (random.uniform would raise TypeError otherwise).
    """
    if delay is None:
        delay = random.uniform(min_delay, max_delay)
    formatted_delay = format_time(delay)
    end_time = datetime.now() + timedelta(seconds=delay)
    end_time_str = end_time.strftime('%Y-%m-%d %H:%M:%S')
    logger.debug(f"Waiting for {formatted_delay} (expected end time: {end_time_str})")
    time.sleep(delay)
class TelegramBot:
    """Minimal Telegram Bot API client used for deploy notifications."""

    def __init__(self, token, topic, chat_id, logger):
        self.token = token
        self.topic = topic
        self.logger = logger
        self.chat_id = chat_id
        self.base_url = f"https://api.telegram.org/bot{self.token}/"

    def message(self, message):
        """Send ``[topic] message`` to the configured chat.

        Returns the decoded Telegram API JSON response.
        """
        url = self.base_url + "sendMessage"
        payload = {
            'chat_id': self.chat_id,
            'text': f"[{self.topic}] {message}"
        }
        response = requests.post(url, json=payload, timeout=10)
        response_data = response.json()

        # Telegram replies with {"ok": true/false, ...}. A reply *missing*
        # 'ok' is malformed, so default to False — the previous default of
        # True silently swallowed such failures.
        if not response_data.get('ok', False):
            self.logger.error(f"Error sending message: {response_data}")

        return response_data


class GRIST:
    """Thin convenience wrapper around GristDocAPI.

    Grist column names may contain spaces in the UI; they are stored with
    underscores, so every helper normalizes "Column Name" -> "Column_Name".
    """

    def __init__(self, server, doc_id, api_key, logger):
        self.server = server
        self.doc_id = doc_id
        self.api_key = api_key
        self.logger = logger
        self.grist = GristDocAPI(doc_id, server=server, api_key=api_key)

    def table_name_convert(self, table_name):
        """Normalize a display table name to its API identifier."""
        return table_name.replace(" ", "_")

    def to_timestamp(self, dtime: datetime) -> int:
        """Convert a datetime to a Unix timestamp; naive values are assumed
        to be UTC+3 (the deployer's local convention)."""
        if dtime.tzinfo is None:
            dtime = dtime.replace(tzinfo=timezone(timedelta(hours=3)))
        return int(dtime.timestamp())

    def insert_row(self, data, table):
        """Insert one record; keys with spaces are normalized. Returns the id."""
        data = {key.replace(" ", "_"): value for key, value in data.items()}
        row_id = self.grist.add_records(self.table_name_convert(table), [data])
        return row_id

    def update_column(self, row_id, column_name, value, table):
        """Set a single cell; datetimes are converted to timestamps."""
        if isinstance(value, datetime):
            value = self.to_timestamp(value)
        column_name = column_name.replace(" ", "_")
        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, column_name: value}])

    def delete_row(self, row_id, table):
        self.grist.delete_records(self.table_name_convert(table), [row_id])

    def update(self, row_id, updates, table):
        """Apply several column updates to one row in a single API call."""
        for column_name, value in updates.items():
            if isinstance(value, datetime):
                updates[column_name] = self.to_timestamp(value)
        updates = {column_name.replace(" ", "_"): value for column_name, value in updates.items()}
        self.grist.update_records(self.table_name_convert(table), [{"id": row_id, **updates}])

    def fetch_table(self, table):
        return self.grist.fetch_table(self.table_name_convert(table))

    def find_record(self, id=None, state=None, name=None, table=None):
        """Filter `table` rows by id, State and/or Name.

        Returns a (possibly empty) list of matching rows. `id` keeps its
        builtin-shadowing name for backward compatibility with callers
        using ``find_record(id=...)``.
        """
        if table is None:
            raise ValueError("Table is not specified")
        table_data = self.grist.fetch_table(self.table_name_convert(table))
        if id is not None:
            return [row for row in table_data if row.id == id]
        if state is not None and name is not None:
            # Fixed: previously compared row.name (lowercase), which is
            # inconsistent with the Name column used everywhere else.
            return [row for row in table_data if row.State == state and row.Name == name]
        if state is not None:
            return [row for row in table_data if row.State == state]
        if name is not None:
            return [row for row in table_data if row.Name == name]
        # No filter given: previously fell through returning None; an empty
        # list is safer for callers doing len()/iteration.
        return []

    def find_settings(self, key, table):
        """Return the Value of the Settings row whose Setting == key.

        Raises ValueError when the key is absent or its value is blank.
        """
        # fetch_table() normalizes the name itself; no double conversion.
        records = self.fetch_table(table)
        for record in records:
            if record.Setting == key:
                if record.Value is None or record.Value == "":
                    raise ValueError(f"Setting {key} blank")
                return record.Value
        raise ValueError(f"Setting {key} not found")
class Prox:
    """Wrapper around the Proxmox VE API for VM lifecycle management.

    Provides lookup (by name/id), power commands, clone-from-template
    provisioning, guest-agent IP discovery and tag bookkeeping.
    NOTE(review): verify_ssl=False disables TLS verification — acceptable
    only for a trusted internal Proxmox endpoint.
    """

    def __init__(self, host, user, password, logger, timeout=10):
        # A Session is created and its request method monkey-patched to force
        # a timeout; NOTE(review): this session does not appear to be passed
        # to ProxmoxAPI, so the patch may be ineffective — confirm intent.
        self.session = requests.Session()
        self.logger = logger
        self.session.request = lambda *args, **kwargs: requests.request(*args, timeout=timeout, **kwargs)
        self.proxmox = ProxmoxAPI(host=host, user=user, password=password, verify_ssl=False, service='PVE', timeout=timeout)

    def get_vms_data(self, vm_name=None, node_name=None, vm_id=None):
        """Return enriched VM dicts, optionally filtered by name/node/id.

        Each dict gains: ipconfig, cpu, node, memory, net, template (int),
        disk and disk_storage (storage id before ':' in the boot disk spec).
        With no filters, returns every qemu VM on every node.
        """
        if node_name is not None: nodes = [{"node": node_name}]
        else: nodes = self.proxmox.nodes.get()
        vms_with_name = []

        for node in nodes:
            if vm_id is not None:
                # Direct per-id status query; a missing id on this node just
                # skips the node (broad except is deliberate best-effort).
                try: vms = [self.proxmox.nodes(node['node']).qemu(vm_id).status.current.get()]
                except Exception: continue
            else: vms = self.proxmox.nodes(node['node']).qemu.get()

            for vm in vms:
                # NOTE(review): vm.get('vmid') == str(vm_id) compares an API
                # value (often int) to a string — this clause likely never
                # matches; the vm_id path is already filtered above. Confirm.
                if vm_name is None or vm.get('name') == vm_name or vm.get('vmid') == str(vm_id):
                    cfg = self.proxmox.nodes(node['node']).qemu(vm.get('vmid')).config.get()
                    vm['ipconfig'] = cfg.get('ipconfig0', None)
                    vm['cpu'] = cfg.get('cpu', None)
                    vm['node'] = node['node']
                    vm['memory'] = cfg.get('memory', None)
                    vm['net'] = cfg.get('net0', None)
                    vm['template'] = int(vm.get('template', 0))
                    # Boot disk spec looks like "<storage>:<volume>,..."; the
                    # storage id is everything before the first ':'.
                    vm['disk'] = cfg.get(cfg.get('bootdisk', ''), None)
                    if vm['disk']:
                        vm['disk_storage'] = vm['disk'].split(':')[0]
                    else:
                        vm['disk_storage'] = None
                    vms_with_name.append(vm)

        return vms_with_name

    def search_vm(self, vm_name=None, node=None, vm_id=None):
        """Return the first matching VM dict, or None.

        NOTE(review): with all arguments None this returns an arbitrary VM
        (get_vms_data matches everything) — callers should pass a filter.
        """
        vms = self.get_vms_data(vm_name=vm_name, node_name=node, vm_id=vm_id)
        if len(vms) > 0:
            return vms[0]
        return None

    def cmd_vm_by_name(self, vm_name, cmd):
        """Run a power command (reboot/shutdown/start/stop/reset/destroy) on
        every VM whose name matches. "destroy" stops the VM, waits for the
        stop task, deletes it, and waits for the delete task."""
        nds = self.proxmox.nodes
        nodes = nds.get()
        for node in nodes:
            for vm in nds(node['node']).qemu.get():
                if vm['name'] == vm_name:
                    if cmd == "reboot":
                        upid = nds(node['node']).qemu(vm['vmid']).status.reboot.post()
                    elif cmd == "shutdown":
                        upid = nds(node['node']).qemu(vm['vmid']).status.shutdown.post()
                    elif cmd == "start":
                        upid = nds(node['node']).qemu(vm['vmid']).status.start.post()
                    elif cmd == "stop":
                        upid = nds(node['node']).qemu(vm['vmid']).status.stop.post()
                    elif cmd == "reset":
                        upid = nds(node['node']).qemu(vm['vmid']).status.reset.post()
                    elif cmd == "destroy":
                        upid = nds(node['node']).qemu(vm['vmid']).status.stop.post()
                        # Poll the stop task to completion before deleting.
                        while nds(node['node']).tasks(upid).status.get()['status'] != 'stopped': time.sleep(1)
                        upid = nds(node['node']).qemu(vm['vmid']).delete()
                        while nds(node['node']).tasks(upid).status.get()['status'] != 'stopped': time.sleep(1)

    def id_by_name(self, vm_name):
        """Return the vmid of the first VM with this name, or None."""
        vms = self.get_vms_data(vm_name=vm_name)
        if len(vms) > 0:
            return vms[0]['vmid']
        return None

    def name_by_id(self, vm_id):
        """Return the name of the VM with this id, or None."""
        vms = self.get_vms_data(vm_id=vm_id)
        if len(vms) > 0:
            return vms[0]['name']
        return None

    def wait_for_ip(self, node, vmid, max_iterations=10, period=10):
        """Poll the QEMU guest agent for the VM's first IPv4 address.

        Skips loopback/bridge/veth/docker interfaces. Returns the address
        string, or False after max_iterations * period seconds without one.
        """
        self.logger.info(f"Wait boot VM {vmid} on node {node}")
        vm_ip_v4 = ""
        iterations = 0
        while not vm_ip_v4 and iterations < max_iterations:
            time.sleep(period)
            self.logger.info(f"Get IP attempt {iterations} of {max_iterations}")
            try:
                vm_status = self.proxmox.nodes(node).qemu(vmid).agent('network-get-interfaces').get()
                if 'result' in vm_status:
                    for interface in vm_status['result']:
                        # Ignore virtual/bridge interfaces; we want the guest NIC.
                        if interface['name'].startswith(('lo', 'br-', 'veth', 'docker')): continue
                        for ip in interface.get('ip-addresses', []):
                            if ip['ip-address-type'] == 'ipv4' and not vm_ip_v4:
                                vm_ip_v4 = ip['ip-address']
                    if vm_ip_v4:
                        self.logger.info(f"VM {vmid} on node {node} received IP: {vm_ip_v4}")
                        break
            except ResourceException as e:
                # Agent not up yet right after boot — expected, keep polling.
                if "QEMU guest agent is not running" in str(e):
                    pass
                #print(f".", end="", flush=True)
            except Exception as e:
                self.logger.info(f"Error retrieving IP for VM {vmid} on node {node}: {e}")
            iterations += 1

        if not vm_ip_v4:
            #print("")
            return False
        return vm_ip_v4

    def check_vm(self, vm_id):
        """Log the VM's status; True unless the API raises ResourceException."""
        try:
            vm = self.search_vm(vm_id=vm_id)
            self.logger.info(f"VM {vm_id} status: {vm}")
            return True
        except ResourceException as e:
            self.logger.info(f"Failed to check VM {vm_id} status: {e}")
            return False

    def clone_vm(self, template_id, new_vm_name):
        """Clone a template into a new VM (next free cluster id) on the
        template's node; blocks until the clone task finishes.

        Returns (new_vm_id, node). Raises ValueError on API failure.
        """
        try:
            new_vm_id = self.proxmox.cluster.nextid.get()
            template = self.search_vm(vm_id=template_id)
            if template is None:
                raise ValueError(f"Template VM with ID {template_id} not found")
            node = template['node']
            self.logger.info(f"Clone VM {template_id}->{new_vm_id}: {new_vm_name}...")
            upid = self.proxmox.nodes(node).qemu(template_id).clone.post(newid=new_vm_id, name=new_vm_name)
            # Poll the clone task; 'stopped' means the task has finished.
            while self.proxmox.nodes(node).tasks(upid).status.get()['status'] != 'stopped':
                print(f"{new_vm_name} clone state: {self.proxmox.nodes(node).tasks(upid).status.get()['status']}")
                time.sleep(10)
        except ResourceException as e:
            self.logger.error(f"Failed to clone VM {template_id}: {e}")
            raise ValueError(f"Failed to clone VM {template_id}/{new_vm_name}") from e
        return new_vm_id, node

    def get_proxmox_nodes(self):
        """Return a simplified list of cluster nodes (cpu/name/status/uptime)."""
        nodes = []
        raw_nodes = self.proxmox.nodes.get()
        for node in raw_nodes:
            nodes.append({
                "cpu": node.get('cpu'),
                "name": node.get('node'),
                "status": node.get('status'),
                "uptime": node.get('uptime')
            })
        return nodes

    def create_vm(self, vm_name, template_id=None):
        """Destroy any existing VM with this name, clone the template, start
        the clone and wait for its IPv4 address.

        Returns (vm_ip, node, new_vm_id). Raises ValueError when the
        template id is missing or no IP is obtained.
        NOTE(review): search_vm(vm_id=None) below scans *all* VMs when the
        name is unknown; old_vm_data is only used in the exists-branch, but
        the unconditional call is wasteful — confirm before changing.
        """
        existing_vm_id = self.id_by_name(vm_name)
        old_vm_data = self.search_vm(vm_id=existing_vm_id)
        if existing_vm_id is not None:
            self.logger.info(f"Virtual machine {vm_name} already exists")
            #self.logger.info(f"Find probabilistic template...")
            #all_vms = self.get_vms_data(node_name=old_vm_data['node'])
            #templates_vms = [vm for vm in all_vms if vm['template'] == 1]
            #probabilistic_template = [vm for vm in templates_vms if vm['disk_storage'] == old_vm_data['disk_storage']][0]
            #self.logger.info(f"Find {old_vm_data['name']}({old_vm_data['vmid']}) probabilistic template: {probabilistic_template['name']}({probabilistic_template['vmid']})/{probabilistic_template['node']}")
            #template_id = probabilistic_template['vmid']
            self.logger.info(f"Deleting VM ID {existing_vm_id}/{old_vm_data['node']}")
            self.cmd_vm_by_name(vm_name, "destroy")
        else:
            self.logger.info(f"Virtual machine with name {vm_name} not found, creating new VM...")
        # template_id is required in both branches (the probabilistic-template
        # fallback above is commented out).
        if template_id is None:
            self.logger.error(f"Template ID not provided")
            raise ValueError("Template ID not provided")
        new_vm_id, node = self.clone_vm(template_id, vm_name)
        # Clear any tags inherited from the template.
        self.proxmox.nodes(node).qemu(new_vm_id).config.post(tags="")
        self.logger.info(f"Starting VM {new_vm_id} on node {node}")
        self.cmd_vm_by_name(vm_name, "start")
        vm_ip = self.wait_for_ip(node, new_vm_id)
        # NOTE(review): wait_for_ip returns False (not None) on timeout;
        # `== False` works but `is False` would be idiomatic.
        if vm_ip == False:
            self.logger.info(f"Failed to get IP address for VM {new_vm_id}.")
            raise ValueError(f"Failed to get IP address for VM {new_vm_id}.")
        if vm_ip != False and vm_ip is not None:
            self.logger.info(f"Virtual machine {vm_name} with ID {new_vm_id} has IP address: {vm_ip}")
            return vm_ip, node, new_vm_id
        raise ValueError(f"Unknown error")

    def check_duplicate_vms(self, vms):
        """Map VM names that appear on more than one node to their node list."""
        vm_node_mapping = {}
        for vm_info in vms:
            vm = vm_info['name']
            node = vm_info['node']
            if vm in vm_node_mapping:
                vm_node_mapping[vm].append(node)
            else:
                vm_node_mapping[vm] = [node]

        duplicates = {name: nodes for name, nodes in vm_node_mapping.items() if len(nodes) > 1}
        return duplicates

    def modify_tag(self, vm_id, tag, action="add"):
        """Add or remove a (lowercased) tag on a VM's ';'-separated tag list.

        Returns True on success, False when the VM is missing or the action
        is not "add"/"remove".
        """
        tag = tag.lower()
        vm = self.search_vm(vm_id=vm_id)

        if vm is None:
            self.logger.error(f"VM {vm_id} not found")
            return False

        # NOTE(review): this `is not None` check is redundant after the early
        # return above; kept for byte-compatibility.
        if vm is not None:
            current_config = self.proxmox.nodes(vm['node']).qemu(vm_id).config.get()
            tags_list = current_config.get('tags', '').split(';')

            if action == "add":
                if tag not in tags_list:
                    tags_list.append(tag)
                    # filter(None, ...) drops empty fragments from the split.
                    updated_tags = ';'.join(filter(None, tags_list))
                    self.proxmox.nodes(vm['node']).qemu(vm_id).config.post(tags=updated_tags)
                    self.logger.info(f"Added tag '{tag}' to VM {vm_id}")
                else:
                    self.logger.info(f"Tag '{tag}' already exists in VM {vm_id}")

            elif action == "remove":
                if tag in tags_list:
                    tags_list.remove(tag)
                    updated_tags = ';'.join(filter(None, tags_list))
                    self.proxmox.nodes(vm['node']).qemu(vm_id).config.post(tags=updated_tags)
                    self.logger.info(f"Removed tag '{tag}' from VM {vm_id}")
                else:
                    self.logger.info(f"Tag '{tag}' not found in VM {vm_id}")

            else:
                self.logger.error(f"Invalid action: {action}")
                return False

        return True
failed_json.get("stdout_lines", []) + stderr_lines = failed_json.get("stderr_lines", []) + msg = failed_json.get("msg", "") + print(f"Stdout lines: {stderr_lines}", flush=True) + + for line in stdout_lines[-10:] + stderr_lines[-10:] + [msg]: + if "429 Too Many Requests" in line and "https://www.docker.com/increase-rate-limit" in line: + print("429 Too Many Requests detected in stdout_lines.", flush=True) + return task_name, "429TOOMANYREQUESTS" + if "HTTP Error 504: Gateway Time-out" in line: + print("HTTP Error 504: Gateway Time-out detected in stdout_lines.", flush=True) + return task_name, "504GATEWAYTIMEOUT" + + for line in stdout_lines[-5:]: + print(f"Stdout: {line}", flush=True) + return task_name, None + + for line in stderr_lines[-5:]: + print(f"Stderr: {line}", flush=True) + return task_name, None + + print("No error message found in stdout or stderr.", flush=True) + return task_name, None + + except (json.JSONDecodeError, AttributeError): + print("Error extracting or decoding JSON.", flush=True) + return None, None + else: + print("The last TASK did not end with ASYNC FAILED and fatal.", flush=True) + return None, None + +def deploy_node(name, ansible_host, node_id, proxy, proxy_jump, private_key_file, + docker_username, docker_password, git_version, git_base_url, logger_instance, grist_server, grist_doc_id, grist_api_key): + + if not os.path.exists(private_key_file): + raise Exception(f"SSH private key file not found: {private_key_file}") + + ansible_dir = f"ansible" + + if not os.path.exists(ansible_dir): + os.makedirs(ansible_dir) + + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + inventory_filename = f"{ansible_dir}/inventory_{name}_{timestamp}.yml" + log_output_filename = f"{ansible_dir}/log_{name}_{timestamp}.log" + playbook_filename = f"{ansible_dir}/playbook_{name}_{timestamp}.yml" + + + playbook_url = f"{git_base_url}/raw/branch/{git_version}/playbook.yml" + response = requests.get(playbook_url, timeout=10) + if response.status_code == 
200: + with open(playbook_filename, 'w', encoding='utf-8') as f: + f.write(response.text) + else: + logger_instance.error(f"Failed to download playbook at {playbook_url}. Status code: {response.status_code}") + raise Exception(f"Failed to download playbook. Status code: {response.status_code}") + + inventory = {'all': {'vars': {}, 'hosts': {}}} + inventory['all']['vars']['ansible_user'] = 'root' + inventory['all']['vars']['ansible_ssh_private_key_file'] = private_key_file + if proxy_jump: inventory['all']['vars']['ansible_ssh_common_args'] = f'-J {proxy_jump}' + inventory['all']['hosts'][name] = {} + inventory['all']['hosts'][name]['ansible_host'] = ansible_host + inventory['all']['hosts'][name]['serverid'] = name + inventory['all']['hosts'][name]['id'] = node_id + inventory['all']['hosts'][name]['proxy'] = proxy + inventory['all']['hosts'][name]['grist_server'] = grist_server + inventory['all']['hosts'][name]['grist_doc_id'] = grist_doc_id + inventory['all']['hosts'][name]['grist_api_key'] = grist_api_key + + inventory['all']['hosts'][name]['git_version'] = git_version + + inventory['all']['hosts'][name]['docker_username'] = docker_username + inventory['all']['hosts'][name]['docker_password'] = docker_password + + with open(inventory_filename, 'w', encoding='utf-8') as file: + yaml.dump(inventory, file, default_flow_style=False) + + logger_instance.info(f"Running nillion deploy playbook...") + log_data = "" + with open(log_output_filename, 'w', encoding='utf-8') as deploy_log: + process = subprocess.Popen( ['ansible-playbook', '-i', inventory_filename, playbook_filename, '-v'], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1 ) + + for line in iter(process.stdout.readline, ''): + if line.startswith("TASK ["): logger_instance.info(f"Ansible task: {line.replace('*', '').replace('TASK ', '').strip()}") + deploy_log.write(line) + log_data += line + + process.stdout.close() + return_code = process.wait() + deploy_log.flush() + + + if return_code 
!= 0: + ansible_failed_task = "" + ansible = AnsibleLogParser(play_output=log_data) + plays = ansible.plays() + for _, tasks in plays.items(): + for task_name, task_object in tasks.items(): + for result in task_object.results: + if result.get('status') == 'failed' or result.get('status') == 'fatal': + #print(f"Failed task: {result}") + ansible_failed_task = task_name + logger_instance.error(f"Deploy failed. Failed stage: {ansible_failed_task} Check log file: {log_output_filename}, inventory file: {inventory_filename}") + raise Exception(f"Ansible error on {ansible_failed_task}") + return log_data + + + +def nodes_table_preprocessing(grist_instance, logger): + current_time = grist_instance.to_timestamp(datetime.now()) + max_wip = 60*60*2 + + logger.info(f"Run grist processing NoneState -> Dirty and NoneVersion -> av1") + for row in grist_instance.fetch_table("Nodes"): + if row.State == "": grist_instance.update_column(row.id, "State", "Dirty", "Nodes") + if row.Version == "": grist_instance.update_column(row.id, "Version", "nv1", "Nodes" ) + + logger.info(f"Run grist processing WiP and >max_wip old -> Dirty") + for row in grist_instance.fetch_table("Nodes"): + if row.Deploy_date is not None: + vm_old_age = current_time - row.Deploy_date + if row.State == "WiP" and vm_old_age > max_wip and row.State != "Dirty": + grist_instance.update_column(row.id, "State", "Dirty", "Nodes") + grist_instance.update_column(row.id, "Status", "Set Dirty by WiP Timeout", "Nodes") + + logger.info(f"Run grist processing NoneRetries -> 0/4") + for row in grist_instance.fetch_table("Nodes"): + if row.Retries is None or row.Retries == "": + grist_instance.update_column(row.id, "Retries", "0/4", "Nodes") + + + +def generate_proxy(login, password, proxy_ip, proxy_port, logger): + countries = [ "AT", "AM", "BE", "HU", "DE", "IL", "IE", "ES", "IT", + "LU", "NL", "NO", "PL", "PT", "RS", "FI", "FR", "SE", "CH", "CZ", "EE" ] + country = random.choice(countries) + id_hex = 
''.join(random.choices('0123456789abcdef', k=10)) + login_id = f"{login}_s_{id_hex}_c_{country}" + proxy_socks5 = f"socks5://{login_id}:{password}@{proxy_ip}:{proxy_port}" + proxy_http = f"http://{login_id}:{password}@{proxy_ip}:{proxy_port}" + + ipinfo_url = "http://ipinfo.io" + headers = {"User-Agent": "curl/7.68.0"} + + try: + response = requests.get(ipinfo_url, headers=headers, proxies={"http": proxy_http, "https": proxy_http}, timeout=10) + except requests.exceptions.RequestException as e: + logger.error(f"[proxy test] Error while requesting ipinfo.io: {e}") + return None + if response.status_code == 200: + logger.info(f"[proxy test] Successfully retrieved IP information: {response.json().get('ip')}") + return proxy_socks5 + else: + logger.error(f"[proxy test] Error while requesting ipinfo.io: {response.status_code}") + return None + + + +def repeat_deploy(grist_instance, logger, grist_env): + result = None + generated_address = False + + wait_period = int(grist_instance.find_settings("Wait period in minutes", "Settings"))*60 + name_node = grist_instance.find_settings("Node name", "Settings") + sticker_node = grist_instance.find_settings("Node sticker", "Settings") + + multiplier = 1.5 + diff = round(wait_period/multiplier) + min_wait = round(wait_period - diff) + max_wait = round(wait_period + diff) + + with grist_lock: + nodes_table_preprocessing(grist_instance, logger) + dirty_vms = grist_instance.find_record(state="Dirty", table="Nodes") + if len(dirty_vms) == 0: + print(f"No dirty nodes found") + result = False + return result + + dirty_vms.sort(key=lambda vm: (int(vm.Retries.split('/')[0]))) + #dirty_vms = sorted(dirty_vms, key=lambda row: 0 if row.Impact == "" else 1) + dirty_vm = dirty_vms[0] + def grist_callback(msg): grist_instance.update(dirty_vm.id, msg, "Nodes") + grist_instance.update(dirty_vm.id, { "State": "WiP", "Status": "Locked", "Deploy date": datetime.now() }, "Nodes") + if dirty_vm.Retries.count('/') == 1: + try: + attempt = 
def repeat_deploy(grist_instance, logger, grist_env):
    """Pick one "Dirty" node from Grist, provision a Proxmox VM for it, run
    the ansible deploy, and record the outcome back to Grist/Telegram.

    Returns True on a successful deploy, False on any handled failure or
    when there is no work. Candidate selection and WiP-marking happen under
    `grist_lock` so concurrent workers never pick the same row.

    NOTE(review): the original indentation of this function was lost; the
    lock is assumed to cover only the select-and-mark section below —
    confirm against the original file.
    """
    result = None
    # NOTE(review): never set to True anywhere in this function, so the
    # pre-ansible wait branch below is currently dead code — confirm intent.
    generated_address = False

    wait_period = int(grist_instance.find_settings("Wait period in minutes", "Settings"))*60
    name_node = grist_instance.find_settings("Node name", "Settings")
    sticker_node = grist_instance.find_settings("Node sticker", "Settings")

    # Jitter window: wait_period +/- (wait_period / 1.5).
    multiplier = 1.5
    diff = round(wait_period/multiplier)
    min_wait = round(wait_period - diff)
    max_wait = round(wait_period + diff)

    with grist_lock:
        nodes_table_preprocessing(grist_instance, logger)
        dirty_vms = grist_instance.find_record(state="Dirty", table="Nodes")
        if len(dirty_vms) == 0:
            print(f"No dirty nodes found")
            result = False
            return result

        # Prefer nodes with the fewest prior attempts ("N/M" -> N).
        dirty_vms.sort(key=lambda vm: (int(vm.Retries.split('/')[0])))
        #dirty_vms = sorted(dirty_vms, key=lambda row: 0 if row.Impact == "" else 1)
        dirty_vm = dirty_vms[0]
        # Closure over dirty_vm.id: all later status writes target this row.
        def grist_callback(msg): grist_instance.update(dirty_vm.id, msg, "Nodes")
        # Mark the row WiP immediately so other workers skip it.
        grist_instance.update(dirty_vm.id, { "State": "WiP", "Status": "Locked", "Deploy date": datetime.now() }, "Nodes")
        # NOTE(review): if Retries does not contain exactly one '/', this
        # whole branch is skipped and `attempt`/`max_retries` stay unbound —
        # the logger.info and except handlers below would then raise
        # NameError. nodes_table_preprocessing normally guarantees "0/4".
        if dirty_vm.Retries.count('/') == 1:
            try:
                attempt = int(dirty_vm.Retries.split('/')[0])
                max_retries = int(dirty_vm.Retries.split('/')[1])
                if attempt >= max_retries:
                    grist_instance.update(dirty_vm.id, { "State": "Error", "Status": f"{dirty_vm.Status} (Max retries)" }, "Nodes")
                    result = False
                    return result
                attempt += 1
                grist_instance.update(dirty_vm.id, { "Retries": f"{attempt}/{max_retries}" }, "Nodes")
            except ValueError:
                # Non-numeric retry counter — park the row as Error.
                grist_instance.update(dirty_vm.id, { "State": "Error", "Status": "Error retries" }, "Nodes")
                result = False
                return result

        if dirty_vm.Proxy is None or dirty_vm.Proxy == "":
            grist_callback( { "Status": "Generate proxy" })
            login = grist_instance.find_settings("Proxy generator login", "Settings")
            password = grist_instance.find_settings("Proxy generator password", "Settings")
            proxy_ip = grist_instance.find_settings("Proxy generator ip", "Settings")
            proxy_port = grist_instance.find_settings("Proxy generator port", "Settings")
            # NOTE(review): if all 5 attempts fail, proxy_socks5 is None and
            # None is still written to the Proxy column — confirm intent.
            for _ in range(5):
                proxy_socks5 = generate_proxy(login, password, proxy_ip, proxy_port, logger)
                if proxy_socks5 is not None:
                    break
                logger.error("[proxy] Failed to generate proxy, retrying...")
                time.sleep(1)
            grist_instance.update(dirty_vm.id, { "Proxy": proxy_socks5 }, "Nodes")

        # Re-read the row to see the Retries/Proxy values written above.
        # (grist_callback still targets the same row id.)
        dirty_vm = grist_instance.find_record(id=dirty_vm.id, table="Nodes")[0]
        vm_name = f"{name_node}-{dirty_vm.NodeID[8:16]}"

        # Per-VM logger; its Grist handler mirrors INFO+ lines into the row.
        # Note this rebinds the `logger` parameter for the rest of the call.
        logger_instance = LoggerWrapper(name=f"{vm_name}", log_file=f"logs/{name_node}_creation_{vm_name}.log", grist_callback=grist_callback)
        logger = logger_instance.get_logger()
        grist_callback( { "Name": vm_name, "State": "WiP", "Status": "Run proxmox worker.." })

    # Row is locked (WiP) — the long-running work happens outside the lock.
    # Sanity-check the columns the deploy depends on.
    required_fields = ["NodeID", "Proxy", "Server", "Storage", "Retries"]
    for field in required_fields:
        if not getattr(dirty_vm, field.replace(" ", "_")):
            grist_callback( { "State": "Error", "Status": f"No {field.lower()}" })
            result = False
            return result

    prox_host = grist_instance.find_settings("Proxmox host", "Settings")
    prox_user = grist_instance.find_settings("Proxmox user", "Settings")
    prox_password = grist_instance.find_settings("Proxmox password", "Settings")
    prox = Prox(prox_host, prox_user, prox_password, logger)

    tg_token = grist_instance.find_settings("Telegram token", "Settings")
    tg_chat_id = grist_instance.find_settings("Telegram chat id", "Settings")
    tg_log = TelegramBot(tg_token, f"{sticker_node} {name_node}", tg_chat_id, logger)

    current_version = grist_instance.find_settings("Current version", "Settings")

    logger.info(f"Create-run-deploy node {vm_name} on attempt {attempt}")
    deploy_start_time = datetime.now()

    try:
        grist_callback( { "IP": "", "Deploy date": datetime.now()})
        # Pick the template on the target server whose boot disk lives on the
        # requested storage and which carries the 'latest' tag.
        all_vms = prox.get_vms_data(node_name=dirty_vm.Server)
        vm_prox_node = dirty_vm.Server
        templates_vms = [vm for vm in all_vms if vm['template'] == 1]
        target_storage_templates = [vm for vm in templates_vms if vm['disk_storage'] == dirty_vm.Storage]
        latest_templates = [vm for vm in target_storage_templates if 'tags' in vm and 'latest' in vm['tags'].split(';')]
        if len(latest_templates) == 0:
            raise ResourceException(status_code=404, status_message="No latest template", content="No latest template")
        template_id = latest_templates[0]['vmid']
        # Wrap any provisioning failure as ResourceException so the lighter
        # (+0.5) retry-penalty branch below handles it.
        try:
            vm_ip, vm_prox_node, vm_id = prox.create_vm(vm_name, template_id=template_id)
        except Exception as e:
            raise ResourceException(status_code=500, status_message="Failed to create VM", content=str(e)) from e
        prox.modify_tag(vm_id=vm_id, tag="WiP", action="add")
        grist_callback( { "IP": vm_ip })

        # Dead branch while generated_address stays False (see NOTE above).
        if generated_address:
            delay = random.uniform(min_wait/2, max_wait/2)
            grist_callback( { "Status": f"Wait {format_time(delay)} before ansible.." })
            random_sleep(logger, delay=delay)

        grist_callback( { "Status": "Run ansible worker.." })
        deploy_node(name=vm_name, ansible_host=vm_ip, node_id=dirty_vm.NodeID, proxy=dirty_vm.Proxy,
                    git_version=grist_instance.find_settings("Git version", "Settings"),
                    git_base_url = grist_instance.find_settings("Git base URL", "Settings"),
                    proxy_jump=grist_instance.find_settings("Ansible SSH Jump host", "Settings"),
                    private_key_file=grist_instance.find_settings("Ansible SSH keyfile", "Settings"),
                    docker_username=grist_instance.find_settings("Docker username", "Settings"),
                    docker_password=grist_instance.find_settings("Docker password", "Settings"),
                    grist_server=grist_env["GRIST_SERVER"],
                    grist_doc_id=grist_env["GRIST_DOC_ID"],
                    grist_api_key=grist_env["GRIST_API_KEY"],
                    logger_instance=logger)

        # Swap the WiP tag for the deployed version tag on the VM.
        prox.modify_tag(vm_id=vm_id, tag="WiP", action="remove")
        prox.modify_tag(vm_id=vm_id, tag=current_version, action="add")

        # Batch-progress numbers for the Telegram summary.
        clear_vms = grist_instance.find_record(state="Clean", table="Nodes")
        dirty_vms = grist_instance.find_record(state="Dirty", table="Nodes")
        wip_vms = grist_instance.find_record(state="WiP", table="Nodes")
        clear_keys = len(clear_vms)
        dirty_keys = len(dirty_vms) + len(wip_vms)
        deploy_elapsed_time = (datetime.now() - deploy_start_time).total_seconds()
        if vm_prox_node is None: vm_prox_node = "None"
        tg_log.message(f"✅ Deployed vm {vm_name} in '{vm_prox_node}' (took {format_time(deploy_elapsed_time)}, vm {clear_keys} out of {dirty_keys+clear_keys} in batch, left {dirty_keys} nodes")
        logger.info(f"Deployed node {vm_name}/{vm_ip} in {vm_prox_node} ({format_time(deploy_elapsed_time)} elapsed install)")
        # Cool-down with jitter before releasing this worker for the next node.
        delay = random.uniform(min_wait/2, max_wait/2)
        format_delay = format_time(delay)
        grist_callback( { "Status": f"Deployed, wait {format_delay}.." })
        grist_callback( {"State": "Clean", "Version": current_version, "Deploy date": datetime.now()} )
        grist_callback( {"Deploy time": f"{format_time(deploy_elapsed_time)}"} )
        random_sleep(logger, delay=delay)
        grist_callback( { "Status": f"Deployed ok on attempt {attempt}/{max_retries}", "Retries": f"{0}/{4}" })
        result = True
        return result

    except ResourceException as e:
        # Proxmox-side failures cost only half a retry (counter display only;
        # the stored Retries value was already incremented above).
        attempt += 0.5
        logger.error(f"Fail: {e}\n{traceback.format_exc()}")
        logger.error(f"Failed to deploy node for {vm_name} on attempt {attempt}/{max_retries}: Proxmox Resource Exception: {e}")
        error_message = str(e).replace('\n', '')
        grist_callback( {"State": "Dirty", "Version": "", "Deploy date": "", "Status": f"Failed: {error_message}"})
        tg_log.message(f"❌ Failed to deploy attempt {attempt}/{max_retries} node {vm_name} on '{vm_prox_node}': {e}")
        time.sleep(30)
        result = False
        return result

    except Exception as e:
        attempt += 1
        logger.error(f"Fail: {e}\n{traceback.format_exc()}")
        logger.error(f"Failed to deploy node for {vm_name} on attempt {attempt}/{max_retries}: {e}")
        tg_log.message(f"❌ Failed to deploy attempt {attempt}/{max_retries} node {vm_name} on '{vm_prox_node}': {e}")
        error_message = str(e).replace('\n', '')
        grist_callback( {"State": "Dirty", "Version": "", "Deploy date": "", "Status": f"Failed: {error_message}"})
        time.sleep(30)
        result = False
        return result

    finally:
        # result stays None only when the try body was interrupted before
        # setting it (e.g. KeyboardInterrupt) — release the row back to Dirty.
        # NOTE(review): `is None` would be the idiomatic comparison here.
        if result == None: grist_callback( {"State": "Dirty", "Status": f"Ended by interrupt"} )
def main():
    """Entry point for the nexus deployer.

    Sets up file logging, connects to Grist using environment-supplied
    credentials, validates that the Ansible SSH key exists on disk, then runs
    a monitoring loop that keeps up to "Workers" (capped at 20) concurrent
    repeat_deploy tasks alive: finished tasks are reaped and respawned,
    tasks exceeding the 3-hour timeout are dropped.

    Returns:
        True on a clean, interrupt-driven shutdown.

    Raises:
        FileNotFoundError: if the configured SSH private key file is missing.
        RuntimeError: if any required GRIST_* environment variable is unset.
    """
    # --- logging setup: logs/nexus_deployer.log, created on first run ---
    log_dir = "logs"
    log_path = os.path.join(log_dir, "nexus_deployer.log")
    os.makedirs(log_dir, exist_ok=True)
    if not os.path.exists(log_path):
        open(log_path, 'a', encoding='utf-8').close()
    logger_instance = LoggerWrapper(name="nexus", log_file=log_path)
    logger = logger_instance.get_logger()

    # --- Grist connection parameters come from the environment; fail fast
    # --- with a clear message instead of passing None into the client.
    grist_env = {
        "GRIST_SERVER": os.getenv("GRIST_SERVER"),
        "GRIST_DOC_ID": os.getenv("GRIST_DOC_ID"),
        "GRIST_API_KEY": os.getenv("GRIST_API_KEY"),
    }
    missing = [key for key, value in grist_env.items() if not value]
    if missing:
        logger.error("Missing required environment variables: %s", ", ".join(missing))
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")

    grist_instance = GRIST(grist_env["GRIST_SERVER"], grist_env["GRIST_DOC_ID"],
                           grist_env["GRIST_API_KEY"], logger)

    logger.info("Starting nexus deployer...")

    # Fail fast if the SSH key that ansible needs is not on disk.
    deployer_key = grist_instance.find_settings("Ansible SSH keyfile", "Settings")
    if not os.path.exists(deployer_key):
        logger.error("SSH private key file not found: %s", deployer_key)
        raise FileNotFoundError(f"Required SSH private key file {deployer_key} is missing")

    max_workers_cap = 20           # hard upper bound regardless of the Grist setting
    task_timeout = 60 * 60 * 3     # drop tasks that run longer than 3 hours
    spawn_interval = 120           # pause between spawning consecutive workers
    poll_interval = 10             # pause between monitoring passes

    executor = None  # guard for the finally block if construction fails
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers_cap) as executor:
            futures = []
            start_times = {}

            # BUGFIX: the monitoring loop was previously commented out, so the
            # deployer spawned one batch of workers, checked them exactly once,
            # and then blocked forever in executor shutdown. Loop until
            # interrupted so finished workers are respawned and stuck ones
            # are timed out on every pass.
            while True:
                max_threads = min(
                    int(grist_instance.find_settings("Workers", "Settings")),
                    max_workers_cap)

                # Top up the pool one worker at a time, re-reading the limit
                # each round so an operator can shrink it mid-spawn.
                while len(futures) < max_threads:
                    future = executor.submit(repeat_deploy, grist_instance, logger, grist_env)
                    futures.append(future)
                    start_times[future] = time.time()
                    logger.info("Spawned new task. Total active tasks: %d, max threads: %d",
                                len(futures), max_threads)
                    time.sleep(spawn_interval)
                    max_threads = min(
                        int(grist_instance.find_settings("Workers", "Settings")),
                        max_workers_cap)

                # Reap finished tasks and time out stuck ones.
                current_time = time.time()
                for future in futures.copy():
                    if future.done():
                        try:
                            result = future.result()
                            logger.info("Task completed: %s", result)
                        except Exception as e:
                            logger.error("Task failed with error: %s", e)
                            logger.error("Fail: %s\n%s", e, traceback.format_exc())
                        futures.remove(future)
                        start_times.pop(future, None)
                    elif current_time - start_times[future] > task_timeout:
                        # Future.cancel() only succeeds if the task has not
                        # started yet; a running thread cannot be interrupted,
                        # so in that case we merely stop tracking it.
                        if future.cancel():
                            logger.info("Cancelled a task due to timeout.")
                        else:
                            logger.info("Timed-out task is already running; dropped from tracking.")
                        futures.remove(future)
                        start_times.pop(future, None)

                time.sleep(poll_interval)

    except KeyboardInterrupt:
        logger.info("Program interrupted, shutting down executor...")
    finally:
        # The executor may never have been created if an exception fired
        # before the `with` statement ran; shutdown is idempotent otherwise.
        if executor is not None:
            executor.shutdown(wait=True)
        logger.info("Resources released, executor shutdown completed.")
    return True



if __name__ == "__main__":
    if main() is False:
        sys.exit(1)
    sys.exit(0)
a/ws.code-workspace b/ws.code-workspace new file mode 100644 index 0000000..d3d2cc9 --- /dev/null +++ b/ws.code-workspace @@ -0,0 +1,24 @@ +{ + "folders": [ + { + "path": "." + } + ], + "settings": { + "workbench.colorCustomizations": { + "activityBar.activeBackground": "#44a003", + "activityBar.background": "#44a003", + "activityBar.foreground": "#e7e7e7", + "activityBar.inactiveForeground": "#e7e7e799", + "activityBarBadge.background": "#0450bc", + "activityBarBadge.foreground": "#e7e7e7", + "commandCenter.border": "#e7e7e799", + "sash.hoverBorder": "#44a003", + "titleBar.activeBackground": "#2f6e02", + "titleBar.activeForeground": "#e7e7e7", + "titleBar.inactiveBackground": "#2f6e0299", + "titleBar.inactiveForeground": "#e7e7e799" + }, + "peacock.color": "#2f6e02" + } +} \ No newline at end of file