Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP, ECO-4787] Abandoned Python-only version of interception proxy #1826

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ junit/
private-api-usage/
private-api-usage-reports/
test/support/mocha_junit_reporter/build/

# Python stuff (for interception proxy)
__pycache__
56 changes: 56 additions & 0 deletions test/mitmproxy-addon/src/control_rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from base64 import b64encode, b64decode
from dataclasses import dataclass
from intercepted_messages_queue import DropMessageAction, ReplaceMessageAction
from typing import Literal
import logging

@dataclass
class JSONRPCRequest:
    """Base class for the JSON-RPC 2.0 requests built by this module."""

    # Request identifier, copied into the DTO's "id" member.
    id: str

    def create_dto(self):
        """Return the JSON-serialisable members common to every request."""
        envelope = dict(jsonrpc="2.0")
        envelope["id"] = self.id
        return envelope

@dataclass
class TransformInterceptedMessageJSONRPCRequest(JSONRPCRequest):
    """A "transformInterceptedMessage" request describing one intercepted message."""

    # Frame kind of the intercepted message.
    type: Literal["binary", "text"]
    # Payload: bytes when type is "binary", str when type is "text".
    data: bytes | str
    # Serialised as the "fromClient" parameter.
    from_client: bool

    def create_dto(self):
        """Build the full JSON-RPC request DTO.

        Binary payloads are base64-encoded into a string; text payloads are used as-is.
        """
        if self.type == "binary":
            encoded = b64encode(self.data).decode('utf-8')
        elif self.type == "text":
            encoded = self.data
        else:
            # An unrecognised type falls through and yields a null payload.
            encoded = None

        dto = { **super().create_dto() }
        dto["method"] = "transformInterceptedMessage"
        dto["params"] = { 'type': self.type, 'data': encoded, 'fromClient': self.from_client }
        return dto

def json_rpc_response_from_dto(dto):
    """Turn a decoded JSON-RPC response DTO into a response object."""
    # TODO when we add more methods we’ll need a way to know which method the request corresponds to, via the ID
    response_cls = TransformInterceptedMessageJSONRPCResponse
    return response_cls.from_dto(dto)

@dataclass
class JSONRPCResponse:
    """Base class for the JSON-RPC responses parsed by this module."""

    # The "id" member of the response DTO.
    id: str

@dataclass
class TransformInterceptedMessageJSONRPCResponse(JSONRPCResponse):
result: DropMessageAction | ReplaceMessageAction

def from_dto(dto):
match dto['result']['action']:
case 'drop':
result = DropMessageAction()
case 'replace':
type = dto['result']['type']
match type:
case 'binary':
data = b64decode(dto['result']['data'])
case 'text':
data = dto['result']['data']
result = ReplaceMessageAction(type, data)

return TransformInterceptedMessageJSONRPCResponse(id = dto['id'], result = result)
26 changes: 26 additions & 0 deletions test/mitmproxy-addon/src/control_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import websockets
import asyncio
import logging

class ControlServer:
    """WebSocket server that receives control messages and hands them to the interception context."""

    def __init__(self, interception_context):
        # Connections that are currently being served.
        self._websocket_connections = []
        self._interception_context = interception_context

    # TODO make it not keep trying to restart this server every time I modify this file
    async def run(self):
        """Serve control connections on port 8001 until the surrounding task is cancelled."""
        async with websockets.serve(self._handle_websocket_connection, "", 8001):
            await asyncio.Future()  # run forever

    async def _handle_websocket_connection(self, websocket):
        """Register a control connection and process its messages until it ends.

        Each received message is passed to the interception context. An error
        raised while handling one message is logged and does not end the
        connection. The connection is unregistered when the handler exits, so
        the list never accumulates closed connections.
        """
        logging.info(f'TestProxy handle_websocket_connection {websocket}, open {websocket.open}')
        # Store the connection so we can broadcast to it later.
        self._websocket_connections.append(websocket)

        try:
            logging.info(f'TestProxy now has websockets {self._websocket_connections}')

            async for message in websocket:
                logging.info(f'TestProxy received control message: {message}')
                try:
                    self._interception_context.on_websocket_message(message)
                except Exception as err:
                    logging.info(f'TestProxy got error handling control message: {err}')
        finally:
            # Runs on normal close, on a connection error and on cancellation alike.
            self._websocket_connections.remove(websocket)
71 changes: 71 additions & 0 deletions test/mitmproxy-addon/src/intercepted_messages_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import uuid
from dataclasses import dataclass, field
from typing import Literal

import mitmproxy

@dataclass
class InterceptedMessagePredicate:
    """Identifies one direction of one flow; used as the key for a message queue."""

    flow: mitmproxy.http.HTTPFlow
    from_client: bool

    # Explicit __hash__ and __eq__ so that instances can be used as dictionary keys.
    # https://stackoverflow.com/a/4901847

    def __hash__(self):
        return hash((self.flow, self.from_client))

    def __eq__(self, other):
        # Let Python fall back to its default handling for unrelated types
        # instead of failing on a missing attribute.
        if not isinstance(other, InterceptedMessagePredicate):
            return NotImplemented
        return (self.flow, self.from_client) == (other.flow, other.from_client)

@dataclass
class InterceptedMessageHandle:
    """A handle for locating a message within InterceptedMessagesQueue."""

    # Selects the queue the message lives in.
    predicate: InterceptedMessagePredicate
    # ID of the message within that queue.
    message_id: uuid.UUID

class DropMessageAction:
    """Marker action: the intercepted message should be dropped."""

@dataclass
class ReplaceMessageAction:
type: Literal["binary", "text"]
data: bytes | str

@dataclass
class InterceptedMessage:
    """An intercepted WebSocket message together with the action chosen for it."""

    message: mitmproxy.websocket.WebSocketMessage
    # Generated per instance. A plain `= uuid.uuid4()` default is evaluated only
    # once, when the class is defined, so every message would share the same ID
    # and messages could not be told apart.
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    # None until an action has been assigned.
    action: None | DropMessageAction | ReplaceMessageAction = None

# Per-connection, per-direction message queue. We use it to queue intercepted messages whilst waiting for a control server message telling us what to do with the message at the head of the queue.
class InterceptedMessagesQueue:
    def __init__(self):
        # Maps an InterceptedMessagePredicate to a queue (a list, head at index 0)
        self.queues = {}

    def _messages_for(self, predicate, create_if_needed = False):
        """Return the queue for `predicate`.

        When there is none, an empty list is returned; it is only stored in
        `self.queues` if `create_if_needed` is true.
        """
        queue = self.queues.get(predicate)
        if queue is None:
            queue = []
            if create_if_needed:
                self.queues[predicate] = queue
        return queue

    def pop(self, predicate):
        """Remove and return the head of the queue. Raises IndexError if it is empty."""
        return self._messages_for(predicate).pop(0)

    def has_messages(self, predicate):
        """Return whether the queue for `predicate` holds at least one message."""
        return bool(self._messages_for(predicate))

    def append(self, message: InterceptedMessage, predicate):
        """Add `message` to the tail of the queue, creating the queue if needed."""
        self._messages_for(predicate, create_if_needed = True).append(message)

    def count(self, predicate):
        """Return the number of messages queued for `predicate`."""
        return len(self._messages_for(predicate))

    def is_head(self, handle: InterceptedMessageHandle):
        """Return whether `handle` refers to the message at the head of its queue.

        An empty (or unknown) queue has no head, so the answer is False rather
        than an IndexError.
        """
        messages = self._messages_for(handle.predicate)
        if not messages:
            return False
        return messages[0].id == handle.message_id

    def get_head(self, predicate: InterceptedMessagePredicate):
        """Return the head of the queue without removing it. Raises IndexError if it is empty."""
        return self._messages_for(predicate)[0]
115 changes: 115 additions & 0 deletions test/mitmproxy-addon/src/interception_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import logging
import json
import uuid
import asyncio
import mitmproxy

from control_rpc import JSONRPCRequest, TransformInterceptedMessageJSONRPCRequest, JSONRPCResponse, TransformInterceptedMessageJSONRPCResponse, json_rpc_response_from_dto
from intercepted_messages_queue import InterceptedMessagesQueue, InterceptedMessagePredicate, InterceptedMessageHandle, DropMessageAction, ReplaceMessageAction, InterceptedMessage

class InterceptionContext:
def __init__(self):
self._intercepted_messages_queue = InterceptedMessagesQueue()
self._json_rpc_request_ids_to_handles = {}
self.control_server = None

# control API currently only works with text frames
def on_websocket_message(self, payload: str):
dto = json.loads(payload)

if 'error' in dto:
raise Exception(f'TestProxy not expecting there to be an error in JSON-RPC response')
elif 'result' in dto:
response = json_rpc_response_from_dto(dto)
self.handle_json_rpc_response(response)
else:
raise Exception(f'TestProxy got unrecognised control API message {dto}')

def handle_json_rpc_response(self, response: JSONRPCResponse):
logging.info(f'TestProxy got JSON-RPC response: {response}, type: {type(response)}')
match response:
case TransformInterceptedMessageJSONRPCResponse():
self.handle_transform_intercepted_message_response(response)
case _:
raise Exception(f'TestProxy got unknown response {response}')

def handle_transform_intercepted_message_response(self, response: TransformInterceptedMessageJSONRPCResponse):
json_rpc_request_id = uuid.UUID(response.id)
handle = self._json_rpc_request_ids_to_handles[json_rpc_request_id]

if handle is None:
raise Exception(f'TestProxy doesn’t recognise response ID {json_rpc_request_id}, enqueued, messages are {self._intercepted_messages_queue}')

del self._json_rpc_request_ids_to_handles[json_rpc_request_id]

if not self._intercepted_messages_queue.is_head(handle):
raise Exception(f'TestProxy got response for an intercepted message that’s not at head of queue; shouldn’t be possible {json_rpc_request_id}')

intercepted_message = self._intercepted_messages_queue.get_head(handle.predicate)

if intercepted_message.action is not None:
raise Exception(f'TestProxy was asked to set the action for a message that already has an action set; shouldn’t happen.')

intercepted_message.action = response.result

self._dequeue_intercepted_message(handle.predicate)

def _dequeue_intercepted_message(self, predicate):
logging.info(f'TestProxy dequeueing intercepted message')

message = self._intercepted_messages_queue.pop(predicate)

if message.action is None:
raise Exception(f'TestProxy attempted to dequeue {message} but it doesn’t have action')

match message.action:
case ReplaceMessageAction(type, data):
# inject the replacement
# https://docs.mitmproxy.org/stable/addons-examples/#websocket-inject-message
logging.info(f'TestProxy re-injecting message {message} with new type {type} and new data {data}')
# https://github.com/mitmproxy/mitmproxy/blob/e834259215dc4dd6f1b58dee8e0f84943a002db6/mitmproxy/addons/proxyserver.py#L308-L322
mitmproxy.ctx.master.commands.call("inject.websocket", predicate.flow, not predicate.from_client, data.encode() if type == 'text' else data, type == 'text')
case DropMessageAction:
# drop the message
logging.info(f'TestProxy dropping message {message}')

if self._intercepted_messages_queue.has_messages(predicate):
self._broadcast_next_message(predicate)

def _broadcast_json_rpc_request(self, request: JSONRPCRequest):
data = json.dumps(request.create_dto())

logging.info(f'TestProxy broadcast request JSON {data}')
# TODO tidy up - have a method on server
for websocket in self.control_server._websocket_connections:
logging.info(f'TestProxy broadcast to connection {websocket}, open {websocket.open}')
asyncio.get_running_loop().create_task(websocket.send(data))

def _broadcast_next_message(self, predicate):
intercepted_message = self._intercepted_messages_queue.get_head(predicate)

json_rpc_request_id = uuid.uuid4()

handle = InterceptedMessageHandle(predicate, intercepted_message.id)
self._json_rpc_request_ids_to_handles[json_rpc_request_id] = handle

# Broadcast to everyone connected to the control server.
# TODO I think would be better for there to be one client who sends an explicit message to become the active client, or to only allow a single connection at a time; not important now though
logging.info(f'TestProxy broadcast message {intercepted_message!r}')

json_rpc_request = TransformInterceptedMessageJSONRPCRequest(id = str(json_rpc_request_id), type = "text" if intercepted_message.message.is_text else "binary", data = intercepted_message.message.text if intercepted_message.message.is_text else intercepted_message.message.content, from_client = intercepted_message.message.from_client)
self._broadcast_json_rpc_request(json_rpc_request)

def enqueue_message(self, message, flow):
predicate = InterceptedMessagePredicate(flow, message.from_client)

intercepted_message = InterceptedMessage(message)
self._intercepted_messages_queue.append(intercepted_message, predicate)

# drop the message; we’ll insert it later when the client tells us what to do with it
message.drop()

if self._intercepted_messages_queue.count(predicate) == 1:
self._broadcast_next_message(predicate)
else:
logging.info(f'TestProxy enqueued message {message} since there are {self._intercepted_messages_queue.count(predicate) - 1} pending messages')
67 changes: 67 additions & 0 deletions test/mitmproxy-addon/src/mitmproxy_addon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import logging
import mitmproxy
import asyncio

from control_server import ControlServer
from interception_context import InterceptionContext

# TODO cleanup as control server connections go away
# TODO cleanup as intercepted connections go away

class AblyInterceptionProxyAddon:
    """mitmproxy addon that queues WebSocket messages and starts the control server."""

    def load(self, loader: mitmproxy.addonmanager.Loader):
        """Called when an addon is first loaded.

        This event receives a Loader object, which contains methods for adding
        options and commands. This method is where the addon configures itself:
        it creates the interception context and the control server, links them,
        and starts the server as a task on the running event loop.
        """
        logging.info("TestProxy load")
        context = InterceptionContext()
        server = ControlServer(context)
        context.control_server = server
        self._interception_context = context
        self._control_server = server
        self._control_server_task = asyncio.get_running_loop().create_task(server.run())

    # Events observed when the script is hot-reloaded upon changing this file:
    #
    # [17:46:19.517] Loading script test-proxy.py
    #
    # presumably the old one:
    # [17:46:19.519] TestProxy done
    #
    # presumably the new one:
    # [17:46:19.525] TestProxy load
    # [17:46:19.526] TestProxy configure
    # [17:46:19.526] TestProxy running
    #
    # We want to make sure the server gets shut down, because it didn't before and that caused errors.

    def done(self):
        """Cancel the control server task and forget the server."""
        logging.info("TestProxy done")
        # wrote this before seeing https://websockets.readthedocs.io/en/stable/faq/server.html#how-do-i-stop-a-server, will keep what I’ve got here until I understand that incantation
        # hmm, doesn't actually seem to be working, look into it further
        server_task = self._control_server_task
        server_task.cancel()
        self._control_server = None

    def configure(self, updated):
        """Log that the configure event was received."""
        logging.info("TestProxy configure")

    def running(self):
        """Log that the running event was received."""
        logging.info("TestProxy running")

    def websocket_start(self, flow: mitmproxy.http.HTTPFlow):
        """A WebSocket connection has commenced."""
        logging.info("TestProxy websocket_start")

    # TODO do we need to think about fragmentation?
    def websocket_message(self, flow: mitmproxy.http.HTTPFlow):
        """Called when a WebSocket message is received from the client or server.

        The most recent message is flow.websocket.messages[-1]. The message is
        user-modifiable. Currently there are two types of messages,
        corresponding to the BINARY and TEXT frame types. Messages that were
        injected are left alone; every other message is handed to the
        interception context.
        """
        latest = flow.websocket.messages[-1]

        if not latest.injected:
            logging.info("TestProxy websocket_message")
            self._interception_context.enqueue_message(latest, flow)
        else:
            logging.info("TestProxy re-received injected message; not doing anything to it")

    def websocket_end(self, flow: mitmproxy.http.HTTPFlow):
        """A WebSocket connection has ended. You can check flow.websocket.close_code to determine why it ended."""
        logging.info("TestProxy websocket_end")

# Module-level list of addon instances; mitmproxy reads `addons` from a loaded script to register them.
addons = [AblyInterceptionProxyAddon()]
Loading