This repository has been archived on 2025-01-08. You can view files and clone it, but cannot push or open issues or pull requests.
Files
mexc-trade/pymexc/base_websocket.py
dongho 9e9cbf3547
All checks were successful
SonarQube Scan / SonarQube Trigger (push) Successful in 1m12s
mexc-websocket added
2024-12-20 20:48:02 +09:00

510 lines
17 KiB
Python

import websocket
import threading
import logging
import time
import json
import hmac
logger = logging.getLogger(__name__)
SPOT = "wss://wbs.mexc.com/ws"
FUTURES = "wss://contract.mexc.com/ws"
class _WebSocketManager:
def __init__(self, callback_function, ws_name, api_key=None, api_secret=None,
ping_interval=20, ping_timeout=10, retries=10,
restart_on_error=True, trace_logging=False,
http_proxy_host = None,
http_proxy_port = None,
http_no_proxy = None,
http_proxy_auth = None,
http_proxy_timeout = None):
self.proxy_settings = dict(
http_proxy_host = http_proxy_host,
http_proxy_port = http_proxy_port,
http_no_proxy = http_no_proxy,
http_proxy_auth = http_proxy_auth,
http_proxy_timeout = http_proxy_timeout
)
# Set API keys.
self.api_key = api_key
self.api_secret = api_secret
self.callback = callback_function
self.ws_name = ws_name
if api_key:
self.ws_name += " (Auth)"
# Setup the callback directory following the format:
# {
# "topic_name": function
# }
self.callback_directory = {}
# Record the subscriptions made so that we can resubscribe if the WSS
# connection is broken.
self.subscriptions = []
# Set ping settings.
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.retries = retries
# Other optional data handling settings.
self.handle_error = restart_on_error
# Enable websocket-client's trace logging for extra debug information
# on the websocket connection, including the raw sent & recv messages
websocket.enableTrace(trace_logging)
# Set initial state, initialize dictionary and connect.
self._reset()
self.attempting_connection = False
def _on_open(self):
"""
Log WS open.
"""
logger.debug(f"WebSocket {self.ws_name} opened.")
def _on_message(self, message):
"""
Parse incoming messages.
"""
self.callback(json.loads(message))
def is_connected(self):
try:
if self.ws.sock or not self.ws.sock.is_connected:
return True
else:
return False
except AttributeError:
return False
@staticmethod
def _are_connections_connected(active_connections):
for connection in active_connections:
if not connection.is_connected():
return False
return True
def _ping_loop(self, ping_payload: str, ping_interval: int, ping_timeout: int):
"""
Ping the websocket.
"""
time.sleep(ping_timeout)
while True:
logger.info(f"WebSocket {self.ws_name} send ping...")
self.ws.send(ping_payload)
time.sleep(ping_interval)
def _connect(self, url):
"""
Open websocket in a thread.
"""
def resubscribe_to_topics():
if not self.subscriptions:
# There are no subscriptions to resubscribe to, probably
# because this is a brand new WSS initialisation so there was
# no previous WSS connection.
return
for subscription_message in self.subscriptions:
self.ws.send(subscription_message)
self.attempting_connection = True
# Set endpoint.
self.endpoint = url
# Attempt to connect for X seconds.
retries = self.retries
if retries == 0:
infinitely_reconnect = True
else:
infinitely_reconnect = False
while (infinitely_reconnect or retries > 0) and not self.is_connected():
logger.info(f"WebSocket {self.ws_name} attempting connection...")
self.ws = websocket.WebSocketApp(
url=url,
on_message=lambda ws, msg: self._on_message(msg),
on_close=self._on_close(),
on_open=self._on_open(),
on_error=lambda ws, err: self._on_error(err)
)
# Setup the thread running WebSocketApp.
self.wst = threading.Thread(target=lambda: self.ws.run_forever(
ping_interval=self.ping_interval,
ping_timeout=self.ping_timeout,
**self.proxy_settings
))
# Configure as daemon; start.
self.wst.daemon = True
self.wst.start()
# setup ping loop
self.wsl = threading.Thread(target=lambda: self._ping_loop(
ping_payload='{"method":"ping"}',
ping_interval=self.ping_interval,
ping_timeout=self.ping_timeout
))
self.wsl.daemon = True
self.wsl.start()
retries -= 1
time.sleep(1)
# If connection was not successful, raise error.
if not infinitely_reconnect and retries <= 0:
self.exit()
raise websocket.WebSocketTimeoutException(
f"WebSocket {self.ws_name} connection failed. Too many "
f"connection attempts. pybit will "
f"no longer try to reconnect.")
logger.info(f"WebSocket {self.ws_name} connected")
# If given an api_key, authenticate.
if self.api_key and self.api_secret:
self._auth()
resubscribe_to_topics()
self.attempting_connection = False
def _auth(self):
# Generate signature
# make auth if futures. spot has a different auth system.
isspot = self.endpoint.startswith(SPOT)
if isspot:
return
timestamp = str(int(time.time() * 1000))
_val = self.api_key + timestamp
signature = str(hmac.new(
bytes(self.api_secret, "utf-8"),
bytes(_val, "utf-8"), digestmod="sha256"
).hexdigest())
# Authenticate with API.
self.ws.send(
json.dumps({
"subscribe": False,
"method": "login",
"param": {
"apiKey": self.api_key,
"reqTime": timestamp,
"signature": signature
}
})
)
def _on_error(self, error):
"""
Exit on errors and raise exception, or attempt reconnect.
"""
if type(error).__name__ not in ["WebSocketConnectionClosedException",
"ConnectionResetError",
"WebSocketTimeoutException"]:
# Raises errors not related to websocket disconnection.
self.exit()
raise error
if not self.exited:
logger.error(f"WebSocket {self.ws_name} encountered error: {error}.")
self.exit()
# Reconnect.
if self.handle_error and not self.attempting_connection:
self._reset()
self._connect(self.endpoint)
def _on_close(self):
"""
Log WS close.
"""
logger.debug(f"WebSocket {self.ws_name} closed.")
def _reset(self):
"""
Set state booleans and initialize dictionary.
"""
self.exited = False
self.auth = False
self.data = {}
def exit(self):
"""
Closes the websocket connection.
"""
self.ws.close()
while self.ws.sock:
continue
self.exited = True
class _FuturesWebSocketManager(_WebSocketManager):
def __init__(self, ws_name, **kwargs):
callback_function = kwargs.pop("callback_function") if \
kwargs.get("callback_function") else self._handle_incoming_message
super().__init__(callback_function, ws_name, **kwargs)
self.private_topics = ["personal.order", "personal.asset",
"personal.position", "personal.risk.limit",
"personal.adl.level", "personal.position.mode"]
self.symbol_wildcard = "*"
self.symbol_separator = "|"
self.last_subsctiption = None
def subscribe(self, topic, callback, params: dict = {}):
subscription_args = {
"method": topic,
"param": params
}
self._check_callback_directory(subscription_args)
while not self.is_connected():
# Wait until the connection is open before subscribing.
time.sleep(0.1)
subscription_message = json.dumps(subscription_args)
self.ws.send(subscription_message)
self.subscriptions.append(subscription_message)
self._set_callback(topic.replace("sub.", ""), callback)
self.last_subsctiption = topic.replace("sub.", "")
def _initialise_local_data(self, topic):
# Create self.data
try:
self.data[topic]
except KeyError:
self.data[topic] = []
def _process_auth_message(self, message):
# If we get successful futures auth, notify user
if message.get("data") == "success":
logger.debug(f"Authorization for {self.ws_name} successful.")
self.auth = True
# If we get unsuccessful auth, notify user.
elif message.get("data") != "success": # !!!!
logger.debug(f"Authorization for {self.ws_name} failed. Please "
f"check your API keys and restart.")
def _process_subscription_message(self, message):
#try:
sub = message["channel"]
#except KeyError:
#sub = message["c"] # SPOT PUBLIC & PRIVATE format
# If we get successful futures subscription, notify user
if (
message.get("channel", "").startswith("rs.") or
message.get("channel", "").startswith("push.")
) and message.get("channel", "") != "rs.error":
logger.debug(f"Subscription to {sub} successful.")
# Futures subscription fail
else:
response = message["data"]
logger.error("Couldn't subscribe to topic. "
f"Error: {response}.")
if self.last_subsctiption:
self._pop_callback(self.last_subsctiption)
def _process_normal_message(self, message):
topic = message["channel"].replace("push.", "").replace("rs.sub.", "")
callback_data = message
callback_function = self._get_callback(topic)
callback_function(callback_data)
def _handle_incoming_message(self, message):
def is_auth_message():
if message.get("channel", "") == "rs.login":
return True
else:
return False
def is_subscription_message():
if str(message).startswith("{'channel': 'push."):
return True
else:
return False
def is_pong_message():
if message.get("channel", "") in ("pong", "clientId"):
return True
else:
return False
if is_auth_message():
self._process_auth_message(message)
elif is_subscription_message():
self._process_subscription_message(message)
elif is_pong_message():
pass
else:
self._process_normal_message(message)
def custom_topic_stream(self, topic, callback):
return self.subscribe(topic=topic, callback=callback)
def _check_callback_directory(self, topics):
for topic in topics:
if topic in self.callback_directory:
raise Exception(f"You have already subscribed to this topic: "
f"{topic}")
def _set_callback(self, topic, callback_function):
self.callback_directory[topic] = callback_function
def _get_callback(self, topic):
return self.callback_directory[topic]
def _pop_callback(self, topic):
self.callback_directory.pop(topic)
class _FuturesWebSocket(_FuturesWebSocketManager):
def __init__(self, **kwargs):
self.ws_name = "FuturesV1"
self.endpoint = "wss://contract.mexc.com/ws"
super().__init__(self.ws_name, **kwargs)
self.ws = None
self.active_connections = []
self.kwargs = kwargs
def is_connected(self):
return self._are_connections_connected(self.active_connections)
def _ws_subscribe(self, topic, callback, params: list = []):
if not self.ws:
self.ws = _FuturesWebSocketManager(
self.ws_name, **self.kwargs)
self.ws._connect(self.endpoint)
self.active_connections.append(self.ws)
self.ws.subscribe(topic, callback, params)
class _SpotWebSocketManager(_WebSocketManager):
def __init__(self, ws_name, **kwargs):
callback_function = kwargs.pop("callback_function") if \
kwargs.get("callback_function") else self._handle_incoming_message
super().__init__(callback_function, ws_name, **kwargs)
self.private_topics = ["account", "deals", "orders"]
self.last_subsctiption = None
def subscribe(self, topic: str, callback, params_list: list):
subscription_args = {
"method": "SUBSCRIPTION",
"params": [
'@'.join([f"spot@{topic}.v3.api"] + list(map(str, params.values())))
for params in params_list
]
}
self._check_callback_directory(subscription_args)
while not self.is_connected():
# Wait until the connection is open before subscribing.
time.sleep(0.1)
subscription_message = json.dumps(subscription_args)
self.ws.send(subscription_message)
self.subscriptions.append(subscription_message)
self._set_callback(topic, callback)
self.last_subsctiption = topic
def _initialise_local_data(self, topic):
# Create self.data
try:
self.data[topic]
except KeyError:
self.data[topic] = []
def _process_subscription_message(self, message):
sub = message["msg"].replace("spot@", "").split(".v3.api")[0]
# If we get successful futures subscription, notify user
if message.get("id") == 0 and message.get("code") == 0:
logger.debug(f"Subscription to {sub} successful.")
# Futures subscription fail
else:
response = message["msg"]
logger.error("Couldn't subscribe to topic. "
f"Error: {response}.")
if self.last_subsctiption:
self._pop_callback(self.last_subsctiption)
def _process_normal_message(self, message):
topic = message["c"].replace("spot@", "").split(".v3.api")[0]
callback_data = message
callback_function = self._get_callback(topic)
callback_function(callback_data)
def _handle_incoming_message(self, message):
def is_subscription_message():
if (message.get("id") == 0 and
message.get("code") == 0 and
message.get("msg")):
return True
else:
return False
if is_subscription_message():
self._process_subscription_message(message)
else:
self._process_normal_message(message)
def custom_topic_stream(self, topic, callback):
return self.subscribe(topic=topic, callback=callback)
def _check_callback_directory(self, topics):
for topic in topics:
if topic in self.callback_directory:
raise Exception(f"You have already subscribed to this topic: "
f"{topic}")
def _set_callback(self, topic, callback_function):
self.callback_directory[topic] = callback_function
def _get_callback(self, topic):
return self.callback_directory[topic]
def _pop_callback(self, topic):
self.callback_directory.pop(topic)
class _SpotWebSocket(_SpotWebSocketManager):
def __init__(self, endpoint: str = "wss://wbs.mexc.com/ws", **kwargs):
self.ws_name = "SpotV3"
self.endpoint = endpoint
super().__init__(self.ws_name, **kwargs)
self.ws = None
self.active_connections = []
self.kwargs = kwargs
def is_connected(self):
return self._are_connections_connected(self.active_connections)
def _ws_subscribe(self, topic, callback, params: list = []):
if not self.ws:
self.ws = _SpotWebSocketManager(
self.ws_name, **self.kwargs)
self.ws._connect(self.endpoint)
self.active_connections.append(self.ws)
self.ws.subscribe(topic, callback, params)