diff --git a/picard/audit.py b/picard/audit.py new file mode 100644 index 0000000000..2e62409c89 --- /dev/null +++ b/picard/audit.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# +# Picard, the next-generation MusicBrainz tagger +# +# Copyright (C) 2023 Laurent Monin +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +from collections import defaultdict +import sys +import threading +import time + + +def setup_audit(prefixes_string): + """Setup audit hook according to `audit` command-line option""" + if not prefixes_string: + return + if 'all' in prefixes_string.split(','): + def event_match(event): + return ('all', ) + else: + # prebuild the dict, constant + PREFIXES_DICT = make_prefixes_dict(prefixes_string) + + def event_match(event): + return is_matching_a_prefix(event, PREFIXES_DICT) + + start_time = time.time() + + def audit(event, args): + matched = event_match(event) + if matched: + matched = '.'.join(matched) + tid = threading.get_native_id() + secs = time.time() - start_time + # we can't use log here, as it generates events + print(f'audit:{matched}:{tid}:{secs} {event} args={args}') + + try: + sys.addaudithook(audit) + except AttributeError: + # sys.addaudithook() appeared in Python 3.8 + pass + + +def list_from_prefixes_string(prefixes_string): + """Generate a sorted list of prefixes tuples + A prefixes string is a comma-separated list of dot-separated keys + "a,b.c,d.e.f,,g" would result in following sorted list: + [('a',), ('b', 'c'), ('d', 'e', 'f'), ('g',)] + """ + yield from sorted(set(tuple(e.split('.')) for e in prefixes_string.split(',') if e)) + + +def make_prefixes_dict(prefixes_string): + """Build a dict with keys = length of prefix""" + d = defaultdict(list) + for prefix_tuple in list_from_prefixes_string(prefixes_string): + d[len(prefix_tuple)].append(prefix_tuple) + return dict(d) + + +def prefixes_candidates_for_length(length, prefixes_dict): + """Generate prefixes that may match this length""" + for prefix_len, prefixes in prefixes_dict.items(): + if length >= prefix_len: + yield from prefixes + + +def is_matching_a_prefix(key, prefixes_dict): + """Matches dot-separated key against prefixes + Typical case: we want to match `os.mkdir` if prefix is `os` or `os.mkdir` + but not the reverse: if prefix is `os.mkdir` we don't want to match a key named `os` + It returns False, or the matched prefix + """ + key_tuple = tuple(key.split('.')) + key_tuple_len = len(key_tuple) + # only use candidates that may have a chance to match + for prefix_tuple in prefixes_candidates_for_length(key_tuple_len, prefixes_dict): + # check that all elements of the key are in prefix tuple + if all(prefix_part == key_tuple[i] for i, prefix_part in enumerate(prefix_tuple)): + return prefix_tuple + return False diff --git a/picard/tagger.py b/picard/tagger.py index 5c1f1dcf38..f4c643c199 100644 --- a/picard/tagger.py +++ b/picard/tagger.py @@ -81,6 +81,7 @@ NatAlbum, run_album_post_removal_processors, ) +from picard.audit import setup_audit from picard.browser.browser import BrowserIntegration from picard.browser.filelookup import FileLookup from picard.cluster import ( @@ -256,6 +257,9 @@ def __init__(self, picard_args, localedir, autoupdate, pipe_handler=None): if picard_args.debug or "PICARD_DEBUG" in os.environ: self.set_log_level(logging.DEBUG) + if picard_args.audit: + setup_audit(picard_args.audit) + # Main thread pool used for most background tasks self.thread_pool = QtCore.QThreadPool(self) # Two threads are needed for the pipe handler and command processing. @@ -1437,6 +1441,10 @@ def process_picard_args(): parser.add_argument("-display", nargs=1, help=argparse.SUPPRESS) # Picard specific arguments + parser.add_argument("-a", "--audit", action='store', + default=None, + help="audit events passed as a comma-separated list, prefixes supported, " + "use all to match any (see https://docs.python.org/3/library/audit_events.html#audit-events)") parser.add_argument("-c", "--config-file", action='store', default=None, help="location of the configuration file") diff --git a/test/test_audit.py b/test/test_audit.py new file mode 100644 index 0000000000..45f83e0b7d --- /dev/null +++ b/test/test_audit.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# +# Picard, the next-generation MusicBrainz tagger +# +# Copyright (C) 2023 Laurent Monin +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +import sys +import unittest +from unittest.mock import patch + +from test.picardtestcase import PicardTestCase + +from picard.audit import ( + is_matching_a_prefix, + list_from_prefixes_string, + make_prefixes_dict, + prefixes_candidates_for_length, + setup_audit, +) + + +class AuditTest(PicardTestCase): + def test_list_from_prefixes_string(self): + def f(s): + return list(list_from_prefixes_string(s)) + + self.assertEqual(f(''), []) + self.assertEqual(f('a'), [('a',)]) + self.assertEqual(f('a,b'), [('a',), ('b',)]) + self.assertEqual(f('a,,b'), [('a',), ('b',)]) + self.assertEqual(f('a.c,,b.d.f'), [('a', 'c'), ('b', 'd', 'f')]) + self.assertEqual(f('b.d.f,a.c,'), [('a', 'c'), ('b', 'd', 'f')]) + + def test_make_prefixes_dict(self): + d = dict(make_prefixes_dict('')) + self.assertEqual(d, {}) + d = dict(make_prefixes_dict('a')) + self.assertEqual(d, {1: [('a',)]}) + d = dict(make_prefixes_dict('a.b')) + self.assertEqual(d, {2: [('a', 'b')]}) + d = dict(make_prefixes_dict('a.b,c.d,a.b')) + self.assertEqual(d, {2: [('a', 'b'), ('c', 'd')]}) + d = dict(make_prefixes_dict('a,a.b,,a.b.c')) + self.assertEqual(d, {1: [('a',)], 2: [('a', 'b')], 3: [('a', 'b', 'c')]}) + + def test_prefixes_candidates_for_length(self): + d = make_prefixes_dict('a,a.b,c.d,a.b.c,d.e.f,g.h.i') + self.assertEqual(list(prefixes_candidates_for_length(0, d)), []) + self.assertEqual(list(prefixes_candidates_for_length(1, d)), [('a',)]) + self.assertEqual(list(prefixes_candidates_for_length(2, d)), [('a',), ('a', 'b'), ('c', 'd')]) + expected = [('a',), ('a', 'b'), ('c', 'd'), ('a', 'b', 'c'), ('d', 'e', 'f'), ('g', 'h', 'i')] + self.assertEqual(list(prefixes_candidates_for_length(3, d)), expected) + self.assertEqual(list(prefixes_candidates_for_length(4, d)), expected) + + def test_is_matching_a_prefix(self): + d = make_prefixes_dict('a.b') + self.assertEqual(is_matching_a_prefix('a', d), False) + self.assertEqual(is_matching_a_prefix('a.b', d), ('a', 'b')) + self.assertEqual(is_matching_a_prefix('a.b.c', d), ('a', 'b')) + self.assertEqual(is_matching_a_prefix('b.c', d), False) + + +@unittest.skipUnless(sys.version_info[:3] > (3, 8), "sys.addaudithook() available since Python 3.8") +class AuditHookTest(PicardTestCase): + def test_setup_audit_1(self): + with patch('sys.addaudithook') as mock: + setup_audit('a,b.c') + self.assertTrue(mock.called) + + def test_setup_audit_2(self): + with patch('sys.addaudithook') as mock: + setup_audit('') + self.assertFalse(mock.called)