Skip to content

Commit

Permalink
fix sqlite deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Nov 21, 2024
1 parent a99a750 commit f1a32d1
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions packages/sql/src/SqlEventLogServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,22 @@ export const makeStorage = (options?: {
pg: () =>
sql`CREATE TABLE IF NOT EXISTS ${sql(remoteIdTable)} (
remote_id BYTEA PRIMARY KEY
)`.withoutTransform,
)`.unprepared,
mysql: () =>
sql`
CREATE TABLE IF NOT EXISTS ${sql(remoteIdTable)} (
remote_id BINARY(16) PRIMARY KEY
)`.withoutTransform,
)`.unprepared,
mssql: () =>
sql`
CREATE TABLE IF NOT EXISTS ${sql(remoteIdTable)} (
remote_id VARBINARY(16) PRIMARY KEY
)`.withoutTransform,
)`.unprepared,
orElse: () =>
sql`
CREATE TABLE IF NOT EXISTS ${sql(remoteIdTable)} (
remote_id BLOB PRIMARY KEY
)`.withoutTransform
)`.unprepared
})
const remoteId = yield* sql<{ remote_id: Uint8Array }>`SELECT remote_id FROM ${sql(remoteIdTable)}`.pipe(
Effect.flatMap((results) => {
Expand Down Expand Up @@ -108,6 +108,7 @@ export const makeStorage = (options?: {
encrypted_entry BLOB NOT NULL
)`.withoutTransform
})

const pubsub = yield* Effect.acquireRelease(
PubSub.unbounded<EncryptedRemoteEntry>(),
PubSub.shutdown
Expand Down Expand Up @@ -138,15 +139,14 @@ export const makeStorage = (options?: {
encrypted_entry: entry.encryptedEntry
})
}
yield* sql`INSERT INTO ${sql(table)} ${sql.insert(forInsert)} ON CONFLICT DO NOTHING`.withoutTransform
const encryptedEntries = yield* sql`SELECT * FROM ${sql(table)} WHERE ${
sql.in("entry_id", ids)
} ORDER BY sequence ASC`.pipe(
Effect.flatMap(decodeEntries)
)
const encryptedEntries = yield* sql`INSERT INTO ${sql(table)} ${sql.insert(forInsert)} ON CONFLICT DO NOTHING`
.withoutTransform.pipe(
Effect.zipRight(sql`SELECT * FROM ${sql(table)} WHERE ${sql.in("entry_id", ids)} ORDER BY sequence ASC`),
Effect.flatMap(decodeEntries),
sql.withTransaction
)
yield* pubsub.offerAll(encryptedEntries)
}).pipe(
sql.withTransaction,
Effect.retry({ times: 3 }),
Effect.orDie,
Effect.scoped
Expand Down

0 comments on commit f1a32d1

Please sign in to comment.