Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: [DRGO-1410]: ramin/refactor_ws_connection_management #351

Merged
merged 25 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ece847c
Upgrade connectivity_plus version and minor version in reconnecting l…
ramin-deriv Nov 5, 2024
79c23b8
Minor changes
ramin-deriv Nov 6, 2024
88d0aa7
create separate timer when connecting to send early ping request
ramin-deriv Nov 7, 2024
9dbe3b1
revert an unnecessary change
ramin-deriv Nov 7, 2024
6306cdd
remove listening to connectivity changes
ramin-deriv Nov 7, 2024
b219a66
remove keep alive ping timer, and use websocket channel ping interval…
ramin-deriv Nov 7, 2024
8070fab
add ExponentialBackoffTimer for connection ping
ramin-deriv Nov 7, 2024
b4ffe9e
code cleanup :recycle:
ramin-deriv Nov 7, 2024
33a862e
remove redundant code
ramin-deriv Nov 7, 2024
f8df99a
add tests
ramin-deriv Nov 12, 2024
610809e
remove unnecessary change
ramin-deriv Nov 12, 2024
7c2ca67
remove print logs
ramin-deriv Nov 12, 2024
81e4c42
change keep alive ping inteval to 10 sec
ramin-deriv Nov 12, 2024
6f0b30f
chore: define timer interface
ramin-deriv Nov 13, 2024
5809114
chore: minor code cleanup :recycle:
ramin-deriv Nov 13, 2024
fcc5a3d
chore: code cleanup :recycle:
ramin-deriv Nov 13, 2024
b397420
add jitter and maxAttempt number to the back-off timer
ramin-deriv Nov 13, 2024
cf8d1b7
reset _attemptCount on stop method
ramin-deriv Nov 13, 2024
112b26e
refactor: call reset on start function
ramin-deriv Nov 14, 2024
e312337
add timeout option to BinaryAPI and ConnectionCubit
ramin-deriv Nov 14, 2024
ad31823
chore: create ConnectionConfig and receive in BinaryAPI
ramin-deriv Nov 18, 2024
77e6a08
change the ping interval back to 5 sec
ramin-deriv Nov 19, 2024
2f3a958
feat: keep network listener only for emitting disconnect state
ramin-deriv Dec 2, 2024
0134670
Update pubspec.yaml
ramin-deriv Dec 4, 2024
1d3321a
Update CHANGELOG.md
ramin-deriv Dec 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 68 additions & 5 deletions lib/services/connection/api_manager/binary_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import 'dart:developer' as dev;
import 'dart:io';

import 'package:flutter/widgets.dart';
import 'package:flutter_deriv_api/api/response/ping_response_result.dart';
import 'package:flutter_deriv_api/services/connection/api_manager/timer/exponential_back_off_timer.dart';
import 'package:flutter_system_proxy/flutter_system_proxy.dart';
import 'package:web_socket_channel/io.dart';

Expand All @@ -20,6 +22,9 @@ import 'package:flutter_deriv_api/services/connection/call_manager/call_history.
import 'package:flutter_deriv_api/services/connection/call_manager/call_manager.dart';
import 'package:flutter_deriv_api/services/connection/call_manager/exceptions/call_manager_exception.dart';
import 'package:flutter_deriv_api/services/connection/call_manager/subscription_manager.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import 'timer/connection_timer.dart';

/// This class is for handling Binary API connection and calling Binary APIs.
class BinaryAPI extends BaseAPI {
Expand All @@ -28,14 +33,36 @@ class BinaryAPI extends BaseAPI {
String? key,
bool enableDebug = false,
this.proxyAwareConnection = false,
}) : super(key: key ?? '${UniqueKey()}', enableDebug: enableDebug);
ConnectionTimer? connectionTimer,
this.callTimeout,
}) : super(key: key ?? '${UniqueKey()}', enableDebug: enableDebug) {
_connectionTimer = connectionTimer ??
ExponentialBackoffTimer(
initialInterval: const Duration(milliseconds: 50),
maxInterval: const Duration(seconds: 5),
onDoAction: _ping,
maxAttempts: 10,
);
}

static const Duration _disconnectTimeOut = Duration(seconds: 5);
static const Duration _websocketConnectTimeOut = Duration(seconds: 10);
static const Duration _keepAlivePingInterval = Duration(seconds: 10);

/// A flag to indicate if the connection is proxy aware.
final bool proxyAwareConnection;

/// The timeout duration for the API calls that are request/response model
/// and are not subscription. The return type of these calls are [Future].
///
/// If this duration is set, and the [call] method takes more than this
/// duration to complete, it will throw a [TimeoutException].
///
/// Since these are calls from a remote server and because of lack of
/// connection or some other reason their future may never complete. This can
/// cause the caller of the methods to wait indefinitely. To prevent this, we
/// set a timeout duration for these calls.
final Duration? callTimeout;

/// Represents the active websocket connection.
///
/// This is used to send and receive data from the websocket server.
Expand All @@ -56,6 +83,21 @@ class BinaryAPI extends BaseAPI {
/// Gets API subscription history.
CallHistory? get subscriptionHistory => _subscriptionManager?.callHistory;

/// A timer to schedule sending ping requests right after the WebSocket is
/// ready to receive the first response from the server. This helps ensure
/// that the connection is established.
///
/// Ideally, we would use the [WebSocketChannel.ready] future to determine
/// if the connection is ready to send and receive messages. However, it
/// doesn't always work as expected. In testing, we noticed that even after
/// the `ready` future completes, we often don't receive a response from the
/// server.
///
/// Until we find a better solution to make [WebSocketChannel.ready] more
/// reliable, we rely on the incoming stream to wait for and receive the first
/// `pong` response, which confirms that the connection is established.
late final ConnectionTimer _connectionTimer;

@override
Future<void> connect(
ConnectionInformation? connectionInformation, {
Expand Down Expand Up @@ -95,14 +137,16 @@ class BinaryAPI extends BaseAPI {

// Initialize connection to websocket server.
_webSocketChannel = IOWebSocketChannel.connect('$uri',
pingInterval: _websocketConnectTimeOut, customClient: client);
pingInterval: _keepAlivePingInterval, customClient: client);

unawaited(_webSocketChannel?.ready.then((_) => _startConnectionTimer()));

_webSocketListener = _webSocketChannel?.stream
.map<Map<String, dynamic>?>((Object? result) => jsonDecode('$result'))
.listen(
(Map<String, dynamic>? message) {
onOpen?.call(key);

_stopConnectionTimer();
if (message != null) {
_handleResponse(message, printResponse: printResponse);
}
Expand Down Expand Up @@ -145,11 +189,16 @@ class BinaryAPI extends BaseAPI {
required Request request,
List<String> nullableKeys = const <String>[],
}) async {
final Response response = await (_callManager ??= CallManager(this))(
final Future<Response> responseFuture =
(_callManager ??= CallManager(this))(
request: request,
nullableKeys: nullableKeys,
);

final Response response = await (callTimeout != null
? responseFuture.timeout(callTimeout!)
: responseFuture);

if (response is T) {
return response as T;
}
Expand Down Expand Up @@ -252,6 +301,20 @@ class BinaryAPI extends BaseAPI {
}
}

void _startConnectionTimer() {
if (!_connectionTimer.isActive) {
_connectionTimer.start();
}
}

Future<void> _stopConnectionTimer() async {
if (_connectionTimer.isActive) {
_connectionTimer.stop();
}
}

void _ping() => PingResponse.pingMethod();

void _logDebugInfo(String message, {Object? error}) {
if (enableDebug) {
dev.log('$runtimeType $key $message', error: error);
Expand Down
30 changes: 30 additions & 0 deletions lib/services/connection/api_manager/timer/connection_timer.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import 'dart:ui';

/// A timer calls the provided [onDoAction] callback every time the
/// interval elapses, allowing you to perform the action without needing to
/// manually handle the timing logic.
abstract class ConnectionTimer {
/// Creates a [ConnectionTimer] with a specified [interval], and an
/// [onDoAction] callback.
ConnectionTimer({
required this.onDoAction,
required this.interval,
});

/// The initial interval duration between consecutive actions.
final Duration interval;

/// The callback function to be executed at each timer interval.
final VoidCallback onDoAction;

/// Returns `true` if the timer is active, otherwise `false`.
bool get isActive;

/// Starts the timer.
///
/// The [onDoAction] callback will be triggered based on the [interval].
void start();

/// Stops the timer.
void stop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import 'dart:async';

import 'connection_timer.dart';

/// A timer utility that implements a simple exponential back-off strategy for
/// recurring actions and increases the interval between actions until a
/// maximum interval duration is reached.
///
/// This class is useful for scenarios where you need send a request to a server
/// and you don't want to flood the server with too many frequent requests.
///
/// The timer will call the provided [onDoAction] callback every time the
/// interval elapses, allowing you to perform the action without needing to
/// manually handle the timing logic.
class ExponentialBackoffTimer extends ConnectionTimer {
/// Creates an [ExponentialBackoffTimer] with a specified
/// [initialInterval], [maxInterval], and an [onDoAction] callback.
///
/// - [initialInterval]: The starting interval between consecutive actions.
/// - [maxInterval]: The upper limit for the interval. Once reached,
/// the interval will no longer increase.
/// - [onDoAction]: The callback function that will be executed at each
/// interval.
/// - [multiplier]: Factor by which the interval increases after each attempt.
/// Default is `2.0`.
/// - [jitter]: If `true`, adds random jitter (±15%) to the interval to reduce
/// server load spikes. Default is `true`.
/// - [maxAttempts]: Optional limit on the number of attempts; stops the timer
/// if exceeded.
/// Example usage:
/// ```dart
/// final timer = ExponentialBackoffTimer(
/// initialInterval: Duration(milliseconds: 500),
/// maxInterval: Duration(seconds: 10),
/// onDoAction: () {
/// print('Ping request sent');
/// },
/// );
/// timer.start();
/// ```
ExponentialBackoffTimer({
required this.initialInterval,
required this.maxInterval,
required super.onDoAction,
this.multiplier = 2.0,
this.jitter = true,
this.maxAttempts,
}) : _currentInterval = initialInterval,
super(interval: initialInterval);

/// The initial interval duration between consecutive actions.
final Duration initialInterval;

/// The maximum interval duration for the exponential back-off.
final Duration maxInterval;

/// The factor by which the interval increases after each attempt.
final double multiplier;

/// If `true`, adds random jitter (±15%) to the interval to reduce server
final bool jitter;

/// Optional limit on the number of attempts; stops the timer
/// if exceeded.
final int? maxAttempts;

int _attemptCount = 0;

Timer? _timer;
Duration _currentInterval;

/// Returns `true` if the timer is active, otherwise `false`.
@override
bool get isActive => _timer?.isActive ?? false;

/// Starts the exponential back-off timer.
///
/// The [onDoAction] callback will be triggered at the initial interval,
/// and the interval will double after each execution up to [maxInterval].
@override
void start() {
_reset();
_setupTimer();
}

/// Stops the timer and resets the interval to the initial interval.
@override
void stop() => _reset();

void _reset() {
_attemptCount = 0;
_timer?.cancel();
_currentInterval = initialInterval;
}

void _restartTimer() {
_timer?.cancel();
_setupTimer();
}

void _setupTimer() {
if (maxAttempts != null && _attemptCount >= maxAttempts!) {
return;
}

_timer = Timer(_currentInterval, () {
_attemptCount++;
onDoAction();
_increaseInterval();
_restartTimer();
});
}

/// Increases the current interval for the next action [multiplier] times,
/// up to the maximum allowed interval.
void _increaseInterval() {
final int newInterval =
(_currentInterval.inMilliseconds * multiplier).round();
final int clampedInterval = newInterval.clamp(
initialInterval.inMilliseconds,
maxInterval.inMilliseconds,
);

if (jitter) {
// Add random jitter of ±15% to prevent thundering herd problem
final int jitterRange = (clampedInterval * 0.15).round();
final int random =
DateTime.now().millisecondsSinceEpoch % (jitterRange * 2) -
jitterRange;
_currentInterval = Duration(milliseconds: clampedInterval + random);
} else {
_currentInterval = Duration(milliseconds: clampedInterval);
}
}
}
Loading
Loading