Skip to content

Commit

Permalink
Fix SA2.0 error: celery task
Browse files Browse the repository at this point in the history
1. In 2.0, when the statement contains "returning", the result type is
   ChunkedIteratorResult, which does not have the rowcount attr,
   becuase:
2. result.rowcount should not be used for statements containting the returning clause

Ref: https://docs.sqlalchemy.org/en/20/core/connections.html#sqlalchemy.engine.CursorResult.rowcount
  • Loading branch information
jdavcs committed Mar 4, 2024
1 parent b184104 commit 4d0b4cd
Showing 1 changed file with 14 additions and 23 deletions.
37 changes: 14 additions & 23 deletions lib/galaxy/celery/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,37 +80,28 @@ class GalaxyTaskBeforeStartUserRateLimitPostgres(GalaxyTaskBeforeStartUserRateLi
We take advantage of efficiencies in its dialect.
"""

_update_stmt = (
update(CeleryUserRateLimit)
.where(CeleryUserRateLimit.user_id == bindparam("userid"))
.values(last_scheduled_time=text("greatest(last_scheduled_time + ':interval second', " ":now) "))
.returning(CeleryUserRateLimit.last_scheduled_time)
)
_insert_stmt = (
ps_insert(CeleryUserRateLimit)
.values(user_id=bindparam("userid"), last_scheduled_time=bindparam("now"))
.returning(CeleryUserRateLimit.last_scheduled_time)
)

_upsert_stmt = _insert_stmt.on_conflict_do_update( # type:ignore[attr-defined]
index_elements=["user_id"], set_=dict(last_scheduled_time=bindparam("sched_time"))
)

def calculate_task_start_time( # type: ignore
self, user_id: int, sa_session: galaxy_scoped_session, task_interval_secs: float, now: datetime.datetime
) -> datetime.datetime:
with transaction(sa_session):
result = sa_session.execute(
self._update_stmt, {"userid": user_id, "interval": task_interval_secs, "now": now}
update_stmt = (
update(CeleryUserRateLimit)
.where(CeleryUserRateLimit.user_id == user_id)
.values(last_scheduled_time=text("greatest(last_scheduled_time + ':interval second', " ":now) "))
.returning(CeleryUserRateLimit.last_scheduled_time)
)
if result.rowcount == 0:
result = sa_session.execute(update_stmt, {"interval": task_interval_secs, "now": now}).all()
if not result:
sched_time = now + datetime.timedelta(seconds=task_interval_secs)
result = sa_session.execute(
self._upsert_stmt, {"userid": user_id, "now": now, "sched_time": sched_time}
upsert_stmt = (
ps_insert(CeleryUserRateLimit) # type:ignore[attr-defined]
.values(user_id=user_id, last_scheduled_time=now)
.returning(CeleryUserRateLimit.last_scheduled_time)
.on_conflict_do_update(index_elements=["user_id"], set_=dict(last_scheduled_time=sched_time))
)
for row in result:
return row[0]
result = sa_session.execute(upsert_stmt).all()
sa_session.commit()
return result[0][0]


class GalaxyTaskBeforeStartUserRateLimitStandard(GalaxyTaskBeforeStartUserRateLimit):
Expand Down

0 comments on commit 4d0b4cd

Please sign in to comment.