13 Commits

SHA1 Message Date
1ba4c0158d probably fix 2024-09-06 02:17:59 +03:00
fc7097fd50 remove topics 2024-09-05 18:20:25 +03:00
4b7f57d0dd remove topics 2024-09-05 16:13:16 +03:00
61fa099391 fix interval and retries 2024-09-05 03:23:06 +03:00
520416b772 fix warning 2024-09-05 03:20:57 +03:00
3f17f7f0b7 fix url 2024-09-05 03:20:48 +03:00
59672292e2 fix port 2024-09-05 02:29:32 +03:00
505ba1a42d add new topics to configjs 2024-09-04 22:32:28 +03:00
7fd61d13e5 fix min>m 2024-09-04 22:32:07 +03:00
ca552f5a7a add many tokens 2024-09-04 22:25:48 +03:00
2475e22c1a add universal for many periods 2024-09-04 22:22:25 +03:00
9a211a4748 fix requirements.txt 2024-09-03 04:44:52 +03:00
14e8c74962 new model 2024-09-03 04:24:43 +03:00
7 changed files with 190 additions and 93 deletions

Dockerfile

@@ -4,6 +4,8 @@ FROM amd64/python:3.9-buster as project_env
 # Set the working directory in the container
 WORKDIR /app

+ENV FLASK_ENV=production
+
 # Install dependencies
 COPY requirements.txt requirements.txt
 RUN pip install --upgrade pip setuptools \
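Note: ENV FLASK_ENV=production disables Flask's debugger and auto-reloader in the Flask versions that still honor this variable; newer releases deprecate FLASK_ENV in favor of FLASK_DEBUG, so treat this line as a safe default rather than a guarantee.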

app.py

@@ -4,43 +4,83 @@ import pandas as pd
 import numpy as np
 from datetime import datetime
 from flask import Flask, jsonify, Response
-from model import download_data, format_data, train_model
+from model import download_data, format_data, train_model, get_training_data_path
 from config import model_file_path

 app = Flask(__name__)


 def update_data():
-    """Download price data, format data and train model."""
+    """Download price data, format data and train model for each token."""
+    tokens = ["ETH", "BTC", "SOL", "BNB", "ARB"]
     download_data()
-    format_data()
-    train_model()
+    for token in tokens:
+        format_data(token)
+        train_model(token)


-def get_eth_inference():
-    """Load model and predict current price."""
-    with open(model_file_path, "rb") as f:
-        loaded_model = pickle.load(f)
-
-    now_timestamp = pd.Timestamp(datetime.now()).timestamp()
-    X_new = np.array([now_timestamp]).reshape(-1, 1)
-    current_price_pred = loaded_model.predict(X_new)
-
-    return current_price_pred[0]
-
-
-@app.route("/inference/<string:token>")
-def generate_inference(token):
-    """Generate inference for given token."""
-    if not token or token != "ETH":
-        error_msg = "Token is required" if not token else "Token not supported"
-        return Response(json.dumps({"error": error_msg}), status=400, mimetype='application/json')
-
-    try:
-        inference = get_eth_inference()
-        return Response(str(inference), status=200)
-    except Exception as e:
-        return Response(json.dumps({"error": str(e)}), status=500, mimetype='application/json')
+def get_inference(token, period):
+    try:
+        model_path = model_file_path[token]
+        with open(model_path, "rb") as f:
+            loaded_model = pickle.load(f)
+
+        # Load the latest data for this token
+        training_price_data_path = get_training_data_path(token)
+        price_data = pd.read_csv(training_price_data_path)
+
+        # Use the latest feature values for the prediction
+        last_row = price_data.iloc[-1]
+        last_timestamp = last_row["timestamp"]
+
+        # Convert the period to seconds
+        period_seconds = convert_period_to_seconds(period)
+        new_timestamp = last_timestamp + period_seconds
+
+        # Build the prediction input with the new timestamp
+        X_new = np.array(
+            [
+                new_timestamp,
+                last_row["price_diff"],
+                last_row["volatility"],
+                last_row["volume"],
+                last_row["moving_avg_7"],
+                last_row["moving_avg_30"],
+            ]
+        ).reshape(1, -1)
+
+        # Make the prediction
+        future_price_pred = loaded_model.predict(X_new)
+        return future_price_pred[0]
+    except Exception as e:
+        print(f"Error during inference: {str(e)}")
+        raise
+
+
+def convert_period_to_seconds(period):
+    """Convert a period string to seconds."""
+    if period.endswith("m"):
+        return int(period[:-1]) * 60
+    elif period.endswith("h"):
+        return int(period[:-1]) * 3600
+    elif period.endswith("d"):
+        return int(period[:-1]) * 86400
+    else:
+        raise ValueError(f"Unknown period format: {period}")
+
+
+@app.route("/inference/<string:token>/<string:period>")
+def generate_inference(token, period):
+    """Generate inference for given token and period."""
+    try:
+        inference = get_inference(token, period)
+        return Response(str(inference), status=200)
+    except Exception as e:
+        return Response(
+            json.dumps({"error": str(e)}), status=500, mimetype="application/json"
+        )


 @app.route("/update")
@@ -55,4 +95,4 @@ def update():

 if __name__ == "__main__":
     update_data()
-    app.run(host="0.0.0.0", port=8000)
+    app.run(host="0.0.0.0", port=8080)
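Note: a minimal client sketch for exercising the new route, assuming the host port mapping from docker-compose below and the requests package (neither is part of this diff):

import requests

BASE_URL = "http://localhost:8080"  # assumption: host-mapped port from docker-compose

for token, period in [("ETH", "10m"), ("BTC", "24h"), ("ARB", "20m")]:
    resp = requests.get(f"{BASE_URL}/inference/{token}/{period}", timeout=10)
    if resp.status_code == 200:
        print(f"{token} price predicted {period} ahead: {resp.text}")
    else:
        print(f"{token}/{period} failed: {resp.status_code} {resp.text}")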

config.json

@@ -8,35 +8,53 @@
     "nodeRpc": "###RPC_URL###",
     "maxRetries": 10,
     "delay": 30,
-    "submitTx": false
+    "submitTx": true
   },
   "worker": [
     {
       "topicId": 1,
-      "inferenceEntrypointName": "api-worker-reputer",
-      "loopSeconds": 5,
-      "parameters": {
-        "InferenceEndpoint": "http://inference:8000/inference/{Token}",
-        "Token": "ETH"
-      }
+      "inferenceEntrypointName": "api-worker-reputer", "loopSeconds": 5,
+      "parameters": { "InferenceEndpoint": "http://inference:8080/inference/ETH/10m", "Token": "ETH" }
     },
     {
       "topicId": 2,
-      "inferenceEntrypointName": "api-worker-reputer",
-      "loopSeconds": 5,
-      "parameters": {
-        "InferenceEndpoint": "http://inference:8000/inference/{Token}",
-        "Token": "ETH"
-      }
+      "inferenceEntrypointName": "api-worker-reputer", "loopSeconds": 5,
+      "parameters": { "InferenceEndpoint": "http://inference:8080/inference/ETH/24h", "Token": "ETH" }
     },
     {
+      "topicId": 3,
+      "inferenceEntrypointName": "api-worker-reputer", "loopSeconds": 5,
+      "parameters": { "InferenceEndpoint": "http://inference:8080/inference/BTC/10m", "Token": "BTC" }
+    },
+    {
+      "topicId": 4,
+      "inferenceEntrypointName": "api-worker-reputer", "loopSeconds": 5,
+      "parameters": { "InferenceEndpoint": "http://inference:8080/inference/BTC/24h", "Token": "BTC" }
+    },
+    {
+      "topicId": 5,
+      "inferenceEntrypointName": "api-worker-reputer", "loopSeconds": 5,
+      "parameters": { "InferenceEndpoint": "http://inference:8080/inference/SOL/10m", "Token": "SOL" }
+    },
+    {
+      "topicId": 6,
+      "inferenceEntrypointName": "api-worker-reputer", "loopSeconds": 5,
+      "parameters": { "InferenceEndpoint": "http://inference:8080/inference/SOL/24h", "Token": "SOL" }
+    },
+    {
       "topicId": 7,
-      "inferenceEntrypointName": "api-worker-reputer",
-      "loopSeconds": 5,
-      "parameters": {
-        "InferenceEndpoint": "http://inference:8000/inference/{Token}",
-        "Token": "ETH"
-      }
+      "inferenceEntrypointName": "api-worker-reputer", "loopSeconds": 5,
+      "parameters": { "InferenceEndpoint": "http://inference:8080/inference/ETH/20m", "Token": "ETH" }
+    },
+    {
+      "topicId": 8,
+      "inferenceEntrypointName": "api-worker-reputer", "loopSeconds": 5,
+      "parameters": { "InferenceEndpoint": "http://inference:8080/inference/BNB/20m", "Token": "BNB" }
+    },
+    {
+      "topicId": 9,
+      "inferenceEntrypointName": "api-worker-reputer", "loopSeconds": 5,
+      "parameters": { "InferenceEndpoint": "http://inference:8080/inference/ARB/20m", "Token": "ARB" }
     }
   ]
 }
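Note: the nine worker entries differ only in topicId, token, and period. A throwaway sketch like the one below (not part of the repo; the topic table is copied from the config above) can regenerate the worker array and keep the endpoints in sync with the Flask routes:

import json

TOPICS = [
    (1, "ETH", "10m"), (2, "ETH", "24h"),
    (3, "BTC", "10m"), (4, "BTC", "24h"),
    (5, "SOL", "10m"), (6, "SOL", "24h"),
    (7, "ETH", "20m"), (8, "BNB", "20m"), (9, "ARB", "20m"),
]

# Build one worker entry per (topicId, token, period) tuple
workers = [
    {
        "topicId": topic_id,
        "inferenceEntrypointName": "api-worker-reputer",
        "loopSeconds": 5,
        "parameters": {
            "InferenceEndpoint": f"http://inference:8080/inference/{token}/{period}",
            "Token": token,
        },
    }
    for topic_id, token, period in TOPICS
]

print(json.dumps(workers, indent=2))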

config.py

@@ -2,4 +2,15 @@ import os

 app_base_path = os.getenv("APP_BASE_PATH", default=os.getcwd())
 data_base_path = os.path.join(app_base_path, "data")
-model_file_path = os.path.join(data_base_path, "model.pkl")
+
+model_file_path = {
+    "ETH": os.path.join(data_base_path, "eth_model.pkl"),
+    "BTC": os.path.join(data_base_path, "btc_model.pkl"),
+    "SOL": os.path.join(data_base_path, "sol_model.pkl"),
+    "BNB": os.path.join(data_base_path, "bnb_model.pkl"),
+    "ARB": os.path.join(data_base_path, "arb_model.pkl"),
+}
+
+
+def get_training_data_path(token):
+    return os.path.join(data_base_path, f"{token.lower()}_price_data.csv")
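Note: model.py below defines its own get_training_data_path without the .lower() call, so for token names like "ETH" the two helpers resolve to different CSV files (eth_price_data.csv here vs ETH_price_data.csv there), and app.py imports the model.py variant. A quick check, assuming both modules are importable:

from config import model_file_path, get_training_data_path as config_path
from model import get_training_data_path as model_path

print(model_file_path["ETH"])  # .../data/eth_model.pkl
print(config_path("ETH"))      # .../data/eth_price_data.csv (lowercased)
print(model_path("ETH"))       # .../data/ETH_price_data.csv (token as given)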

docker-compose.yml

@@ -4,12 +4,12 @@ services:
     build: .
     command: python -u /app/app.py
     ports:
-      - "8000:8000"
+      - "8080:8080"
     healthcheck:
-      test: ["CMD", "curl", "-f", "http://localhost:8000/inference/ETH"]
-      interval: 10s
+      test: ["CMD", "curl", "-f", "http://localhost:8080/inference/ETH/10m"]
+      interval: 30s
       timeout: 5s
-      retries: 12
+      retries: 20
     volumes:
       - ./inference-data:/app/data
     restart: always
@@ -18,7 +18,7 @@ services:
     container_name: updater-basic-eth-pred
     build: .
     environment:
-      - INFERENCE_API_ADDRESS=http://inference:8000
+      - INFERENCE_API_ADDRESS=http://inference:8080
     command: >
       sh -c "
       while true; do
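Note: with interval: 30s and retries: 20, the healthcheck now tolerates roughly 30 s × 20 = 10 minutes of failed probes before the container is marked unhealthy, presumably to give the initial update_data() download-and-train pass across all five tokens time to complete.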

model.py

@@ -1,23 +1,28 @@
 import os
 import pickle
-from zipfile import ZipFile
-from datetime import datetime
-import pandas as pd
 import numpy as np
+from xgboost import XGBRegressor
+from zipfile import ZipFile
+from datetime import datetime, timedelta
+import pandas as pd
 from sklearn.model_selection import train_test_split
-from sklearn import linear_model
 from updater import download_binance_monthly_data, download_binance_daily_data
 from config import data_base_path, model_file_path

 binance_data_path = os.path.join(data_base_path, "binance/futures-klines")
-training_price_data_path = os.path.join(data_base_path, "eth_price_data.csv")
+
+
+def get_training_data_path(token):
+    """
+    Return the path to the data file for the given token.
+    """
+    return os.path.join(data_base_path, f"{token}_price_data.csv")


 def download_data():
     cm_or_um = "um"
-    symbols = ["ETHUSDT"]
-    intervals = ["1d"]
+    symbols = ["ETHUSDT", "BTCUSDT", "SOLUSDT", "BNBUSDT", "ARBUSDT"]
+    intervals = ["10min", "1d"]
     years = ["2020", "2021", "2022", "2023", "2024"]
     months = ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]
     download_path = binance_data_path
@@ -34,20 +39,17 @@ def download_data():
     print(f"Downloaded daily data to {download_path}.")


-def format_data():
-    files = sorted([x for x in os.listdir(binance_data_path)])
+def format_data(token):
+    files = sorted(
+        [x for x in os.listdir(binance_data_path) if x.endswith(".zip") and token in x]
+    )

+    # No files to process
     if len(files) == 0:
         return

     price_df = pd.DataFrame()
     for file in files:
         zip_file_path = os.path.join(binance_data_path, file)
-
-        if not zip_file_path.endswith(".zip"):
-            continue
-
         myzip = ZipFile(zip_file_path)
         with myzip.open(myzip.filelist[0]) as f:
             line = f.readline()
@@ -70,38 +72,53 @@ def format_data():
         df.index.name = "date"
         price_df = pd.concat([price_df, df])

+    price_df["timestamp"] = price_df.index.map(pd.Timestamp.timestamp)
+    price_df["price_diff"] = price_df["close"].diff()
+    price_df["volatility"] = (price_df["high"] - price_df["low"]) / price_df["open"]
+    price_df["volume"] = price_df["volume"]
+    price_df["moving_avg_7"] = price_df["close"].rolling(window=7).mean()
+    price_df["moving_avg_30"] = price_df["close"].rolling(window=30).mean()
+
+    # Drop rows with NaN values
+    price_df.dropna(inplace=True)
+
+    # Save the data
+    training_price_data_path = get_training_data_path(token)
     price_df.sort_index().to_csv(training_price_data_path)


-def train_model():
-    # Load the eth price data
+def train_model(token):
+    training_price_data_path = get_training_data_path(token)
     price_data = pd.read_csv(training_price_data_path)
-    df = pd.DataFrame()

-    # Convert 'date' to a numerical value (timestamp) we can use for regression
-    df["date"] = pd.to_datetime(price_data["date"])
-    df["date"] = df["date"].map(pd.Timestamp.timestamp)
-
-    df["price"] = price_data[["open", "close", "high", "low"]].mean(axis=1)
-
-    # Reshape the data to the shape expected by sklearn
-    x = df["date"].values.reshape(-1, 1)
-    y = df["price"].values.reshape(-1, 1)
-
-    # Split the data into training set and test set
-    x_train, _, y_train, _ = train_test_split(x, y, test_size=0.2, random_state=0)
-
-    # Train the model
-    print("Training model...")
-    model = linear_model.Lasso(alpha=0.1)
+    # Use the additional features
+    x = price_data[
+        [
+            "timestamp",
+            "price_diff",
+            "volatility",
+            "volume",
+            "moving_avg_7",
+            "moving_avg_30",
+        ]
+    ]
+    y = price_data["close"]
+
+    x_train, x_test, y_train, y_test = train_test_split(
+        x, y, test_size=0.2, random_state=0
+    )
+
+    model = XGBRegressor()
     model.fit(x_train, y_train)
-    print("Model trained.")

-    # create the model's parent directory if it doesn't exist
-    os.makedirs(os.path.dirname(model_file_path), exist_ok=True)
-
-    # Save the trained model to a file
-    with open(model_file_path, "wb") as f:
+    token_model_path = model_file_path[token]
+    os.makedirs(os.path.dirname(token_model_path), exist_ok=True)
+
+    with open(token_model_path, "wb") as f:
         pickle.dump(model, f)

-    print(f"Trained model saved to {model_file_path}")
+    print(f"Trained model saved to {token_model_path}")
+
+    # Optional: model evaluation
+    y_pred = model.predict(x_test)
+    print(f"Mean Absolute Error: {np.mean(np.abs(y_test - y_pred))}")

requirements.txt

@@ -4,4 +4,13 @@ numpy==1.26.2
 pandas==2.1.3
 Requests==2.32.0
 scikit_learn==1.3.2
 werkzeug>=3.0.3 # not directly required, pinned by Snyk to avoid a vulnerability
+itsdangerous
+Jinja2
+MarkupSafe
+python-dateutil
+pytz
+scipy
+six
+scikit-learn
+xgboost