diff --git a/script.module.jurialmunkey/addon.xml b/script.module.jurialmunkey/addon.xml index 83d1b98fa..42772edc1 100644 --- a/script.module.jurialmunkey/addon.xml +++ b/script.module.jurialmunkey/addon.xml @@ -1,5 +1,5 @@ - + diff --git a/script.module.jurialmunkey/resources/modules/jurialmunkey/bcache.py b/script.module.jurialmunkey/resources/modules/jurialmunkey/bcache.py index 418f21849..388714f0a 100644 --- a/script.module.jurialmunkey/resources/modules/jurialmunkey/bcache.py +++ b/script.module.jurialmunkey/resources/modules/jurialmunkey/bcache.py @@ -6,6 +6,7 @@ class BasicCache(): _simplecache = jurialmunkey.scache.SimpleCache + _queue_limit = 20 def __init__(self, filename=None): self._filename = filename @@ -23,6 +24,7 @@ def kodi_traceback(exc, log_msg): @kodi_try_except_internal_traceback('lib.addon.cache ret_cache') def ret_cache(self): if not self._cache: + self._simplecache._queue_limit = self._queue_limit self._cache = self._simplecache(filename=self._filename) return self._cache diff --git a/script.module.jurialmunkey/resources/modules/jurialmunkey/parser.py b/script.module.jurialmunkey/resources/modules/jurialmunkey/parser.py index 85cb9e2db..2e8df6863 100644 --- a/script.module.jurialmunkey/resources/modules/jurialmunkey/parser.py +++ b/script.module.jurialmunkey/resources/modules/jurialmunkey/parser.py @@ -51,14 +51,18 @@ def partition_list(iterable, pred): def parse_paramstring(paramstring): """ helper to assist to standardise urllib parsing """ from urllib.parse import unquote_plus - from unicodedata import normalize + # from unicodedata import normalize params = dict() paramstring = paramstring.replace('&', '&') # Just in case xml string for param in paramstring.split('&'): if '=' not in param: continue k, v = param.split('=') - params[unquote_plus(k)] = normalize('NFKD', unquote_plus(v)).strip('\'').strip('"') # Normalize and decompose combined utf-8 forms such as Arabic and strip out quotes + k = unquote_plus(k) + v = unquote_plus(v) + v = v.strip('\'').strip('"') # Strip out quote marks from Kodi string + # v = normalize('NFKD', v) # Normalize and decompose combined utf-8 forms such as Arabic (Unsure if needed anymore for edit control) + params[k] = v return params diff --git a/script.module.jurialmunkey/resources/modules/jurialmunkey/reqapi.py b/script.module.jurialmunkey/resources/modules/jurialmunkey/reqapi.py index eb72699b0..828a56745 100644 --- a/script.module.jurialmunkey/resources/modules/jurialmunkey/reqapi.py +++ b/script.module.jurialmunkey/resources/modules/jurialmunkey/reqapi.py @@ -84,6 +84,24 @@ def __init__(self, req_api_url=None, req_api_key=None, req_api_name=None, timeou self._error_notification = error_notification or self.error_notification self.translate_xml = translate_xml + @property + def requests(self): + try: + return self._requests + except AttributeError: + import requests + self._requests = requests + return self._requests + + @property + def session(self): + try: + return self._session + except AttributeError: + self._session = self.requests.Session() + self._session.mount(self.req_api_url, self.requests.adapters.HTTPAdapter(pool_maxsize=100)) + return self._session + @staticmethod def kodi_log(msg, level=0): from jurialmunkey.logger import Logger @@ -155,20 +173,19 @@ def timeout_error(self, err): get_property(self.req_timeout_err_prop, self.req_timeout_err) def get_simple_api_request(self, request=None, postdata=None, headers=None, method=None): - import requests try: if method == 'delete': - return requests.delete(request, headers=headers, timeout=self.timeout) + return self.session.delete(request, headers=headers, timeout=self.timeout) if method == 'put': - return requests.put(request, data=postdata, headers=headers, timeout=self.timeout) + return self.session.put(request, data=postdata, headers=headers, timeout=self.timeout) if method == 'json': - return requests.post(request, json=postdata, headers=headers, timeout=self.timeout) + return self.session.post(request, json=postdata, headers=headers, timeout=self.timeout) if postdata or method == 'post': # If pass postdata assume we want to post - return requests.post(request, data=postdata, headers=headers, timeout=self.timeout) - return requests.get(request, headers=headers, timeout=self.timeout) - except requests.exceptions.ConnectionError as errc: + return self.session.post(request, data=postdata, headers=headers, timeout=self.timeout) + return self.session.get(request, headers=headers, timeout=self.timeout) + except self.requests.exceptions.ConnectionError as errc: self.connection_error(errc, check_status=True) - except requests.exceptions.Timeout as errt: + except self.requests.exceptions.Timeout as errt: self.timeout_error(errt) except Exception as err: self.kodi_log(f'RequestError: {err}', 1) diff --git a/script.module.jurialmunkey/resources/modules/jurialmunkey/scache.py b/script.module.jurialmunkey/resources/modules/jurialmunkey/scache.py index b71af36c0..f669c517a 100644 --- a/script.module.jurialmunkey/resources/modules/jurialmunkey/scache.py +++ b/script.module.jurialmunkey/resources/modules/jurialmunkey/scache.py @@ -31,8 +31,9 @@ class SimpleCache(object): _memcache = False _basefolder = '' _fileutils = FILEUTILS - _retries = 4 - _retry_polling = 0.1 + _db_timeout = 3.0 + _db_read_timeout = 1.0 + _queue_limit = 20 def __init__(self, folder=None, filename=None): '''Initialize our caching class''' @@ -64,11 +65,7 @@ def close(self): def __del__(self): '''make sure close is called''' - if self._queue: - self.kodi_log(f'CACHE: Write {len(self._queue)} Items in Queue\n{self._sc_name}', 2) - for i in self._queue: - self._set_db_cache(*i) - self._queue = [] + self.write_queue() self.close() @contextmanager @@ -79,6 +76,18 @@ def busy_tasks(self, task_name): finally: self._busy_tasks.remove(task_name) + def write_queue(self): + if not self._queue: + return + + items = [i for i in self._queue] + self._queue = [] + + with self.busy_tasks(f'write.queue'): + self.kodi_log(f'CACHE: Write {len(items)} Items in Queue\n{self._sc_name}', 2) + for i in items: + self._set_db_cache(*i) + def get(self, endpoint, cur_time=None): ''' get object from cache and return the results @@ -94,7 +103,10 @@ def set(self, endpoint, data, cache_days=30): expires = set_timestamp(cache_days * TIME_DAYS, True) data = data_dumps(data, separators=(',', ':')) self._set_mem_cache(endpoint, expires, data) - self._set_db_cache(endpoint, expires, data) + self._queue.append((endpoint, expires, data, )) + if self._memcache and len(self._queue) < self._queue_limit: + return + self.write_queue() def check_cleanup(self): '''check if cleanup is needed - public method, may be called by calling addon''' @@ -102,6 +114,7 @@ def check_cleanup(self): lastexecuted = self._win.getProperty(f'{self._sc_name}.clean.lastexecuted') if not lastexecuted: self._win.setProperty(f'{self._sc_name}.clean.lastexecuted', str(cur_time - self._auto_clean_interval + 600)) + self._init_database() return if (int(lastexecuted) + self._auto_clean_interval) < cur_time: self._do_cleanup() @@ -142,26 +155,54 @@ def _set_mem_cache(self, endpoint, expires, data): def _get_db_cache(self, endpoint, cur_time): '''get cache data from sqllite _database''' - result = None query = "SELECT expires, data, checksum FROM simplecache WHERE id = ? LIMIT 1" - cache_data = self._execute_sql(query, (endpoint,)) + cache_data = self._execute_sql(query, (endpoint,), read_only=True) + if not cache_data: return + cache_data = cache_data.fetchone() - if not cache_data or int(cache_data[0]) <= cur_time: + + if not cache_data: return + try: - data = str(zlib.decompress(cache_data[1]), 'utf-8') + expires = int(cache_data[0]) # Check we can convert expiry to int otherwise assume has expired. + data = cache_data[1] # Check that we can get data from cache otherwise return None. except TypeError: - data = cache_data[1] + return + + if expires <= cur_time: + return + + try: + data = str(zlib.decompress(data), 'utf-8') + # This code block checking for TypeError was in legacy but seems wrong? Type should be consistent. + # zlib complaining about TypeError would indicate an issue with returned data -- treat as expired. + # except TypeError: + # data = cache_data[1] + except Exception as error: + self.kodi_log(f'CACHE: _get_db_cache zlib.decompress error: {error}\n{self._sc_name} - {endpoint}', 1) + return + + try: + result = data_loads(data) # Confirm that data is valid JSON + except Exception as error: + self.kodi_log(f'CACHE: _get_db_cache data_loads error: {error}\n{self._sc_name} - {endpoint}', 1) + return + self._set_mem_cache(endpoint, cache_data[0], data) - result = data_loads(data) + return result def _set_db_cache(self, endpoint, expires, data): ''' store cache data in _database ''' query = "INSERT OR REPLACE INTO simplecache( id, expires, data, checksum) VALUES (?, ?, ?, ?)" - data = zlib.compress(bytes(data, 'utf-8')) + try: + data = zlib.compress(bytes(data, 'utf-8')) + except Exception as error: + self.kodi_log(f'CACHE: _set_db_cache zlib.compress error: {error}\n{self._sc_name} - {endpoint}', 1) + return self._execute_sql(query, (endpoint, expires, data, 0)) def _do_delete(self): @@ -227,37 +268,43 @@ def _set_pragmas(self, connection): self._connection = connection return connection - def _get_database(self, attempts=2): + def _init_database(self): + if xbmcvfs.exists(self._db_file): + return + # self.kodi_log(f'CACHE: Deleting Corrupt File: {self._db_file}...', 1) + # xbmcvfs.delete(self._db_file) + return self._create_database() + + def _create_database(self): + try: + self.kodi_log(f'CACHE: Initialising: {self._db_file}...', 1) + connection = self._connection or sqlite3.connect(self._db_file, timeout=5.0, isolation_level=None, check_same_thread=not self._re_use_con) + connection.execute( + """CREATE TABLE IF NOT EXISTS simplecache( + id TEXT UNIQUE, expires INTEGER, data TEXT, checksum INTEGER)""") + except Exception as error: + self.kodi_log(f'CACHE: Exception while initializing _database: {error}\n{self._sc_name}', 1) + try: + connection.execute("CREATE INDEX idx ON simplecache(id)") + except Exception as error: + self.kodi_log(f'CACHE: Exception while creating index for _database: {error}\n{self._sc_name}', 1) + try: + return self._set_pragmas(connection) + except Exception as error: + self.kodi_log(f'CACHE: Exception while setting pragmas for _database: {error}\n{self._sc_name}', 1) + + def _get_database(self, read_only=False): '''get reference to our sqllite _database - performs basic integrity check''' + timeout = self._db_read_timeout if read_only else self._db_timeout try: - connection = self._connection or sqlite3.connect(self._db_file, timeout=2.0, isolation_level=None, check_same_thread=not self._re_use_con) + connection = self._connection or sqlite3.connect(self._db_file, timeout=timeout, isolation_level=None, check_same_thread=not self._re_use_con) connection.execute('SELECT * FROM simplecache LIMIT 1') return self._set_pragmas(connection) - except Exception: - # our _database is corrupt or doesn't exist yet, we simply try to recreate it - if xbmcvfs.exists(self._db_file): - self.kodi_log(f'CACHE: Deleting Corrupt File: {self._db_file}...', 1) - xbmcvfs.delete(self._db_file) - try: - self.kodi_log(f'CACHE: Initialising: {self._db_file}...', 1) - connection = self._connection or sqlite3.connect(self._db_file, timeout=2.0, isolation_level=None, check_same_thread=not self._re_use_con) - connection.execute( - """CREATE TABLE IF NOT EXISTS simplecache( - id TEXT UNIQUE, expires INTEGER, data TEXT, checksum INTEGER)""") - connection.execute("CREATE INDEX idx ON simplecache(id)") - return self._set_pragmas(connection) - except Exception as error: - self.kodi_log(f'CACHE: Exception while initializing _database: {error} ({attempts})\n{self._sc_name}', 1) - if attempts < 1: - return - attempts -= 1 - self._monitor.waitForAbort(1) - return self._get_database(attempts) + except Exception as error: + self.kodi_log(f'CACHE: ERROR while retrieving _database: {error}\n{self._sc_name}', 1) - def _execute_sql(self, query, data=None): + def _execute_sql(self, query, data=None, read_only=False): '''little wrapper around execute and executemany to just retry a db command if db is locked''' - retries = self._retries - def _database_execute(_database): if not data: return _database.execute(query) @@ -266,29 +313,13 @@ def _database_execute(_database): return _database.execute(query, data) # always use new db object because we need to be sure that data is available for other simplecache instances - error = None - with self._get_database() as _database: - while retries > 0 and not self._monitor.abortRequested(): - if self._exit: - return None + try: + with self._get_database(read_only=read_only) as _database: try: return _database_execute(_database) - except sqlite3.OperationalError as err: - error = f'{err}' - except Exception as err: - error = f'{err}' - if error is None: - continue - if error != 'database is locked': - break - retries = retries - 1 - if retries > 0: - log_level = 1 if retries < self._retries - 1 else 2 # Only debug log for first retry - transaction = 'commit' if data else 'lookup' - self.kodi_log(f'CACHE: _database LOCKED -- Retrying DB {transaction}...\n{self._sc_name}', log_level) - self._monitor.waitForAbort(self._retry_polling) - continue - error = 'Retry failed. Database locked.' - if error not in [None, 'not an error']: - self.kodi_log(f'CACHE: _database ERROR! -- {error}\n{self._sc_name}', 1) - return None + except sqlite3.OperationalError as operational_exception: + self.kodi_log(f'CACHE: _database OPERATIONAL ERROR! -- {operational_exception}\n{self._sc_name} -- read_only: {read_only}', 2) + except Exception as other_exception: + self.kodi_log(f'CACHE: _database OTHER ERROR! -- {other_exception}\n{self._sc_name} -- read_only: {read_only}', 2) + except Exception as database_exception: + self.kodi_log(f'CACHE: _database GET DATABASE ERROR! -- {database_exception}\n{self._sc_name} -- read_only: {read_only}', 2)