Skip to content

Commit

Permalink
refactor: [DRGO-1410]: refactor_ws_connection_management (#351)
Browse files Browse the repository at this point in the history
- Reduce the time it takes to establish the WS connection
  • Loading branch information
ramin-deriv authored Dec 4, 2024
1 parent 4d4a4bf commit 5322528
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 84 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 1.4.0

* Improve the connection management strategy to reduce the time it takes to establish the WebSocket connection.

## 1.0.0

* First release of the package.
14 changes: 6 additions & 8 deletions example/lib/sample_app.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ class _SampleAppState extends State<SampleApp> {
void initState() {
super.initState();

_connectionCubit = conn.ConnectionCubit(
ConnectionInformation(
appId: '36544',
brand: 'deriv',
endpoint: 'ws.derivws.com',
authEndpoint: '',
),
proxyAwareConnection: false);
_connectionCubit = conn.ConnectionCubit(const ConnectionInformation(
appId: '36544',
brand: 'deriv',
endpoint: 'ws.derivws.com',
authEndpoint: '',
));
}

@override
Expand Down
1 change: 0 additions & 1 deletion lib/services/connection/api_manager/base_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ abstract class BaseAPI {
ConnectionCallback? onDone,
ConnectionCallback? onOpen,
ConnectionCallback? onError,
bool printResponse,
});

/// Adds request to stream channel.
Expand Down
83 changes: 71 additions & 12 deletions lib/services/connection/api_manager/binary_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import 'dart:developer' as dev;
import 'dart:io';

import 'package:flutter/widgets.dart';
import 'package:flutter_deriv_api/api/response/ping_response_result.dart';
import 'package:flutter_deriv_api/services/connection/api_manager/connection_config.dart';
import 'package:flutter_deriv_api/services/connection/api_manager/timer/exponential_back_off_timer.dart';
import 'package:flutter_system_proxy/flutter_system_proxy.dart';
import 'package:web_socket_channel/io.dart';

Expand All @@ -20,27 +23,44 @@ import 'package:flutter_deriv_api/services/connection/call_manager/call_history.
import 'package:flutter_deriv_api/services/connection/call_manager/call_manager.dart';
import 'package:flutter_deriv_api/services/connection/call_manager/exceptions/call_manager_exception.dart';
import 'package:flutter_deriv_api/services/connection/call_manager/subscription_manager.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import 'timer/connection_timer.dart';

/// This class is for handling Binary API connection and calling Binary APIs.
class BinaryAPI extends BaseAPI {
/// Initializes [BinaryAPI] instance.
BinaryAPI({
String? key,
bool enableDebug = false,
this.proxyAwareConnection = false,
}) : super(key: key ?? '${UniqueKey()}', enableDebug: enableDebug);
ConnectionTimer? connectionTimer,
this.connectionConfig = const ConnectionConfig(),
}) : super(
key: key ?? '${UniqueKey()}',
enableDebug: connectionConfig.enableDebug,
) {
_connectionTimer = connectionTimer ??
ExponentialBackoffTimer(
initialInterval: const Duration(milliseconds: 50),
maxInterval: const Duration(seconds: 5),
onDoAction: _ping,
maxAttempts: 10,
);
}

static const Duration _disconnectTimeOut = Duration(seconds: 5);
static const Duration _websocketConnectTimeOut = Duration(seconds: 10);

/// A flag to indicate if the connection is proxy aware.
final bool proxyAwareConnection;
// Instead of the 5-sec ping timer which we had in ConnectionCubit that is
// removed, we are using the 5-sec ping interval for the WebSocketChannel.
static const Duration _keepAlivePingInterval = Duration(seconds: 5);

/// Represents the active websocket connection.
///
/// This is used to send and receive data from the websocket server.
IOWebSocketChannel? _webSocketChannel;

/// Connection configuration.
final ConnectionConfig connectionConfig;

/// Stream subscription to API data.
StreamSubscription<Map<String, dynamic>?>? _webSocketListener;

Expand All @@ -56,13 +76,27 @@ class BinaryAPI extends BaseAPI {
/// Gets API subscription history.
CallHistory? get subscriptionHistory => _subscriptionManager?.callHistory;

/// A timer to schedule sending ping requests right after the WebSocket is
/// ready to receive the first response from the server. This helps ensure
/// that the connection is established.
///
/// Ideally, we would use the [WebSocketChannel.ready] future to determine
/// if the connection is ready to send and receive messages. However, it
/// doesn't always work as expected. In testing, we noticed that even after
/// the `ready` future completes, we often don't receive a response from the
/// server.
///
/// Until we find a better solution to make [WebSocketChannel.ready] more
/// reliable, we rely on the incoming stream to wait for and receive the first
/// `pong` response, which confirms that the connection is established.
late final ConnectionTimer _connectionTimer;

@override
Future<void> connect(
ConnectionInformation? connectionInformation, {
ConnectionCallback? onOpen,
ConnectionCallback? onDone,
ConnectionCallback? onError,
bool printResponse = false,
}) async {
_resetCallManagers();

Expand All @@ -84,7 +118,7 @@ class BinaryAPI extends BaseAPI {

HttpClient? client;

if (proxyAwareConnection) {
if (connectionConfig.proxyAwareConnection) {
final String proxy = await FlutterSystemProxy.findProxyFromEnvironment(
uri.toString().replaceAll('wss', 'https'));

Expand All @@ -95,16 +129,22 @@ class BinaryAPI extends BaseAPI {

// Initialize connection to websocket server.
_webSocketChannel = IOWebSocketChannel.connect('$uri',
pingInterval: _websocketConnectTimeOut, customClient: client);
pingInterval: _keepAlivePingInterval, customClient: client);

unawaited(_webSocketChannel?.ready.then((_) => _startConnectionTimer()));

_webSocketListener = _webSocketChannel?.stream
.map<Map<String, dynamic>?>((Object? result) => jsonDecode('$result'))
.listen(
(Map<String, dynamic>? message) {
onOpen?.call(key);

_stopConnectionTimer();
if (message != null) {
_handleResponse(message, printResponse: printResponse);
_handleResponse(
message,
printResponse:
connectionConfig.enableDebug && connectionConfig.printResponse,
);
}
},
onDone: () async {
Expand Down Expand Up @@ -145,11 +185,16 @@ class BinaryAPI extends BaseAPI {
required Request request,
List<String> nullableKeys = const <String>[],
}) async {
final Response response = await (_callManager ??= CallManager(this))(
final Future<Response> responseFuture =
(_callManager ??= CallManager(this))(
request: request,
nullableKeys: nullableKeys,
);

final Response response = await (connectionConfig.callTimeout != null
? responseFuture.timeout(connectionConfig.callTimeout!)
: responseFuture);

if (response is T) {
return response as T;
}
Expand Down Expand Up @@ -252,6 +297,20 @@ class BinaryAPI extends BaseAPI {
}
}

void _startConnectionTimer() {
if (!_connectionTimer.isActive) {
_connectionTimer.start();
}
}

Future<void> _stopConnectionTimer() async {
if (_connectionTimer.isActive) {
_connectionTimer.stop();
}
}

void _ping() => PingResponse.pingMethod();

void _logDebugInfo(String message, {Object? error}) {
if (enableDebug) {
dev.log('$runtimeType $key $message', error: error);
Expand Down
39 changes: 39 additions & 0 deletions lib/services/connection/api_manager/connection_config.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import 'package:flutter_deriv_api/state/connection/connection_cubit.dart';

/// [ConnectionCubit] configuration.
class ConnectionConfig {
/// Initializes [ConnectionConfig].
const ConnectionConfig({
this.enableDebug = false,
this.printResponse = false,
this.proxyAwareConnection = false,
this.callTimeout,
});

/// Enables debug mode.
///
/// Default value is `false`.
final bool enableDebug;

/// Prints API response to console, only works if [enableDebug] is `true`.
///
/// Default value is `false`.
final bool printResponse;

/// A flag to indicate if the connection is proxy aware.
final bool proxyAwareConnection;

/// The timeout duration for the API calls that are request/response model
/// and are not subscription. The return type of these calls are [Future].
///
/// If this duration is set, and the [call] method takes more than this
/// duration to complete, it will throw a [TimeoutException].
///
/// Since these are calls from a remote server and because of lack of
/// connection or some other reason their future may never complete. This can
/// cause the caller of the methods to wait indefinitely. To prevent this, we
/// set a timeout duration for these calls.
///
/// Default is `null` which means no timeout is considered.
final Duration? callTimeout;
}
30 changes: 30 additions & 0 deletions lib/services/connection/api_manager/timer/connection_timer.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import 'dart:ui';

/// A timer calls the provided [onDoAction] callback every time the
/// interval elapses, allowing you to perform the action without needing to
/// manually handle the timing logic.
abstract class ConnectionTimer {
/// Creates a [ConnectionTimer] with a specified [interval], and an
/// [onDoAction] callback.
ConnectionTimer({
required this.onDoAction,
required this.interval,
});

/// The initial interval duration between consecutive actions.
final Duration interval;

/// The callback function to be executed at each timer interval.
final VoidCallback onDoAction;

/// Returns `true` if the timer is active, otherwise `false`.
bool get isActive;

/// Starts the timer.
///
/// The [onDoAction] callback will be triggered based on the [interval].
void start();

/// Stops the timer.
void stop();
}
Loading

0 comments on commit 5322528

Please sign in to comment.