Skip to content

Commit

Permalink
refactor: decouple HTTP and WebSocket engines
Browse files Browse the repository at this point in the history
- Extracted HTTP calls and WebSocket listeners into a separate module.
- Introduced an abstraction layer for easier implementation swapping.
  • Loading branch information
ttypic committed Sep 24, 2024
1 parent bd2945f commit d1e7ba0
Show file tree
Hide file tree
Showing 39 changed files with 972 additions and 333 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

plugins {
id 'io.codearte.nexus-staging' version '0.21.1'
id 'io.freefair.lombok' version '5.0.1' apply false
}

repositories {
Expand Down
3 changes: 2 additions & 1 deletion dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
// in java/build.gradle and android/build.gradle for maven.
dependencies {
implementation 'org.msgpack:msgpack-core:0.8.11'
implementation 'org.java-websocket:Java-WebSocket:1.5.3'
implementation 'com.google.code.gson:gson:2.9.0'
implementation 'com.davidehrmann.vcdiff:vcdiff-core:0.1.1'
implementation project(':network-client-core')
implementation project(':network-client-default')
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'junit:junit:4.12'
testImplementation 'org.nanohttpd:nanohttpd:2.3.0'
Expand Down
4 changes: 2 additions & 2 deletions lib/src/main/java/io/ably/lib/debug/DebugOptions.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.ably.lib.debug;

import java.net.HttpURLConnection;
import java.util.List;
import java.util.Map;

import io.ably.lib.http.HttpCore;
import io.ably.lib.network.HttpRequest;
import io.ably.lib.transport.ITransport;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ClientOptions;
Expand All @@ -19,7 +19,7 @@ public interface RawProtocolListener {
}

public interface RawHttpListener {
HttpCore.Response onRawHttpRequest(String id, HttpURLConnection conn, String method, String authHeader, Map<String, List<String>> requestHeaders, HttpCore.RequestBody requestBody);
HttpCore.Response onRawHttpRequest(String id, HttpRequest request, String authHeader, Map<String, List<String>> requestHeaders, HttpCore.RequestBody requestBody);
void onRawHttpResponse(String id, String method, HttpCore.Response response);
void onRawHttpException(String id, String method, Throwable t);
}
Expand Down
336 changes: 123 additions & 213 deletions lib/src/main/java/io/ably/lib/http/HttpCore.java

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions lib/src/main/java/io/ably/lib/http/HttpScheduler.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.ably.lib.http;

import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
Expand All @@ -9,6 +8,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.ably.lib.network.HttpCall;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.Callback;
import io.ably.lib.types.ErrorInfo;
Expand Down Expand Up @@ -331,15 +331,15 @@ protected void setError(ErrorInfo err) {
}
}
protected synchronized boolean disposeConnection() {
boolean hasConnection = conn != null;
boolean hasConnection = httpCall != null;
if(hasConnection) {
conn.disconnect();
conn = null;
httpCall.cancel();
httpCall = null;
}
return hasConnection;
}

protected HttpURLConnection conn;
protected HttpCall httpCall;
protected T result;
protected ErrorInfo err;

Expand Down
111 changes: 43 additions & 68 deletions lib/src/main/java/io/ably/lib/transport/WebSocketTransport.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package io.ably.lib.transport;

import io.ably.lib.http.HttpUtils;
import io.ably.lib.network.WebSocketClient;
import io.ably.lib.network.WebSocketEngine;
import io.ably.lib.network.WebSocketEngineConfig;
import io.ably.lib.network.WebSocketEngineFactory;
import io.ably.lib.network.WebSocketListener;
import io.ably.lib.network.NotConnectedException;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.Param;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.ProtocolSerializer;
import io.ably.lib.util.ClientOptionsUtils;
import io.ably.lib.util.Log;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.exceptions.WebsocketNotConnectedException;
import org.java_websocket.framing.CloseFrame;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ServerHandshake;

import javax.net.ssl.HttpsURLConnection;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSession;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.TimerTask;
Expand Down Expand Up @@ -50,7 +47,7 @@ public class WebSocketTransport implements ITransport {
private final boolean channelBinaryMode;
private String wsUri;
private ConnectListener connectListener;
private WsClient wsConnection;
private WebSocketClient webSocketClient;
/******************
* protected constructor
******************/
Expand Down Expand Up @@ -81,15 +78,26 @@ public void connect(ConnectListener connectListener) {

Log.d(TAG, "connect(); wsUri = " + wsUri);
synchronized (this) {
wsConnection = new WsClient(URI.create(wsUri), this::receive);
WebSocketEngineFactory engineFactory = WebSocketEngineFactory.getFirstAvailable();
Log.v(TAG, String.format("Using %s WebSocket Engine", engineFactory.getEngineType().name()));

WebSocketEngineConfig.WebSocketEngineConfigBuilder configBuilder = WebSocketEngineConfig.builder();
configBuilder
.tls(isTls)
.host(params.host)
.proxy(ClientOptionsUtils.covertToProxyConfig(params.getClientOptions()));

if (isTls) {
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, null, null);
SafeSSLSocketFactory factory = new SafeSSLSocketFactory(sslContext.getSocketFactory());
wsConnection.setSocketFactory(factory);
configBuilder.sslSocketFactory(factory);
}

WebSocketEngine engine = engineFactory.create(configBuilder.build());
webSocketClient = engine.create(wsUri, new WebSocketHandler(this::receive));
}
wsConnection.connect();
webSocketClient.connect();
} catch (AblyException e) {
Log.e(TAG, "Unexpected exception attempting connection; wsUri = " + wsUri, e);
connectListener.onTransportUnavailable(this, e.errorInfo);
Expand All @@ -103,9 +111,9 @@ public void connect(ConnectListener connectListener) {
public void close() {
Log.d(TAG, "close()");
synchronized (this) {
if (wsConnection != null) {
wsConnection.close();
wsConnection = null;
if (webSocketClient != null) {
webSocketClient.close();
webSocketClient = null;
}
}
}
Expand All @@ -127,14 +135,14 @@ public void send(ProtocolMessage msg) throws AblyException {
ProtocolMessage decodedMsg = ProtocolSerializer.readMsgpack(encodedMsg);
Log.v(TAG, "send(): " + decodedMsg.action + ": " + new String(ProtocolSerializer.writeJSON(decodedMsg)));
}
wsConnection.send(encodedMsg);
webSocketClient.send(encodedMsg);
} else {
// Check the logging level to avoid performance hit associated with building the message
if (Log.level <= Log.VERBOSE)
Log.v(TAG, "send(): " + new String(ProtocolSerializer.writeJSON(msg)));
wsConnection.send(ProtocolSerializer.writeJSON(msg));
webSocketClient.send(ProtocolSerializer.writeJSON(msg));
}
} catch (WebsocketNotConnectedException e) {
} catch (NotConnectedException e) {
if (connectListener != null) {
connectListener.onTransportUnavailable(this, AblyException.fromThrowable(e).errorInfo);
} else
Expand Down Expand Up @@ -180,7 +188,7 @@ public WebSocketTransport getTransport(TransportParams params, ConnectionManager
* WebSocketHandler methods
**************************/

class WsClient extends WebSocketClient {
class WebSocketHandler implements WebSocketListener {
private final WebSocketReceiver receiver;
/***************************
* WsClient private members
Expand All @@ -189,38 +197,16 @@ class WsClient extends WebSocketClient {
private Timer timer = new Timer();
private TimerTask activityTimerTask = null;
private long lastActivityTime;
private boolean shouldExplicitlyVerifyHostname = true;

WsClient(URI serverUri, WebSocketReceiver receiver) {
super(serverUri);
WebSocketHandler(WebSocketReceiver receiver) {
this.receiver = receiver;
}

@Override
public void onOpen(ServerHandshake handshakedata) {
public void onOpen() {
Log.d(TAG, "onOpen()");
if (params.options.tls && shouldExplicitlyVerifyHostname && !isHostnameVerified(params.host)) {
close();
} else {
connectListener.onTransportAvailable(WebSocketTransport.this);
flagActivity();
}
}

/**
* Added because we had to override the onSetSSLParameters() that usually performs this verification.
* When the minSdkVersion will be updated to 24 we should remove this method and its usages.
* https://github.com/TooTallNate/Java-WebSocket/wiki/No-such-method-error-setEndpointIdentificationAlgorithm#workaround
*/
private boolean isHostnameVerified(String hostname) {
final SSLSession session = getSSLSession();
if (HttpsURLConnection.getDefaultHostnameVerifier().verify(hostname, session)) {
Log.v(TAG, "Successfully verified hostname");
return true;
} else {
Log.e(TAG, "Hostname verification failed, expected " + hostname + ", found " + session.getPeerHost());
return false;
}
connectListener.onTransportAvailable(WebSocketTransport.this);
flagActivity();
}

@Override
Expand Down Expand Up @@ -253,16 +239,14 @@ public void onMessage(String string) {

/* This allows us to detect a websocket ping, so we don't need Ably pings. */
@Override
public void onWebsocketPing(WebSocket conn, Framedata f) {
public void onWebsocketPing() {
Log.d(TAG, "onWebsocketPing()");
/* Call superclass to ensure the pong is sent. */
super.onWebsocketPing(conn, f);
flagActivity();
}

@Override
public void onClose(final int wsCode, final String wsReason, final boolean remote) {
Log.d(TAG, "onClose(): wsCode = " + wsCode + "; wsReason = " + wsReason + "; remote = " + remote);
public void onClose(final int wsCode, final String wsReason) {
Log.d(TAG, "onClose(): wsCode = " + wsCode + "; wsReason = " + wsReason + "; remote = " + false);

ErrorInfo reason;
switch (wsCode) {
Expand Down Expand Up @@ -301,23 +285,14 @@ public void onClose(final int wsCode, final String wsReason, final boolean remot
}

@Override
public void onError(final Exception e) {
Log.e(TAG, "Connection error ", e);
connectListener.onTransportUnavailable(WebSocketTransport.this, new ErrorInfo(e.getMessage(), 503, 80000));
public void onError(Throwable throwable) {
Log.e(TAG, "Connection error ", throwable);
connectListener.onTransportUnavailable(WebSocketTransport.this, new ErrorInfo(throwable.getMessage(), 503, 80000));
}

@Override
protected void onSetSSLParameters(SSLParameters sslParameters) {
try {
super.onSetSSLParameters(sslParameters);
shouldExplicitlyVerifyHostname = false;
} catch (NoSuchMethodError exception) {
// This error will be thrown on Android below level 24.
// When the minSdkVersion will be updated to 24 we should remove this overridden method.
// https://github.com/TooTallNate/Java-WebSocket/wiki/No-such-method-error-setEndpointIdentificationAlgorithm#workaround
Log.w(TAG, "Error when trying to set SSL parameters, most likely due to an old Java API version", exception);
shouldExplicitlyVerifyHostname = true;
}
public void onOldJavaVersionDetected(Throwable throwable) {
Log.w(TAG, "Error when trying to set SSL parameters, most likely due to an old Java API version", throwable);
}

private synchronized void dispose() {
Expand Down Expand Up @@ -391,7 +366,7 @@ private synchronized void onActivityTimerExpiry() {
// If we have no time remaining, then close the connection
if (timeRemaining <= 0) {
Log.e(TAG, "No activity for " + getActivityTimeout() + "ms, closing connection");
closeConnection(CloseFrame.ABNORMAL_CLOSE, "timed out");
webSocketClient.cancel(ABNORMAL_CLOSE, "timed out");
return;
}

Expand Down
34 changes: 34 additions & 0 deletions lib/src/main/java/io/ably/lib/util/ClientOptionsUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.ably.lib.util;

import io.ably.lib.network.ProxyAuthType;
import io.ably.lib.network.ProxyConfig;
import io.ably.lib.types.ClientOptions;

import java.util.List;

public class ClientOptionsUtils {

public static ProxyConfig covertToProxyConfig(ClientOptions clientOptions) {
if (clientOptions.proxy == null) return null;

ProxyConfig.ProxyConfigBuilder builder = ProxyConfig.builder();

builder
.host(clientOptions.proxy.host)
.port(clientOptions.proxy.port)
.username(clientOptions.proxy.username)
.password(clientOptions.proxy.password)
.nonProxyHosts(List.of(clientOptions.proxy.nonProxyHosts));

switch (clientOptions.proxy.prefAuthType) {
case BASIC:
builder.authType(ProxyAuthType.BASIC);
break;
case DIGEST:
builder.authType(ProxyAuthType.DIGEST);
break;
}

return builder.build();
}
}
12 changes: 5 additions & 7 deletions lib/src/test/java/io/ably/lib/test/common/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -35,6 +34,7 @@
import io.ably.lib.debug.DebugOptions.RawProtocolListener;
import io.ably.lib.http.HttpCore;
import io.ably.lib.http.HttpUtils;
import io.ably.lib.network.HttpRequest;
import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.Channel;
import io.ably.lib.realtime.Channel.MessageListener;
Expand Down Expand Up @@ -972,7 +972,6 @@ public static boolean equalNullableStrings(String one, String two) {
public static class RawHttpRequest {
public String id;
public URL url;
public HttpURLConnection conn;
public String method;
public String authHeader;
public Map<String, List<String>> requestHeaders;
Expand All @@ -988,7 +987,7 @@ public static class RawHttpTracker extends LinkedHashMap<String, RawHttpRequest>
private AsyncWaiter<RawHttpRequest> requestWaiter = null;

@Override
public HttpCore.Response onRawHttpRequest(String id, HttpURLConnection conn, String method, String authHeader, Map<String, List<String>> requestHeaders,
public HttpCore.Response onRawHttpRequest(String id, HttpRequest request, String authHeader, Map<String, List<String>> requestHeaders,
HttpCore.RequestBody requestBody) {

/* duplicating if necessary, ensure lower-case versions of header names are present */
Expand All @@ -1001,9 +1000,8 @@ public HttpCore.Response onRawHttpRequest(String id, HttpURLConnection conn, Str
}
RawHttpRequest req = new RawHttpRequest();
req.id = id;
req.url = conn.getURL();
req.conn = conn;
req.method = method;
req.url = request.getUrl();
req.method = request.getMethod();
req.authHeader = authHeader;
req.requestHeaders = normalisedHeaders;
req.requestBody = requestBody;
Expand Down Expand Up @@ -1076,7 +1074,7 @@ public String getRequestParam(String id, String param) {
String result = null;
RawHttpRequest req = get(id);
if(req != null) {
String query = req.conn.getURL().getQuery();
String query = req.url.getQuery();
if(query != null && !query.isEmpty()) {
result = HttpUtils.decodeParams(query).get(param).value;
}
Expand Down
Loading

0 comments on commit d1e7ba0

Please sign in to comment.