websocket
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m24s

This commit is contained in:
dongho
2024-12-21 15:40:38 +09:00
parent 1c93392d65
commit 2d31dcfd0b

View File

@ -1,26 +1,118 @@
from pymexc import spot, futures
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure
import pytz
import yaml
from datetime import datetime
import os
import logging
from urllib.parse import quote_plus
load_dotenv()
# Set up logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
timezone = pytz.timezone("Asia/Seoul")
api_key = os.getenv("API_KEY")
api_secret = os.getenv("API_SECRET")
def handle_message(message):
print(message)
class MongoDBConn(object):
def __init__(self):
"""Initialize MongoDB connection parameters"""
self.username = quote_plus(os.getenv("DB_USER"))
self.password = quote_plus(os.getenv("DB_PWD"))
self.host = os.getenv("DB_HOST")
self.port = os.getenv("DB_PORT")
self.db_name = os.getenv("DB_NAME")
self.client = None
self.db = None
def connect(self):
"""Establish connection to MongoDB"""
try:
# Create connection URI
uri = f"mongodb://{self.username}:{self.password}@{self.host}:{self.port}"
# Connect to MongoDB
self.client = MongoClient(uri)
# Test the connection
self.client.admin.command("ping")
# Get database reference
self.db = self.client[self.db_name]
logger.info("Successfully connected to MongoDB")
return True
except ConnectionFailure as e:
logger.error(f"Could not connect to MongoDB: {e}")
return False
except OperationFailure as e:
logger.error(f"Authentication failed: {e}")
return False
except Exception as e:
logger.error(f"An error occurred: {e}")
return False
def close(self):
"""Close the MongoDB connection"""
if self.client:
self.client.close()
logger.info("MongoDB connection closed")
def insert_coin_price(self, collection_name, document):
"""Insert a single document into a collection"""
try:
collection = self.db[collection_name]
result = collection.insert_one(document)
logger.info(f"Document inserted with ID: {result.inserted_id}")
return result.inserted_id
except Exception as e:
logger.error(f"Error inserting document: {e}")
return None
mdb = MongoDBConn()
mdb.connect()
def load_coin():
data = None
with open("coin.yaml", "r") as file:
data = yaml.safe_load(file)
interest_coins = data["interest"]
base = data["base"][0]
return interest_coins, base
def handle_message(message):
# insert_coin_price(self, collection_name, document):
global mdb, timezone
col = message["s"] + "_kline"
time = message["t"]
msg = message["d"]["k"]
msg["stime"] = time
msg["time"] = datetime.now(timezone).strftime("%Y/%m/%d, %H:%M:%S")
mdb.insert_coin_price(col.lower(), msg)
# initialize WebSocket client
ws_spot_client = spot.WebSocket(api_key = api_key, api_secret = api_secret)
# make http request to api
# create websocket connection to public channel (spot@public.deals.v3.api@BTCUSDT)
# all messages will be handled by function `handle_message`
ws_spot_client.kline_stream(handle_message, "BTCUSDT", "Min1")
ws_spot_client.kline_stream(handle_message, "XRPUSDT", "Min1")
while True:
...
if __name__ == "__main__":
coins, base = load_coin()
api_key = os.getenv("API_KEY")
api_secret = os.getenv("API_SECRET")
ws_spot_client = spot.WebSocket(api_key=api_key, api_secret=api_secret)
# all messages will be handled by function `handle_message`
for coin in coins:
target = coin + base
ws_spot_client.kline_stream(handle_message, target, "Min1")
while True:
...