After 3 days websocket disconnected (Failed to read, error: IO error: Connection reset by peer (os error 104)) and did not automatically reconnect after #59

Open
panicfarm opened this issue Jan 13, 2023 · 16 comments

Comments

@panicfarm

panicfarm commented Jan 13, 2023

I left the basic README websocket example running for a few days, and then ECONNRESET 104 (Connection reset by peer) occurred and the websocket disconnected. After that, pings/pongs were failing, but there seems to be no auto-reconnection mechanism. I reproduced this several times.
I think it should reconnect on [ERROR crypto_ws_client::common::connect_async] Failed to read, error: IO error: Connection reset by peer (os error 104), and also on failed ping sends.
The log around the moment of the disconnect (2023-01-13T12:44:15Z) follows:

[2023-01-13T12:44:09Z INFO  qk] {"exchange":"kucoin","market_type":"spot","msg_type":"trade","received_at":1673613849587,"json":"{\"type\":\"message\",\"topic\":\"/market/match:SOL-USDT\",\"subject\":\"trade.l3match\",\"data\":{\"makerOrderId\":\"63c152190f2fb20001bd63ba\",\"price\":\"16.569\",\"sequence\":\"1184114405824513\",\"side\":\"buy\",\"size\":\"5.4773\",\"symbol\":\"SOL-USDT\",\"takerOrderId\":\"63c1521917f8760001e12e2b\",\"time\":\"1673613849515000000\",\"tradeId\":\"1184114405824513\",\"type\":\"match\"}}"}
[2023-01-13T12:44:10Z INFO  qk] {"exchange":"kucoin","market_type":"spot","msg_type":"trade","received_at":1673613850830,"json":"{\"type\":\"message\",\"topic\":\"/market/match:SOL-USDT\",\"subject\":\"trade.l3match\",\"data\":{\"makerOrderId\":\"63c15219d686c500014277ae\",\"price\":\"16.57\",\"sequence\":\"1184114486564865\",\"side\":\"buy\",\"size\":\"6.5612\",\"symbol\":\"SOL-USDT\",\"takerOrderId\":\"63c1521af109f00001e76310\",\"time\":\"1673613850760000000\",\"tradeId\":\"1184114486564865\",\"type\":\"match\"}}"}
[2023-01-13T12:44:12Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631244, tv_nsec: 862016328 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:44:12Z DEBUG crypto_ws_client::common::ws_client_internal] Received {"id":"crypto-ws-client","type":"pong"} from kucoin, reset num_unanswered_ping to 0
[2023-01-13T12:44:15Z INFO  qk] {"exchange":"kucoin","market_type":"spot","msg_type":"trade","received_at":1673613855208,"json":"{\"type\":\"message\",\"topic\":\"/market/match:SOL-USDT\",\"subject\":\"trade.l3match\",\"data\":{\"makerOrderId\":\"63c1521efe6ad80001d821ae\",\"price\":\"16.569\",\"sequence\":\"1184114833381377\",\"side\":\"buy\",\"size\":\"2.1725\",\"symbol\":\"SOL-USDT\",\"takerOrderId\":\"63c1521fbdf44300018a2cdc\",\"time\":\"1673613855040000000\",\"tradeId\":\"1184114833381377\",\"type\":\"match\"}}"}
[2023-01-13T12:44:15Z INFO  qk] {"exchange":"kucoin","market_type":"spot","msg_type":"trade","received_at":1673613855209,"json":"{\"type\":\"message\",\"topic\":\"/market/match:SOL-USDT\",\"subject\":\"trade.l3match\",\"data\":{\"makerOrderId\":\"63c1521efe6ad80001d821ae\",\"price\":\"16.569\",\"sequence\":\"1184114833643521\",\"side\":\"buy\",\"size\":\"0.6579\",\"symbol\":\"SOL-USDT\",\"takerOrderId\":\"63c1521f6a048f000144ef4c\",\"time\":\"1673613855040000000\",\"tradeId\":\"1184114833643521\",\"type\":\"match\"}}"}
[2023-01-13T12:44:15Z ERROR crypto_ws_client::common::connect_async] Failed to read, error: IO error: Connection reset by peer (os error 104)
[2023-01-13T12:44:15Z DEBUG tokio_tungstenite] websocket start_send error: IO error: Broken pipe (os error 32)
[2023-01-13T12:44:15Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631247, tv_nsec: 621612513 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:44:15Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
[2023-01-13T12:44:43Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631275, tv_nsec: 862016328 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:44:43Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
[2023-01-13T12:44:46Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631278, tv_nsec: 621612513 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:44:46Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
[2023-01-13T12:45:14Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631306, tv_nsec: 862016328 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:45:14Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
[2023-01-13T12:45:17Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631309, tv_nsec: 621612513 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:45:17Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
[2023-01-13T12:45:45Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631337, tv_nsec: 862016328 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:45:45Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
[2023-01-13T12:45:48Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631340, tv_nsec: 621612513 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:45:48Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
@panicfarm panicfarm changed the title After 3 days websocket disconnected (Failed to read, error: IO error: Connection reset by peer (os error 104)) and does not automatically reconnect after After 3 days websocket disconnected (Failed to read, error: IO error: Connection reset by peer (os error 104)) and did not automatically reconnect after Jan 13, 2023
@soulmachine
Collaborator

soulmachine commented Jan 14, 2023

Indeed, this library can auto ping, but it lacks auto-reconnect. When the connection is broken, for now I suggest exiting the whole process and starting it again, for example, https://github.com/crypto-crawler/crypto-crawler-rs/blob/main/crypto-ws-client/src/common/ws_client_internal.rs#L197.

I'll think about how to support reconnecting.

@panicfarm
Author

> Indeed, this library can auto ping, but it lacks auto-reconnect. When the connection is broken, for now I suggest exiting the whole process and starting it again, for example, https://github.com/crypto-crawler/crypto-crawler-rs/blob/main/crypto-ws-client/src/common/ws_client_internal.rs#L197.
>
> I'll think about how to support reconnecting.

I think it's imperative for any solid crypto data recording or trading code to be continuously and reliably runnable. Therefore a fast autoreconnect solution (with the smallest possible gap in the recorded data) is necessary.

@soulmachine
Collaborator

Agreed, in theory it is possible to support auto-reconnect as long as we remember all subscribed symbols.
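
A minimal sketch of the "remember all subscribed symbols" idea, assuming subscriptions are kept as the raw command strings that were sent. `SubscriptionRegistry` and its methods are hypothetical names used only for illustration; they are not part of crypto-ws-client.

```rust
use std::collections::BTreeSet;

/// Hypothetical registry of subscribe commands that were already sent.
/// (Illustrative only; not an existing type in crypto-ws-client.)
#[derive(Default)]
pub struct SubscriptionRegistry {
    commands: BTreeSet<String>,
}

impl SubscriptionRegistry {
    /// Record a raw subscribe command after it has been sent.
    /// Duplicates are stored once.
    pub fn remember(&mut self, command: &str) {
        self.commands.insert(command.to_string());
    }

    /// Drop a command, for example after an unsubscribe.
    pub fn forget(&mut self, command: &str) {
        self.commands.remove(command);
    }

    /// Commands to send again on a fresh connection, in sorted order.
    pub fn replay(&self) -> Vec<String> {
        self.commands.iter().cloned().collect()
    }
}
```

After a reconnect, the client would iterate over `replay()` and send each command on the new connection before resuming normal reads.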

@panicfarm
Author

panicfarm commented Jan 18, 2023

If you could outline how the reconnect should occur, as well as where you think the best place is to store the subscribed symbols, I could probably submit a PR in a bit.

The original read error occurred at

error!("Failed to read, error: {}", err);

Then there was a subsequent, independent error (because the socket was already disconnected) in tokio-tungstenite at https://github.com/snapview/tokio-tungstenite/blob/87d2f7eb09a538c0a0ee77bd92e032e362118f72/src/lib.rs#L337 (an implementation of the futures_util::sink::Sink trait). The docs of start_send() say "In most cases, if the sink encounters an error, the sink will permanently be unable to receive items.", and thus we got repeated errors on every subsequent ping send. This tells me that after logging the read error it broke out of the loop and thus exited connect_async(), at

connect_async_internal(ws_stream, uplink_limit).await

without an error, because it seems the error was never caught there.

Perhaps we could catch the error on exit from connect_async_internal() (it appears that it actually returns an Ok() variant after breaking out of the receive loop), or at

let (ws_stream, _) = tokio_tungstenite::connect_async(url).await?;

where it's assigned to _, and initiate a reconnect?

PS. You can simulate KuCoin closing the websocket connection (sending RST, i.e. ECONNRESET 104) by running tcpkill host ws-api-spot.kucoin.com and port 443 while the process is running.
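
To illustrate the point about the receive loop returning Ok() after a read error, here is a simplified, synchronous sketch. It assumes the incoming frames can be modelled as an iterator of `io::Result<String>`; `receive_loop` is a hypothetical stand-in, not the crate's actual connect_async_internal(), which is async and works on a websocket stream.

```rust
use std::io;

/// Hypothetical, simplified receive loop (not the crate's real function).
/// Each item is either a text frame or a read error.
/// A read error is propagated to the caller instead of being logged and
/// followed by a plain `break`, so a reset can be told apart from a clean end.
pub fn receive_loop<I, F>(frames: I, mut on_message: F) -> io::Result<()>
where
    I: IntoIterator<Item = io::Result<String>>,
    F: FnMut(String),
{
    for frame in frames {
        // `?` returns the error (e.g. ConnectionReset) to the caller.
        on_message(frame?);
    }
    // Reached only when the stream ended without an error.
    Ok(())
}
```

With this shape, the caller can match on the returned `Err` and decide to reconnect, rather than seeing `Ok(())` for both a clean shutdown and a reset.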

@foonsun

foonsun commented Jan 27, 2023

> > Indeed, this library can auto ping, but it lacks auto-reconnect. When the connection is broken, for now I suggest exiting the whole process and starting it again, for example, https://github.com/crypto-crawler/crypto-crawler-rs/blob/main/crypto-ws-client/src/common/ws_client_internal.rs#L197.
> > I'll think about how to support reconnecting.
>
> I think it's imperative for any solid crypto data recording or trading code to be continuously and reliably runnable. Therefore a fast autoreconnect solution (with the smallest possible gap in the recorded data) is necessary.

Yes, an auto-reconnect solution is better than a pm2 restart.

@panicfarm
Author

@soulmachine should I work on a PR or will you add this functionality?

@soulmachine
Collaborator

@panicfarm I would appreciate it if you could create a PR. I think you need to modify lines 197 and 221 in ws_client_internal.rs.

Currently those two lines just crash for simplicity; you'll need to modify them to support auto-reconnect.

@panicfarm
Author

panicfarm commented Feb 2, 2023

@soulmachine When it disconnects due to ECONNRESET 104 (presumably when an exchange restarts its server), the error occurs on line 104 in connect_async.rs, and connect_async returns without an error and without a message to be handled in the while loop. Therefore it never reaches either line 197 or line 221 in ws_client_internal.rs. I think connect_async should return an Error, and the reconnect should be handled on matching that Error variant. Do you agree?
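
A rough sketch of "return an Error and reconnect on matching that variant", written synchronously to keep it self-contained. `SessionError`, `run_with_reconnect`, the retry limit, and the backoff policy are all assumptions made for illustration; the crate's real error type and control flow may differ.

```rust
use std::time::Duration;

/// Hypothetical error type (not the crate's real one).
#[derive(Debug, PartialEq)]
pub enum SessionError {
    /// The peer dropped the connection (e.g. ECONNRESET); worth retrying.
    Disconnected,
    /// A failure that reconnecting will not fix.
    Fatal(String),
}

/// Run `session` (connect, subscribe, read until the connection ends) and
/// start it again whenever it fails with `Disconnected`.
/// Returns the number of reconnects performed once a session ends cleanly.
pub fn run_with_reconnect<F>(
    mut session: F,
    max_retries: u32,
    base_delay: Duration,
) -> Result<u32, SessionError>
where
    F: FnMut() -> Result<(), SessionError>,
{
    let mut retries = 0;
    loop {
        match session() {
            Ok(()) => return Ok(retries),
            Err(SessionError::Disconnected) if retries < max_retries => {
                // Exponential backoff; the exponent is capped at 6 (64x base delay).
                std::thread::sleep(base_delay * 2u32.pow(retries.min(6)));
                retries += 1;
            }
            // Fatal errors, or Disconnected after the retry budget is spent.
            Err(err) => return Err(err),
        }
    }
}
```

In an async version the sleep would be an async timer and `session` an async closure, but the matching on the error variant would look the same.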

@somefact

somefact commented Feb 3, 2023

But without modifying lines 197 and 221 in ws_client_internal.rs, it will not reconnect on a websocket Close (a very common case, especially on Binance perp) or on some kind of Reconnect message.

@panicfarm
Author

panicfarm commented Feb 3, 2023

@somefact Yes, there should be the same reconnection logic in all these places, although I have only encountered three disconnects on line 104 in connect_async.rs after running it for about 10 days in total with KuCoin.

@soulmachine
Collaborator

@panicfarm You've made a very good point. I think you can start implementing the re-connection logic inside connect_async_internal().

Feel free to send a PR.

@panicfarm
Author

> Feel free to send a PR.

@soulmachine
I have looked at the code in detail and would appreciate any feedback before I make changes.

When the exchange reboots its server, the disconnect happens here. However, by this time you have already returned from connect_async_internal because you drop the handle to tokio::task::spawn and therefore the program returns from ws_client.send(&commands).await and you are now in

.

  1. I could hold on to the send task handle
let send_task_handle = tokio::task::spawn(async move {
    loop {
        tokio::select! {
          command = command_rx.recv() => {
            match command {
              Some(command) => {
                match command {
...

and pass it into run(). Then, when the error occurs, run() would return an error, and the entire send(); run(); block in each exchange would (unfortunately) have to be enclosed in a reconnect loop, something like

    loop {
        // tx is cloned because every iteration builds a new client
        let ws_client = KuCoinSpotWSClient::new(tx.clone(), None).await;
        match ws_client.send(&commands).await {
            Err(_) => continue, // sending failed: reconnect
            Ok(send_task_handle) => match ws_client.run(send_task_handle).await {
                Err(_) => continue, // connection lost: reconnect
                Ok(()) => {
                    ws_client.close().await;
                    break;
                }
            },
        }
    }
  2. Alternatively, I could somehow cram the reconnect logic into
    async fn connect_async_internal<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
    but I am not sure that's a good idea, because by this time you are already in the run() function, while the task that was spawned in send() is also running separately.

@soulmachine
Collaborator

soulmachine commented Feb 17, 2023

@panicfarm The first way looks ugly because it makes run() return something weird, which looks scary to users; run() should always return void.

The second way looks better, handling the re-connect in connect_async_internal() so that it is invisible to users.

@panicfarm
Author

panicfarm commented Feb 17, 2023

@soulmachine I don't think I can completely confine it inside connect_async_internal(ws_stream: WebSocketStream, ...), because after that disconnect the ws_stream that was passed into the function is no longer connected. At a minimum I have to do it in connect_async().

But what about the other cases of disconnect, mentioned in this issue? They happen inside of run(). Would be nice to unify them all.

@soulmachine
Collaborator

@panicfarm Yes, let's encapsulate the re-connection logic inside connect_async() instead of connect_async_internal(). For line 197, we can check msg inside connect_async_internal(); if msg is a Message::Close, then we should re-connect. As for line 221, let's ignore it for now.
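
The Message::Close check could look roughly like the sketch below. `WsMessage` is a local stand-in enum defined here so the example is self-contained; it is not tungstenite's `Message` type, and `Action` and `classify` are hypothetical names.

```rust
/// Local stand-in for a websocket message (not tungstenite's `Message`).
#[derive(Debug, Clone, PartialEq)]
pub enum WsMessage {
    Text(String),
    Ping,
    Pong,
    Close,
}

/// What the receive loop should do with a message (hypothetical).
#[derive(Debug, PartialEq)]
pub enum Action {
    /// Pass the payload on to the user's channel.
    Forward(String),
    /// Nothing to do for the user.
    Ignore,
    /// The server closed the connection: open a new one and resubscribe.
    Reconnect,
}

/// Decide how to react to one incoming message.
pub fn classify(msg: WsMessage) -> Action {
    match msg {
        WsMessage::Text(text) => Action::Forward(text),
        WsMessage::Ping | WsMessage::Pong => Action::Ignore,
        WsMessage::Close => Action::Reconnect,
    }
}
```

Returning an explicit `Action` keeps the decision in one place, so the same reconnect path could later be reused for read errors as well.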

@hittori-hanzo

Hey fellas, did this fix ever get done?
