From 2d31dcfd0b7a5db8bf37ec5a21f3f5923f734262 Mon Sep 17 00:00:00 2001 From: dongho Date: Sat, 21 Dec 2024 15:40:38 +0900 Subject: [PATCH] websocket --- mexc-socket.py | 114 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 103 insertions(+), 11 deletions(-) diff --git a/mexc-socket.py b/mexc-socket.py index 507dd59..171f776 100644 --- a/mexc-socket.py +++ b/mexc-socket.py @@ -1,26 +1,118 @@ from pymexc import spot, futures from dotenv import load_dotenv +from pymongo import MongoClient +from pymongo.errors import ConnectionFailure, OperationFailure +import pytz +import yaml +from datetime import datetime import os +import logging +from urllib.parse import quote_plus load_dotenv() - +# Set up logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) +timezone = pytz.timezone("Asia/Seoul") api_key = os.getenv("API_KEY") api_secret = os.getenv("API_SECRET") -def handle_message(message): - print(message) + +class MongoDBConn(object): + def __init__(self): + """Initialize MongoDB connection parameters""" + self.username = quote_plus(os.getenv("DB_USER")) + self.password = quote_plus(os.getenv("DB_PWD")) + self.host = os.getenv("DB_HOST") + self.port = os.getenv("DB_PORT") + self.db_name = os.getenv("DB_NAME") + self.client = None + self.db = None + + def connect(self): + """Establish connection to MongoDB""" + try: + # Create connection URI + uri = f"mongodb://{self.username}:{self.password}@{self.host}:{self.port}" + + # Connect to MongoDB + self.client = MongoClient(uri) + + # Test the connection + self.client.admin.command("ping") + + # Get database reference + self.db = self.client[self.db_name] + + logger.info("Successfully connected to MongoDB") + return True + + except ConnectionFailure as e: + logger.error(f"Could not connect to MongoDB: {e}") + return False + except OperationFailure as e: + logger.error(f"Authentication failed: {e}") + return False + except Exception as e: + logger.error(f"An error occurred: {e}") + return False + + def close(self): + """Close the MongoDB connection""" + if self.client: + self.client.close() + logger.info("MongoDB connection closed") + + def insert_coin_price(self, collection_name, document): + """Insert a single document into a collection""" + try: + collection = self.db[collection_name] + result = collection.insert_one(document) + logger.info(f"Document inserted with ID: {result.inserted_id}") + return result.inserted_id + except Exception as e: + logger.error(f"Error inserting document: {e}") + return None +mdb = MongoDBConn() +mdb.connect() + + +def load_coin(): + data = None + with open("coin.yaml", "r") as file: + data = yaml.safe_load(file) + interest_coins = data["interest"] + base = data["base"][0] + return interest_coins, base + + +def handle_message(message): + # insert_coin_price(self, collection_name, document): + global mdb, timezone + col = message["s"] + "_kline" + time = message["t"] + msg = message["d"]["k"] + msg["stime"] = time + msg["time"] = datetime.now(timezone).strftime("%Y/%m/%d, %H:%M:%S") + mdb.insert_coin_price(col.lower(), msg) -# initialize WebSocket client -ws_spot_client = spot.WebSocket(api_key = api_key, api_secret = api_secret) # make http request to api -# create websocket connection to public channel (spot@public.deals.v3.api@BTCUSDT) -# all messages will be handled by function `handle_message` -ws_spot_client.kline_stream(handle_message, "BTCUSDT", "Min1") -ws_spot_client.kline_stream(handle_message, "XRPUSDT", "Min1") -while True: - ... +if __name__ == "__main__": + coins, base = load_coin() + api_key = os.getenv("API_KEY") + api_secret = os.getenv("API_SECRET") + ws_spot_client = spot.WebSocket(api_key=api_key, api_secret=api_secret) + + # all messages will be handled by function `handle_message` + for coin in coins: + target = coin + base + ws_spot_client.kline_stream(handle_message, target, "Min1") + while True: + ...