Compare commits

..

14 Commits

Author SHA1 Message Date
7a2af2a349 sonar check
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 32s
2025-01-08 21:15:12 +09:00
dae646cd2e auto-trade skeleton added
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 4m10s
2024-12-24 00:26:18 +09:00
82e1b74aee fixed title on coin live and added ul on index.html
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 4m22s
2024-12-21 16:13:29 +00:00
82216daaf8 migrationt to websocket
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m15s
2024-12-22 00:57:07 +09:00
2d31dcfd0b websocket
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m24s
2024-12-21 15:40:38 +09:00
1c93392d65 saved
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m15s
2024-12-20 22:06:01 +09:00
9e9cbf3547 mexc-websocket added
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m12s
2024-12-20 20:48:02 +09:00
1b7cd7960e candlebar added
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m10s
2024-12-20 17:18:40 +09:00
b4d9bcee51 update timeline
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m42s
2024-12-20 00:37:42 +09:00
ce4a6358cf change dockerfile
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m12s
2024-12-19 15:29:10 +00:00
0ec1eb7d04 base
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m14s
2024-12-20 00:25:12 +09:00
c94e4d8bf4 test
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m12s
2024-12-20 00:22:49 +09:00
0db0405ea1 viewer and parser added
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m44s
2024-12-20 00:00:21 +09:00
f8fe2c089f package path modified 2024-12-19 22:52:05 +09:00
18 changed files with 6249 additions and 51 deletions

View File

@ -0,0 +1,25 @@
on:
# Trigger analysis when pushing to your main branches, and when creating a pull request.
push:
branches:
- main
- master
- develop
- 'releases/**'
run-name: ${{ gitea.actor }} is testing out Gitea Actions 🚀
name: SonarQube Scan
jobs:
sonarqube:
name: SonarQube Trigger
runs-on: ubuntu-latest
steps:
- name: Checking out
uses: actions/checkout@v4
with:
# Disabling shallow clone is recommended for improving relevancy of reporting
fetch-depth: 0
- name: SonarQube Scan
uses: kitabisa/sonarqube-action@v1.2.0
with:
host: https://sonar.ailaplacelab.com
login: sqa_a3988da69583c0ae606dae085f14e6ec901675f7

7
Dockerfile Normal file
View File

@ -0,0 +1,7 @@
FROM python:3.12.3
WORKDIR /code
COPY ./requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
COPY ./ /code/
CMD ["fastapi", "run", "app.py", "--port", "38080"]

239
app.py
View File

@ -1,67 +1,206 @@
from mexc_sdk import Spot from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from pymongo import MongoClient
from datetime import datetime
import uvicorn
from urllib.parse import quote_plus
from dotenv import load_dotenv from dotenv import load_dotenv
import logging
import os import os
import yaml
# Set up logging with open("coin.yaml", "r") as file:
logging.basicConfig( data = yaml.safe_load(file)
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" interest_coins = data["interest"]
)
logger = logging.getLogger(__name__)
# Load the environment variables from the .env file
load_dotenv() load_dotenv()
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
data_points = []
class TradeBot(object): # Setup templates directory
def __init__(self, api_key, api_secret): templates = Jinja2Templates(directory="templates")
self.client = Spot(api_key=api_key, api_secret=api_secret)
self.symbol = "BTCUSDT"
def change_symbol(self, new_symbol): MONGO_URI = f'mongodb://{quote_plus(os.getenv("DB_USER"))}:{quote_plus(os.getenv("DB_PWD"))}@{os.getenv("DB_HOST")}:{os.getenv("DB_PORT")}'
self.symbol = new_symbol client = MongoClient(MONGO_URI)
db = client[os.getenv("DB_NAME")] # Replace with your database name
# collection = db["XRP"]
def time(self):
# Format is in dict
return f"Server Time: {self.client.time()['serverTime']}"
def get_market_price(self, symbol=None): @app.get("/", response_class=HTMLResponse)
"""Get current market price for a symbol""" async def read_data(request: Request):
try: return templates.TemplateResponse(
symbol = symbol or self.symbol request=request, name="index.html", context={"coins": interest_coins}
ticker = self.client.ticker_price(symbol) )
price = float(ticker["price"])
logger.info(f"Current {symbol} price: {price}")
return price
except Exception as e:
logger.error(f"Error getting market price: {str(e)}")
return None
def get_account_balance(self):
"""Get account balance for all assets"""
try:
account_info = self.client.account_info()
balances = account_info["balances"]
logger.info("Account balances:") @app.get("/coin/{coin}")
for balance in balances: async def view_coin_graph(request: Request, coin: str):
if float(balance["free"]) > 0 or float(balance["locked"]) > 0: if coin in interest_coins:
logger.info( return templates.TemplateResponse(
f"{balance['asset']}: Free={balance['free']}, Locked={balance['locked']}" request=request, name="coin.html", context={"coin": coin}
) )
else:
return {"coin": "not found"}
return balances
except Exception as e: @app.get("/coin/{coin}/candle1min")
logger.error(f"Error getting account balance: {str(e)}") async def view_coin_candle_graph(request: Request, coin: str):
return None if coin in interest_coins:
return templates.TemplateResponse(
request=request, name="candle_coin.html", context={"coin": coin}
)
else:
return {"coin": "not found"}
@app.post("/update-data")
async def update_data(data: dict):
data_points.append(data)
# Keep only last 300 points to ensure we have enough data to generate 100 candles
if len(data_points) > 300:
data_points.pop(0)
return {"status": "success"}
@app.get("/data/{coin}")
async def get_data(coin):
collection = db[coin]
# Get last 100 data points, sorted by time
cursor = (
collection.find({}, {"_id": 0, "timestamp": 1, "price": 1})
.sort("timestamp", -1)
.limit(100)
)
data = list(cursor)
# Format the data for Chart.js
times = []
prices = []
for item in reversed(data): # Reverse to show oldest to newest
times.append(str(item["timestamp"]))
prices.append(float(item["price"]))
return {"times": times, "prices": prices}
@app.get("/data/{coin}/get-candle/all")
async def get_candle_data_all(coin):
if coin in interest_coins:
target = coin + "USDT" + "_kline"
collection = db[target.lower()]
latest_doc = collection.find_one(sort=[("T", -1)])
latest_end_time = latest_doc["T"]
time_threshold = latest_end_time - (
3000
) # 60 seconds before the latest endTime
pipeline = [
# Filter based on endTime
{"$match": {"T": {"$gte": time_threshold}}},
# Group by start time and end time
{
"$group": {
"_id": {"startTime": "$t", "endTime": "$T"},
"open": {"$first": "$o"},
"high": {"$max": "$h"},
"low": {"$min": "$l"},
"close": {"$last": "$c"},
"volume": {"$sum": "$v"},
"weighted_price": {"$sum": {"$multiply": ["$c", "$v"]}},
"count": {"$sum": 1},
}
},
# Sort by start time
{"$sort": {"_id.startTime": 1}},
# Reshape for output
{
"$project": {
"_id": 0,
"time": {"$multiply": ["$_id.startTime", 1000]},
"open": 1,
"high": 1,
"low": 1,
"close": 1,
"volume": 1,
"vwap": {
"$cond": {
"if": {"$eq": ["$volume", 0]},
"then": 0,
"else": {"$divide": ["$weighted_price", "$volume"]},
}
},
"trades": "$count",
"endTime": "$_id.endTime",
}
},
]
candles = list(collection.aggregate(pipeline))
return candles
else:
return None
@app.get("/data/{coin}/get-candle")
async def get_candle_data(coin):
if coin in interest_coins:
target = coin + "USDT" + "_kline"
collection = db[target.lower()]
latest_doc = collection.find_one(sort=[("T", -1)])
latest_end_time = latest_doc["T"]
time_threshold = latest_end_time - (20) # 60 seconds before the latest endTime
pipeline = [
# Filter based on endTime
{"$match": {"T": {"$gte": time_threshold}}},
# Group by start time and end time
{
"$group": {
"_id": {"startTime": "$t", "endTime": "$T"},
"open": {"$first": "$o"},
"high": {"$max": "$h"},
"low": {"$min": "$l"},
"close": {"$last": "$c"},
"volume": {"$sum": "$v"},
"weighted_price": {"$sum": {"$multiply": ["$c", "$v"]}},
"count": {"$sum": 1},
}
},
# Sort by start time
{"$sort": {"_id.startTime": 1}},
# Reshape for output
{
"$project": {
"_id": 0,
"time": {"$multiply": ["$_id.startTime", 1000]},
"open": 1,
"high": 1,
"low": 1,
"close": 1,
"volume": 1,
"vwap": {
"$cond": {
"if": {"$eq": ["$volume", 0]},
"then": 0,
"else": {"$divide": ["$weighted_price", "$volume"]},
}
},
"trades": "$count",
"endTime": "$_id.endTime",
}
},
]
candles = list(collection.aggregate(pipeline))
return candles[0]
else:
return None
if __name__ == "__main__": if __name__ == "__main__":
# Access the variables uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)
api_key = os.getenv("API_KEY")
api_secret = os.getenv("API_SECRET")
tb = TradeBot(api_key, api_secret)
print(tb.time())
tb.get_market_price()
tb.get_account_balance()

448
auto_trade.py Normal file
View File

@ -0,0 +1,448 @@
from typing import Dict, List
import logging
import pandas as pd
import numpy as np
from pymongo import MongoClient
import time
from datetime import datetime, timedelta
import pytz
class CryptoTradingStrategy:
def __init__(
self,
sma_short: int = 20,
sma_long: int = 50,
rsi_base_period: int = 14,
rsi_base_overbought: float = 70,
rsi_base_oversold: float = 30,
volume_threshold: float = 1.5,
stop_loss: float = 0.02,
take_profit: float = 0.035,
):
self.sma_short = sma_short
self.sma_long = sma_long
self.rsi_base_period = rsi_base_period
self.rsi_base_overbought = rsi_base_overbought
self.rsi_base_oversold = rsi_base_oversold
self.volume_threshold = volume_threshold
self.stop_loss = stop_loss
self.take_profit = take_profit
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def calculate_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""Calculate technical indicators."""
# Calculate SMAs
df["SMA_short"] = df["close"].rolling(window=self.sma_short).mean()
df["SMA_long"] = df["close"].rolling(window=self.sma_long).mean()
# Calculate ATR (Average True Range) for volatility
df["TR"] = (df["high"] - df["low"]).abs() # True Range = High - Low
df["ATR"] = df["TR"].rolling(window=14).mean() # 14-period ATR
# Adaptive RSI period based on volatility
# Adaptive RSI period based on volatility
df["RSI_period"] = self.rsi_base_period * (
1 + (df["ATR"] / df["close"].rolling(window=20).mean())
)
# Replace NaN or infinite values with a default RSI period
df["RSI_period"].fillna(self.rsi_base_period, inplace=True)
df["RSI_period"].replace([np.inf, -np.inf], self.rsi_base_period, inplace=True)
# Convert to integer
df["RSI_period"] = df["RSI_period"].astype(int)
# Calculate adaptive RSI
df["RSI"] = 0.0
for period in df["RSI_period"].unique():
mask = df["RSI_period"] == period
delta = df.loc[mask, "close"].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
df.loc[mask, "RSI"] = 100 - (100 / (1 + rs))
# Adaptive RSI thresholds based on Bollinger Band width
df["BB_width"] = (df["close"].rolling(window=20).std() * 2) / df[
"close"
].rolling(window=20).mean()
df["RSI_overbought"] = self.rsi_base_overbought + (df["BB_width"] * 20)
df["RSI_oversold"] = self.rsi_base_oversold - (df["BB_width"] * 20)
# Volume analysis
df["Volume_MA"] = df["volume"].rolling(window=20).mean()
df["Volume_Ratio"] = df["volume"] / df["Volume_MA"]
return df
def generate_signals(self, df: pd.DataFrame) -> pd.DataFrame:
"""Generate trading signals based on indicators."""
df["signal"] = 0 # 0: hold, 1: buy, -1: sell
# Generate buy signals
buy_conditions = (
(df["SMA_short"] > df["SMA_long"]) # Golden cross
& (df["RSI"] < df["RSI_oversold"]) # Adaptive oversold condition
& (df["Volume_Ratio"] > self.volume_threshold) # High volume
)
df.loc[buy_conditions, "signal"] = 1
# Generate sell signals
sell_conditions = (df["SMA_short"] < df["SMA_long"]) | ( # Death cross
df["RSI"] > df["RSI_overbought"]
) # Adaptive overbought condition
df.loc[sell_conditions, "signal"] = -1
return df
def apply_risk_management(self, df: pd.DataFrame, position: Dict) -> Dict:
"""Apply risk management rules to current position."""
current_price = df["close"].iloc[-1]
if position["in_position"]:
# Check stop loss
if current_price <= position["entry_price"] * (1 - self.stop_loss):
position["should_exit"] = True
position["exit_reason"] = "stop_loss"
print("wegood")
# Check take profit
elif current_price >= position["entry_price"] * (1 + self.take_profit):
position["should_exit"] = True
position["exit_reason"] = "take_profit"
return position
def execute_trades(self, df: pd.DataFrame) -> List[Dict]:
"""Execute trading strategy and maintain positions."""
trades = []
position = {
"in_position": False,
"entry_price": 0,
"entry_time": None,
"should_exit": False,
"exit_reason": None,
}
for i in range(len(df)):
current_data = df.iloc[i]
# Update position status
if position["in_position"]:
position = self.apply_risk_management(
df.iloc[max(0, i - 10) : i + 1], position
)
# Exit position if necessary
if position["in_position"] and (
position["should_exit"] or current_data["signal"] == -1
):
trade = {
"exit_time": current_data.name,
"exit_price": current_data["close"],
"exit_reason": position["exit_reason"] or "signal",
"profit_pct": (current_data["close"] - position["entry_price"])
/ position["entry_price"]
* 100,
}
trades.append({**position, **trade})
position = {
"in_position": False,
"entry_price": 0,
"entry_time": None,
"should_exit": False,
"exit_reason": None,
}
# Enter new position
elif not position["in_position"] and current_data["signal"] == 1:
position = {
"in_position": True,
"entry_price": current_data["close"],
"entry_time": current_data.name,
"should_exit": False,
"exit_reason": None,
}
return trades
def run_strategy(self, df: pd.DataFrame) -> tuple:
"""Run the complete trading strategy."""
# Prepare data
df = self.calculate_indicators(df)
df = self.generate_signals(df)
# Execute trades
trades = self.execute_trades(df)
# Calculate performance metrics
total_trades = len(trades)
winning_trades = len([t for t in trades if t["profit_pct"] > 0])
total_return = sum(t["profit_pct"] for t in trades)
metrics = {
"total_trades": total_trades,
"winning_trades": winning_trades,
"win_rate": winning_trades / total_trades if total_trades > 0 else 0,
"total_return": total_return,
"average_return": total_return / total_trades if total_trades > 0 else 0,
}
return trades, metrics
class LiveCryptoTradingStrategy(CryptoTradingStrategy):
def __init__(
self,
mongodb_uri: str,
db_name: str,
initial_balance: float = 10000,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.client = MongoClient(mongodb_uri)
self.db = self.client[db_name]
self.position_collection = self.db["positions"]
self.trade_collection = self.db["trades"]
self.data_collection = self.db["market_data"]
self.asset_collection = self.db["assets"]
# Initialize assets if not exists
if not self.asset_collection.find_one({"asset_type": "USD"}):
self.asset_collection.insert_one(
{
"asset_type": "USD",
"amount": initial_balance,
"last_updated": datetime.now(pytz.UTC),
}
)
def get_balance(self, asset_type: str = "USD") -> float:
"""Get current balance of specified asset"""
asset_doc = self.asset_collection.find_one({"asset_type": asset_type})
return asset_doc["amount"] if asset_doc else 0
def update_balance(self, asset_type: str, amount: float):
"""Update balance of specified asset"""
self.asset_collection.update_one(
{"asset_type": asset_type},
{"$set": {"amount": amount, "last_updated": datetime.now(pytz.UTC)}},
upsert=True,
)
def get_latest_data(self, symbol: str, lookback_minutes: int = 200) -> pd.DataFrame:
"""Fetch latest data from MongoDB"""
end_time = datetime.now(pytz.UTC)
start_time = end_time - timedelta(minutes=lookback_minutes)
cursor = self.data_collection.find(
{"symbol": symbol, "timestamp": {"$gte": start_time, "$lte": end_time}}
).sort("timestamp", 1)
data = list(cursor)
if not data:
return None
df = pd.DataFrame(data)
df.set_index("timestamp", inplace=True)
return df
def execute_trade(self, symbol: str, side: str, amount: float, price: float):
"""Execute trade and update balances"""
base_asset = symbol[:-4] # BTCUSDT -> BTC
quote_asset = symbol[-4:] # BTCUSDT -> USDT
if side == "buy":
# Check if enough quote asset (USDT) available
quote_balance = self.get_balance(quote_asset)
cost = amount * price
if quote_balance >= cost:
# Update quote asset balance (decrease)
self.update_balance(quote_asset, quote_balance - cost)
# Update base asset balance (increase)
base_balance = self.get_balance(base_asset)
self.update_balance(base_asset, base_balance + amount)
return True
return False
elif side == "sell":
# Check if enough base asset available
base_balance = self.get_balance(base_asset)
if base_balance >= amount:
# Update base asset balance (decrease)
self.update_balance(base_asset, base_balance - amount)
# Update quote asset balance (increase)
quote_balance = self.get_balance(quote_asset)
self.update_balance(quote_asset, quote_balance + (amount * price))
return True
return False
def update_position(self, symbol: str, position: Dict):
"""Update position in MongoDB"""
self.position_collection.update_one(
{"symbol": symbol},
{
"$set": {
"in_position": position["in_position"],
"entry_price": position["entry_price"],
"entry_time": position["entry_time"],
"position_type": position.get("position_type", "long"),
"last_updated": datetime.now(pytz.UTC),
}
},
upsert=True,
)
def log_trade(self, symbol: str, trade: Dict):
"""Log completed trade to MongoDB"""
trade_doc = {
"symbol": symbol,
"entry_time": trade["entry_time"],
"exit_time": trade["exit_time"],
"entry_price": trade["entry_price"],
"exit_price": trade["exit_price"],
"position_type": trade.get("position_type", "long"),
"profit_pct": trade["profit_pct"],
"exit_reason": trade["exit_reason"],
}
self.trade_collection.insert_one(trade_doc)
def run_live_strategy(
self, symbol: str, trade_amount: float = 0.1, interval_seconds: int = 60
):
"""Run strategy in live mode"""
self.logger.info(f"Starting live trading for {symbol}")
# Get or initialize position
position_doc = self.position_collection.find_one({"symbol": symbol})
position = {
"in_position": position_doc["in_position"] if position_doc else False,
"entry_price": position_doc.get("entry_price", 0),
"entry_time": position_doc.get("entry_time"),
"should_exit": False,
"exit_reason": None,
"position_type": position_doc.get("position_type", "long"),
}
while True:
try:
# Get latest data
df = self.get_latest_data(symbol)
if df is None or len(df) < self.sma_long:
self.logger.warning("Insufficient data, waiting...")
time.sleep(interval_seconds)
continue
# Calculate indicators and signals
df = self.calculate_indicators(df)
df = self.generate_signals(df)
current_data = df.iloc[-1]
# Update position status
if position["in_position"]:
position = self.apply_risk_management(df.iloc[-10:], position)
# Handle position exit
if position["in_position"] and (
position["should_exit"] or current_data["signal"] == -1
):
if self.execute_trade(
symbol, "sell", trade_amount, current_data["close"]
):
# Log trade and update position as before
trade = {
"exit_time": current_data.name,
"exit_price": current_data["close"],
"exit_reason": position["exit_reason"] or "signal",
"profit_pct": (
current_data["close"] - position["entry_price"]
)
/ position["entry_price"]
* 100,
"position_type": position["position_type"],
"amount": trade_amount,
"usd_value": trade_amount * current_data["close"],
}
self.log_trade(symbol, {**position, **trade})
# Update position
position = {
"in_position": False,
"entry_price": 0,
"entry_time": None,
"should_exit": False,
"exit_reason": None,
}
self.update_position(symbol, position)
# Log balances
self.logger.info(
f"Exited position. New balances: "
f"USD: {self.get_balance('USDT')}, "
f"{symbol[:-4]}: {self.get_balance(symbol[:-4])}"
)
# Handle position entry
elif not position["in_position"] and current_data["signal"] == 1:
if self.execute_trade(
symbol, "buy", trade_amount, current_data["close"]
):
position = {
"in_position": True,
"entry_price": current_data["close"],
"entry_time": current_data.name,
"should_exit": False,
"exit_reason": None,
"position_type": "long",
"amount": trade_amount,
}
self.update_position(symbol, position)
# Log balances
self.logger.info(
f"Entered position. New balances: "
f"USD: {self.get_balance('USDT')}, "
f"{symbol[:-4]}: {self.get_balance(symbol[:-4])}"
)
except Exception as e:
self.logger.error(f"Error in live trading loop: {str(e)}")
time.sleep(interval_seconds)
def run_live_trading(mongodb_uri: str, symbols: List[str]):
"""Run live trading for multiple symbols"""
for symbol in symbols:
strategy = LiveCryptoTradingStrategy(
mongodb_uri=mongodb_uri,
db_name="crypto_trading",
sma_short=20,
sma_long=100,
rsi_base_period=21,
rsi_base_overbought=70,
rsi_base_oversold=30,
volume_threshold=1.0,
stop_loss=0.01,
take_profit=0.020,
)
# Run in separate thread for each symbol
import threading
thread = threading.Thread(
target=strategy.run_live_strategy, args=(symbol,), daemon=True
)
thread.start()
if __name__ == "__main__":
mongodb_uri = "mongodb://localhost:27017/"
symbols = ["BTCUSDT", "ETHUSDT"] # Add your symbols here
run_live_trading(mongodb_uri, symbols)

9
coin.yaml Normal file
View File

@ -0,0 +1,9 @@
interest :
- BTC
- XRP
- DOGE
- ETH
- SOL
base:
- USDT

126
mexc-socket.py Normal file
View File

@ -0,0 +1,126 @@
from pymexc import spot, futures
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure
import pytz
import yaml
from datetime import datetime
import os
import logging
from urllib.parse import quote_plus
load_dotenv()
# Set up logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
timezone = pytz.timezone("Asia/Seoul")
api_key = os.getenv("API_KEY")
api_secret = os.getenv("API_SECRET")
class MongoDBConn(object):
def __init__(self):
"""Initialize MongoDB connection parameters"""
self.username = quote_plus(os.getenv("DB_USER"))
self.password = quote_plus(os.getenv("DB_PWD"))
self.host = os.getenv("DB_HOST")
self.port = os.getenv("DB_PORT")
self.db_name = os.getenv("DB_NAME")
self.client = None
self.db = None
def connect(self):
"""Establish connection to MongoDB"""
try:
# Create connection URI
uri = f"mongodb://{self.username}:{self.password}@{self.host}:{self.port}"
# Connect to MongoDB
self.client = MongoClient(uri)
# Test the connection
self.client.admin.command("ping")
# Get database reference
self.db = self.client[self.db_name]
logger.info("Successfully connected to MongoDB")
return True
except ConnectionFailure as e:
logger.error(f"Could not connect to MongoDB: {e}")
return False
except OperationFailure as e:
logger.error(f"Authentication failed: {e}")
return False
except Exception as e:
logger.error(f"An error occurred: {e}")
return False
def close(self):
"""Close the MongoDB connection"""
if self.client:
self.client.close()
logger.info("MongoDB connection closed")
def insert_coin_price(self, collection_name, document):
"""Insert a single document into a collection"""
try:
collection = self.db[collection_name]
result = collection.insert_one(document)
logger.info(f"Document inserted with ID: {result.inserted_id}")
return result.inserted_id
except Exception as e:
logger.error(f"Error inserting document: {e}")
return None
mdb = MongoDBConn()
mdb.connect()
def load_coin():
data = None
with open("coin.yaml", "r") as file:
data = yaml.safe_load(file)
interest_coins = data["interest"]
base = data["base"][0]
return interest_coins, base
def handle_message(message):
# insert_coin_price(self, collection_name, document):
global mdb, timezone
col = message["s"] + "_kline"
time = message["t"]
msg = message["d"]["k"]
msg["stime"] = time
msg["c"] = float(msg["c"])
msg["h"] = float(msg["h"])
msg["l"] = float(msg["l"])
msg["o"] = float(msg["o"])
msg["v"] = float(msg["v"])
msg["T"] = int(msg["T"])
msg["a"] = float(msg["a"])
msg["t"] = int(msg["t"])
msg["time"] = datetime.now(timezone).strftime("%Y/%m/%d, %H:%M:%S")
mdb.insert_coin_price(col.lower(), msg)
# make http request to api
if __name__ == "__main__":
coins, base = load_coin()
api_key = os.getenv("API_KEY")
api_secret = os.getenv("API_SECRET")
ws_spot_client = spot.WebSocket(api_key=api_key, api_secret=api_secret)
# all messages will be handled by function `handle_message`
for coin in coins:
target = coin + base
ws_spot_client.kline_stream(handle_message, target, "Min1")
while True:
...

60
pymexc/__init__.py Normal file
View File

@ -0,0 +1,60 @@
"""
### Usage
```python
from pymexc import spot, futures
api_key = "YOUR API KEY"
api_secret = "YOUR API SECRET KEY"
def handle_message(message):
# handle websocket message
print(message)
# SPOT V3
# initialize HTTP client
spot_client = spot.HTTP(api_key = api_key, api_secret = api_secret)
# initialize WebSocket client
ws_spot_client = spot.WebSocket(api_key = api_key, api_secret = api_secret)
# make http request to api
print(spot_client.exchange_info())
# create websocket connection to public channel (spot@public.deals.v3.api@BTCUSDT)
# all messages will be handled by function `handle_message`
ws_spot_client.deals_stream(handle_message, "BTCUSDT")
# FUTURES V1
# initialize HTTP client
futures_client = futures.HTTP(api_key = api_key, api_secret = api_secret)
# initialize WebSocket client
ws_futures_client = futures.WebSocket(api_key = api_key, api_secret = api_secret)
# make http request to api
print(futures_client.index_price("MX_USDT"))
# create websocket connection to public channel (sub.tickers)
# all messages will be handled by function `handle_message`
ws_futures_client.tickers_stream(handle_message)
# loop forever for save websocket connection
while True:
...
"""
try:
from . import futures
from . import spot
except ImportError:
import futures
import spot
__all__ = [
"futures",
"spot"
]

164
pymexc/base.py Normal file
View File

@ -0,0 +1,164 @@
from abc import ABC, abstractclassmethod
from typing import Union, Literal
import hmac
import hashlib
import requests
from urllib.parse import urlencode
import logging
import time
logger = logging.getLogger(__name__)
class MexcAPIError(Exception):
pass
class MexcSDK(ABC):
"""
Initializes a new instance of the class with the given `api_key` and `api_secret` parameters.
:param api_key: A string representing the API key.
:param api_secret: A string representing the API secret.
:param base_url: A string representing the base URL of the API.
"""
def __init__(self, api_key: str, api_secret: str, base_url: str, proxies: dict = None):
self.api_key = api_key
self.api_secret = api_secret
self.recvWindow = 5000
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/json",
})
if proxies:
self.session.proxies.update(proxies)
@abstractclassmethod
def sign(self, **kwargs) -> str:
...
@abstractclassmethod
def call(self, method: Union[Literal["GET"], Literal["POST"], Literal["PUT"], Literal["DELETE"]], router: str, *args, **kwargs) -> dict:
...
class _SpotHTTP(MexcSDK):
def __init__(self, api_key: str = None, api_secret: str = None, proxies: dict = None):
super().__init__(api_key, api_secret, "https://api.mexc.com", proxies = proxies)
self.session.headers.update({
"X-MEXC-APIKEY": self.api_key
})
def sign(self, query_string: str) -> str:
"""
Generates a signature for an API request using HMAC SHA256 encryption.
Args:
**kwargs: Arbitrary keyword arguments representing request parameters.
Returns:
A hexadecimal string representing the signature of the request.
"""
# Generate signature
signature = hmac.new(self.api_secret.encode('utf-8'), query_string.encode('utf-8'), hashlib.sha256).hexdigest()
return signature
def call(self, method: Union[Literal["GET"], Literal["POST"], Literal["PUT"], Literal["DELETE"]], router: str, auth: bool = True, *args, **kwargs) -> dict:
if not router.startswith("/"):
router = f"/{router}"
# clear None values
kwargs = {k: v for k, v in kwargs.items() if v is not None}
if kwargs.get('params'):
kwargs['params'] = {k: v for k, v in kwargs['params'].items() if v is not None}
else:
kwargs['params'] = {}
timestamp = str(int(time.time() * 1000))
kwargs['params']['timestamp'] = timestamp
kwargs['params']['recvWindow'] = self.recvWindow
kwargs['params'] = {k: v for k, v in sorted(kwargs['params'].items())}
params = urlencode(kwargs.pop('params'), doseq=True).replace('+', '%20')
if self.api_key and self.api_secret and auth:
params += "&signature=" + self.sign(params)
response = self.session.request(method, f"{self.base_url}{router}", params = params, *args, **kwargs)
if not response.ok:
raise MexcAPIError(f'(code={response.json()["code"]}): {response.json()["msg"]}')
return response.json()
class _FuturesHTTP(MexcSDK):
def __init__(self, api_key: str = None, api_secret: str = None, proxies: dict = None):
super().__init__(api_key, api_secret, "https://contract.mexc.com", proxies = proxies)
self.session.headers.update({
"Content-Type": "application/json",
"ApiKey": self.api_key
})
def sign(self, timestamp: str, **kwargs) -> str:
"""
Generates a signature for an API request using HMAC SHA256 encryption.
:param timestamp: A string representing the timestamp of the request.
:type timestamp: str
:param kwargs: Arbitrary keyword arguments representing request parameters.
:type kwargs: dict
:return: A hexadecimal string representing the signature of the request.
:rtype: str
"""
# Generate signature
query_string = "&".join([f"{k}={v}" for k, v in sorted(kwargs.items())])
query_string = self.api_key + timestamp + query_string
signature = hmac.new(self.api_secret.encode('utf-8'), query_string.encode('utf-8'), hashlib.sha256).hexdigest()
return signature
def call(self, method: Union[Literal["GET"], Literal["POST"], Literal["PUT"], Literal["DELETE"]], router: str, *args, **kwargs) -> dict:
"""
Makes a request to the specified HTTP method and router using the provided arguments.
:param method: A string that represents the HTTP method(GET, POST, PUT, or DELETE) to be used.
:type method: str
:param router: A string that represents the API endpoint to be called.
:type router: str
:param *args: Variable length argument list.
:type *args: list
:param **kwargs: Arbitrary keyword arguments.
:type **kwargs: dict
:return: A dictionary containing the JSON response of the request.
"""
if not router.startswith("/"):
router = f"/{router}"
# clear None values
kwargs = {k: v for k, v in kwargs.items() if v is not None}
for variant in ('params', 'json'):
if kwargs.get(variant):
kwargs[variant] = {k: v for k, v in kwargs[variant].items() if v is not None}
if self.api_key and self.api_secret:
# add signature
timestamp = str(int(time.time() * 1000))
kwargs['headers'] = {
"Request-Time": timestamp,
"Signature": self.sign(timestamp, **kwargs[variant])
}
response = self.session.request(method, f"{self.base_url}{router}", *args, **kwargs)
return response.json()

509
pymexc/base_websocket.py Normal file
View File

@ -0,0 +1,509 @@
import websocket
import threading
import logging
import time
import json
import hmac
logger = logging.getLogger(__name__)
SPOT = "wss://wbs.mexc.com/ws"
FUTURES = "wss://contract.mexc.com/ws"
class _WebSocketManager:
def __init__(self, callback_function, ws_name, api_key=None, api_secret=None,
ping_interval=20, ping_timeout=10, retries=10,
restart_on_error=True, trace_logging=False,
http_proxy_host = None,
http_proxy_port = None,
http_no_proxy = None,
http_proxy_auth = None,
http_proxy_timeout = None):
self.proxy_settings = dict(
http_proxy_host = http_proxy_host,
http_proxy_port = http_proxy_port,
http_no_proxy = http_no_proxy,
http_proxy_auth = http_proxy_auth,
http_proxy_timeout = http_proxy_timeout
)
# Set API keys.
self.api_key = api_key
self.api_secret = api_secret
self.callback = callback_function
self.ws_name = ws_name
if api_key:
self.ws_name += " (Auth)"
# Setup the callback directory following the format:
# {
# "topic_name": function
# }
self.callback_directory = {}
# Record the subscriptions made so that we can resubscribe if the WSS
# connection is broken.
self.subscriptions = []
# Set ping settings.
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.retries = retries
# Other optional data handling settings.
self.handle_error = restart_on_error
# Enable websocket-client's trace logging for extra debug information
# on the websocket connection, including the raw sent & recv messages
websocket.enableTrace(trace_logging)
# Set initial state, initialize dictionary and connect.
self._reset()
self.attempting_connection = False
def _on_open(self):
"""
Log WS open.
"""
logger.debug(f"WebSocket {self.ws_name} opened.")
def _on_message(self, message):
"""
Parse incoming messages.
"""
self.callback(json.loads(message))
def is_connected(self):
try:
if self.ws.sock or not self.ws.sock.is_connected:
return True
else:
return False
except AttributeError:
return False
@staticmethod
def _are_connections_connected(active_connections):
for connection in active_connections:
if not connection.is_connected():
return False
return True
def _ping_loop(self, ping_payload: str, ping_interval: int, ping_timeout: int):
"""
Ping the websocket.
"""
time.sleep(ping_timeout)
while True:
logger.info(f"WebSocket {self.ws_name} send ping...")
self.ws.send(ping_payload)
time.sleep(ping_interval)
def _connect(self, url):
"""
Open websocket in a thread.
"""
def resubscribe_to_topics():
if not self.subscriptions:
# There are no subscriptions to resubscribe to, probably
# because this is a brand new WSS initialisation so there was
# no previous WSS connection.
return
for subscription_message in self.subscriptions:
self.ws.send(subscription_message)
self.attempting_connection = True
# Set endpoint.
self.endpoint = url
# Attempt to connect for X seconds.
retries = self.retries
if retries == 0:
infinitely_reconnect = True
else:
infinitely_reconnect = False
while (infinitely_reconnect or retries > 0) and not self.is_connected():
logger.info(f"WebSocket {self.ws_name} attempting connection...")
self.ws = websocket.WebSocketApp(
url=url,
on_message=lambda ws, msg: self._on_message(msg),
on_close=self._on_close(),
on_open=self._on_open(),
on_error=lambda ws, err: self._on_error(err)
)
# Setup the thread running WebSocketApp.
self.wst = threading.Thread(target=lambda: self.ws.run_forever(
ping_interval=self.ping_interval,
ping_timeout=self.ping_timeout,
**self.proxy_settings
))
# Configure as daemon; start.
self.wst.daemon = True
self.wst.start()
# setup ping loop
self.wsl = threading.Thread(target=lambda: self._ping_loop(
ping_payload='{"method":"ping"}',
ping_interval=self.ping_interval,
ping_timeout=self.ping_timeout
))
self.wsl.daemon = True
self.wsl.start()
retries -= 1
time.sleep(1)
# If connection was not successful, raise error.
if not infinitely_reconnect and retries <= 0:
self.exit()
raise websocket.WebSocketTimeoutException(
f"WebSocket {self.ws_name} connection failed. Too many "
f"connection attempts. pybit will "
f"no longer try to reconnect.")
logger.info(f"WebSocket {self.ws_name} connected")
# If given an api_key, authenticate.
if self.api_key and self.api_secret:
self._auth()
resubscribe_to_topics()
self.attempting_connection = False
def _auth(self):
# Generate signature
# make auth if futures. spot has a different auth system.
isspot = self.endpoint.startswith(SPOT)
if isspot:
return
timestamp = str(int(time.time() * 1000))
_val = self.api_key + timestamp
signature = str(hmac.new(
bytes(self.api_secret, "utf-8"),
bytes(_val, "utf-8"), digestmod="sha256"
).hexdigest())
# Authenticate with API.
self.ws.send(
json.dumps({
"subscribe": False,
"method": "login",
"param": {
"apiKey": self.api_key,
"reqTime": timestamp,
"signature": signature
}
})
)
def _on_error(self, error):
"""
Exit on errors and raise exception, or attempt reconnect.
"""
if type(error).__name__ not in ["WebSocketConnectionClosedException",
"ConnectionResetError",
"WebSocketTimeoutException"]:
# Raises errors not related to websocket disconnection.
self.exit()
raise error
if not self.exited:
logger.error(f"WebSocket {self.ws_name} encountered error: {error}.")
self.exit()
# Reconnect.
if self.handle_error and not self.attempting_connection:
self._reset()
self._connect(self.endpoint)
def _on_close(self):
"""
Log WS close.
"""
logger.debug(f"WebSocket {self.ws_name} closed.")
def _reset(self):
"""
Set state booleans and initialize dictionary.
"""
self.exited = False
self.auth = False
self.data = {}
def exit(self):
"""
Closes the websocket connection.
"""
self.ws.close()
while self.ws.sock:
continue
self.exited = True
class _FuturesWebSocketManager(_WebSocketManager):
def __init__(self, ws_name, **kwargs):
callback_function = kwargs.pop("callback_function") if \
kwargs.get("callback_function") else self._handle_incoming_message
super().__init__(callback_function, ws_name, **kwargs)
self.private_topics = ["personal.order", "personal.asset",
"personal.position", "personal.risk.limit",
"personal.adl.level", "personal.position.mode"]
self.symbol_wildcard = "*"
self.symbol_separator = "|"
self.last_subsctiption = None
def subscribe(self, topic, callback, params: dict = {}):
subscription_args = {
"method": topic,
"param": params
}
self._check_callback_directory(subscription_args)
while not self.is_connected():
# Wait until the connection is open before subscribing.
time.sleep(0.1)
subscription_message = json.dumps(subscription_args)
self.ws.send(subscription_message)
self.subscriptions.append(subscription_message)
self._set_callback(topic.replace("sub.", ""), callback)
self.last_subsctiption = topic.replace("sub.", "")
def _initialise_local_data(self, topic):
# Create self.data
try:
self.data[topic]
except KeyError:
self.data[topic] = []
def _process_auth_message(self, message):
# If we get successful futures auth, notify user
if message.get("data") == "success":
logger.debug(f"Authorization for {self.ws_name} successful.")
self.auth = True
# If we get unsuccessful auth, notify user.
elif message.get("data") != "success": # !!!!
logger.debug(f"Authorization for {self.ws_name} failed. Please "
f"check your API keys and restart.")
def _process_subscription_message(self, message):
#try:
sub = message["channel"]
#except KeyError:
#sub = message["c"] # SPOT PUBLIC & PRIVATE format
# If we get successful futures subscription, notify user
if (
message.get("channel", "").startswith("rs.") or
message.get("channel", "").startswith("push.")
) and message.get("channel", "") != "rs.error":
logger.debug(f"Subscription to {sub} successful.")
# Futures subscription fail
else:
response = message["data"]
logger.error("Couldn't subscribe to topic. "
f"Error: {response}.")
if self.last_subsctiption:
self._pop_callback(self.last_subsctiption)
def _process_normal_message(self, message):
topic = message["channel"].replace("push.", "").replace("rs.sub.", "")
callback_data = message
callback_function = self._get_callback(topic)
callback_function(callback_data)
def _handle_incoming_message(self, message):
def is_auth_message():
if message.get("channel", "") == "rs.login":
return True
else:
return False
def is_subscription_message():
if str(message).startswith("{'channel': 'push."):
return True
else:
return False
def is_pong_message():
if message.get("channel", "") in ("pong", "clientId"):
return True
else:
return False
if is_auth_message():
self._process_auth_message(message)
elif is_subscription_message():
self._process_subscription_message(message)
elif is_pong_message():
pass
else:
self._process_normal_message(message)
def custom_topic_stream(self, topic, callback):
return self.subscribe(topic=topic, callback=callback)
def _check_callback_directory(self, topics):
for topic in topics:
if topic in self.callback_directory:
raise Exception(f"You have already subscribed to this topic: "
f"{topic}")
def _set_callback(self, topic, callback_function):
self.callback_directory[topic] = callback_function
def _get_callback(self, topic):
return self.callback_directory[topic]
def _pop_callback(self, topic):
self.callback_directory.pop(topic)
class _FuturesWebSocket(_FuturesWebSocketManager):
def __init__(self, **kwargs):
self.ws_name = "FuturesV1"
self.endpoint = "wss://contract.mexc.com/ws"
super().__init__(self.ws_name, **kwargs)
self.ws = None
self.active_connections = []
self.kwargs = kwargs
def is_connected(self):
return self._are_connections_connected(self.active_connections)
def _ws_subscribe(self, topic, callback, params: list = []):
if not self.ws:
self.ws = _FuturesWebSocketManager(
self.ws_name, **self.kwargs)
self.ws._connect(self.endpoint)
self.active_connections.append(self.ws)
self.ws.subscribe(topic, callback, params)
class _SpotWebSocketManager(_WebSocketManager):
def __init__(self, ws_name, **kwargs):
callback_function = kwargs.pop("callback_function") if \
kwargs.get("callback_function") else self._handle_incoming_message
super().__init__(callback_function, ws_name, **kwargs)
self.private_topics = ["account", "deals", "orders"]
self.last_subsctiption = None
def subscribe(self, topic: str, callback, params_list: list):
subscription_args = {
"method": "SUBSCRIPTION",
"params": [
'@'.join([f"spot@{topic}.v3.api"] + list(map(str, params.values())))
for params in params_list
]
}
self._check_callback_directory(subscription_args)
while not self.is_connected():
# Wait until the connection is open before subscribing.
time.sleep(0.1)
subscription_message = json.dumps(subscription_args)
self.ws.send(subscription_message)
self.subscriptions.append(subscription_message)
self._set_callback(topic, callback)
self.last_subsctiption = topic
def _initialise_local_data(self, topic):
# Create self.data
try:
self.data[topic]
except KeyError:
self.data[topic] = []
def _process_subscription_message(self, message):
sub = message["msg"].replace("spot@", "").split(".v3.api")[0]
# If we get successful futures subscription, notify user
if message.get("id") == 0 and message.get("code") == 0:
logger.debug(f"Subscription to {sub} successful.")
# Futures subscription fail
else:
response = message["msg"]
logger.error("Couldn't subscribe to topic. "
f"Error: {response}.")
if self.last_subsctiption:
self._pop_callback(self.last_subsctiption)
def _process_normal_message(self, message):
topic = message["c"].replace("spot@", "").split(".v3.api")[0]
callback_data = message
callback_function = self._get_callback(topic)
callback_function(callback_data)
def _handle_incoming_message(self, message):
def is_subscription_message():
if (message.get("id") == 0 and
message.get("code") == 0 and
message.get("msg")):
return True
else:
return False
if is_subscription_message():
self._process_subscription_message(message)
else:
self._process_normal_message(message)
def custom_topic_stream(self, topic, callback):
return self.subscribe(topic=topic, callback=callback)
def _check_callback_directory(self, topics):
for topic in topics:
if topic in self.callback_directory:
raise Exception(f"You have already subscribed to this topic: "
f"{topic}")
def _set_callback(self, topic, callback_function):
self.callback_directory[topic] = callback_function
def _get_callback(self, topic):
return self.callback_directory[topic]
def _pop_callback(self, topic):
self.callback_directory.pop(topic)
class _SpotWebSocket(_SpotWebSocketManager):
def __init__(self, endpoint: str = "wss://wbs.mexc.com/ws", **kwargs):
self.ws_name = "SpotV3"
self.endpoint = endpoint
super().__init__(self.ws_name, **kwargs)
self.ws = None
self.active_connections = []
self.kwargs = kwargs
def is_connected(self):
return self._are_connections_connected(self.active_connections)
def _ws_subscribe(self, topic, callback, params: list = []):
if not self.ws:
self.ws = _SpotWebSocketManager(
self.ws_name, **self.kwargs)
self.ws._connect(self.endpoint)
self.active_connections.append(self.ws)
self.ws.subscribe(topic, callback, params)

1694
pymexc/futures.py Normal file

File diff suppressed because it is too large Load Diff

1967
pymexc/spot.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,12 +1,45 @@
annotated-types==0.7.0
anyio==4.7.0
attrs==24.3.0 attrs==24.3.0
cattrs==24.1.2 cattrs==24.1.2
certifi==2024.12.14
click==8.1.7
dnspython==2.7.0
email_validator==2.2.0
exceptiongroup==1.2.2 exceptiongroup==1.2.2
fastapi==0.115.6
fastapi-cli==0.0.7
h11==0.14.0
httpcore==1.0.7
httptools==0.6.4
httpx==0.28.1
idna==3.10
importlib_resources==6.4.5 importlib_resources==6.4.5
Jinja2==3.1.4
jsii==1.106.0 jsii==1.106.0
mexc-sdk @ file:///mnt/c/Users/dongho/Desktop/mexc/mexc-api-sdk/dist/python/mexc_sdk-1.0.0-py3-none-any.whl markdown-it-py==3.0.0
MarkupSafe==3.0.2
mdurl==0.1.2
publication==0.0.3 publication==0.0.3
pydantic==2.10.4
pydantic_core==2.27.2
Pygments==2.18.0
pymongo==4.10.1
python-dateutil==2.9.0.post0 python-dateutil==2.9.0.post0
python-dotenv==1.0.1 python-dotenv==1.0.1
python-multipart==0.0.20
pytz==2024.2
PyYAML==6.0.2
rich==13.9.4
rich-toolkit==0.12.0
shellingham==1.5.4
six==1.17.0 six==1.17.0
sniffio==1.3.1
starlette==0.41.3
typeguard==4.4.1 typeguard==4.4.1
typer==0.15.1
typing_extensions==4.12.2 typing_extensions==4.12.2
uvicorn==0.34.0
uvloop==0.21.0
watchfiles==1.0.3
websockets==14.1

153
server.py Normal file
View File

@ -0,0 +1,153 @@
from mexc_sdk import Spot
from dotenv import load_dotenv
import yaml
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure
import pytz
import logging
import os
from datetime import datetime
import time
from urllib.parse import quote_plus
# Set up logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Load the environment variables from the .env file
load_dotenv()
class MongoDBConn(object):
def __init__(self):
"""Initialize MongoDB connection parameters"""
self.username = quote_plus(os.getenv("DB_USER"))
self.password = quote_plus(os.getenv("DB_PWD"))
self.host = os.getenv("DB_HOST")
self.port = os.getenv("DB_PORT")
self.db_name = os.getenv("DB_NAME")
self.client = None
self.db = None
def connect(self):
"""Establish connection to MongoDB"""
try:
# Create connection URI
uri = f"mongodb://{self.username}:{self.password}@{self.host}:{self.port}"
# Connect to MongoDB
self.client = MongoClient(uri)
# Test the connection
self.client.admin.command("ping")
# Get database reference
self.db = self.client[self.db_name]
logger.info("Successfully connected to MongoDB")
return True
except ConnectionFailure as e:
logger.error(f"Could not connect to MongoDB: {e}")
return False
except OperationFailure as e:
logger.error(f"Authentication failed: {e}")
return False
except Exception as e:
logger.error(f"An error occurred: {e}")
return False
def close(self):
"""Close the MongoDB connection"""
if self.client:
self.client.close()
logger.info("MongoDB connection closed")
def insert_coin_price(self, collection_name, document):
"""Insert a single document into a collection"""
try:
collection = self.db[collection_name]
result = collection.insert_one(document)
logger.info(f"Document inserted with ID: {result.inserted_id}")
return result.inserted_id
except Exception as e:
logger.error(f"Error inserting document: {e}")
return None
class TradeBot(object):
def __init__(self, api_key, api_secret, mdb: MongoDBConn):
self.client = Spot(api_key=api_key, api_secret=api_secret)
self._load_coin()
self.timezone = pytz.timezone("Asia/Seoul")
self.mdb = mdb
def ttime(self):
# Format is in dict
return f"Server Time: {self.client.time()['serverTime']}"
def _get_current_time(self):
return datetime.now(self.timezone)
def _get_market_price(self, symbol=None):
"""Get current market price for a symbol"""
try:
symbol = symbol or self.symbol
ticker = self.client.ticker_price(symbol)
price = float(ticker["price"])
logger.info(f"Current {symbol} price: {price}")
return price
except Exception as e:
logger.error(f"Error getting market price: {str(e)}")
return None
def get_all_interset_market_price(self):
for coin in self.interest_coins:
data = {}
data["price"] = self._get_market_price(symbol=coin + self.base)
data["timestamp"] = self._get_current_time().strftime("%Y/%m/%d, %H:%M:%S")
self.mdb.insert_coin_price(collection_name=coin, document=data)
def get_account_balance(self):
"""Get account balance for all assets"""
try:
account_info = self.client.account_info()
balances = account_info["balances"]
logger.info("Account balances:")
for balance in balances:
if float(balance["free"]) > 0 or float(balance["locked"]) > 0:
logger.info(
f"{balance['asset']}: Free={balance['free']}, Locked={balance['locked']}"
)
return balances
except Exception as e:
logger.error(f"Error getting account balance: {str(e)}")
return None
def _load_coin(self):
data = None
with open("coin.yaml", "r") as file:
data = yaml.safe_load(file)
self.interest_coins = data["interest"]
self.base = data["base"][0]
def main(tb: TradeBot):
while True:
tb.get_all_interset_market_price()
time.sleep(5)
if __name__ == "__main__":
mdb = MongoDBConn()
mdb.connect()
api_key = os.getenv("API_KEY")
api_secret = os.getenv("API_SECRET")
tb = TradeBot(api_key, api_secret, mdb)
main(tb)

View File

@ -0,0 +1,423 @@
/*!
* @license
* chartjs-chart-financial
* http://chartjs.org/
* Version: 0.2.0
*
* Copyright 2024 Chart.js Contributors
* Released under the MIT license
* https://github.com/chartjs/chartjs-chart-financial/blob/master/LICENSE.md
*/
(function (global, factory) {
typeof exports === 'object' && typeof module !== 'undefined' ? factory(require('chart.js'), require('chart.js/helpers')) :
typeof define === 'function' && define.amd ? define(['chart.js', 'chart.js/helpers'], factory) :
(global = typeof globalThis !== 'undefined' ? globalThis : global || self, factory(global.Chart, global.Chart.helpers));
})(this, (function (chart_js, helpers) { 'use strict';
/**
* This class is based off controller.bar.js from the upstream Chart.js library
*/
class FinancialController extends chart_js.BarController {
static overrides = {
label: '',
parsing: false,
hover: {
mode: 'label'
},
animations: {
numbers: {
type: 'number',
properties: ['x', 'y', 'base', 'width', 'open', 'high', 'low', 'close']
}
},
scales: {
x: {
type: 'timeseries',
offset: true,
ticks: {
major: {
enabled: true,
},
source: 'data',
maxRotation: 0,
autoSkip: true,
autoSkipPadding: 75,
sampleSize: 100
},
},
y: {
type: 'linear'
}
},
plugins: {
tooltip: {
intersect: false,
mode: 'index',
callbacks: {
label(ctx) {
const point = ctx.parsed;
if (!helpers.isNullOrUndef(point.y)) {
return chart_js.defaults.plugins.tooltip.callbacks.label(ctx);
}
const {o, h, l, c} = point;
return `O: ${o} H: ${h} L: ${l} C: ${c}`;
}
}
}
}
};
getLabelAndValue(index) {
const me = this;
const parsed = me.getParsed(index);
const axis = me._cachedMeta.iScale.axis;
const {o, h, l, c} = parsed;
const value = `O: ${o} H: ${h} L: ${l} C: ${c}`;
return {
label: `${me._cachedMeta.iScale.getLabelForValue(parsed[axis])}`,
value
};
}
getUserBounds(scale) {
const {min, max, minDefined, maxDefined} = scale.getUserBounds();
return {
min: minDefined ? min : Number.NEGATIVE_INFINITY,
max: maxDefined ? max : Number.POSITIVE_INFINITY
};
}
/**
* Implement this ourselves since it doesn't handle high and low values
* https://github.com/chartjs/Chart.js/issues/7328
* @protected
*/
getMinMax(scale) {
const meta = this._cachedMeta;
const _parsed = meta._parsed;
const axis = meta.iScale.axis;
const otherScale = this._getOtherScale(scale);
const {min: otherMin, max: otherMax} = this.getUserBounds(otherScale);
if (_parsed.length < 2) {
return {min: 0, max: 1};
}
if (scale === meta.iScale) {
return {min: _parsed[0][axis], max: _parsed[_parsed.length - 1][axis]};
}
const newParsedData = _parsed.filter(({x}) => x >= otherMin && x < otherMax);
let min = Number.POSITIVE_INFINITY;
let max = Number.NEGATIVE_INFINITY;
for (let i = 0; i < newParsedData.length; i++) {
const data = newParsedData[i];
min = Math.min(min, data.l);
max = Math.max(max, data.h);
}
return {min, max};
}
/**
* @protected
*/
calculateElementProperties(index, ruler, reset, options) {
const me = this;
const vscale = me._cachedMeta.vScale;
const base = vscale.getBasePixel();
const ipixels = me._calculateBarIndexPixels(index, ruler, options);
const data = me.chart.data.datasets[me.index].data[index];
const open = vscale.getPixelForValue(data.o);
const high = vscale.getPixelForValue(data.h);
const low = vscale.getPixelForValue(data.l);
const close = vscale.getPixelForValue(data.c);
return {
base: reset ? base : low,
x: ipixels.center,
y: (low + high) / 2,
width: ipixels.size,
open,
high,
low,
close
};
}
draw() {
const me = this;
const chart = me.chart;
const rects = me._cachedMeta.data;
helpers.clipArea(chart.ctx, chart.chartArea);
for (let i = 0; i < rects.length; ++i) {
rects[i].draw(me._ctx);
}
helpers.unclipArea(chart.ctx);
}
}
/**
* Helper function to get the bounds of the bar regardless of the orientation
* @param {Rectangle} bar the bar
* @param {boolean} [useFinalPosition]
* @return {object} bounds of the bar
* @private
*/
function getBarBounds(bar, useFinalPosition) {
const {x, y, base, width, height} = bar.getProps(['x', 'low', 'high', 'width', 'height'], useFinalPosition);
let left, right, top, bottom, half;
if (bar.horizontal) {
half = height / 2;
left = Math.min(x, base);
right = Math.max(x, base);
top = y - half;
bottom = y + half;
} else {
half = width / 2;
left = x - half;
right = x + half;
top = Math.min(y, base); // use min because 0 pixel at top of screen
bottom = Math.max(y, base);
}
return {left, top, right, bottom};
}
function inRange(bar, x, y, useFinalPosition) {
const skipX = x === null;
const skipY = y === null;
const bounds = !bar || (skipX && skipY) ? false : getBarBounds(bar, useFinalPosition);
return bounds
&& (skipX || x >= bounds.left && x <= bounds.right)
&& (skipY || y >= bounds.top && y <= bounds.bottom);
}
class FinancialElement extends chart_js.BarElement {
static defaults = {
backgroundColors: {
up: 'rgba(75, 192, 192, 0.5)',
down: 'rgba(255, 99, 132, 0.5)',
unchanged: 'rgba(201, 203, 207, 0.5)',
},
borderColors: {
up: 'rgb(75, 192, 192)',
down: 'rgb(255, 99, 132)',
unchanged: 'rgb(201, 203, 207)',
}
};
height() {
return this.base - this.y;
}
inRange(mouseX, mouseY, useFinalPosition) {
return inRange(this, mouseX, mouseY, useFinalPosition);
}
inXRange(mouseX, useFinalPosition) {
return inRange(this, mouseX, null, useFinalPosition);
}
inYRange(mouseY, useFinalPosition) {
return inRange(this, null, mouseY, useFinalPosition);
}
getRange(axis) {
return axis === 'x' ? this.width / 2 : this.height / 2;
}
getCenterPoint(useFinalPosition) {
const {x, low, high} = this.getProps(['x', 'low', 'high'], useFinalPosition);
return {
x,
y: (high + low) / 2
};
}
tooltipPosition(useFinalPosition) {
const {x, open, close} = this.getProps(['x', 'open', 'close'], useFinalPosition);
return {
x,
y: (open + close) / 2
};
}
}
class CandlestickElement extends FinancialElement {
static id = 'candlestick';
static defaults = {
...FinancialElement.defaults,
borderWidth: 1,
};
draw(ctx) {
const me = this;
const {x, open, high, low, close} = me;
let borderColors = me.options.borderColors;
if (typeof borderColors === 'string') {
borderColors = {
up: borderColors,
down: borderColors,
unchanged: borderColors
};
}
let borderColor;
if (close < open) {
borderColor = helpers.valueOrDefault(borderColors ? borderColors.up : undefined, chart_js.defaults.elements.candlestick.borderColors.up);
ctx.fillStyle = helpers.valueOrDefault(me.options.backgroundColors ? me.options.backgroundColors.up : undefined, chart_js.defaults.elements.candlestick.backgroundColors.up);
} else if (close > open) {
borderColor = helpers.valueOrDefault(borderColors ? borderColors.down : undefined, chart_js.defaults.elements.candlestick.borderColors.down);
ctx.fillStyle = helpers.valueOrDefault(me.options.backgroundColors ? me.options.backgroundColors.down : undefined, chart_js.defaults.elements.candlestick.backgroundColors.down);
} else {
borderColor = helpers.valueOrDefault(borderColors ? borderColors.unchanged : undefined, chart_js.defaults.elements.candlestick.borderColors.unchanged);
ctx.fillStyle = helpers.valueOrDefault(me.backgroundColors ? me.backgroundColors.unchanged : undefined, chart_js.defaults.elements.candlestick.backgroundColors.unchanged);
}
ctx.lineWidth = helpers.valueOrDefault(me.options.borderWidth, chart_js.defaults.elements.candlestick.borderWidth);
ctx.strokeStyle = borderColor;
ctx.beginPath();
ctx.moveTo(x, high);
ctx.lineTo(x, Math.min(open, close));
ctx.moveTo(x, low);
ctx.lineTo(x, Math.max(open, close));
ctx.stroke();
ctx.fillRect(x - me.width / 2, close, me.width, open - close);
ctx.strokeRect(x - me.width / 2, close, me.width, open - close);
ctx.closePath();
}
}
class CandlestickController extends FinancialController {
static id = 'candlestick';
static defaults = {
...FinancialController.defaults,
dataElementType: CandlestickElement.id
};
static defaultRoutes = chart_js.BarController.defaultRoutes;
updateElements(elements, start, count, mode) {
const reset = mode === 'reset';
const ruler = this._getRuler();
const {sharedOptions, includeOptions} = this._getSharedOptions(start, mode);
for (let i = start; i < start + count; i++) {
const options = sharedOptions || this.resolveDataElementOptions(i, mode);
const baseProperties = this.calculateElementProperties(i, ruler, reset, options);
if (includeOptions) {
baseProperties.options = options;
}
this.updateElement(elements[i], i, baseProperties, mode);
}
}
}
const defaults = chart_js.Chart.defaults;
class OhlcElement extends FinancialElement {
static id = 'ohlc';
static defaults = {
...FinancialElement.defaults,
lineWidth: 2,
armLength: null,
armLengthRatio: 0.8
};
draw(ctx) {
const me = this;
const {x, open, high, low, close} = me;
const armLengthRatio = helpers.valueOrDefault(me.armLengthRatio, defaults.elements.ohlc.armLengthRatio);
let armLength = helpers.valueOrDefault(me.armLength, defaults.elements.ohlc.armLength);
if (armLength === null) {
// The width of an ohlc is affected by barPercentage and categoryPercentage
// This behavior is caused by extending controller.financial, which extends controller.bar
// barPercentage and categoryPercentage are now set to 1.0 (see controller.ohlc)
// and armLengthRatio is multipled by 0.5,
// so that when armLengthRatio=1.0, the arms from neighbour ohcl touch,
// and when armLengthRatio=0.0, ohcl are just vertical lines.
armLength = me.width * armLengthRatio * 0.5;
}
if (close < open) {
ctx.strokeStyle = helpers.valueOrDefault(me.options.borderColors ? me.options.borderColors.up : undefined, defaults.elements.ohlc.borderColors.up);
} else if (close > open) {
ctx.strokeStyle = helpers.valueOrDefault(me.options.borderColors ? me.options.borderColors.down : undefined, defaults.elements.ohlc.borderColors.down);
} else {
ctx.strokeStyle = helpers.valueOrDefault(me.options.borderColors ? me.options.borderColors.unchanged : undefined, defaults.elements.ohlc.borderColors.unchanged);
}
ctx.lineWidth = helpers.valueOrDefault(me.lineWidth, defaults.elements.ohlc.lineWidth);
ctx.beginPath();
ctx.moveTo(x, high);
ctx.lineTo(x, low);
ctx.moveTo(x - armLength, open);
ctx.lineTo(x, open);
ctx.moveTo(x + armLength, close);
ctx.lineTo(x, close);
ctx.stroke();
}
}
class OhlcController extends FinancialController {
static id = 'ohlc';
static defaults = {
...FinancialController.defaults,
dataElementType: OhlcElement.id,
datasets: {
barPercentage: 1.0,
categoryPercentage: 1.0
}
};
updateElements(elements, start, count, mode) {
const reset = mode === 'reset';
const ruler = this._getRuler();
const {sharedOptions, includeOptions} = this._getSharedOptions(start, mode);
for (let i = start; i < start + count; i++) {
const options = sharedOptions || this.resolveDataElementOptions(i, mode);
const baseProperties = this.calculateElementProperties(i, ruler, reset, options);
if (includeOptions) {
baseProperties.options = options;
}
this.updateElement(elements[i], i, baseProperties, mode);
}
}
}
chart_js.Chart.register(CandlestickController, OhlcController, CandlestickElement, OhlcElement);
}));

207
templates/candle_coin.html Normal file
View File

@ -0,0 +1,207 @@
<!-- templates/index.html -->
<!DOCTYPE html>
<html>
<head>
<title>Realtime Candlestick Chart</title>
<script src="https://cdn.jsdelivr.net/npm/luxon@3.4.4"></script>
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.1/dist/chart.umd.js"></script>
<script src="https://cdn.jsdelivr.net/npm/chartjs-adapter-luxon@1.3.1"></script>
<script src="{{ url_for('static', path='js/chartjs-chart-financial.js') }}"></script>
<link href="https://cdn.jsdelivr.net/npm/flowbite@2.5.2/dist/flowbite.min.css" rel="stylesheet" />
<script src="https://cdn.jsdelivr.net/npm/flowbite@2.5.2/dist/flowbite.min.js"></script>
</head>
<body>
<div class="flex w-full mt-8">
<canvas id="chart"></canvas>
</div>
<script>
var barCount = 100;
var barData = [];
var lineData = [];
var ctx = document.getElementById('chart').getContext('2d');
ctx.canvas.width = 1000;
ctx.canvas.height = 250;
var chart = new Chart(ctx, {
type: 'candlestick',
data: {
datasets: [{
label: '{{coin}}',
data: barData
}, {
label: 'Close price',
type: 'line',
data: lineData,
hidden: true
}]
},
options: {
responsive: true,
scales: {
x: {
type: 'time',
time: {
unit: 'minute',
displayFormats: {
minute: 'HH:mm'
}
},
adapters: {
date: {
zone: 'UTC'
}
}
},
y: {
type: 'linear',
position: 'right'
}
},
animation: false
}
});
async function fetchHistoricalData() {
try {
const response = await fetch('/data/{{coin}}/get-candle/all');
const data = await response.json();
return data;
} catch (error) {
console.error('Error fetching historical data:', error);
return null;
}
}
async function fetchLatestData() {
try {
const response = await fetch('/data/{{coin}}/get-candle');
const data = await response.json();
return data;
} catch (error) {
console.error('Error fetching latest data:', error);
return null;
}
}
async function initializeChart() {
// Fetch historical data first
const historicalData = await fetchHistoricalData();
if (!historicalData) return;
// Process historical data
historicalData.forEach(data => {
const candleData = {
x: data.time,
o: data.open,
h: data.high,
l: data.low,
c: data.close,
v: data.volume,
vwap: data.vwap,
trades: data.trades
};
barData.push(candleData);
lineData.push({
x: data.time,
y: data.close
});
});
chart.update();
// Start real-time updates
setInterval(updateChart, 5000);
}
async function updateChart() {
const newData = await fetchLatestData();
if (!newData) return;
if (barData.length >= barCount) {
barData.shift();
lineData.shift();
}
const newBarData = {
x: newData.time,
o: newData.open,
h: newData.high,
l: newData.low,
c: newData.close,
v: newData.volume,
vwap: newData.vwap,
trades: newData.trades
};
// Check if the last candle's timestamp matches the new data
const lastCandle = barData[barData.length - 1];
if (lastCandle && lastCandle.x === newData.time) {
// Update the existing candle
Object.assign(lastCandle, newBarData);
lineData[lineData.length - 1].y = newData.close;
} else {
// Add new candle
barData.push(newBarData);
lineData.push({
x: newData.time,
y: newData.close
});
}
chart.update();
}
// Initialize the chart when the page loads
initializeChart();
var update = function () {
var dataset = chart.config.data.datasets[0];
// candlestick vs ohlc
var type = document.getElementById('type').value;
chart.config.type = type;
// linear vs log
var scaleType = document.getElementById('scale-type').value;
chart.config.options.scales.y.type = scaleType;
// color
var colorScheme = document.getElementById('color-scheme').value;
if (colorScheme === 'neon') {
chart.config.data.datasets[0].backgroundColors = {
up: '#01ff01',
down: '#fe0000',
unchanged: '#999',
};
} else {
delete chart.config.data.datasets[0].backgroundColors;
}
// border
var border = document.getElementById('border').value;
if (border === 'false') {
dataset.borderColors = 'rgba(0, 0, 0, 0)';
} else {
delete dataset.borderColors;
}
// mixed charts
var mixed = document.getElementById('mixed').value;
if (mixed === 'true') {
chart.config.data.datasets[1].hidden = false;
} else {
chart.config.data.datasets[1].hidden = true;
}
chart.update();
};
[...document.getElementsByTagName('select')].forEach(element => element.addEventListener('change', update));
</script>
</body>
</html>

123
templates/coin.html Normal file
View File

@ -0,0 +1,123 @@
<!DOCTYPE html>
<html>
<head>
<title>XRP Price Live Chart</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/3.7.0/chart.min.js"></script>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
background-color: #f5f5f5;
}
.container {
max-width: 1200px;
margin: 0 auto;
background-color: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
h1 {
color: #333;
text-align: center;
margin-bottom: 20px;
}
.chart-container {
position: relative;
height: 60vh;
width: 100%;
}
</style>
</head>
<body>
<div class="container">
<h1>{{coin}} Price Live Chart</h1>
<div class="chart-container">
<canvas id="priceChart"></canvas>
</div>
</div>
<script>
let chart;
async function fetchData() {
try {
const response = await fetch('/data/{{coin}}');
const data = await response.json();
return data;
} catch (error) {
console.error('Error fetching data:', error);
return null;
}
}
async function updateChart() {
const data = await fetchData();
if (!data) return;
if (chart) {
chart.data.labels = data.times;
chart.data.datasets[0].data = data.prices;
chart.update();
} else {
const ctx = document.getElementById('priceChart').getContext('2d');
chart = new Chart(ctx, {
type: 'line',
data: {
labels: data.times,
datasets: [{
label: 'XRP Price (USD)',
data: data.prices,
borderColor: 'rgb(75, 192, 192)',
borderWidth: 2,
fill: true,
backgroundColor: 'rgba(75, 192, 192, 0.1)',
tension: 0.1,
pointRadius: 1,
pointHoverRadius: 5
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
plugins: {
legend: {
position: 'top',
},
title: {
display: true,
text: 'XRP Price History'
}
},
scales: {
y: {
beginAtZero: false,
ticks: {
callback: function(value) {
return '$' + value.toFixed(4);
}
}
},
x: {
ticks: {
maxTicksLimit: 10,
maxRotation: 45,
minRotation: 45
}
}
},
interaction: {
intersect: false,
mode: 'index'
}
}
});
}
}
// Update every 5 seconds
updateChart();
setInterval(updateChart, 5000);
</script>
</body>
</html>

47
templates/index.html Normal file
View File

@ -0,0 +1,47 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Billionaire yes</title>
<style>
/* Add your CSS here */
body {
margin: 0;
padding: 0;
font-family: Arial, sans-serif;
}
</style>
</head>
<body>
<!-- Add your content here -->
<header>
<h1>To Become Billionaire</h1>
</header>
<main>
<section>
<h2>Interested Coin Price Live Chart</h2>
<ul>
{% for coin in coins %}
<li><a href="/coin/{{coin}}">{{coin}}</a></li>
{% endfor %}
</ul>
<h2>Interested Coin Price Candle Live Chart</h2>
<ul>
{% for coin in coins %}
<li><a href="/coin/{{coin}}/candle1min">{{coin}}</a></li>
{% endfor %}
</ul>
</section>
</main>
<footer>
<p>&copy; 2024 Dongho Kim</p>
</footer>
<script>
// Add your JavaScript here
</script>
</body>
</html>

64
test.py Normal file
View File

@ -0,0 +1,64 @@
from urllib.parse import quote_plus
from dotenv import load_dotenv
from pymongo import MongoClient
from mexc_sdk import Spot
import yaml
import os
from datetime import timedelta
load_dotenv()
with open("coin.yaml", "r") as file:
data = yaml.safe_load(file)
interest_coins = data["interest"]
MONGO_URI = f'mongodb://{quote_plus(os.getenv("DB_USER"))}:{quote_plus(os.getenv("DB_PWD"))}@{os.getenv("DB_HOST")}:{os.getenv("DB_PORT")}'
client = MongoClient(MONGO_URI)
db = client[os.getenv("DB_NAME")] # Replace with your database name
if __name__ == "__main__":
target = interest_coins[0] + "USDT" + "_kline"
collection = db[target.lower()]
latest_doc = collection.find_one(sort=[("T", -1)])
latest_end_time = latest_doc["T"]
time_threshold = latest_end_time - (
60 * 100
) # 100 minutes before the latest endTime
pipeline = [
# Filter based on endTime
{"$match": {"T": {"$gte": time_threshold}}},
# Group by start time and end time
{
"$group": {
"_id": {"startTime": "$t", "endTime": "$T"},
"open": {"$first": "$o"},
"high": {"$max": "$h"},
"low": {"$min": "$l"},
"close": {"$last": "$c"},
"volume": {"$sum": "$v"},
"weighted_price": {"$sum": {"$multiply": ["$c", "$v"]}},
"count": {"$sum": 1},
}
},
# Sort by start time
{"$sort": {"_id.startTime": 1}},
# Reshape for output
{
"$project": {
"_id": 0,
"time": {"$multiply": ["$_id.startTime", 1000]},
"open": 1,
"high": 1,
"low": 1,
"close": 1,
"volume": 1,
"vwap": {"$divide": ["$weighted_price", "$volume"]},
"trades": "$count",
"endTime": "$_id.endTime",
}
},
]
candles = list(collection.aggregate(pipeline))
print(candles)