Compare commits

...

12 Commits

Author SHA1 Message Date
7a2af2a349 sonar check
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 32s
2025-01-08 21:15:12 +09:00
dae646cd2e auto-trade skeleton added
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 4m10s
2024-12-24 00:26:18 +09:00
82e1b74aee fixed title on coin live and added ul on index.html
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 4m22s
2024-12-21 16:13:29 +00:00
82216daaf8 migrationt to websocket
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m15s
2024-12-22 00:57:07 +09:00
2d31dcfd0b websocket
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m24s
2024-12-21 15:40:38 +09:00
1c93392d65 saved
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m15s
2024-12-20 22:06:01 +09:00
9e9cbf3547 mexc-websocket added
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m12s
2024-12-20 20:48:02 +09:00
1b7cd7960e candlebar added
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m10s
2024-12-20 17:18:40 +09:00
b4d9bcee51 update timeline
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m42s
2024-12-20 00:37:42 +09:00
ce4a6358cf change dockerfile
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m12s
2024-12-19 15:29:10 +00:00
0ec1eb7d04 base
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m14s
2024-12-20 00:25:12 +09:00
c94e4d8bf4 test
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m12s
2024-12-20 00:22:49 +09:00
16 changed files with 5849 additions and 7 deletions

7
Dockerfile Normal file
View File

@ -0,0 +1,7 @@
FROM python:3.12.3
WORKDIR /code
COPY ./requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
COPY ./ /code/
CMD ["fastapi", "run", "app.py", "--port", "38080"]

136
app.py
View File

@ -5,11 +5,13 @@ from fastapi.staticfiles import StaticFiles
from pymongo import MongoClient
from datetime import datetime
import uvicorn
from urllib.parse import quote_plus
from dotenv import load_dotenv
import os
import yaml
with open("coin.yaml", "r") as file:
data = yaml.safe_load(file)
interest_coins = data["interest"]
@ -17,7 +19,8 @@ interest_coins = data["interest"]
load_dotenv()
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
data_points = []
# Setup templates directory
templates = Jinja2Templates(directory="templates")
@ -45,6 +48,25 @@ async def view_coin_graph(request: Request, coin: str):
return {"coin": "not found"}
@app.get("/coin/{coin}/candle1min")
async def view_coin_candle_graph(request: Request, coin: str):
if coin in interest_coins:
return templates.TemplateResponse(
request=request, name="candle_coin.html", context={"coin": coin}
)
else:
return {"coin": "not found"}
@app.post("/update-data")
async def update_data(data: dict):
data_points.append(data)
# Keep only last 300 points to ensure we have enough data to generate 100 candles
if len(data_points) > 300:
data_points.pop(0)
return {"status": "success"}
@app.get("/data/{coin}")
async def get_data(coin):
collection = db[coin]
@ -68,5 +90,117 @@ async def get_data(coin):
return {"times": times, "prices": prices}
@app.get("/data/{coin}/get-candle/all")
async def get_candle_data_all(coin):
if coin in interest_coins:
target = coin + "USDT" + "_kline"
collection = db[target.lower()]
latest_doc = collection.find_one(sort=[("T", -1)])
latest_end_time = latest_doc["T"]
time_threshold = latest_end_time - (
3000
) # 60 seconds before the latest endTime
pipeline = [
# Filter based on endTime
{"$match": {"T": {"$gte": time_threshold}}},
# Group by start time and end time
{
"$group": {
"_id": {"startTime": "$t", "endTime": "$T"},
"open": {"$first": "$o"},
"high": {"$max": "$h"},
"low": {"$min": "$l"},
"close": {"$last": "$c"},
"volume": {"$sum": "$v"},
"weighted_price": {"$sum": {"$multiply": ["$c", "$v"]}},
"count": {"$sum": 1},
}
},
# Sort by start time
{"$sort": {"_id.startTime": 1}},
# Reshape for output
{
"$project": {
"_id": 0,
"time": {"$multiply": ["$_id.startTime", 1000]},
"open": 1,
"high": 1,
"low": 1,
"close": 1,
"volume": 1,
"vwap": {
"$cond": {
"if": {"$eq": ["$volume", 0]},
"then": 0,
"else": {"$divide": ["$weighted_price", "$volume"]},
}
},
"trades": "$count",
"endTime": "$_id.endTime",
}
},
]
candles = list(collection.aggregate(pipeline))
return candles
else:
return None
@app.get("/data/{coin}/get-candle")
async def get_candle_data(coin):
if coin in interest_coins:
target = coin + "USDT" + "_kline"
collection = db[target.lower()]
latest_doc = collection.find_one(sort=[("T", -1)])
latest_end_time = latest_doc["T"]
time_threshold = latest_end_time - (20) # 60 seconds before the latest endTime
pipeline = [
# Filter based on endTime
{"$match": {"T": {"$gte": time_threshold}}},
# Group by start time and end time
{
"$group": {
"_id": {"startTime": "$t", "endTime": "$T"},
"open": {"$first": "$o"},
"high": {"$max": "$h"},
"low": {"$min": "$l"},
"close": {"$last": "$c"},
"volume": {"$sum": "$v"},
"weighted_price": {"$sum": {"$multiply": ["$c", "$v"]}},
"count": {"$sum": 1},
}
},
# Sort by start time
{"$sort": {"_id.startTime": 1}},
# Reshape for output
{
"$project": {
"_id": 0,
"time": {"$multiply": ["$_id.startTime", 1000]},
"open": 1,
"high": 1,
"low": 1,
"close": 1,
"volume": 1,
"vwap": {
"$cond": {
"if": {"$eq": ["$volume", 0]},
"then": 0,
"else": {"$divide": ["$weighted_price", "$volume"]},
}
},
"trades": "$count",
"endTime": "$_id.endTime",
}
},
]
candles = list(collection.aggregate(pipeline))
return candles[0]
else:
return None
if __name__ == "__main__":
uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)

448
auto_trade.py Normal file
View File

@ -0,0 +1,448 @@
from typing import Dict, List
import logging
import pandas as pd
import numpy as np
from pymongo import MongoClient
import time
from datetime import datetime, timedelta
import pytz
class CryptoTradingStrategy:
def __init__(
self,
sma_short: int = 20,
sma_long: int = 50,
rsi_base_period: int = 14,
rsi_base_overbought: float = 70,
rsi_base_oversold: float = 30,
volume_threshold: float = 1.5,
stop_loss: float = 0.02,
take_profit: float = 0.035,
):
self.sma_short = sma_short
self.sma_long = sma_long
self.rsi_base_period = rsi_base_period
self.rsi_base_overbought = rsi_base_overbought
self.rsi_base_oversold = rsi_base_oversold
self.volume_threshold = volume_threshold
self.stop_loss = stop_loss
self.take_profit = take_profit
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def calculate_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""Calculate technical indicators."""
# Calculate SMAs
df["SMA_short"] = df["close"].rolling(window=self.sma_short).mean()
df["SMA_long"] = df["close"].rolling(window=self.sma_long).mean()
# Calculate ATR (Average True Range) for volatility
df["TR"] = (df["high"] - df["low"]).abs() # True Range = High - Low
df["ATR"] = df["TR"].rolling(window=14).mean() # 14-period ATR
# Adaptive RSI period based on volatility
# Adaptive RSI period based on volatility
df["RSI_period"] = self.rsi_base_period * (
1 + (df["ATR"] / df["close"].rolling(window=20).mean())
)
# Replace NaN or infinite values with a default RSI period
df["RSI_period"].fillna(self.rsi_base_period, inplace=True)
df["RSI_period"].replace([np.inf, -np.inf], self.rsi_base_period, inplace=True)
# Convert to integer
df["RSI_period"] = df["RSI_period"].astype(int)
# Calculate adaptive RSI
df["RSI"] = 0.0
for period in df["RSI_period"].unique():
mask = df["RSI_period"] == period
delta = df.loc[mask, "close"].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
df.loc[mask, "RSI"] = 100 - (100 / (1 + rs))
# Adaptive RSI thresholds based on Bollinger Band width
df["BB_width"] = (df["close"].rolling(window=20).std() * 2) / df[
"close"
].rolling(window=20).mean()
df["RSI_overbought"] = self.rsi_base_overbought + (df["BB_width"] * 20)
df["RSI_oversold"] = self.rsi_base_oversold - (df["BB_width"] * 20)
# Volume analysis
df["Volume_MA"] = df["volume"].rolling(window=20).mean()
df["Volume_Ratio"] = df["volume"] / df["Volume_MA"]
return df
def generate_signals(self, df: pd.DataFrame) -> pd.DataFrame:
"""Generate trading signals based on indicators."""
df["signal"] = 0 # 0: hold, 1: buy, -1: sell
# Generate buy signals
buy_conditions = (
(df["SMA_short"] > df["SMA_long"]) # Golden cross
& (df["RSI"] < df["RSI_oversold"]) # Adaptive oversold condition
& (df["Volume_Ratio"] > self.volume_threshold) # High volume
)
df.loc[buy_conditions, "signal"] = 1
# Generate sell signals
sell_conditions = (df["SMA_short"] < df["SMA_long"]) | ( # Death cross
df["RSI"] > df["RSI_overbought"]
) # Adaptive overbought condition
df.loc[sell_conditions, "signal"] = -1
return df
def apply_risk_management(self, df: pd.DataFrame, position: Dict) -> Dict:
"""Apply risk management rules to current position."""
current_price = df["close"].iloc[-1]
if position["in_position"]:
# Check stop loss
if current_price <= position["entry_price"] * (1 - self.stop_loss):
position["should_exit"] = True
position["exit_reason"] = "stop_loss"
print("wegood")
# Check take profit
elif current_price >= position["entry_price"] * (1 + self.take_profit):
position["should_exit"] = True
position["exit_reason"] = "take_profit"
return position
def execute_trades(self, df: pd.DataFrame) -> List[Dict]:
"""Execute trading strategy and maintain positions."""
trades = []
position = {
"in_position": False,
"entry_price": 0,
"entry_time": None,
"should_exit": False,
"exit_reason": None,
}
for i in range(len(df)):
current_data = df.iloc[i]
# Update position status
if position["in_position"]:
position = self.apply_risk_management(
df.iloc[max(0, i - 10) : i + 1], position
)
# Exit position if necessary
if position["in_position"] and (
position["should_exit"] or current_data["signal"] == -1
):
trade = {
"exit_time": current_data.name,
"exit_price": current_data["close"],
"exit_reason": position["exit_reason"] or "signal",
"profit_pct": (current_data["close"] - position["entry_price"])
/ position["entry_price"]
* 100,
}
trades.append({**position, **trade})
position = {
"in_position": False,
"entry_price": 0,
"entry_time": None,
"should_exit": False,
"exit_reason": None,
}
# Enter new position
elif not position["in_position"] and current_data["signal"] == 1:
position = {
"in_position": True,
"entry_price": current_data["close"],
"entry_time": current_data.name,
"should_exit": False,
"exit_reason": None,
}
return trades
def run_strategy(self, df: pd.DataFrame) -> tuple:
"""Run the complete trading strategy."""
# Prepare data
df = self.calculate_indicators(df)
df = self.generate_signals(df)
# Execute trades
trades = self.execute_trades(df)
# Calculate performance metrics
total_trades = len(trades)
winning_trades = len([t for t in trades if t["profit_pct"] > 0])
total_return = sum(t["profit_pct"] for t in trades)
metrics = {
"total_trades": total_trades,
"winning_trades": winning_trades,
"win_rate": winning_trades / total_trades if total_trades > 0 else 0,
"total_return": total_return,
"average_return": total_return / total_trades if total_trades > 0 else 0,
}
return trades, metrics
class LiveCryptoTradingStrategy(CryptoTradingStrategy):
def __init__(
self,
mongodb_uri: str,
db_name: str,
initial_balance: float = 10000,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.client = MongoClient(mongodb_uri)
self.db = self.client[db_name]
self.position_collection = self.db["positions"]
self.trade_collection = self.db["trades"]
self.data_collection = self.db["market_data"]
self.asset_collection = self.db["assets"]
# Initialize assets if not exists
if not self.asset_collection.find_one({"asset_type": "USD"}):
self.asset_collection.insert_one(
{
"asset_type": "USD",
"amount": initial_balance,
"last_updated": datetime.now(pytz.UTC),
}
)
def get_balance(self, asset_type: str = "USD") -> float:
"""Get current balance of specified asset"""
asset_doc = self.asset_collection.find_one({"asset_type": asset_type})
return asset_doc["amount"] if asset_doc else 0
def update_balance(self, asset_type: str, amount: float):
"""Update balance of specified asset"""
self.asset_collection.update_one(
{"asset_type": asset_type},
{"$set": {"amount": amount, "last_updated": datetime.now(pytz.UTC)}},
upsert=True,
)
def get_latest_data(self, symbol: str, lookback_minutes: int = 200) -> pd.DataFrame:
"""Fetch latest data from MongoDB"""
end_time = datetime.now(pytz.UTC)
start_time = end_time - timedelta(minutes=lookback_minutes)
cursor = self.data_collection.find(
{"symbol": symbol, "timestamp": {"$gte": start_time, "$lte": end_time}}
).sort("timestamp", 1)
data = list(cursor)
if not data:
return None
df = pd.DataFrame(data)
df.set_index("timestamp", inplace=True)
return df
def execute_trade(self, symbol: str, side: str, amount: float, price: float):
"""Execute trade and update balances"""
base_asset = symbol[:-4] # BTCUSDT -> BTC
quote_asset = symbol[-4:] # BTCUSDT -> USDT
if side == "buy":
# Check if enough quote asset (USDT) available
quote_balance = self.get_balance(quote_asset)
cost = amount * price
if quote_balance >= cost:
# Update quote asset balance (decrease)
self.update_balance(quote_asset, quote_balance - cost)
# Update base asset balance (increase)
base_balance = self.get_balance(base_asset)
self.update_balance(base_asset, base_balance + amount)
return True
return False
elif side == "sell":
# Check if enough base asset available
base_balance = self.get_balance(base_asset)
if base_balance >= amount:
# Update base asset balance (decrease)
self.update_balance(base_asset, base_balance - amount)
# Update quote asset balance (increase)
quote_balance = self.get_balance(quote_asset)
self.update_balance(quote_asset, quote_balance + (amount * price))
return True
return False
def update_position(self, symbol: str, position: Dict):
"""Update position in MongoDB"""
self.position_collection.update_one(
{"symbol": symbol},
{
"$set": {
"in_position": position["in_position"],
"entry_price": position["entry_price"],
"entry_time": position["entry_time"],
"position_type": position.get("position_type", "long"),
"last_updated": datetime.now(pytz.UTC),
}
},
upsert=True,
)
def log_trade(self, symbol: str, trade: Dict):
"""Log completed trade to MongoDB"""
trade_doc = {
"symbol": symbol,
"entry_time": trade["entry_time"],
"exit_time": trade["exit_time"],
"entry_price": trade["entry_price"],
"exit_price": trade["exit_price"],
"position_type": trade.get("position_type", "long"),
"profit_pct": trade["profit_pct"],
"exit_reason": trade["exit_reason"],
}
self.trade_collection.insert_one(trade_doc)
def run_live_strategy(
self, symbol: str, trade_amount: float = 0.1, interval_seconds: int = 60
):
"""Run strategy in live mode"""
self.logger.info(f"Starting live trading for {symbol}")
# Get or initialize position
position_doc = self.position_collection.find_one({"symbol": symbol})
position = {
"in_position": position_doc["in_position"] if position_doc else False,
"entry_price": position_doc.get("entry_price", 0),
"entry_time": position_doc.get("entry_time"),
"should_exit": False,
"exit_reason": None,
"position_type": position_doc.get("position_type", "long"),
}
while True:
try:
# Get latest data
df = self.get_latest_data(symbol)
if df is None or len(df) < self.sma_long:
self.logger.warning("Insufficient data, waiting...")
time.sleep(interval_seconds)
continue
# Calculate indicators and signals
df = self.calculate_indicators(df)
df = self.generate_signals(df)
current_data = df.iloc[-1]
# Update position status
if position["in_position"]:
position = self.apply_risk_management(df.iloc[-10:], position)
# Handle position exit
if position["in_position"] and (
position["should_exit"] or current_data["signal"] == -1
):
if self.execute_trade(
symbol, "sell", trade_amount, current_data["close"]
):
# Log trade and update position as before
trade = {
"exit_time": current_data.name,
"exit_price": current_data["close"],
"exit_reason": position["exit_reason"] or "signal",
"profit_pct": (
current_data["close"] - position["entry_price"]
)
/ position["entry_price"]
* 100,
"position_type": position["position_type"],
"amount": trade_amount,
"usd_value": trade_amount * current_data["close"],
}
self.log_trade(symbol, {**position, **trade})
# Update position
position = {
"in_position": False,
"entry_price": 0,
"entry_time": None,
"should_exit": False,
"exit_reason": None,
}
self.update_position(symbol, position)
# Log balances
self.logger.info(
f"Exited position. New balances: "
f"USD: {self.get_balance('USDT')}, "
f"{symbol[:-4]}: {self.get_balance(symbol[:-4])}"
)
# Handle position entry
elif not position["in_position"] and current_data["signal"] == 1:
if self.execute_trade(
symbol, "buy", trade_amount, current_data["close"]
):
position = {
"in_position": True,
"entry_price": current_data["close"],
"entry_time": current_data.name,
"should_exit": False,
"exit_reason": None,
"position_type": "long",
"amount": trade_amount,
}
self.update_position(symbol, position)
# Log balances
self.logger.info(
f"Entered position. New balances: "
f"USD: {self.get_balance('USDT')}, "
f"{symbol[:-4]}: {self.get_balance(symbol[:-4])}"
)
except Exception as e:
self.logger.error(f"Error in live trading loop: {str(e)}")
time.sleep(interval_seconds)
def run_live_trading(mongodb_uri: str, symbols: List[str]):
"""Run live trading for multiple symbols"""
for symbol in symbols:
strategy = LiveCryptoTradingStrategy(
mongodb_uri=mongodb_uri,
db_name="crypto_trading",
sma_short=20,
sma_long=100,
rsi_base_period=21,
rsi_base_overbought=70,
rsi_base_oversold=30,
volume_threshold=1.0,
stop_loss=0.01,
take_profit=0.020,
)
# Run in separate thread for each symbol
import threading
thread = threading.Thread(
target=strategy.run_live_strategy, args=(symbol,), daemon=True
)
thread.start()
if __name__ == "__main__":
mongodb_uri = "mongodb://localhost:27017/"
symbols = ["BTCUSDT", "ETHUSDT"] # Add your symbols here
run_live_trading(mongodb_uri, symbols)

126
mexc-socket.py Normal file
View File

@ -0,0 +1,126 @@
from pymexc import spot, futures
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure
import pytz
import yaml
from datetime import datetime
import os
import logging
from urllib.parse import quote_plus
load_dotenv()
# Set up logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
timezone = pytz.timezone("Asia/Seoul")
api_key = os.getenv("API_KEY")
api_secret = os.getenv("API_SECRET")
class MongoDBConn(object):
def __init__(self):
"""Initialize MongoDB connection parameters"""
self.username = quote_plus(os.getenv("DB_USER"))
self.password = quote_plus(os.getenv("DB_PWD"))
self.host = os.getenv("DB_HOST")
self.port = os.getenv("DB_PORT")
self.db_name = os.getenv("DB_NAME")
self.client = None
self.db = None
def connect(self):
"""Establish connection to MongoDB"""
try:
# Create connection URI
uri = f"mongodb://{self.username}:{self.password}@{self.host}:{self.port}"
# Connect to MongoDB
self.client = MongoClient(uri)
# Test the connection
self.client.admin.command("ping")
# Get database reference
self.db = self.client[self.db_name]
logger.info("Successfully connected to MongoDB")
return True
except ConnectionFailure as e:
logger.error(f"Could not connect to MongoDB: {e}")
return False
except OperationFailure as e:
logger.error(f"Authentication failed: {e}")
return False
except Exception as e:
logger.error(f"An error occurred: {e}")
return False
def close(self):
"""Close the MongoDB connection"""
if self.client:
self.client.close()
logger.info("MongoDB connection closed")
def insert_coin_price(self, collection_name, document):
"""Insert a single document into a collection"""
try:
collection = self.db[collection_name]
result = collection.insert_one(document)
logger.info(f"Document inserted with ID: {result.inserted_id}")
return result.inserted_id
except Exception as e:
logger.error(f"Error inserting document: {e}")
return None
mdb = MongoDBConn()
mdb.connect()
def load_coin():
data = None
with open("coin.yaml", "r") as file:
data = yaml.safe_load(file)
interest_coins = data["interest"]
base = data["base"][0]
return interest_coins, base
def handle_message(message):
# insert_coin_price(self, collection_name, document):
global mdb, timezone
col = message["s"] + "_kline"
time = message["t"]
msg = message["d"]["k"]
msg["stime"] = time
msg["c"] = float(msg["c"])
msg["h"] = float(msg["h"])
msg["l"] = float(msg["l"])
msg["o"] = float(msg["o"])
msg["v"] = float(msg["v"])
msg["T"] = int(msg["T"])
msg["a"] = float(msg["a"])
msg["t"] = int(msg["t"])
msg["time"] = datetime.now(timezone).strftime("%Y/%m/%d, %H:%M:%S")
mdb.insert_coin_price(col.lower(), msg)
# make http request to api
if __name__ == "__main__":
coins, base = load_coin()
api_key = os.getenv("API_KEY")
api_secret = os.getenv("API_SECRET")
ws_spot_client = spot.WebSocket(api_key=api_key, api_secret=api_secret)
# all messages will be handled by function `handle_message`
for coin in coins:
target = coin + base
ws_spot_client.kline_stream(handle_message, target, "Min1")
while True:
...

60
pymexc/__init__.py Normal file
View File

@ -0,0 +1,60 @@
"""
### Usage
```python
from pymexc import spot, futures
api_key = "YOUR API KEY"
api_secret = "YOUR API SECRET KEY"
def handle_message(message):
# handle websocket message
print(message)
# SPOT V3
# initialize HTTP client
spot_client = spot.HTTP(api_key = api_key, api_secret = api_secret)
# initialize WebSocket client
ws_spot_client = spot.WebSocket(api_key = api_key, api_secret = api_secret)
# make http request to api
print(spot_client.exchange_info())
# create websocket connection to public channel (spot@public.deals.v3.api@BTCUSDT)
# all messages will be handled by function `handle_message`
ws_spot_client.deals_stream(handle_message, "BTCUSDT")
# FUTURES V1
# initialize HTTP client
futures_client = futures.HTTP(api_key = api_key, api_secret = api_secret)
# initialize WebSocket client
ws_futures_client = futures.WebSocket(api_key = api_key, api_secret = api_secret)
# make http request to api
print(futures_client.index_price("MX_USDT"))
# create websocket connection to public channel (sub.tickers)
# all messages will be handled by function `handle_message`
ws_futures_client.tickers_stream(handle_message)
# loop forever for save websocket connection
while True:
...
"""
try:
from . import futures
from . import spot
except ImportError:
import futures
import spot
__all__ = [
"futures",
"spot"
]

164
pymexc/base.py Normal file
View File

@ -0,0 +1,164 @@
from abc import ABC, abstractclassmethod
from typing import Union, Literal
import hmac
import hashlib
import requests
from urllib.parse import urlencode
import logging
import time
logger = logging.getLogger(__name__)
class MexcAPIError(Exception):
pass
class MexcSDK(ABC):
"""
Initializes a new instance of the class with the given `api_key` and `api_secret` parameters.
:param api_key: A string representing the API key.
:param api_secret: A string representing the API secret.
:param base_url: A string representing the base URL of the API.
"""
def __init__(self, api_key: str, api_secret: str, base_url: str, proxies: dict = None):
self.api_key = api_key
self.api_secret = api_secret
self.recvWindow = 5000
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/json",
})
if proxies:
self.session.proxies.update(proxies)
@abstractclassmethod
def sign(self, **kwargs) -> str:
...
@abstractclassmethod
def call(self, method: Union[Literal["GET"], Literal["POST"], Literal["PUT"], Literal["DELETE"]], router: str, *args, **kwargs) -> dict:
...
class _SpotHTTP(MexcSDK):
def __init__(self, api_key: str = None, api_secret: str = None, proxies: dict = None):
super().__init__(api_key, api_secret, "https://api.mexc.com", proxies = proxies)
self.session.headers.update({
"X-MEXC-APIKEY": self.api_key
})
def sign(self, query_string: str) -> str:
"""
Generates a signature for an API request using HMAC SHA256 encryption.
Args:
**kwargs: Arbitrary keyword arguments representing request parameters.
Returns:
A hexadecimal string representing the signature of the request.
"""
# Generate signature
signature = hmac.new(self.api_secret.encode('utf-8'), query_string.encode('utf-8'), hashlib.sha256).hexdigest()
return signature
def call(self, method: Union[Literal["GET"], Literal["POST"], Literal["PUT"], Literal["DELETE"]], router: str, auth: bool = True, *args, **kwargs) -> dict:
if not router.startswith("/"):
router = f"/{router}"
# clear None values
kwargs = {k: v for k, v in kwargs.items() if v is not None}
if kwargs.get('params'):
kwargs['params'] = {k: v for k, v in kwargs['params'].items() if v is not None}
else:
kwargs['params'] = {}
timestamp = str(int(time.time() * 1000))
kwargs['params']['timestamp'] = timestamp
kwargs['params']['recvWindow'] = self.recvWindow
kwargs['params'] = {k: v for k, v in sorted(kwargs['params'].items())}
params = urlencode(kwargs.pop('params'), doseq=True).replace('+', '%20')
if self.api_key and self.api_secret and auth:
params += "&signature=" + self.sign(params)
response = self.session.request(method, f"{self.base_url}{router}", params = params, *args, **kwargs)
if not response.ok:
raise MexcAPIError(f'(code={response.json()["code"]}): {response.json()["msg"]}')
return response.json()
class _FuturesHTTP(MexcSDK):
def __init__(self, api_key: str = None, api_secret: str = None, proxies: dict = None):
super().__init__(api_key, api_secret, "https://contract.mexc.com", proxies = proxies)
self.session.headers.update({
"Content-Type": "application/json",
"ApiKey": self.api_key
})
def sign(self, timestamp: str, **kwargs) -> str:
"""
Generates a signature for an API request using HMAC SHA256 encryption.
:param timestamp: A string representing the timestamp of the request.
:type timestamp: str
:param kwargs: Arbitrary keyword arguments representing request parameters.
:type kwargs: dict
:return: A hexadecimal string representing the signature of the request.
:rtype: str
"""
# Generate signature
query_string = "&".join([f"{k}={v}" for k, v in sorted(kwargs.items())])
query_string = self.api_key + timestamp + query_string
signature = hmac.new(self.api_secret.encode('utf-8'), query_string.encode('utf-8'), hashlib.sha256).hexdigest()
return signature
def call(self, method: Union[Literal["GET"], Literal["POST"], Literal["PUT"], Literal["DELETE"]], router: str, *args, **kwargs) -> dict:
"""
Makes a request to the specified HTTP method and router using the provided arguments.
:param method: A string that represents the HTTP method(GET, POST, PUT, or DELETE) to be used.
:type method: str
:param router: A string that represents the API endpoint to be called.
:type router: str
:param *args: Variable length argument list.
:type *args: list
:param **kwargs: Arbitrary keyword arguments.
:type **kwargs: dict
:return: A dictionary containing the JSON response of the request.
"""
if not router.startswith("/"):
router = f"/{router}"
# clear None values
kwargs = {k: v for k, v in kwargs.items() if v is not None}
for variant in ('params', 'json'):
if kwargs.get(variant):
kwargs[variant] = {k: v for k, v in kwargs[variant].items() if v is not None}
if self.api_key and self.api_secret:
# add signature
timestamp = str(int(time.time() * 1000))
kwargs['headers'] = {
"Request-Time": timestamp,
"Signature": self.sign(timestamp, **kwargs[variant])
}
response = self.session.request(method, f"{self.base_url}{router}", *args, **kwargs)
return response.json()

509
pymexc/base_websocket.py Normal file
View File

@ -0,0 +1,509 @@
import websocket
import threading
import logging
import time
import json
import hmac
logger = logging.getLogger(__name__)
SPOT = "wss://wbs.mexc.com/ws"
FUTURES = "wss://contract.mexc.com/ws"
class _WebSocketManager:
def __init__(self, callback_function, ws_name, api_key=None, api_secret=None,
ping_interval=20, ping_timeout=10, retries=10,
restart_on_error=True, trace_logging=False,
http_proxy_host = None,
http_proxy_port = None,
http_no_proxy = None,
http_proxy_auth = None,
http_proxy_timeout = None):
self.proxy_settings = dict(
http_proxy_host = http_proxy_host,
http_proxy_port = http_proxy_port,
http_no_proxy = http_no_proxy,
http_proxy_auth = http_proxy_auth,
http_proxy_timeout = http_proxy_timeout
)
# Set API keys.
self.api_key = api_key
self.api_secret = api_secret
self.callback = callback_function
self.ws_name = ws_name
if api_key:
self.ws_name += " (Auth)"
# Setup the callback directory following the format:
# {
# "topic_name": function
# }
self.callback_directory = {}
# Record the subscriptions made so that we can resubscribe if the WSS
# connection is broken.
self.subscriptions = []
# Set ping settings.
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.retries = retries
# Other optional data handling settings.
self.handle_error = restart_on_error
# Enable websocket-client's trace logging for extra debug information
# on the websocket connection, including the raw sent & recv messages
websocket.enableTrace(trace_logging)
# Set initial state, initialize dictionary and connect.
self._reset()
self.attempting_connection = False
def _on_open(self):
"""
Log WS open.
"""
logger.debug(f"WebSocket {self.ws_name} opened.")
def _on_message(self, message):
"""
Parse incoming messages.
"""
self.callback(json.loads(message))
def is_connected(self):
try:
if self.ws.sock or not self.ws.sock.is_connected:
return True
else:
return False
except AttributeError:
return False
@staticmethod
def _are_connections_connected(active_connections):
for connection in active_connections:
if not connection.is_connected():
return False
return True
def _ping_loop(self, ping_payload: str, ping_interval: int, ping_timeout: int):
"""
Ping the websocket.
"""
time.sleep(ping_timeout)
while True:
logger.info(f"WebSocket {self.ws_name} send ping...")
self.ws.send(ping_payload)
time.sleep(ping_interval)
def _connect(self, url):
"""
Open websocket in a thread.
"""
def resubscribe_to_topics():
if not self.subscriptions:
# There are no subscriptions to resubscribe to, probably
# because this is a brand new WSS initialisation so there was
# no previous WSS connection.
return
for subscription_message in self.subscriptions:
self.ws.send(subscription_message)
self.attempting_connection = True
# Set endpoint.
self.endpoint = url
# Attempt to connect for X seconds.
retries = self.retries
if retries == 0:
infinitely_reconnect = True
else:
infinitely_reconnect = False
while (infinitely_reconnect or retries > 0) and not self.is_connected():
logger.info(f"WebSocket {self.ws_name} attempting connection...")
self.ws = websocket.WebSocketApp(
url=url,
on_message=lambda ws, msg: self._on_message(msg),
on_close=self._on_close(),
on_open=self._on_open(),
on_error=lambda ws, err: self._on_error(err)
)
# Setup the thread running WebSocketApp.
self.wst = threading.Thread(target=lambda: self.ws.run_forever(
ping_interval=self.ping_interval,
ping_timeout=self.ping_timeout,
**self.proxy_settings
))
# Configure as daemon; start.
self.wst.daemon = True
self.wst.start()
# setup ping loop
self.wsl = threading.Thread(target=lambda: self._ping_loop(
ping_payload='{"method":"ping"}',
ping_interval=self.ping_interval,
ping_timeout=self.ping_timeout
))
self.wsl.daemon = True
self.wsl.start()
retries -= 1
time.sleep(1)
# If connection was not successful, raise error.
if not infinitely_reconnect and retries <= 0:
self.exit()
raise websocket.WebSocketTimeoutException(
f"WebSocket {self.ws_name} connection failed. Too many "
f"connection attempts. pybit will "
f"no longer try to reconnect.")
logger.info(f"WebSocket {self.ws_name} connected")
# If given an api_key, authenticate.
if self.api_key and self.api_secret:
self._auth()
resubscribe_to_topics()
self.attempting_connection = False
def _auth(self):
# Generate signature
# make auth if futures. spot has a different auth system.
isspot = self.endpoint.startswith(SPOT)
if isspot:
return
timestamp = str(int(time.time() * 1000))
_val = self.api_key + timestamp
signature = str(hmac.new(
bytes(self.api_secret, "utf-8"),
bytes(_val, "utf-8"), digestmod="sha256"
).hexdigest())
# Authenticate with API.
self.ws.send(
json.dumps({
"subscribe": False,
"method": "login",
"param": {
"apiKey": self.api_key,
"reqTime": timestamp,
"signature": signature
}
})
)
def _on_error(self, error):
"""
Exit on errors and raise exception, or attempt reconnect.
"""
if type(error).__name__ not in ["WebSocketConnectionClosedException",
"ConnectionResetError",
"WebSocketTimeoutException"]:
# Raises errors not related to websocket disconnection.
self.exit()
raise error
if not self.exited:
logger.error(f"WebSocket {self.ws_name} encountered error: {error}.")
self.exit()
# Reconnect.
if self.handle_error and not self.attempting_connection:
self._reset()
self._connect(self.endpoint)
def _on_close(self):
"""
Log WS close.
"""
logger.debug(f"WebSocket {self.ws_name} closed.")
def _reset(self):
"""
Set state booleans and initialize dictionary.
"""
self.exited = False
self.auth = False
self.data = {}
def exit(self):
"""
Closes the websocket connection.
"""
self.ws.close()
while self.ws.sock:
continue
self.exited = True
class _FuturesWebSocketManager(_WebSocketManager):
def __init__(self, ws_name, **kwargs):
callback_function = kwargs.pop("callback_function") if \
kwargs.get("callback_function") else self._handle_incoming_message
super().__init__(callback_function, ws_name, **kwargs)
self.private_topics = ["personal.order", "personal.asset",
"personal.position", "personal.risk.limit",
"personal.adl.level", "personal.position.mode"]
self.symbol_wildcard = "*"
self.symbol_separator = "|"
self.last_subsctiption = None
def subscribe(self, topic, callback, params: dict = {}):
subscription_args = {
"method": topic,
"param": params
}
self._check_callback_directory(subscription_args)
while not self.is_connected():
# Wait until the connection is open before subscribing.
time.sleep(0.1)
subscription_message = json.dumps(subscription_args)
self.ws.send(subscription_message)
self.subscriptions.append(subscription_message)
self._set_callback(topic.replace("sub.", ""), callback)
self.last_subsctiption = topic.replace("sub.", "")
def _initialise_local_data(self, topic):
# Create self.data
try:
self.data[topic]
except KeyError:
self.data[topic] = []
def _process_auth_message(self, message):
# If we get successful futures auth, notify user
if message.get("data") == "success":
logger.debug(f"Authorization for {self.ws_name} successful.")
self.auth = True
# If we get unsuccessful auth, notify user.
elif message.get("data") != "success": # !!!!
logger.debug(f"Authorization for {self.ws_name} failed. Please "
f"check your API keys and restart.")
def _process_subscription_message(self, message):
#try:
sub = message["channel"]
#except KeyError:
#sub = message["c"] # SPOT PUBLIC & PRIVATE format
# If we get successful futures subscription, notify user
if (
message.get("channel", "").startswith("rs.") or
message.get("channel", "").startswith("push.")
) and message.get("channel", "") != "rs.error":
logger.debug(f"Subscription to {sub} successful.")
# Futures subscription fail
else:
response = message["data"]
logger.error("Couldn't subscribe to topic. "
f"Error: {response}.")
if self.last_subsctiption:
self._pop_callback(self.last_subsctiption)
def _process_normal_message(self, message):
topic = message["channel"].replace("push.", "").replace("rs.sub.", "")
callback_data = message
callback_function = self._get_callback(topic)
callback_function(callback_data)
def _handle_incoming_message(self, message):
def is_auth_message():
if message.get("channel", "") == "rs.login":
return True
else:
return False
def is_subscription_message():
if str(message).startswith("{'channel': 'push."):
return True
else:
return False
def is_pong_message():
if message.get("channel", "") in ("pong", "clientId"):
return True
else:
return False
if is_auth_message():
self._process_auth_message(message)
elif is_subscription_message():
self._process_subscription_message(message)
elif is_pong_message():
pass
else:
self._process_normal_message(message)
def custom_topic_stream(self, topic, callback):
return self.subscribe(topic=topic, callback=callback)
def _check_callback_directory(self, topics):
for topic in topics:
if topic in self.callback_directory:
raise Exception(f"You have already subscribed to this topic: "
f"{topic}")
def _set_callback(self, topic, callback_function):
self.callback_directory[topic] = callback_function
def _get_callback(self, topic):
return self.callback_directory[topic]
def _pop_callback(self, topic):
self.callback_directory.pop(topic)
class _FuturesWebSocket(_FuturesWebSocketManager):
def __init__(self, **kwargs):
self.ws_name = "FuturesV1"
self.endpoint = "wss://contract.mexc.com/ws"
super().__init__(self.ws_name, **kwargs)
self.ws = None
self.active_connections = []
self.kwargs = kwargs
def is_connected(self):
return self._are_connections_connected(self.active_connections)
def _ws_subscribe(self, topic, callback, params: list = []):
if not self.ws:
self.ws = _FuturesWebSocketManager(
self.ws_name, **self.kwargs)
self.ws._connect(self.endpoint)
self.active_connections.append(self.ws)
self.ws.subscribe(topic, callback, params)
class _SpotWebSocketManager(_WebSocketManager):
def __init__(self, ws_name, **kwargs):
callback_function = kwargs.pop("callback_function") if \
kwargs.get("callback_function") else self._handle_incoming_message
super().__init__(callback_function, ws_name, **kwargs)
self.private_topics = ["account", "deals", "orders"]
self.last_subsctiption = None
def subscribe(self, topic: str, callback, params_list: list):
subscription_args = {
"method": "SUBSCRIPTION",
"params": [
'@'.join([f"spot@{topic}.v3.api"] + list(map(str, params.values())))
for params in params_list
]
}
self._check_callback_directory(subscription_args)
while not self.is_connected():
# Wait until the connection is open before subscribing.
time.sleep(0.1)
subscription_message = json.dumps(subscription_args)
self.ws.send(subscription_message)
self.subscriptions.append(subscription_message)
self._set_callback(topic, callback)
self.last_subsctiption = topic
def _initialise_local_data(self, topic):
# Create self.data
try:
self.data[topic]
except KeyError:
self.data[topic] = []
def _process_subscription_message(self, message):
sub = message["msg"].replace("spot@", "").split(".v3.api")[0]
# If we get successful futures subscription, notify user
if message.get("id") == 0 and message.get("code") == 0:
logger.debug(f"Subscription to {sub} successful.")
# Futures subscription fail
else:
response = message["msg"]
logger.error("Couldn't subscribe to topic. "
f"Error: {response}.")
if self.last_subsctiption:
self._pop_callback(self.last_subsctiption)
def _process_normal_message(self, message):
topic = message["c"].replace("spot@", "").split(".v3.api")[0]
callback_data = message
callback_function = self._get_callback(topic)
callback_function(callback_data)
def _handle_incoming_message(self, message):
def is_subscription_message():
if (message.get("id") == 0 and
message.get("code") == 0 and
message.get("msg")):
return True
else:
return False
if is_subscription_message():
self._process_subscription_message(message)
else:
self._process_normal_message(message)
def custom_topic_stream(self, topic, callback):
return self.subscribe(topic=topic, callback=callback)
def _check_callback_directory(self, topics):
for topic in topics:
if topic in self.callback_directory:
raise Exception(f"You have already subscribed to this topic: "
f"{topic}")
def _set_callback(self, topic, callback_function):
self.callback_directory[topic] = callback_function
def _get_callback(self, topic):
return self.callback_directory[topic]
def _pop_callback(self, topic):
self.callback_directory.pop(topic)
class _SpotWebSocket(_SpotWebSocketManager):
def __init__(self, endpoint: str = "wss://wbs.mexc.com/ws", **kwargs):
self.ws_name = "SpotV3"
self.endpoint = endpoint
super().__init__(self.ws_name, **kwargs)
self.ws = None
self.active_connections = []
self.kwargs = kwargs
def is_connected(self):
return self._are_connections_connected(self.active_connections)
def _ws_subscribe(self, topic, callback, params: list = []):
if not self.ws:
self.ws = _SpotWebSocketManager(
self.ws_name, **self.kwargs)
self.ws._connect(self.endpoint)
self.active_connections.append(self.ws)
self.ws.subscribe(topic, callback, params)

1694
pymexc/futures.py Normal file

File diff suppressed because it is too large Load Diff

1967
pymexc/spot.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,12 +1,45 @@
annotated-types==0.7.0
anyio==4.7.0
attrs==24.3.0
cattrs==24.1.2
certifi==2024.12.14
click==8.1.7
dnspython==2.7.0
email_validator==2.2.0
exceptiongroup==1.2.2
fastapi==0.115.6
fastapi-cli==0.0.7
h11==0.14.0
httpcore==1.0.7
httptools==0.6.4
httpx==0.28.1
idna==3.10
importlib_resources==6.4.5
Jinja2==3.1.4
jsii==1.106.0
mexc-sdk @ ./package/mexc_sdk-1.0.0-py3-none-any.whl
markdown-it-py==3.0.0
MarkupSafe==3.0.2
mdurl==0.1.2
publication==0.0.3
pydantic==2.10.4
pydantic_core==2.27.2
Pygments==2.18.0
pymongo==4.10.1
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-multipart==0.0.20
pytz==2024.2
PyYAML==6.0.2
rich==13.9.4
rich-toolkit==0.12.0
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
starlette==0.41.3
typeguard==4.4.1
typer==0.15.1
typing_extensions==4.12.2
uvicorn==0.34.0
uvloop==0.21.0
watchfiles==1.0.3
websockets==14.1

View File

@ -109,7 +109,7 @@ class TradeBot(object):
for coin in self.interest_coins:
data = {}
data["price"] = self._get_market_price(symbol=coin + self.base)
data["timestamp"] = self._get_current_time()
data["timestamp"] = self._get_current_time().strftime("%Y/%m/%d, %H:%M:%S")
self.mdb.insert_coin_price(collection_name=coin, document=data)
def get_account_balance(self):

View File

@ -0,0 +1,423 @@
/*!
* @license
* chartjs-chart-financial
* http://chartjs.org/
* Version: 0.2.0
*
* Copyright 2024 Chart.js Contributors
* Released under the MIT license
* https://github.com/chartjs/chartjs-chart-financial/blob/master/LICENSE.md
*/
(function (global, factory) {
typeof exports === 'object' && typeof module !== 'undefined' ? factory(require('chart.js'), require('chart.js/helpers')) :
typeof define === 'function' && define.amd ? define(['chart.js', 'chart.js/helpers'], factory) :
(global = typeof globalThis !== 'undefined' ? globalThis : global || self, factory(global.Chart, global.Chart.helpers));
})(this, (function (chart_js, helpers) { 'use strict';
/**
* This class is based off controller.bar.js from the upstream Chart.js library
*/
class FinancialController extends chart_js.BarController {
static overrides = {
label: '',
parsing: false,
hover: {
mode: 'label'
},
animations: {
numbers: {
type: 'number',
properties: ['x', 'y', 'base', 'width', 'open', 'high', 'low', 'close']
}
},
scales: {
x: {
type: 'timeseries',
offset: true,
ticks: {
major: {
enabled: true,
},
source: 'data',
maxRotation: 0,
autoSkip: true,
autoSkipPadding: 75,
sampleSize: 100
},
},
y: {
type: 'linear'
}
},
plugins: {
tooltip: {
intersect: false,
mode: 'index',
callbacks: {
label(ctx) {
const point = ctx.parsed;
if (!helpers.isNullOrUndef(point.y)) {
return chart_js.defaults.plugins.tooltip.callbacks.label(ctx);
}
const {o, h, l, c} = point;
return `O: ${o} H: ${h} L: ${l} C: ${c}`;
}
}
}
}
};
getLabelAndValue(index) {
const me = this;
const parsed = me.getParsed(index);
const axis = me._cachedMeta.iScale.axis;
const {o, h, l, c} = parsed;
const value = `O: ${o} H: ${h} L: ${l} C: ${c}`;
return {
label: `${me._cachedMeta.iScale.getLabelForValue(parsed[axis])}`,
value
};
}
getUserBounds(scale) {
const {min, max, minDefined, maxDefined} = scale.getUserBounds();
return {
min: minDefined ? min : Number.NEGATIVE_INFINITY,
max: maxDefined ? max : Number.POSITIVE_INFINITY
};
}
/**
* Implement this ourselves since it doesn't handle high and low values
* https://github.com/chartjs/Chart.js/issues/7328
* @protected
*/
getMinMax(scale) {
const meta = this._cachedMeta;
const _parsed = meta._parsed;
const axis = meta.iScale.axis;
const otherScale = this._getOtherScale(scale);
const {min: otherMin, max: otherMax} = this.getUserBounds(otherScale);
if (_parsed.length < 2) {
return {min: 0, max: 1};
}
if (scale === meta.iScale) {
return {min: _parsed[0][axis], max: _parsed[_parsed.length - 1][axis]};
}
const newParsedData = _parsed.filter(({x}) => x >= otherMin && x < otherMax);
let min = Number.POSITIVE_INFINITY;
let max = Number.NEGATIVE_INFINITY;
for (let i = 0; i < newParsedData.length; i++) {
const data = newParsedData[i];
min = Math.min(min, data.l);
max = Math.max(max, data.h);
}
return {min, max};
}
/**
* @protected
*/
calculateElementProperties(index, ruler, reset, options) {
const me = this;
const vscale = me._cachedMeta.vScale;
const base = vscale.getBasePixel();
const ipixels = me._calculateBarIndexPixels(index, ruler, options);
const data = me.chart.data.datasets[me.index].data[index];
const open = vscale.getPixelForValue(data.o);
const high = vscale.getPixelForValue(data.h);
const low = vscale.getPixelForValue(data.l);
const close = vscale.getPixelForValue(data.c);
return {
base: reset ? base : low,
x: ipixels.center,
y: (low + high) / 2,
width: ipixels.size,
open,
high,
low,
close
};
}
draw() {
const me = this;
const chart = me.chart;
const rects = me._cachedMeta.data;
helpers.clipArea(chart.ctx, chart.chartArea);
for (let i = 0; i < rects.length; ++i) {
rects[i].draw(me._ctx);
}
helpers.unclipArea(chart.ctx);
}
}
/**
* Helper function to get the bounds of the bar regardless of the orientation
* @param {Rectangle} bar the bar
* @param {boolean} [useFinalPosition]
* @return {object} bounds of the bar
* @private
*/
function getBarBounds(bar, useFinalPosition) {
const {x, y, base, width, height} = bar.getProps(['x', 'low', 'high', 'width', 'height'], useFinalPosition);
let left, right, top, bottom, half;
if (bar.horizontal) {
half = height / 2;
left = Math.min(x, base);
right = Math.max(x, base);
top = y - half;
bottom = y + half;
} else {
half = width / 2;
left = x - half;
right = x + half;
top = Math.min(y, base); // use min because 0 pixel at top of screen
bottom = Math.max(y, base);
}
return {left, top, right, bottom};
}
function inRange(bar, x, y, useFinalPosition) {
const skipX = x === null;
const skipY = y === null;
const bounds = !bar || (skipX && skipY) ? false : getBarBounds(bar, useFinalPosition);
return bounds
&& (skipX || x >= bounds.left && x <= bounds.right)
&& (skipY || y >= bounds.top && y <= bounds.bottom);
}
class FinancialElement extends chart_js.BarElement {
static defaults = {
backgroundColors: {
up: 'rgba(75, 192, 192, 0.5)',
down: 'rgba(255, 99, 132, 0.5)',
unchanged: 'rgba(201, 203, 207, 0.5)',
},
borderColors: {
up: 'rgb(75, 192, 192)',
down: 'rgb(255, 99, 132)',
unchanged: 'rgb(201, 203, 207)',
}
};
height() {
return this.base - this.y;
}
inRange(mouseX, mouseY, useFinalPosition) {
return inRange(this, mouseX, mouseY, useFinalPosition);
}
inXRange(mouseX, useFinalPosition) {
return inRange(this, mouseX, null, useFinalPosition);
}
inYRange(mouseY, useFinalPosition) {
return inRange(this, null, mouseY, useFinalPosition);
}
getRange(axis) {
return axis === 'x' ? this.width / 2 : this.height / 2;
}
getCenterPoint(useFinalPosition) {
const {x, low, high} = this.getProps(['x', 'low', 'high'], useFinalPosition);
return {
x,
y: (high + low) / 2
};
}
tooltipPosition(useFinalPosition) {
const {x, open, close} = this.getProps(['x', 'open', 'close'], useFinalPosition);
return {
x,
y: (open + close) / 2
};
}
}
class CandlestickElement extends FinancialElement {
static id = 'candlestick';
static defaults = {
...FinancialElement.defaults,
borderWidth: 1,
};
draw(ctx) {
const me = this;
const {x, open, high, low, close} = me;
let borderColors = me.options.borderColors;
if (typeof borderColors === 'string') {
borderColors = {
up: borderColors,
down: borderColors,
unchanged: borderColors
};
}
let borderColor;
if (close < open) {
borderColor = helpers.valueOrDefault(borderColors ? borderColors.up : undefined, chart_js.defaults.elements.candlestick.borderColors.up);
ctx.fillStyle = helpers.valueOrDefault(me.options.backgroundColors ? me.options.backgroundColors.up : undefined, chart_js.defaults.elements.candlestick.backgroundColors.up);
} else if (close > open) {
borderColor = helpers.valueOrDefault(borderColors ? borderColors.down : undefined, chart_js.defaults.elements.candlestick.borderColors.down);
ctx.fillStyle = helpers.valueOrDefault(me.options.backgroundColors ? me.options.backgroundColors.down : undefined, chart_js.defaults.elements.candlestick.backgroundColors.down);
} else {
borderColor = helpers.valueOrDefault(borderColors ? borderColors.unchanged : undefined, chart_js.defaults.elements.candlestick.borderColors.unchanged);
ctx.fillStyle = helpers.valueOrDefault(me.backgroundColors ? me.backgroundColors.unchanged : undefined, chart_js.defaults.elements.candlestick.backgroundColors.unchanged);
}
ctx.lineWidth = helpers.valueOrDefault(me.options.borderWidth, chart_js.defaults.elements.candlestick.borderWidth);
ctx.strokeStyle = borderColor;
ctx.beginPath();
ctx.moveTo(x, high);
ctx.lineTo(x, Math.min(open, close));
ctx.moveTo(x, low);
ctx.lineTo(x, Math.max(open, close));
ctx.stroke();
ctx.fillRect(x - me.width / 2, close, me.width, open - close);
ctx.strokeRect(x - me.width / 2, close, me.width, open - close);
ctx.closePath();
}
}
class CandlestickController extends FinancialController {
static id = 'candlestick';
static defaults = {
...FinancialController.defaults,
dataElementType: CandlestickElement.id
};
static defaultRoutes = chart_js.BarController.defaultRoutes;
updateElements(elements, start, count, mode) {
const reset = mode === 'reset';
const ruler = this._getRuler();
const {sharedOptions, includeOptions} = this._getSharedOptions(start, mode);
for (let i = start; i < start + count; i++) {
const options = sharedOptions || this.resolveDataElementOptions(i, mode);
const baseProperties = this.calculateElementProperties(i, ruler, reset, options);
if (includeOptions) {
baseProperties.options = options;
}
this.updateElement(elements[i], i, baseProperties, mode);
}
}
}
const defaults = chart_js.Chart.defaults;
class OhlcElement extends FinancialElement {
static id = 'ohlc';
static defaults = {
...FinancialElement.defaults,
lineWidth: 2,
armLength: null,
armLengthRatio: 0.8
};
draw(ctx) {
const me = this;
const {x, open, high, low, close} = me;
const armLengthRatio = helpers.valueOrDefault(me.armLengthRatio, defaults.elements.ohlc.armLengthRatio);
let armLength = helpers.valueOrDefault(me.armLength, defaults.elements.ohlc.armLength);
if (armLength === null) {
// The width of an ohlc is affected by barPercentage and categoryPercentage
// This behavior is caused by extending controller.financial, which extends controller.bar
// barPercentage and categoryPercentage are now set to 1.0 (see controller.ohlc)
// and armLengthRatio is multipled by 0.5,
// so that when armLengthRatio=1.0, the arms from neighbour ohcl touch,
// and when armLengthRatio=0.0, ohcl are just vertical lines.
armLength = me.width * armLengthRatio * 0.5;
}
if (close < open) {
ctx.strokeStyle = helpers.valueOrDefault(me.options.borderColors ? me.options.borderColors.up : undefined, defaults.elements.ohlc.borderColors.up);
} else if (close > open) {
ctx.strokeStyle = helpers.valueOrDefault(me.options.borderColors ? me.options.borderColors.down : undefined, defaults.elements.ohlc.borderColors.down);
} else {
ctx.strokeStyle = helpers.valueOrDefault(me.options.borderColors ? me.options.borderColors.unchanged : undefined, defaults.elements.ohlc.borderColors.unchanged);
}
ctx.lineWidth = helpers.valueOrDefault(me.lineWidth, defaults.elements.ohlc.lineWidth);
ctx.beginPath();
ctx.moveTo(x, high);
ctx.lineTo(x, low);
ctx.moveTo(x - armLength, open);
ctx.lineTo(x, open);
ctx.moveTo(x + armLength, close);
ctx.lineTo(x, close);
ctx.stroke();
}
}
class OhlcController extends FinancialController {
static id = 'ohlc';
static defaults = {
...FinancialController.defaults,
dataElementType: OhlcElement.id,
datasets: {
barPercentage: 1.0,
categoryPercentage: 1.0
}
};
updateElements(elements, start, count, mode) {
const reset = mode === 'reset';
const ruler = this._getRuler();
const {sharedOptions, includeOptions} = this._getSharedOptions(start, mode);
for (let i = start; i < start + count; i++) {
const options = sharedOptions || this.resolveDataElementOptions(i, mode);
const baseProperties = this.calculateElementProperties(i, ruler, reset, options);
if (includeOptions) {
baseProperties.options = options;
}
this.updateElement(elements[i], i, baseProperties, mode);
}
}
}
chart_js.Chart.register(CandlestickController, OhlcController, CandlestickElement, OhlcElement);
}));

207
templates/candle_coin.html Normal file
View File

@ -0,0 +1,207 @@
<!-- templates/index.html -->
<!DOCTYPE html>
<html>
<head>
<title>Realtime Candlestick Chart</title>
<script src="https://cdn.jsdelivr.net/npm/luxon@3.4.4"></script>
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.1/dist/chart.umd.js"></script>
<script src="https://cdn.jsdelivr.net/npm/chartjs-adapter-luxon@1.3.1"></script>
<script src="{{ url_for('static', path='js/chartjs-chart-financial.js') }}"></script>
<link href="https://cdn.jsdelivr.net/npm/flowbite@2.5.2/dist/flowbite.min.css" rel="stylesheet" />
<script src="https://cdn.jsdelivr.net/npm/flowbite@2.5.2/dist/flowbite.min.js"></script>
</head>
<body>
<div class="flex w-full mt-8">
<canvas id="chart"></canvas>
</div>
<script>
var barCount = 100;
var barData = [];
var lineData = [];
var ctx = document.getElementById('chart').getContext('2d');
ctx.canvas.width = 1000;
ctx.canvas.height = 250;
var chart = new Chart(ctx, {
type: 'candlestick',
data: {
datasets: [{
label: '{{coin}}',
data: barData
}, {
label: 'Close price',
type: 'line',
data: lineData,
hidden: true
}]
},
options: {
responsive: true,
scales: {
x: {
type: 'time',
time: {
unit: 'minute',
displayFormats: {
minute: 'HH:mm'
}
},
adapters: {
date: {
zone: 'UTC'
}
}
},
y: {
type: 'linear',
position: 'right'
}
},
animation: false
}
});
async function fetchHistoricalData() {
try {
const response = await fetch('/data/{{coin}}/get-candle/all');
const data = await response.json();
return data;
} catch (error) {
console.error('Error fetching historical data:', error);
return null;
}
}
async function fetchLatestData() {
try {
const response = await fetch('/data/{{coin}}/get-candle');
const data = await response.json();
return data;
} catch (error) {
console.error('Error fetching latest data:', error);
return null;
}
}
async function initializeChart() {
// Fetch historical data first
const historicalData = await fetchHistoricalData();
if (!historicalData) return;
// Process historical data
historicalData.forEach(data => {
const candleData = {
x: data.time,
o: data.open,
h: data.high,
l: data.low,
c: data.close,
v: data.volume,
vwap: data.vwap,
trades: data.trades
};
barData.push(candleData);
lineData.push({
x: data.time,
y: data.close
});
});
chart.update();
// Start real-time updates
setInterval(updateChart, 5000);
}
async function updateChart() {
const newData = await fetchLatestData();
if (!newData) return;
if (barData.length >= barCount) {
barData.shift();
lineData.shift();
}
const newBarData = {
x: newData.time,
o: newData.open,
h: newData.high,
l: newData.low,
c: newData.close,
v: newData.volume,
vwap: newData.vwap,
trades: newData.trades
};
// Check if the last candle's timestamp matches the new data
const lastCandle = barData[barData.length - 1];
if (lastCandle && lastCandle.x === newData.time) {
// Update the existing candle
Object.assign(lastCandle, newBarData);
lineData[lineData.length - 1].y = newData.close;
} else {
// Add new candle
barData.push(newBarData);
lineData.push({
x: newData.time,
y: newData.close
});
}
chart.update();
}
// Initialize the chart when the page loads
initializeChart();
var update = function () {
var dataset = chart.config.data.datasets[0];
// candlestick vs ohlc
var type = document.getElementById('type').value;
chart.config.type = type;
// linear vs log
var scaleType = document.getElementById('scale-type').value;
chart.config.options.scales.y.type = scaleType;
// color
var colorScheme = document.getElementById('color-scheme').value;
if (colorScheme === 'neon') {
chart.config.data.datasets[0].backgroundColors = {
up: '#01ff01',
down: '#fe0000',
unchanged: '#999',
};
} else {
delete chart.config.data.datasets[0].backgroundColors;
}
// border
var border = document.getElementById('border').value;
if (border === 'false') {
dataset.borderColors = 'rgba(0, 0, 0, 0)';
} else {
delete dataset.borderColors;
}
// mixed charts
var mixed = document.getElementById('mixed').value;
if (mixed === 'true') {
chart.config.data.datasets[1].hidden = false;
} else {
chart.config.data.datasets[1].hidden = true;
}
chart.update();
};
[...document.getElementsByTagName('select')].forEach(element => element.addEventListener('change', update));
</script>
</body>
</html>

View File

@ -31,7 +31,7 @@
</head>
<body>
<div class="container">
<h1>XRP Price Live Chart</h1>
<h1>{{coin}} Price Live Chart</h1>
<div class="chart-container">
<canvas id="priceChart"></canvas>
</div>
@ -120,4 +120,4 @@
setInterval(updateChart, 5000);
</script>
</body>
</html>
</html>

View File

@ -21,12 +21,18 @@
<main>
<section>
<h2>Interested Coints Live Chart</h2>
<h2>Interested Coin Price Live Chart</h2>
<ul>
{% for coin in coins %}
<li><a href="/coin/{{coin}}">{{coin}}</a></li>
{% endfor %}
</ul>
<h2>Interested Coin Price Candle Live Chart</h2>
<ul>
{% for coin in coins %}
<li><a href="/coin/{{coin}}/candle1min">{{coin}}</a></li>
{% endfor %}
</ul>
</section>
</main>
@ -38,4 +44,4 @@
// Add your JavaScript here
</script>
</body>
</html>
</html>

64
test.py Normal file
View File

@ -0,0 +1,64 @@
from urllib.parse import quote_plus
from dotenv import load_dotenv
from pymongo import MongoClient
from mexc_sdk import Spot
import yaml
import os
from datetime import timedelta
load_dotenv()
with open("coin.yaml", "r") as file:
data = yaml.safe_load(file)
interest_coins = data["interest"]
MONGO_URI = f'mongodb://{quote_plus(os.getenv("DB_USER"))}:{quote_plus(os.getenv("DB_PWD"))}@{os.getenv("DB_HOST")}:{os.getenv("DB_PORT")}'
client = MongoClient(MONGO_URI)
db = client[os.getenv("DB_NAME")] # Replace with your database name
if __name__ == "__main__":
target = interest_coins[0] + "USDT" + "_kline"
collection = db[target.lower()]
latest_doc = collection.find_one(sort=[("T", -1)])
latest_end_time = latest_doc["T"]
time_threshold = latest_end_time - (
60 * 100
) # 100 minutes before the latest endTime
pipeline = [
# Filter based on endTime
{"$match": {"T": {"$gte": time_threshold}}},
# Group by start time and end time
{
"$group": {
"_id": {"startTime": "$t", "endTime": "$T"},
"open": {"$first": "$o"},
"high": {"$max": "$h"},
"low": {"$min": "$l"},
"close": {"$last": "$c"},
"volume": {"$sum": "$v"},
"weighted_price": {"$sum": {"$multiply": ["$c", "$v"]}},
"count": {"$sum": 1},
}
},
# Sort by start time
{"$sort": {"_id.startTime": 1}},
# Reshape for output
{
"$project": {
"_id": 0,
"time": {"$multiply": ["$_id.startTime", 1000]},
"open": 1,
"high": 1,
"low": 1,
"close": 1,
"volume": 1,
"vwap": {"$divide": ["$weighted_price", "$volume"]},
"trades": "$count",
"endTime": "$_id.endTime",
}
},
]
candles = list(collection.aggregate(pipeline))
print(candles)