Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/pip/pytest-mock-3.11.1
Browse files Browse the repository at this point in the history
  • Loading branch information
stchris authored Oct 12, 2023
2 parents 7063cd8 + 6241a10 commit 4c273a0
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.21.0
current_version = 1.21.2
commit = True
tag = True

Expand Down
10 changes: 5 additions & 5 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
black==23.3.0
ruff==0.0.270
pytest==7.3.1
pytest-env==0.8.1
black==23.9.1
ruff==0.0.292
pytest==7.4.2
pytest-env==1.0.1
pytest-cov==4.1.0
pytest-mock==3.11.1
wheel==0.40.0
wheel==0.41.2
6 changes: 3 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
banal==1.0.6
normality==2.4.0
fakeredis==2.13.0
normality==2.5.0
fakeredis==2.19.0
sqlalchemy==2.0.4
structlog==23.1.0
structlog==23.2.0
colorama==0.4.6
pika==1.3.2
2 changes: 1 addition & 1 deletion servicelayer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

__version__ = "1.21.0"
__version__ = "1.21.2"


logging.getLogger("boto3").setLevel(logging.WARNING)
Expand Down
2 changes: 2 additions & 0 deletions servicelayer/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from structlog.contextvars import clear_contextvars, bind_contextvars

from servicelayer import settings
from servicelayer.util import unpack_int

LOG_FORMAT_TEXT = "TEXT"
LOG_FORMAT_JSON = "JSON"
Expand Down Expand Up @@ -84,6 +85,7 @@ def apply_task_context(task, **kwargs):
dataset=task.job.dataset.name,
start_time=time.time(),
trace_id=str(uuid.uuid4()),
retry=unpack_int(task.context.get("retries")),
**kwargs
)

Expand Down
6 changes: 3 additions & 3 deletions servicelayer/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Tags(object):

def __init__(self, name, uri=settings.TAGS_DATABASE_URI, **config):
self.name = name
self.engine = create_engine(uri, **config)
self.engine = create_engine(uri, future=True, **config)
self.is_postgres = self.engine.dialect.name == "postgresql"
self.table = Table(
name,
Expand Down Expand Up @@ -50,7 +50,7 @@ def get(self, key, since=None):
stmt = stmt.where(self.table.c.timestamp >= since)
with self.engine.connect() as conn:
rp = conn.execute(stmt)
row = rp.fetchone()
row = rp.fetchone()
if row is not None:
return row.value

Expand All @@ -61,7 +61,7 @@ def exists(self, key, since=None):
stmt = stmt.where(self.table.c.timestamp >= since)
with self.engine.connect() as conn:
rp = conn.execute(stmt)
count = rp.scalar()
count = rp.scalar()
return count > 0

def _store_values(self, conn, row):
Expand Down
11 changes: 9 additions & 2 deletions servicelayer/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,16 @@ def init_internal(self):
def retry(self, task):
retries = unpack_int(task.context.get("retries"))
if retries < settings.WORKER_RETRY:
log.warning("Queue failed task for re-try...")
task.context["retries"] = retries + 1
retry_count = retries + 1
log.warning(
f"Queueing failed task for retry #{retry_count}/{settings.WORKER_RETRY}..." # noqa
)
task.context["retries"] = retry_count
task.stage.queue(task.payload, task.context)
else:
log.warning(
f"Failed task, exhausted retry count of {settings.WORKER_RETRY}"
)

def process(self, blocking=True, interval=INTERVAL):
retries = 0
Expand Down
7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name="servicelayer",
version="1.21.0",
version="1.21.2",
description="Basic remote service functions for alephdata components",
classifiers=[
"Development Status :: 3 - Alpha",
Expand All @@ -30,8 +30,9 @@
install_requires=[
"banal >= 1.0.6, < 2.0.0",
"normality >= 2.4.0, < 3.0.0",
"fakeredis >=2.11.2, < 3.0.0",
"sqlalchemy >= 2.0.4, < 3.0.0",
"fakeredis >= 2.11.2, < 3.0.0",
"redis <= 4.6.0",
"sqlalchemy >= 1.4.49, < 3.0.0",
"structlog >= 20.2.0, < 24.0.0",
"colorama >= 0.4.4, < 1.0.0",
"pika >= 1.3.1, < 2.0.0",
Expand Down
96 changes: 60 additions & 36 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from unittest import TestCase
import logging
import pytest

from servicelayer.cache import get_fakeredis
Expand All @@ -17,39 +17,63 @@ def handle(self, task):
self.test_done += 1


class WorkerTest(TestCase):
def test_run(self):
conn = get_fakeredis()
operation = "lala"
worker = CountingWorker(conn=conn, stages=[operation])
worker.sync()
assert worker.test_done == 0, worker.test_done
job = Job.create(conn, "test")
stage = job.get_stage(operation)
task = stage.queue({}, {})
assert not job.is_done()
assert worker.test_done == 0, worker.test_done
worker.sync()
assert worker.test_done == 1, worker.test_done
assert job.is_done()
worker.retry(task)
assert not job.is_done()
worker.sync()
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
assert worker.test_done == 1, worker.test_done
worker.retry(task)
worker.run(blocking=False)
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
worker.num_threads = None
class NoOpWorker(worker.Worker):
def handle(self, task):
pass


def test_run():
conn = get_fakeredis()
operation = "lala"
worker = CountingWorker(conn=conn, stages=[operation])
worker.sync()
assert worker.test_done == 0, worker.test_done
job = Job.create(conn, "test")
stage = job.get_stage(operation)
task = stage.queue({}, {})
assert not job.is_done()
assert worker.test_done == 0, worker.test_done
worker.sync()
assert worker.test_done == 1, worker.test_done
assert job.is_done()
worker.retry(task)
assert not job.is_done()
worker.sync()
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
assert worker.test_done == 1, worker.test_done
worker.retry(task)
worker.run(blocking=False)
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
worker.num_threads = None
worker.retry(task)
worker.run(blocking=False)
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
try:
worker._handle_signal(5, None)
except SystemExit as exc:
assert exc.code == 5, exc.code
with pytest.raises(SystemExit) as exc: # noqa
worker._handle_signal(5, None)


def test_fails(caplog):
caplog.set_level(logging.DEBUG)
conn = get_fakeredis()
operation = "fails"
job = Job.create(conn, "test")
stage = job.get_stage(operation)
task = stage.queue({}, {})

worker = NoOpWorker(conn=conn, stages=[operation])
worker.sync()
for _ in range(4):
worker.retry(task)
worker.run(blocking=False)
assert job.is_done()
assert worker.exit_code == 0, worker.exit_code
try:
worker._handle_signal(5, None)
except SystemExit as exc:
assert exc.code == 5, exc.code
with pytest.raises(SystemExit) as exc: # noqa
worker._handle_signal(5, None)

log_messages = [r.msg for r in caplog.records]
assert "Queueing failed task for retry #1/3..." in log_messages
assert "Queueing failed task for retry #2/3..." in log_messages
assert "Queueing failed task for retry #3/3..." in log_messages
assert "Failed task, exhausted retry count of 3" in log_messages

0 comments on commit 4c273a0

Please sign in to comment.