From 53225280f65cc18bfdae7002ccdacf51f466f7ad Mon Sep 17 00:00:00 2001 From: ramin-deriv <55975218+ramin-deriv@users.noreply.github.com> Date: Wed, 4 Dec 2024 13:39:17 +0800 Subject: [PATCH] refactor: [DRGO-1410]: refactor_ws_connection_management (#351) - Reduce the time it takes to establish the WS connection --- CHANGELOG.md | 4 + example/lib/sample_app.dart | 14 +- .../connection/api_manager/base_api.dart | 1 - .../connection/api_manager/binary_api.dart | 83 +++++++++-- .../api_manager/connection_config.dart | 39 +++++ .../api_manager/timer/connection_timer.dart | 30 ++++ .../timer/exponential_back_off_timer.dart | 135 ++++++++++++++++++ lib/state/connection/connection_cubit.dart | 72 ++-------- pubspec.yaml | 3 +- .../exponential_back_off_timer_test.dart | 101 +++++++++++++ 10 files changed, 398 insertions(+), 84 deletions(-) create mode 100644 lib/services/connection/api_manager/connection_config.dart create mode 100644 lib/services/connection/api_manager/timer/connection_timer.dart create mode 100644 lib/services/connection/api_manager/timer/exponential_back_off_timer.dart create mode 100644 test/services/connection/api_manager/timer/exponential_back_off_timer_test.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index e3363ff726..3bef0bd0f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 1.4.0 + +* Improve the connection management strategy to reduce the time it takes to establish the WebSocket connection. + ## 1.0.0 * First release of the package. diff --git a/example/lib/sample_app.dart b/example/lib/sample_app.dart index 4955fc8ddf..d697ad07be 100644 --- a/example/lib/sample_app.dart +++ b/example/lib/sample_app.dart @@ -18,14 +18,12 @@ class _SampleAppState extends State { void initState() { super.initState(); - _connectionCubit = conn.ConnectionCubit( - ConnectionInformation( - appId: '36544', - brand: 'deriv', - endpoint: 'ws.derivws.com', - authEndpoint: '', - ), - proxyAwareConnection: false); + _connectionCubit = conn.ConnectionCubit(const ConnectionInformation( + appId: '36544', + brand: 'deriv', + endpoint: 'ws.derivws.com', + authEndpoint: '', + )); } @override diff --git a/lib/services/connection/api_manager/base_api.dart b/lib/services/connection/api_manager/base_api.dart index e20df3928a..02d80ba5a9 100644 --- a/lib/services/connection/api_manager/base_api.dart +++ b/lib/services/connection/api_manager/base_api.dart @@ -26,7 +26,6 @@ abstract class BaseAPI { ConnectionCallback? onDone, ConnectionCallback? onOpen, ConnectionCallback? onError, - bool printResponse, }); /// Adds request to stream channel. diff --git a/lib/services/connection/api_manager/binary_api.dart b/lib/services/connection/api_manager/binary_api.dart index 64a7041c70..0aef7e0bcc 100644 --- a/lib/services/connection/api_manager/binary_api.dart +++ b/lib/services/connection/api_manager/binary_api.dart @@ -4,6 +4,9 @@ import 'dart:developer' as dev; import 'dart:io'; import 'package:flutter/widgets.dart'; +import 'package:flutter_deriv_api/api/response/ping_response_result.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/connection_config.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/timer/exponential_back_off_timer.dart'; import 'package:flutter_system_proxy/flutter_system_proxy.dart'; import 'package:web_socket_channel/io.dart'; @@ -20,27 +23,44 @@ import 'package:flutter_deriv_api/services/connection/call_manager/call_history. import 'package:flutter_deriv_api/services/connection/call_manager/call_manager.dart'; import 'package:flutter_deriv_api/services/connection/call_manager/exceptions/call_manager_exception.dart'; import 'package:flutter_deriv_api/services/connection/call_manager/subscription_manager.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; + +import 'timer/connection_timer.dart'; /// This class is for handling Binary API connection and calling Binary APIs. class BinaryAPI extends BaseAPI { /// Initializes [BinaryAPI] instance. BinaryAPI({ String? key, - bool enableDebug = false, - this.proxyAwareConnection = false, - }) : super(key: key ?? '${UniqueKey()}', enableDebug: enableDebug); + ConnectionTimer? connectionTimer, + this.connectionConfig = const ConnectionConfig(), + }) : super( + key: key ?? '${UniqueKey()}', + enableDebug: connectionConfig.enableDebug, + ) { + _connectionTimer = connectionTimer ?? + ExponentialBackoffTimer( + initialInterval: const Duration(milliseconds: 50), + maxInterval: const Duration(seconds: 5), + onDoAction: _ping, + maxAttempts: 10, + ); + } static const Duration _disconnectTimeOut = Duration(seconds: 5); - static const Duration _websocketConnectTimeOut = Duration(seconds: 10); - /// A flag to indicate if the connection is proxy aware. - final bool proxyAwareConnection; + // Instead of the 5-sec ping timer which we had in ConnectionCubit that is + // removed, we are using the 5-sec ping interval for the WebSocketChannel. + static const Duration _keepAlivePingInterval = Duration(seconds: 5); /// Represents the active websocket connection. /// /// This is used to send and receive data from the websocket server. IOWebSocketChannel? _webSocketChannel; + /// Connection configuration. + final ConnectionConfig connectionConfig; + /// Stream subscription to API data. StreamSubscription?>? _webSocketListener; @@ -56,13 +76,27 @@ class BinaryAPI extends BaseAPI { /// Gets API subscription history. CallHistory? get subscriptionHistory => _subscriptionManager?.callHistory; + /// A timer to schedule sending ping requests right after the WebSocket is + /// ready to receive the first response from the server. This helps ensure + /// that the connection is established. + /// + /// Ideally, we would use the [WebSocketChannel.ready] future to determine + /// if the connection is ready to send and receive messages. However, it + /// doesn't always work as expected. In testing, we noticed that even after + /// the `ready` future completes, we often don't receive a response from the + /// server. + /// + /// Until we find a better solution to make [WebSocketChannel.ready] more + /// reliable, we rely on the incoming stream to wait for and receive the first + /// `pong` response, which confirms that the connection is established. + late final ConnectionTimer _connectionTimer; + @override Future connect( ConnectionInformation? connectionInformation, { ConnectionCallback? onOpen, ConnectionCallback? onDone, ConnectionCallback? onError, - bool printResponse = false, }) async { _resetCallManagers(); @@ -84,7 +118,7 @@ class BinaryAPI extends BaseAPI { HttpClient? client; - if (proxyAwareConnection) { + if (connectionConfig.proxyAwareConnection) { final String proxy = await FlutterSystemProxy.findProxyFromEnvironment( uri.toString().replaceAll('wss', 'https')); @@ -95,16 +129,22 @@ class BinaryAPI extends BaseAPI { // Initialize connection to websocket server. _webSocketChannel = IOWebSocketChannel.connect('$uri', - pingInterval: _websocketConnectTimeOut, customClient: client); + pingInterval: _keepAlivePingInterval, customClient: client); + + unawaited(_webSocketChannel?.ready.then((_) => _startConnectionTimer())); _webSocketListener = _webSocketChannel?.stream .map?>((Object? result) => jsonDecode('$result')) .listen( (Map? message) { onOpen?.call(key); - + _stopConnectionTimer(); if (message != null) { - _handleResponse(message, printResponse: printResponse); + _handleResponse( + message, + printResponse: + connectionConfig.enableDebug && connectionConfig.printResponse, + ); } }, onDone: () async { @@ -145,11 +185,16 @@ class BinaryAPI extends BaseAPI { required Request request, List nullableKeys = const [], }) async { - final Response response = await (_callManager ??= CallManager(this))( + final Future responseFuture = + (_callManager ??= CallManager(this))( request: request, nullableKeys: nullableKeys, ); + final Response response = await (connectionConfig.callTimeout != null + ? responseFuture.timeout(connectionConfig.callTimeout!) + : responseFuture); + if (response is T) { return response as T; } @@ -252,6 +297,20 @@ class BinaryAPI extends BaseAPI { } } + void _startConnectionTimer() { + if (!_connectionTimer.isActive) { + _connectionTimer.start(); + } + } + + Future _stopConnectionTimer() async { + if (_connectionTimer.isActive) { + _connectionTimer.stop(); + } + } + + void _ping() => PingResponse.pingMethod(); + void _logDebugInfo(String message, {Object? error}) { if (enableDebug) { dev.log('$runtimeType $key $message', error: error); diff --git a/lib/services/connection/api_manager/connection_config.dart b/lib/services/connection/api_manager/connection_config.dart new file mode 100644 index 0000000000..7616c0c343 --- /dev/null +++ b/lib/services/connection/api_manager/connection_config.dart @@ -0,0 +1,39 @@ +import 'package:flutter_deriv_api/state/connection/connection_cubit.dart'; + +/// [ConnectionCubit] configuration. +class ConnectionConfig { + /// Initializes [ConnectionConfig]. + const ConnectionConfig({ + this.enableDebug = false, + this.printResponse = false, + this.proxyAwareConnection = false, + this.callTimeout, + }); + + /// Enables debug mode. + /// + /// Default value is `false`. + final bool enableDebug; + + /// Prints API response to console, only works if [enableDebug] is `true`. + /// + /// Default value is `false`. + final bool printResponse; + + /// A flag to indicate if the connection is proxy aware. + final bool proxyAwareConnection; + + /// The timeout duration for the API calls that are request/response model + /// and are not subscription. The return type of these calls are [Future]. + /// + /// If this duration is set, and the [call] method takes more than this + /// duration to complete, it will throw a [TimeoutException]. + /// + /// Since these are calls from a remote server and because of lack of + /// connection or some other reason their future may never complete. This can + /// cause the caller of the methods to wait indefinitely. To prevent this, we + /// set a timeout duration for these calls. + /// + /// Default is `null` which means no timeout is considered. + final Duration? callTimeout; +} diff --git a/lib/services/connection/api_manager/timer/connection_timer.dart b/lib/services/connection/api_manager/timer/connection_timer.dart new file mode 100644 index 0000000000..48746853ff --- /dev/null +++ b/lib/services/connection/api_manager/timer/connection_timer.dart @@ -0,0 +1,30 @@ +import 'dart:ui'; + +/// A timer calls the provided [onDoAction] callback every time the +/// interval elapses, allowing you to perform the action without needing to +/// manually handle the timing logic. +abstract class ConnectionTimer { + /// Creates a [ConnectionTimer] with a specified [interval], and an + /// [onDoAction] callback. + ConnectionTimer({ + required this.onDoAction, + required this.interval, + }); + + /// The initial interval duration between consecutive actions. + final Duration interval; + + /// The callback function to be executed at each timer interval. + final VoidCallback onDoAction; + + /// Returns `true` if the timer is active, otherwise `false`. + bool get isActive; + + /// Starts the timer. + /// + /// The [onDoAction] callback will be triggered based on the [interval]. + void start(); + + /// Stops the timer. + void stop(); +} diff --git a/lib/services/connection/api_manager/timer/exponential_back_off_timer.dart b/lib/services/connection/api_manager/timer/exponential_back_off_timer.dart new file mode 100644 index 0000000000..683d1370e1 --- /dev/null +++ b/lib/services/connection/api_manager/timer/exponential_back_off_timer.dart @@ -0,0 +1,135 @@ +import 'dart:async'; + +import 'connection_timer.dart'; + +/// A timer utility that implements a simple exponential back-off strategy for +/// recurring actions and increases the interval between actions until a +/// maximum interval duration is reached. +/// +/// This class is useful for scenarios where you need send a request to a server +/// and you don't want to flood the server with too many frequent requests. +/// +/// The timer will call the provided [onDoAction] callback every time the +/// interval elapses, allowing you to perform the action without needing to +/// manually handle the timing logic. +class ExponentialBackoffTimer extends ConnectionTimer { + /// Creates an [ExponentialBackoffTimer] with a specified + /// [initialInterval], [maxInterval], and an [onDoAction] callback. + /// + /// - [initialInterval]: The starting interval between consecutive actions. + /// - [maxInterval]: The upper limit for the interval. Once reached, + /// the interval will no longer increase. + /// - [onDoAction]: The callback function that will be executed at each + /// interval. + /// - [multiplier]: Factor by which the interval increases after each attempt. + /// Default is `2.0`. + /// - [jitter]: If `true`, adds random jitter (±15%) to the interval to reduce + /// server load spikes. Default is `true`. + /// - [maxAttempts]: Optional limit on the number of attempts; stops the timer + /// if exceeded. + /// Example usage: + /// ```dart + /// final timer = ExponentialBackoffTimer( + /// initialInterval: Duration(milliseconds: 500), + /// maxInterval: Duration(seconds: 10), + /// onDoAction: () { + /// print('Ping request sent'); + /// }, + /// ); + /// timer.start(); + /// ``` + ExponentialBackoffTimer({ + required this.initialInterval, + required this.maxInterval, + required super.onDoAction, + this.multiplier = 2.0, + this.jitter = true, + this.maxAttempts, + }) : _currentInterval = initialInterval, + super(interval: initialInterval); + + /// The initial interval duration between consecutive actions. + final Duration initialInterval; + + /// The maximum interval duration for the exponential back-off. + final Duration maxInterval; + + /// The factor by which the interval increases after each attempt. + final double multiplier; + + /// If `true`, adds random jitter (±15%) to the interval to reduce server + final bool jitter; + + /// Optional limit on the number of attempts; stops the timer + /// if exceeded. + final int? maxAttempts; + + int _attemptCount = 0; + + Timer? _timer; + Duration _currentInterval; + + /// Returns `true` if the timer is active, otherwise `false`. + @override + bool get isActive => _timer?.isActive ?? false; + + /// Starts the exponential back-off timer. + /// + /// The [onDoAction] callback will be triggered at the initial interval, + /// and the interval will double after each execution up to [maxInterval]. + @override + void start() { + _reset(); + _setupTimer(); + } + + /// Stops the timer and resets the interval to the initial interval. + @override + void stop() => _reset(); + + void _reset() { + _attemptCount = 0; + _timer?.cancel(); + _currentInterval = initialInterval; + } + + void _restartTimer() { + _timer?.cancel(); + _setupTimer(); + } + + void _setupTimer() { + if (maxAttempts != null && _attemptCount >= maxAttempts!) { + return; + } + + _timer = Timer(_currentInterval, () { + _attemptCount++; + onDoAction(); + _increaseInterval(); + _restartTimer(); + }); + } + + /// Increases the current interval for the next action [multiplier] times, + /// up to the maximum allowed interval. + void _increaseInterval() { + final int newInterval = + (_currentInterval.inMilliseconds * multiplier).round(); + final int clampedInterval = newInterval.clamp( + initialInterval.inMilliseconds, + maxInterval.inMilliseconds, + ); + + if (jitter) { + // Add random jitter of ±15% to prevent thundering herd problem + final int jitterRange = (clampedInterval * 0.15).round(); + final int random = + DateTime.now().millisecondsSinceEpoch % (jitterRange * 2) - + jitterRange; + _currentInterval = Duration(milliseconds: clampedInterval + random); + } else { + _currentInterval = Duration(milliseconds: clampedInterval); + } + } +} diff --git a/lib/state/connection/connection_cubit.dart b/lib/state/connection/connection_cubit.dart index 49a8daaae9..29189836da 100644 --- a/lib/state/connection/connection_cubit.dart +++ b/lib/state/connection/connection_cubit.dart @@ -7,9 +7,9 @@ import 'package:flutter/material.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:flutter_deriv_api/api/api_initializer.dart'; -import 'package:flutter_deriv_api/api/response/ping_response_result.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/base_api.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/binary_api.dart'; +import 'package:flutter_deriv_api/services/connection/api_manager/connection_config.dart'; import 'package:flutter_deriv_api/services/connection/api_manager/connection_information.dart'; import 'package:deriv_dependency_injector/dependency_injector.dart'; @@ -21,53 +21,29 @@ class ConnectionCubit extends Cubit { ConnectionCubit( ConnectionInformation connectionInformation, { BaseAPI? api, - this.enableDebug = false, - // TODO(NA): Refactor to only get BinaryAPI instance. and printResponse and proxyAwareConnection can be part of BinaryAPI only. - this.printResponse = false, - this.proxyAwareConnection = false, + this.connectionConfig = const ConnectionConfig(), }) : super(const ConnectionInitialState()) { _connectionInformation = connectionInformation; APIInitializer().initialize( - api: api ?? - BinaryAPI( - key: _key, - proxyAwareConnection: proxyAwareConnection, - enableDebug: enableDebug, - ), + api: api ?? BinaryAPI(key: _key, connectionConfig: connectionConfig), ); _api = Injector()(); _connect(_connectionInformation); - - _startKeepAliveTimer(); } final String _key = '${UniqueKey()}'; late final BaseAPI _api; - /// Enables debug mode. - /// - /// Default value is `false`. - final bool enableDebug; - - /// Prints API response to console, only works if [enableDebug] is `true`. - /// - /// Default value is `false`. - final bool printResponse; - - /// A flag to indicate if the connection is proxy aware. - final bool proxyAwareConnection; + /// Connection configuration. + final ConnectionConfig connectionConfig; // In some devices like Samsung J6 or Huawei Y7, the call manager doesn't response to the ping call less than 5 sec. final Duration _pingTimeout = const Duration(seconds: 5); - final Duration _connectivityCheckInterval = const Duration(seconds: 5); - - Timer? _connectivityTimer; - static late ConnectionInformation _connectionInformation; /// Gets connection information of WebSocket (endpoint, brand, appId). @@ -84,7 +60,7 @@ class ConnectionCubit extends Cubit { static String get appId => _connectionInformation.appId; /// Stream subscription for connectivity. - StreamSubscription? connectivitySubscription; + StreamSubscription? _connectivitySubscription; /// Getter for [BaseAPI] implementation class. By default, it will be [BinaryAPI]. BaseAPI get api => _api; @@ -123,7 +99,6 @@ class ConnectionCubit extends Cubit { await _api.connect( _connectionInformation, - printResponse: enableDebug && printResponse, onOpen: (String key) { if (_key == key) { emit(const ConnectionConnectedState()); @@ -147,46 +122,19 @@ class ConnectionCubit extends Cubit { } void _setupConnectivityListener() { - connectivitySubscription ??= Connectivity().onConnectivityChanged.listen( + _connectivitySubscription ??= Connectivity().onConnectivityChanged.listen( (ConnectivityResult status) async { - final bool isConnectedToNetwork = status == ConnectivityResult.mobile || - status == ConnectivityResult.wifi; - - if (isConnectedToNetwork) { - final bool isConnected = await _ping(); - - if (!isConnected) { - await reconnect(); - } - } else if (status == ConnectivityResult.none) { + if (status == ConnectivityResult.none) { emit(const ConnectionDisconnectedState()); } }, ); } - void _startKeepAliveTimer() { - if (_connectivityTimer == null || !_connectivityTimer!.isActive) { - _connectivityTimer = - Timer.periodic(_connectivityCheckInterval, (Timer timer) => _ping()); - } - } - - Future _ping() async { - try { - final PingResponse response = - await PingResponse.pingMethod().timeout(_pingTimeout); - return response.ping == PingEnum.pong; - } on Exception catch (_) { - return false; - } - } - @override Future close() { - _connectivityTimer?.cancel(); - connectivitySubscription?.cancel(); - connectivitySubscription = null; + _connectivitySubscription?.cancel(); + _connectivitySubscription = null; return super.close(); } } diff --git a/pubspec.yaml b/pubspec.yaml index 0275e4b57b..4fd752b6b7 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,7 +1,7 @@ name: flutter_deriv_api description: Support for the deriv.com websockets API -version: 1.0.0 +version: 1.4.0 homepage: https://developers.binary.com publish_to: "none" @@ -42,3 +42,4 @@ dev_dependencies: json_schema: ^5.2.0 path: ^1.8.0 test: ^1.19.5 + fake_async: ^1.3.2 diff --git a/test/services/connection/api_manager/timer/exponential_back_off_timer_test.dart b/test/services/connection/api_manager/timer/exponential_back_off_timer_test.dart new file mode 100644 index 0000000000..2202773ed8 --- /dev/null +++ b/test/services/connection/api_manager/timer/exponential_back_off_timer_test.dart @@ -0,0 +1,101 @@ +import 'package:flutter_deriv_api/services/connection/api_manager/timer/exponential_back_off_timer.dart'; +import 'package:test/test.dart'; +import 'package:fake_async/fake_async.dart'; + +void main() { + group('ExponentialBackoffTimer', () { + late Duration initialInterval; + late Duration maxInterval; + late int actionCount; + late ExponentialBackoffTimer timer; + + setUp(() { + initialInterval = const Duration(milliseconds: 500); + maxInterval = const Duration(seconds: 5); + actionCount = 0; + + timer = ExponentialBackoffTimer( + initialInterval: initialInterval, + maxInterval: maxInterval, + jitter: false, + onDoAction: () => actionCount++, + ); + }); + + test('should trigger onDoAction at initial interval', () { + fakeAsync((FakeAsync async) { + timer.start(); + expect(actionCount, equals(0)); + + async.elapse(initialInterval); + expect(actionCount, equals(1)); + }); + }); + + test('should exponentially increase interval up to maxInterval', () { + fakeAsync((FakeAsync async) { + timer.start(); + expect(actionCount, equals(0)); + + // First interval should be initialInterval + async.elapse(initialInterval); + expect(actionCount, equals(1)); + + // Second interval should be double initialInterval, 1s + async.elapse(initialInterval * 2); + expect(actionCount, equals(2)); + + // Next action callback should be in 4 * initialInterval, we check if + // after 1 * initialInterval, the action is not triggered yet and same + // value still. + async.elapse(initialInterval); + expect(actionCount, equals(2)); + + // waiting for 3 * initialInterval more, the action should be triggered + async.elapse(initialInterval * 4); + expect(actionCount, equals(3)); + + // Next interval should reach maxInterval + async.elapse(maxInterval); + expect(actionCount, equals(4)); + + // Check that interval does not exceed maxInterval + async.elapse(maxInterval); + expect(actionCount, equals(5)); + }); + }); + + test('should stop and reset interval when stop() is called', () { + fakeAsync((FakeAsync async) { + timer.start(); + + async.elapse(initialInterval); + expect(actionCount, equals(1)); + + timer.stop(); + expect(timer.isActive, isFalse); + + // After stopping, action should not increase + async.elapse(maxInterval); + expect(actionCount, equals(1)); + + // Restarting should reset interval to initialInterval + timer.start(); + async.elapse(initialInterval); + expect(actionCount, equals(2)); + }); + }); + + test('isActive should be true only when timer is running', () { + fakeAsync((FakeAsync async) { + expect(timer.isActive, isFalse); + + timer.start(); + expect(timer.isActive, isTrue); + + timer.stop(); + expect(timer.isActive, isFalse); + }); + }); + }); +}