generated from matrix-org/.github
-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add better API semantics for locking; allow mitm filters
- Loading branch information
Showing
4 changed files
with
139 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,9 @@ | ||
import logging, os | ||
from mitmproxy import ctx | ||
from mitmproxy.addons import asgiapp | ||
from flask import Flask, request | ||
|
||
from status_code import StatusCode | ||
|
||
app = Flask("mitmoptset") | ||
|
||
@app.route("/options", methods=["POST"]) | ||
def set_options() -> str: | ||
body = request.json | ||
options = body.get("options", {}) | ||
print(f"setting options {options}") | ||
ctx.options.update(**options) | ||
return {} | ||
from controller import MITM_DOMAIN_NAME, app | ||
|
||
addons = [ | ||
asgiapp.WSGIApp(app, "mitm.local", 80), # requests to this host will be routed to the flask app | ||
asgiapp.WSGIApp(app, MITM_DOMAIN_NAME, 80), # requests to this host will be routed to the flask app | ||
StatusCode(), | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
import random | ||
from mitmproxy import ctx | ||
from flask import Flask, request, make_response | ||
|
||
MITM_DOMAIN_NAME = "mitm.local" | ||
app = Flask("mitmoptset") | ||
|
||
prev_options = { | ||
"lock_id": "", | ||
"options": {}, | ||
} | ||
|
||
# Set options on mitmproxy. See https://docs.mitmproxy.org/stable/concepts-options/ | ||
# This is intended to be used exclusively for our addons in this package, but nothing | ||
# stops tests from enabling/disabling/tweaking other mitmproxy options. | ||
# POST /options/lock | ||
# { | ||
# "options": { | ||
# "body_size_limit": "3m", | ||
# } | ||
# } | ||
# HTTP/1.1 200 OK | ||
# { | ||
# "reset_id": "some_opaque_string" | ||
# } | ||
# Calling this endpoint locks the proxy from further option modification until /options/unlock | ||
# is called. This ensures that tests can't forget to reset options when they are done with them. | ||
@app.route("/options/lock", methods=["POST"]) | ||
def lock_options(): | ||
if prev_options["lock_id"] != "": | ||
return make_response(("options already locked, did you forget to unlock?", 400)) | ||
body = request.json | ||
options = body.get("options", {}) | ||
prev_options["lock_id"] = bytes.hex(random.randbytes(8)) | ||
for k, v in ctx.options.items(): | ||
if k in options: | ||
prev_options["options"][k] = v.current() | ||
print(f"locking options {options}") | ||
ctx.options.update(**options) | ||
return { | ||
"reset_id": prev_options["lock_id"] | ||
} | ||
|
||
# Unlock previously set options on mitmproxy. Must be called after a call to POST /options/lock | ||
# to allow further option modifications. | ||
# POST /options/unlock | ||
# { | ||
# "reset_id": "some_opaque_string" | ||
# } | ||
@app.route("/options/unlock", methods=["POST"]) | ||
def unlock_options() -> str: | ||
body = request.json | ||
reset_id = body.get("reset_id", "") | ||
if prev_options["lock_id"] == "": | ||
return make_response(("options were not locked, mismatched lock/unlock calls", 400)) | ||
if prev_options["lock_id"] != reset_id: | ||
return make_response(("refusing to unlock, wrong id supplied", 400)) | ||
print(f"unlocking options back to {prev_options['options']}") | ||
ctx.options.update(**prev_options["options"]) | ||
# apply AFTER update so if we fail to reset them back we won't unlock, indicating a problem. | ||
prev_options["lock_id"] = "" | ||
prev_options["options"] = {} | ||
return {} | ||
|
||
# Creates a filter which can then be passed to options | ||
# POST /create_filter | ||
# { | ||
# "hs": "hs1|hs2|*|" empty disables filter, * matches all hses, else HS domain | ||
# "user": "@alice:hs1|*|" empty disables filter, * matches all users, else user ID | ||
# "device": "FOO|*|" empty disables filter, * matches all devices, else device ID | ||
# } | ||
# HTTP/1.1 200 OK | ||
# { | ||
# "filter_id": "some_opaque_string" | ||
# } | ||
@app.route("/create_filter", methods=["POST"]) | ||
def create_filter() -> str: | ||
body = request.json | ||
filter = body.get("filter", {}) | ||
print(f"creating filter {filter}") | ||
hs_filter = filter.get("hs", "") | ||
user_filter = filter.get("user", "") | ||
device_filter = filter.get("device", "") | ||
ctx.options.update(**options) | ||
return {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,33 +1,47 @@ | ||
import logging | ||
from typing import Optional | ||
|
||
from mitmproxy import ctx | ||
from mitmproxy import ctx, flowfilter | ||
from mitmproxy.http import Response | ||
from controller import MITM_DOMAIN_NAME | ||
|
||
class StatusCode: | ||
def __init__(self): | ||
self.return_status_code = 0 # disabled | ||
self.reset() | ||
print(MITM_DOMAIN_NAME) | ||
self.matchall = flowfilter.parse(".") | ||
self.filter: Optional[flowfilter.TFilter] = self.matchall | ||
|
||
def reset(self): | ||
self.config = { | ||
"return_status": 0, | ||
"filter": None, | ||
} | ||
|
||
def load(self, loader): | ||
loader.add_option( | ||
name="statuscode", | ||
typespec=int, | ||
default=0, | ||
help="Change the response status code", | ||
typespec=dict, | ||
default={"return_status": 0, "filter": None}, | ||
help="Change the response status code, with an optional filter", | ||
) | ||
|
||
def configure(self, updates): | ||
if "statuscode" not in updates: | ||
self.return_status_code = 0 | ||
self.reset() | ||
return | ||
if ctx.options.statuscode is None or ctx.options.statuscode == 0: | ||
self.return_status_code = 0 | ||
if ctx.options.statuscode is None or ctx.options.statuscode["return_status"] == 0: | ||
self.reset() | ||
return | ||
self.return_status_code = ctx.options.statuscode | ||
logging.info(f"statuscode will return HTTP {self.return_status_code}") | ||
self.config = ctx.options.statuscode | ||
print(f"statuscode will return HTTP {self.config['return_status']} filter={self.config.get('filter', {})}") | ||
|
||
def response(self, flow): | ||
if not flow.request.pretty_host.startswith("hs"): | ||
return # ignore responses sent by the controller | ||
if self.return_status_code == 0: | ||
# always ignore the controller | ||
if flow.request.pretty_host == MITM_DOMAIN_NAME: | ||
return | ||
if self.config["return_status"] == 0: | ||
return # ignore responses if we aren't told a code | ||
flow.response = Response.make(self.return_status_code) | ||
if flowfilter.match(self.filter, flow): | ||
flow.response = Response.make(self.config["return_status"]) | ||
else: | ||
print("flow does not match filter") |