Merge pull request #287 from pee-po/rework-decorators
Rework decorators
fboerman authored Jan 5, 2024
2 parents 29089d3 + 27f4cca commit dcf6f2d
Showing 2 changed files with 101 additions and 22 deletions.
82 changes: 60 additions & 22 deletions entsoe/decorators.py
@@ -1,13 +1,13 @@
import sys
import logging
from functools import wraps
from socket import gaierror
from time import sleep
import requests
from functools import wraps
from .exceptions import NoMatchingDataError, PaginationError

import pandas as pd
import logging
import requests

from .misc import year_blocks, day_blocks
from .exceptions import NoMatchingDataError, PaginationError
from .misc import day_blocks, year_blocks

logger = logging.getLogger(__name__)

@@ -24,8 +24,10 @@ def retry_wrapper(*args, **kwargs):
                result = func(*args, **kwargs)
            except (requests.ConnectionError, gaierror) as e:
                error = e
                print("Connection Error, retrying in {} seconds".format(
                    self.retry_delay), file=sys.stderr)
                logger.warning(
                    "Connection Error, "
                    f"retrying in {self.retry_delay} seconds"
                )
                sleep(self.retry_delay)
                continue
            else:
@@ -53,9 +55,11 @@ def pagination_wrapper(*args, start, end, **kwargs):

    return pagination_wrapper


def documents_limited(n):
    def decorator(func):
        """Deals with calls where you cannot query more than n documents at a time, by offsetting per n documents"""
        """Deals with calls where you cannot query more than n documents at a
        time, by offsetting per n documents"""

        @wraps(func)
        def documents_wrapper(*args, **kwargs):
@@ -73,33 +77,66 @@ def documents_wrapper(*args, **kwargs):
                raise NoMatchingDataError

            df = pd.concat(frames, sort=True)
            df = df.loc[~df.index.duplicated(keep='first')]
            # For duplicate indices, pick the last valid value
            if df.index.has_duplicates:
                df = df.groupby(df.index).agg(deduplicate_documents_limited)
            return df
        return documents_wrapper
    return decorator

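Since the wrapper body is collapsed in the view above, here is a minimal, self-contained sketch of the offset-pagination pattern `documents_limited` implements. The `offset` keyword and the `fetch_page` callable are assumptions for illustration, not necessarily the library's API:

from itertools import count

import pandas as pd

from entsoe.exceptions import NoMatchingDataError


def fetch_paginated(fetch_page, n):
    # Request blocks of n documents, shifting the offset by n each time,
    # until the server reports no more matching data.
    frames = []
    for offset in count(start=0, step=n):
        try:
            frames.append(fetch_page(offset=offset))
        except NoMatchingDataError:
            break
    if not frames:
        raise NoMatchingDataError
    return pd.concat(frames, sort=True)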

def deduplicate_documents_limited(group):
    if group.shape[0] == 1:
        return group
    else:
        return group.ffill().iloc[[-1]]

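A toy illustration (not part of the diff) of what `group.ffill().iloc[[-1]]` does for two rows sharing one timestamp: forward-fill carries earlier valid values into the last row, so the latest record wins wherever it has data:

import pandas as pd

idx = pd.DatetimeIndex(['2023-07-17 00:00', '2023-07-17 00:00'],
                       tz='Europe/Warsaw')
group = pd.DataFrame({'price': [10.0, None], 'volume': [None, 5.0]},
                     index=idx)
# ffill -> price [10.0, 10.0], volume [NaN, 5.0]; iloc[[-1]] keeps the
# last row as a one-row frame: price 10.0, volume 5.0.
deduplicated = group.ffill().iloc[[-1]]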

def year_limited(func):
"""Deals with calls where you cannot query more than a year, by splitting
the call up in blocks per year"""
"""Deals with calls where you cannot query more than a year,
by splitting the call up in blocks per year"""

@wraps(func)
def year_wrapper(*args, start=None, end=None, **kwargs):
if start is None or end is None:
raise Exception('Please specify the start and end date explicity with start=<date> when calling this '
'function')
if type(start) != pd.Timestamp or type(end) != pd.Timestamp:
raise Exception('Please use a timezoned pandas object for start and end')
raise Exception(
'Please specify the start and end date explicity with'
'start=<date> when calling this function'
)
if (
not isinstance(start, pd.Timestamp)
or not isinstance(end, pd.Timestamp)
):
raise Exception(
'Please use a timezoned pandas object for start and end'
)
if start.tzinfo is None or end.tzinfo is None:
raise Exception('Please use a timezoned pandas object for start and end')
raise Exception(
'Please use a timezoned pandas object for start and end'
)

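        # Illustration (not part of the diff): arguments that pass the checks
        # above are timezone-aware pandas Timestamps, e.g.
        #   start = pd.Timestamp('2023-01-01', tz='Europe/Brussels')
        #   end = pd.Timestamp('2024-01-01', tz='Europe/Brussels')
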
        blocks = year_blocks(start, end)
        frames = []
        for _start, _end in blocks:
            try:
                frame = func(*args, start=_start, end=_end, **kwargs)
                # Due to partial matching, func may return data indexed by
                # timestamps outside _start and _end. In order to avoid
                # (unintentionally) repeating records, frames are truncated
                # to left-open intervals. Additionally, the second disjunct
                # forces the earliest block to be a closed interval.
                #
                # If there are repeating records in a single frame (e.g. due
                # to corrections) then the result will also have them.
                interval_mask = (
                    ((frame.index > _start) & (frame.index <= _end))
                    | (frame.index == start)
                )
                frame = frame.loc[interval_mask]
            except NoMatchingDataError:
                logger.debug(f"NoMatchingDataError: between {_start} and {_end}")
                logger.debug(
                    f"NoMatchingDataError: between {_start} and {_end}"
                )
                frame = None
            frames.append(frame)

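A worked illustration (not part of the diff, timestamps invented) of the mask above. For a block whose _start equals the overall start, the left endpoint survives via the second disjunct, while everything else is kept only on the left-open, right-closed interval (_start, _end]:

import pandas as pd

start = pd.Timestamp('2023-01-01 00:00', tz='Europe/Brussels')   # overall start
_start = pd.Timestamp('2023-01-01 00:00', tz='Europe/Brussels')  # this block
_end = pd.Timestamp('2024-01-01 00:00', tz='Europe/Brussels')

frame = pd.DataFrame(
    {'mw': [1, 2, 3, 4]},
    index=pd.DatetimeIndex(
        ['2022-12-31 23:00',   # partial match before the block -> dropped
         '2023-01-01 00:00',   # == start -> kept by the second disjunct
         '2023-06-01 12:00',   # inside (_start, _end] -> kept
         '2024-01-01 00:00'],  # == _end -> kept, interval is right-closed
        tz='Europe/Brussels',
    ),
)
interval_mask = (
    ((frame.index > _start) & (frame.index <= _end))
    | (frame.index == start)
)
frame.loc[interval_mask]  # the last three rows remain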
@@ -108,15 +145,14 @@ def year_wrapper(*args, start=None, end=None, **kwargs):
            raise NoMatchingDataError

        df = pd.concat(frames, sort=True)
        df = df.loc[~df.index.duplicated(keep='first')]
        return df

    return year_wrapper


def day_limited(func):
"""Deals with calls where you cannot query more than a year, by splitting
the call up in blocks per year"""
"""Deals with calls where you cannot query more than a year,
by splitting the call up in blocks per year"""

@wraps(func)
def day_wrapper(*args, start, end, **kwargs):
@@ -126,7 +162,9 @@ def day_wrapper(*args, start, end, **kwargs):
            try:
                frame = func(*args, start=_start, end=_end, **kwargs)
            except NoMatchingDataError:
                print(f"NoMatchingDataError: between {_start} and {_end}", file=sys.stderr)
                logger.debug(
                    f"NoMatchingDataError: between {_start} and {_end}"
                )
                frame = None
            frames.append(frame)

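For context, decorators like year_limited wrap the client's query methods. A minimal sketch of the pattern; the class and method below are invented for illustration and are not the library's actual API:

import pandas as pd

from entsoe.decorators import year_limited


class DemoClient:
    @year_limited
    def query_demo(self, area, start=None, end=None):
        # Illustrative stub: by the time this body runs, year_limited has
        # already narrowed start/end to a single year block.
        idx = pd.date_range(start, end, freq='h')
        return pd.DataFrame({'value': range(len(idx))}, index=idx)


df = DemoClient().query_demo(
    'DE_LU',
    start=pd.Timestamp('2022-06-01', tz='UTC'),
    end=pd.Timestamp('2023-06-01', tz='UTC'),
)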
41 changes: 41 additions & 0 deletions tests.py
@@ -135,6 +135,47 @@ def test_query_procured_balancing_capacity(self):
        )
        self.assertIsInstance(ts, pd.DataFrame)

    def test_year_limited_truncation(self):
        """
        This is a specific example of the Polish operator correcting the
        data, i.e. there was an additional monthly auction for this period.
        This results in duplicated time indices.
        source: https://www.pse.pl/web/pse-eng/cross-border-electricity-exchange/auction-office/rzeszow-chmielnicka-interconnection/auction-results # noqa
        """
        start = pd.Timestamp('2023-07-17 00:00:00', tz='Europe/Warsaw')
        end = pd.Timestamp('2023-08-01 00:00:00', tz='Europe/Warsaw')
        ts = self.client.query_offered_capacity(
            'UA_IPS', 'PL',
            start=start, end=end,
            contract_marketagreement_type='A03',
            implicit=False
        )
        total_hours = int((end - start).total_seconds()/60/60)
        # Expected behaviour is to keep both initial data and corrections
        # and leave the deduplication to the user.
        self.assertEqual(total_hours*2, ts.shape[0])

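As the comment in the test notes, deduplication is left to the caller. One possible follow-up on the returned frame (illustrative only, not part of the diff):

# Prefer corrections: keep only the last row per duplicated timestamp.
ts_corrected = ts[~ts.index.duplicated(keep='last')]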
    def test_documents_limited_truncation(self):
        ts = pd.DatetimeIndex(
            ["2022-03-01", "2022-03-11", "2022-03-21", "2022-04-01"],
            tz="Europe/Berlin"
        )
        part_dfs = []
        for i in range(len(ts) - 1):
            df = self.client.query_contracted_reserve_prices(
                'DE_LU', start=ts[i], end=ts[i+1],
                type_marketagreement_type='A01'
            )
            part_dfs.append(df)
        df_parts = pd.concat(part_dfs)
        df_full = self.client.query_contracted_reserve_prices(
            'DE_LU', start=ts[0], end=ts[-1],
            type_marketagreement_type='A01'
        )
        self.assertEqual(df_parts.shape, df_full.shape)
        self.assertTrue(all(df_parts.isna().sum() == df_full.isna().sum()))


if __name__ == '__main__':
    unittest.main()
