diff --git a/packages/sql/src/SqlEventLogServer.ts b/packages/sql/src/SqlEventLogServer.ts index 1f90c823200..ff58c776083 100644 --- a/packages/sql/src/SqlEventLogServer.ts +++ b/packages/sql/src/SqlEventLogServer.ts @@ -38,22 +38,22 @@ export const makeStorage = (options?: { pg: () => sql`CREATE TABLE IF NOT EXISTS ${sql(remoteIdTable)} ( remote_id BYTEA PRIMARY KEY - )`.withoutTransform, + )`.unprepared, mysql: () => sql` CREATE TABLE IF NOT EXISTS ${sql(remoteIdTable)} ( remote_id BINARY(16) PRIMARY KEY - )`.withoutTransform, + )`.unprepared, mssql: () => sql` CREATE TABLE IF NOT EXISTS ${sql(remoteIdTable)} ( remote_id VARBINARY(16) PRIMARY KEY - )`.withoutTransform, + )`.unprepared, orElse: () => sql` CREATE TABLE IF NOT EXISTS ${sql(remoteIdTable)} ( remote_id BLOB PRIMARY KEY - )`.withoutTransform + )`.unprepared }) const remoteId = yield* sql<{ remote_id: Uint8Array }>`SELECT remote_id FROM ${sql(remoteIdTable)}`.pipe( Effect.flatMap((results) => { @@ -108,6 +108,7 @@ export const makeStorage = (options?: { encrypted_entry BLOB NOT NULL )`.withoutTransform }) + const pubsub = yield* Effect.acquireRelease( PubSub.unbounded(), PubSub.shutdown @@ -138,15 +139,14 @@ export const makeStorage = (options?: { encrypted_entry: entry.encryptedEntry }) } - yield* sql`INSERT INTO ${sql(table)} ${sql.insert(forInsert)} ON CONFLICT DO NOTHING`.withoutTransform - const encryptedEntries = yield* sql`SELECT * FROM ${sql(table)} WHERE ${ - sql.in("entry_id", ids) - } ORDER BY sequence ASC`.pipe( - Effect.flatMap(decodeEntries) - ) + const encryptedEntries = yield* sql`INSERT INTO ${sql(table)} ${sql.insert(forInsert)} ON CONFLICT DO NOTHING` + .withoutTransform.pipe( + Effect.zipRight(sql`SELECT * FROM ${sql(table)} WHERE ${sql.in("entry_id", ids)} ORDER BY sequence ASC`), + Effect.flatMap(decodeEntries), + sql.withTransaction + ) yield* pubsub.offerAll(encryptedEntries) }).pipe( - sql.withTransaction, Effect.retry({ times: 3 }), Effect.orDie, Effect.scoped