fix(request-queue): Update request queue locking docs #1322

Merged
merged 7 commits into from
Dec 10, 2024
Changes from 5 commits
107 changes: 79 additions & 28 deletions sources/platform/storage/request_queue.md
@@ -407,7 +407,20 @@ You can lock a request so that no other clients receive it when they fetch the q
This feature is seamlessly integrated into Crawlee, requiring minimal extra setup. By default, requests are locked for the same duration as the timeout for processing requests in the crawler ([`requestHandlerTimeoutSecs`](https://crawlee.dev/api/next/basic-crawler/interface/BasicCrawlerOptions#requestHandlerTimeoutSecs)).
If the Actor processing the request fails, the lock expires, and the request is processed again eventually. For more details, refer to the [Crawlee documentation](https://crawlee.dev/docs/next/experiments/experiments-request-locking).
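
To make the expiry behavior concrete, here is a minimal sketch in plain JavaScript, assuming a lock is represented only by its expiry timestamp. The helper names `lockUntil` and `isLocked` are hypothetical and are not part of Crawlee or the Apify client; they only model the rule that a request becomes available again once its lock time has passed.

```js
// Hypothetical helpers modelling lock expiry; not part of Crawlee or apify-client.

// Computes when a lock taken at `now` for `lockSecs` seconds expires.
function lockUntil(now, lockSecs) {
    return new Date(now.getTime() + lockSecs * 1000);
}

// A request counts as locked only while its expiry time lies in the future.
function isLocked(lockExpiresAt, now) {
    return lockExpiresAt !== undefined && lockExpiresAt.getTime() > now.getTime();
}

const lockedAt = new Date('2024-12-10T10:00:00Z');
const lockExpiresAt = lockUntil(lockedAt, 60);

// 30 seconds in, the request is still hidden from other clients.
console.log(isLocked(lockExpiresAt, new Date('2024-12-10T10:00:30Z'))); // true
// 90 seconds in, the lock has lapsed and the request can be fetched again.
console.log(isLocked(lockExpiresAt, new Date('2024-12-10T10:01:30Z'))); // false
```

In this model, a crashed Actor simply never prolongs or releases its lock, so the request reappears for other clients after `lockSecs` seconds.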

In the following example, we demonstrate how we can use locking mechanisms to avoid concurrent processing of the same request.
In the following example, we demonstrate how we can use locking mechanisms to avoid concurrent processing of the same request across multiple Actor runs.

:::info
The lock mechanism works on the client level, as well as the run level, when running the Actor on the Apify platform.

This means you can unlock a locked request or prolong its lock only if:

1. You are using the same client key, or
2. The operation is being called from the same Actor run.

:::
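
The ownership rule in the note above can be sketched as a single predicate. This is only an illustrative model: the function `canModifyLock` and the field names `clientKey` and `actorRunId` on the lock and caller objects are assumptions made for this example, not the platform's actual data model.

```js
// Hypothetical model of the ownership rule; object shapes are assumptions.
function canModifyLock(lock, caller) {
    // Rule 1: the caller uses the same client key that took the lock.
    const sameClientKey = Boolean(lock.clientKey) && lock.clientKey === caller.clientKey;
    // Rule 2: the caller is running inside the same Actor run that took the lock.
    const sameActorRun = Boolean(lock.actorRunId) && lock.actorRunId === caller.actorRunId;
    return sameClientKey || sameActorRun;
}

const lock = { clientKey: 'requestqueueone', actorRunId: 'run-A' };

// Same client key, different run: allowed.
console.log(canModifyLock(lock, { clientKey: 'requestqueueone', actorRunId: 'run-B' })); // true
// Different client key, same run: allowed.
console.log(canModifyLock(lock, { clientKey: 'requestqueuetwo', actorRunId: 'run-A' })); // true
// Different client key and different run: rejected.
console.log(canModifyLock(lock, { clientKey: 'requestqueuetwo', actorRunId: 'run-B' })); // false
```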

<Tabs groupId="main">
<TabItem value="Actor 1" label="Actor 1">

```js
import { Actor, ApifyClient } from 'apify';
@@ -422,15 +435,12 @@ const client = new ApifyClient({
const requestQueue = await client.requestQueues().getOrCreate('example-queue');

// Creates two clients with different keys for the same request queue.
const requestQueueClientOne = client.requestQueue(requestQueue.id, {
const requestQueueClient = client.requestQueue(requestQueue.id, {
clientKey: 'requestqueueone',
});
const requestQueueClientTwo = client.requestQueue(requestQueue.id, {
clientKey: 'requestqueuetwo',
});

// Adds multiple requests to the queue.
await requestQueueClientOne.batchAddRequests([
await requestQueueClient.batchAddRequests([
{
url: 'http://example.com/foo',
uniqueKey: 'http://example.com/foo',
@@ -454,53 +464,94 @@ await requestQueueClientOne.batchAddRequests([
]);

// Locks the first two requests at the head of the queue.
const processingRequestsClientOne = await requestQueueClientOne.listAndLockHead(
const processingRequestsClientOne = await requestQueueClient.listAndLockHead(
{
limit: 2,
lockSecs: 60,
lockSecs: 120,
},
);

// Checks when the lock will expire. The locked request will have a lockExpiresAt attribute.
const lockedRequest = processingRequestsClientOne.items[0];
const lockedRequestDetail = await requestQueueClient.getRequest(
lockedRequest.id,
);
console.log(`Request locked until ${lockedRequestDetail?.lockExpiresAt}`);

// Prolongs the lock of the first request or unlocks it.
await requestQueueClient.prolongRequestLock(
lockedRequest.id,
{ lockSecs: 120 },
);
await requestQueueClient.deleteRequestLock(
lockedRequest.id,
);

await Actor.exit();
```

</TabItem>
<TabItem value="Actor 2" label="Actor 2">

```js
import { Actor, ApifyClient } from 'apify';

await Actor.init();

const client = new ApifyClient({
token: 'MY-APIFY-TOKEN',
});

// Waits for the first Actor to lock the requests.
await new Promise((resolve) => setTimeout(resolve, 5000));

// Gets the same request queue in a different Actor run and with a different client key.
const requestQueue = await client.requestQueues().getOrCreate('example-queue');

const requestQueueClient = client.requestQueue(requestQueue.id, {
clientKey: 'requestqueuetwo',
});

// Gets all requests from the queue and picks one that is locked by the first Actor.
const requests = await requestQueueClient.listRequests();
const requestsLockedByAnotherRun = requests.items.filter((request) => request.lockByClient === 'requestqueueone');
const requestLockedByAnotherRun = await requestQueueClient.getRequest(
requestsLockedByAnotherRun[0].id,
);

// Other clients cannot list and lock these requests; the listAndLockHead call returns other requests from the queue.
const processingRequestsClientTwo = await requestQueueClientTwo.listAndLockHead(
const processingRequestsClientTwo = await requestQueueClient.listAndLockHead(
{
limit: 2,
limit: 10,
lockSecs: 60,
},
);

// Checks when the lock will expire. The locked request will have a lockExpiresAt attribute.
const theFirstRequestLockedByClientOne = processingRequestsClientOne.items[0];
const requestLockedByClientOne = await requestQueueClientOne.getRequest(
theFirstRequestLockedByClientOne.id,
const wasBothRunsLockedSameRequest = !!processingRequestsClientTwo.items.find(
(request) => request.id === requestLockedByAnotherRun.id,
);
console.log(`Request locked until ${requestLockedByClientOne?.lockExpiresAt}`);

console.log(`Was the request locked by the first run locked by the second run? ${wasBothRunsLockedSameRequest}`);
console.log(`Request locked until ${requestLockedByAnotherRun?.lockExpiresAt}`);

// Other clients cannot modify the lock; attempting to do so will throw an error.
try {
await requestQueueClientTwo.prolongRequestLock(
theFirstRequestLockedByClientOne.id,
await requestQueueClient.prolongRequestLock(
requestLockedByAnotherRun.id,
{ lockSecs: 60 },
);
} catch (err) {
// This will throw an error.
}

// Prolongs the lock of the first request or unlocks it.
await requestQueueClientOne.prolongRequestLock(
theFirstRequestLockedByClientOne.id,
{ lockSecs: 60 },
);
await requestQueueClientOne.deleteRequestLock(
theFirstRequestLockedByClientOne.id,
);

// Cleans up the queue.
await requestQueueClientOne.delete();
await requestQueueClient.delete();

await Actor.exit();
```

</TabItem>
</Tabs>
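
The interplay between the two Actors can also be tried offline. The sketch below is a hypothetical in-memory stand-in, the class `InMemoryLockingQueue` and its explicit `clientKey` and `nowMs` parameters are invented for illustration, and it only mimics the behavior described in the examples above: a second client receives other requests from `listAndLockHead`, and it cannot prolong a lock it does not hold.

```js
// Hypothetical in-memory stand-in for a request queue. It only mimics the
// behavior described above and is not how the Apify platform is implemented.
class InMemoryLockingQueue {
    constructor(urls) {
        this.requests = urls.map((url, index) => ({
            id: `req-${index}`, url, lockedBy: null, lockExpiresAt: 0,
        }));
    }

    // Returns the IDs of up to `limit` unlocked requests and locks them for `clientKey`.
    listAndLockHead({ limit, lockSecs }, clientKey, nowMs) {
        const free = this.requests
            .filter((request) => request.lockExpiresAt <= nowMs)
            .slice(0, limit);
        for (const request of free) {
            request.lockedBy = clientKey;
            request.lockExpiresAt = nowMs + lockSecs * 1000;
        }
        return free.map((request) => request.id);
    }

    // Extends an active lock, but only for the client that holds it.
    prolongRequestLock(id, { lockSecs }, clientKey, nowMs) {
        const request = this.requests.find((item) => item.id === id);
        if (!request || request.lockExpiresAt <= nowMs || request.lockedBy !== clientKey) {
            throw new Error(`Request ${id} is not locked by ${clientKey}`);
        }
        request.lockExpiresAt = nowMs + lockSecs * 1000;
    }
}

const queue = new InMemoryLockingQueue(['/foo', '/bar', '/baz', '/qux']);

// The first client locks the two requests at the head of the queue.
const firstBatch = queue.listAndLockHead({ limit: 2, lockSecs: 120 }, 'requestqueueone', 0);
// Five seconds later, the second client only receives the remaining requests.
const secondBatch = queue.listAndLockHead({ limit: 10, lockSecs: 60 }, 'requestqueuetwo', 5000);

console.log(firstBatch); // [ 'req-0', 'req-1' ]
console.log(secondBatch); // [ 'req-2', 'req-3' ]
```

Time is passed in explicitly as `nowMs` so the scenario is deterministic; a real client would rely on the server's clock instead.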

A detailed tutorial on how to process one request queue with multiple Actor runs can be found in [Academy tutorials](https://docs.apify.com/academy/node-js/multiple-runs-scrape).

## Sharing