127 lines
3.7 KiB
Python
127 lines
3.7 KiB
Python
from pymexc import spot, futures
|
|
from dotenv import load_dotenv
|
|
from pymongo import MongoClient
|
|
from pymongo.errors import ConnectionFailure, OperationFailure
|
|
import pytz
|
|
import yaml
|
|
|
|
from datetime import datetime
|
|
import os
|
|
import logging
|
|
from urllib.parse import quote_plus
|
|
|
|
# Pull settings from a local .env file into the process environment before
# anything below reads them.
load_dotenv()

# Set up logging
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Timezone used when stamping stored documents with a wall-clock time.
timezone = pytz.timezone("Asia/Seoul")

# Exchange API credentials; either may be None when the variable is unset.
api_key = os.environ.get("API_KEY")
api_secret = os.environ.get("API_SECRET")
|
|
|
|
|
|
class MongoDBConn(object):
    """Small wrapper around a pymongo ``MongoClient`` configured from the environment.

    Settings are read from the ``DB_USER``, ``DB_PWD``, ``DB_HOST``,
    ``DB_PORT`` and ``DB_NAME`` environment variables when the object is
    created. No connection is attempted until :meth:`connect` is called.
    """

    def __init__(self):
        """Initialize MongoDB connection parameters"""
        # quote_plus() raises TypeError on None, so unset credentials fall
        # back to "" instead of crashing at construction time.
        self.username = quote_plus(os.getenv("DB_USER") or "")
        self.password = quote_plus(os.getenv("DB_PWD") or "")
        self.host = os.getenv("DB_HOST")
        self.port = os.getenv("DB_PORT")
        self.db_name = os.getenv("DB_NAME")
        self.client = None
        self.db = None

    def _build_uri(self):
        """Return the connection URI, leaving out credentials when no user is set."""
        address = f"{self.host}:{self.port}"
        if not self.username:
            return f"mongodb://{address}"
        return f"mongodb://{self.username}:{self.password}@{address}"

    def connect(self):
        """Establish connection to MongoDB

        Returns:
            True when the server answered a ``ping`` and the database
            reference was stored on ``self.db``; False otherwise. Failures
            are logged, never raised.
        """
        client = None
        try:
            # Connect to MongoDB
            client = MongoClient(self._build_uri())

            # Test the connection
            client.admin.command("ping")

            # Get database reference; publish both attributes only once
            # every step has succeeded.
            db = client[self.db_name]
            self.client, self.db = client, db

            logger.info("Successfully connected to MongoDB")
            return True
        except ConnectionFailure as e:
            logger.error("Could not connect to MongoDB: %s", e)
        except OperationFailure as e:
            logger.error("Authentication failed: %s", e)
        except Exception as e:
            logger.error("An error occurred: %s", e)

        # Failure path: release the half-open client instead of leaking it.
        if client is not None:
            client.close()
        return False

    def close(self):
        """Close the MongoDB connection"""
        if self.client is not None:
            self.client.close()
            logger.info("MongoDB connection closed")

    def insert_coin_price(self, collection_name, document):
        """Insert a single document into a collection

        Args:
            collection_name: Name of the target collection in ``self.db``.
            document: Mapping to store, passed to ``insert_one`` as is.

        Returns:
            The inserted document's ID, or None when the object is not
            connected or the insert failed (the error is logged).
        """
        if self.db is None:
            logger.error("Error inserting document: not connected to MongoDB")
            return None
        try:
            result = self.db[collection_name].insert_one(document)
        except Exception as e:
            logger.error("Error inserting document: %s", e)
            return None
        logger.info("Document inserted with ID: %s", result.inserted_id)
        return result.inserted_id
|
|
|
|
|
|
# Module-wide MongoDB handle used by handle_message(); the connection is
# opened as soon as this module is loaded.
mdb = MongoDBConn()
mdb.connect()
|
|
|
|
|
|
def load_coin(path="coin.yaml"):
    """Load the coin configuration from a YAML file.

    Args:
        path: YAML file to read. Defaults to ``coin.yaml``, resolved
            against the current working directory.

    Returns:
        A tuple ``(interest_coins, base)``: the value stored under the
        ``interest`` key and the first element of the ``base`` entry.

    Raises:
        OSError: The file cannot be opened.
        KeyError, IndexError, TypeError: The file does not contain the
            expected ``interest`` / ``base`` entries.
    """
    # Explicit encoding so parsing does not depend on the platform locale.
    with open(path, "r", encoding="utf-8") as file:
        data = yaml.safe_load(file)
    return data["interest"], data["base"][0]
|
|
|
|
|
|
def handle_message(message):
    """Store one kline websocket message in MongoDB.

    The kline payload under ``message["d"]["k"]`` is copied, its numeric
    fields are converted from their wire representation, and the result is
    inserted into the collection ``"<symbol>_kline"`` (lower-cased), where
    the symbol comes from ``message["s"]``.

    Args:
        message: Decoded websocket message. It must provide the keys ``s``,
            ``t`` and ``d.k``, with ``c``, ``h``, ``l``, ``o``, ``v``,
            ``a``, ``T`` and ``t`` inside ``d.k``.
            NOTE(review): presumably OHLCV candle data; confirm field
            meanings against the exchange documentation.

    Returns:
        None. Malformed messages are logged and skipped rather than raised,
        so one bad frame cannot break the websocket callback.
    """
    float_fields = ("c", "h", "l", "o", "v", "a")
    int_fields = ("T", "t")

    try:
        collection = (message["s"] + "_kline").lower()
        # Work on a copy so the caller's payload is left untouched.
        document = dict(message["d"]["k"])
        document["stime"] = message["t"]
        for field in float_fields:
            document[field] = float(document[field])
        for field in int_fields:
            document[field] = int(document[field])
    except (KeyError, TypeError, ValueError) as exc:
        logger.warning("Skipping malformed kline message (%r): %r", exc, message)
        return

    # Local wall-clock time at which the message was processed.
    document["time"] = datetime.now(timezone).strftime("%Y/%m/%d, %H:%M:%S")
    mdb.insert_coin_price(collection, document)
|
|
|
|
|
|
# Script entry point: subscribe to spot kline streams over the websocket API
|
|
|
|
if __name__ == "__main__":
    # Local import: only the script entry point needs it.
    import time

    coins, base = load_coin()

    # api_key / api_secret are the module-level values read from the
    # environment above; re-reading them here was redundant.
    ws_spot_client = spot.WebSocket(api_key=api_key, api_secret=api_secret)

    # all messages will be handled by function `handle_message`
    for coin in coins:
        target = coin + base
        ws_spot_client.kline_stream(handle_message, target, "Min1")

    # Keep the main thread alive while the websocket client delivers
    # messages. Sleeping avoids the busy-wait that pinned a CPU core.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Interrupted, shutting down")
    finally:
        mdb.close()
|