From a86fc989115e1f69a32de4721063cc64630d3a9f Mon Sep 17 00:00:00 2001 From: Patrick Beza Date: Thu, 28 Nov 2024 12:13:51 +0100 Subject: [PATCH] fix(tee): fix race condition in batch locking After scaling zksync-tee-prover to two instances/replicas on Azure for azure-mainnet2, azure-testnet2, and azure-mainnet2, we started experiencing duplicated proving for some batches. This was due to a race condition in batch locking. This PR fixes the issue by adding atomic batch locking. --- ...34096d9357e229fc86fa82ace3a4ec32406b7.json | 26 +++++ ...82b0fa233913582fe9091cc1e8954dfd0eb1b.json | 30 ++++++ ...8203a62629904bc4956249e690a8ad7a48983.json | 32 ------ core/lib/dal/src/models/storage_tee_proof.rs | 5 + core/lib/dal/src/tee_proof_generation_dal.rs | 99 +++++++++++-------- 5 files changed, 121 insertions(+), 71 deletions(-) create mode 100644 core/lib/dal/.sqlx/query-32785942bbbcc7e8212c8e401ab34096d9357e229fc86fa82ace3a4ec32406b7.json create mode 100644 core/lib/dal/.sqlx/query-4777de5d3f313f1eb8c3b6a4c1782b0fa233913582fe9091cc1e8954dfd0eb1b.json delete mode 100644 core/lib/dal/.sqlx/query-e46c99b23db91800b27c717100f8203a62629904bc4956249e690a8ad7a48983.json diff --git a/core/lib/dal/.sqlx/query-32785942bbbcc7e8212c8e401ab34096d9357e229fc86fa82ace3a4ec32406b7.json b/core/lib/dal/.sqlx/query-32785942bbbcc7e8212c8e401ab34096d9357e229fc86fa82ace3a4ec32406b7.json new file mode 100644 index 000000000000..7491c656225b --- /dev/null +++ b/core/lib/dal/.sqlx/query-32785942bbbcc7e8212c8e401ab34096d9357e229fc86fa82ace3a4ec32406b7.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n p.l1_batch_number\n FROM\n proof_generation_details p\n LEFT JOIN\n tee_proof_generation_details tee\n ON\n p.l1_batch_number = tee.l1_batch_number\n AND tee.tee_type = $1\n WHERE\n (\n p.l1_batch_number >= $5\n AND p.vm_run_data_blob_url IS NOT NULL\n AND p.proof_gen_data_blob_url IS NOT NULL\n )\n AND (\n tee.l1_batch_number IS NULL\n OR (\n (tee.status = $2 OR tee.status = $3)\n AND tee.prover_taken_at < NOW() - $4::INTERVAL\n )\n )\n FETCH FIRST ROW ONLY\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "l1_batch_number", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Interval", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "32785942bbbcc7e8212c8e401ab34096d9357e229fc86fa82ace3a4ec32406b7" +} diff --git a/core/lib/dal/.sqlx/query-4777de5d3f313f1eb8c3b6a4c1782b0fa233913582fe9091cc1e8954dfd0eb1b.json b/core/lib/dal/.sqlx/query-4777de5d3f313f1eb8c3b6a4c1782b0fa233913582fe9091cc1e8954dfd0eb1b.json new file mode 100644 index 000000000000..37adf92582e7 --- /dev/null +++ b/core/lib/dal/.sqlx/query-4777de5d3f313f1eb8c3b6a4c1782b0fa233913582fe9091cc1e8954dfd0eb1b.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n tee_proof_generation_details (\n l1_batch_number, tee_type, status, created_at, updated_at, prover_taken_at\n )\n VALUES\n (\n $1,\n $2,\n $3,\n NOW(),\n NOW(),\n NOW()\n )\n ON CONFLICT (l1_batch_number, tee_type) DO\n UPDATE\n SET\n status = $3,\n updated_at = NOW(),\n prover_taken_at = NOW()\n RETURNING\n l1_batch_number,\n created_at\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "l1_batch_number", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "created_at", + "type_info": "Timestamp" + } + ], + "parameters": { + "Left": [ + "Int8", + "Text", + "Text" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "4777de5d3f313f1eb8c3b6a4c1782b0fa233913582fe9091cc1e8954dfd0eb1b" +} diff --git a/core/lib/dal/.sqlx/query-e46c99b23db91800b27c717100f8203a62629904bc4956249e690a8ad7a48983.json b/core/lib/dal/.sqlx/query-e46c99b23db91800b27c717100f8203a62629904bc4956249e690a8ad7a48983.json deleted file mode 100644 index 7ca2c9e7e9fa..000000000000 --- a/core/lib/dal/.sqlx/query-e46c99b23db91800b27c717100f8203a62629904bc4956249e690a8ad7a48983.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH upsert AS (\n SELECT\n p.l1_batch_number\n FROM\n proof_generation_details p\n LEFT JOIN\n tee_proof_generation_details tee\n ON\n p.l1_batch_number = tee.l1_batch_number\n AND tee.tee_type = $1\n WHERE\n (\n p.l1_batch_number >= $5\n AND p.vm_run_data_blob_url IS NOT NULL\n AND p.proof_gen_data_blob_url IS NOT NULL\n )\n AND (\n tee.l1_batch_number IS NULL\n OR (\n (tee.status = $2 OR tee.status = $3)\n AND tee.prover_taken_at < NOW() - $4::INTERVAL\n )\n )\n FETCH FIRST ROW ONLY\n )\n \n INSERT INTO\n tee_proof_generation_details (\n l1_batch_number, tee_type, status, created_at, updated_at, prover_taken_at\n )\n SELECT\n l1_batch_number,\n $1,\n $2,\n NOW(),\n NOW(),\n NOW()\n FROM\n upsert\n ON CONFLICT (l1_batch_number, tee_type) DO\n UPDATE\n SET\n status = $2,\n updated_at = NOW(),\n prover_taken_at = NOW()\n RETURNING\n l1_batch_number,\n created_at\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "l1_batch_number", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "created_at", - "type_info": "Timestamp" - } - ], - "parameters": { - "Left": [ - "Text", - "Text", - "Text", - "Interval", - "Int8" - ] - }, - "nullable": [ - false, - false - ] - }, - "hash": "e46c99b23db91800b27c717100f8203a62629904bc4956249e690a8ad7a48983" -} diff --git a/core/lib/dal/src/models/storage_tee_proof.rs b/core/lib/dal/src/models/storage_tee_proof.rs index 6e031674b585..8ab9517cefa9 100644 --- a/core/lib/dal/src/models/storage_tee_proof.rs +++ b/core/lib/dal/src/models/storage_tee_proof.rs @@ -13,6 +13,11 @@ pub struct StorageTeeProof { pub attestation: Option>, } +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct StorageBatch { + pub l1_batch_number: i64, +} + #[derive(Debug, Clone, sqlx::FromRow)] pub struct StorageLockedBatch { pub l1_batch_number: i64, diff --git a/core/lib/dal/src/tee_proof_generation_dal.rs b/core/lib/dal/src/tee_proof_generation_dal.rs index e6b2df974b26..16cc321c080b 100644 --- a/core/lib/dal/src/tee_proof_generation_dal.rs +++ b/core/lib/dal/src/tee_proof_generation_dal.rs @@ -13,7 +13,7 @@ use zksync_db_connection::{ use zksync_types::{tee_types::TeeType, L1BatchNumber}; use crate::{ - models::storage_tee_proof::{StorageLockedBatch, StorageTeeProof}, + models::storage_tee_proof::{StorageBatch, StorageLockedBatch, StorageTeeProof}, Core, }; @@ -64,72 +64,93 @@ impl TeeProofGenerationDal<'_, '_> { ) -> DalResult> { let processing_timeout = pg_interval_from_duration(processing_timeout); let min_batch_number = i64::from(min_batch_number.0); + let mut transaction = self.storage.start_transaction().await?; + let batch_number = sqlx::query_as!( + StorageBatch, + r#" + SELECT + p.l1_batch_number + FROM + proof_generation_details p + LEFT JOIN + tee_proof_generation_details tee + ON + p.l1_batch_number = tee.l1_batch_number + AND tee.tee_type = $1 + WHERE + ( + p.l1_batch_number >= $5 + AND p.vm_run_data_blob_url IS NOT NULL + AND p.proof_gen_data_blob_url IS NOT NULL + ) + AND ( + tee.l1_batch_number IS NULL + OR ( + (tee.status = $2 OR tee.status = $3) + AND tee.prover_taken_at < NOW() - $4::INTERVAL + ) + ) + FETCH FIRST ROW ONLY + "#, + tee_type.to_string(), + TeeProofGenerationJobStatus::PickedByProver.to_string(), + TeeProofGenerationJobStatus::Failed.to_string(), + processing_timeout, + min_batch_number + ) + .instrument("lock_batch_for_proving#get_batch_no") + .with_arg("tee_type", &tee_type) + .with_arg("processing_timeout", &processing_timeout) + .with_arg("min_batch_number", &min_batch_number) + .fetch_optional(&mut transaction) + .await?; + + let batch_number = match batch_number { + Some(batch) => batch.l1_batch_number, + None => { + transaction.commit().await?; + return Ok(None); + } + }; + let locked_batch = sqlx::query_as!( StorageLockedBatch, r#" - WITH upsert AS ( - SELECT - p.l1_batch_number - FROM - proof_generation_details p - LEFT JOIN - tee_proof_generation_details tee - ON - p.l1_batch_number = tee.l1_batch_number - AND tee.tee_type = $1 - WHERE - ( - p.l1_batch_number >= $5 - AND p.vm_run_data_blob_url IS NOT NULL - AND p.proof_gen_data_blob_url IS NOT NULL - ) - AND ( - tee.l1_batch_number IS NULL - OR ( - (tee.status = $2 OR tee.status = $3) - AND tee.prover_taken_at < NOW() - $4::INTERVAL - ) - ) - FETCH FIRST ROW ONLY - ) - INSERT INTO tee_proof_generation_details ( l1_batch_number, tee_type, status, created_at, updated_at, prover_taken_at ) - SELECT - l1_batch_number, + VALUES + ( $1, $2, + $3, NOW(), NOW(), NOW() - FROM - upsert + ) ON CONFLICT (l1_batch_number, tee_type) DO UPDATE SET - status = $2, + status = $3, updated_at = NOW(), prover_taken_at = NOW() RETURNING l1_batch_number, created_at "#, + batch_number, tee_type.to_string(), TeeProofGenerationJobStatus::PickedByProver.to_string(), - TeeProofGenerationJobStatus::Failed.to_string(), - processing_timeout, - min_batch_number ) - .instrument("lock_batch_for_proving") + .instrument("lock_batch_for_proving#insert") + .with_arg("batch_number", &batch_number) .with_arg("tee_type", &tee_type) - .with_arg("processing_timeout", &processing_timeout) - .with_arg("l1_batch_number", &min_batch_number) - .fetch_optional(self.storage) + .fetch_optional(&mut transaction) .await? .map(Into::into); + transaction.commit().await?; Ok(locked_batch) }