allora/model.py
2024-09-04 22:22:25 +03:00

154 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import pickle
import numpy as np
from xgboost import XGBRegressor
from zipfile import ZipFile
from datetime import datetime, timedelta
import pandas as pd
from sklearn.model_selection import train_test_split
from updater import download_binance_monthly_data, download_binance_daily_data
from config import data_base_path, model_file_path
binance_data_path = os.path.join(data_base_path, "binance/futures-klines")
training_price_data_path = os.path.join(data_base_path, "eth_price_data.csv")
def download_data():
cm_or_um = "um"
symbols = ["ETHUSDT"]
intervals = ["10min"]
years = ["2020", "2021", "2022", "2023", "2024"]
months = ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]
download_path = binance_data_path
download_binance_monthly_data(
cm_or_um, symbols, intervals, years, months, download_path
)
print(f"Downloaded monthly data to {download_path}.")
current_datetime = datetime.now()
current_year = current_datetime.year
current_month = current_datetime.month
download_binance_daily_data(
cm_or_um, symbols, intervals, current_year, current_month, download_path
)
print(f"Downloaded daily data to {download_path}.")
def format_data():
files = sorted([x for x in os.listdir(binance_data_path) if x.endswith(".zip")])
if len(files) == 0:
return
price_df = pd.DataFrame()
for file in files:
zip_file_path = os.path.join(binance_data_path, file)
myzip = ZipFile(zip_file_path)
with myzip.open(myzip.filelist[0]) as f:
line = f.readline()
header = 0 if line.decode("utf-8").startswith("open_time") else None
df = pd.read_csv(myzip.open(myzip.filelist[0]), header=header).iloc[:, :11]
df.columns = [
"start_time",
"open",
"high",
"low",
"close",
"volume",
"end_time",
"volume_usd",
"n_trades",
"taker_volume",
"taker_volume_usd",
]
df.index = [pd.Timestamp(x + 1, unit="ms") for x in df["end_time"]]
df.index.name = "date"
price_df = pd.concat([price_df, df])
price_df["timestamp"] = price_df.index.map(pd.Timestamp.timestamp)
price_df["price_diff"] = price_df["close"].diff()
price_df["volatility"] = (price_df["high"] - price_df["low"]) / price_df["open"]
price_df["volume"] = price_df["volume"]
price_df["moving_avg_7"] = price_df["close"].rolling(window=7).mean()
price_df["moving_avg_30"] = price_df["close"].rolling(window=30).mean()
# Удаляем строки с NaN значениями
price_df.dropna(inplace=True)
# Сохраняем данные
price_df.sort_index().to_csv(training_price_data_path)
def train_model():
price_data = pd.read_csv(training_price_data_path)
# Используем дополнительные признаки
x = price_data[
[
"timestamp",
"price_diff",
"volatility",
"volume",
"moving_avg_7",
"moving_avg_30",
]
]
y = price_data["close"]
x_train, x_test, y_train, y_test = train_test_split(
x, y, test_size=0.2, random_state=0
)
# Train the model
print("Training model...")
model = XGBRegressor()
model.fit(x_train, y_train)
print("Model trained.")
os.makedirs(os.path.dirname(model_file_path), exist_ok=True)
with open(model_file_path, "wb") as f:
pickle.dump(model, f)
print(f"Trained model saved to {model_file_path}")
# Optional: Оценка модели
y_pred = model.predict(x_test)
print(f"Mean Absolute Error: {np.mean(np.abs(y_test - y_pred))}")
def get_inference_data(token, period):
"""
Генерирует данные для инференса на основе переданного токена и периода.
"""
price_data = pd.read_csv(training_price_data_path)
# Настроить разницу времени в зависимости от периода
time_delta_map = {
"1min": timedelta(minutes=1),
"5min": timedelta(minutes(5)),
"10min": timedelta(minutes(10)),
"30min": timedelta(minutes(30)),
"1h": timedelta(hours=1),
"1d": timedelta(days=1),
}
# Получаем последний таймстамп и вычисляем следующий
last_timestamp = pd.to_datetime(price_data["timestamp"].iloc[-1], unit="s")
next_timestamp = last_timestamp + time_delta_map.get(period, timedelta(minutes=10))
# Используем последние значения признаков
last_data = price_data.iloc[-1]
X_new = np.array(
[
[
next_timestamp.timestamp(),
last_data["price_diff"],
last_data["volatility"],
last_data["volume"],
last_data["moving_avg_7"],
last_data["moving_avg_30"],
]
]
)
return X_new