Skip to content

Commit

Permalink
Refactor recorder retryable_database_job decorator (home-assistant#12…
Browse files Browse the repository at this point in the history
  • Loading branch information
emontnemery authored Sep 5, 2024
1 parent 65e16b4 commit 86ae707
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 23 deletions.
3 changes: 2 additions & 1 deletion homeassistant/components/recorder/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
execute_stmt_lambda_element,
get_index_by_name,
retryable_database_job,
retryable_database_job_method,
session_scope,
)

Expand Down Expand Up @@ -2229,7 +2230,7 @@ def do_migrate(self, instance: Recorder, session: Session) -> None:
else:
self.migration_done(instance, session)

@retryable_database_job("migrate data", method=True)
@retryable_database_job_method("migrate data")
def migrate_data(self, instance: Recorder) -> bool:
"""Migrate some data, returns True if migration is completed."""
status = self.migrate_data_impl(instance)
Expand Down
66 changes: 44 additions & 22 deletions homeassistant/components/recorder/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,44 +645,66 @@ def _is_retryable_error(instance: Recorder, err: OperationalError) -> bool:


type _FuncType[**P, R] = Callable[Concatenate[Recorder, P], R]
type _MethType[Self, **P, R] = Callable[Concatenate[Self, Recorder, P], R]
type _FuncOrMethType[**_P, _R] = Callable[_P, _R]


def retryable_database_job[**_P](
description: str, method: bool = False
) -> Callable[[_FuncOrMethType[_P, bool]], _FuncOrMethType[_P, bool]]:
description: str,
) -> Callable[[_FuncType[_P, bool]], _FuncType[_P, bool]]:
"""Try to execute a database job.
The job should return True if it finished, and False if it needs to be rescheduled.
"""
recorder_pos = 1 if method else 0

def decorator(job: _FuncOrMethType[_P, bool]) -> _FuncOrMethType[_P, bool]:
@functools.wraps(job)
def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> bool:
instance: Recorder = args[recorder_pos] # type: ignore[assignment]
try:
return job(*args, **kwargs)
except OperationalError as err:
if _is_retryable_error(instance, err):
assert isinstance(err.orig, BaseException) # noqa: PT017
_LOGGER.info(
"%s; %s not completed, retrying", err.orig.args[1], description
)
time.sleep(instance.db_retry_wait)
# Failed with retryable error
return False
def decorator(job: _FuncType[_P, bool]) -> _FuncType[_P, bool]:
return _wrap_func_or_meth(job, description, False)

_LOGGER.warning("Error executing %s: %s", description, err)
return decorator

# Failed with permanent error
return True

return wrapper
def retryable_database_job_method[_Self, **_P](
description: str,
) -> Callable[[_MethType[_Self, _P, bool]], _MethType[_Self, _P, bool]]:
"""Try to execute a database job.
The job should return True if it finished, and False if it needs to be rescheduled.
"""

def decorator(job: _MethType[_Self, _P, bool]) -> _MethType[_Self, _P, bool]:
return _wrap_func_or_meth(job, description, True)

return decorator


def _wrap_func_or_meth[**_P](
job: _FuncOrMethType[_P, bool], description: str, method: bool
) -> _FuncOrMethType[_P, bool]:
recorder_pos = 1 if method else 0

@functools.wraps(job)
def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> bool:
instance: Recorder = args[recorder_pos] # type: ignore[assignment]
try:
return job(*args, **kwargs)
except OperationalError as err:
if _is_retryable_error(instance, err):
assert isinstance(err.orig, BaseException) # noqa: PT017
_LOGGER.info(
"%s; %s not completed, retrying", err.orig.args[1], description
)
time.sleep(instance.db_retry_wait)
# Failed with retryable error
return False

_LOGGER.warning("Error executing %s: %s", description, err)

# Failed with permanent error
return True

return wrapper


def database_job_retry_wrapper[**_P](
description: str, attempts: int = 5
) -> Callable[[_FuncType[_P, None]], _FuncType[_P, None]]:
Expand Down

0 comments on commit 86ae707

Please sign in to comment.