Merge pull request #18 from allora-network/clement/SOLU-1362

Add real-time data fetching and configuration options
kush-alloralabs 2024-09-04 14:26:37 -04:00 committed by GitHub
commit 70cf49d0a2
12 changed files with 387 additions and 160 deletions

7
.env.example Normal file

@@ -0,0 +1,7 @@
TOKEN=
TRAINING_DAYS=
TIMEFRAME=
MODEL=
REGION=
DATA_PROVIDER=
CG_API_KEY=

13
.gitignore vendored

@@ -9,6 +9,13 @@ inference-data
worker-data
config.json
env
env_file
.env
/data
**/*.venv*
**/.cache
**/.env
**/env_file
**/.gitkeep*
**/*.csv
**/*.pkl
**/*.zip

Dockerfile

@@ -1,5 +1,4 @@
# Use an official Python runtime as the base image
FROM amd64/python:3.9-buster as project_env
FROM python:3.11-slim as project_env
# Set the working directory in the container
WORKDIR /app

README.md

@@ -1,12 +1,12 @@
# Basic ETH Price Prediction Node
# Basic Price Prediction Node
This repository provides an example Allora network worker node, designed to offer price predictions for ETH. The primary objective is to demonstrate the use of a basic inference model running within a dedicated container, showcasing its integration with the Allora network infrastructure to contribute valuable inferences.
This repository provides an example Allora network worker node, designed to offer price predictions. The primary objective is to demonstrate the use of a basic inference model running within a dedicated container, showcasing its integration with the Allora network infrastructure to contribute valuable inferences.
## Components
- **Worker**: The node that publishes inferences to the Allora chain.
- **Inference**: A container that conducts inferences, maintains the model state, and responds to internal inference requests via a Flask application. This node operates with a basic linear regression model for price predictions.
- **Updater**: A cron-like container designed to update the inference node's data by daily fetching the latest market information from Binance, ensuring the model stays current with new market trends.
- **Updater**: A cron-like container designed to update the inference node's data by fetching the latest market information from the data provider daily, ensuring the model stays current with new market trends.
Check the `docker-compose.yml` file for the detailed setup of each component.
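Once the stack is running, the inference container answers plain HTTP requests on port 8000 (the same endpoint the compose healthcheck uses). A quick manual check, assuming the default port mapping and `TOKEN=ETH`:
```sh
# Ask the inference container for the latest prediction (plain-text price on success)
curl http://localhost:8000/inference/ETH
# Any symbol other than the configured TOKEN returns HTTP 400 with a JSON error
curl -i http://localhost:8000/inference/DOGE
```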
@@ -17,14 +17,45 @@ A complete working example is provided in the `docker-compose.yml` file.
### Steps to Setup
1. **Clone the Repository**
2. **Copy and Populate Configuration**
2. **Copy and Populate the Model Configuration Environment File**
Copy the example .env.example file and populate it with your variables:
```sh
cp .env.example .env
```
Here are the currently accepted configurations (an example populated `.env` follows the list):
- TOKEN
Must be one of ['ETH','SOL','BTC','BNB','ARB'].
Note: if you are using `Binance` as the data provider, any token can be used.
If you are using Coingecko, you should add its `coin_id` to the [token_map here](https://github.com/allora-network/basic-coin-prediction-node/blob/main/updater.py#L107). Find [more info here](https://docs.coingecko.com/reference/simple-price) and the [list here](https://docs.google.com/spreadsheets/d/1wTTuxXt8n9q7C4NDXqQpI3wpKu1_5bGVmP9Xz0XGSyU/edit?usp=sharing).
- TRAINING_DAYS
Must be an `int` >= 1.
Represents how many days of historical data to use.
- TIMEFRAME
This should be given in a form like `10m`, `1h`, `1d`, etc.
Note: for Coingecko, data granularity (the candle's body) is automatic - [see here](https://docs.coingecko.com/reference/coins-id-ohlc). To avoid downsampling, the following settings are recommended with Coingecko:
- TIMEFRAME >= 30m if TRAINING_DAYS <= 2
- TIMEFRAME >= 4h if TRAINING_DAYS <= 30
- TIMEFRAME >= 4d if TRAINING_DAYS >= 31
- MODEL
Must be one of ['LinearRegression','SVR','KernelRidge','BayesianRidge'].
You can easily add support for other models by [adding them here](https://github.com/allora-network/basic-coin-prediction-node/blob/main/model.py#L133).
- REGION
Must be `EU` or `US` - it is used for the Binance API.
- DATA_PROVIDER
Must be `Binance` or `Coingecko`. Feel free to add support for other data providers to personalize your model!
- CG_API_KEY
This is your `Coingecko` API key, if you've set `DATA_PROVIDER=coingecko`.
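For illustration only, a populated `.env` consistent with the recommendations above might look like this (example values, not defaults):
```sh
# Example values only - adjust for your own setup
TOKEN=ETH
TRAINING_DAYS=30
TIMEFRAME=4h
MODEL=LinearRegression
REGION=US
DATA_PROVIDER=binance
CG_API_KEY=
```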
3. **Copy and Populate Worker Configuration**
Copy the example configuration file and populate it with your variables:
```sh
cp config.example.json config.json
```
3. **Initialize Worker**
4. **Initialize Worker**
Run the following commands from the project's root directory to initialize the worker:
```sh
@@ -35,11 +66,11 @@ A complete working example is provided in the `docker-compose.yml` file.
- Automatically create Allora keys for your worker.
- Export the needed variables from the created account to be used by the worker node, bundle them with your provided `config.json`, and pass them to the node as environment variables.
4. **Faucet Your Worker Node**
5. **Faucet Your Worker Node**
You can find the offchain worker node's address in `./worker-data/env_file` under `ALLORA_OFFCHAIN_ACCOUNT_ADDRESS`. [Add faucet funds](https://docs.allora.network/devs/get-started/setup-wallet#add-faucet-funds) to your worker's wallet before starting it.
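For reference, after initialization `./worker-data/env_file` contains entries along these lines (placeholder values shown; the exact contents are generated by the init script):
```sh
# Placeholder sketch of ./worker-data/env_file - real values are written during init
ALLORA_OFFCHAIN_NODE_CONFIG_JSON='{"wallet": {...}, "worker": [...]}'
ALLORA_OFFCHAIN_ACCOUNT_ADDRESS=allo1...
ENV_LOADED=true
```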
5. **Start the Services**
6. **Start the Services**
Run the following command to start the worker node, inference, and updater nodes:
```sh

33
app.py

@@ -1,43 +1,28 @@
import json
import pickle
import pandas as pd
import numpy as np
from datetime import datetime
from flask import Flask, jsonify, Response
from model import download_data, format_data, train_model
from config import model_file_path
from flask import Flask, Response
from model import download_data, format_data, train_model, get_inference
from config import model_file_path, TOKEN, TIMEFRAME, TRAINING_DAYS, REGION, DATA_PROVIDER
app = Flask(__name__)
def update_data():
"""Download price data, format data and train model."""
download_data()
format_data()
train_model()
def get_eth_inference():
"""Load model and predict current price."""
with open(model_file_path, "rb") as f:
loaded_model = pickle.load(f)
now_timestamp = pd.Timestamp(datetime.now()).timestamp()
X_new = np.array([now_timestamp]).reshape(-1, 1)
current_price_pred = loaded_model.predict(X_new)
return current_price_pred[0][0]
files = download_data(TOKEN, TRAINING_DAYS, REGION, DATA_PROVIDER)
format_data(files, DATA_PROVIDER)
train_model(TIMEFRAME)
@app.route("/inference/<string:token>")
def generate_inference(token):
"""Generate inference for given token."""
if not token or token != "ETH":
if not token or token.upper() != TOKEN:
error_msg = "Token is required" if not token else "Token not supported"
return Response(json.dumps({"error": error_msg}), status=400, mimetype='application/json')
try:
inference = get_eth_inference()
inference = get_inference(token.upper(), TIMEFRAME, REGION, DATA_PROVIDER)
return Response(str(inference), status=200)
except Exception as e:
return Response(json.dumps({"error": str(e)}), status=500, mimetype='application/json')

config.example.json

@@ -3,12 +3,12 @@
"addressKeyName": "test",
"addressRestoreMnemonic": "",
"alloraHomeDir": "",
"gas": "1000000",
"gasAdjustment": 1.0,
"nodeRpc": "http://localhost:26657",
"gas": "auto",
"gasAdjustment": 1.5,
"nodeRpc": "https://allora-rpc.testnet.allora.network",
"maxRetries": 1,
"delay": 1,
"submitTx": false
"submitTx": true
},
"worker": [
{

config.py

@@ -1,5 +1,21 @@
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
app_base_path = os.getenv("APP_BASE_PATH", default=os.getcwd())
data_base_path = os.path.join(app_base_path, "data")
model_file_path = os.path.join(data_base_path, "model.pkl")
TOKEN = os.getenv("TOKEN").upper()
TRAINING_DAYS = os.getenv("TRAINING_DAYS")
TIMEFRAME = os.getenv("TIMEFRAME")
MODEL = os.getenv("MODEL")
REGION = os.getenv("REGION").lower()
if REGION in ["us", "com", "usa"]:
REGION = "us"
else:
REGION = "com"
DATA_PROVIDER = os.getenv("DATA_PROVIDER").lower()
CG_API_KEY = os.getenv("CG_API_KEY", default=None)

docker-compose.yml

@@ -1,12 +1,14 @@
services:
inference:
container_name: inference-basic-eth-pred
env_file:
- .env
build: .
command: python -u /app/app.py
ports:
- "8000:8000"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/inference/ETH"]
test: ["CMD", "curl", "-f", "http://localhost:8000/inference/${TOKEN}"]
interval: 10s
timeout: 5s
retries: 12
@@ -31,7 +33,7 @@ services:
worker:
container_name: worker
image: alloranetwork/allora-offchain-node:latest
image: alloranetwork/allora-offchain-node:v0.3.0
volumes:
- ./worker-data:/data
depends_on:


@@ -36,7 +36,7 @@ ENV_LOADED=$(grep '^ENV_LOADED=' ./worker-data/env_file | cut -d '=' -f 2)
if [ "$ENV_LOADED" = "false" ]; then
json_content=$(cat ./config.json)
stringified_json=$(echo "$json_content" | jq -c .)
docker run -it --entrypoint=bash -v $(pwd)/worker-data:/data -v $(pwd)/scripts:/scripts -e NAME="${nodeName}" -e ALLORA_OFFCHAIN_NODE_CONFIG_JSON="${stringified_json}" alloranetwork/allora-chain:latest -c "bash /scripts/init.sh"
docker run -it --entrypoint=bash -v $(pwd)/worker-data:/data -v $(pwd)/scripts:/scripts -e NAME="${nodeName}" -e ALLORA_OFFCHAIN_NODE_CONFIG_JSON="${stringified_json}" alloranetwork/allora-chain:v0.4.0 -c "bash /scripts/init.sh"
echo "config.json saved to ./worker-data/env_file"
else
echo "config.json is already loaded, skipping the operation. You can set ENV_LOADED variable to false in ./worker-data/env_file to reload the config.json"

198
model.py

@@ -1,98 +1,141 @@
import json
import os
import pickle
from zipfile import ZipFile
from datetime import datetime
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from updater import download_binance_monthly_data, download_binance_daily_data
from config import data_base_path, model_file_path
from sklearn.kernel_ridge import KernelRidge
from sklearn.linear_model import BayesianRidge, LinearRegression
from sklearn.svm import SVR
from updater import download_binance_daily_data, download_binance_current_day_data, download_coingecko_data, download_coingecko_current_day_data
from config import data_base_path, model_file_path, TOKEN, MODEL, CG_API_KEY
binance_data_path = os.path.join(data_base_path, "binance/futures-klines")
training_price_data_path = os.path.join(data_base_path, "eth_price_data.csv")
binance_data_path = os.path.join(data_base_path, "binance")
coingecko_data_path = os.path.join(data_base_path, "coingecko")
training_price_data_path = os.path.join(data_base_path, "price_data.csv")
def download_data():
cm_or_um = "um"
symbols = ["ETHUSDT"]
intervals = ["1d"]
years = ["2020", "2021", "2022", "2023", "2024"]
months = ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]
download_path = binance_data_path
download_binance_monthly_data(
cm_or_um, symbols, intervals, years, months, download_path
)
print(f"Downloaded monthly data to {download_path}.")
current_datetime = datetime.now()
current_year = current_datetime.year
current_month = current_datetime.month
download_binance_daily_data(
cm_or_um, symbols, intervals, current_year, current_month, download_path
)
print(f"Downloaded daily data to {download_path}.")
def download_data_binance(token, training_days, region):
files = download_binance_daily_data(f"{token}USDT", training_days, region, binance_data_path)
print(f"Downloaded {len(files)} new files")
return files
def download_data_coingecko(token, training_days):
files = download_coingecko_data(token, training_days, coingecko_data_path, CG_API_KEY)
print(f"Downloaded {len(files)} new files")
return files
def format_data():
files = sorted([x for x in os.listdir(binance_data_path)])
def download_data(token, training_days, region, data_provider):
if data_provider == "coingecko":
return download_data_coingecko(token, int(training_days))
elif data_provider == "binance":
return download_data_binance(token, training_days, region)
else:
raise ValueError("Unsupported data provider")
def format_data(files, data_provider):
if not files:
print("Already up to date")
return
if data_provider == "binance":
files = sorted([x for x in os.listdir(binance_data_path) if x.startswith(f"{TOKEN}USDT")])
elif data_provider == "coingecko":
files = sorted([x for x in os.listdir(coingecko_data_path) if x.endswith(".json")])
# No files to process
if len(files) == 0:
return
price_df = pd.DataFrame()
for file in files:
zip_file_path = os.path.join(binance_data_path, file)
if data_provider == "binance":
for file in files:
zip_file_path = os.path.join(binance_data_path, file)
if not zip_file_path.endswith(".zip"):
continue
if not zip_file_path.endswith(".zip"):
continue
myzip = ZipFile(zip_file_path)
with myzip.open(myzip.filelist[0]) as f:
line = f.readline()
header = 0 if line.decode("utf-8").startswith("open_time") else None
df = pd.read_csv(myzip.open(myzip.filelist[0]), header=header).iloc[:, :11]
df.columns = [
"start_time",
"open",
"high",
"low",
"close",
"volume",
"end_time",
"volume_usd",
"n_trades",
"taker_volume",
"taker_volume_usd",
]
df.index = [pd.Timestamp(x + 1, unit="ms") for x in df["end_time"]]
df.index.name = "date"
price_df = pd.concat([price_df, df])
myzip = ZipFile(zip_file_path)
with myzip.open(myzip.filelist[0]) as f:
line = f.readline()
header = 0 if line.decode("utf-8").startswith("open_time") else None
df = pd.read_csv(myzip.open(myzip.filelist[0]), header=header).iloc[:, :11]
df.columns = [
"start_time",
"open",
"high",
"low",
"close",
"volume",
"end_time",
"volume_usd",
"n_trades",
"taker_volume",
"taker_volume_usd",
]
df.index = [pd.Timestamp(x + 1, unit="ms").to_datetime64() for x in df["end_time"]]
df.index.name = "date"
price_df = pd.concat([price_df, df])
price_df.sort_index().to_csv(training_price_data_path)
price_df.sort_index().to_csv(training_price_data_path)
elif data_provider == "coingecko":
for file in files:
with open(os.path.join(coingecko_data_path, file), "r") as f:
data = json.load(f)
df = pd.DataFrame(data)
df.columns = [
"timestamp",
"open",
"high",
"low",
"close"
]
df["date"] = pd.to_datetime(df["timestamp"], unit="ms")
df.drop(columns=["timestamp"], inplace=True)
df.set_index("date", inplace=True)
price_df = pd.concat([price_df, df])
price_df.sort_index().to_csv(training_price_data_path)
def train_model():
# Load the eth price data
def load_frame(frame, timeframe):
print(f"Loading data...")
df = frame.loc[:,['open','high','low','close']].dropna()
df[['open','high','low','close']] = df[['open','high','low','close']].apply(pd.to_numeric)
df['date'] = frame['date'].apply(pd.to_datetime)
df.set_index('date', inplace=True)
df.sort_index(inplace=True)
return df.resample(f'{timeframe}', label='right', closed='right', origin='end').mean()
def train_model(timeframe):
# Load the price data
price_data = pd.read_csv(training_price_data_path)
df = pd.DataFrame()
df = load_frame(price_data, timeframe)
# Convert 'date' to a numerical value (timestamp) we can use for regression
df["date"] = pd.to_datetime(price_data["date"])
df["date"] = df["date"].map(pd.Timestamp.timestamp)
print(df.tail())
df["price"] = price_data[["open", "close", "high", "low"]].mean(axis=1)
y_train = df['close'].shift(-1).dropna().values
X_train = df[:-1]
# Reshape the data to the shape expected by sklearn
x = df["date"].values.reshape(-1, 1)
y = df["price"].values.reshape(-1, 1)
# Split the data into training set and test set
x_train, _, y_train, _ = train_test_split(x, y, test_size=0.2, random_state=0)
print(f"Training data shape: {X_train.shape}, {y_train.shape}")
# Define the model
if MODEL == "LinearRegression":
model = LinearRegression()
elif MODEL == "SVR":
model = SVR()
elif MODEL == "KernelRidge":
model = KernelRidge()
elif MODEL == "BayesianRidge":
model = BayesianRidge()
# Add more models here
else:
raise ValueError("Unsupported model")
# Train the model
model = LinearRegression()
model.fit(x_train, y_train)
model.fit(X_train, y_train)
# create the model's parent directory if it doesn't exist
os.makedirs(os.path.dirname(model_file_path), exist_ok=True)
@@ -102,3 +145,22 @@ def train_model():
pickle.dump(model, f)
print(f"Trained model saved to {model_file_path}")
def get_inference(token, timeframe, region, data_provider):
"""Load model and predict current price."""
with open(model_file_path, "rb") as f:
loaded_model = pickle.load(f)
# Get current price
if data_provider == "coingecko":
X_new = load_frame(download_coingecko_current_day_data(token, CG_API_KEY), timeframe)
else:
X_new = load_frame(download_binance_current_day_data(f"{TOKEN}USDT", region), timeframe)
print(X_new.tail())
print(X_new.shape)
current_price_pred = loaded_model.predict(X_new)
return current_price_pred[0]
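To add another `MODEL` option, the selection block in `train_model` (the `# Add more models here` branch) only needs an extra `elif`. A minimal sketch, assuming scikit-learn's `RandomForestRegressor` as a hypothetical new model; the real code keeps this logic inline in `train_model` rather than in a helper:
```python
from sklearn.ensemble import RandomForestRegressor  # hypothetical addition, not part of this PR
from sklearn.linear_model import LinearRegression

def select_model(model_name: str):
    """Mirrors the MODEL selection in train_model(), plus one illustrative extra branch."""
    if model_name == "LinearRegression":
        return LinearRegression()
    elif model_name == "RandomForestRegressor":  # the new branch you would add
        return RandomForestRegressor()
    else:
        raise ValueError("Unsupported model")
```
If you extend the chain, remember to also list the new name under the README's `MODEL` options.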

requirements.txt

@@ -1,7 +1,9 @@
flask[async]
gunicorn[gthread]
numpy==1.26.2
pandas==2.1.3
Requests==2.32.0
scikit_learn==1.3.2
werkzeug>=3.0.3 # not directly required, pinned by Snyk to avoid a vulnerability
numpy
pandas
Requests
aiohttp
multiprocess
scikit_learn
python-dotenv

updater.py

@@ -1,59 +1,175 @@
import os
from datetime import date, timedelta
import pathlib
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
import json
# Define the retry strategy
retry_strategy = Retry(
total=4, # Maximum number of retries
backoff_factor=2, # Exponential backoff factor (e.g., 2 means 1, 2, 4, 8 seconds, ...)
status_forcelist=[429, 500, 502, 503, 504], # HTTP status codes to retry on
)
# Create an HTTP adapter with the retry strategy and mount it to session
adapter = HTTPAdapter(max_retries=retry_strategy)
# Create a new session object
session = requests.Session()
session.mount('http://', adapter)
session.mount('https://', adapter)
files = []
# Function to download the URL, called asynchronously by several child processes
def download_url(url, download_path):
target_file_path = os.path.join(download_path, os.path.basename(url))
if os.path.exists(target_file_path):
# print(f"File already exists: {url}")
return
def download_url(url, download_path, name=None):
try:
global files
if name:
file_name = os.path.join(download_path, name)
else:
file_name = os.path.join(download_path, os.path.basename(url))
dir_path = os.path.dirname(file_name)
pathlib.Path(dir_path).mkdir(parents=True, exist_ok=True)
if os.path.isfile(file_name):
# print(f"{file_name} already exists")
return
# Make a request using the session object
response = session.get(url)
if response.status_code == 404:
print(f"File does not exist: {url}")
elif response.status_code == 200:
with open(file_name, 'wb') as f:
f.write(response.content)
# print(f"Downloaded: {url} to {file_name}")
files.append(file_name)
return
else:
print(f"Failed to download {url}")
return
except Exception as e:
print(str(e))
# Function to generate a range of dates
def daterange(start_date, end_date):
for n in range(int((end_date - start_date).days)):
yield start_date + timedelta(n)
# Function to download daily data from Binance
def download_binance_daily_data(pair, training_days, region, download_path):
base_url = f"https://data.binance.vision/data/spot/daily/klines"
end_date = date.today()
start_date = end_date - timedelta(days=int(training_days))
response = requests.get(url)
if response.status_code == 404:
# print(f"File not exist: {url}")
pass
global files
files = []
with ThreadPoolExecutor() as executor:
print(f"Downloading data for {pair}")
for single_date in daterange(start_date, end_date):
url = f"{base_url}/{pair}/1m/{pair}-1m-{single_date}.zip"
executor.submit(download_url, url, download_path)
return files
def download_binance_current_day_data(pair, region):
limit = 1000
base_url = f'https://api.binance.{region}/api/v3/klines?symbol={pair}&interval=1m&limit={limit}'
# Make a request using the session object
response = session.get(base_url)
response.raise_for_status()
resp = str(response.content, 'utf-8').rstrip()
columns = ['start_time','open','high','low','close','volume','end_time','volume_usd','n_trades','taker_volume','taker_volume_usd','ignore']
df = pd.DataFrame(json.loads(resp),columns=columns)
df['date'] = [pd.to_datetime(x+1,unit='ms') for x in df['end_time']]
df['date'] = df['date'].apply(pd.to_datetime)
df[["volume", "taker_volume", "open", "high", "low", "close"]] = df[["volume", "taker_volume", "open", "high", "low", "close"]].apply(pd.to_numeric)
return df.sort_index()
def get_coingecko_coin_id(token):
token_map = {
'ETH': 'ethereum',
'SOL': 'solana',
'BTC': 'bitcoin',
'BNB': 'binancecoin',
'ARB': 'arbitrum',
# Add more tokens here
}
token = token.upper()
if token in token_map:
return token_map[token]
else:
# create the entire path if it doesn't exist
os.makedirs(os.path.dirname(target_file_path), exist_ok=True)
with open(target_file_path, "wb") as f:
f.write(response.content)
# print(f"Downloaded: {url} to {target_file_path}")
raise ValueError("Unsupported token")
def download_binance_monthly_data(
cm_or_um, symbols, intervals, years, months, download_path
):
# Verify if CM_OR_UM is correct, if not, exit
if cm_or_um not in ["cm", "um"]:
print("CM_OR_UM can be only cm or um")
return
base_url = f"https://data.binance.vision/data/futures/{cm_or_um}/monthly/klines"
def download_coingecko_data(token, training_days, download_path, CG_API_KEY):
if training_days <= 7:
days = 7
elif training_days <= 14:
days = 14
elif training_days <= 30:
days = 30
elif training_days <= 90:
days = 90
elif training_days <= 180:
days = 180
elif training_days <= 365:
days = 365
else:
days = "max"
print(f"Days: {days}")
# Main loop to iterate over all the arrays and launch child processes
with ThreadPoolExecutor() as executor:
for symbol in symbols:
for interval in intervals:
for year in years:
for month in months:
url = f"{base_url}/{symbol}/{interval}/{symbol}-{interval}-{year}-{month}.zip"
executor.submit(download_url, url, download_path)
coin_id = get_coingecko_coin_id(token)
print(f"Coin ID: {coin_id}")
# Get OHLC data from Coingecko
url = f'https://api.coingecko.com/api/v3/coins/{coin_id}/ohlc?vs_currency=usd&days={days}&api_key={CG_API_KEY}'
def download_binance_daily_data(
cm_or_um, symbols, intervals, year, month, download_path
):
if cm_or_um not in ["cm", "um"]:
print("CM_OR_UM can be only cm or um")
return
base_url = f"https://data.binance.vision/data/futures/{cm_or_um}/daily/klines"
global files
files = []
with ThreadPoolExecutor() as executor:
for symbol in symbols:
for interval in intervals:
for day in range(1, 32): # Assuming days range from 1 to 31
url = f"{base_url}/{symbol}/{interval}/{symbol}-{interval}-{year}-{month:02d}-{day:02d}.zip"
executor.submit(download_url, url, download_path)
print(f"Downloading data for {coin_id}")
name = os.path.basename(url).split("?")[0].replace("/", "_") + ".json"
executor.submit(download_url, url, download_path, name)
return files
def download_coingecko_current_day_data(token, CG_API_KEY):
coin_id = get_coingecko_coin_id(token)
print(f"Coin ID: {coin_id}")
url = f'https://api.coingecko.com/api/v3/coins/{coin_id}/ohlc?vs_currency=usd&days=1&api_key={CG_API_KEY}'
# Make a request using the session object
response = session.get(url)
response.raise_for_status()
resp = str(response.content, 'utf-8').rstrip()
columns = ['timestamp','open','high','low','close']
df = pd.DataFrame(json.loads(resp), columns=columns)
df['date'] = [pd.to_datetime(x,unit='ms') for x in df['timestamp']]
df['date'] = df['date'].apply(pd.to_datetime)
df[["open", "high", "low", "close"]] = df[["open", "high", "low", "close"]].apply(pd.to_numeric)
return df.sort_index()
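As a usage sketch (assuming network access; `"ETHUSDT"` and `"us"` are example arguments), the new current-day fetcher returns a pandas DataFrame that `load_frame` in `model.py` can consume directly:
```python
# Illustrative usage of the current-day fetcher; arguments are examples, not defaults
from updater import download_binance_current_day_data

df = download_binance_current_day_data("ETHUSDT", "us")
print(df[["date", "open", "high", "low", "close"]].tail())
```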