-
-
Notifications
You must be signed in to change notification settings - Fork 387
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2316 from zas/python_audit
PICARD-2757: Add a command-line option to enable audit
- Loading branch information
Showing
3 changed files
with
191 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Picard, the next-generation MusicBrainz tagger | ||
# | ||
# Copyright (C) 2023 Laurent Monin | ||
# | ||
# This program is free software; you can redistribute it and/or | ||
# modify it under the terms of the GNU General Public License | ||
# as published by the Free Software Foundation; either version 2 | ||
# of the License, or (at your option) any later version. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program; if not, write to the Free Software | ||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | ||
|
||
from collections import defaultdict | ||
import sys | ||
import threading | ||
import time | ||
|
||
|
||
def setup_audit(prefixes_string): | ||
"""Setup audit hook according to `audit` command-line option""" | ||
if not prefixes_string: | ||
return | ||
if 'all' in prefixes_string.split(','): | ||
def event_match(event): | ||
return ('all', ) | ||
else: | ||
# prebuild the dict, constant | ||
PREFIXES_DICT = make_prefixes_dict(prefixes_string) | ||
|
||
def event_match(event): | ||
return is_matching_a_prefix(event, PREFIXES_DICT) | ||
|
||
start_time = time.time() | ||
|
||
def audit(event, args): | ||
matched = event_match(event) | ||
if matched: | ||
matched = '.'.join(matched) | ||
tid = threading.get_native_id() | ||
secs = time.time() - start_time | ||
# we can't use log here, as it generates events | ||
print(f'audit:{matched}:{tid}:{secs} {event} args={args}') | ||
|
||
try: | ||
sys.addaudithook(audit) | ||
except AttributeError: | ||
# sys.addaudithook() appeared in Python 3.8 | ||
pass | ||
|
||
|
||
def list_from_prefixes_string(prefixes_string): | ||
"""Generate a sorted list of prefixes tuples | ||
A prefixes string is a comma-separated list of dot-separated keys | ||
"a,b.c,d.e.f,,g" would result in following sorted list: | ||
[('a',), ('b', 'c'), ('d', 'e', 'f'), ('g',)] | ||
""" | ||
yield from sorted(set(tuple(e.split('.')) for e in prefixes_string.split(',') if e)) | ||
|
||
|
||
def make_prefixes_dict(prefixes_string): | ||
"""Build a dict with keys = length of prefix""" | ||
d = defaultdict(list) | ||
for prefix_tuple in list_from_prefixes_string(prefixes_string): | ||
d[len(prefix_tuple)].append(prefix_tuple) | ||
return dict(d) | ||
|
||
|
||
def prefixes_candidates_for_length(length, prefixes_dict): | ||
"""Generate prefixes that may match this length""" | ||
for prefix_len, prefixes in prefixes_dict.items(): | ||
if length >= prefix_len: | ||
yield from prefixes | ||
|
||
|
||
def is_matching_a_prefix(key, prefixes_dict): | ||
"""Matches dot-separated key against prefixes | ||
Typical case: we want to match `os.mkdir` if prefix is `os` or `os.mkdir` | ||
but not the reverse: if prefix is `os.mkdir` we don't want to match a key named `os` | ||
It returns False, or the matched prefix | ||
""" | ||
key_tuple = tuple(key.split('.')) | ||
key_tuple_len = len(key_tuple) | ||
# only use candidates that may have a chance to match | ||
for prefix_tuple in prefixes_candidates_for_length(key_tuple_len, prefixes_dict): | ||
# check that all elements of the key are in prefix tuple | ||
if all(prefix_part == key_tuple[i] for i, prefix_part in enumerate(prefix_tuple)): | ||
return prefix_tuple | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Picard, the next-generation MusicBrainz tagger | ||
# | ||
# Copyright (C) 2023 Laurent Monin | ||
# | ||
# This program is free software; you can redistribute it and/or | ||
# modify it under the terms of the GNU General Public License | ||
# as published by the Free Software Foundation; either version 2 | ||
# of the License, or (at your option) any later version. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program; if not, write to the Free Software | ||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. | ||
|
||
import sys | ||
import unittest | ||
from unittest.mock import patch | ||
|
||
from test.picardtestcase import PicardTestCase | ||
|
||
from picard.audit import ( | ||
is_matching_a_prefix, | ||
list_from_prefixes_string, | ||
make_prefixes_dict, | ||
prefixes_candidates_for_length, | ||
setup_audit, | ||
) | ||
|
||
|
||
class AuditTest(PicardTestCase): | ||
def test_list_from_prefixes_string(self): | ||
def f(s): | ||
return list(list_from_prefixes_string(s)) | ||
|
||
self.assertEqual(f(''), []) | ||
self.assertEqual(f('a'), [('a',)]) | ||
self.assertEqual(f('a,b'), [('a',), ('b',)]) | ||
self.assertEqual(f('a,,b'), [('a',), ('b',)]) | ||
self.assertEqual(f('a.c,,b.d.f'), [('a', 'c'), ('b', 'd', 'f')]) | ||
self.assertEqual(f('b.d.f,a.c,'), [('a', 'c'), ('b', 'd', 'f')]) | ||
|
||
def test_make_prefixes_dict(self): | ||
d = dict(make_prefixes_dict('')) | ||
self.assertEqual(d, {}) | ||
d = dict(make_prefixes_dict('a')) | ||
self.assertEqual(d, {1: [('a',)]}) | ||
d = dict(make_prefixes_dict('a.b')) | ||
self.assertEqual(d, {2: [('a', 'b')]}) | ||
d = dict(make_prefixes_dict('a.b,c.d,a.b')) | ||
self.assertEqual(d, {2: [('a', 'b'), ('c', 'd')]}) | ||
d = dict(make_prefixes_dict('a,a.b,,a.b.c')) | ||
self.assertEqual(d, {1: [('a',)], 2: [('a', 'b')], 3: [('a', 'b', 'c')]}) | ||
|
||
def test_prefixes_candidates_for_length(self): | ||
d = make_prefixes_dict('a,a.b,c.d,a.b.c,d.e.f,g.h.i') | ||
self.assertEqual(list(prefixes_candidates_for_length(0, d)), []) | ||
self.assertEqual(list(prefixes_candidates_for_length(1, d)), [('a',)]) | ||
self.assertEqual(list(prefixes_candidates_for_length(2, d)), [('a',), ('a', 'b'), ('c', 'd')]) | ||
expected = [('a',), ('a', 'b'), ('c', 'd'), ('a', 'b', 'c'), ('d', 'e', 'f'), ('g', 'h', 'i')] | ||
self.assertEqual(list(prefixes_candidates_for_length(3, d)), expected) | ||
self.assertEqual(list(prefixes_candidates_for_length(4, d)), expected) | ||
|
||
def test_is_matching_a_prefix(self): | ||
d = make_prefixes_dict('a.b') | ||
self.assertEqual(is_matching_a_prefix('a', d), False) | ||
self.assertEqual(is_matching_a_prefix('a.b', d), ('a', 'b')) | ||
self.assertEqual(is_matching_a_prefix('a.b.c', d), ('a', 'b')) | ||
self.assertEqual(is_matching_a_prefix('b.c', d), False) | ||
|
||
|
||
@unittest.skipUnless(sys.version_info[:3] > (3, 8), "sys.addaudithook() available since Python 3.8") | ||
class AuditHookTest(PicardTestCase): | ||
def test_setup_audit_1(self): | ||
with patch('sys.addaudithook') as mock: | ||
setup_audit('a,b.c') | ||
self.assertTrue(mock.called) | ||
|
||
def test_setup_audit_2(self): | ||
with patch('sys.addaudithook') as mock: | ||
setup_audit('') | ||
self.assertFalse(mock.called) |