From e8972f33c4859f18a85a20a624d0070a05e07734 Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Tue, 29 Oct 2024 12:51:29 +0000 Subject: [PATCH 1/8] Failing test Signed-off-by: John Gomersall --- src/domain/services/import.service.spec.ts | 32 ++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index d61b6f1..c3db30f 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -331,6 +331,38 @@ describe('importFromMongo', () => { expect(ingredientsNew[0].ingredientText).toBe('test'); }); }); + + it('import from redis should always update product', async () => { + await createTestingModule([DomainModule], async (app) => { + // GIVEN: Product with data that matches MongoDB + const em = app.get(EntityManager); + const lastUpdated = new Date(2023, 1, 1); + const { products, productIdExisting } = testProducts(); + em.create(Product, { + code: productIdExisting, + source: ProductSource.INCREMENTAL_LOAD, + lastUpdated: lastUpdated, + lastModified: new Date(lastModified * 1000), + }); + await em.flush(); + const importService = app.get(ImportService); + + // WHEN: Doing an event import + mockMongoDB(products); + await importService.importWithFilter( + { code: { $in: [productIdExisting] } }, + ProductSource.EVENT, + ); + + // THEN: Source is updated + const productExisting = await em.findOne(Product, { + code: productIdExisting, + }); + expect(productExisting).toBeTruthy(); + expect(productExisting.source).toBe(ProductSource.EVENT); + expect(productExisting.lastUpdated).not.toStrictEqual(lastUpdated); + }); + }); }); describe('scheduledImportFromMongo', () => { From 7cba39ab427897e9bd76bdc81eb30dfbfa02ee2f Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Tue, 29 Oct 2024 12:56:20 +0000 Subject: [PATCH 2/8] Always import updates that come from an event Signed-off-by: John Gomersall --- src/domain/services/import.service.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts index c78fb55..0be7b22 100644 --- a/src/domain/services/import.service.ts +++ b/src/domain/services/import.service.ts @@ -143,9 +143,9 @@ export class ImportService { ); lastModified = null; } - // Skip product if nothing has changed and not doing a full load + // Skip product if nothing has changed and doing an incremental load if ( - source !== ProductSource.FULL_LOAD && + source === ProductSource.INCREMENTAL_LOAD && lastModified?.getTime() === previousLastModified?.getTime() ) continue; From ba4bf6462e60e8a2fecddc8a0a85690c8ae887f0 Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Tue, 29 Oct 2024 17:19:28 +0000 Subject: [PATCH 3/8] Switch to using last_updated_t from product opener Signed-off-by: John Gomersall --- src/domain/entities/product.ts | 8 ++-- src/domain/services/import.service.spec.ts | 44 +++++++++++----------- src/domain/services/import.service.ts | 38 ++++++++++--------- src/domain/services/settings.service.ts | 6 +-- src/migrations/.snapshot-query.json | 8 ++-- src/migrations/Migration20241029165410.ts | 16 ++++++++ 6 files changed, 71 insertions(+), 49 deletions(-) create mode 100644 src/migrations/Migration20241029165410.ts diff --git a/src/domain/entities/product.ts b/src/domain/entities/product.ts index c68dc53..56b56ed 100644 --- a/src/domain/entities/product.ts +++ b/src/domain/entities/product.ts @@ -21,7 +21,7 @@ export class Product { code?: string; @Property({ columnType: 'timestamptz' }) - lastModified?: Date; + lastUpdated?: Date; @Property({ index: true }) creator?: string; @@ -42,8 +42,9 @@ export class Product { @Property({ type: 'uuid', index: true }) lastUpdateId?: string; + // This is the last time off-query received the data @Property({ columnType: 'timestamptz' }) - lastUpdated?: Date; + lastProcessed?: Date; @Property() source?: ProductSource; @@ -57,7 +58,8 @@ export const MAPPED_FIELDS = [ 'product_name', 'creator', 'owners_tags', - 'last_modified_t', + 'last_modified_t', // Note we actually use last_updated_t for checks but not all products may have this + 'last_updated_t', 'ingredients_n', 'ingredients_without_ciqual_codes_n', 'ingredients', diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index c3db30f..f2a4af1 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -11,7 +11,7 @@ import { ProductSource } from '../enums/product-source'; import { SettingsService } from './settings.service'; import { ProductIngredient } from '../entities/product-ingredient'; -const lastModified = 1692032161; +const lastUpdated = 1692032161; function testProducts() { const productIdNew = randomCode(); @@ -20,14 +20,14 @@ function testProducts() { { // This one will be new code: productIdNew, - last_modified_t: lastModified, + last_updated_t: lastUpdated, ingredients_tags: ['test'], rev: 1, }, { // This one will already exist code: productIdExisting, - last_modified_t: lastModified, + last_updated_t: lastUpdated, ingredients_tags: ['new_ingredient', 'old_ingredient'], }, ]; @@ -125,7 +125,7 @@ describe('importFromMongo', () => { expect(productNew).toBeTruthy(); expect(productNew.lastUpdateId).toBe(updateId); expect(productNew.source).toBe(ProductSource.FULL_LOAD); - expect(productNew.lastUpdated.getTime()).toBeGreaterThanOrEqual(start); + expect(productNew.lastProcessed.getTime()).toBeGreaterThanOrEqual(start); const ingredientsNew = await em.find(ProductIngredientsTag, { product: productNew, }); @@ -186,13 +186,13 @@ describe('importFromMongo', () => { await createTestingModule([DomainModule], async (app) => { // GIVEN: Product with data that matches MongoDB const em = app.get(EntityManager); - const lastUpdated = new Date(2023, 1, 1); + const lastProcessed = new Date(2023, 1, 1); const { products, productIdExisting } = testProducts(); em.create(Product, { code: productIdExisting, source: ProductSource.EVENT, - lastUpdated: lastUpdated, - lastModified: new Date(lastModified * 1000), + lastProcessed: lastProcessed, + lastUpdated: new Date(lastUpdated * 1000), }); await em.flush(); const importService = app.get(ImportService); @@ -207,13 +207,13 @@ describe('importFromMongo', () => { }); expect(productExisting).toBeTruthy(); expect(productExisting.source).toBe(ProductSource.EVENT); - expect(productExisting.lastUpdated).toStrictEqual(lastUpdated); + expect(productExisting.lastProcessed).toStrictEqual(lastProcessed); }); }); it('should start importing from the last import', async () => { await createTestingModule([DomainModule], async (app) => { - // GIVEN: lastModified setting already set + // GIVEN: lastUpdated setting already set const settings = app.get(SettingsService); const startFrom = new Date(2023, 1, 1); await settings.setLastModified(startFrom); @@ -227,12 +227,12 @@ describe('importFromMongo', () => { // THEN: Mongo find is called with the setting as a parameter expect(findCalls).toHaveLength(2); // Called for normal an obsolete prodocuts - expect(findCalls[0][0].last_modified_t.$gt).toBe( + expect(findCalls[0][0].last_updated_t.$gt).toBe( Math.floor(startFrom.getTime() / 1000), ); expect(await settings.getLastModified()).toStrictEqual( - new Date(lastModified * 1000), + new Date(lastUpdated * 1000), ); }); }); @@ -245,7 +245,7 @@ describe('importFromMongo', () => { { // This one will be new code: productIdNew, - last_modified_t: 1692032161, + last_updated_t: 1692032161, ingredients_tags: ['test \u0000 test2 \u0000 end'], }, ]); @@ -263,7 +263,7 @@ describe('importFromMongo', () => { }); }); - it('should set last_modified correctly if one product has an invalid date', async () => { + it('should set last_updated correctly if one product has an invalid date', async () => { await createTestingModule([DomainModule], async (app) => { // GIVEN: products with invalid date const settings = app.get(SettingsService); @@ -272,7 +272,7 @@ describe('importFromMongo', () => { const { products } = testProducts(); const testData = [ products[0], - { ...products[1], last_modified_t: 'invalid' }, + { ...products[1], last_updated_t: 'invalid' }, ]; const importService = app.get(ImportService); @@ -282,7 +282,7 @@ describe('importFromMongo', () => { // THEN: The last modified date is set correctly expect(await settings.getLastModified()).toStrictEqual( - new Date(lastModified * 1000), + new Date(lastUpdated * 1000), ); }); }); @@ -336,13 +336,13 @@ describe('importFromMongo', () => { await createTestingModule([DomainModule], async (app) => { // GIVEN: Product with data that matches MongoDB const em = app.get(EntityManager); - const lastUpdated = new Date(2023, 1, 1); + const lastProcessed = new Date(2023, 1, 1); const { products, productIdExisting } = testProducts(); em.create(Product, { code: productIdExisting, source: ProductSource.INCREMENTAL_LOAD, - lastUpdated: lastUpdated, - lastModified: new Date(lastModified * 1000), + lastProcessed: lastProcessed, + lastUpdated: new Date(lastUpdated * 1000), }); await em.flush(); const importService = app.get(ImportService); @@ -360,7 +360,7 @@ describe('importFromMongo', () => { }); expect(productExisting).toBeTruthy(); expect(productExisting.source).toBe(ProductSource.EVENT); - expect(productExisting.lastUpdated).not.toStrictEqual(lastUpdated); + expect(productExisting.lastProcessed).not.toStrictEqual(lastProcessed); }); }); }); @@ -439,8 +439,8 @@ describe('importWithFilter', () => { const productToDelete = em.create(Product, { code: productIdToDelete, source: ProductSource.FULL_LOAD, - lastUpdated: new Date(2023, 1, 1), - lastModified: new Date(lastModified * 1000), + lastProcessed: new Date(2023, 1, 1), + lastUpdated: new Date(lastUpdated * 1000), }); em.create(ProductIngredientsTag, { product: productToDelete, @@ -465,7 +465,7 @@ describe('importWithFilter', () => { code: productIdExisting, }); expect(deletedProduct.lastUpdateId).toBe(updatedProduct.lastUpdateId); - expect(deletedProduct.lastUpdated.getTime()).toBeGreaterThanOrEqual( + expect(deletedProduct.lastProcessed.getTime()).toBeGreaterThanOrEqual( beforeImport, ); expect(deletedProduct.source).toBe(ProductSource.EVENT); diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts index 0be7b22..b36df0a 100644 --- a/src/domain/services/import.service.ts +++ b/src/domain/services/import.service.ts @@ -49,7 +49,7 @@ export class ImportService { const filter = {}; if (from) { const fromTime = Math.floor(new Date(from).getTime() / 1000); - filter['last_modified_t'] = { $gt: fromTime }; + filter['last_updated_t'] = { $gt: fromTime }; this.logger.debug(`Starting import from ${from}`); } @@ -63,7 +63,7 @@ export class ImportService { } async importWithFilter(filter: any, source: ProductSource, skip?: number) { - let latestModified = 0; + let latestUpdated = 0; // The update id is unique to this run and is used later to run other // queries that should only affect products loaded in this import @@ -104,7 +104,7 @@ export class ImportService { // Now using postgres to help with transactions const connection = await sql.reserve(); - await connection`CREATE TEMP TABLE product_temp (id int PRIMARY KEY, last_modified timestamptz, data jsonb)`; + await connection`CREATE TEMP TABLE product_temp (id int PRIMARY KEY, last_updated timestamptz, data jsonb)`; // let sql: string; // const vars = []; for (const collection of Object.values(collections)) { @@ -128,25 +128,29 @@ export class ImportService { // Find the product if it exists let results = - await connection`select id, last_modified from product where code = ${data.code}`; + await connection`select id, last_updated from product where code = ${data.code}`; if (!results.length) { results = await connection`insert into product (code) values (${data.code}) returning id`; } const id = results[0].id; - const previousLastModified = results[0].last_modified; + const previousLastUpdated = results[0].last_updated; - let lastModified = new Date(data.last_modified_t * 1000); - if (isNaN(+lastModified)) { + let lastUpdated = new Date(data.last_updated_t * 1000); + if (isNaN(+lastUpdated)) { + // Fall back to last_modified_t if last_updated_t is not available + lastUpdated = new Date(data.last_modified_t * 1000); + } + if (isNaN(+lastUpdated)) { this.logger.warn( - `Product: ${data.code}. Invalid last_modified_t: ${data.last_modified_t}.`, + `Product: ${data.code}. Invalid last_updated_t: ${data.last_updated_t}, or last_modified_t: ${data.last_modified_t}.`, ); - lastModified = null; + lastUpdated = null; } // Skip product if nothing has changed and doing an incremental load if ( source === ProductSource.INCREMENTAL_LOAD && - lastModified?.getTime() === previousLastModified?.getTime() + lastUpdated?.getTime() === previousLastUpdated?.getTime() ) continue; @@ -170,11 +174,11 @@ export class ImportService { } results = - await connection`insert into product_temp (id, last_modified, data) values (${id}, ${lastModified}, ${ + await connection`insert into product_temp (id, last_updated, data) values (${id}, ${lastUpdated}, ${ data as unknown as SerializableParameter }) ON CONFLICT DO NOTHING`; - latestModified = Math.max(latestModified, lastModified?.getTime() ?? 0); + latestUpdated = Math.max(latestUpdated, lastUpdated?.getTime() ?? 0); if (!(i % this.importBatchSize)) { await this.applyProductChange(connection, obsolete, source, updateId); @@ -197,7 +201,7 @@ export class ImportService { const deletedProducts = await connection`UPDATE product SET obsolete = NULL, last_update_id = ${updateId}, - last_updated = ${new Date()}, + last_processed = ${new Date()}, source = ${source} WHERE code IN ${sql(missingProducts)} RETURNING id`; @@ -225,7 +229,7 @@ export class ImportService { `Imported ${collections.normal.count} Products and ${collections.obsolete.count} Obsolete Products from ${source}${deleteLog}`, ); - return latestModified; + return latestUpdated; } async applyProductChange( @@ -246,9 +250,9 @@ export class ImportService { obsolete = ${obsolete}, ingredients_count = (tp.data->>'ingredients_n')::numeric, ingredients_without_ciqual_codes_count = (tp.data->>'ingredients_without_ciqual_codes_n')::numeric, - last_modified = tp.last_modified, + last_updated = tp.last_updated, last_update_id = ${updateId}, - last_updated = ${new Date()}, + last_processed = ${new Date()}, source = ${source}, revision = (tp.data->>'rev')::int FROM product_temp tp @@ -361,7 +365,7 @@ export class ImportService { const deletedProducts = await connection`UPDATE product SET obsolete = NULL, last_update_id = ${updateId}, - last_updated = ${new Date()}, + last_processed = ${new Date()}, source = ${ProductSource.FULL_LOAD} WHERE last_update_id != ${updateId} OR last_update_id IS NULL RETURNING id`; diff --git a/src/domain/services/settings.service.ts b/src/domain/services/settings.service.ts index 6a24e83..0a1b1fa 100644 --- a/src/domain/services/settings.service.ts +++ b/src/domain/services/settings.service.ts @@ -11,11 +11,11 @@ export class SettingsService { } async getLastModified() { - return (await sql`SELECT last_modified FROM settings`)[0].last_modified; + return (await sql`SELECT last_updated FROM settings`)[0].last_updated; } - async setLastModified(lastModified: Date) { - await this.updateSetting({ last_modified: lastModified }); + async setLastModified(lastUpdated: Date) { + await this.updateSetting({ last_updated: lastUpdated }); } async getLastMessageId() { diff --git a/src/migrations/.snapshot-query.json b/src/migrations/.snapshot-query.json index 67a63df..cfcdde5 100644 --- a/src/migrations/.snapshot-query.json +++ b/src/migrations/.snapshot-query.json @@ -61,8 +61,8 @@ "nullable": true, "mappedType": "text" }, - "last_modified": { - "name": "last_modified", + "last_updated": { + "name": "last_updated", "type": "timestamptz", "unsigned": false, "autoincrement": false, @@ -126,8 +126,8 @@ "nullable": true, "mappedType": "uuid" }, - "last_updated": { - "name": "last_updated", + "last_processed": { + "name": "last_processed", "type": "timestamptz", "unsigned": false, "autoincrement": false, diff --git a/src/migrations/Migration20241029165410.ts b/src/migrations/Migration20241029165410.ts new file mode 100644 index 0000000..5f71c1c --- /dev/null +++ b/src/migrations/Migration20241029165410.ts @@ -0,0 +1,16 @@ +import { Migration } from '@mikro-orm/migrations'; + +export class Migration20241029165410 extends Migration { + + async up(): Promise { + this.addSql('alter table "query"."product" rename column "last_updated" to "last_processed";'); + this.addSql('alter table "query"."product" rename column "last_modified" to "last_updated";'); + this.addSql('alter table "query"."settings" rename column "last_modified" to "last_updated";'); + } + + async down(): Promise { + this.addSql('alter table "query"."settings" rename column "last_updated" to "last_modified";'); + this.addSql('alter table "query"."product" rename column "last_updated" to "last_modified";'); + this.addSql('alter table "query"."product" rename column "last_processed" to "last_updated";'); + } +} From 2c87a19029e55fa28000a69f6d82c02dd8c45cba Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Thu, 31 Oct 2024 13:25:58 +0000 Subject: [PATCH 4/8] Switch to using transaction id for update_id so we can avoid deleting data from later transactions on a full import Signed-off-by: John Gomersall --- src/domain/entities/product.ts | 4 +- src/domain/services/import.service.spec.ts | 41 ++++++++++------ src/domain/services/import.service.ts | 57 ++++++++++++++-------- src/migrations/.snapshot-query.json | 12 ++--- src/migrations/Migration20241029165410.ts | 8 +++ tsconfig.json | 2 +- 6 files changed, 78 insertions(+), 46 deletions(-) diff --git a/src/domain/entities/product.ts b/src/domain/entities/product.ts index 56b56ed..4058b5e 100644 --- a/src/domain/entities/product.ts +++ b/src/domain/entities/product.ts @@ -39,8 +39,8 @@ export class Product { @Property() obsolete? = false; - @Property({ type: 'uuid', index: true }) - lastUpdateId?: string; + @Property({ columnType: 'xid8', index: true }) + processId?: bigint; // This is the last time off-query received the data @Property({ columnType: 'timestamptz' }) diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index f2a4af1..9f4402d 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -10,6 +10,7 @@ import { ProductTagMap } from '../entities/product-tag-map'; import { ProductSource } from '../enums/product-source'; import { SettingsService } from './settings.service'; import { ProductIngredient } from '../entities/product-ingredient'; +import sql from '../../db'; const lastUpdated = 1692032161; @@ -83,7 +84,11 @@ describe('importFromMongo', () => { // app.useLogger(new Logger()); const importService = app.get(ImportService); - const deleteMock = (importService.deleteOtherProducts = jest.fn()); + // Mock the process id so it doesn't delete records from other tests + let currentProcessId = 99999999999999999n; + importService.getProcessId = jest + .fn() + .mockImplementation(() => ++currentProcessId); // GIVEN: Two existing products, one of which is in Mongo plus one new one in Mongo const em = app.get(EntityManager); @@ -111,19 +116,9 @@ describe('importFromMongo', () => { await importService.importFromMongo(); // THEN: New product is added, updated product is updated and other product is unchanged - expect(deleteMock).toHaveBeenCalledTimes(1); - let updateId = deleteMock.mock.calls[0][1]; - // Re-format updateId the way Postgres provides it - updateId = `${updateId.substring(0, 8)}-${updateId.substring( - 8, - 12, - )}-${updateId.substring(12, 16)}-${updateId.substring( - 16, - 20, - )}-${updateId.substring(20)}`.toLowerCase(); const productNew = await em.findOne(Product, { code: productIdNew }); expect(productNew).toBeTruthy(); - expect(productNew.lastUpdateId).toBe(updateId); + expect(productNew.processId).toBe(currentProcessId.toString()); expect(productNew.source).toBe(ProductSource.FULL_LOAD); expect(productNew.lastProcessed.getTime()).toBeGreaterThanOrEqual(start); const ingredientsNew = await em.find(ProductIngredientsTag, { @@ -144,12 +139,15 @@ describe('importFromMongo', () => { ingredientsExisting.find((i) => i.value === 'new_ingredient'), ).toBeTruthy(); - // We have mocked the delete of other products so just check the other product - // does not have the same update id as those imported + // Check unchanged product has been "deleted" const foundOldProduct = await em.findOne(Product, { code: productIdUnchanged, }); - expect(foundOldProduct.lastUpdateId).not.toBe(updateId); + expect(foundOldProduct.obsolete).toBeNull(); + const ingredientsUnchanged = await em.find(ProductIngredientsTag, { + product: foundOldProduct, + }); + expect(ingredientsUnchanged[0].obsolete).toBeNull(); const loadedTags = await app.get(TagService).getLoadedTags(); expect(loadedTags).toHaveLength( @@ -464,7 +462,7 @@ describe('importWithFilter', () => { const updatedProduct = await em.findOne(Product, { code: productIdExisting, }); - expect(deletedProduct.lastUpdateId).toBe(updatedProduct.lastUpdateId); + expect(deletedProduct.processId).toBe(updatedProduct.processId); expect(deletedProduct.lastProcessed.getTime()).toBeGreaterThanOrEqual( beforeImport, ); @@ -478,3 +476,14 @@ describe('importWithFilter', () => { }); }); }); + +describe('getProcessId', () => { + it('should return monotonically increasing numbers', async () => { + await createTestingModule([DomainModule], async (app) => { + const importService = app.get(ImportService); + const transactionId = await importService.getProcessId(); + + expect(await importService.getProcessId()).toBeGreaterThan(transactionId); + }); + }); +}); diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts index b36df0a..9b10495 100644 --- a/src/domain/services/import.service.ts +++ b/src/domain/services/import.service.ts @@ -9,7 +9,7 @@ import { ProductSource } from '../enums/product-source'; import equal from 'fast-deep-equal'; import { SettingsService } from './settings.service'; import sql from '../../db'; -import { ReservedSql } from 'postgres'; +import { Fragment, Helper, Parameter, ReservedSql } from 'postgres'; import { SerializableParameter } from 'postgres'; @Injectable() @@ -62,12 +62,18 @@ export class ImportService { } } + async getProcessId() { + return BigInt( + (await sql`SELECT pg_current_xact_id() transaction_id`)[0].transaction_id, + ); + } + async importWithFilter(filter: any, source: ProductSource, skip?: number) { let latestUpdated = 0; // The update id is unique to this run and is used later to run other // queries that should only affect products loaded in this import - const updateId = Ulid.generate().toRaw(); + const processId = await this.getProcessId(); this.logger.debug('Connecting to MongoDB'); const client = new MongoClient(process.env.MONGO_URI); @@ -181,11 +187,16 @@ export class ImportService { latestUpdated = Math.max(latestUpdated, lastUpdated?.getTime() ?? 0); if (!(i % this.importBatchSize)) { - await this.applyProductChange(connection, obsolete, source, updateId); + await this.applyProductChange( + connection, + obsolete, + source, + processId, + ); await connection`begin`; } } - await this.applyProductChange(connection, obsolete, source, updateId); + await this.applyProductChange(connection, obsolete, source, processId); await cursor.close(); collection.count = i; } @@ -200,7 +211,7 @@ export class ImportService { if (missingProducts.length) { const deletedProducts = await connection`UPDATE product SET obsolete = NULL, - last_update_id = ${updateId}, + process_id = ${processId.toString()}, last_processed = ${new Date()}, source = ${source} WHERE code IN ${sql(missingProducts)} @@ -208,7 +219,7 @@ export class ImportService { await this.deleteProductTags( connection, - deletedProducts.map((p) => p.id), + sql(deletedProducts.map((p) => p.id)), ); deleteLog = `. Deleted ${deletedProducts.count}`; } @@ -219,7 +230,7 @@ export class ImportService { await this.tagService.addLoadedTags( Object.keys(ProductTagMap.MAPPED_TAGS), ); - await this.deleteOtherProducts(connection, updateId); + await this.deleteOtherProducts(connection, processId); } await connection`DROP TABLE product_temp`; @@ -236,7 +247,7 @@ export class ImportService { connection: ReservedSql, obsolete: boolean, source: string, - updateId: string, + processId: bigint, ) { // Analyze table for best query performance await connection`ANALYZE product_temp`; @@ -251,7 +262,7 @@ export class ImportService { ingredients_count = (tp.data->>'ingredients_n')::numeric, ingredients_without_ciqual_codes_count = (tp.data->>'ingredients_without_ciqual_codes_n')::numeric, last_updated = tp.last_updated, - last_update_id = ${updateId}, + process_id = ${processId.toString()}, last_processed = ${new Date()}, source = ${source}, revision = (tp.data->>'rev')::int @@ -361,33 +372,37 @@ export class ImportService { await connection`commit`; } - async deleteOtherProducts(connection: ReservedSql, updateId: string) { + async deleteOtherProducts(connection: ReservedSql, processId: bigint) { + // Note use < here to avoid deleting products from events that were received since a full + // import was started. + const filter = connection`process_id < ${processId.toString()} OR process_id IS NULL`; + + // Need to use a fragment here rather than a list of ids as could be a long list + await this.deleteProductTags( + connection, + sql`(SELECT id FROM product WHERE ${filter})`, + ); + const deletedProducts = await connection`UPDATE product SET obsolete = NULL, - last_update_id = ${updateId}, + process_id = ${processId.toString()}, last_processed = ${new Date()}, source = ${ProductSource.FULL_LOAD} - WHERE last_update_id != ${updateId} OR last_update_id IS NULL - RETURNING id`; + WHERE ${filter}`; this.logger.debug(`${deletedProducts.count} Products deleted`); - - await this.deleteProductTags( - connection, - deletedProducts.map((p) => p.id), - ); } private async deleteProductTags( connection: ReservedSql, - deletedProductIds: any[], + deletedProductIds: Helper | Fragment, ) { for (const entity of Object.values(ProductTagMap.MAPPED_TAGS)) { const tableName = this.em.getMetadata(entity).tableName; await connection`UPDATE ${sql(tableName)} SET obsolete = NULL - where product_id in ${sql(deletedProductIds)}`; + where product_id in ${deletedProductIds}`; } await connection`UPDATE product_ingredient SET obsolete = NULL - where product_id in ${sql(deletedProductIds)}`; + where product_id in ${deletedProductIds}`; } // Make sure to pause redis before calling this diff --git a/src/migrations/.snapshot-query.json b/src/migrations/.snapshot-query.json index cfcdde5..caf76c7 100644 --- a/src/migrations/.snapshot-query.json +++ b/src/migrations/.snapshot-query.json @@ -117,14 +117,14 @@ "default": "false", "mappedType": "boolean" }, - "last_update_id": { - "name": "last_update_id", - "type": "uuid", + "process_id": { + "name": "process_id", + "type": "xid8", "unsigned": false, "autoincrement": false, "primary": false, "nullable": true, - "mappedType": "uuid" + "mappedType": "unknown" }, "last_processed": { "name": "last_processed", @@ -187,10 +187,10 @@ }, { "columnNames": [ - "last_update_id" + "process_id" ], "composite": false, - "keyName": "product_last_update_id_index", + "keyName": "product_process_id_index", "primary": false, "unique": false }, diff --git a/src/migrations/Migration20241029165410.ts b/src/migrations/Migration20241029165410.ts index 5f71c1c..abb9433 100644 --- a/src/migrations/Migration20241029165410.ts +++ b/src/migrations/Migration20241029165410.ts @@ -6,9 +6,17 @@ export class Migration20241029165410 extends Migration { this.addSql('alter table "query"."product" rename column "last_updated" to "last_processed";'); this.addSql('alter table "query"."product" rename column "last_modified" to "last_updated";'); this.addSql('alter table "query"."settings" rename column "last_modified" to "last_updated";'); + this.addSql('alter table "query"."product" add column "process_id" xid8 null;'); + this.addSql('create index "product_process_id_index" on "query"."product" ("process_id");'); + this.addSql('drop index "query"."product_last_update_id_index";'); + this.addSql('alter table "query"."product" drop column "last_update_id";'); } async down(): Promise { + this.addSql('alter table "query"."product" add column "last_update_id" uuid null;'); + this.addSql('create index "product_last_update_id_index" on "query"."product" ("last_update_id");'); + this.addSql('drop index "query"."product_process_id_index";'); + this.addSql('alter table "query"."product" drop column "process_id";'); this.addSql('alter table "query"."settings" rename column "last_updated" to "last_modified";'); this.addSql('alter table "query"."product" rename column "last_updated" to "last_modified";'); this.addSql('alter table "query"."product" rename column "last_processed" to "last_updated";'); diff --git a/tsconfig.json b/tsconfig.json index fd76987..3e24617 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -7,7 +7,7 @@ "experimentalDecorators": true, "esModuleInterop": true, "allowSyntheticDefaultImports": true, - "target": "es2017", + "target": "es2020", "sourceMap": true, "outDir": "./dist", "baseUrl": "./", From 209b12cb4522d8de66ed9fcb3ad613c1531512c0 Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Thu, 31 Oct 2024 14:49:49 +0000 Subject: [PATCH 5/8] Mock with a lower process id for full import to avoid clashing with other tests Signed-off-by: John Gomersall --- src/domain/services/import.service.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index 9f4402d..49d5207 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -85,7 +85,7 @@ describe('importFromMongo', () => { const importService = app.get(ImportService); // Mock the process id so it doesn't delete records from other tests - let currentProcessId = 99999999999999999n; + let currentProcessId = 0n; importService.getProcessId = jest .fn() .mockImplementation(() => ++currentProcessId); From 6482580b8cab342e3abfddbee668ef3bf85c8e4f Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Thu, 31 Oct 2024 15:04:01 +0000 Subject: [PATCH 6/8] More fixes to deal with test concurrency Signed-off-by: John Gomersall --- src/domain/services/import.service.spec.ts | 25 ++++++++++++++++++++-- src/domain/services/query.service.spec.ts | 21 ++++++++++++------ 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index 49d5207..4750342 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -93,18 +93,31 @@ describe('importFromMongo', () => { // GIVEN: Two existing products, one of which is in Mongo plus one new one in Mongo const em = app.get(EntityManager); const { products, productIdExisting, productIdNew } = testProducts(); - const productExisting = em.create(Product, { code: productIdExisting }); + const productExisting = em.create(Product, { + code: productIdExisting, + processId: 0n, + }); em.create(ProductIngredientsTag, { product: productExisting, value: 'old_ingredient', }); const productIdUnchanged = randomCode(); - const productUnchanged = em.create(Product, { code: productIdUnchanged }); + const productUnchanged = em.create(Product, { + code: productIdUnchanged, + processId: 0n, + }); em.create(ProductIngredientsTag, { product: productUnchanged, value: 'unchanged_ingredient', }); + + const productIdLater = randomCode(); + em.create(Product, { + code: productIdLater, + processId: 100n, // Simulate a product that was added after the full load started + }); + // Delete a tag to prove it is re-created await em.nativeDelete(LoadedTag, { id: 'teams_tags' }); await em.flush(); @@ -149,6 +162,11 @@ describe('importFromMongo', () => { }); expect(ingredientsUnchanged[0].obsolete).toBeNull(); + const foundLaterProduct = await em.findOne(Product, { + code: productIdLater, + }); + expect(foundLaterProduct.obsolete).toBe(false); + const loadedTags = await app.get(TagService).getLoadedTags(); expect(loadedTags).toHaveLength( Object.keys(ProductTagMap.MAPPED_TAGS).length, @@ -339,6 +357,7 @@ describe('importFromMongo', () => { em.create(Product, { code: productIdExisting, source: ProductSource.INCREMENTAL_LOAD, + processId: 10n, lastProcessed: lastProcessed, lastUpdated: new Date(lastUpdated * 1000), }); @@ -358,6 +377,7 @@ describe('importFromMongo', () => { }); expect(productExisting).toBeTruthy(); expect(productExisting.source).toBe(ProductSource.EVENT); + expect(productExisting.processId).not.toBe(10n.toString()); expect(productExisting.lastProcessed).not.toStrictEqual(lastProcessed); }); }); @@ -437,6 +457,7 @@ describe('importWithFilter', () => { const productToDelete = em.create(Product, { code: productIdToDelete, source: ProductSource.FULL_LOAD, + processId: 10n, lastProcessed: new Date(2023, 1, 1), lastUpdated: new Date(lastUpdated * 1000), }); diff --git a/src/domain/services/query.service.spec.ts b/src/domain/services/query.service.spec.ts index d663c69..f6c8b18 100644 --- a/src/domain/services/query.service.spec.ts +++ b/src/domain/services/query.service.spec.ts @@ -24,11 +24,11 @@ describe('count', () => { // Create some dummy products with a specific tag const tagValue = randomCode(); em.create(ProductIngredientsTag, { - product: em.create(Product, { code: randomCode() }), + product: em.create(Product, { code: randomCode(), processId: 100n }), value: tagValue, }); em.create(ProductIngredientsTag, { - product: em.create(Product, { code: randomCode() }), + product: em.create(Product, { code: randomCode(), processId: 100n }), value: tagValue, }); await em.flush(); @@ -46,7 +46,10 @@ describe('count', () => { // Create some dummy products with a specific tag const tagValue = randomCode(); const notTagValue = randomCode(); - const productWithNotTag = em.create(Product, { code: randomCode() }); + const productWithNotTag = em.create(Product, { + code: randomCode(), + processId: 100n, + }); em.create(ProductBrandsTag, { product: productWithNotTag, value: tagValue, @@ -57,7 +60,7 @@ describe('count', () => { }); em.create(ProductBrandsTag, { - product: em.create(Product, { code: randomCode() }), + product: em.create(Product, { code: randomCode(), processId: 100n }), value: tagValue, }); await em.flush(); @@ -449,16 +452,22 @@ async function createTestTags(app) { const creatorValue = randomCode(); // Create some dummy products with a specific tag - const product1 = em.create(Product, { code: randomCode() }); + const product1 = em.create(Product, { code: randomCode(), processId: 100n }); const product2 = em.create(Product, { code: randomCode(), creator: creatorValue, + processId: 100n, }); const product3 = em.create(Product, { code: randomCode(), creator: creatorValue, + processId: 100n, + }); + const product4 = em.create(Product, { + code: randomCode(), + processId: 100n, + obsolete: true, }); - const product4 = em.create(Product, { code: randomCode(), obsolete: true }); // Matrix for testing // Product | Origin | AminoAcid | AminoAcid2 | Neucleotide | Obsolete | Creator From 9d9f80ada99e47068fcfe2c5be0ad115c25eb418 Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Thu, 31 Oct 2024 15:36:59 +0000 Subject: [PATCH 7/8] Ensure we use compatible PostgreSQL functionality Signed-off-by: John Gomersall --- src/domain/entities/product.ts | 3 ++- src/domain/services/import.service.ts | 3 ++- src/migrations/.snapshot-query.json | 2 +- src/migrations/Migration20241029165410.ts | 2 +- test/global-setup.ts | 3 ++- 5 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/domain/entities/product.ts b/src/domain/entities/product.ts index 4058b5e..b920a1d 100644 --- a/src/domain/entities/product.ts +++ b/src/domain/entities/product.ts @@ -39,7 +39,8 @@ export class Product { @Property() obsolete? = false; - @Property({ columnType: 'xid8', index: true }) + // Note need to switch to xid8 when we upgrade PostgreSQL + @Property({ columnType: 'bigint', index: true }) processId?: bigint; // This is the last time off-query received the data diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts index 9b10495..3e62acc 100644 --- a/src/domain/services/import.service.ts +++ b/src/domain/services/import.service.ts @@ -63,8 +63,9 @@ export class ImportService { } async getProcessId() { + // Note need to switch to pg_current_xact_id when we upgrade PostgreSQL return BigInt( - (await sql`SELECT pg_current_xact_id() transaction_id`)[0].transaction_id, + (await sql`SELECT txid_current() transaction_id`)[0].transaction_id, ); } diff --git a/src/migrations/.snapshot-query.json b/src/migrations/.snapshot-query.json index caf76c7..b604605 100644 --- a/src/migrations/.snapshot-query.json +++ b/src/migrations/.snapshot-query.json @@ -119,7 +119,7 @@ }, "process_id": { "name": "process_id", - "type": "xid8", + "type": "bigint", "unsigned": false, "autoincrement": false, "primary": false, diff --git a/src/migrations/Migration20241029165410.ts b/src/migrations/Migration20241029165410.ts index abb9433..c0d5ce2 100644 --- a/src/migrations/Migration20241029165410.ts +++ b/src/migrations/Migration20241029165410.ts @@ -6,7 +6,7 @@ export class Migration20241029165410 extends Migration { this.addSql('alter table "query"."product" rename column "last_updated" to "last_processed";'); this.addSql('alter table "query"."product" rename column "last_modified" to "last_updated";'); this.addSql('alter table "query"."settings" rename column "last_modified" to "last_updated";'); - this.addSql('alter table "query"."product" add column "process_id" xid8 null;'); + this.addSql('alter table "query"."product" add column "process_id" bigint null;'); this.addSql('create index "product_process_id_index" on "query"."product" ("process_id");'); this.addSql('drop index "query"."product_last_update_id_index";'); this.addSql('alter table "query"."product" drop column "last_update_id";'); diff --git a/test/global-setup.ts b/test/global-setup.ts index 309f7fa..01fb1ac 100644 --- a/test/global-setup.ts +++ b/test/global-setup.ts @@ -5,7 +5,8 @@ import child_process from 'child_process'; const exec = util.promisify(child_process.exec); export default async function () { - const container = await new PostgreSqlContainer().start(); + // Use same image as docker-compose.yml to ensure we don't use unsupported features + const container = await new PostgreSqlContainer('postgres:12-alpine').start(); process.env.POSTGRES_HOST = container.getHost(); process.env.POSTGRES_PORT = container.getPort().toString(); process.env.POSTGRES_DB = container.getDatabase(); From 8e5599352b586c91c8a21436c51bf2971aca766e Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Thu, 31 Oct 2024 15:44:39 +0000 Subject: [PATCH 8/8] Remove unused imports Signed-off-by: John Gomersall --- src/domain/services/import.service.spec.ts | 1 - src/domain/services/import.service.ts | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index 4750342..d491aa6 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -10,7 +10,6 @@ import { ProductTagMap } from '../entities/product-tag-map'; import { ProductSource } from '../enums/product-source'; import { SettingsService } from './settings.service'; import { ProductIngredient } from '../entities/product-ingredient'; -import sql from '../../db'; const lastUpdated = 1692032161; diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts index 3e62acc..b253d8b 100644 --- a/src/domain/services/import.service.ts +++ b/src/domain/services/import.service.ts @@ -1,6 +1,5 @@ import { Injectable, Logger } from '@nestjs/common'; import { MAPPED_FIELDS } from '../entities/product'; -import { Ulid } from 'id128'; import { MongoClient } from 'mongodb'; import { EntityManager } from '@mikro-orm/postgresql'; import { TagService } from './tag.service'; @@ -9,7 +8,7 @@ import { ProductSource } from '../enums/product-source'; import equal from 'fast-deep-equal'; import { SettingsService } from './settings.service'; import sql from '../../db'; -import { Fragment, Helper, Parameter, ReservedSql } from 'postgres'; +import { Fragment, Helper, ReservedSql } from 'postgres'; import { SerializableParameter } from 'postgres'; @Injectable()