Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: [DRGO-1410]: ramin/refactor_ws_connection_management #351

Merged
merged 25 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ece847c
Upgrade connectivity_plus version and minor version in reconnecting l…
ramin-deriv Nov 5, 2024
79c23b8
Minor changes
ramin-deriv Nov 6, 2024
88d0aa7
create separate timer when connecting to send early ping request
ramin-deriv Nov 7, 2024
9dbe3b1
revert an unnecessary change
ramin-deriv Nov 7, 2024
6306cdd
remove listening to connectivity changes
ramin-deriv Nov 7, 2024
b219a66
remove keep alive ping timer, and use websocket channel ping interval…
ramin-deriv Nov 7, 2024
8070fab
add ExponentialBackoffTimer for connection ping
ramin-deriv Nov 7, 2024
b4ffe9e
code cleanup :recycle:
ramin-deriv Nov 7, 2024
33a862e
remove redundant code
ramin-deriv Nov 7, 2024
f8df99a
add tests
ramin-deriv Nov 12, 2024
610809e
remove unnecessary change
ramin-deriv Nov 12, 2024
7c2ca67
remove print logs
ramin-deriv Nov 12, 2024
81e4c42
change keep alive ping inteval to 10 sec
ramin-deriv Nov 12, 2024
6f0b30f
chore: define timer interface
ramin-deriv Nov 13, 2024
5809114
chore: minor code cleanup :recycle:
ramin-deriv Nov 13, 2024
fcc5a3d
chore: code cleanup :recycle:
ramin-deriv Nov 13, 2024
b397420
add jitter and maxAttempt number to the back-off timer
ramin-deriv Nov 13, 2024
cf8d1b7
reset _attemptCount on stop method
ramin-deriv Nov 13, 2024
112b26e
refactor: call reset on start function
ramin-deriv Nov 14, 2024
e312337
add timeout option to BinaryAPI and ConnectionCubit
ramin-deriv Nov 14, 2024
ad31823
chore: create ConnectionConfig and receive in BinaryAPI
ramin-deriv Nov 18, 2024
77e6a08
change the ping interval back to 5 sec
ramin-deriv Nov 19, 2024
2f3a958
feat: keep network listener only for emitting disconnect state
ramin-deriv Dec 2, 2024
0134670
Update pubspec.yaml
ramin-deriv Dec 4, 2024
1d3321a
Update CHANGELOG.md
ramin-deriv Dec 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions lib/services/connection/api_manager/binary_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import 'dart:developer' as dev;
import 'dart:io';

import 'package:flutter/widgets.dart';
import 'package:flutter_deriv_api/api/response/ping_response_result.dart';
import 'package:flutter_deriv_api/services/connection/api_manager/exponential_back_off_timer.dart';
import 'package:flutter_system_proxy/flutter_system_proxy.dart';
import 'package:web_socket_channel/io.dart';

Expand All @@ -20,6 +22,7 @@ import 'package:flutter_deriv_api/services/connection/call_manager/call_history.
import 'package:flutter_deriv_api/services/connection/call_manager/call_manager.dart';
import 'package:flutter_deriv_api/services/connection/call_manager/exceptions/call_manager_exception.dart';
import 'package:flutter_deriv_api/services/connection/call_manager/subscription_manager.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

/// This class is for handling Binary API connection and calling Binary APIs.
class BinaryAPI extends BaseAPI {
Expand All @@ -31,7 +34,7 @@ class BinaryAPI extends BaseAPI {
}) : super(key: key ?? '${UniqueKey()}', enableDebug: enableDebug);

static const Duration _disconnectTimeOut = Duration(seconds: 5);
static const Duration _websocketConnectTimeOut = Duration(seconds: 10);
static const Duration _websocketConnectTimeOut = Duration(seconds: 5);
ramin-deriv marked this conversation as resolved.
Show resolved Hide resolved

/// A flag to indicate if the connection is proxy aware.
final bool proxyAwareConnection;
Expand All @@ -56,6 +59,25 @@ class BinaryAPI extends BaseAPI {
/// Gets API subscription history.
CallHistory? get subscriptionHistory => _subscriptionManager?.callHistory;

/// A timer to schedule sending ping requests right after the WebSocket is
/// ready to receive the first response from the server. This helps ensure
/// that the connection is established.
///
/// Ideally, we would use the [WebSocketChannel.ready] future to determine
/// if the connection is ready to send and receive messages. However, it
/// doesn't always work as expected. In testing, we noticed that even after
/// the `ready` future completes, we often don't receive a response from the
/// server.
///
/// Until we find a better solution to make [WebSocketChannel.ready] more
/// reliable, we rely on the incoming stream to wait for and receive the first
/// `pong` response, which confirms that the connection is established.
late final ExponentialBackoffTimer _connectionTimer = ExponentialBackoffTimer(
initialInterval: const Duration(milliseconds: 50),
maxInterval: const Duration(seconds: 5),
onDoAction: _ping,
);

@override
Future<void> connect(
ConnectionInformation? connectionInformation, {
Expand Down Expand Up @@ -97,12 +119,17 @@ class BinaryAPI extends BaseAPI {
_webSocketChannel = IOWebSocketChannel.connect('$uri',
pingInterval: _websocketConnectTimeOut, customClient: client);

unawaited(_webSocketChannel?.ready.then((_) {
print('#### Timer started ${DateTime.now()}');
_startConnectionTimer();
}));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be safer to set this callback before calling IOWebSocketChannel.connect

_webSocketListener = _webSocketChannel?.stream
.map<Map<String, dynamic>?>((Object? result) => jsonDecode('$result'))
.listen(
(Map<String, dynamic>? message) {
onOpen?.call(key);

_stopConnectionTimer();
if (message != null) {
_handleResponse(message, printResponse: printResponse);
}
Expand Down Expand Up @@ -252,6 +279,24 @@ class BinaryAPI extends BaseAPI {
}
}

void _startConnectionTimer() {
if (!_connectionTimer.isActive) {
_connectionTimer.start();
}
}

Future<void> _stopConnectionTimer() async {
await Future<void>.delayed(const Duration(seconds: 5));
ramin-deriv marked this conversation as resolved.
Show resolved Hide resolved
if (_connectionTimer.isActive) {
_connectionTimer.stop();
}
}

void _ping() {
print('#### Sending ping ${DateTime.now()}');
PingResponse.pingMethod();
}

void _logDebugInfo(String message, {Object? error}) {
if (enableDebug) {
dev.log('$runtimeType $key $message', error: error);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import 'dart:async';
import 'dart:ui';

/// A timer utility that implements an exponential back-off strategy for
/// recurring actions.
///
/// This class is useful for scenarios where you need send a request to a server
/// and you don't want to flood the server with too many frequent requests.
///
/// The timer will call the provided [onDoAction] callback every time the
/// interval elapses, allowing you to perform the action without needing to
/// manually handle the timing logic.
class ExponentialBackoffTimer {
/// Creates an [ExponentialBackoffTimer] with a specified [initialInterval],
/// [maxInterval], and an [onDoAction] callback.
///
/// - [initialInterval]: The starting interval between consecutive actions.
/// - [maxInterval]: The upper limit for the interval. Once reached,
/// the interval will no longer increase.
/// - [onDoAction]: The callback function that will be executed at each
/// interval.
/// Example usage:
/// ```dart
/// final timer = ExponentialBackoffTimer(
/// initialInterval: Duration(milliseconds: 500),
/// maxInterval: Duration(seconds: 10),
/// onDoAction: () {
/// print('Ping request sent');
/// },
/// );
/// timer.start();
/// ```
ExponentialBackoffTimer({
required this.initialInterval,
required this.maxInterval,
required this.onDoAction,
}) : _currentInterval = initialInterval;

/// The initial interval duration between consecutive actions.
final Duration initialInterval;

/// The maximum interval duration for the exponential back-off.
final Duration maxInterval;

/// The callback function to be executed at each timer interval.
final VoidCallback onDoAction;

Timer? _timer;
Duration _currentInterval;

/// Returns `true` if the timer is active, otherwise `false`.
bool get isActive => _timer?.isActive ?? false;

/// Starts the exponential back-off timer.
///
/// The [onDoAction] callback will be triggered at the initial interval,
/// and the interval will double after each execution up to [maxInterval].
void start() => _setupTimer();

/// Stops the timer and resets the interval to the initial interval.
void stop() {
_timer?.cancel();
_currentInterval = initialInterval;
}

void _restartTimer() {
_timer?.cancel();
_setupTimer();
}

void _setupTimer() {
_timer = Timer(_currentInterval, () {
onDoAction();
_increaseInterval();
_restartTimer();
});
}

/// Doubles the current interval for the next action, up to the maximum
/// allowed interval.
void _increaseInterval() {
_currentInterval = Duration(
milliseconds: (_currentInterval.inMilliseconds * 2).clamp(
initialInterval.inMilliseconds,
maxInterval.inMilliseconds,
),
);
}
}
59 changes: 0 additions & 59 deletions lib/state/connection/connection_cubit.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import 'dart:async';
import 'dart:developer' as dev;

import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:equatable/equatable.dart';
import 'package:flutter/material.dart';
import 'package:flutter_bloc/flutter_bloc.dart';

import 'package:flutter_deriv_api/api/api_initializer.dart';
import 'package:flutter_deriv_api/api/response/ping_response_result.dart';
import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart';
import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart';
import 'package:flutter_deriv_api/services/connection/api_manager/connection_information.dart';
Expand Down Expand Up @@ -40,8 +38,6 @@ class ConnectionCubit extends Cubit<ConnectionState> {
_api = Injector()<BaseAPI>();

_connect(_connectionInformation);

_startKeepAliveTimer();
}

final String _key = '${UniqueKey()}';
Expand All @@ -64,10 +60,6 @@ class ConnectionCubit extends Cubit<ConnectionState> {
// In some devices like Samsung J6 or Huawei Y7, the call manager doesn't response to the ping call less than 5 sec.
final Duration _pingTimeout = const Duration(seconds: 5);

final Duration _connectivityCheckInterval = const Duration(seconds: 5);

Timer? _connectivityTimer;

static late ConnectionInformation _connectionInformation;

/// Gets connection information of WebSocket (endpoint, brand, appId).
Expand All @@ -83,9 +75,6 @@ class ConnectionCubit extends Cubit<ConnectionState> {
/// Gets app id of websocket.
static String get appId => _connectionInformation.appId;

/// Stream subscription for connectivity.
StreamSubscription<ConnectivityResult>? connectivitySubscription;

/// Getter for [BaseAPI] implementation class. By default, it will be [BinaryAPI].
BaseAPI get api => _api;

Expand Down Expand Up @@ -140,53 +129,5 @@ class ConnectionCubit extends Cubit<ConnectionState> {
}
},
);

if (_api is BinaryAPI) {
_setupConnectivityListener();
}
}

void _setupConnectivityListener() {
connectivitySubscription ??= Connectivity().onConnectivityChanged.listen(
(ConnectivityResult status) async {
final bool isConnectedToNetwork = status == ConnectivityResult.mobile ||
status == ConnectivityResult.wifi;

if (isConnectedToNetwork) {
final bool isConnected = await _ping();

if (!isConnected) {
await reconnect();
}
} else if (status == ConnectivityResult.none) {
emit(const ConnectionDisconnectedState());
}
},
);
}

void _startKeepAliveTimer() {
if (_connectivityTimer == null || !_connectivityTimer!.isActive) {
_connectivityTimer =
Timer.periodic(_connectivityCheckInterval, (Timer timer) => _ping());
}
}

Future<bool> _ping() async {
try {
final PingResponse response =
await PingResponse.pingMethod().timeout(_pingTimeout);
return response.ping == PingEnum.pong;
} on Exception catch (_) {
return false;
}
}

@override
Future<void> close() {
_connectivityTimer?.cancel();
connectivitySubscription?.cancel();
connectivitySubscription = null;
return super.close();
}
}
3 changes: 2 additions & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ dependencies:
url: https://github.com/BrowserStackCE/flutter_system_proxy.git
ref: v0.1.5
package_info_plus: ^8.0.2
connectivity_plus: ^5.0.2
web_socket_channel: ^3.0.1
device_info_plus: ^10.1.2
win32: ^5.8.0

dev_dependencies:
bloc_test: ^9.1.1
Expand All @@ -42,3 +42,4 @@ dev_dependencies:
json_schema: ^5.2.0
path: ^1.8.0
test: ^1.19.5
fake_async: ^1.3.2
ramin-deriv marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading