From df52238e28271549ed5d1906293700b003eafa95 Mon Sep 17 00:00:00 2001 From: drobnikj Date: Fri, 6 Dec 2024 12:15:02 +0100 Subject: [PATCH 1/7] fix(request-queue): Update rq locking docs --- sources/platform/storage/request_queue.md | 84 +++++++++++++++++------ 1 file changed, 62 insertions(+), 22 deletions(-) diff --git a/sources/platform/storage/request_queue.md b/sources/platform/storage/request_queue.md index 4113e92a4..5d6dd9999 100644 --- a/sources/platform/storage/request_queue.md +++ b/sources/platform/storage/request_queue.md @@ -407,7 +407,10 @@ You can lock a request so that no other clients receive it when they fetch the q This feature is seamlessly integrated into Crawlee, requiring minimal extra setup. By default, requests are locked for the same duration as the timeout for processing requests in the crawler ([`requestHandlerTimeoutSecs`](https://crawlee.dev/api/next/basic-crawler/interface/BasicCrawlerOptions#requestHandlerTimeoutSecs)). If the Actor processing the request fails, the lock expires, and the request is processed again eventually. For more details, refer to the [Crawlee documentation](https://crawlee.dev/docs/next/experiments/experiments-request-locking). -In the following example, we demonstrate how we can use locking mechanisms to avoid concurrent processing of the same request. +In the following example, we demonstrate how we can use locking mechanisms to avoid concurrent processing of the same request across multiple Actor runs. + + + ```js import { Actor, ApifyClient } from 'apify'; @@ -425,9 +428,6 @@ const requestQueue = await client.requestQueues().getOrCreate('example-queue'); const requestQueueClientOne = client.requestQueue(requestQueue.id, { clientKey: 'requestqueueone', }); -const requestQueueClientTwo = client.requestQueue(requestQueue.id, { - clientKey: 'requestqueuetwo', -}); // Adds multiple requests to the queue. 
await requestQueueClientOne.batchAddRequests([ @@ -457,23 +457,71 @@ await requestQueueClientOne.batchAddRequests([ const processingRequestsClientOne = await requestQueueClientOne.listAndLockHead( { limit: 2, - lockSecs: 60, + lockSecs: 120, }, ); +// Checks when the lock will expire. The locked request will have a lockExpiresAt attribute. +const theFirstRequestLockedByClientOne = processingRequestsClientOne.items[0]; +const requestLockedByClientOne = await requestQueueClientOne.getRequest( + theFirstRequestLockedByClientOne.id, +); +console.log(`Request locked until ${requestLockedByClientOne?.lockExpiresAt}`); + +// Prolongs the lock of the first request or unlocks it. +await requestQueueClientOne.prolongRequestLock( + theFirstRequestLockedByClientOne.id, + { lockSecs: 120 }, +); +await requestQueueClientOne.deleteRequestLock( + theFirstRequestLockedByClientOne.id, +); + +// Cleans up the queue. +await requestQueueClientOne.delete(); + +await Actor.exit(); +``` + + + + +```js +import { Actor, ApifyClient } from 'apify'; + +await Actor.init(); + +const client = new ApifyClient({ + token: 'MY-APIFY-TOKEN', +}); + +// Waits for the first Actor to lock the requests. +await new Promise((resolve) => setTimeout(resolve, 5000)); + +// Creates a new request queue. +const requestQueue = await client.requestQueues().getOrCreate('example-queue'); + +const requestQueueClientTwo = client.requestQueue(requestQueue.id, { + clientKey: 'requestqueuetwo', +}); + +// Get all requests from the queue and check one locked by the first Actor. +const requests = await requestQueueClientTwo.listRequests(); +const requestLockedByClientOne = requests.items.filter((request) => request.lockedByClientKey === 'requestqueueone'); +const theFirstRequestLockedByClientOne = requestLockedByClientOne[0]; + // Other clients cannot list and lock these requests; the listAndLockHead call returns other requests from the queue. 
const processingRequestsClientTwo = await requestQueueClientTwo.listAndLockHead( { - limit: 2, + limit: 10, lockSecs: 60, }, ); - -// Checks when the lock will expire. The locked request will have a lockExpiresAt attribute. -const theFirstRequestLockedByClientOne = processingRequestsClientOne.items[0]; -const requestLockedByClientOne = await requestQueueClientOne.getRequest( - theFirstRequestLockedByClientOne.id, +const wasTheClientTwoLockedSameRequest = !!processingRequestsClientTwo.items.find( + (request) => request.id === theFirstRequestLockedByClientOne.id, ); + +console.log(`Was the request locked by the first client locked by the second client? ${wasTheClientTwoLockedSameRequest}`); console.log(`Request locked until ${requestLockedByClientOne?.lockExpiresAt}`); // Other clients cannot modify the lock; attempting to do so will throw an error. @@ -486,21 +534,13 @@ try { // This will throw an error. } -// Prolongs the lock of the first request or unlocks it. -await requestQueueClientOne.prolongRequestLock( - theFirstRequestLockedByClientOne.id, - { lockSecs: 60 }, -); -await requestQueueClientOne.deleteRequestLock( - theFirstRequestLockedByClientOne.id, -); - -// Cleans up the queue. -await requestQueueClientOne.delete(); await Actor.exit(); ``` + + + A detailed tutorial on how to process one request queue with multiple Actor runs can be found in [Academy tutorials](https://docs.apify.com/academy/node-js/multiple-runs-scrape). 
## Sharing From dee9669617a83e78a6d569da444a19f6b78be947 Mon Sep 17 00:00:00 2001 From: drobnikj Date: Fri, 6 Dec 2024 14:12:26 +0100 Subject: [PATCH 2/7] fix(request-queue): Stress out the locking mechanism --- sources/platform/storage/request_queue.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sources/platform/storage/request_queue.md b/sources/platform/storage/request_queue.md index 5d6dd9999..527e2effa 100644 --- a/sources/platform/storage/request_queue.md +++ b/sources/platform/storage/request_queue.md @@ -409,6 +409,15 @@ If the Actor processing the request fails, the lock expires, and the request is In the following example, we demonstrate how we can use locking mechanisms to avoid concurrent processing of the same request across multiple Actor runs. +:::info +The lock mechanism works on the client level, as well as the run level, when running the Actor on the Apify platform. + +This means you can unlock or prolong the lock the locked request only if: + +1. You are using the same client key, or +2. The operation is being called from the same Actor run. +::: + From c00842f1328ee56eb2795d087d3793ecca3ecc76 Mon Sep 17 00:00:00 2001 From: drobnikj Date: Fri, 6 Dec 2024 14:15:42 +0100 Subject: [PATCH 3/7] fix(request-queue): lint fix --- sources/platform/storage/request_queue.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sources/platform/storage/request_queue.md b/sources/platform/storage/request_queue.md index 527e2effa..6010624d7 100644 --- a/sources/platform/storage/request_queue.md +++ b/sources/platform/storage/request_queue.md @@ -416,6 +416,7 @@ This means you can unlock or prolong the lock the locked request only if: 1. You are using the same client key, or 2. The operation is being called from the same Actor run. 
+ ::: From d6dd1afac219a89fe6d0489fe6eb0422dad9556b Mon Sep 17 00:00:00 2001 From: drobnikj Date: Fri, 6 Dec 2024 14:26:48 +0100 Subject: [PATCH 4/7] fix(request-queue): lint fix --- sources/platform/storage/request_queue.md | 1 - 1 file changed, 1 deletion(-) diff --git a/sources/platform/storage/request_queue.md b/sources/platform/storage/request_queue.md index 6010624d7..45ea6586a 100644 --- a/sources/platform/storage/request_queue.md +++ b/sources/platform/storage/request_queue.md @@ -544,7 +544,6 @@ try { // This will throw an error. } - await Actor.exit(); ``` From 540a8a61021fb6af4d4abe382ca8f478111a8289 Mon Sep 17 00:00:00 2001 From: drobnikj Date: Mon, 9 Dec 2024 11:57:37 +0100 Subject: [PATCH 5/7] feat(request-queue): Improve naming --- sources/platform/storage/request_queue.md | 54 ++++++++++++----------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/sources/platform/storage/request_queue.md b/sources/platform/storage/request_queue.md index 45ea6586a..2c8a0863c 100644 --- a/sources/platform/storage/request_queue.md +++ b/sources/platform/storage/request_queue.md @@ -435,12 +435,12 @@ const client = new ApifyClient({ const requestQueue = await client.requestQueues().getOrCreate('example-queue'); // Creates two clients with different keys for the same request queue. -const requestQueueClientOne = client.requestQueue(requestQueue.id, { +const requestQueueClient = client.requestQueue(requestQueue.id, { clientKey: 'requestqueueone', }); // Adds multiple requests to the queue. -await requestQueueClientOne.batchAddRequests([ +await requestQueueClient.batchAddRequests([ { url: 'http://example.com/foo', uniqueKey: 'http://example.com/foo', @@ -464,7 +464,7 @@ await requestQueueClientOne.batchAddRequests([ ]); // Locks the first two requests at the head of the queue. 
-const processingRequestsClientOne = await requestQueueClientOne.listAndLockHead( +const processingRequestsClientOne = await requestQueueClient.listAndLockHead( { limit: 2, lockSecs: 120, @@ -472,24 +472,21 @@ const processingRequestsClientOne = await requestQueueClientOne.listAndLockHead( ); // Checks when the lock will expire. The locked request will have a lockExpiresAt attribute. -const theFirstRequestLockedByClientOne = processingRequestsClientOne.items[0]; -const requestLockedByClientOne = await requestQueueClientOne.getRequest( - theFirstRequestLockedByClientOne.id, +const lockedRequest = processingRequestsClientOne.items[0]; +const lockedRequestDetail = await requestQueueClient.getRequest( + lockedRequest.id, ); -console.log(`Request locked until ${requestLockedByClientOne?.lockExpiresAt}`); +console.log(`Request locked until ${lockedRequestDetail?.lockExpiresAt}`); // Prolongs the lock of the first request or unlocks it. -await requestQueueClientOne.prolongRequestLock( - theFirstRequestLockedByClientOne.id, +await requestQueueClient.prolongRequestLock( + lockedRequest.id, { lockSecs: 120 }, ); -await requestQueueClientOne.deleteRequestLock( - theFirstRequestLockedByClientOne.id, +await requestQueueClient.deleteRequestLock( + lockedRequest.id, ); -// Cleans up the queue. -await requestQueueClientOne.delete(); - await Actor.exit(); ``` @@ -508,42 +505,47 @@ const client = new ApifyClient({ // Waits for the first Actor to lock the requests. await new Promise((resolve) => setTimeout(resolve, 5000)); -// Creates a new request queue. +// Get the same request queue in a different Actor run and with a different client key. const requestQueue = await client.requestQueues().getOrCreate('example-queue'); -const requestQueueClientTwo = client.requestQueue(requestQueue.id, { +const requestQueueClient = client.requestQueue(requestQueue.id, { clientKey: 'requestqueuetwo', }); // Get all requests from the queue and check one locked by the first Actor. 
-const requests = await requestQueueClientTwo.listRequests(); -const requestLockedByClientOne = requests.items.filter((request) => request.lockedByClientKey === 'requestqueueone'); -const theFirstRequestLockedByClientOne = requestLockedByClientOne[0]; +const requests = await requestQueueClient.listRequests(); +const requestsLockedByAnotherRun = requests.items.filter((request) => request.lockByClient === 'requestqueueone'); +const requestLockedByAnotherRun = await requestQueueClient.getRequest( + requestsLockedByAnotherRun[0].id, +); // Other clients cannot list and lock these requests; the listAndLockHead call returns other requests from the queue. -const processingRequestsClientTwo = await requestQueueClientTwo.listAndLockHead( +const processingRequestsClientTwo = await requestQueueClient.listAndLockHead( { limit: 10, lockSecs: 60, }, ); -const wasTheClientTwoLockedSameRequest = !!processingRequestsClientTwo.items.find( - (request) => request.id === theFirstRequestLockedByClientOne.id, +const wasBothRunsLockedSameRequest = !!processingRequestsClientTwo.items.find( + (request) => request.id === requestLockedByAnotherRun.id, ); -console.log(`Was the request locked by the first client locked by the second client? ${wasTheClientTwoLockedSameRequest}`); -console.log(`Request locked until ${requestLockedByClientOne?.lockExpiresAt}`); +console.log(`Was the request locked by the first run locked by the second run? ${wasBothRunsLockedSameRequest}`); +console.log(`Request locked until ${requestLockedByAnotherRun?.lockExpiresAt}`); // Other clients cannot modify the lock; attempting to do so will throw an error. try { - await requestQueueClientTwo.prolongRequestLock( - theFirstRequestLockedByClientOne.id, + await requestQueueClient.prolongRequestLock( + requestLockedByAnotherRun.id, { lockSecs: 60 }, ); } catch (err) { // This will throw an error. } +// Cleans up the queue. 
+await requestQueueClient.delete(); + await Actor.exit(); ``` From 983ad20803515cac7c8c62d0f3d1b7c54237b7c8 Mon Sep 17 00:00:00 2001 From: drobnikj Date: Mon, 9 Dec 2024 11:59:45 +0100 Subject: [PATCH 6/7] feat(request-queue): Improve naming --- sources/platform/storage/request_queue.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sources/platform/storage/request_queue.md b/sources/platform/storage/request_queue.md index 2c8a0863c..b8b9a82e7 100644 --- a/sources/platform/storage/request_queue.md +++ b/sources/platform/storage/request_queue.md @@ -515,7 +515,7 @@ const requestQueueClient = client.requestQueue(requestQueue.id, { // Get all requests from the queue and check one locked by the first Actor. const requests = await requestQueueClient.listRequests(); const requestsLockedByAnotherRun = requests.items.filter((request) => request.lockByClient === 'requestqueueone'); -const requestLockedByAnotherRun = await requestQueueClient.getRequest( +const requestLockedByAnotherRunDetail = await requestQueueClient.getRequest( requestsLockedByAnotherRun[0].id, ); @@ -527,16 +527,16 @@ const processingRequestsClientTwo = await requestQueueClient.listAndLockHead( }, ); const wasBothRunsLockedSameRequest = !!processingRequestsClientTwo.items.find( - (request) => request.id === requestLockedByAnotherRun.id, + (request) => request.id === requestLockedByAnotherRunDetail.id, ); console.log(`Was the request locked by the first run locked by the second run? ${wasBothRunsLockedSameRequest}`); -console.log(`Request locked until ${requestLockedByAnotherRun?.lockExpiresAt}`); +console.log(`Request locked until ${requestLockedByAnotherRunDetail?.lockExpiresAt}`); // Other clients cannot modify the lock; attempting to do so will throw an error. 
try { await requestQueueClient.prolongRequestLock( - requestLockedByAnotherRun.id, + requestLockedByAnotherRunDetail.id, { lockSecs: 60 }, ); } catch (err) { From 8e1f2835c7a3393fb6444c0fd942666f9fa0736d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Drobn=C3=ADk?= Date: Tue, 10 Dec 2024 10:36:58 +0100 Subject: [PATCH 7/7] fix: PR fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Michał Olender <92638966+TC-MO@users.noreply.github.com> --- sources/platform/storage/request_queue.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sources/platform/storage/request_queue.md b/sources/platform/storage/request_queue.md index b8b9a82e7..f5a75cf28 100644 --- a/sources/platform/storage/request_queue.md +++ b/sources/platform/storage/request_queue.md @@ -407,15 +407,15 @@ You can lock a request so that no other clients receive it when they fetch the q This feature is seamlessly integrated into Crawlee, requiring minimal extra setup. By default, requests are locked for the same duration as the timeout for processing requests in the crawler ([`requestHandlerTimeoutSecs`](https://crawlee.dev/api/next/basic-crawler/interface/BasicCrawlerOptions#requestHandlerTimeoutSecs)). If the Actor processing the request fails, the lock expires, and the request is processed again eventually. For more details, refer to the [Crawlee documentation](https://crawlee.dev/docs/next/experiments/experiments-request-locking). -In the following example, we demonstrate how we can use locking mechanisms to avoid concurrent processing of the same request across multiple Actor runs. +In the following example, we demonstrate how you can use locking mechanisms to avoid concurrent processing of the same request across multiple Actor runs. :::info The lock mechanism works on the client level, as well as the run level, when running the Actor on the Apify platform. 
This means you can unlock or prolong the lock the locked request only if: -1. You are using the same client key, or -2. The operation is being called from the same Actor run. +- You are using the same client key, or +- The operation is being called from the same Actor run. :::