8 Commits

SHA1 Message Date
9a211a4748 fix requirements.txt 2024-09-03 04:44:52 +03:00
14e8c74962 new model 2024-09-03 04:24:43 +03:00
c7cc0079a8 fix bug 2024-08-28 02:24:48 +03:00
c5522e8c72 add timeout 2024-08-28 02:22:54 +03:00
7ecfd10d50 fix exit 2024-08-27 03:56:27 +03:00
d75baceae9 Update config.json 2024-08-26 17:22:44 +03:00
714bf4c863 add env and +x 2024-08-26 17:20:25 +03:00
e65e0d95ed fork original 2024-08-26 17:16:29 +03:00
13 changed files with 291 additions and 46 deletions

17
Dockerfile Normal file

@@ -0,0 +1,17 @@
# Use an official Python runtime as the base image
FROM amd64/python:3.9-buster as project_env
# Set the working directory in the container
WORKDIR /app
# Install dependencies
COPY requirements.txt requirements.txt
RUN pip install --upgrade pip setuptools \
&& pip install -r requirements.txt
FROM project_env
COPY . /app/
# Set the entrypoint command
CMD ["gunicorn", "--conf", "/app/gunicorn_conf.py", "main:app"]

44
app.py

@@ -4,7 +4,7 @@ import pandas as pd
 import numpy as np
 from datetime import datetime
 from flask import Flask, jsonify, Response
-from model import download_data, format_data, train_model
+from model import download_data, format_data, train_model, training_price_data_path
 from config import model_file_path

 app = Flask(__name__)
@@ -19,14 +19,36 @@ def update_data():
 def get_eth_inference():
     """Load model and predict current price."""
-    with open(model_file_path, "rb") as f:
-        loaded_model = pickle.load(f)
-
-    now_timestamp = pd.Timestamp(datetime.now()).timestamp()
-    X_new = np.array([now_timestamp]).reshape(-1, 1)
-    current_price_pred = loaded_model.predict(X_new)
-    return current_price_pred[0]
+    try:
+        with open(model_file_path, "rb") as f:
+            loaded_model = pickle.load(f)
+
+        # Load the latest data from the file
+        price_data = pd.read_csv(training_price_data_path)
+
+        # Use the latest feature values for the prediction
+        X_new = (
+            price_data[
+                [
+                    "timestamp",
+                    "price_diff",
+                    "volatility",
+                    "volume",
+                    "moving_avg_7",
+                    "moving_avg_30",
+                ]
+            ]
+            .iloc[-1]
+            .values.reshape(1, -1)
+        )
+
+        # Make the prediction
+        current_price_pred = loaded_model.predict(X_new)
+        return current_price_pred[0]
+    except Exception as e:
+        print(f"Error during inference: {str(e)}")
+        raise


 @app.route("/inference/<string:token>")
@@ -34,13 +56,17 @@ def generate_inference(token):
     """Generate inference for given token."""
     if not token or token != "ETH":
         error_msg = "Token is required" if not token else "Token not supported"
-        return Response(json.dumps({"error": error_msg}), status=400, mimetype='application/json')
+        return Response(
+            json.dumps({"error": error_msg}), status=400, mimetype="application/json"
+        )

     try:
         inference = get_eth_inference()
         return Response(str(inference), status=200)
     except Exception as e:
-        return Response(json.dumps({"error": str(e)}), status=500, mimetype='application/json')
+        return Response(
+            json.dumps({"error": str(e)}), status=500, mimetype="application/json"
+        )


 @app.route("/update")

config.json

@@ -7,7 +7,7 @@
     "gasAdjustment": 1.0,
     "nodeRpc": "###RPC_URL###",
     "maxRetries": 10,
-    "delay": 10,
+    "delay": 30,
     "submitTx": false
   },
   "worker": [

5
config.py Normal file

@@ -0,0 +1,5 @@
import os
app_base_path = os.getenv("APP_BASE_PATH", default=os.getcwd())
data_base_path = os.path.join(app_base_path, "data")
model_file_path = os.path.join(data_base_path, "model.pkl")
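Since every path above is derived from APP_BASE_PATH (defaulting to the current working directory), the model location can be redirected without code changes; a small illustrative check, assuming config.py is importable from the current directory:

# Illustrative only: point the worker at another base directory and inspect the model path
export APP_BASE_PATH=/tmp/worker
python -c "import config; print(config.model_file_path)"  # expected: /tmp/worker/data/model.pkl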

12
gunicorn_conf.py Normal file

@@ -0,0 +1,12 @@
# Gunicorn config variables
loglevel = "info"
errorlog = "-" # stderr
accesslog = "-" # stdout
worker_tmp_dir = "/dev/shm"
graceful_timeout = 120
timeout = 30
keepalive = 5
worker_class = "gthread"
workers = 1
threads = 8
bind = "0.0.0.0:9000"

43
init.config Executable file

@@ -0,0 +1,43 @@
#!/usr/bin/env bash
set -e

if [ ! -f config.json ]; then
    echo "Error: config.json file not found, please provide one"
    exit 1
fi

nodeName=$(jq -r '.wallet.addressKeyName' config.json)
if [ -z "$nodeName" ]; then
    echo "No wallet name provided for the node, please provide your preferred wallet name. config.json >> wallet.addressKeyName"
    exit 1
fi

# Ensure the worker-data directory exists
mkdir -p ./worker-data

json_content=$(cat ./config.json)
stringified_json=$(echo "$json_content" | jq -c .)

mnemonic=$(jq -r '.wallet.addressRestoreMnemonic' config.json)
if [ -n "$mnemonic" ]; then
    echo "ALLORA_OFFCHAIN_NODE_CONFIG_JSON='$stringified_json'" > ./worker-data/env_file
    echo "NAME=$nodeName" >> ./worker-data/env_file
    echo "ENV_LOADED=true" >> ./worker-data/env_file
    echo "Wallet mnemonic already provided; loading config.json. Please proceed to run docker compose"
    exit 0
fi

if [ ! -f ./worker-data/env_file ]; then
    echo "ENV_LOADED=false" > ./worker-data/env_file
fi

ENV_LOADED=$(grep '^ENV_LOADED=' ./worker-data/env_file | cut -d '=' -f 2)
if [ "$ENV_LOADED" = "false" ]; then
    json_content=$(cat ./config.json)
    stringified_json=$(echo "$json_content" | jq -c .)
    docker run -it --entrypoint=bash -v $(pwd)/worker-data:/data -v $(pwd)/scripts:/scripts -e NAME="${nodeName}" -e ALLORA_OFFCHAIN_NODE_CONFIG_JSON="${stringified_json}" alloranetwork/allora-chain:latest -c "bash /scripts/init.sh"
    echo "config.json saved to ./worker-data/env_file"
else
    echo "config.json is already loaded, skipping the operation. You can set the ENV_LOADED variable to false in ./worker-data/env_file to reload config.json"
fi
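A possible invocation sequence for this script; the docker compose step is an assumption about the repository's existing compose setup, which is not part of this diff:

# Illustrative only: generate ./worker-data/env_file from config.json, then start the stack
chmod +x init.config
./init.config
docker compose up -d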

View File

@@ -2,7 +2,6 @@ import subprocess
 import json
 import sys
 import time
-import os

 def is_json(myjson):
     try:
@@ -11,7 +10,7 @@ def is_json(myjson):
         return False
     return True

-def parse_logs():
+def parse_logs(timeout):
     start_time = time.time()
     while True:
         unsuccessful_attempts = 0
@@ -50,26 +49,28 @@ def parse_logs():
                     return False, "Max Retry Reached"
         except Exception as e:
             print(f"Exception occurred: {e}", flush=True)
+        finally:
+            process.stdout.close()

         print("Sleeping before next log request...", flush=True)
         time.sleep(30)

-        if time.time() - start_time > 30 * 60:
-            print("Timeout reached: 30 minutes elapsed without success.", flush=True)
-            return False, "Timeout reached: 30 minutes elapsed without success."
+        if time.time() - start_time > timeout * 60:
+            print(f"Timeout reached: {timeout} minutes elapsed without success.", flush=True)
+            return False, f"Timeout reached: {timeout} minutes elapsed without success."

     return False, "No Success"


 if __name__ == "__main__":
     print("Parsing logs...")
-    result = parse_logs()
+    if len(sys.argv) > 1:
+        timeout = eval(sys.argv[1])
+    else:
+        timeout = 30
+    result = parse_logs(timeout)
     print(result[1])
     if result[0] == False:
         print("Exiting 1...")
-        os._exit(1)
+        sys.exit(1)
     else:
         print("Exiting 0...")
-        os._exit(0)
+        sys.exit(0)

model.py

@@ -1,15 +1,14 @@
 import os
 import pickle
+import numpy as np
+from xgboost import XGBRegressor
 from zipfile import ZipFile
 from datetime import datetime
 import pandas as pd
-import numpy as np
 from sklearn.model_selection import train_test_split
-from sklearn import linear_model
 from updater import download_binance_monthly_data, download_binance_daily_data
 from config import data_base_path, model_file_path

 binance_data_path = os.path.join(data_base_path, "binance/futures-klines")
 training_price_data_path = os.path.join(data_base_path, "eth_price_data.csv")
@@ -35,19 +34,14 @@ def download_data():
 def format_data():
-    files = sorted([x for x in os.listdir(binance_data_path)])
+    files = sorted([x for x in os.listdir(binance_data_path) if x.endswith(".zip")])

-    # No files to process
     if len(files) == 0:
         return

     price_df = pd.DataFrame()
     for file in files:
         zip_file_path = os.path.join(binance_data_path, file)
-
-        if not zip_file_path.endswith(".zip"):
-            continue
-
         myzip = ZipFile(zip_file_path)
         with myzip.open(myzip.filelist[0]) as f:
             line = f.readline()
@@ -70,30 +64,43 @@ def format_data():
         df.index.name = "date"
         price_df = pd.concat([price_df, df])

+    price_df["timestamp"] = price_df.index.map(pd.Timestamp.timestamp)
+    price_df["price_diff"] = price_df["close"].diff()
+    price_df["volatility"] = (price_df["high"] - price_df["low"]) / price_df["open"]
+    price_df["volume"] = price_df["volume"]
+    price_df["moving_avg_7"] = price_df["close"].rolling(window=7).mean()
+    price_df["moving_avg_30"] = price_df["close"].rolling(window=30).mean()
+
+    # Drop rows with NaN values
+    price_df.dropna(inplace=True)
+
+    # Save the data
     price_df.sort_index().to_csv(training_price_data_path)


 def train_model():
-    # Load the eth price data
     price_data = pd.read_csv(training_price_data_path)
-    df = pd.DataFrame()

-    # Convert 'date' to a numerical value (timestamp) we can use for regression
-    df["date"] = pd.to_datetime(price_data["date"])
-    df["date"] = df["date"].map(pd.Timestamp.timestamp)
-
-    df["price"] = price_data[["open", "close", "high", "low"]].mean(axis=1)
-
-    # Reshape the data to the shape expected by sklearn
-    x = df["date"].values.reshape(-1, 1)
-    y = df["price"].values.reshape(-1, 1)
-
-    # Split the data into training set and test set
-    x_train, _, y_train, _ = train_test_split(x, y, test_size=0.2, random_state=0)
+    # Use the additional features
+    x = price_data[
+        [
+            "timestamp",
+            "price_diff",
+            "volatility",
+            "volume",
+            "moving_avg_7",
+            "moving_avg_30",
+        ]
+    ]
+    y = price_data["close"]
+
+    x_train, x_test, y_train, y_test = train_test_split(
+        x, y, test_size=0.2, random_state=0
+    )

     # Train the model
     print("Training model...")
-    model = linear_model.Lasso(alpha=0.1)
+    model = XGBRegressor()
     model.fit(x_train, y_train)
     print("Model trained.")
@@ -104,4 +111,8 @@ def train_model():
     with open(model_file_path, "wb") as f:
         pickle.dump(model, f)
     print(f"Trained model saved to {model_file_path}")
+
+    # Optional: evaluate the model
+    y_pred = model.predict(x_test)
+    print(f"Mean Absolute Error: {np.mean(np.abs(y_test - y_pred))}")

16
requirements.txt Normal file

@@ -0,0 +1,16 @@
flask[async]
gunicorn[gthread]
numpy==1.26.2
pandas==2.1.3
Requests==2.32.0
scikit_learn==1.3.2
werkzeug>=3.0.3 # not directly required, pinned by Snyk to avoid a vulnerability
itsdangerous
Jinja2
MarkupSafe
python-dateutil
pytz
scipy
six
scikit-learn
xgboost

33
scripts/init.sh Normal file

@@ -0,0 +1,33 @@
#!/bin/bash
set -e

if allorad keys --home=/data/.allorad --keyring-backend test show $NAME > /dev/null 2>&1 ; then
    echo "allora account: $NAME already imported"
else
    echo "creating allora account: $NAME"
    output=$(allorad keys add $NAME --home=/data/.allorad --keyring-backend test 2>&1)
    address=$(echo "$output" | grep 'address:' | sed 's/.*address: //')
    mnemonic=$(echo "$output" | tail -n 1)

    # Parse and update the JSON string
    updated_json=$(echo "$ALLORA_OFFCHAIN_NODE_CONFIG_JSON" | jq --arg name "$NAME" --arg mnemonic "$mnemonic" '
        .wallet.addressKeyName = $name |
        .wallet.addressRestoreMnemonic = $mnemonic
    ')
    stringified_json=$(echo "$updated_json" | jq -c .)

    echo "ALLORA_OFFCHAIN_NODE_CONFIG_JSON='$stringified_json'" > /data/env_file
    echo ALLORA_OFFCHAIN_ACCOUNT_ADDRESS=$address >> /data/env_file
    echo "NAME=$NAME" >> /data/env_file

    echo "Updated ALLORA_OFFCHAIN_NODE_CONFIG_JSON saved to /data/env_file"
fi

if grep -q "ENV_LOADED=false" /data/env_file; then
    sed -i 's/ENV_LOADED=false/ENV_LOADED=true/' /data/env_file
else
    echo "ENV_LOADED=true" >> /data/env_file
fi

2
update.sh Normal file → Executable file

@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash

 if [ "$#" -ne 3 ]; then
     echo "Usage: $0 <mnemonic> <wallet> <rpc_url>"

22
update_app.py Normal file

@@ -0,0 +1,22 @@
import os

import requests

inference_address = os.environ["INFERENCE_API_ADDRESS"]
url = f"{inference_address}/update"

print("UPDATING INFERENCE WORKER DATA")

response = requests.get(url)

if response.status_code == 200:
    # Request was successful
    content = response.text
    if content == "0":
        print("Response content is '0'")
        exit(0)
    else:
        exit(1)
else:
    # Request failed
    print(f"Request failed with status code: {response.status_code}")
    exit(1)
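The script is driven entirely by the INFERENCE_API_ADDRESS environment variable; a minimal sketch of running it against a locally exposed worker (the address is an assumption):

# Illustrative only: point the updater at a local inference worker; it exits non-zero on failure
export INFERENCE_API_ADDRESS=http://localhost:9000
python update_app.py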

59
updater.py Normal file

@@ -0,0 +1,59 @@
import os

import requests
from concurrent.futures import ThreadPoolExecutor


# Function to download the URL, called asynchronously by several child processes
def download_url(url, download_path):
    target_file_path = os.path.join(download_path, os.path.basename(url))
    if os.path.exists(target_file_path):
        # print(f"File already exists: {url}")
        return

    response = requests.get(url)
    if response.status_code == 404:
        # print(f"File not exist: {url}")
        pass
    else:
        # create the entire path if it doesn't exist
        os.makedirs(os.path.dirname(target_file_path), exist_ok=True)
        with open(target_file_path, "wb") as f:
            f.write(response.content)
        # print(f"Downloaded: {url} to {target_file_path}")


def download_binance_monthly_data(
    cm_or_um, symbols, intervals, years, months, download_path
):
    # Verify if CM_OR_UM is correct, if not, exit
    if cm_or_um not in ["cm", "um"]:
        print("CM_OR_UM can be only cm or um")
        return
    base_url = f"https://data.binance.vision/data/futures/{cm_or_um}/monthly/klines"

    # Main loop to iterate over all the arrays and launch child processes
    with ThreadPoolExecutor() as executor:
        for symbol in symbols:
            for interval in intervals:
                for year in years:
                    for month in months:
                        url = f"{base_url}/{symbol}/{interval}/{symbol}-{interval}-{year}-{month}.zip"
                        executor.submit(download_url, url, download_path)


def download_binance_daily_data(
    cm_or_um, symbols, intervals, year, month, download_path
):
    if cm_or_um not in ["cm", "um"]:
        print("CM_OR_UM can be only cm or um")
        return
    base_url = f"https://data.binance.vision/data/futures/{cm_or_um}/daily/klines"

    with ThreadPoolExecutor() as executor:
        for symbol in symbols:
            for interval in intervals:
                for day in range(1, 32):  # Assuming days range from 1 to 31
                    url = f"{base_url}/{symbol}/{interval}/{symbol}-{interval}-{year}-{month:02d}-{day:02d}.zip"
                    executor.submit(download_url, url, download_path)
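A hypothetical call for a single symbol and month, matching the download_binance_monthly_data signature above; the symbol, interval, year/month, and target directory are illustrative, with the directory mirroring binance_data_path from model.py:

# Illustrative only: fetch one month of ETHUSDT 1d USDT-margined futures klines
python -c "from updater import download_binance_monthly_data; download_binance_monthly_data('um', ['ETHUSDT'], ['1d'], ['2024'], ['06'], 'data/binance/futures-klines')"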