Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tee): fix race condition in batch locking #3342

Merged
merged 10 commits into from
Dec 3, 2024

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

97 changes: 59 additions & 38 deletions core/lib/dal/src/tee_proof_generation_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,72 +64,93 @@ impl TeeProofGenerationDal<'_, '_> {
) -> DalResult<Option<LockedBatch>> {
let processing_timeout = pg_interval_from_duration(processing_timeout);
let min_batch_number = i64::from(min_batch_number.0);
let mut transaction = self.storage.start_transaction().await?;
let batch_number = sqlx::query!(
r#"
SELECT
p.l1_batch_number
FROM
proof_generation_details p
LEFT JOIN
tee_proof_generation_details tee
ON
p.l1_batch_number = tee.l1_batch_number
AND tee.tee_type = $1
WHERE
(
p.l1_batch_number >= $5
AND p.vm_run_data_blob_url IS NOT NULL
AND p.proof_gen_data_blob_url IS NOT NULL
)
AND (
tee.l1_batch_number IS NULL
OR (
(tee.status = $2 OR tee.status = $3)
AND tee.prover_taken_at < NOW() - $4::INTERVAL
)
)
LIMIT 1
FOR UPDATE OF p
SKIP LOCKED
pbeza marked this conversation as resolved.
Show resolved Hide resolved
"#,
tee_type.to_string(),
TeeProofGenerationJobStatus::PickedByProver.to_string(),
TeeProofGenerationJobStatus::Failed.to_string(),
processing_timeout,
min_batch_number
)
.instrument("lock_batch_for_proving#get_batch_no")
.with_arg("tee_type", &tee_type)
.with_arg("processing_timeout", &processing_timeout)
.with_arg("min_batch_number", &min_batch_number)
.fetch_optional(&mut transaction)
.await?;

let batch_number = match batch_number {
Some(batch) => batch.l1_batch_number,
None => {
return Ok(None);
}
};

let locked_batch = sqlx::query_as!(
StorageLockedBatch,
r#"
WITH upsert AS (
SELECT
p.l1_batch_number
FROM
proof_generation_details p
LEFT JOIN
tee_proof_generation_details tee
ON
p.l1_batch_number = tee.l1_batch_number
AND tee.tee_type = $1
WHERE
(
p.l1_batch_number >= $5
AND p.vm_run_data_blob_url IS NOT NULL
AND p.proof_gen_data_blob_url IS NOT NULL
)
AND (
tee.l1_batch_number IS NULL
OR (
(tee.status = $2 OR tee.status = $3)
AND tee.prover_taken_at < NOW() - $4::INTERVAL
)
)
FETCH FIRST ROW ONLY
)

INSERT INTO
tee_proof_generation_details (
l1_batch_number, tee_type, status, created_at, updated_at, prover_taken_at
)
SELECT
l1_batch_number,
VALUES
(
$1,
$2,
$3,
NOW(),
NOW(),
NOW()
FROM
upsert
)
ON CONFLICT (l1_batch_number, tee_type) DO
UPDATE
SET
status = $2,
status = $3,
updated_at = NOW(),
prover_taken_at = NOW()
RETURNING
l1_batch_number,
created_at
"#,
batch_number,
tee_type.to_string(),
TeeProofGenerationJobStatus::PickedByProver.to_string(),
TeeProofGenerationJobStatus::Failed.to_string(),
processing_timeout,
min_batch_number
)
.instrument("lock_batch_for_proving")
.instrument("lock_batch_for_proving#insert")
.with_arg("batch_number", &batch_number)
.with_arg("tee_type", &tee_type)
.with_arg("processing_timeout", &processing_timeout)
.with_arg("l1_batch_number", &min_batch_number)
.fetch_optional(self.storage)
.fetch_optional(&mut transaction)
.await?
.map(Into::into);

transaction.commit().await?;
Ok(locked_batch)
}

Expand Down
Loading