diff --git a/entsoe/decorators.py b/entsoe/decorators.py
index 1cfeeb1..45656ba 100644
--- a/entsoe/decorators.py
+++ b/entsoe/decorators.py
@@ -1,13 +1,13 @@
-import sys
+import logging
+from functools import wraps
 from socket import gaierror
 from time import sleep
-import requests
-from functools import wraps
-from .exceptions import NoMatchingDataError, PaginationError
+
 import pandas as pd
-import logging
+import requests
 
-from .misc import year_blocks, day_blocks
+from .exceptions import NoMatchingDataError, PaginationError
+from .misc import day_blocks, year_blocks
 
 logger = logging.getLogger(__name__)
 
@@ -24,8 +24,10 @@ def retry_wrapper(*args, **kwargs):
                 result = func(*args, **kwargs)
             except (requests.ConnectionError, gaierror) as e:
                 error = e
-                print("Connection Error, retrying in {} seconds".format(
-                    self.retry_delay), file=sys.stderr)
+                logger.warning(
+                    "Connection Error, "
+                    f"retrying in {self.retry_delay} seconds"
+                )
                 sleep(self.retry_delay)
                 continue
             else:
@@ -53,9 +55,11 @@ def pagination_wrapper(*args, start, end, **kwargs):
 
     return pagination_wrapper
 
+
 def documents_limited(n):
     def decorator(func):
-        """Deals with calls where you cannot query more than n documents at a time, by offsetting per n documents"""
+        """Deals with calls where you cannot query more than n documents at a
+        time, by offsetting per n documents"""
 
         @wraps(func)
         def documents_wrapper(*args, **kwargs):
@@ -73,33 +77,66 @@ def documents_wrapper(*args, **kwargs):
                 raise NoMatchingDataError
 
             df = pd.concat(frames, sort=True)
-            df = df.loc[~df.index.duplicated(keep='first')]
+            # For same indices pick last valid value
+            if df.index.has_duplicates:
+                df = df.groupby(df.index).agg(deduplicate_documents_limited)
             return df
         return documents_wrapper
     return decorator
 
 
+def deduplicate_documents_limited(group):
+    if group.shape[0] == 1:
+        return group
+    else:
+        return group.ffill().iloc[[-1]]
+
+
 def year_limited(func):
-    """Deals with calls where you cannot query more than a year, by splitting
-    the call up in blocks per year"""
+    """Deals with calls where you cannot query more than a year,
+    by splitting the call up in blocks per year"""
 
     @wraps(func)
     def year_wrapper(*args, start=None, end=None, **kwargs):
         if start is None or end is None:
-            raise Exception('Please specify the start and end date explicity with start= when calling this '
-                            'function')
-        if type(start) != pd.Timestamp or type(end) != pd.Timestamp:
-            raise Exception('Please use a timezoned pandas object for start and end')
+            raise Exception(
+                'Please specify the start and end date explicitly with '
+                'start= when calling this function'
+            )
+        if (
+            not isinstance(start, pd.Timestamp)
+            or not isinstance(end, pd.Timestamp)
+        ):
+            raise Exception(
+                'Please use a timezone-aware pandas object for start and end'
+            )
         if start.tzinfo is None or end.tzinfo is None:
-            raise Exception('Please use a timezoned pandas object for start and end')
+            raise Exception(
+                'Please use a timezone-aware pandas object for start and end'
+            )
 
         blocks = year_blocks(start, end)
         frames = []
         for _start, _end in blocks:
             try:
                 frame = func(*args, start=_start, end=_end, **kwargs)
+                # Due to partial matching, func may return data indexed by
+                # timestamps outside _start and _end. To avoid unintentionally
+                # repeating records, frames are truncated to left-open
+                # intervals. The second disjunct below forces the earliest
+                # block to be a closed interval.
+                #
+                # If there are repeating records within a single frame (e.g.
+                # due to corrections) they are kept in the result.
+                interval_mask = (
+                    ((frame.index > _start) & (frame.index <= _end))
+                    | (frame.index == start)
+                )
+                frame = frame.loc[interval_mask]
             except NoMatchingDataError:
-                logger.debug(f"NoMatchingDataError: between {_start} and {_end}")
+                logger.debug(
+                    f"NoMatchingDataError: between {_start} and {_end}"
+                )
                 frame = None
             frames.append(frame)
 
@@ -108,15 +145,14 @@ def year_wrapper(*args, start=None, end=None, **kwargs):
             raise NoMatchingDataError
 
         df = pd.concat(frames, sort=True)
-        df = df.loc[~df.index.duplicated(keep='first')]
         return df
 
     return year_wrapper
 
 
 def day_limited(func):
-    """Deals with calls where you cannot query more than a year, by splitting
-    the call up in blocks per year"""
+    """Deals with calls where you cannot query more than a day,
+    by splitting the call up in blocks per day"""
 
     @wraps(func)
     def day_wrapper(*args, start, end, **kwargs):
@@ -126,7 +162,9 @@ def day_wrapper(*args, start, end, **kwargs):
             try:
                 frame = func(*args, start=_start, end=_end, **kwargs)
             except NoMatchingDataError:
-                print(f"NoMatchingDataError: between {_start} and {_end}", file=sys.stderr)
+                logger.debug(
+                    f"NoMatchingDataError: between {_start} and {_end}"
+                )
                 frame = None
             frames.append(frame)
 
diff --git a/tests.py b/tests.py
index 75c7a00..4b777ae 100644
--- a/tests.py
+++ b/tests.py
@@ -135,6 +135,47 @@ def test_query_procured_balancing_capacity(self):
         )
         self.assertIsInstance(ts, pd.DataFrame)
 
+    def test_year_limited_truncation(self):
+        """
+        A specific example of the Polish operator correcting its data:
+        an additional monthly auction was held for this period, which
+        results in duplicated time indices.
+
+        source: https://www.pse.pl/web/pse-eng/cross-border-electricity-exchange/auction-office/rzeszow-chmielnicka-interconnection/auction-results  # noqa
+        """
+        start = pd.Timestamp('2023-07-17 00:00:00', tz='Europe/Warsaw')
+        end = pd.Timestamp('2023-08-01 00:00:00', tz='Europe/Warsaw')
+        ts = self.client.query_offered_capacity(
+            'UA_IPS', 'PL',
+            start=start, end=end,
+            contract_marketagreement_type='A03',
+            implicit=False
+        )
+        total_hours = int((end - start).total_seconds()/60/60)
+        # The expected behaviour is to keep both the initial data and the
+        # corrections, and leave deduplication to the user.
+        self.assertEqual(total_hours*2, ts.shape[0])
+
+    def test_documents_limited_truncation(self):
+        ts = pd.DatetimeIndex(
+            ["2022-03-01", "2022-03-11", "2022-03-21", "2022-04-01"],
+            tz="Europe/Berlin"
+        )
+        part_dfs = []
+        for i in range(len(ts) - 1):
+            df = self.client.query_contracted_reserve_prices(
+                'DE_LU', start=ts[i], end=ts[i+1],
+                type_marketagreement_type='A01'
+            )
+            part_dfs.append(df)
+        df_parts = pd.concat(part_dfs)
+        df_full = self.client.query_contracted_reserve_prices(
+            'DE_LU', start=ts[0], end=ts[-1],
+            type_marketagreement_type='A01'
+        )
+        self.assertEqual(df_parts.shape, df_full.shape)
+        self.assertTrue(all(df_parts.isna().sum() == df_full.isna().sum()))
+
 if __name__ == '__main__':
     unittest.main()
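
Not part of the patch: a minimal, self-contained sketch of the truncation behaviour added in year_limited, using small hand-made frames in place of real API responses (the timestamps and values below are hypothetical). Each block is cut to a left-open interval, with the overall query start closing the first block, so a record shared by two adjacent blocks survives exactly once:

import pandas as pd

# Two hypothetical yearly blocks that both contain the 23:00 boundary record
# because of partial matching in the API responses.
idx1 = pd.DatetimeIndex(['2022-12-31 21:00', '2022-12-31 22:00',
                         '2022-12-31 23:00'], tz='UTC')
idx2 = pd.DatetimeIndex(['2022-12-31 23:00', '2023-01-01 00:00',
                         '2023-01-01 01:00'], tz='UTC')
block1 = pd.Series([1.0, 2.0, 3.0], index=idx1)
block2 = pd.Series([3.0, 4.0, 5.0], index=idx2)

start = idx1[0]  # overall query start
frames = []
for frame, (_start, _end) in [(block1, (idx1[0], idx1[-1])),
                              (block2, (idx1[-1], idx2[-1]))]:
    # Same mask as in year_wrapper: left-open interval per block, except that
    # the overall start is kept so the earliest block stays closed.
    interval_mask = (
        ((frame.index > _start) & (frame.index <= _end))
        | (frame.index == start)
    )
    frames.append(frame.loc[interval_mask])

df = pd.concat(frames)
assert not df.index.has_duplicates  # the 23:00 record appears only once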