Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/1.23.0 #143

Merged
merged 122 commits into from
Oct 8, 2024
Merged

Release/1.23.0 #143

merged 122 commits into from
Oct 8, 2024

Conversation

stchris
Copy link
Contributor

@stchris stchris commented Dec 20, 2023

This introduces several changes to the way servicelayer exposes a RabbitMQ-based implementation for Aleph tasks.

Objects

Task

A data class that contains information about one Aleph task.
Also contains methods to track how many times the task was retried.

task_id: str
job_id: str
delivery_tag: str
operation: str
context: dict
payload: dict
priority: int
collection_id: Optional[str] = None

Dataset

Object which keeps track of the status of currently running tasks by populating Redis. One Dataset instance corresponds to one collection_id. This is also the object which the /status/ API ends up querying to populate its response.

Redis keys used by the Dataset object:

  • tq:qdatasets: set of all collection_ids of active datasets (a dataset is considered active when it has either running or pending tasks)
  • tq:qdj:<dataset>:taskretry:<task_id>: the number of times task_id was retried

All of the following keys refer to task_ids or statistics about tasks per a certain dataset (collection_id):

  • tq:qdj:<dataset>:finished: number of tasks that have been marked as "Done" and for which an acknowledgement is also sent by the Worker over RabbitMQ.
  • tq:qdj:<dataset>:running: set of all task_ids of tasks currently running. A "Running" task is a task which has been checked out, and is being processed by a worker.
  • tq:qdj:<dataset>:pending: set of all task_ids of tasks currently pending. A "Pending" task has been added to a RabbitMQ queue (via a basic_publish call) by a producer (an API call, a UI action etc.).
  • tq:qdj:<dataset>:start: the UTC timestamp when either the first task_id has been added to a RabbitMQ queue (so, we have our first Pending task) or the timestamp when the first task_id has been checked out (so, we have our first Running task). The start key is updated when the first task is handed to a Worker.
  • tq:qdj:<dataset>:last_update: the UTC timestamp from the latest change to the state of tasks running for a certain collection_id. This is set when: a new task is Pending, a new task is Running, a new task is Done, a new task is canceled.
  • tq:qds:<dataset>:<stage>: a set of all task_ids that are either running or pending, for a certain stage.
  • tq:qds:<dataset>:<stage>:finished: number of tasks that have been marked as "Done" for a certain stage.
  • tq:qds:<dataset>:<stage>:running: set of all task_ids of tasks currently running for a certain stage.
  • tq:qds:<dataset>:<stage>:pending: set of all task_ids of tasks currently pending for a certain stage.

Worker

The parent class of all workers used in aleph: the Aleph worker, the ingest-file worker. Handles the consuming of tasks from RabbitMQ queues, and sending acknowledgements when the tasks are completed. The dispatch_task method is implemented in each subsequent child class.

Changes from the initial RabbitMQ implementation

  • implemented priorities for tasks. Each task gets assigned a random priority. The Producer will also reserve a maximum priority for tasks that need to be queued and executed urgently. This maximum priority implementation will exist outside of servicelayer.
  • added Redis keys: tq:qds:<dataset>:<stage> (stage_key), tq:qds:<dataset>:<stage>:finished, tq:qds:<dataset>:<stage>:running, tq:qds:<dataset>:<stage>:pending and added code to get_status to expose a break-down of tasks per stage. The job_id key is set to null since jobs are no longer relevant. The key was preserved in the JSON in order to not introduce breaking changes.
  • get_rabbitmq_connection has been refactored and it will now re-establish a new RabbitMQ connection is the existing one was closed.

Other changes

The status API JSON response has been modified, introducing a breaking change. The jobs key has been removed from the JSON. Now, the JSON contains a total number of running, pending and finished tasks, as well as a break-down of these tasks per stage.

@stchris stchris marked this pull request as ready for review December 20, 2023 16:42
stchris and others added 7 commits December 21, 2023 11:27
…ys open (#148)

* Rework connection creation

* Remove commented code

* Increase logging verbosity around RabbitMQ connections

* Increase log severity for temporary logging messages

* Log RabbitMQ connection re-establishing on debug
@stchris
Copy link
Contributor Author

stchris commented Feb 8, 2024

On the topic of job_id I think this is less important because it is no longer possible to cancel a given job (as in: we can't retract a certain message off the queue). But for the purposes of tracing it could be interesting to have ids on messages and what I'm thinking is to generate a BasicProperties.message_id (perhaps a ULID?).

@catileptic
Copy link
Contributor

@stchris on the topic of the job_id:

  • the job_id is still created, and the Task object even keeps track of it. From what I can see, tasks that create other tasks (in the case of ingesting or crawldir for example), the "parent" task is "inherited" by the "child" tasks (so you can see a "child" Task with a unique task_id but the job_id is the same as the one of their "parents)
  • for tracing purposes, we're covered: the Task has both a unique task_id and a job_id
  • however, the job_id is not visible in the /status/ page anymore [1], and neither are the task_ids
  • we can maybe talk about some logging messages that expose both

[1] - so, previously, in the /status/ JSON answer we had a dictionary per each collection_id with a key called jobs. This contained a list of initiated jobs. Each job had multiple stages. In the RabbitMQ implementation, the key structure of the JSON is the same (but we can still change it, if it better suits us) and each collection_id publishes the total amount of tasks per stage (not per job and then per stage).

stchris and others added 28 commits June 25, 2024 17:07
* Keep rmq channels open

* Account for test queue not being there
This fixes an issue with the metric for failed tasks and also adds test coverage for the metrics.

Follow up to #181
* Reproduce incorrect dataset timestamps

alephdata/aleph#3787

* Remove "end_time" timestamp

We currently do not retain information about inactive datasets (i.e. datasets that do not have any more pending or running tasks). For this reason, the "end_time" timestamp is currently of no use, as it would never be displayed to users anyway.

While there are some plans around providing more detail about the results of processed tasks as well as completed jobs, it is unclear where this data will be stored and what the implementation will look like. As it is easy enough to add this information back (< 10 LOC), I’ve removed it for now.

* Only set `start_time` timestamp if it hasn’t been set yet

* Delete dataset timestamps when all tasks have been processed

* Add tests covering dataset status when cancelling tasks

* Extract flushing status data into separate method

* Ensure time-machine is installed in GitHub Actions

* Delete dead code

* Remove redundant code

`cleanup_dataset_status` iterates over all active datasets and removes status information if it is done (i.e. there are no more pending or running tasks). It is called by the Aleph worker periodically.

We already do this for individual datasets whenever a task from the dataset is done or the dataset is cancelled as a whole. That means that `cleanup_dataset_status` is redundant.

Also see: #190 (comment)

* Use `is_done` helper method

* Fix linter errors

---------

Co-authored-by: catileptic <[email protected]>
* Fix Redis keys (temporary solution)

* Fix unused variable (linter)

* Fix double increment bug, remove deprecated key from test

* Bump version: 1.23.0-rc34 → 1.23.0-rc35

* Execute redis commands at the end of mark_task_for_retry

* Bump version: 1.23.0-rc35 → 1.23.0-rc36

* Remove wrong keys clean-up code

* Fix failing tests
@catileptic catileptic merged commit 267f63d into main Oct 8, 2024
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants