diff --git a/CHANGELOG.md b/CHANGELOG.md index d44757e..986fb30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 4.0.0rc1 - 2023-05-18 + +### Changed +- Redesign of Websocket part. Please consult `README.md` for details on its new usage. + ## 3.3.1 - 2023-03-21 ### Updated diff --git a/README.md b/README.md index c41aefd..570034b 100644 --- a/README.md +++ b/README.md @@ -176,36 +176,66 @@ There are 2 types of error returned from the library: ## Websocket +### Connector v4 + +WebSocket can be established through the following connections: +- USD-M WebSocket Stream (`https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams`) +- COIN-M WebSocket Stream (`https://binance-docs.github.io/apidocs/delivery/en/#websocket-market-streams`) + ```python +# WebSocket Stream Client import time -from binance.websocket.cm_futures.websocket_client import CMFuturesWebsocketClient +from binance.websocket.um_futures.websocket_client import UMFuturesWebsocketClient -def message_handler(message): - print(message) +def message_handler(_, message): + logging.info(message) -ws_client = CMFuturesWebsocketClient() -ws_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler) -ws_client.mini_ticker( - symbol='bnbusdt', - id=1, - callback=message_handler, -) +# Subscribe to a single symbol stream +my_client.agg_trade(symbol="bnbusdt") +time.sleep(5) +logging.info("closing ws connection") +my_client.stop() +``` -# Combine selected streams -ws_client.instant_subscribe( - stream=['bnbusdt@bookTicker', 'ethusdt@bookTicker'], - callback=message_handler, -) +#### Request Id -time.sleep(10) +Client can assign a request id to each request. The request id will be returned in the response message. Not mandatory in the library, it generates a uuid format string if not provided. -print("closing ws connection") -ws_client.stop() +```python +# id provided by client +my_client.agg_trade(symbol="bnbusdt", id="my_request_id") +# library will generate a random uuid string +my_client.agg_trade(symbol="bnbusdt") ``` + +#### Combined Streams +- If you set `is_combined` to `True`, `"/stream/"` will be appended to the `baseURL` to allow for Combining streams. +- `is_combined` defaults to `False` and `"/ws/"` (raw streams) will be appended to the `baseURL`. + More websocket examples are available in the `examples` folder +## Websocket < v4 + +```python +import time +from binance.websocket.um_futures.websocket_client import UMFuturesWebsocketClient + +def message_handler(message): + print(message) + +my_client = UMFuturesWebsocketClient(on_message=message_handler) + +# Subscribe to a single symbol stream +my_client.agg_trade(symbol="bnbusdt") +time.sleep(5) +print("closing ws connection") +my_client.stop() + +``` + ### Heartbeat Once connected, the websocket server sends a ping frame every 3 minutes and requires a response pong frame back within diff --git a/binance/__version__.py b/binance/__version__.py index ff04168..0d337e8 100644 --- a/binance/__version__.py +++ b/binance/__version__.py @@ -1 +1 @@ -__version__ = "3.3.1" +__version__ = "4.0.0rc1" diff --git a/binance/websocket/binance_client_factory.py b/binance/websocket/binance_client_factory.py deleted file mode 100644 index e75da36..0000000 --- a/binance/websocket/binance_client_factory.py +++ /dev/null @@ -1,46 +0,0 @@ -import logging -from autobahn.twisted.websocket import WebSocketClientFactory -from twisted.internet.protocol import ReconnectingClientFactory -from binance.websocket.binance_client_protocol import BinanceClientProtocol - - -class BinanceReconnectingClientFactory(ReconnectingClientFactory): - initialDelay = 0.1 - maxDelay = 10 - maxRetries = 10 - - -class BinanceClientFactory(WebSocketClientFactory, BinanceReconnectingClientFactory): - def __init__(self, *args, payload=None, **kwargs): - WebSocketClientFactory.__init__(self, *args, **kwargs) - self.protocol_instance = None - self.base_client = None - self.payload = payload - - _reconnect_error_payload = {"e": "error", "m": "Max reconnect retries reached"} - - def startedConnecting(self, connector): - logging.info("Start to connect....") - - def clientConnectionFailed(self, connector, reason): - logging.error( - "Can't connect to server. Reason: {}. Retrying: {}".format( - reason, self.retries + 1 - ) - ) - self.retry(connector) - if self.retries > self.maxRetries: - self.callback(self._reconnect_error_payload) - - def clientConnectionLost(self, connector, reason): - logging.error( - "Lost connection to Server. Reason: {}. Retrying: {}".format( - reason, self.retries + 1 - ) - ) - self.retry(connector) - if self.retries > self.maxRetries: - self.callback(self._reconnect_error_payload) - - def buildProtocol(self, addr): - return BinanceClientProtocol(self, payload=self.payload) diff --git a/binance/websocket/binance_client_protocol.py b/binance/websocket/binance_client_protocol.py deleted file mode 100644 index 1416ad1..0000000 --- a/binance/websocket/binance_client_protocol.py +++ /dev/null @@ -1,45 +0,0 @@ -import json -import logging -from autobahn.twisted.websocket import WebSocketClientProtocol - - -class BinanceClientProtocol(WebSocketClientProtocol): - def __init__(self, factory, payload=None): - super().__init__() - self.factory = factory - self.payload = payload - - def onOpen(self): - self.factory.protocol_instance = self - - def onConnect(self, response): - logging.info("Server connected") - if self.payload: - logging.info("Sending message to Server: {}".format(self.payload)) - self.sendMessage(self.payload, isBinary=False) - # reset the delay after reconnecting - self.factory.resetDelay() - - def onMessage(self, payload, isBinary): - if not isBinary: - try: - payload_obj = json.loads(payload.decode("utf8")) - except ValueError: - pass - else: - self.factory.callback(payload_obj) - - def onClose(self, wasClean, code, reason): - logging.warn( - "WebSocket connection closed: {0}, code: {1}, clean: {2}, reason: {0}".format( - reason, code, wasClean - ) - ) - - def onPing(self, payload): - logging.info("Received Ping from server") - self.sendPong() - logging.info("Responded Pong to server") - - def onPong(self, payload): - logging.info("Received Pong from server") diff --git a/binance/websocket/binance_socket_manager.py b/binance/websocket/binance_socket_manager.py index 0cc4092..858bd1f 100644 --- a/binance/websocket/binance_socket_manager.py +++ b/binance/websocket/binance_socket_manager.py @@ -1,82 +1,104 @@ -import json import logging import threading -from urllib.parse import urlparse -from twisted.internet import reactor, ssl -from twisted.internet.error import ReactorAlreadyRunning -from autobahn.twisted.websocket import WebSocketClientFactory, connectWS -from binance.websocket.binance_client_protocol import BinanceClientProtocol -from binance.websocket.binance_client_factory import BinanceClientFactory +from websocket import ( + ABNF, + create_connection, + WebSocketException, + WebSocketConnectionClosedException, +) class BinanceSocketManager(threading.Thread): - def __init__(self, stream_url): + def __init__( + self, + stream_url, + on_message=None, + on_open=None, + on_close=None, + on_error=None, + on_ping=None, + on_pong=None, + logger=None, + ): threading.Thread.__init__(self) - - self.factories = {} - self._connected_event = threading.Event() + if not logger: + logger = logging.getLogger(__name__) + self.logger = logger self.stream_url = stream_url - self._conns = {} - self._user_callback = None - - def _start_socket( - self, stream_name, payload, callback, is_combined=False, is_live=True - ): - if stream_name in self._conns: - return False - - if is_combined: - factory_url = self.stream_url + "/stream" - else: - factory_url = self.stream_url + "/ws" - - if not is_live: - payload_obj = json.loads(payload.decode("utf8")) - - if is_combined: - factory_url = factory_url + "?streams=" + payload_obj["params"] - else: - factory_url = factory_url + "/" + payload_obj["params"] - payload = None - - logging.info("Connection with URL: {}".format(factory_url)) + self.on_message = on_message + self.on_open = on_open + self.on_close = on_close + self.on_ping = on_ping + self.on_pong = on_pong + self.on_error = on_error + self.create_ws_connection() - factory = BinanceClientFactory(factory_url, payload=payload) - factory.base_client = self - factory.protocol = BinanceClientProtocol - factory.setProtocolOptions( - openHandshakeTimeout=5, autoPingInterval=300, autoPingTimeout=5 + def create_ws_connection(self): + self.logger.debug( + "Creating connection with WebSocket Server: %s", self.stream_url ) - factory.callback = callback - self.factories[stream_name] = factory - reactor.callFromThread(self.add_connection, stream_name, self.stream_url) + self.ws = create_connection(self.stream_url) + self.logger.debug( + "WebSocket connection has been established: %s", self.stream_url + ) + self._callback(self.on_open) - def add_connection(self, stream_name, url): - if not url.startswith("wss://"): - raise ValueError("expected wss:// URL prefix") + def run(self): + self.read_data() - factory = self.factories[stream_name] - options = ssl.optionsForClientTLS(hostname=urlparse(url).hostname) - self._conns[stream_name] = connectWS(factory, options) + def send_message(self, message): + self.logger.debug("Sending message to Binance WebSocket Server: %s", message) + self.ws.send(message) - def stop_socket(self, conn_key): - if conn_key not in self._conns: - return + def ping(self): + self.ws.ping() - # disable reconnecting if we are closing - self._conns[conn_key].factory = WebSocketClientFactory(self.stream_url) - self._conns[conn_key].disconnect() - del self._conns[conn_key] + def read_data(self): + data = "" + while True: + try: + op_code, frame = self.ws.recv_data_frame(True) + except WebSocketException as e: + if isinstance(e, WebSocketConnectionClosedException): + self.logger.error("Lost websocket connection") + else: + self.logger.error("Websocket exception: {}".format(e)) + raise e + except Exception as e: + self.logger.error("Exception in read_data: {}".format(e)) + raise e - def run(self): - try: - reactor.run(installSignalHandlers=False) - except ReactorAlreadyRunning: - # Ignore error about reactor already running - pass + if op_code == ABNF.OPCODE_CLOSE: + self.logger.warning( + "CLOSE frame received, closing websocket connection" + ) + self._callback(self.on_close) + break + elif op_code == ABNF.OPCODE_PING: + self._callback(self.on_ping, frame.data) + self.ws.pong("") + self.logger.debug("Received Ping; PONG frame sent back") + elif op_code == ABNF.OPCODE_PONG: + self.logger.debug("Received PONG frame") + self._callback(self.on_pong) + else: + data = frame.data + if op_code == ABNF.OPCODE_TEXT: + data = data.decode("utf-8") + self._callback(self.on_message, data) def close(self): - keys = set(self._conns.keys()) - for key in keys: - self.stop_socket(key) - self._conns = {} + if not self.ws.connected: + self.logger.warn("Websocket already closed") + else: + self.ws.send_close() + return + + def _callback(self, callback, *args): + if callback: + try: + callback(self, *args) + except Exception as e: + self.logger.error("Error from callback {}: {}".format(callback, e)) + if self.on_error: + self.on_error(self, e) diff --git a/binance/websocket/cm_futures/websocket_client.py b/binance/websocket/cm_futures/websocket_client.py index cbd5e65..cd19404 100644 --- a/binance/websocket/cm_futures/websocket_client.py +++ b/binance/websocket/cm_futures/websocket_client.py @@ -2,10 +2,32 @@ class CMFuturesWebsocketClient(BinanceWebsocketClient): - def __init__(self, stream_url="wss://dstream.binance.com"): - super().__init__(stream_url) + def __init__( + self, + stream_url="wss://dstream.binance.com", + on_message=None, + on_open=None, + on_close=None, + on_error=None, + on_ping=None, + on_pong=None, + is_combined=False, + ): + if is_combined: + stream_url = stream_url + "/stream" + else: + stream_url = stream_url + "/ws" + super().__init__( + stream_url, + on_message=on_message, + on_open=on_open, + on_close=on_close, + on_error=on_error, + on_ping=on_ping, + on_pong=on_pong, + ) - def agg_trade(self, symbol: str, id: int, callback, **kwargs): + def agg_trade(self, symbol: str, id=None, action=None, **kwargs): """Aggregate Trade Streams The Aggregate Trade Streams push market trade information that is aggregated for a single taker order every 100 milliseconds. @@ -17,11 +39,11 @@ def agg_trade(self, symbol: str, id: int, callback, **kwargs): Update Speed: 100ms """ - self.live_subscribe( - "{}@aggTrade".format(symbol.lower()), id, callback, **kwargs - ) + stream_name = "{}@aggTrade".format(symbol.lower()) - def index_price(self, pair: str, id: int, speed: int, callback, **kwargs): + self.send_message_to_server(stream_name, action=action, id=id) + + def index_price(self, pair: str, id=None, speed=1, action=None, **kwargs): """Index Price Streams Stream Name: @indexPrice OR @indexPrice@1s @@ -30,11 +52,14 @@ def index_price(self, pair: str, id: int, speed: int, callback, **kwargs): Update Speed: 3000ms OR 1000ms """ - self.live_subscribe( - "{}@indexPrice@{}s".format(pair.lower(), speed), id, callback, **kwargs - ) + if speed == 1: + stream_name = "{}@indexPrice@{}s".format(pair.lower(), speed) + else: + stream_name = "{}@indexPrice".format(pair.lower()) - def mark_price(self, symbol: str, id: int, speed: int, callback, **kwargs): + self.send_message_to_server(stream_name, action=action, id=id) + + def mark_price(self, symbol: str, speed=1, id=None, action=None, **kwargs): """Mark Price Streams Stream Name: @markPrice OR @markPrice@1s @@ -43,11 +68,14 @@ def mark_price(self, symbol: str, id: int, speed: int, callback, **kwargs): Update Speed: 3000ms OR 1000ms """ - self.live_subscribe( - "{}@markPrice@{}s".format(symbol.lower(), speed), id, callback, **kwargs - ) + if speed == 1: + stream_name = "{}@markPrice@{}s".format(symbol.lower(), speed) + else: + stream_name = "{}@markPrice".format(symbol.lower()) + + self.send_message_to_server(stream_name, action=action, id=id) - def pair_mark_price(self, pair: str, id: int, speed: int, callback, **kwargs): + def pair_mark_price(self, pair: str, speed=1, id=None, action=None, **kwargs): """Mark Price of All Symbols of a Pair Stream Name: @markPrice OR @markPrice@1s @@ -56,11 +84,14 @@ def pair_mark_price(self, pair: str, id: int, speed: int, callback, **kwargs): Update Speed: 3000ms OR 1000ms """ - self.live_subscribe( - "{}@markPrice@{}s".format(pair.lower(), speed), id, callback, **kwargs - ) + if speed == 1: + stream_name = "{}@markPrice@{}s".format(pair.lower(), speed) + else: + stream_name = "{}@markPrice".format(pair.lower()) + + self.send_message_to_server(stream_name, action=action, id=id) - def kline(self, symbol: str, id: int, interval: str, callback, **kwargs): + def kline(self, symbol: str, interval: str, id=None, action=None, **kwargs): """Kline/Candlestick Streams The Kline/Candlestick Stream push updates to the current klines/candlestick every 250 milliseconds (if existing) @@ -90,13 +121,18 @@ def kline(self, symbol: str, id: int, interval: str, callback, **kwargs): Update Speed: 250ms """ + stream_name = "{}@kline_{}".format(symbol.lower(), interval) - self.live_subscribe( - "{}@kline_{}".format(symbol.lower(), interval), id, callback, **kwargs - ) + self.send_message_to_server(stream_name, action=action, id=id) def continuous_kline( - self, pair: str, id: int, contractType: str, interval: str, callback, **kwargs + self, + pair: str, + contractType: str, + interval: str, + id=None, + action=None, + **kwargs ): """Continuous Kline/Candlestick Streams @@ -127,15 +163,13 @@ def continuous_kline( Update Speed: 250ms """ - - self.live_subscribe( - "{}_{}@continuousKline_{}".format(pair.lower(), contractType, interval), - id, - callback, - **kwargs + stream_name = "{}_{}@continuousKline_{}".format( + pair.lower(), contractType, interval ) - def index_kline(self, pair: str, id: int, interval: str, callback, **kwargs): + self.send_message_to_server(stream_name, action=action, id=id) + + def index_kline(self, pair: str, interval: str, id=None, action=None, **kwargs): """Kline/Candlestick chart intervals Streams Stream Name: @indexPriceKline_ @@ -163,15 +197,11 @@ def index_kline(self, pair: str, id: int, interval: str, callback, **kwargs): Update Speed: 250ms """ + stream_name = "{}@indexPriceKline_{}".format(pair.lower(), interval) - self.live_subscribe( - "{}@indexPriceKline_{}".format(pair.lower(), interval), - id, - callback, - **kwargs - ) + self.send_message_to_server(stream_name, action=action, id=id) - def mark_kline(self, symbol: str, id: int, interval: str, callback, **kwargs): + def mark_kline(self, symbol: str, interval: str, id=None, action=None, **kwargs): """Kline/Candlestick chart intervals Streams Stream Name: @markPriceKline_ @@ -199,15 +229,11 @@ def mark_kline(self, symbol: str, id: int, interval: str, callback, **kwargs): Update Speed: 250ms """ + stream_name = "{}@markPriceKline_{}".format(symbol.lower(), interval) - self.live_subscribe( - "{}@markPriceKline_{}".format(symbol.lower(), interval), - id, - callback, - **kwargs - ) + self.send_message_to_server(stream_name, action=action, id=id) - def mini_ticker(self, id: int, callback, symbol=None, **kwargs): + def mini_ticker(self, symbol=None, id=None, action=None, **kwargs): """Individual symbol or all symbols mini ticker 24hr rolling window mini-ticker statistics. @@ -223,13 +249,13 @@ def mini_ticker(self, id: int, callback, symbol=None, **kwargs): """ if symbol is None: - self.live_subscribe("!miniTicker@arr", id, callback, **kwargs) + stream_name = "!miniTicker@arr" else: - self.live_subscribe( - "{}@miniTicker".format(symbol.lower()), id, callback, **kwargs - ) + stream_name = "{}@miniTicker".format(symbol.lower()) + + self.send_message_to_server(stream_name, action=action, id=id) - def ticker(self, id: int, callback, symbol=None, **kwargs): + def ticker(self, symbol=None, id=None, action=None, **kwargs): """Individual symbol or all symbols ticker 24hr rolling window ticker statistics for a single symbol. @@ -245,13 +271,12 @@ def ticker(self, id: int, callback, symbol=None, **kwargs): """ if symbol is None: - self.live_subscribe("!ticker@arr", id, callback, **kwargs) + stream_name = "!ticker@arr" else: - self.live_subscribe( - "{}@ticker".format(symbol.lower()), id, callback, **kwargs - ) + stream_name = "{}@ticker".format(symbol.lower()) + self.send_message_to_server(stream_name, action=action, id=id) - def book_ticker(self, id: int, callback, symbol=None, **kwargs): + def book_ticker(self, symbol, id=None, action=None, **kwargs): """Individual symbol or all book ticker Pushes any update to the best bid or ask's price or quantity in real-time for a specified symbol. @@ -264,36 +289,29 @@ def book_ticker(self, id: int, callback, symbol=None, **kwargs): Update Speed: Real-time """ - if symbol is None: - self.live_subscribe("!bookTicker", id, callback, **kwargs) + stream_name = "!bookTicker" else: - self.live_subscribe( - "{}@bookTicker".format(symbol.lower()), id, callback, **kwargs - ) + stream_name = "{}@bookTicker".format(symbol.lower()) + self.send_message_to_server(stream_name, action=action, id=id) - def liquidation_order(self, id: int, callback, symbol=None, **kwargs): - """The Liquidation Order Snapshot Streams push force liquidation order information for specific symbol. - The All Liquidation Order Snapshot Streams push force liquidation order information for all symbols in the market. - - For each symbol,only the latest one liquidation order within 1000ms will be pushed as the snapshot. If no liquidation happens in the interval of 1000ms, no stream will be pushed. + def diff_book_depth(self, symbol: str, speed=100, id=None, action=None, **kwargs): + """Diff. Depth Stream + Order book price and quantity depth updates used to locally manage an order book. - Stream Name: @forceOrder or !forceOrder@arr + Stream Name: @depth OR @depth@500ms OR@depth@100ms - https://binance-docs.github.io/apidocs/delivery/en/#liquidation-order-streams - https://binance-docs.github.io/apidocs/delivery/en/#all-market-liquidation-order-streams + https://binance-docs.github.io/apidocs/delivery/en/#diff-book-depth-streams - Update Speed: 1000ms + Update Speed: 250ms, 500ms or 100ms """ - if symbol is None: - self.live_subscribe("!forceOrder@arr", id, callback, **kwargs) - else: - self.live_subscribe( - "{}@forceOrder".format(symbol.lower()), id, callback, **kwargs - ) + + self.send_message_to_server( + "{}@depth@{}ms".format(symbol.lower(), speed), action=action, id=id + ) def partial_book_depth( - self, symbol: str, id: int, level, speed, callback, **kwargs + self, symbol: str, level=5, speed=500, id=None, action=None, **kwargs ): """Partial Book Depth Streams @@ -305,29 +323,29 @@ def partial_book_depth( Update Speed: 250ms, 500ms or 100ms """ - - self.live_subscribe( - "{}@depth{}@{}ms".format(symbol.lower(), level, speed), - id, - callback, - **kwargs + self.send_message_to_server( + "{}@depth{}@{}ms".format(symbol.lower(), level, speed), id=id, action=action ) - def diff_book_depth(self, symbol: str, id: int, speed, callback, **kwargs): - """Diff. Depth Stream - Order book price and quantity depth updates used to locally manage an order book. + def liquidation_order(self, symbol: str, id=None, action=None, **kwargs): + """The Liquidation Order Snapshot Streams push force liquidation order information for specific symbol. + The All Liquidation Order Snapshot Streams push force liquidation order information for all symbols in the market. - Stream Name: @depth OR @depth@500ms OR@depth@100ms + For each symbol,only the latest one liquidation order within 1000ms will be pushed as the snapshot. If no liquidation happens in the interval of 1000ms, no stream will be pushed. - https://binance-docs.github.io/apidocs/delivery/en/#diff-book-depth-streams + Stream Name: @forceOrder or !forceOrder@arr - Update Speed: 250ms, 500ms or 100ms - """ + https://binance-docs.github.io/apidocs/delivery/en/#liquidation-order-streams + https://binance-docs.github.io/apidocs/delivery/en/#all-market-liquidation-order-streams - self.live_subscribe( - "{}@depth@{}ms".format(symbol.lower(), speed), id, callback, **kwargs - ) + Update Speed: 1000ms + """ + if symbol is None: + stream_name = "!forceOrder@arr" + else: + stream_name = "{}@forceOrder".format(symbol.lower()) + self.send_message_to_server(stream_name, id=id, action=action) - def user_data(self, listen_key: str, id: int, callback, **kwargs): - """listen to user data by provided listenkey""" - self.live_subscribe(listen_key, id, callback, **kwargs) + def user_data(self, listen_key: str, id=None, action=None, **kwargs): + """Listen to user data by using the provided listen_key""" + self.send_message_to_server(listen_key, action=action, id=id) diff --git a/binance/websocket/um_futures/websocket_client.py b/binance/websocket/um_futures/websocket_client.py index f7ab79e..6174765 100644 --- a/binance/websocket/um_futures/websocket_client.py +++ b/binance/websocket/um_futures/websocket_client.py @@ -2,10 +2,32 @@ class UMFuturesWebsocketClient(BinanceWebsocketClient): - def __init__(self, stream_url="wss://fstream.binance.com"): - super().__init__(stream_url) + def __init__( + self, + stream_url="wss://fstream.binance.com", + on_message=None, + on_open=None, + on_close=None, + on_error=None, + on_ping=None, + on_pong=None, + is_combined=False, + ): + if is_combined: + stream_url = stream_url + "/stream" + else: + stream_url = stream_url + "/ws" + super().__init__( + stream_url, + on_message=on_message, + on_open=on_open, + on_close=on_close, + on_error=on_error, + on_ping=on_ping, + on_pong=on_pong, + ) - def agg_trade(self, symbol: str, id: int, callback, **kwargs): + def agg_trade(self, symbol: str, id=None, action=None, **kwargs): """Aggregate Trade Streams The Aggregate Trade Streams push market trade information that is aggregated for a single taker order every 100 milliseconds. @@ -17,11 +39,11 @@ def agg_trade(self, symbol: str, id: int, callback, **kwargs): Update Speed: 100ms """ - self.live_subscribe( - "{}@aggTrade".format(symbol.lower()), id, callback, **kwargs - ) + stream_name = "{}@aggTrade".format(symbol.lower()) - def mark_price(self, symbol: str, id: int, speed: int, callback, **kwargs): + self.send_message_to_server(stream_name, action=action, id=id) + + def mark_price(self, symbol: str, speed: int, id=None, action=None, **kwargs): """Mark Price Streams Mark price and funding rate for all symbols pushed every 3 seconds or every second. @@ -32,11 +54,11 @@ def mark_price(self, symbol: str, id: int, speed: int, callback, **kwargs): Update Speed: 3000ms or 1000ms """ - self.live_subscribe( - "{}@markPrice@{}s".format(symbol.lower(), speed), id, callback, **kwargs - ) + stream_name = "{}@markPrice@{}s".format(symbol.lower(), speed) + + self.send_message_to_server(stream_name, action=action, id=id) - def kline(self, symbol: str, id: int, interval: str, callback, **kwargs): + def kline(self, symbol: str, interval: str, id=None, action=None, **kwargs): """Kline/Candlestick Streams The Kline/Candlestick Stream push updates to the current klines/candlestick every 250 milliseconds (if existing) @@ -66,13 +88,18 @@ def kline(self, symbol: str, id: int, interval: str, callback, **kwargs): Update Speed: 250ms """ + stream_name = "{}@kline_{}".format(symbol.lower(), interval) - self.live_subscribe( - "{}@kline_{}".format(symbol.lower(), interval), id, callback, **kwargs - ) + self.send_message_to_server(stream_name, action=action, id=id) def continuous_kline( - self, pair: str, id: int, contractType: str, interval: str, callback, **kwargs + self, + pair: str, + contractType: str, + interval: str, + id=None, + action=None, + **kwargs ): """Continuous Kline/Candlestick Streams @@ -103,15 +130,13 @@ def continuous_kline( Update Speed: 250ms """ - - self.live_subscribe( - "{}_{}@continuousKline_{}".format(pair.lower(), contractType, interval), - id, - callback, - **kwargs + stream_name = "{}_{}@continuousKline_{}".format( + pair.lower(), contractType, interval ) - def mini_ticker(self, id: int, callback, symbol=None, **kwargs): + self.send_message_to_server(stream_name, action=action, id=id) + + def mini_ticker(self, symbol=None, id=None, action=None, **kwargs): """Individual symbol or all symbols mini ticker 24hr rolling window mini-ticker statistics. @@ -127,13 +152,13 @@ def mini_ticker(self, id: int, callback, symbol=None, **kwargs): """ if symbol is None: - self.live_subscribe("!miniTicker@arr", id, callback, **kwargs) + stream_name = "!miniTicker@arr" else: - self.live_subscribe( - "{}@miniTicker".format(symbol.lower()), id, callback, **kwargs - ) + stream_name = "{}@miniTicker".format(symbol.lower()) - def ticker(self, id: int, callback, symbol=None, **kwargs): + self.send_message_to_server(stream_name, action=action, id=id) + + def ticker(self, symbol=None, id=None, action=None, **kwargs): """Individual symbol or all symbols ticker 24hr rolling window ticker statistics for a single symbol. @@ -149,13 +174,12 @@ def ticker(self, id: int, callback, symbol=None, **kwargs): """ if symbol is None: - self.live_subscribe("!ticker@arr", id, callback, **kwargs) + stream_name = "!ticker@arr" else: - self.live_subscribe( - "{}@ticker".format(symbol.lower()), id, callback, **kwargs - ) + stream_name = "{}@ticker".format(symbol.lower()) + self.send_message_to_server(stream_name, action=action, id=id) - def book_ticker(self, id: int, callback, symbol=None, **kwargs): + def book_ticker(self, symbol, id=None, action=None, **kwargs): """Individual symbol or all book ticker Pushes any update to the best bid or ask's price or quantity in real-time for a specified symbol. @@ -168,37 +192,29 @@ def book_ticker(self, id: int, callback, symbol=None, **kwargs): Update Speed: Real-time """ - if symbol is None: - self.live_subscribe("!bookTicker", id, callback, **kwargs) + stream_name = "!bookTicker" else: - self.live_subscribe( - "{}@bookTicker".format(symbol.lower()), id, callback, **kwargs - ) + stream_name = "{}@bookTicker".format(symbol.lower()) + self.send_message_to_server(stream_name, action=action, id=id) - def liquidation_order(self, id: int, callback, symbol=None, **kwargs): - """The Liquidation Order Snapshot Streams push force liquidation order information for specific symbol. - The All Liquidation Order Snapshot Streams push force liquidation order information for all symbols in the market. - - For each symbol,only the latest one liquidation order within 1000ms will be pushed as the snapshot. If no liquidation happens in the interval of 1000ms, no stream will be pushed. + def diff_book_depth(self, symbol: str, speed=100, id=None, action=None, **kwargs): + """Diff. Depth Stream + Order book price and quantity depth updates used to locally manage an order book. - Stream Name: @forceOrder or - Stream Name: !forceOrder@arr + Stream Name: @depth OR @depth@500ms OR@depth@100ms - https://binance-docs.github.io/apidocs/futures/en/#liquidation-order-streams - https://binance-docs.github.io/apidocs/futures/en/#all-market-liquidation-order-streams + https://binance-docs.github.io/apidocs/futures/en/#diff-book-depth-streams - Update Speed: 1000ms + Update Speed: 250ms, 500ms or 100ms """ - if symbol is None: - self.live_subscribe("!forceOrder@arr", id, callback, **kwargs) - else: - self.live_subscribe( - "{}@forceOrder".format(symbol.lower()), id, callback, **kwargs - ) + + self.send_message_to_server( + "{}@depth@{}ms".format(symbol.lower(), speed), action=action, id=id + ) def partial_book_depth( - self, symbol: str, id: int, level, speed, callback, **kwargs + self, symbol: str, level=5, speed=500, id=None, action=None, **kwargs ): """Partial Book Depth Streams @@ -210,30 +226,31 @@ def partial_book_depth( Update Speed: 250ms, 500ms or 100ms """ - - self.live_subscribe( - "{}@depth{}@{}ms".format(symbol.lower(), level, speed), - id, - callback, - **kwargs + self.send_message_to_server( + "{}@depth{}@{}ms".format(symbol.lower(), level, speed), id=id, action=action ) - def diff_book_depth(self, symbol: str, id: int, speed, callback, **kwargs): - """Diff. Depth Stream - Order book price and quantity depth updates used to locally manage an order book. + def liquidation_order(self, symbol: str, id=None, action=None, **kwargs): + """The Liquidation Order Snapshot Streams push force liquidation order information for specific symbol. + The All Liquidation Order Snapshot Streams push force liquidation order information for all symbols in the market. - Stream Name: @depth OR @depth@500ms OR@depth@100ms + For each symbol,only the latest one liquidation order within 1000ms will be pushed as the snapshot. If no liquidation happens in the interval of 1000ms, no stream will be pushed. - https://binance-docs.github.io/apidocs/futures/en/#diff-book-depth-streams + Stream Name: @forceOrder or + Stream Name: !forceOrder@arr - Update Speed: 250ms, 500ms or 100ms - """ + https://binance-docs.github.io/apidocs/futures/en/#liquidation-order-streams + https://binance-docs.github.io/apidocs/futures/en/#all-market-liquidation-order-streams - self.live_subscribe( - "{}@depth@{}ms".format(symbol.lower(), speed), id, callback, **kwargs - ) + Update Speed: 1000ms + """ + if symbol is None: + stream_name = "!forceOrder@arr" + else: + stream_name = "{}@forceOrder".format(symbol.lower()) + self.send_message_to_server(stream_name, id=id, action=action) - def composite_index(self, symbol: str, id: int, callback, **kwargs): + def composite_index(self, symbol: str, id=None, action=None, **kwargs): """Composite Index Info Stream Composite index information for index symbols pushed every second. @@ -244,10 +261,10 @@ def composite_index(self, symbol: str, id: int, callback, **kwargs): Update Speed: 1000ms """ - self.live_subscribe( - "{}@compositeIndex".format(symbol.lower()), id, callback, **kwargs + self.send_message_to_server( + "{}@compositeIndex".format(symbol.lower()), id=id, action=action ) - def user_data(self, listen_key: str, id: int, callback, **kwargs): - """listen to user data by provided listenkey""" - self.live_subscribe(listen_key, id, callback, **kwargs) + def user_data(self, listen_key: str, id=None, action=None, **kwargs): + """Listen to user data by using the provided listen_key""" + self.send_message_to_server(listen_key, action=action, id=id) diff --git a/binance/websocket/websocket_client.py b/binance/websocket/websocket_client.py index e153e9a..187090f 100644 --- a/binance/websocket/websocket_client.py +++ b/binance/websocket/websocket_client.py @@ -1,17 +1,63 @@ import json -from twisted.internet import reactor +import logging +from binance.lib.utils import get_timestamp from binance.websocket.binance_socket_manager import BinanceSocketManager -class BinanceWebsocketClient(BinanceSocketManager): - def __init__(self, stream_url): - super().__init__(stream_url) +class BinanceWebsocketClient: + ACTION_SUBSCRIBE = "SUBSCRIBE" + ACTION_UNSUBSCRIBE = "UNSUBSCRIBE" - def stop(self): - try: - self.close() - finally: - reactor.stop() + def __init__( + self, + stream_url, + on_message=None, + on_open=None, + on_close=None, + on_error=None, + on_ping=None, + on_pong=None, + logger=None, + ): + if not logger: + logger = logging.getLogger(__name__) + self.logger = logger + self.socket_manager = self._initialize_socket( + stream_url, + on_message, + on_open, + on_close, + on_error, + on_ping, + on_pong, + logger, + ) + + # start the thread + self.socket_manager.start() + self.logger.debug("Binance WebSocket Client started.") + + def _initialize_socket( + self, + stream_url, + on_message, + on_open, + on_close, + on_error, + on_ping, + on_pong, + logger, + ): + return BinanceSocketManager( + stream_url, + on_message=on_message, + on_open=on_open, + on_close=on_close, + on_error=on_error, + on_ping=on_ping, + on_pong=on_pong, + logger=logger, + ) def _single_stream(self, stream): if isinstance(stream, str): @@ -21,50 +67,50 @@ def _single_stream(self, stream): else: raise ValueError("Invalid stream name, expect string or array") - def live_subscribe(self, stream, id, callback, **kwargs): - """live subscribe websocket - Connect to the server - - UM Futures: wss://fstream.binance.com/ws - - UM Futures testnet: wss://stream.binancefuture.com/ws - - CM Futures: wss://dstream.binance.com/ws - - CM Futures testnet: wss://dstream.binancefuture.com/ws + def send(self, message: dict): + self.socket_manager.send_message(json.dumps(message)) - and sending the subscribe message, e.g. + def send_message_to_server(self, message, action=None, id=None): + if not id: + id = get_timestamp() - {"method": "SUBSCRIBE","params":["btcusdt@miniTicker"],"id": 100} + if action != self.ACTION_UNSUBSCRIBE: + return self.subscribe(message, id=id) + return self.unsubscribe(message, id=id) - """ - combined = False + def subscribe(self, stream, id=None): + if not id: + id = get_timestamp() if self._single_stream(stream): stream = [stream] - else: - combined = True + json_msg = json.dumps({"method": "SUBSCRIBE", "params": stream, "id": id}) + self.socket_manager.send_message(json_msg) - data = {"method": "SUBSCRIBE", "params": stream, "id": id} + def unsubscribe(self, stream, id=None): + if not id: + id = get_timestamp() + if self._single_stream(stream): + stream = [stream] + json_msg = json.dumps({"method": "UNSUBSCRIBE", "params": stream, "id": id}) + self.socket_manager.send_message(json_msg) - data.update(**kwargs) - payload = json.dumps(data, ensure_ascii=False).encode("utf8") - stream_name = "-".join(stream) - return self._start_socket( - stream_name, payload, callback, is_combined=combined, is_live=True - ) + def ping(self): + self.logger.debug("Sending ping to Binance WebSocket Server") + self.socket_manager.ping() - def instant_subscribe(self, stream, callback, **kwargs): - """Instant subscribe, e.g. - wss://fstream.binance.com/ws/btcusdt@bookTicker - wss://fstream.binance.com/stream?streams=btcusdt@bookTicker/bnbusdt@bookTicker + def stop(self, id=None): + self.socket_manager.close() + self.socket_manager.join() + + def list_subscribe(self, id=None): + """sending the list subscription message, e.g. + + {"method": "LIST_SUBSCRIPTIONS","id": 100} """ - combined = False - if not self._single_stream(stream): - combined = True - stream = "/".join(stream) - - data = {"method": "SUBSCRIBE", "params": stream} - - data.update(**kwargs) - payload = json.dumps(data, ensure_ascii=False).encode("utf8") - stream_name = "-".join(stream) - return self._start_socket( - stream_name, payload, callback, is_combined=combined, is_live=False + + if not id: + id = get_timestamp() + self.socket_manager.send_message( + json.dumps({"method": "LIST_SUBSCRIPTIONS", "id": id}) ) diff --git a/examples/websocket/cm_futures/agg_trade.py b/examples/websocket/cm_futures/agg_trade.py index 3f797bd..c779ac9 100644 --- a/examples/websocket/cm_futures/agg_trade.py +++ b/examples/websocket/cm_futures/agg_trade.py @@ -7,12 +7,11 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.agg_trade( symbol="btcusd_perp", diff --git a/examples/websocket/cm_futures/book_ticker.py b/examples/websocket/cm_futures/book_ticker.py index 09cf4c9..de76bd8 100644 --- a/examples/websocket/cm_futures/book_ticker.py +++ b/examples/websocket/cm_futures/book_ticker.py @@ -7,12 +7,11 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.book_ticker( id=13, diff --git a/examples/websocket/cm_futures/combined_streams.py b/examples/websocket/cm_futures/combined_streams.py new file mode 100644 index 0000000..8808c4e --- /dev/null +++ b/examples/websocket/cm_futures/combined_streams.py @@ -0,0 +1,22 @@ +import logging +import time + +from binance.lib.utils import config_logging +from binance.websocket.cm_futures.websocket_client import CMFuturesWebsocketClient + +config_logging(logging, logging.DEBUG) + + +def message_handler(_, message): + logging.info(message) + + +my_client = CMFuturesWebsocketClient(on_message=message_handler, is_combined=True) + + +my_client.subscribe( + stream=["btcusd_perp@ticker", "btcusd_perp@markPrice@1s"], +) + +time.sleep(10) +my_client.stop() diff --git a/examples/websocket/cm_futures/continuous_klines.py b/examples/websocket/cm_futures/continuous_klines.py index fd30037..e0f24fe 100644 --- a/examples/websocket/cm_futures/continuous_klines.py +++ b/examples/websocket/cm_futures/continuous_klines.py @@ -8,19 +8,17 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.continuous_kline( pair="btcusd", id=1, - contractType="PERPETUAL", + contractType="perpetual", interval="1m", - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/cm_futures/diff_book_depth.py b/examples/websocket/cm_futures/diff_book_depth.py index a64f3d0..283342a 100644 --- a/examples/websocket/cm_futures/diff_book_depth.py +++ b/examples/websocket/cm_futures/diff_book_depth.py @@ -8,18 +8,16 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.diff_book_depth( symbol="bnbusd_perp", speed=100, id=1, - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/cm_futures/index_kline.py b/examples/websocket/cm_futures/index_kline.py index b8b2ad7..5ea10c1 100644 --- a/examples/websocket/cm_futures/index_kline.py +++ b/examples/websocket/cm_futures/index_kline.py @@ -8,12 +8,11 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.index_kline( pair="btcusd", diff --git a/examples/websocket/cm_futures/index_price.py b/examples/websocket/cm_futures/index_price.py index 135939a..c34610f 100644 --- a/examples/websocket/cm_futures/index_price.py +++ b/examples/websocket/cm_futures/index_price.py @@ -8,18 +8,16 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.index_price( pair="btcusd", speed=1, id=1, - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/cm_futures/kline.py b/examples/websocket/cm_futures/kline.py index c9f29ad..c172ce2 100644 --- a/examples/websocket/cm_futures/kline.py +++ b/examples/websocket/cm_futures/kline.py @@ -8,18 +8,16 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.kline( symbol="btcusd_perp", id=12, interval="1m", - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/cm_futures/liquidation_order.py b/examples/websocket/cm_futures/liquidation_order.py index 9766f2e..fb43837 100644 --- a/examples/websocket/cm_futures/liquidation_order.py +++ b/examples/websocket/cm_futures/liquidation_order.py @@ -8,17 +8,15 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.liquidation_order( symbol="btcusd_perp", id=13, - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/cm_futures/mark_kline.py b/examples/websocket/cm_futures/mark_kline.py index cb81184..62df120 100644 --- a/examples/websocket/cm_futures/mark_kline.py +++ b/examples/websocket/cm_futures/mark_kline.py @@ -8,18 +8,16 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.mark_kline( symbol="btcusd_perp", id=1, interval="1m", - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/cm_futures/mark_price.py b/examples/websocket/cm_futures/mark_price.py index dfdb90c..6c79b12 100644 --- a/examples/websocket/cm_futures/mark_price.py +++ b/examples/websocket/cm_futures/mark_price.py @@ -8,18 +8,16 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.mark_price( symbol="btcusd_perp", id=1, speed=1, - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/cm_futures/mini_ticker.py b/examples/websocket/cm_futures/mini_ticker.py index adf8374..259a179 100644 --- a/examples/websocket/cm_futures/mini_ticker.py +++ b/examples/websocket/cm_futures/mini_ticker.py @@ -8,14 +8,13 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) -my_client.mini_ticker(id=1, callback=message_handler, symbol="btcusd_perp") +my_client.mini_ticker(id=1, symbol="btcusd_perp") time.sleep(10) diff --git a/examples/websocket/cm_futures/pair_mark_price.py b/examples/websocket/cm_futures/pair_mark_price.py index afeb01c..1f097b6 100644 --- a/examples/websocket/cm_futures/pair_mark_price.py +++ b/examples/websocket/cm_futures/pair_mark_price.py @@ -8,18 +8,16 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.pair_mark_price( pair="btcusd", id=1, speed=1, - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/cm_futures/partial_book_depth.py b/examples/websocket/cm_futures/partial_book_depth.py index 7d5b0b0..69b8ae7 100644 --- a/examples/websocket/cm_futures/partial_book_depth.py +++ b/examples/websocket/cm_futures/partial_book_depth.py @@ -8,19 +8,17 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.partial_book_depth( symbol="bnbusd_perp", id=1, level=10, speed=100, - callback=message_handler, ) time.sleep(2) diff --git a/examples/websocket/cm_futures/ticker.py b/examples/websocket/cm_futures/ticker.py index 80d8c20..439c13f 100644 --- a/examples/websocket/cm_futures/ticker.py +++ b/examples/websocket/cm_futures/ticker.py @@ -8,16 +8,14 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = CMFuturesWebsocketClient() -my_client.start() +my_client = CMFuturesWebsocketClient(on_message=message_handler) my_client.ticker( id=13, - callback=message_handler, symbol="btcusd_perp", ) diff --git a/examples/websocket/cm_futures/user_data.py b/examples/websocket/cm_futures/user_data.py index 4aebf12..de76cd6 100644 --- a/examples/websocket/cm_futures/user_data.py +++ b/examples/websocket/cm_futures/user_data.py @@ -9,7 +9,7 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) @@ -19,13 +19,11 @@ def message_handler(message): logging.info("Listen key : {}".format(response["listenKey"])) -ws_client = CMFuturesWebsocketClient() -ws_client.start() +ws_client = CMFuturesWebsocketClient(on_message=message_handler) ws_client.user_data( listen_key=response["listenKey"], id=1, - callback=message_handler, ) time.sleep(30) diff --git a/examples/websocket/um_futures/agg_trade.py b/examples/websocket/um_futures/agg_trade.py index 70d518f..4e186a6 100644 --- a/examples/websocket/um_futures/agg_trade.py +++ b/examples/websocket/um_futures/agg_trade.py @@ -1,33 +1,30 @@ #!/usr/bin/env python -import time import logging +import time from binance.lib.utils import config_logging from binance.websocket.um_futures.websocket_client import UMFuturesWebsocketClient config_logging(logging, logging.DEBUG) -def message_handler(message): - print(message) +def message_handler(_, message): + logging.info(message) -my_client = UMFuturesWebsocketClient() -my_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler, is_combined=True) -my_client.agg_trade( - symbol="btcusdt", - id=1, - callback=message_handler, -) +# Subscribe to a single symbol stream +my_client.agg_trade(symbol="bnbusdt") -time.sleep(10) +time.sleep(5) +# Unsubscribe my_client.agg_trade( - symbol="ethusdt", - id=1, - callback=message_handler, + symbol="bnbusdt", action=UMFuturesWebsocketClient.ACTION_UNSUBSCRIBE ) -logging.debug("closing ws connection") +time.sleep(5) + +logging.info("closing ws connection") my_client.stop() diff --git a/examples/websocket/um_futures/book_ticker.py b/examples/websocket/um_futures/book_ticker.py index e454424..838dc2a 100644 --- a/examples/websocket/um_futures/book_ticker.py +++ b/examples/websocket/um_futures/book_ticker.py @@ -8,20 +8,16 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = UMFuturesWebsocketClient() -my_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler) -my_client.book_ticker( - id=13, - callback=message_handler, - symbol="btcusdt", -) -time.sleep(2) +my_client.book_ticker(symbol="btcusdt") + +time.sleep(10) logging.debug("closing ws connection") my_client.stop() diff --git a/examples/websocket/um_futures/combined_streams.py b/examples/websocket/um_futures/combined_streams.py new file mode 100644 index 0000000..fc2e881 --- /dev/null +++ b/examples/websocket/um_futures/combined_streams.py @@ -0,0 +1,22 @@ +import logging +import time + +from binance.lib.utils import config_logging +from binance.websocket.um_futures.websocket_client import UMFuturesWebsocketClient + +config_logging(logging, logging.DEBUG) + + +def message_handler(_, message): + logging.info(message) + + +my_client = UMFuturesWebsocketClient(on_message=message_handler, is_combined=True) + + +my_client.subscribe( + stream=["bnbusdt@bookTicker", "ethusdt@bookTicker"], +) + +time.sleep(10) +my_client.stop() diff --git a/examples/websocket/um_futures/composite_index.py b/examples/websocket/um_futures/composite_index.py index 6b0e142..7e96503 100644 --- a/examples/websocket/um_futures/composite_index.py +++ b/examples/websocket/um_futures/composite_index.py @@ -8,17 +8,15 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = UMFuturesWebsocketClient() -my_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler) my_client.composite_index( symbol="DEFIUSDT", id=13, - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/um_futures/continuous_klines.py b/examples/websocket/um_futures/continuous_klines.py index 688abbf..87a51bb 100644 --- a/examples/websocket/um_futures/continuous_klines.py +++ b/examples/websocket/um_futures/continuous_klines.py @@ -8,19 +8,17 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = UMFuturesWebsocketClient() -my_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler) my_client.continuous_kline( pair="btcusdt", id=1, contractType="perpetual", interval="1d", - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/um_futures/diff_book_depth.py b/examples/websocket/um_futures/diff_book_depth.py index 400dded..85e54a7 100644 --- a/examples/websocket/um_futures/diff_book_depth.py +++ b/examples/websocket/um_futures/diff_book_depth.py @@ -8,18 +8,16 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = UMFuturesWebsocketClient() -my_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler) my_client.diff_book_depth( symbol="bnbusdt", speed=100, id=1, - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/um_futures/kline.py b/examples/websocket/um_futures/kline.py index f2a3945..7f2ab6d 100644 --- a/examples/websocket/um_futures/kline.py +++ b/examples/websocket/um_futures/kline.py @@ -8,18 +8,16 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = UMFuturesWebsocketClient() -my_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler) my_client.kline( symbol="btcusdt", id=12, interval="1d", - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/um_futures/liquidation_order.py b/examples/websocket/um_futures/liquidation_order.py index 17f9776..67e63d9 100644 --- a/examples/websocket/um_futures/liquidation_order.py +++ b/examples/websocket/um_futures/liquidation_order.py @@ -8,16 +8,14 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = UMFuturesWebsocketClient() -my_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler) my_client.liquidation_order( id=13, - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/um_futures/mark_price.py b/examples/websocket/um_futures/mark_price.py index 13620f1..7344c7b 100644 --- a/examples/websocket/um_futures/mark_price.py +++ b/examples/websocket/um_futures/mark_price.py @@ -8,18 +8,16 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = UMFuturesWebsocketClient() -my_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler) my_client.mark_price( symbol="btcusdt", id=13, speed=1, - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/um_futures/mini_ticker.py b/examples/websocket/um_futures/mini_ticker.py index f8949f5..d4b3283 100644 --- a/examples/websocket/um_futures/mini_ticker.py +++ b/examples/websocket/um_futures/mini_ticker.py @@ -8,14 +8,13 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = UMFuturesWebsocketClient() -my_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler) -my_client.mini_ticker(id=1, callback=message_handler, symbol="btcusdt") +my_client.mini_ticker(id=1, symbol="btcusdt") time.sleep(10) diff --git a/examples/websocket/um_futures/partial_book_depth.py b/examples/websocket/um_futures/partial_book_depth.py index 4177b2c..d116851 100644 --- a/examples/websocket/um_futures/partial_book_depth.py +++ b/examples/websocket/um_futures/partial_book_depth.py @@ -12,15 +12,13 @@ def message_handler(message): print(message) -my_client = UMFuturesWebsocketClient() -my_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler) my_client.partial_book_depth( symbol="bnbusdt", id=1, level=10, speed=100, - callback=message_handler, ) time.sleep(10) diff --git a/examples/websocket/um_futures/symbol_kline.py b/examples/websocket/um_futures/symbol_kline.py index 3cababe..2e00655 100644 --- a/examples/websocket/um_futures/symbol_kline.py +++ b/examples/websocket/um_futures/symbol_kline.py @@ -8,14 +8,13 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = UMFuturesWebsocketClient() -my_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler) -my_client.kline(symbol="btcusdt", id=1, interval="1m", callback=message_handler) +my_client.kline(symbol="btcusdt", id=1, interval="1m") time.sleep(5) diff --git a/examples/websocket/um_futures/ticker.py b/examples/websocket/um_futures/ticker.py index 19dca97..47afb70 100644 --- a/examples/websocket/um_futures/ticker.py +++ b/examples/websocket/um_futures/ticker.py @@ -8,16 +8,14 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) -my_client = UMFuturesWebsocketClient() -my_client.start() +my_client = UMFuturesWebsocketClient(on_message=message_handler) my_client.ticker( id=13, - callback=message_handler, symbol="btcusdt", ) diff --git a/examples/websocket/um_futures/user_data.py b/examples/websocket/um_futures/user_data.py index 19d62b2..9f348f1 100644 --- a/examples/websocket/um_futures/user_data.py +++ b/examples/websocket/um_futures/user_data.py @@ -9,7 +9,7 @@ config_logging(logging, logging.DEBUG) -def message_handler(message): +def message_handler(_, message): print(message) @@ -19,13 +19,11 @@ def message_handler(message): logging.info("Listen key : {}".format(response["listenKey"])) -ws_client = UMFuturesWebsocketClient() -ws_client.start() +ws_client = UMFuturesWebsocketClient(on_message=message_handler) ws_client.user_data( listen_key=response["listenKey"], id=1, - callback=message_handler, ) time.sleep(30) diff --git a/requirements/common.txt b/requirements/common.txt index 57396c8..06cdc6b 100644 --- a/requirements/common.txt +++ b/requirements/common.txt @@ -1,6 +1,3 @@ -autobahn>=21.2.1 -Twisted>=22.2.0 requests>=2.25.1 -pyOpenSSL>=19.0.0 -service-identity>=21.1.0 +websocket-client>=1.5.0 pycryptodome>=3.15.0 \ No newline at end of file