diff --git a/.github/workflows/test-browser.yml b/.github/workflows/test-browser.yml index c434c2e58..acc724f96 100644 --- a/.github/workflows/test-browser.yml +++ b/.github/workflows/test-browser.yml @@ -25,11 +25,69 @@ jobs: with: node-version: 20.x - run: npm ci + + # Set up Python (for pipx) + - uses: actions/setup-python@v5 + with: + python-version: '3.12' + + # Install pipx (for mitmproxy) + # See https://pipx.pypa.io/stable/installation/ + - name: Install pipx + run: | + python3 -m pip install --user pipx + sudo pipx --global ensurepath + + # https://docs.mitmproxy.org/stable/overview-installation/#installation-from-the-python-package-index-pypi + - name: Install mitmproxy + run: | + pipx install mitmproxy + # We use this library in our addon + pipx inject mitmproxy websockets + + - name: Generate mitmproxy SSL certs + run: mitmdump -s test/mitmproxy_addon_generate_certs_and_exit.py + + - name: Start interception proxy server + run: ./start-interception-proxy + - name: Install Playwright browsers and dependencies run: npx playwright install --with-deps - - env: + + # For certutil + - name: Install NSS tools + run: sudo apt install libnss3-tools + + # This is for Chromium (see https://chromium.googlesource.com/chromium/src/+/master/docs/linux/cert_management.md) + # Note this is the same command that we use for adding it to the Firefox profile (see playwrightHelpers.js) + - name: Install mitmproxy root CA in NSS shared DB + run: | + mkdir -p ~/.pki/nssdb + certutil -A -d sql:$HOME/.pki/nssdb -t "C" -n "Mitmproxy Root Cert" -i ~/.mitmproxy/mitmproxy-ca-cert.pem + certutil -L -d sql:$HOME/.pki/nssdb + + # This is for WebKit (I think because it uses OpenSSL) + - name: Install mitmproxy root CA in /usr/local/share/ca-certificates + run: | + sudo cp ~/.mitmproxy/mitmproxy-ca-cert.cer /usr/local/share/ca-certificates/mitmproxy-ca-cert.crt + sudo update-ca-certificates + + - name: Run the tests + env: PLAYWRIGHT_BROWSER: ${{ matrix.browser }} run: npm run test:playwright + + - name: Save interception proxy server logs + if: always() + run: sudo journalctl -u ably-sdk-test-proxy.service > interception-proxy-logs.txt + + - name: Upload interception proxy server logs + if: always() + uses: actions/upload-artifact@v4 + with: + name: interception-proxy-logs-${{ matrix.browser }} + path: interception-proxy-logs.txt + - name: Upload test results if: always() uses: ably/test-observability-action@v1 diff --git a/.github/workflows/test-node.yml b/.github/workflows/test-node.yml index 39ccf4d0e..ec119bb79 100644 --- a/.github/workflows/test-node.yml +++ b/.github/workflows/test-node.yml @@ -25,9 +25,120 @@ jobs: with: node-version: ${{ matrix.node-version }} - run: npm ci - - run: npm run test:node + + # Set up Python (for pipx) + - uses: actions/setup-python@v5 + with: + python-version: '3.12' + + # Install pipx (for mitmproxy) + # See https://pipx.pypa.io/stable/installation/ + - name: Install pipx + run: | + python3 -m pip install --user pipx + sudo pipx --global ensurepath + + # https://docs.mitmproxy.org/stable/overview-installation/#installation-from-the-python-package-index-pypi + - name: Install mitmproxy + run: | + pipx install mitmproxy + # We use this library in our addon + pipx inject mitmproxy websockets + + - name: Create a user to run the tests + run: sudo useradd --create-home ably-test-user + + - name: Create a group for sharing the working directory + run: | + sudo groupadd ably-test-users + # Add relevant users to the group + sudo usermod --append --groups ably-test-users $USER + sudo usermod --append --groups ably-test-users ably-test-user + # Give the group ownership of the working directory and everything under it... + sudo chown -R :ably-test-users . + # ...and give group members full read/write access to its contents (i.e. rw access to files, rwx access to directories) + # (We use xargs because `find` does not fail if an `exec` command fails; see https://serverfault.com/a/905039) + find . -type f -print0 | xargs -n1 -0 chmod g+rw + find . -type d -print0 | xargs -n1 -0 chmod g+rwx + # TODO understand better + # + # This is to make `npm run` work when run as ably-test-user; else it fails because of a `statx()` call on package.json: + # + # > 2024-04-17T13:08:09.1302251Z [pid 2051] statx(AT_FDCWD, `"/home/runner/work/ably-js/ably-js/package.json"`, AT_STATX_SYNC_AS_STAT, STATX_ALL, 0x7f4875ffcb40) = -1 EACCES (Permission denied) + # + # statx documentation says: + # + # > in the case of **statx**() with a pathname, execute (search) permission is required on all of the directories in _pathname_ that lead to the file. + # + # The fact that I’m having to do this probably means that I’m doing something inappropriate elsewhere. (And I don’t know what the other consequences of doing this might be.) + chmod o+x ~ + + # TODO set umask appropriately, so that new files created are readable/writable by the group + + - name: Generate mitmproxy SSL certs + run: mitmdump -s test/mitmproxy_addon_generate_certs_and_exit.py + + - name: Set up iptables rules + run: | + # The rules suggested by mitmproxy etc are aimed at intercepting _all_ the outgoing traffic on a machine. I don’t want that, given that we want to be able to run this test suite on developers’ machines in a non-invasive manner. Instead we just want to target traffic generated by the process that contains the Ably SDK, which we’ll make identifable by iptables by running that process as a specific user created for that purpose (ably-test-user). + # + # Relevant parts of iptables documentation: + # + # nat: + # > This table is consulted when a packet that creates a new connection is encountered. It consists of three built-ins: PREROUTING (for altering packets as soon as they come in), OUTPUT (for altering locally-generated packets before routing), and POSTROUTING (for altering packets as they are about to go out). + # + # owner: + # > This module attempts to match various characteristics of the packet creator, for locally-generated packets. It is only valid in the OUTPUT chain, and even this some packets (such as ICMP ping responses) may have no owner, and hence never match. + # + # REDIRECT: + # > This target is only valid in the nat table, in the PREROUTING and OUTPUT chains, and user-defined chains which are only called from those chains. It redirects the packet to the machine itself by changing the destination IP to the primary address of the incoming interface (locally-generated packets are mapped to the 127.0.0.1 address). It takes one option: + # > + # > --to-ports port[-port] + # > This specifies a destination port or range of ports to use: without this, the destination port is never altered. This is only valid if the rule also specifies -p tcp or -p udp. + # + # I don’t exactly understand what the nat table means; I assume its rules apply to all _subsequent_ packets in the connection, too? + # + # So, what I expect to happen: + # + # 1. iptables rule causes default-port HTTP(S) datagram from test process to get its destination IP rewritten to 127.0.0.1, and rewrites the TCP header’s destination port to 8080 + # 2. 127.0.0.1 destination causes OS’s routing to send this datagram on the loopback interface + # 3. nature of the loopback interface means that this datagram is then received on the loopback interface + # 4. mitmproxy, listening on port 8080 (not sure how or why it uses a single port for both non-TLS and TLS traffic) receives these datagrams, and uses Host header or SNI to figure out where they were originally destined. + # + # TODO (how) do we achieve the below on macOS? I have a feeling that it’s currently just working by accident; e.g. it's because the TCP connection to the control server exists before we start mitmproxy and hence the connection doesn’t get passed to its NETransparentProxyProvider or something. To be on the safe side, though, I’ve added a check in the mitmproxy addon so that we only mess with stuff for ports 80 or 443 + # + # Note that in the current setup with ably-js, the test suite and the Ably SDK run in the same process. We want to make sure that we don’t intercept the test suite’s WebSocket communications with the interception proxy’s control API (which it serves at 127.0.0.1:8001), hence only targeting the default HTTP(S) ports. (TODO consider that Realtime team also run a Realtime on non-default ports when testing locally) + sudo iptables --table nat --append OUTPUT --match owner --uid-owner ably-test-user --protocol tcp --destination-port 80 --jump REDIRECT --to-ports 8080 + sudo iptables --table nat --append OUTPUT --match owner --uid-owner ably-test-user --protocol tcp --destination-port 443 --jump REDIRECT --to-ports 8080 + sudo ip6tables --table nat --append OUTPUT --match owner --uid-owner ably-test-user --protocol tcp --destination-port 80 --jump REDIRECT --to-ports 8080 + sudo ip6tables --table nat --append OUTPUT --match owner --uid-owner ably-test-user --protocol tcp --destination-port 443 --jump REDIRECT --to-ports 8080 + + # TODO how will this behave with: + # + # 1. the WebSocket connection from test suite to control API (see above note; not a problem in this CI setup, think about it on macOS) + # 2. the WebSocket connection from mitmproxy to control API (not an issue on Linux or macOS with our current setup since we don’t intercept any traffic from mitmproxy) + # 3. the WebSocket connections that mitmproxy proxies to the interception proxy (which it sends to localhost:8002) (ditto 2) + # 4. the WebSocket connections for which interception proxy is a client (not an issue for Linux or macOS with our current setup since we don’t intercept any traffic from interception proxy) + + - name: Start interception proxy server + run: ./start-interception-proxy + + - name: Run the tests + run: sudo -u ably-test-user NODE_EXTRA_CA_CERTS=~/.mitmproxy/mitmproxy-ca-cert.pem npm run test:node env: CI: true + + - name: Save interception proxy server logs + if: always() + run: sudo journalctl -u ably-sdk-test-proxy.service > interception-proxy-logs.txt + + - name: Upload interception proxy server logs + if: always() + uses: actions/upload-artifact@v4 + with: + name: interception-proxy-logs-${{ matrix.node-version }} + path: interception-proxy-logs.txt + - name: Upload test results if: always() uses: ably/test-observability-action@v1 diff --git a/.gitignore b/.gitignore index 353ef848e..707c11e63 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ react/ typedoc/generated/ junit/ test/support/mocha_junit_reporter/build/ +tmp/ # Python stuff (for interception proxy) __pycache__ diff --git a/package.json b/package.json index ccee09e06..faa5f2f5a 100644 --- a/package.json +++ b/package.json @@ -140,8 +140,10 @@ "test:node:skip-build": "mocha", "test:webserver": "grunt test:webserver", "test:playwright": "node test/support/runPlaywrightTests.js", + "test:playwright:open-browser": "node test/support/openPlaywrightBrowser.js", "test:react": "vitest run", "test:package": "grunt test:package", + "test:proxy": "npm run build && esr test/interception-proxy/server.ts", "concat": "grunt concat", "build": "grunt build:all && npm run build:react", "build:node": "grunt build:node", diff --git a/start-interception-proxy b/start-interception-proxy new file mode 100755 index 000000000..e0b9cc379 --- /dev/null +++ b/start-interception-proxy @@ -0,0 +1,94 @@ +#!/usr/bin/env bash + +# Runs the server as a background service, exiting once the server is ready to receive requests. +# +# Intended for use in SDKs’ CI jobs. Must be run from the root of this repository. + +set -e + +# We run as the current user so that we can access the generated server certificate (in ~/.mitmproxy) without having to worry about permissions. TODO consider instead generating our own cert and telling mitmproxy to use it, then we also won’t have to have that step where we run mitmproxy once just to generate the certs +start_systemd_service () { + systemd_service=$(cat </dev/null + + echo "Starting ably-sdk-test-proxy systemd service..." 1>&2 + sudo systemctl start ably-sdk-test-proxy.service + echo "Started ably-sdk-test-proxy systemd service." 1>&2 +} + +start_launchd_daemon () { + launchd_daemon=$(cat < + + + + Label + com.ably.test.proxy + WorkingDirectory + $(pwd) + ProgramArguments + + npm + run + test:proxy + + RunAtLoad + + + +LAUNCHD_DAEMON +) + + # https://stackoverflow.com/questions/84882/sudo-echo-something-etc-privilegedfile-doesnt-work + echo "${launchd_daemon}" | sudo tee /Library/LaunchDaemons/com.ably.test.proxy.plist + + echo "Loading ably-sdk-test-proxy launchd daemon..." 1>&2 + sudo launchctl load /Library/LaunchDaemons/com.ably.test.proxy.plist + echo "Loaded ably-sdk-test-proxy launchd daemon." 1>&2 +} + +check_daemon_still_running () { + if uname | grep Linux 1>/dev/null + then + systemctl is-active --quiet ably-sdk-test-proxy.service + elif uname | grep Darwin 1>/dev/null + then + launchctl print system/com.ably.test.proxy 1>/dev/null + fi +} + +if uname | grep Linux 1>/dev/null +then + start_systemd_service +elif uname | grep Darwin 1>/dev/null +then + start_launchd_daemon +else + echo "Unsupported system $(uname); exiting" 1>&2 + exit 1 +fi + +echo "Waiting for sdk-test-proxy server to start on port 8001..." 1>&2 + +# https://stackoverflow.com/questions/27599839/how-to-wait-for-an-open-port-with-netcat +while ! nc -z localhost 8001; do + # Check that the service hasn’t failed (else we’ll be waiting forever) + check_daemon_still_running + sleep 0.5 +done + +echo "sdk-test-proxy server is now listening on port 8001." 1>&2 diff --git a/test/common/globals/named_dependencies.js b/test/common/globals/named_dependencies.js index d89cdbe7a..78efde183 100644 --- a/test/common/globals/named_dependencies.js +++ b/test/common/globals/named_dependencies.js @@ -18,5 +18,9 @@ define(function () { async: { browser: 'node_modules/async/lib/async' }, chai: { browser: 'node_modules/chai/chai', node: 'node_modules/chai/chai' }, ulid: { browser: 'node_modules/ulid/dist/index.umd', node: 'node_modules/ulid/dist/index.umd' }, + interception_proxy_client: { + browser: 'test/common/modules/interception_proxy_client', + node: 'test/common/modules/interception_proxy_client', + }, }); }); diff --git a/test/common/modules/interception_proxy_client.js b/test/common/modules/interception_proxy_client.js new file mode 100644 index 000000000..e7224dcce --- /dev/null +++ b/test/common/modules/interception_proxy_client.js @@ -0,0 +1,200 @@ +'use strict'; + +define(['ably', 'shared_helper'], function (Ably, helper) { + // copied from crypto test + var msgpack = typeof window == 'object' ? Ably.msgpack : require('@ably/msgpack-js'); + // similar approach + var WebSocket = typeof window == 'object' ? window.WebSocket : require('ws'); + var BufferUtils = Ably.Realtime.Platform.BufferUtils; + + class InterceptionProxyClient { + currentContext = null; + + // this expects the interception proxy to already be running (i.e. the test suite doesn't launch it) + // this method is called by test suite’s root hooks. test cases shouldn't call this method; rather, they should use + async connect() { + this.webSocket = new WebSocket('ws://localhost:8001'); + + await new Promise((resolve, reject) => { + this.webSocket.addEventListener('open', () => { + console.log('connected to interception proxy'); + resolve(); + }); + this.webSocket.addEventListener('error', (error) => { + console.log('failed to connect to interception proxy:', error); + reject(error); + }); + this.webSocket.addEventListener('message', (message) => { + console.log('interception proxy got message', message.data); + this.handleJSONRPCMessage(JSON.parse(message.data)); + }); + }); + + await this.startInterception(); + console.log('startInterception completed'); + + // TODO something if connection lost + } + + async startInterception() { + const promise = new Promise((resolve, reject) => { + this.onStartedInterception = resolve; + this.onFailedToStartInterception = reject; + }); + + // i.e. for browser we use proxy, for Node we use local + const params = typeof window == 'object' ? { mode: 'proxy' } : { mode: 'local', pid: process.pid }; + + const request = { + jsonrpc: '2.0', + method: 'startInterception', + params, + id: helper.randomString(), + }; + const requestData = JSON.stringify(request); + + console.log('interception proxy sending startInterception request', request); + this.webSocket.send(requestData); + + return promise; + } + + async disconnect() { + if (this.webSocket.readyState === 3) { + // already closed + console.log('interception proxy client already disconnected'); + return; + } + + this.webSocket.close(); + + return new Promise((resolve) => { + this.webSocket.addEventListener('close', () => { + console.log('interception proxy client disconnected'); + resolve(); + }); + }); + } + + // TODO explain motivation for this API (so that a lingering test can’t accidentally override the interception in your test; of course, the interception in your test might accidentally _intercept_ messages sent by a lingering test but that’s a separate issue) + // + // This is written as (done, action) for compatibility with the way our tests are currently written; a promise-based version would be good to have too + // + // action receives a context object. it can modify the following properties of this object to modify the interception: + // + // - `transformClientMessage` or `transformServerMessage` (for message from client or server respectively) + // + // Receives an object with the following properties: + // + // - id: a unique identifier for this WebSocket message (generated by the interception proxy) + // - connectionID: a unique identifier for this WebSocket connection (generated by the interception proxy) + // - deserialized: a JSON-like object (i.e. the result of JSON.parse or msgpack.decode) + // + // And returns one of: + // + // - a JSON-like object (to modify the message) + // - `null` (to drop the message) + // + // Can also return a promise. + // + // If not set, then messages will be passed through unaltered. + // + // TODO some thoughts on API: + // + // - user currently has to make sure they remember to return something from the transform* function, even if they’re failing their test in some exceptional manner — this is to make sure that the connection can be allowed to disconnect at the proxy + // + // - users might think that they can directly mutate the object passed to transform* functions + intercept(done, action) { + if (this.currentContext !== null) { + throw new Error( + 'A call to `intercept` is already active; check you’re not running multiple tests at the same time', + ); + } + + this.currentContext = { + transformClientMessage: null, + transformServerMessage: null, + }; + + const newDone = (error) => { + this.currentContext = null; + done(error); + }; + + action(newDone, this.currentContext); + } + + handleJSONRPCMessage(message) { + if (message.method === 'transformInterceptedMessage') { + let deserialized; + if (message.params.type === 'binary') { + const data = BufferUtils.base64Decode(message.params.data); + deserialized = msgpack.decode(data); + } else if (message.params.type === 'text') { + const data = message.params.data; + deserialized = JSON.parse(data); + } + + console.log( + 'interception proxy awaiting response of transformInterceptedMessage for message', + message, + 'deserialized to', + deserialized, + ); + + const messageForTransform = { id: message.params.id, connectionID: message.params.connectionID, deserialized }; + + const noOpTransformInterceptedMessage = (message) => { + console.log(`default transformInterceptedMessage implementation passing message ${message.id} unaltered`); + return message.deserialized; + }; + + const transformInterceptedMessage = + (message.params.fromClient + ? this.currentContext?.transformClientMessage + : this.currentContext?.transformServerMessage) ?? noOpTransformInterceptedMessage; + + Promise.resolve(transformInterceptedMessage(messageForTransform)).then((result) => { + try { + console.log(`interception proxy got result of transforming message ${messageForTransform.id}`, result); + + let responseResult; + + if (result === null) { + responseResult = { action: 'drop' }; + } else { + let data; + + if (message.params.type === 'binary') { + const serialized = msgpack.encode(result); + data = BufferUtils.base64Encode(serialized); + } else if (message.params.type === 'text') { + data = JSON.stringify(result); + } + + responseResult = { action: 'replace', type: message.params.type, data }; + } + + const response = { jsonrpc: '2.0', id: message.id, result: responseResult }; + console.log('interception proxy sending transformInterceptedMessage response', response); + + const responseJSON = JSON.stringify(response); + this.webSocket.send(responseJSON); + } catch (err) { + // TODO better error handling + console.log('interception proxy caught', err); + } + }); + } else { + // assume it's the response to our startInterception call; TODO sort this out + if ('error' in message) { + this.onFailedToStartInterception(new Error(message.error.message)); + } else { + this.onStartedInterception(); + } + } + } + } + + return (module.exports = new InterceptionProxyClient()); +}); diff --git a/test/interception-proxy/ControlRPC.ts b/test/interception-proxy/ControlRPC.ts new file mode 100644 index 000000000..8348a0884 --- /dev/null +++ b/test/interception-proxy/ControlRPC.ts @@ -0,0 +1,126 @@ +import { MessageAction } from './InterceptedMessagesQueue'; +import { WebSocketMessageData } from './WebSocketMessageData'; + +// TODO proper types for the DTOs, and align them better with the internal language + +export class JSONRPCRequest { + constructor(readonly id: string) {} + + createDTO() { + return { jsonrpc: '2.0', id: this.id }; + } +} + +export class TransformInterceptedMessageJSONRPCRequest extends JSONRPCRequest { + constructor( + id: string, + readonly messageID: string, + readonly connectionID: string, + readonly data: WebSocketMessageData, + readonly fromClient: boolean, + ) { + super(id); + } + + createDTO() { + let dataParam: string; + switch (this.data.type) { + case 'binary': + dataParam = this.data.data.toString('base64'); + break; + case 'text': + dataParam = this.data.data; + break; + } + + const params = { + id: this.messageID, + connectionID: this.connectionID, + type: this.data.type, + data: dataParam, + fromClient: this.fromClient, + }; + + return { ...super.createDTO(), method: 'transformInterceptedMessage', params }; + } +} + +export interface JSONRPCRequestDTO { + id: string; + params: T; +} + +export type InterceptionModeDTO = { mode: 'local'; pid: number } | { mode: 'proxy' }; + +export type StartInterceptionJSONRPCRequestDTO = JSONRPCRequestDTO; + +export class StartInterceptionJSONRPCRequest extends JSONRPCRequest { + constructor(id: string, readonly mode: InterceptionModeDTO) { + super(id); + } + + static fromDTO(dto: StartInterceptionJSONRPCRequestDTO) { + return new StartInterceptionJSONRPCRequest(dto.id, dto.params); + } +} + +type JSONObject = Partial>; + +export interface JSONRPCResponseDTO { + id: string; + result: T; +} + +type TransformInterceptedMessageJSONRPCResponseDTO = JSONRPCResponseDTO< + { action: 'drop' } | { action: 'replace'; type: 'binary' | 'text'; data: string } +>; + +export class JSONRPCResponse { + constructor(readonly id: string, readonly errorMessage: string | null = null) {} + + createDTO() { + const dto: Record = { jsonrpc: '2.0', id: this.id }; + + if (this.errorMessage !== null) { + dto.error = { code: 1, message: this.errorMessage }; + } else { + dto.result = {}; + } + + return dto; + } + + static fromDTO(dto: JSONRPCResponseDTO): JSONRPCResponse { + // TODO when we add more methods we’ll need a way to know which method the request corresponds to, via the ID + return TransformInterceptedMessageJSONRPCResponse.fromDTO(dto as TransformInterceptedMessageJSONRPCResponseDTO); + } +} + +export class TransformInterceptedMessageJSONRPCResponse extends JSONRPCResponse { + constructor(id: string, readonly action: MessageAction) { + super(id); + } + + static fromDTO(dto: TransformInterceptedMessageJSONRPCResponseDTO) { + let action: MessageAction; + + switch (dto.result.action) { + case 'drop': + action = { type: 'drop' }; + break; + case 'replace': + switch (dto.result.type) { + case 'binary': + const data = Buffer.from(dto.result.data, 'base64'); + action = { type: 'replace', data: { type: 'binary', data } }; + break; + case 'text': + action = { type: 'replace', data: { type: 'text', data: dto.result.data } }; + break; + } + break; + } + + return new TransformInterceptedMessageJSONRPCResponse(dto.id, action); + } +} diff --git a/test/interception-proxy/ControlServer.ts b/test/interception-proxy/ControlServer.ts new file mode 100644 index 000000000..b171b2a03 --- /dev/null +++ b/test/interception-proxy/ControlServer.ts @@ -0,0 +1,59 @@ +import { WebSocket, WebSocketServer } from 'ws'; +import { InterceptionContext } from './InterceptionContext'; + +const port = 8001; + +// think of it as an opaque type for now +export type ControlServerConnection = WebSocket; + +export class ControlServer { + private wss = new WebSocketServer({ port }); + private webSocketConnections: WebSocket[] = []; + private activeConnection: WebSocket | null = null; + + constructor(private readonly interceptionContext: InterceptionContext) { + interceptionContext.controlServer = this; + } + + start() { + this.wss.on('connection', (ws) => { + console.log('New connection to control server'); + this.webSocketConnections.push(ws); + ws.on('error', console.error); + + ws.on('message', (data, isBinary) => { + try { + if (isBinary) { + throw new Error('Control server got a binary message; it only works with text messages'); + } + + const text = data.toString('utf-8'); + console.info('Control server received message', text); + this.interceptionContext.onControlWebSocketMessage(text, ws); + } catch (err) { + console.error('Control server got error handling message', err); + } + }); + }); + console.log(`Started control server on port ${port}`); + } + + send(data: string, connection: ControlServerConnection) { + console.log('Control server sending message', data); + connection.send(data); + } + + sendToActiveConnection(data: string) { + console.log('Control server sending message to active connection', data); + this.activeConnection?.send(data); + } + + setActiveConnection(connection: ControlServerConnection) { + if (this.activeConnection !== null) { + throw new Error('There is already an active connection to the control server'); + } + + console.log('Control server set active connection'); + this.activeConnection = connection; + } +} diff --git a/test/interception-proxy/InterceptedConnection.ts b/test/interception-proxy/InterceptedConnection.ts new file mode 100644 index 000000000..de114d9c4 --- /dev/null +++ b/test/interception-proxy/InterceptedConnection.ts @@ -0,0 +1,311 @@ +import { randomUUID } from 'crypto'; +import { stateMachineDefinition, InterceptedConnectionState, InterceptedConnectionEvent } from './StateMachine'; +import { WebSocket, WebSocketServer } from 'ws'; +import { ProxyMessage } from './Proxy'; +import { WebSocketMessageData } from './WebSocketMessageData'; +import { InterceptionContext } from './InterceptionContext'; + +export enum WhichConnection { + ToServer, + ToClient, +} + +/** + * Represents a WebSocket connection that we’re forwarding from a client to a server. + */ +export class InterceptedConnection { + readonly id = randomUUID(); + private _state = InterceptedConnectionState.ConnectedToClientButNotYetServer; + serverConnection: WebSocket | null = null; + + private messageQueues: { + /** + * Messages that are waiting to be sent to the server + */ + toServer: WebSocketMessageData[]; + /** + * Messages that are waiting to be sent to the client + */ + toClient: WebSocketMessageData[]; + } = { + toServer: [], + toClient: [], + }; + + private keepClientConnectionAlive = false; + private keepServerConnectionAlive = false; + + /** + * @param clientConnection An open WebSocket connection to the client. + */ + constructor( + private readonly interceptionContext: InterceptionContext, + host: string, + proto: string, + url: string, + readonly clientConnection: WebSocket, + ) { + this.log(`New connection to proxy server (forwarded host ${host}, forwarded proto ${proto})`); + + clientConnection.on('close', () => { + this.log('clientConnection close event'); + this.on(InterceptedConnectionEvent.ClientClose); + // TODO I’ve written a state machine but instead of state machine actions I have these calls below (ditto for the server connection); seems maybe suboptimal but not important now + this.tryFlushSendQueue(false); // drop all queued client-bound messages + this.tryPropagateClosure(WhichConnection.ToServer); + }); + // TODO do something with this event? + clientConnection.on('error', (err) => { + this.log(`clientConnection error event: ${err}`); + }); + + this.connectToServer(host, proto, url); + + clientConnection.on('message', (data, isBinary) => { + console.info('Got message from client'); + // TODO sort out type assertion + this.onMessage(data as Buffer, isBinary, true); + }); + } + + get state(): InterceptedConnectionState { + return this._state; + } + + log(message: string) { + console.log(`Connection ${this.id} (state ${InterceptedConnectionState[this.state]}): ${message}`); + } + + on(event: InterceptedConnectionEvent) { + const transition = stateMachineDefinition.fetchTransition(this.state, event); + + if (transition === null) { + throw new Error( + `No transition defined for current state ${InterceptedConnectionState[this.state]} and event ${ + InterceptedConnectionEvent[event] + }`, + ); + } + + this.log(`Transitioning to state ${InterceptedConnectionState[transition.newState]}`); + this._state = transition.newState; + + if (this._state === InterceptedConnectionState.Disconnected) { + this.log(`Finished`); + } + } + + private connectToServer(host: string, proto: string, resourceName: string) { + const uri = `${proto}://${host}${resourceName}`; + this.log(`Starting forwarding to ${uri}`); + + const serverConnection = new WebSocket(uri); + this.serverConnection = serverConnection; + + serverConnection.on('open', () => { + this.log('serverConnection open event'); + this.on(InterceptedConnectionEvent.ServerOpen); + this.tryFlushSendQueue(true); // forward all queued server-bound messages + }); + serverConnection.on('close', () => { + this.log('serverConnection close event'); + this.on(InterceptedConnectionEvent.ServerClose); + this.tryFlushSendQueue(true); // drop all queued server-bound messages + this.tryPropagateClosure(WhichConnection.ToClient); + }); + // TODO better logging, do something with this event? + serverConnection.on('error', (err) => { + this.log(`serverConnection error event: ${err}`); + }); + + serverConnection.on('message', (data, isBinary) => { + // TODO sort out type assertion + this.onMessage(data as Buffer, isBinary, false); + }); + } + + private onMessage(data: Buffer, isBinary: boolean, fromClient: boolean) { + const proxyMessage = new ProxyMessage( + isBinary ? { type: 'binary', data } : { type: 'text', data: data.toString('utf-8') }, + fromClient, + ); + + this.log(`Got message ${proxyMessage.loggingDescription}`); + + // We don’t forward this message directly to the other peer; rather we pass it to the interception context, which will use the control API to ask its client what to do with this message. It might, for example, result in a replacement message being injected via `inject`. + + this.interceptionContext.enqueueMessage(proxyMessage, this); + } + + inject(fromClient: boolean, data: WebSocketMessageData) { + this.enqueueForSend(fromClient, data); + this.tryFlushSendQueue(fromClient); + } + + /** + * When called with `keepAlive` set to `true`, tells the proxy not to close the connection described by `connection` until called again with `keepAlive` set to `false`. + */ + setKeepConnectionAlive(keepAlive: boolean, connection: WhichConnection) { + this.log(`set ${keepAlive ? '' : 'no '}keepAlive connection ${WhichConnection[connection]}`); + + switch (connection) { + case WhichConnection.ToServer: + this.keepServerConnectionAlive = keepAlive; + break; + case WhichConnection.ToClient: + this.keepClientConnectionAlive = keepAlive; + break; + } + + this.tryPropagateClosure(connection); + } + + private tryPropagateClosure(targetConnectionDescription: WhichConnection) { + let keepAlive: boolean; + let queuedMessages: WebSocketMessageData[]; + let targetConnection: WebSocket | null; + + switch (targetConnectionDescription) { + case WhichConnection.ToServer: + keepAlive = this.keepServerConnectionAlive; + queuedMessages = this.messageQueues.toServer; + targetConnection = this.serverConnection; + break; + case WhichConnection.ToClient: + keepAlive = this.keepClientConnectionAlive; + queuedMessages = this.messageQueues.toClient; + targetConnection = this.clientConnection; + break; + } + + // if there are queued messages then we’ll call tryPropagateClosure again after sending them + if (keepAlive || queuedMessages.length !== 0) { + return; + } + + let propagate; + + switch (this.state) { + case InterceptedConnectionState.ConnectingToServerButNoLongerConnectedToClient: + case InterceptedConnectionState.ConnectedToServerButNoLongerToClient: + propagate = targetConnectionDescription === WhichConnection.ToServer; + break; + case InterceptedConnectionState.ConnectedToClientAndFailedToConnectToServer: + case InterceptedConnectionState.ConnectedToClientButNoLongerToServer: + propagate = targetConnectionDescription === WhichConnection.ToClient; + break; + case InterceptedConnectionState.ConnectedToClientButNotYetServer: + case InterceptedConnectionState.ConnectedToClientAndServer: + propagate = false; // nothing to propagate + break; + case InterceptedConnectionState.Disconnected: + propagate = false; // already propagated + break; + } + + // TODO make use of the information about how the connection closed (i.e. code and data), instead of just a generic close. Not immediately important I think because I don’t think Ably / our tests care about whether clean or what the closing handshake said + if (propagate) { + this.log(`Propagating close of connection ${WhichConnection[targetConnectionDescription]}`); + targetConnection?.close(); + } + } + + private enqueueForSend(fromClient: boolean, data: WebSocketMessageData) { + const queue = fromClient ? this.messageQueues.toServer : this.messageQueues.toClient; + queue.push(data); + } + + private tryFlushSendQueue(fromClient: boolean) { + const queue = fromClient ? this.messageQueues.toServer : this.messageQueues.toClient; + + if (queue.length === 0) { + return; + } + + let whenCanSend: 'now' | 'later' | 'never'; + + switch (this.state) { + case InterceptedConnectionState.ConnectedToClientButNotYetServer: + whenCanSend = fromClient ? 'later' : 'now'; + break; + case InterceptedConnectionState.ConnectingToServerButNoLongerConnectedToClient: + whenCanSend = fromClient ? 'later' : 'never'; + break; + case InterceptedConnectionState.ConnectedToClientAndFailedToConnectToServer: + whenCanSend = fromClient ? 'never' : 'now'; + break; + case InterceptedConnectionState.ConnectedToClientAndServer: + whenCanSend = 'now'; + break; + case InterceptedConnectionState.ConnectedToClientButNoLongerToServer: + whenCanSend = fromClient ? 'never' : 'now'; + break; + case InterceptedConnectionState.ConnectedToServerButNoLongerToClient: + whenCanSend = fromClient ? 'now' : 'never'; + break; + case InterceptedConnectionState.Disconnected: + whenCanSend = 'never'; + break; + } + + let clearQueue = false; + + switch (whenCanSend) { + case 'now': + this.log(`Sending ${queue.length} injected messages to ${fromClient ? 'server' : 'client'}`); + const outgoingConnection = fromClient ? this.serverConnection : this.clientConnection; + for (const data of queue) { + this.send(data, outgoingConnection!); + } + clearQueue = true; + break; + case 'later': + this.log( + `There are ${queue.length} injected messages to send to ${ + fromClient ? 'server' : 'client' + }; will try sending later since connection is in state ${InterceptedConnectionState[this.state]}`, + ); + break; + case 'never': + this.log( + `There are ${queue.length} injected messages to send to ${ + fromClient ? 'server' : 'client' + }; cannot do this ever since connection is in state ${ + InterceptedConnectionState[this.state] + }. Dropping messages.`, + ); + clearQueue = true; + break; + } + + if (clearQueue) { + queue.length = 0; // clear the queue + this.tryPropagateClosure(fromClient ? WhichConnection.ToServer : WhichConnection.ToClient); + } + } + + private send(data: WebSocketMessageData, connection: WebSocket) { + let buffer: Buffer; + switch (data.type) { + case 'binary': + buffer = data.data; + break; + case 'text': + buffer = Buffer.from(data.data, 'utf-8'); + break; + } + + let binary: boolean; + switch (data.type) { + case 'binary': + binary = true; + break; + case 'text': + binary = false; + break; + } + + // TODO what does the callback that you can pass here do? + connection.send(buffer, { binary }); + } +} diff --git a/test/interception-proxy/InterceptedMessagesQueue.ts b/test/interception-proxy/InterceptedMessagesQueue.ts new file mode 100644 index 000000000..95a446f51 --- /dev/null +++ b/test/interception-proxy/InterceptedMessagesQueue.ts @@ -0,0 +1,86 @@ +import { InterceptedConnection } from './InterceptedConnection'; +import { ProxyMessage } from './Proxy'; +import { WebSocketMessageData } from './WebSocketMessageData'; + +export class InterceptedMessagePredicate { + constructor(readonly interceptedConnection: InterceptedConnection, readonly fromClient: boolean) {} + + get loggingDescription() { + return `(messages from ${this.fromClient ? 'client' : 'server'} for connection ID ${ + this.interceptedConnection.id + })`; + } + + get keyForMap() { + return `${this.interceptedConnection.id}-${this.fromClient}`; + } +} + +// A handle for locating a message within InterceptedMessageQueue. +export class InterceptedMessageHandle { + constructor(readonly predicate: InterceptedMessagePredicate, readonly messageId: string) {} +} + +export type MessageAction = { type: 'drop' } | { type: 'replace'; data: WebSocketMessageData }; + +export class InterceptedMessage { + action: MessageAction | null = null; + + constructor(readonly message: ProxyMessage) {} + + get id(): string { + return this.message.id; + } +} + +// Per-connection, per-direction message queue. We use it to queue intercepted messages whilst waiting for a control server message telling us what to do with the message at the head of the queue. +export class InterceptedMessagesQueue { + // Maps an InterceptedMessagePredicate’s `keyForMap` to a queue + private readonly queues = new Map(); + + private messagesFor(predicate: InterceptedMessagePredicate, createIfNeeded = false) { + const queue = this.queues.get(predicate.keyForMap); + if (queue !== undefined) { + return queue; + } else { + const result: InterceptedMessage[] = []; + + if (createIfNeeded) { + this.queues.set(predicate.keyForMap, result); + } + + return result; + } + } + + pop(predicate: InterceptedMessagePredicate) { + const messages = this.messagesFor(predicate); + + if (messages.length === 0) { + throw new Error('pop called for empty queue'); + } + + return messages.shift()!; + } + + hasMessages(predicate: InterceptedMessagePredicate) { + return this.messagesFor(predicate).length > 0; + } + + append(message: InterceptedMessage, predicate: InterceptedMessagePredicate) { + this.messagesFor(predicate, true).push(message); + } + + count(predicate: InterceptedMessagePredicate) { + return this.messagesFor(predicate).length; + } + + isHead(handle: InterceptedMessageHandle) { + const head = this.messagesFor(handle.predicate)[0]; + return head.id === handle.messageId; + } + + getHead(predicate: InterceptedMessagePredicate) { + return this.messagesFor(predicate)[0]; + } +} diff --git a/test/interception-proxy/InterceptionContext.ts b/test/interception-proxy/InterceptionContext.ts new file mode 100644 index 000000000..63854d0da --- /dev/null +++ b/test/interception-proxy/InterceptionContext.ts @@ -0,0 +1,256 @@ +import { ChildProcessWithoutNullStreams, spawn } from 'child_process'; +import { randomUUID } from 'crypto'; +import { + JSONRPCRequest, + JSONRPCResponse, + StartInterceptionJSONRPCRequest, + TransformInterceptedMessageJSONRPCRequest, + TransformInterceptedMessageJSONRPCResponse, +} from './ControlRPC'; +import { ControlServer, ControlServerConnection } from './ControlServer'; +import { InterceptedConnection, WhichConnection } from './InterceptedConnection'; +import { + InterceptedMessagesQueue, + InterceptedMessageHandle, + InterceptedMessagePredicate, + InterceptedMessage, +} from './InterceptedMessagesQueue'; +import { ProxyMessage } from './Proxy'; +import { ProxyServer } from './ProxyServer'; +import { webSocketMessageDataLoggingDescription } from './WebSocketMessageData'; + +export class InterceptionContext { + controlServer: ControlServer | null = null; + proxyServer: ProxyServer | null = null; + private interceptedMessagesQueue = new InterceptedMessagesQueue(); + // TODO use a Map + private jsonRPCRequestIDsToHandles: Partial> = {}; + private onMitmproxyReady: (() => void) | null = null; + + onControlWebSocketMessage(data: string, controlServerConnection: ControlServerConnection) { + const dto = JSON.parse(data); + + if ('error' in dto) { + throw new Error('Not expecting an error in JSON-RPC response'); + } else if ('result' in dto) { + const response = JSONRPCResponse.fromDTO(dto); + this.handleJSONRPCResponse(response); + } else if ('method' in dto) { + if ('id' in dto) { + const request = StartInterceptionJSONRPCRequest.fromDTO(dto); + this.handleJSONRPCRequest(request, controlServerConnection); + } else if (dto.method === 'mitmproxyReady') { + // notification telling us that mitmproxy is running + this.onMitmproxyReady?.(); + } + } else { + throw new Error(`Got unrecognised control API message: ${dto}`); + } + } + + private handleJSONRPCRequest(request: JSONRPCRequest, controlServerConnection: ControlServerConnection) { + try { + if (request instanceof StartInterceptionJSONRPCRequest) { + this.controlServer!.setActiveConnection(controlServerConnection); + + console.log('starting mitmdump, mode', request.mode); + + let mitmdumpBinary: string; + let mitmdumpMode: string | null; + + // TODO this is currently written on the assumption that darwin means running locally and linux means running in GitHub action; sort out so that you can run (locally or on CI) on (macOS or Linux) + switch (process.platform) { + // currently this actually means " + case 'darwin': + mitmdumpBinary = 'mitmdump'; + switch (request.mode.mode) { + case 'local': + mitmdumpMode = `local:${request.mode.pid}`; + break; + case 'proxy': + mitmdumpMode = null; + break; + } + break; + case 'linux': + // Currently we expect that you set up the iptables rules externally + mitmdumpBinary = '/opt/pipx_bin/mitmdump'; + switch (request.mode.mode) { + case 'local': + mitmdumpMode = 'transparent'; + break; + case 'proxy': + mitmdumpMode = null; + break; + } + break; + default: + throw new Error(`Don’t know how to set up mitmdump interception for platform ${process.platform}`); + } + + // sounds like we don’t need to explicitly stop this when we stop the current process: https://nodejs.org/api/child_process.html#optionsdetached + const mitmdump = spawn(mitmdumpBinary, [ + '--set', + 'stream_large_bodies=1', + ...(mitmdumpMode === null ? [] : ['--mode', mitmdumpMode]), + '-s', + 'test/mitmproxy_addon_2.py', + '--set', + // "full request URL with response status code and HTTP headers" (the default truncates the URL) + 'flow_detail=2', + ]); + + const formatMitmdumpOutput = (source: string, data: Buffer) => { + const text = data.toString('utf-8'); + const lines = text.split('\n'); + return lines.map((line) => `mitmdump ${source}: ${line}`).join('\n'); + }; + + mitmdump.stdout.on('data', (data) => { + console.log(formatMitmdumpOutput('stdout', data)); + }); + + mitmdump.stderr.on('data', (data) => { + console.log(formatMitmdumpOutput('stderr', data)); + }); + + console.log(`Waiting for mitmdump to start`); + + this.onMitmproxyReady = () => { + this.onMitmproxyReady = null; + console.log(`mitmdump has started`); + const response = new JSONRPCResponse(request.id); + const responseDTO = response.createDTO(); + this.controlServer!.sendToActiveConnection(JSON.stringify(responseDTO)); + }; + } else { + throw new Error(`Got unknown control API request: ${request}`); + } + } catch (error) { + console.log(`Error handling JSON-RPC request ${request.id}:`, error); + const response = new JSONRPCResponse(request.id, `Error handling request: ${error}`); + const responseDTO = response.createDTO(); + this.controlServer!.send(JSON.stringify(responseDTO), controlServerConnection); + } + } + + private handleJSONRPCResponse(response: JSONRPCResponse) { + if (response instanceof TransformInterceptedMessageJSONRPCResponse) { + this.handleTransformInterceptedMessageResponse(response); + } else { + throw new Error(`Got unknown control API response: ${response}`); + } + } + + private handleTransformInterceptedMessageResponse(response: TransformInterceptedMessageJSONRPCResponse) { + const handle = this.jsonRPCRequestIDsToHandles[response.id]; + + if (handle === undefined) { + throw new Error(`Unrecognised control response ID: ${response.id}`); + } + + delete this.jsonRPCRequestIDsToHandles[response.id]; + + if (!this.interceptedMessagesQueue.isHead(handle)) { + throw new Error('Got response for an intercepted message that’s not at head of queue; shouldn’t be possible'); + } + + const interceptedMessage = this.interceptedMessagesQueue.getHead(handle.predicate); + + if (interceptedMessage.action !== null) { + throw new Error('Response asked us to set the action for a message that already has action set'); + } + + interceptedMessage.action = response.action; + + this.dequeueInterceptedMessage(handle.predicate); + } + + private dequeueInterceptedMessage(predicate: InterceptedMessagePredicate) { + console.log('Dequeueing intercepted message for predicate', predicate.loggingDescription); + + const message = this.interceptedMessagesQueue.pop(predicate); + + if (message.action === null) { + throw new Error(`Attempted to dequeue message that doesn’t have an action: ${message}`); + } + + switch (message.action.type) { + case 'replace': + console.log( + `Injecting replacement message for message ${message.id}, with type ${ + message.action.data.type + } and data (${webSocketMessageDataLoggingDescription(message.action.data)})`, + ); + + predicate.interceptedConnection.inject(predicate.fromClient, message.action.data); + break; + case 'drop': + console.log(`Dropping message ${message}`); + break; + } + + if (this.interceptedMessagesQueue.hasMessages(predicate)) { + this.broadcastNextMessage(predicate); + } else { + // We’re not waiting to forward any more messages, so tell the proxy that it can propagate any pending connection close and propagate any future connection close + predicate.interceptedConnection.setKeepConnectionAlive( + false, + predicate.fromClient ? WhichConnection.ToServer : WhichConnection.ToClient, + ); + } + } + + private broadcastJSONRPCRequest(request: JSONRPCRequest) { + const data = JSON.stringify(request.createDTO()); + + console.log(`Broadcasting request JSON ${data}`); + this.controlServer?.sendToActiveConnection(data); + } + + private broadcastNextMessage(predicate: InterceptedMessagePredicate) { + const interceptedMessage = this.interceptedMessagesQueue.getHead(predicate); + + const jsonRPCRequestID = randomUUID(); + const handle = new InterceptedMessageHandle(predicate, interceptedMessage.id); + this.jsonRPCRequestIDsToHandles[jsonRPCRequestID] = handle; + + // Broadcast to everyone connected to the control server. + // TODO I think would be better for there to be one client who sends an explicit message to become the active client, or to only allow a single connection at a time; not important now though + + const jsonRPCRequest = new TransformInterceptedMessageJSONRPCRequest( + jsonRPCRequestID, + interceptedMessage.id, + predicate.interceptedConnection.id, + interceptedMessage.message.data, + interceptedMessage.message.fromClient, + ); + + this.broadcastJSONRPCRequest(jsonRPCRequest); + } + + enqueueMessage(message: ProxyMessage, interceptedConnection: InterceptedConnection) { + console.log(`enqueueMessage ${message.id}`); + + // Tell the proxy to not propagate a connection close until we’ve had a chance to inject this message + interceptedConnection.setKeepConnectionAlive( + true, + message.fromClient ? WhichConnection.ToServer : WhichConnection.ToClient, + ); + + const predicate = new InterceptedMessagePredicate(interceptedConnection, message.fromClient); + + const interceptedMessage = new InterceptedMessage(message); + this.interceptedMessagesQueue.append(interceptedMessage, predicate); + + if (this.interceptedMessagesQueue.count(predicate) === 1) { + this.broadcastNextMessage(predicate); + } else { + console.log( + `Enqueued message ${interceptedMessage.id} since there are ${ + this.interceptedMessagesQueue.count(predicate) - 1 + } pending messages`, + ); + } + } +} diff --git a/test/interception-proxy/Proxy.ts b/test/interception-proxy/Proxy.ts new file mode 100644 index 000000000..431ada1cd --- /dev/null +++ b/test/interception-proxy/Proxy.ts @@ -0,0 +1,17 @@ +// TODO implement the proxy! These types are placeholders + +import { randomUUID } from 'crypto'; +import { WebSocketMessageData, webSocketMessageDataLoggingDescription } from './WebSocketMessageData'; + +export class ProxyMessage { + // unique identifier that we generate for this message + id = randomUUID(); + + constructor(readonly data: WebSocketMessageData, readonly fromClient: boolean) {} + + get loggingDescription(): string { + const sourceDescription = `from ${this.fromClient ? 'client' : 'server'}`; + + return `${sourceDescription} (id: ${this.id}, ${webSocketMessageDataLoggingDescription(this.data)})`; + } +} diff --git a/test/interception-proxy/ProxyServer.ts b/test/interception-proxy/ProxyServer.ts new file mode 100644 index 000000000..3eb8a1f50 --- /dev/null +++ b/test/interception-proxy/ProxyServer.ts @@ -0,0 +1,55 @@ +import { randomUUID } from 'crypto'; +import { WebSocket, WebSocketServer } from 'ws'; +import { InterceptionContext } from './InterceptionContext'; +import { ProxyMessage } from './Proxy'; +import { WebSocketMessageData } from './WebSocketMessageData'; +import { stateMachineDefinition, InterceptedConnectionState, InterceptedConnectionEvent } from './StateMachine'; +import { InterceptedConnection } from './InterceptedConnection'; + +const port = 8002; + +// TODO make sure this is as accurate as possible (in terms of frames) — forward PING and PONG without emitting our own, etc +// TODO more generally there are definitely going to be nuances that I’ve missed that mean this proxy introduces a not 100% faithful reproduction of the comms, but we can iterate +// TODO Understand the server API better +// TODO note that here we always accept the connection from the client, which, again, isn’t necessarily faithful + +export class ProxyServer { + private wss = new WebSocketServer({ port }); + // Keyed by connections’ `id` + private interceptedConnections = new Map(); + + constructor(private readonly interceptionContext: InterceptionContext) { + interceptionContext.proxyServer = this; + } + + start() { + this.wss.on('connection', (clientConnection, req) => { + const host = req.headers['ably-test-host'] as string | undefined; + const proto = req.headers['ably-test-proto'] as string | undefined; + + if (host === undefined) { + console.error('Connection to proxy server without Ably-Test-Host header; closing'); + clientConnection.close(); + return; + } + + if (proto === undefined) { + console.error('Connection to proxy server without Ably-Test-Proto header; closing'); + clientConnection.close(); + return; + } + + const interceptedConnection = new InterceptedConnection( + this.interceptionContext, + host, + proto, + req.url!, + clientConnection, + ); + // TODO do we actually need to keep hold of it or will it keep itself around since it’s a listener of a WebSocket? + this.interceptedConnections.set(interceptedConnection.id, interceptedConnection); + }); + + console.log(`Started interception proxy server to receive traffic from mitmproxy on port ${port}`); + } +} diff --git a/test/interception-proxy/StateMachine.ts b/test/interception-proxy/StateMachine.ts new file mode 100644 index 000000000..7361a2791 --- /dev/null +++ b/test/interception-proxy/StateMachine.ts @@ -0,0 +1,185 @@ +export enum InterceptedConnectionEvent { + /** + * The proxy’s WebSocket connection to the client emitted a `close` event. + */ + ClientClose, + + /** + * The proxy’s WebSocket connection to the server emitted an `open` event. + */ + ServerOpen, + + /** + * The proxy’s WebSocket connection to the server emitted a `close` event. + */ + ServerClose, +} + +export enum InterceptedConnectionState { + /** + * Initial state. + * + * Transitions: + * + * Event: ClientClose + * New state: ConnectingToServerButNoLongerConnectedToClient + * + * Event: ServerOpen + * New state: ConnectedToClientAndServer + * + * Event: ServerClose + * New state: ConnectedToClientAndFailedToConnectToServer + */ + ConnectedToClientButNotYetServer, + + /** + * The client closed the connection whilst we were connecting to the server. + * + * Transitions: + * + * Event: ServerOpen + * New state: ConnectedToServerButNoLongerToClient + * + * Event: ServerClose + * New state: Disconnected + */ + ConnectingToServerButNoLongerConnectedToClient, + + /** + * We failed to establish a connection to the server. + * + * Transitions: + * + * Event: ClientClose + * New state: Disconnected + */ + ConnectedToClientAndFailedToConnectToServer, + + /** + * In this state, we can send messages in both directions. + * + * Transitions: + * + * Event: ClientClose + * New state: ConnectedToServerButNoLongerToClient + * + * Event: ServerClose + * New state: ConnectedToClientButNoLongerToServer + */ + ConnectedToClientAndServer, + + /** + * The server closed the connection. + * + * Transitions: + * + * Event: ClientClose + * New state: Disconnected + */ + ConnectedToClientButNoLongerToServer, + + /** + * The client closed the connection. + * + * Transitions: + * + * Event: ServerClose + * New state: Disconnected + */ + ConnectedToServerButNoLongerToClient, + + /** + * Final state. + */ + Disconnected, +} + +export interface InterceptedConnectionStateMachineTransition { + newState: InterceptedConnectionState; +} + +interface InterceptedConnectionStateMachineRule { + fromState: InterceptedConnectionState; + event: InterceptedConnectionEvent; + transition: InterceptedConnectionStateMachineTransition; +} + +class InterceptedConnectionStateMachineDefinition { + private rules: InterceptedConnectionStateMachineRule[] = []; + + constructor() { + this.addRule( + InterceptedConnectionState.ConnectedToClientButNotYetServer, + InterceptedConnectionEvent.ClientClose, + InterceptedConnectionState.ConnectingToServerButNoLongerConnectedToClient, + ); + this.addRule( + InterceptedConnectionState.ConnectedToClientButNotYetServer, + InterceptedConnectionEvent.ServerOpen, + InterceptedConnectionState.ConnectedToClientAndServer, + ); + this.addRule( + InterceptedConnectionState.ConnectedToClientButNotYetServer, + InterceptedConnectionEvent.ServerClose, + InterceptedConnectionState.ConnectedToClientAndFailedToConnectToServer, + ); + this.addRule( + InterceptedConnectionState.ConnectingToServerButNoLongerConnectedToClient, + InterceptedConnectionEvent.ServerOpen, + InterceptedConnectionState.ConnectedToServerButNoLongerToClient, + ); + this.addRule( + InterceptedConnectionState.ConnectingToServerButNoLongerConnectedToClient, + InterceptedConnectionEvent.ServerClose, + InterceptedConnectionState.Disconnected, + ); + this.addRule( + InterceptedConnectionState.ConnectedToClientAndFailedToConnectToServer, + InterceptedConnectionEvent.ClientClose, + InterceptedConnectionState.Disconnected, + ); + this.addRule( + InterceptedConnectionState.ConnectedToClientAndServer, + InterceptedConnectionEvent.ClientClose, + InterceptedConnectionState.ConnectedToServerButNoLongerToClient, + ); + this.addRule( + InterceptedConnectionState.ConnectedToClientAndServer, + InterceptedConnectionEvent.ServerClose, + InterceptedConnectionState.ConnectedToClientButNoLongerToServer, + ); + this.addRule( + InterceptedConnectionState.ConnectedToClientButNoLongerToServer, + InterceptedConnectionEvent.ClientClose, + InterceptedConnectionState.Disconnected, + ); + this.addRule( + InterceptedConnectionState.ConnectedToServerButNoLongerToClient, + InterceptedConnectionEvent.ServerClose, + InterceptedConnectionState.Disconnected, + ); + } + + private addRule( + fromState: InterceptedConnectionState, + event: InterceptedConnectionEvent, + newState: InterceptedConnectionState, + ) { + this.rules.push({ fromState, event, transition: { newState } }); + } + + fetchTransition( + fromState: InterceptedConnectionState, + event: InterceptedConnectionEvent, + ): InterceptedConnectionStateMachineTransition | null { + for (const rule of this.rules) { + if (rule.fromState == fromState && rule.event == event) { + return rule.transition; + } + } + + return null; + } +} + +export const stateMachineDefinition = new InterceptedConnectionStateMachineDefinition(); diff --git a/test/interception-proxy/WebSocketMessageData.ts b/test/interception-proxy/WebSocketMessageData.ts new file mode 100644 index 000000000..2a5a63f81 --- /dev/null +++ b/test/interception-proxy/WebSocketMessageData.ts @@ -0,0 +1,10 @@ +export type WebSocketMessageData = { type: 'binary'; data: Buffer } | { type: 'text'; data: string }; + +export function webSocketMessageDataLoggingDescription(data: WebSocketMessageData) { + switch (data.type) { + case 'binary': + return `binary data: ${data.data.toString('base64')}`; + case 'text': + return `text data: ${data.data}`; + } +} diff --git a/test/interception-proxy/server.ts b/test/interception-proxy/server.ts new file mode 100644 index 000000000..0050ede2f --- /dev/null +++ b/test/interception-proxy/server.ts @@ -0,0 +1,14 @@ +import { ControlServer } from './ControlServer'; +import { InterceptionContext } from './InterceptionContext'; +import { ProxyServer } from './ProxyServer'; + +// TODO cleanup as control server connections go away +// TODO cleanup as intercepted connections go away + +const interceptionContext = new InterceptionContext(); + +const controlServer = new ControlServer(interceptionContext); +controlServer.start(); + +const proxyServer = new ProxyServer(interceptionContext); +proxyServer.start(); diff --git a/test/mitm-server-thoughts.md b/test/mitm-server-thoughts.md new file mode 100644 index 000000000..01a28954f --- /dev/null +++ b/test/mitm-server-thoughts.md @@ -0,0 +1,303 @@ +# Thoughts on a man-in-the-middle-server for replacing some private API usage + +**Note:** Most of the contents of this note are now better explained in [this RFC](https://ably.atlassian.net/wiki/x/IYDItQ); keeping this note around for now because there are some details I didn’t include there for brevity. + +The context being that we eventually intend to use the ably-js test suite for the unified test suite. + +See [`private-api-usage.md`](./private-api-usage.md) for the private APIs we’re referring to here. + +## Requirements + +TODO + +Key realtime protocol-related methods used by tests (will look at REST later): + +### Outgoing + +- replacing `channel.sendPresence` + - check presence message’s client ID +- replacing `transport.send` + - check the encoded data in a protocol message + - to look for an outgoing `AUTH` and check its properties + - to check the `clientId` on the outgoing message + - to listen for `MESSAGE`, check its properties, **and then continue the test** +- replacing `channel.sendMessage` + - with an empty implementation, to "sabotage the reattach attempt" + - with an implementation that fails the test if called + - to check that only a `DETACH` is being sent, **and to continue the test once second `DETACH` sent** +- replacing `connectionManager.send` + - to do `msg.setFlag('ATTACH_RESUME')` on the outgoing message +- accesses `var transport = realtime.connection.connectionManager.activeProtocol.transport.uri` or `.recvRequest.recvUri` to check the `v=3` query parameter +- checks `realtime.connection.connectionManager.httpHosts[0];` to check it’s using correct default host, also checks length of that array +- replacing `realtime.conneciton.connectionManager.tryATransport` + - looking at the host that’s being used to connect, although not sure exactly to what end + - to "simulate the internet being failed" +- replaces `realtime.connection.connectionManager.connectImpl` + - to check `transportParams.format` + +### Incoming + +- calling `channel.processMessage` + - inject a protocol message +- calling `onProtocolMessage` (on a specific transport) + + - inject an `ERROR` + - inject a `DISCONNECTED` + - etc. inject protocol message (won't keep repeating) + - accesses `connectionManager.connectionDetails`, modifies its `maxMessageSize`, then re-injects it via a `CONNECTED` passed to `onProtocolMessage()` (the point here being that maybe we can get this `connectionManager.connectionDetails` some other way, or just modify the original `CONNECTED`?) + +- replacing `channel.processMessage` + - drop an `ATTACHED` + - spies on this to, after processing a received `SYNC`, inject a `PRESENCE` +- replacing `onProtocolMessage` (on a specific transport) + - drop `ACK` + - change protocol message’s `connectionDetails.maxIdleInterval` + - with no-op so that last activity timer doesn’t get bumped + - to look for `CONNECTED`, make an assertion about it, and then set its `connectionKey` and `clientId` to fixed values (so we can assert they’re subsequently used to populate some user-facing properties) +- replacing `connectionManager.onChannelMessage` + - listen for `MESSAGE` and then run some code in response + +## API thoughts + +Ideal would be a 2-way communication between test suite and proxy server, that for each incoming message asks what you want to do with it (drop, or maintain with edits), instead of having to have a convoluted declarative API for pre-configuring what to do with messages. That will be the easiest drop-in for the existing private API usage. + +## Implementation thoughts + +Options: + +1. a WebSocket server (application layer) + + - Ably clients created by the tests would be configured with this server’s URL as their `realtimeHost` + - this would complicate the use of TLS; either the server would only operate over HTTPS, with the test client being configured to do the same, or we’d need to use self-signed certificates, which may be easy or hard to use depending on the client library under test + - this would complicate the testing of things like `environment` client option or fallback hosts + +2. a WebSocket proxy server (application layer) + + - e.g. a SOCKS5 proxy + - the environment in which client library is running (e.g. browser, or JVM, or platform’s OS) would be (per language in 4.1 of RFC 6455) configured to use a proxy when using WebSocket to connect to `sandbox.ably.com` + - client libraries’ WebSocket implementations would pick up this proxy configuration and ask the proxy to open a TCP connection to the library’s configured host (see _Proxy Usage_ in 4.1) + - this would mean that we could let the client use its default URLs, and things like `environment`, default behaviour of fallback hosts etc would be easy to test + - TLS: if we wanted to be able to MITM then we’d still need to work with self-signed certificates (TODO understand better) + - main issue I foresee here is that we don’t specify that our libraries need to work correctly behind a WebSocket proxy, and have reason to believe that some don’t (e.g. https://github.com/ably/ably-java/issues/120, although it says that the ably-js WebSocket library supports SOCKS, and https://github.com/ably/ably-dotnet/issues/159). I know off the top of my head that the library we use for WebSocket in ably-cocoa does (claim to) work with proxies, and presumably it’d work fine in a browser in ably-js + +- not sure whether Node has support for proxies (other than specifically configuring your HTTP / WS client) +- the other thing is that it would be nice to not need to have to make any global settings changes (e.g. OS proxy settings) + +3. some lower-level option + + Some other option that’s less visible to the Ably client library. For example, see the [modes of operation offered by mitmproxy](https://docs.mitmproxy.org/stable/concepts-modes/). + + One question in that case is whether it would be easy to use an option like this for all of the runtime environments in which we want to run the unified test suite (e.g. iOS Simulator, Android emulator, …?) + +## What about using mitmproxy? + +It's an interesting idea — it gives us access to a bunch of alternatives for how we'd implement the proxy, e.g. 1. above I think can be done with mitmproxy’s reverse proxy mode, and 2. with its regular mode. + +And then (TODO check) we'd have a unified API (i.e. abstraction) for doing the MITM-ing, regardless of the implementation details. Maybe, once I’ve got a better idea of the requirements, take a look at its API. (One problem is that I don’t know Python.) + +mitmproxy has good platform compatibility (Mac, Windows, Linux, and has instructions for using with Android Emulator, iOS Simulator). And has good instructions for how to set up self-signed certificates. + +It also means that I can use the proxy approach for now, which requires the least changes to the test suite, and then switch later on if / when necessary. + +It’s a mature piece of software, too, currently on something like version 12, seems to be actively developed. + +Also, someone has already made [a WebSocket-based tool](https://github.com/hacker1024/mitmproxy_remote_interceptions) that puts an external API on mitmproxy. Not sure if works with WebSocket. + +## Playwright + +Just noticed that this offers an API for e.g. intercepting WebSocket: https://playwright.dev/docs/network. Not very useful cross-platform though. + +## What about other transports? + +In the above, I’ve only considered the WebSocket transport. Haven’t taken into account what we’d do for ably-js’s Comet transports. But having a unified API between the test suite and the mock server would help to build a solution that works for all transports. + +## What we’ve used in the past + +TODO + +## Using mitmproxy + +Going to read the documentation + +I’ve been using the Wireguard mode, which seems very easy to use (installed the macOS client from App Store). Oh, no, that doesn't seem to work, you need to run on a separate machine: + +> With the current implementation, it is not possible to proxy all traffic of the host that mitmproxy itself is running on, since this would result in outgoing WireGuard packets being sent over the WireGuard tunnel themselves. + +There's something coming called local redirect mode ([here](https://github.com/mitmproxy/mitmproxy/discussions/5795) referred to as `osproxy` mode), not sure how far along. Ah, it's apparently out on Mac now! [Here](https://mitmproxy.org/posts/local-redirect/macos/) is the blog post. You can target a process by PID or by name. + +For Node to work with mitmproxy certs: `NODE_EXTRA_CA_CERTS=~/.mitmproxy/mitmproxy-ca-cert.pem npm run test:node` + +I think I need to learn a bit of Python now? + +## Writing mitmproxy addon + +- install mitmproxy using pipx so that we can install websockets package +- `pipx inject mitmproxy websockets` +- run mitmproxy with `mitmdump --set stream_large_bodies=1 --mode local:node -s test/interception-proxy/src/mitmproxy_addon.py` +- run Node tests with `NODE_EXTRA_CA_CERTS=~/.mitmproxy/mitmproxy-ca-cert.pem npm run test:node` +- to connect to the control API for now, use `websocat ws://localhost:8001` + +TODO (something to do with injecting a message) + +TODO need to figure out how it’ll work when running across multiple test cases — how to distinguish between different connections (e.g. ignoring ones from accidental lingering clients) + +## mitmproxy limitations + +- sounds like [you can’t intercept PING / PONG frames](https://github.com/mitmproxy/mitmproxy/blob/8cf0cba1cb6e87f1cf48789e90526b75caa5436d/docs/src/content/concepts-protocols.md?plain=1#L56) — this might be an issue. TODO double check whether our tests _currently_ make use of ping / pong (I don't think they do) + +### mitmproxy and Comet + +Evgenii said that we perhaps don't need to worry about Comet for now (i.e. if this doesn't work out then I’ll try running the test suite with just websockets) + +see https://docs.mitmproxy.org/stable/overview-features/#streaming + +note that you can't manipulate streamed responses; is that an issue? in web i don't think we stream any more; what about in Node? + +## Replacing a few usages of private API in tests + +- `test/realtime/connection.test.js` - spies on `transport.send` to listen for `MESSAGE`, check its properties, and then continue the test + +## JSON-RPC notifications sent by our little mitmproxy addon when running + +It connects to the control API described below and sends a notification, method `mitmproxyReady`, no params. + +## JSON-RPC methods implemented in TypeScript proxy server + +Implemented via text WebSocket messages exchanged between proxy and test suite. The WebSocket server is run by the proxy at `http://localhost:8001`. + +### `startInterception` + +The test suite calls this method on the proxy at the start of the test suite. It: + +- results in an error if there is already an active test suite +- marks the WebSocket connection as belonging the active test suite (there is currently no way to undo this; to set a new active test suite you must restart the proxy) +- sets up a proxy for intercepting traffic (this may require cooperation from the tests; see `mode` below) + +Request params is one of the following objects: + +- `{ 'mode': 'local', 'pid': number }`: transparently intercept traffic from the process with the given PID (note that this is currently only used on macOS; in Linux we do interception by UID, see test-node.yml workflow for now) +- `{ 'mode': 'proxy' }`: run an HTTP proxy which listens on port 8080 + +Response result is an empty object. + +### `transformInterceptedMessage` + +The proxy calls this method on the active test suite each time a WebSocket message is intercepted. The test suite must return a result telling the proxy what to do with the message. Subsequent messages intercepted on that WebSocket connection, in the direction described by `fromClient`, will be queued pending the test suite’s reply. + +Request params is an object with the following properties: + +- `id`: a unique identifier for this message +- `connectionID`: a unique identifer for the intercepted WebSocket connection that this message belongs to +- `type`: + - `binary` if the intercepted message is of Binary type + - `text` if it is of Text type +- `data`: the data of the intercepted WebSocket message + - if `type` is `binary`, then this value is Base64-encoded +- `fromClient`: describes the direction in which the intercepted message was sent + +Response result is one of the following objects: + +- `{ "action": "drop" }`: this will cause the proxy to drop the intercepted message +- `{ "action": "replace", "type": "binary", "data": "(…)" }`: this will cause the proxy to replace the intercepted message with a message of Binary type whose data is the result of Base64-decoding the `data` property +- `{ "action": "replace", "type": "text", "data": "(…)" }`: this will cause the proxy to replace the intercepted message with a message of Text type whose data is the value of the `data` property + +### Writing a proxy that sits behind mitmproxy in order to control WebSocket connection lifetime + +The idea is that we’ll build a proxy (i.e. something that mitmproxy knows how to tunnel WebSocket requests through), which will be the thing that actually manages the lifetime of the WebSocket connection to the client. And then, if it turns out that for some other reason we can’t use mitmproxy, we can just give up on interception and instead add a mode for this server to work as a reverse proxy. + +And, as a bonus, I can write this server in TypeScript, which means not having to battle with Python. + +OK, I’ve pulled across the code, now I need to write a proxy and figure out how to make mitmproxy target it. + +So, how do we get mitmproxy to intercept everything using `local` mode, but then send it upstream to another proxy? Let’s play around. + +1. start an mitmproxy that acts as a normal HTTP proxy (this mimics the proxy we’ll eventually implement ourselves): `mitmproxy --listen-port 8081` + +1. start an mitmproxy in `local` mode that intercepts `curl` traffic: `mitmproxy --mode local:curl`. Now the question is how do we make this one forward to our upstream proxy (i.e. http://localhost:8081)? The only functionality I can see in mitmproxy for forwarding to an upstream proxy is the [upstream proxy mode](https://docs.mitmproxy.org/stable/concepts-modes/#upstream-proxy), but that’s a mode; is it mutually exclusive from `local` mode? Let’s try `mitmproxy --mode local:curl --mode upstream:http://localhost:8081`. + +OK, [maintainer replied to my question](https://github.com/mitmproxy/mitmproxy/discussions/6786#discussioncomment-9044517) saying that’s not possible: + +> This commands starts two proxy modes: First, proxy all local cURL traffic. Second, spawn an HTTP proxy on port 8080 that will forward all traffic to another HTTP proxy upstream at `localhost:8081`. Proxy modes are not "interconnected". There currently is no (easy) way to use an upstream proxy with local redirect mode. + +So, we can’t use this TS server as a proxy server. What about if we write it as an application server? Either configuring the test suite to use it directly, or configuring mitmproxy to intercept and rewrite the destination server. + +How would we do the latter? Can we do [this example](https://docs.mitmproxy.org/stable/addons-examples/#http-redirect-requests) ("redirect HTTP requests to another server"), but somehow for WebSocket? + +Let's run an application server inside the server I created, then take it from there. + +OK, have got mitmproxy set up to intercept in local mode and then change the upstream server to my local app server. It seems to be doing that successfully (tested in websocat). Run with: + +`mitmdump --set stream_large_bodies=1 --mode local:websocat -s test/mitmproxy_addon_2.py` (or `local:node` for test suite) + +Now we need to set it to grab the original host so that it can open a connection to it. Let’s set a header? So I think that in normal proxy usage things are different — I think the proxy uses the `Host` header (which the agent that forwarded it the request doesn’t modify) to know where to forward to. But presumably mitmproxy is modifying that header. + +But how does a proxy know what scheme to use? i.e. whether to open with HTTPS or not. Here’s ChatGPT: + +> Yes, a proxy can determine whether to use HTTPS (HTTP over TLS) to communicate with the upstream server based on several factors: +> +> 1. **Protocol Detection**: The proxy can examine the initial request from the client to determine if it's an HTTPS request. If the request uses the HTTPS protocol (`https://`), the proxy can infer that it needs to establish an HTTPS connection with the upstream server. +> +> 2. **Explicit Configuration**: The proxy can be configured explicitly to use HTTPS for certain URLs or domains. This configuration tells the proxy to always use HTTPS when forwarding requests to these destinations. +> +> 3. **Forwarded Protocol Header**: Some proxies use a "Forwarded" or similar header to indicate the original protocol (HTTP or HTTPS) used by the client. This header can be used by the proxy to determine whether to use HTTPS when communicating with the upstream server. +> +> 4. **Environment Variables**: In some setups, environment variables or other configuration mechanisms can be used to indicate whether HTTPS should be used. The proxy can check these variables to make the decision. +> +> The exact method used depends on the proxy software and its configuration. However, proxies typically have mechanisms in place to determine whether to use HTTPS when communicating with upstream servers. + +The `Forwarded` header isn’t defined in HTTP spec, but rather in another RFC. See [MDN docs](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Forwarded). + +So let’s have: + +- `Ably-Test-Host`: like `Host` +- `Ably-Test-Proto`: either `ws` or `wss` + +## Notes from Thu 4 Apr that I didn’t previously incorporate here + +### About managing WebSocket connection lifetime + +hmm, all the test things are saying + +``` + 1) realtime/auth + auth_token_expires_with_web_socket_binary_transport: + + Verify correct disconnect statusCode + + expected - actual + + -400 + +401 +``` + +(why is the test suite sometimes just hanging?) + +OK, I think there's an actual issue here. What's happening is that the client is receiving the CONNECTED protocol message, then we intercept the DISCONNECTED but the server closes the connection before we have a chance to process the DISCONNECTED, so what ends up happening is that the client sees the close before the disconnected and we end up with the wrong status code. + +so for this to work we'd want to delay: + +- the delivery of the `CLOSE` frame from the server to the client +- "passing on" the server’s closing of the TCP connection (my TCP knowledge is shaky here so I don't know what that means exactly) + +until after we've delivered the DISCONNECTED + +is this possible in mitmproxy? would it have been possible in that kotlin thing? + +the problem is that mitmproxy doesn't work at a frame level AFAIK + +what we want to do is when mitmproxy finds out a websocket connection has been closed by server, to keep _its_ websocket connection with the client open until it's delivered all the stuff from the server, then pass along the `CLOSE` frame and close the connection + +is there a proper way to do with with mitmproxy? is there a hack way to do it? (will ask) + +this is how mitmproxy handles a connection closure: https://github.com/mitmproxy/mitmproxy/blob/8cf0cba1cb6e87f1cf48789e90526b75caa5436d/mitmproxy/proxy/layers/websocket.py#L202-L215 — not sure exactly what's happening here but it seems to be immediately doing _something_ with the close event...? + +`yield ws.send2(ws_event)` and `yield commands.CloseConnection(ws.conn)` (don’t know what the difference is) + +OK, I've [asked about this in mitmproxy discussions](https://github.com/mitmproxy/mitmproxy/discussions/6784) + +### About Comet + +see https://docs.mitmproxy.org/stable/overview-features/#streaming + +note that you can't manipulate streamed responses; is that an issue? in web i don't think we stream any more; what about in Node? + +start 09:48 with `--set stream_large_bodies=1` diff --git a/test/mitmproxy_addon_2.py b/test/mitmproxy_addon_2.py new file mode 100644 index 000000000..4ae36a9c1 --- /dev/null +++ b/test/mitmproxy_addon_2.py @@ -0,0 +1,54 @@ +import mitmproxy +import logging +import asyncio +import websockets +import json + +async def send_ready_notification(): + uri = "ws://localhost:8001" + logging.info(f'sending mitmproxyReady JSON-RPC notification to {uri}') + async with websockets.connect(uri) as websocket: + notification_dto = { "jsonrpc": "2.0", "method": "mitmproxyReady" } + data = json.dumps(notification_dto) + await websocket.send(data) + +class MitmproxyAddon2: + def running(self): + # tell the control API that we’re ready to receive traffic + asyncio.get_running_loop().create_task(send_ready_notification()) + + # Copied from https://docs.mitmproxy.org/stable/addons-examples/#http-redirect-requests + def request(self, flow: mitmproxy.http.HTTPFlow) -> None: + # To make sure that when running in local redirect mode (and hence intercepting all traffic from the test process) we don’t mess with traffic from the test process to the control API + # TODO see extended comments re this in test-node.yml and why it hasn’t yet been an issue in practice on macOS + if not flow.request.port in [80, 443]: + return + + # (b'Connection', b'Upgrade'), (b'Upgrade', b'websocket') + intercept = MitmproxyAddon2.is_websocket_upgrade_request(flow.request) + logging.info(f'MitmproxyAddon2 {"intercepting" if intercept else "not intercepting"} `request` {flow.request.url}, headers {flow.request.headers}') + # pretty_host takes the "Host" header of the request into account, + # which is useful in transparent mode where we usually only have the IP + # otherwise. + # if flow.request.pretty_host == "example.org": + # I tried doing it in websocket_start instead but that didn’t work + if MitmproxyAddon2.is_websocket_upgrade_request(flow.request): + original_host = flow.request.pretty_host + original_scheme = flow.request.scheme + + flow.request.host = "localhost" + flow.request.port = 8002 + flow.request.scheme = 'http' + # TODO understand how port fits into this + flow.request.headers['Ably-Test-Host'] = original_host + match original_scheme: + case 'http': + flow.request.headers['Ably-Test-Proto'] = 'ws' + case 'https': + flow.request.headers['Ably-Test-Proto'] = 'wss' + + def is_websocket_upgrade_request(request: mitmproxy.http.Request): + # TODO this request handling is a bit fragile, the special case for `split` is just to handle the fact that Firefox sends 'Connection: keep-alive, Upgrade' + return True if 'Connection' in request.headers and ('Upgrade' in request.headers['Connection'].split(", ")) and 'Upgrade' in request.headers and request.headers['Upgrade'] == 'websocket' else False + +addons = [MitmproxyAddon2()] diff --git a/test/mitmproxy_addon_generate_certs_and_exit.py b/test/mitmproxy_addon_generate_certs_and_exit.py new file mode 100644 index 000000000..29aa91df7 --- /dev/null +++ b/test/mitmproxy_addon_generate_certs_and_exit.py @@ -0,0 +1,13 @@ +import sys +import asyncio + +async def wait_a_bit_then_exit(): + await asyncio.sleep(1) + sys.exit() + +class MitmproxyAddon: + # Wait until we’ve started up, and presumably generated the SSL certs, then exit. (The wait is because if I put a `sys.exit()` directly inside `running()`, the certs aren’t yet generated; I guess there’s some enqueued work I need to wait to complete) + def running(self): + asyncio.get_running_loop().create_task(wait_a_bit_then_exit()) + +addons = [MitmproxyAddon()] diff --git a/test/realtime/connection.test.js b/test/realtime/connection.test.js index 058b6a646..660333027 100644 --- a/test/realtime/connection.test.js +++ b/test/realtime/connection.test.js @@ -1,6 +1,12 @@ 'use strict'; -define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async, chai) { +define(['ably', 'shared_helper', 'async', 'chai', 'interception_proxy_client'], function ( + Ably, + helper, + async, + chai, + interceptionProxyClient, +) { var expect = chai.expect; var closeAndFinish = helper.closeAndFinish; var closeAndFinishAsync = helper.closeAndFinishAsync; @@ -169,112 +175,124 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, helper, async * without being merged with new messages) */ it('connectionQueuing', function (done) { - var realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }), - channel = realtime.channels.get('connectionQueuing'), - connectionManager = realtime.connection.connectionManager; + interceptionProxyClient.intercept(done, (done, interceptionContext) => { + var realtime = helper.AblyRealtime({ transports: [helper.bestTransport] }), + channel = realtime.channels.get('connectionQueuing'), + connectionManager = realtime.connection.connectionManager; - realtime.connection.once('connected', function () { - var transport = connectionManager.activeProtocol.transport; - whenPromiseSettles(channel.attach(), function (err) { - if (err) { - closeAndFinish(done, realtime, err); - return; - } + realtime.connection.once('connected', function () { + var transport = connectionManager.activeProtocol.transport; + whenPromiseSettles(channel.attach(), function (err) { + if (err) { + closeAndFinish(done, realtime, err); + return; + } - let transportSendCallback; + let transportSendCallback; - /* Sabotage sending the message */ - transport.send = function (msg) { - if (msg.action == 15) { - expect(msg.msgSerial).to.equal(0, 'Expect msgSerial to be 0'); + /* Sabotage sending the message */ + interceptionContext.transformClientMessage = (msg) => { + if (msg.deserialized.action == 15) { + expect(msg.deserialized.msgSerial).to.equal(0, 'Expect msgSerial to be 0'); - if (!transportSendCallback) { - done(new Error('transport.send override called before transportSendCallback populated')); - } + if (!transportSendCallback) { + done(new Error('transport.send override called before transportSendCallback populated')); + } - transportSendCallback(null); - } - }; + transportSendCallback(null); + } - let publishCallback; + // drop the message + return null; + }; - async.series( - [ - function (cb) { - transportSendCallback = cb; + let publishCallback; - /* Sabotaged publish */ - whenPromiseSettles(channel.publish('first', null), function (err) { - if (!publishCallback) { - done(new Error('publish completed before publishCallback populated')); - } - publishCallback(err); - }); - }, + async.series( + [ + function (cb) { + transportSendCallback = cb; - // We wait for transport.send to recieve the message that we just - // published before we proceed to disconnecting the transport, to - // make sure that the message got marked as `sendAttempted`. + /* Sabotaged publish */ + whenPromiseSettles(channel.publish('first', null), function (err) { + if (!publishCallback) { + done(new Error('publish completed before publishCallback populated')); + } + publishCallback(err); + }); + }, - function (cb) { - async.parallel( - [ - function (cb) { - publishCallback = function (err) { - try { - expect(!err, 'Check publish happened (eventually) without err').to.be.ok; - } catch (err) { - cb(err); - return; - } - cb(); - }; - }, - function (cb) { - /* After the disconnect, on reconnect, spy on transport.send again */ - connectionManager.once('transport.pending', function (transport) { - var oldSend = transport.send; + // We wait for transport.send to recieve the message that we just + // published before we proceed to disconnecting the transport, to + // make sure that the message got marked as `sendAttempted`. - transport.send = function (msg, msgCb) { - if (msg.action === 15) { - if (msg.messages[0].name === 'first') { - try { - expect(msg.msgSerial).to.equal(0, 'Expect msgSerial of original message to still be 0'); - expect(msg.messages.length).to.equal( - 1, - 'Expect second message to not have been merged with the attempted message', - ); - } catch (err) { - cb(err); - return; - } - } else if (msg.messages[0].name === 'second') { - try { - expect(msg.msgSerial).to.equal(1, 'Expect msgSerial of new message to be 1'); - } catch (err) { - cb(err); - return; - } - cb(); - } + function (cb) { + async.parallel( + [ + function (cb) { + publishCallback = function (err) { + try { + expect(!err, 'Check publish happened (eventually) without err').to.be.ok; + } catch (err) { + cb(err); + return; } - oldSend.call(transport, msg, msgCb); + cb(); }; - channel.publish('second', null); - }); + }, + function (cb) { + /* After the disconnect, on reconnect, spy on transport.send again */ + connectionManager.once('transport.pending', function (transport) { + // TODO does the identity of this transport matter, and can we replace the `transport.pending` check with something external too (e.g. detecting a new connection)? perhaps let's have an EventEmitter interface on the interception context that says when there's a new connection or something + interceptionContext.transformClientMessage = function (msg) { + if (msg.deserialized.action === 15) { + if (msg.deserialized.messages[0].name === 'first') { + try { + expect(msg.deserialized.msgSerial).to.equal( + 0, + 'Expect msgSerial of original message to still be 0', + ); + expect(msg.deserialized.messages.length).to.equal( + 1, + 'Expect second message to not have been merged with the attempted message', + ); + } catch (err) { + cb(err); + return msg.deserialized; + } + } else if (msg.deserialized.messages[0].name === 'second') { + try { + expect(msg.deserialized.msgSerial).to.equal( + 1, + 'Expect msgSerial of new message to be 1', + ); + } catch (err) { + cb(err); + return msg.deserialized; + } + cb(); + } + } - /* Disconnect the transport (will automatically reconnect and resume) () */ - connectionManager.disconnectAllTransports(); - }, - ], - cb, - ); + // preserve the message + return msg.deserialized; + }; + channel.publish('second', null); + }); + + /* Disconnect the transport (will automatically reconnect and resume) () */ + connectionManager.disconnectAllTransports(); + }, + ], + cb, + ); + }, + ], + function (err) { + closeAndFinish(done, realtime, err); }, - ], - function (err) { - closeAndFinish(done, realtime, err); - }, - ); + ); + }); }); }); }); diff --git a/test/support/openPlaywrightBrowser.js b/test/support/openPlaywrightBrowser.js new file mode 100644 index 000000000..2f828bd29 --- /dev/null +++ b/test/support/openPlaywrightBrowser.js @@ -0,0 +1,5 @@ +const { openPlaywrightBrowser } = require('./playwrightHelpers'); + +(async function run() { + await openPlaywrightBrowser(false /* headless */); +})(); diff --git a/test/support/playwrightHelpers.js b/test/support/playwrightHelpers.js new file mode 100644 index 000000000..8439110aa --- /dev/null +++ b/test/support/playwrightHelpers.js @@ -0,0 +1,66 @@ +const path = require('path'); +const util = require('util'); +const exec = util.promisify(require('child_process').exec); +const playwright = require('playwright'); +const { randomUUID } = require('crypto'); +const playwrightBrowsers = ['chromium', 'firefox', 'webkit']; + +async function openPlaywrightBrowser(headless) { + const browserEnv = process.env.PLAYWRIGHT_BROWSER; + + if (!playwrightBrowsers.includes(browserEnv)) { + throw new Error( + `PLAYWRIGHT_BROWSER environment variable must be one of: ${playwrightBrowsers.join( + ', ', + )}. Currently: ${browserEnv}`, + ); + } + + const browserType = playwright[browserEnv]; + + const options = { + headless, + // bypass localhost so that the proxy doesn’t need to be running in order for us to contact the control API to tell it to be started; TODO there is quite possibly a less convoluted way of starting the proxy in this case, also think in a more holistic manner about the various ways in which we make sure that only certain traffic is intercepted (there are notes dotted around about this) + proxy: { server: 'localhost:8080', bypass: 'localhost' }, + }; + + // (I originally tried using the ignoreHTTPSErrors Playwright option, but that doesn’t seem to work for CORS preflight requests) + + let browser; + let context; + + if (browserEnv === 'firefox') { + // TODO clean up when closing + const profileDirectory = path.join('tmp', 'browser-profiles', `browserEnv-${randomUUID()}`); + + // We create and then discard a browser instance just to create the structure of the profile directory, which I guess certutil needs + // TODO this probably isn’t necessary; I think we can just create the directory ahead of time and then use certutil to create the DB, like we do for Chromium + const throwawayBrowser = await browserType.launchPersistentContext(profileDirectory, { + ...options, + headless: true, + }); + await throwawayBrowser.close(); + + // Install the mitmproxy root CA cert + // https://github.com/microsoft/playwright/issues/18115#issuecomment-2067175748 + // https://wiki.mozilla.org/CA/AddRootToFirefox + // https://sadique.io/blog/2012/06/05/managing-security-certificates-from-the-console-on-windows-mac-os-x-and-linux/#firefox + // TODO document that on macOS you get certutil from `brew install nss` + await exec( + `certutil -A -d ${profileDirectory} -t C -n "Mitmproxy Root Cert" -i ~/.mitmproxy/mitmproxy-ca-cert.pem`, + ); + + browser = null; + context = await browserType.launchPersistentContext(profileDirectory, options); + } else { + // TODO explain what to do for trust + browser = await browserType.launch(options); + context = await browser.newContext(options); + } + + const page = await context.newPage(); + + return { browserType, browser, page }; +} + +module.exports = { openPlaywrightBrowser }; diff --git a/test/support/root_hooks.js b/test/support/root_hooks.js index aab15e140..95e6524d1 100644 --- a/test/support/root_hooks.js +++ b/test/support/root_hooks.js @@ -1,4 +1,8 @@ -define(['shared_helper'], function (helper) { +define(['shared_helper', 'interception_proxy_client'], function (helper, interceptionProxyClient) { + before(async function () { + await interceptionProxyClient.connect(); + }); + after(function (done) { this.timeout(10 * 1000); helper.tearDownApp(function (err) { @@ -10,8 +14,21 @@ define(['shared_helper'], function (helper) { }); }); + after(async () => { + await interceptionProxyClient.disconnect(); + }); + + // The `START TEST` and `END TEST` logs are to make it easy to see the IDs of interception proxy connections that were started during the test, to correlate with the proxy logs + afterEach(helper.closeActiveClients); afterEach(helper.logTestResults); afterEach(helper.flushTestLogs); + afterEach(function () { + console.log(`END TEST: ${this.currentTest.fullTitle()}`); + }); + + beforeEach(function () { + console.log(`START TEST: ${this.currentTest.fullTitle()}`); + }); beforeEach(helper.clearTransportPreference); }); diff --git a/test/support/runPlaywrightTests.js b/test/support/runPlaywrightTests.js index a318fc054..9b5b26118 100644 --- a/test/support/runPlaywrightTests.js +++ b/test/support/runPlaywrightTests.js @@ -1,19 +1,17 @@ -const playwright = require('playwright'); const path = require('path'); const MochaServer = require('../web_server'); const fs = require('fs'); const jUnitDirectoryPath = require('./junit_directory_path'); +const { openPlaywrightBrowser } = require('./playwrightHelpers'); const port = process.env.PORT || 3000; const host = 'localhost'; -const playwrightBrowsers = ['chromium', 'firefox', 'webkit']; const mochaServer = new MochaServer(/* playwrightTest: */ true); -const runTests = async (browserType) => { +const runTests = async () => { + const { browserType, browser, page } = await openPlaywrightBrowser(true /* headless */); + await mochaServer.listen(); - const browser = await browserType.launch(); - const context = await browser.newContext(); - const page = await context.newPage(); await page.goto(`http://${host}:${port}`); console.log(`\nrunning tests in ${browserType.name()}`); @@ -44,7 +42,10 @@ const runTests = async (browserType) => { } if (detail.pass) { - browser.close(); + context.close(); + if (browser) { + browser.close(); + } resolve(); } else { reject(new Error(`${browserType.name()} tests failed, exiting with code 1`)); @@ -68,17 +69,7 @@ const runTests = async (browserType) => { let caughtError; try { - const browserEnv = process.env.PLAYWRIGHT_BROWSER; - - if (!playwrightBrowsers.includes(browserEnv)) { - throw new Error( - `PLAYWRIGHT_BROWSER environment variable must be one of: ${playwrightBrowsers.join( - ', ', - )}. Currently: ${browserEnv}`, - ); - } - - await runTests(playwright[browserEnv]); + await runTests(); } catch (error) { // save error for now, we must ensure we end mocha web server first. // if we end current process too early, mocha web server will be left running,