diff --git a/test/mitmproxy-addon/src/control_rpc.py b/test/mitmproxy-addon/src/control_rpc.py deleted file mode 100644 index df9c47ab7..000000000 --- a/test/mitmproxy-addon/src/control_rpc.py +++ /dev/null @@ -1,56 +0,0 @@ -from base64 import b64encode, b64decode -from dataclasses import dataclass -from intercepted_messages_queue import DropMessageAction, ReplaceMessageAction -from typing import Literal -import logging - -@dataclass -class JSONRPCRequest: - id: str - - def create_dto(self): - return { "jsonrpc": "2.0", "id": self.id } - -@dataclass -class TransformInterceptedMessageJSONRPCRequest(JSONRPCRequest): - type: Literal["binary", "text"] - data: bytes | str - from_client: bool - - def create_dto(self): - data_param = None - match self.type: - case "binary": - data_param = b64encode(self.data).decode('utf-8') - case "text": - data_param = self.data - - params = { 'type': self.type, 'data': data_param, 'fromClient': self.from_client } - return { **super().create_dto(), "method": "transformInterceptedMessage", "params": params } - -def json_rpc_response_from_dto(dto): - # TODO when we add more methods we’ll need a way to know which method the request corresponds to, via the ID - return TransformInterceptedMessageJSONRPCResponse.from_dto(dto) - -@dataclass -class JSONRPCResponse: - id: str - -@dataclass -class TransformInterceptedMessageJSONRPCResponse(JSONRPCResponse): - result: DropMessageAction | ReplaceMessageAction - - def from_dto(dto): - match dto['result']['action']: - case 'drop': - result = DropMessageAction() - case 'replace': - type = dto['result']['type'] - match type: - case 'binary': - data = b64decode(dto['result']['data']) - case 'text': - data = dto['result']['data'] - result = ReplaceMessageAction(type, data) - - return TransformInterceptedMessageJSONRPCResponse(id = dto['id'], result = result) diff --git a/test/mitmproxy-addon/src/control_server.py b/test/mitmproxy-addon/src/control_server.py deleted file mode 100644 index 9b2a9b63a..000000000 --- a/test/mitmproxy-addon/src/control_server.py +++ /dev/null @@ -1,26 +0,0 @@ -import websockets -import asyncio -import logging - -class ControlServer: - def __init__(self, interception_context): - self._websocket_connections = [] - self._interception_context = interception_context - - # TODO make it not keep trying to restart this server every time I modify this file - async def run(self): - async with websockets.serve(self._handle_websocket_connection, "", 8001): - await asyncio.Future() # run forever - - async def _handle_websocket_connection(self, websocket): - logging.info(f'TestProxy handle_websocket_connection {websocket}, open {websocket.open}') - # Store the connection so we can broadcast to it later. - self._websocket_connections.append(websocket) - logging.info(f'TestProxy now has websockets {self._websocket_connections}') - - async for message in websocket: - logging.info(f'TestProxy received control message: {message}') - try: - self._interception_context.on_websocket_message(message) - except Exception as err: - logging.info(f'TestProxy got error handling control message: {err}') diff --git a/test/mitmproxy-addon/src/intercepted_messages_queue.py b/test/mitmproxy-addon/src/intercepted_messages_queue.py deleted file mode 100644 index b4097958f..000000000 --- a/test/mitmproxy-addon/src/intercepted_messages_queue.py +++ /dev/null @@ -1,71 +0,0 @@ -import mitmproxy -import uuid -from dataclasses import dataclass -from typing import Literal - -@dataclass -class InterceptedMessagePredicate: - flow: mitmproxy.http.HTTPFlow - from_client: bool - - # https://stackoverflow.com/a/4901847 - - def __hash__(self): - return hash((self.flow, self.from_client)) - - def __eq__(self, other): - return (self.flow, self.from_client) == (other.flow, other.from_client) - -@dataclass -# A handle for locating a message within InterceptedMessageQueue. -class InterceptedMessageHandle: - predicate: InterceptedMessagePredicate - message_id: uuid.UUID - -class DropMessageAction: - pass - -@dataclass -class ReplaceMessageAction: - type: Literal["binary", "text"] - data: bytes | str - -@dataclass -class InterceptedMessage: - message: mitmproxy.websocket.WebSocketMessage - id: uuid.UUID = uuid.uuid4() - action: None | DropMessageAction | ReplaceMessageAction = None - -# Per-connection, per-direction message queue. We use it to queue intercepted messages whilst waiting for a control server message telling us what to do with the message at the head of the queue. -class InterceptedMessagesQueue: - def __init__(self): - # Maps an InterceptedMessagePredicate to a queue - self.queues = {} - - def _messages_for(self, predicate, create_if_needed = False): - if predicate in self.queues: - return self.queues[predicate] - else: - result = [] - if create_if_needed: - self.queues[predicate] = result - return result - - def pop(self, predicate): - return self._messages_for(predicate).pop(0) - - def has_messages(self, predicate): - return len(self._messages_for(predicate)) != 0 - - def append(self, message: InterceptedMessage, predicate): - self._messages_for(predicate, create_if_needed = True).append(message) - - def count(self, predicate): - return len(self._messages_for(predicate)) - - def is_head(self, handle: InterceptedMessageHandle): - head = self._messages_for(handle.predicate)[0] - return head.id == handle.message_id - - def get_head(self, predicate: InterceptedMessagePredicate): - return self._messages_for(predicate)[0] diff --git a/test/mitmproxy-addon/src/interception_context.py b/test/mitmproxy-addon/src/interception_context.py deleted file mode 100644 index cf3f27187..000000000 --- a/test/mitmproxy-addon/src/interception_context.py +++ /dev/null @@ -1,115 +0,0 @@ -import logging -import json -import uuid -import asyncio -import mitmproxy - -from control_rpc import JSONRPCRequest, TransformInterceptedMessageJSONRPCRequest, JSONRPCResponse, TransformInterceptedMessageJSONRPCResponse, json_rpc_response_from_dto -from intercepted_messages_queue import InterceptedMessagesQueue, InterceptedMessagePredicate, InterceptedMessageHandle, DropMessageAction, ReplaceMessageAction, InterceptedMessage - -class InterceptionContext: - def __init__(self): - self._intercepted_messages_queue = InterceptedMessagesQueue() - self._json_rpc_request_ids_to_handles = {} - self.control_server = None - - # control API currently only works with text frames - def on_websocket_message(self, payload: str): - dto = json.loads(payload) - - if 'error' in dto: - raise Exception(f'TestProxy not expecting there to be an error in JSON-RPC response') - elif 'result' in dto: - response = json_rpc_response_from_dto(dto) - self.handle_json_rpc_response(response) - else: - raise Exception(f'TestProxy got unrecognised control API message {dto}') - - def handle_json_rpc_response(self, response: JSONRPCResponse): - logging.info(f'TestProxy got JSON-RPC response: {response}, type: {type(response)}') - match response: - case TransformInterceptedMessageJSONRPCResponse(): - self.handle_transform_intercepted_message_response(response) - case _: - raise Exception(f'TestProxy got unknown response {response}') - - def handle_transform_intercepted_message_response(self, response: TransformInterceptedMessageJSONRPCResponse): - json_rpc_request_id = uuid.UUID(response.id) - handle = self._json_rpc_request_ids_to_handles[json_rpc_request_id] - - if handle is None: - raise Exception(f'TestProxy doesn’t recognise response ID {json_rpc_request_id}, enqueued, messages are {self._intercepted_messages_queue}') - - del self._json_rpc_request_ids_to_handles[json_rpc_request_id] - - if not self._intercepted_messages_queue.is_head(handle): - raise Exception(f'TestProxy got response for an intercepted message that’s not at head of queue; shouldn’t be possible {json_rpc_request_id}') - - intercepted_message = self._intercepted_messages_queue.get_head(handle.predicate) - - if intercepted_message.action is not None: - raise Exception(f'TestProxy was asked to set the action for a message that already has an action set; shouldn’t happen.') - - intercepted_message.action = response.result - - self._dequeue_intercepted_message(handle.predicate) - - def _dequeue_intercepted_message(self, predicate): - logging.info(f'TestProxy dequeueing intercepted message') - - message = self._intercepted_messages_queue.pop(predicate) - - if message.action is None: - raise Exception(f'TestProxy attempted to dequeue {message} but it doesn’t have action') - - match message.action: - case ReplaceMessageAction(type, data): - # inject the replacement - # https://docs.mitmproxy.org/stable/addons-examples/#websocket-inject-message - logging.info(f'TestProxy re-injecting message {message} with new type {type} and new data {data}') - # https://github.com/mitmproxy/mitmproxy/blob/e834259215dc4dd6f1b58dee8e0f84943a002db6/mitmproxy/addons/proxyserver.py#L308-L322 - mitmproxy.ctx.master.commands.call("inject.websocket", predicate.flow, not predicate.from_client, data.encode() if type == 'text' else data, type == 'text') - case DropMessageAction: - # drop the message - logging.info(f'TestProxy dropping message {message}') - - if self._intercepted_messages_queue.has_messages(predicate): - self._broadcast_next_message(predicate) - - def _broadcast_json_rpc_request(self, request: JSONRPCRequest): - data = json.dumps(request.create_dto()) - - logging.info(f'TestProxy broadcast request JSON {data}') - # TODO tidy up - have a method on server - for websocket in self.control_server._websocket_connections: - logging.info(f'TestProxy broadcast to connection {websocket}, open {websocket.open}') - asyncio.get_running_loop().create_task(websocket.send(data)) - - def _broadcast_next_message(self, predicate): - intercepted_message = self._intercepted_messages_queue.get_head(predicate) - - json_rpc_request_id = uuid.uuid4() - - handle = InterceptedMessageHandle(predicate, intercepted_message.id) - self._json_rpc_request_ids_to_handles[json_rpc_request_id] = handle - - # Broadcast to everyone connected to the control server. - # TODO I think would be better for there to be one client who sends an explicit message to become the active client, or to only allow a single connection at a time; not important now though - logging.info(f'TestProxy broadcast message {intercepted_message!r}') - - json_rpc_request = TransformInterceptedMessageJSONRPCRequest(id = str(json_rpc_request_id), type = "text" if intercepted_message.message.is_text else "binary", data = intercepted_message.message.text if intercepted_message.message.is_text else intercepted_message.message.content, from_client = intercepted_message.message.from_client) - self._broadcast_json_rpc_request(json_rpc_request) - - def enqueue_message(self, message, flow): - predicate = InterceptedMessagePredicate(flow, message.from_client) - - intercepted_message = InterceptedMessage(message) - self._intercepted_messages_queue.append(intercepted_message, predicate) - - # drop the message; we’ll insert it later when the client tells us what to do with it - message.drop() - - if self._intercepted_messages_queue.count(predicate) == 1: - self._broadcast_next_message(predicate) - else: - logging.info(f'TestProxy enqueued message {message} since there are {self._intercepted_messages_queue.count(predicate) - 1} pending messages') diff --git a/test/mitmproxy-addon/src/mitmproxy_addon.py b/test/mitmproxy-addon/src/mitmproxy_addon.py deleted file mode 100644 index 985f558fc..000000000 --- a/test/mitmproxy-addon/src/mitmproxy_addon.py +++ /dev/null @@ -1,67 +0,0 @@ -import logging -import mitmproxy -import asyncio - -from control_server import ControlServer -from interception_context import InterceptionContext - -# TODO cleanup as control server connections go away -# TODO cleanup as intercepted connections go away - -class AblyInterceptionProxyAddon: - # Called when an addon is first loaded. This event receives a Loader object, which contains methods for adding options and commands. This method is where the addon configures itself. - def load(self, loader: mitmproxy.addonmanager.Loader): - logging.info("TestProxy load") - self._interception_context = InterceptionContext() - self._control_server = ControlServer(self._interception_context) - self._interception_context.control_server = self._control_server - self._control_server_task = asyncio.get_running_loop().create_task(self._control_server.run()) - -# events I observe when the script is hot-reloaded upon changing this file: -# -# [17:46:19.517] Loading script test-proxy.py -# -# presumably the old one: -# [17:46:19.519] TestProxy done -# -# presumably the new one: -# [17:46:19.525] TestProxy load -# [17:46:19.526] TestProxy configure -# [17:46:19.526] TestProxy running -# -# want to make sure the server get shut down, because it didn't before and I got errors - - def done(self): - logging.info("TestProxy done") - # wrote this before seeing https://websockets.readthedocs.io/en/stable/faq/server.html#how-do-i-stop-a-server, will keep what I’ve got here until I understand that incantation - # hmm, doesn't actually seem to be working, look into it further - self._control_server_task.cancel() - self._control_server = None - - def configure(self, updated): - logging.info("TestProxy configure") - - def running(self): - logging.info("TestProxy running") - - # A WebSocket connection has commenced. - def websocket_start(self, flow: mitmproxy.http.HTTPFlow): - logging.info("TestProxy websocket_start") - - # Called when a WebSocket message is received from the client or server. The most recent message will be flow.messages[-1]. The message is user-modifiable. Currently there are two types of messages, corresponding to the BINARY and TEXT frame types. - # TODO do we need to think about fragmentation? - def websocket_message(self, flow: mitmproxy.http.HTTPFlow): - message = flow.websocket.messages[-1] - - if message.injected: - logging.info("TestProxy re-received injected message; not doing anything to it") - return - - logging.info("TestProxy websocket_message") - self._interception_context.enqueue_message(message, flow) - - # A WebSocket connection has ended. You can check flow.websocket.close_code to determine why it ended. - def websocket_end(self, flow: mitmproxy.http.HTTPFlow): - logging.info("TestProxy websocket_end") - -addons = [AblyInterceptionProxyAddon()]