From 6368a811303106ad5b8d0a8090d372f54bd1f440 Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Thu, 31 Oct 2024 15:18:39 +0000 Subject: [PATCH 1/6] Revert "More fixes to deal with test concurrency" This reverts commit 1a34b6310cc8f1bca45fa34682439756e8605431. --- src/domain/services/import.service.spec.ts | 25 ++-------------------- src/domain/services/query.service.spec.ts | 21 ++++++------------ 2 files changed, 8 insertions(+), 38 deletions(-) diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index 4750342..49d5207 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -93,31 +93,18 @@ describe('importFromMongo', () => { // GIVEN: Two existing products, one of which is in Mongo plus one new one in Mongo const em = app.get(EntityManager); const { products, productIdExisting, productIdNew } = testProducts(); - const productExisting = em.create(Product, { - code: productIdExisting, - processId: 0n, - }); + const productExisting = em.create(Product, { code: productIdExisting }); em.create(ProductIngredientsTag, { product: productExisting, value: 'old_ingredient', }); const productIdUnchanged = randomCode(); - const productUnchanged = em.create(Product, { - code: productIdUnchanged, - processId: 0n, - }); + const productUnchanged = em.create(Product, { code: productIdUnchanged }); em.create(ProductIngredientsTag, { product: productUnchanged, value: 'unchanged_ingredient', }); - - const productIdLater = randomCode(); - em.create(Product, { - code: productIdLater, - processId: 100n, // Simulate a product that was added after the full load started - }); - // Delete a tag to prove it is re-created await em.nativeDelete(LoadedTag, { id: 'teams_tags' }); await em.flush(); @@ -162,11 +149,6 @@ describe('importFromMongo', () => { }); expect(ingredientsUnchanged[0].obsolete).toBeNull(); - const foundLaterProduct = await em.findOne(Product, { - code: productIdLater, - }); - expect(foundLaterProduct.obsolete).toBe(false); - const loadedTags = await app.get(TagService).getLoadedTags(); expect(loadedTags).toHaveLength( Object.keys(ProductTagMap.MAPPED_TAGS).length, @@ -357,7 +339,6 @@ describe('importFromMongo', () => { em.create(Product, { code: productIdExisting, source: ProductSource.INCREMENTAL_LOAD, - processId: 10n, lastProcessed: lastProcessed, lastUpdated: new Date(lastUpdated * 1000), }); @@ -377,7 +358,6 @@ describe('importFromMongo', () => { }); expect(productExisting).toBeTruthy(); expect(productExisting.source).toBe(ProductSource.EVENT); - expect(productExisting.processId).not.toBe(10n.toString()); expect(productExisting.lastProcessed).not.toStrictEqual(lastProcessed); }); }); @@ -457,7 +437,6 @@ describe('importWithFilter', () => { const productToDelete = em.create(Product, { code: productIdToDelete, source: ProductSource.FULL_LOAD, - processId: 10n, lastProcessed: new Date(2023, 1, 1), lastUpdated: new Date(lastUpdated * 1000), }); diff --git a/src/domain/services/query.service.spec.ts b/src/domain/services/query.service.spec.ts index f6c8b18..d663c69 100644 --- a/src/domain/services/query.service.spec.ts +++ b/src/domain/services/query.service.spec.ts @@ -24,11 +24,11 @@ describe('count', () => { // Create some dummy products with a specific tag const tagValue = randomCode(); em.create(ProductIngredientsTag, { - product: em.create(Product, { code: randomCode(), processId: 100n }), + product: em.create(Product, { code: randomCode() }), value: tagValue, }); em.create(ProductIngredientsTag, { - product: em.create(Product, { code: randomCode(), processId: 100n }), + product: em.create(Product, { code: randomCode() }), value: tagValue, }); await em.flush(); @@ -46,10 +46,7 @@ describe('count', () => { // Create some dummy products with a specific tag const tagValue = randomCode(); const notTagValue = randomCode(); - const productWithNotTag = em.create(Product, { - code: randomCode(), - processId: 100n, - }); + const productWithNotTag = em.create(Product, { code: randomCode() }); em.create(ProductBrandsTag, { product: productWithNotTag, value: tagValue, @@ -60,7 +57,7 @@ describe('count', () => { }); em.create(ProductBrandsTag, { - product: em.create(Product, { code: randomCode(), processId: 100n }), + product: em.create(Product, { code: randomCode() }), value: tagValue, }); await em.flush(); @@ -452,22 +449,16 @@ async function createTestTags(app) { const creatorValue = randomCode(); // Create some dummy products with a specific tag - const product1 = em.create(Product, { code: randomCode(), processId: 100n }); + const product1 = em.create(Product, { code: randomCode() }); const product2 = em.create(Product, { code: randomCode(), creator: creatorValue, - processId: 100n, }); const product3 = em.create(Product, { code: randomCode(), creator: creatorValue, - processId: 100n, - }); - const product4 = em.create(Product, { - code: randomCode(), - processId: 100n, - obsolete: true, }); + const product4 = em.create(Product, { code: randomCode(), obsolete: true }); // Matrix for testing // Product | Origin | AminoAcid | AminoAcid2 | Neucleotide | Obsolete | Creator From 04659ff8ddc0186fd98db089814d27745a768ee6 Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Thu, 31 Oct 2024 15:18:57 +0000 Subject: [PATCH 2/6] Revert "Mock with a lower process id for full import to avoid clashing with other tests" This reverts commit 403db49ebaad222fcfd194178aef948e4d4526af. --- src/domain/services/import.service.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index 49d5207..9f4402d 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -85,7 +85,7 @@ describe('importFromMongo', () => { const importService = app.get(ImportService); // Mock the process id so it doesn't delete records from other tests - let currentProcessId = 0n; + let currentProcessId = 99999999999999999n; importService.getProcessId = jest .fn() .mockImplementation(() => ++currentProcessId); From 6d7aace816174e5cba344b4686aedbe15d5a4102 Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Thu, 31 Oct 2024 15:19:10 +0000 Subject: [PATCH 3/6] Revert "Switch to using transaction id for update_id so we can avoid deleting data from later transactions on a full import" This reverts commit b6545c5241d77424e81f7858d6236897250f5646. --- src/domain/entities/product.ts | 4 +- src/domain/services/import.service.spec.ts | 41 ++++++---------- src/domain/services/import.service.ts | 57 ++++++++-------------- src/migrations/.snapshot-query.json | 12 ++--- src/migrations/Migration20241029165410.ts | 8 --- tsconfig.json | 2 +- 6 files changed, 46 insertions(+), 78 deletions(-) diff --git a/src/domain/entities/product.ts b/src/domain/entities/product.ts index 4058b5e..56b56ed 100644 --- a/src/domain/entities/product.ts +++ b/src/domain/entities/product.ts @@ -39,8 +39,8 @@ export class Product { @Property() obsolete? = false; - @Property({ columnType: 'xid8', index: true }) - processId?: bigint; + @Property({ type: 'uuid', index: true }) + lastUpdateId?: string; // This is the last time off-query received the data @Property({ columnType: 'timestamptz' }) diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index 9f4402d..f2a4af1 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -10,7 +10,6 @@ import { ProductTagMap } from '../entities/product-tag-map'; import { ProductSource } from '../enums/product-source'; import { SettingsService } from './settings.service'; import { ProductIngredient } from '../entities/product-ingredient'; -import sql from '../../db'; const lastUpdated = 1692032161; @@ -84,11 +83,7 @@ describe('importFromMongo', () => { // app.useLogger(new Logger()); const importService = app.get(ImportService); - // Mock the process id so it doesn't delete records from other tests - let currentProcessId = 99999999999999999n; - importService.getProcessId = jest - .fn() - .mockImplementation(() => ++currentProcessId); + const deleteMock = (importService.deleteOtherProducts = jest.fn()); // GIVEN: Two existing products, one of which is in Mongo plus one new one in Mongo const em = app.get(EntityManager); @@ -116,9 +111,19 @@ describe('importFromMongo', () => { await importService.importFromMongo(); // THEN: New product is added, updated product is updated and other product is unchanged + expect(deleteMock).toHaveBeenCalledTimes(1); + let updateId = deleteMock.mock.calls[0][1]; + // Re-format updateId the way Postgres provides it + updateId = `${updateId.substring(0, 8)}-${updateId.substring( + 8, + 12, + )}-${updateId.substring(12, 16)}-${updateId.substring( + 16, + 20, + )}-${updateId.substring(20)}`.toLowerCase(); const productNew = await em.findOne(Product, { code: productIdNew }); expect(productNew).toBeTruthy(); - expect(productNew.processId).toBe(currentProcessId.toString()); + expect(productNew.lastUpdateId).toBe(updateId); expect(productNew.source).toBe(ProductSource.FULL_LOAD); expect(productNew.lastProcessed.getTime()).toBeGreaterThanOrEqual(start); const ingredientsNew = await em.find(ProductIngredientsTag, { @@ -139,15 +144,12 @@ describe('importFromMongo', () => { ingredientsExisting.find((i) => i.value === 'new_ingredient'), ).toBeTruthy(); - // Check unchanged product has been "deleted" + // We have mocked the delete of other products so just check the other product + // does not have the same update id as those imported const foundOldProduct = await em.findOne(Product, { code: productIdUnchanged, }); - expect(foundOldProduct.obsolete).toBeNull(); - const ingredientsUnchanged = await em.find(ProductIngredientsTag, { - product: foundOldProduct, - }); - expect(ingredientsUnchanged[0].obsolete).toBeNull(); + expect(foundOldProduct.lastUpdateId).not.toBe(updateId); const loadedTags = await app.get(TagService).getLoadedTags(); expect(loadedTags).toHaveLength( @@ -462,7 +464,7 @@ describe('importWithFilter', () => { const updatedProduct = await em.findOne(Product, { code: productIdExisting, }); - expect(deletedProduct.processId).toBe(updatedProduct.processId); + expect(deletedProduct.lastUpdateId).toBe(updatedProduct.lastUpdateId); expect(deletedProduct.lastProcessed.getTime()).toBeGreaterThanOrEqual( beforeImport, ); @@ -476,14 +478,3 @@ describe('importWithFilter', () => { }); }); }); - -describe('getProcessId', () => { - it('should return monotonically increasing numbers', async () => { - await createTestingModule([DomainModule], async (app) => { - const importService = app.get(ImportService); - const transactionId = await importService.getProcessId(); - - expect(await importService.getProcessId()).toBeGreaterThan(transactionId); - }); - }); -}); diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts index 9b10495..b36df0a 100644 --- a/src/domain/services/import.service.ts +++ b/src/domain/services/import.service.ts @@ -9,7 +9,7 @@ import { ProductSource } from '../enums/product-source'; import equal from 'fast-deep-equal'; import { SettingsService } from './settings.service'; import sql from '../../db'; -import { Fragment, Helper, Parameter, ReservedSql } from 'postgres'; +import { ReservedSql } from 'postgres'; import { SerializableParameter } from 'postgres'; @Injectable() @@ -62,18 +62,12 @@ export class ImportService { } } - async getProcessId() { - return BigInt( - (await sql`SELECT pg_current_xact_id() transaction_id`)[0].transaction_id, - ); - } - async importWithFilter(filter: any, source: ProductSource, skip?: number) { let latestUpdated = 0; // The update id is unique to this run and is used later to run other // queries that should only affect products loaded in this import - const processId = await this.getProcessId(); + const updateId = Ulid.generate().toRaw(); this.logger.debug('Connecting to MongoDB'); const client = new MongoClient(process.env.MONGO_URI); @@ -187,16 +181,11 @@ export class ImportService { latestUpdated = Math.max(latestUpdated, lastUpdated?.getTime() ?? 0); if (!(i % this.importBatchSize)) { - await this.applyProductChange( - connection, - obsolete, - source, - processId, - ); + await this.applyProductChange(connection, obsolete, source, updateId); await connection`begin`; } } - await this.applyProductChange(connection, obsolete, source, processId); + await this.applyProductChange(connection, obsolete, source, updateId); await cursor.close(); collection.count = i; } @@ -211,7 +200,7 @@ export class ImportService { if (missingProducts.length) { const deletedProducts = await connection`UPDATE product SET obsolete = NULL, - process_id = ${processId.toString()}, + last_update_id = ${updateId}, last_processed = ${new Date()}, source = ${source} WHERE code IN ${sql(missingProducts)} @@ -219,7 +208,7 @@ export class ImportService { await this.deleteProductTags( connection, - sql(deletedProducts.map((p) => p.id)), + deletedProducts.map((p) => p.id), ); deleteLog = `. Deleted ${deletedProducts.count}`; } @@ -230,7 +219,7 @@ export class ImportService { await this.tagService.addLoadedTags( Object.keys(ProductTagMap.MAPPED_TAGS), ); - await this.deleteOtherProducts(connection, processId); + await this.deleteOtherProducts(connection, updateId); } await connection`DROP TABLE product_temp`; @@ -247,7 +236,7 @@ export class ImportService { connection: ReservedSql, obsolete: boolean, source: string, - processId: bigint, + updateId: string, ) { // Analyze table for best query performance await connection`ANALYZE product_temp`; @@ -262,7 +251,7 @@ export class ImportService { ingredients_count = (tp.data->>'ingredients_n')::numeric, ingredients_without_ciqual_codes_count = (tp.data->>'ingredients_without_ciqual_codes_n')::numeric, last_updated = tp.last_updated, - process_id = ${processId.toString()}, + last_update_id = ${updateId}, last_processed = ${new Date()}, source = ${source}, revision = (tp.data->>'rev')::int @@ -372,37 +361,33 @@ export class ImportService { await connection`commit`; } - async deleteOtherProducts(connection: ReservedSql, processId: bigint) { - // Note use < here to avoid deleting products from events that were received since a full - // import was started. - const filter = connection`process_id < ${processId.toString()} OR process_id IS NULL`; - - // Need to use a fragment here rather than a list of ids as could be a long list - await this.deleteProductTags( - connection, - sql`(SELECT id FROM product WHERE ${filter})`, - ); - + async deleteOtherProducts(connection: ReservedSql, updateId: string) { const deletedProducts = await connection`UPDATE product SET obsolete = NULL, - process_id = ${processId.toString()}, + last_update_id = ${updateId}, last_processed = ${new Date()}, source = ${ProductSource.FULL_LOAD} - WHERE ${filter}`; + WHERE last_update_id != ${updateId} OR last_update_id IS NULL + RETURNING id`; this.logger.debug(`${deletedProducts.count} Products deleted`); + + await this.deleteProductTags( + connection, + deletedProducts.map((p) => p.id), + ); } private async deleteProductTags( connection: ReservedSql, - deletedProductIds: Helper | Fragment, + deletedProductIds: any[], ) { for (const entity of Object.values(ProductTagMap.MAPPED_TAGS)) { const tableName = this.em.getMetadata(entity).tableName; await connection`UPDATE ${sql(tableName)} SET obsolete = NULL - where product_id in ${deletedProductIds}`; + where product_id in ${sql(deletedProductIds)}`; } await connection`UPDATE product_ingredient SET obsolete = NULL - where product_id in ${deletedProductIds}`; + where product_id in ${sql(deletedProductIds)}`; } // Make sure to pause redis before calling this diff --git a/src/migrations/.snapshot-query.json b/src/migrations/.snapshot-query.json index caf76c7..cfcdde5 100644 --- a/src/migrations/.snapshot-query.json +++ b/src/migrations/.snapshot-query.json @@ -117,14 +117,14 @@ "default": "false", "mappedType": "boolean" }, - "process_id": { - "name": "process_id", - "type": "xid8", + "last_update_id": { + "name": "last_update_id", + "type": "uuid", "unsigned": false, "autoincrement": false, "primary": false, "nullable": true, - "mappedType": "unknown" + "mappedType": "uuid" }, "last_processed": { "name": "last_processed", @@ -187,10 +187,10 @@ }, { "columnNames": [ - "process_id" + "last_update_id" ], "composite": false, - "keyName": "product_process_id_index", + "keyName": "product_last_update_id_index", "primary": false, "unique": false }, diff --git a/src/migrations/Migration20241029165410.ts b/src/migrations/Migration20241029165410.ts index abb9433..5f71c1c 100644 --- a/src/migrations/Migration20241029165410.ts +++ b/src/migrations/Migration20241029165410.ts @@ -6,17 +6,9 @@ export class Migration20241029165410 extends Migration { this.addSql('alter table "query"."product" rename column "last_updated" to "last_processed";'); this.addSql('alter table "query"."product" rename column "last_modified" to "last_updated";'); this.addSql('alter table "query"."settings" rename column "last_modified" to "last_updated";'); - this.addSql('alter table "query"."product" add column "process_id" xid8 null;'); - this.addSql('create index "product_process_id_index" on "query"."product" ("process_id");'); - this.addSql('drop index "query"."product_last_update_id_index";'); - this.addSql('alter table "query"."product" drop column "last_update_id";'); } async down(): Promise { - this.addSql('alter table "query"."product" add column "last_update_id" uuid null;'); - this.addSql('create index "product_last_update_id_index" on "query"."product" ("last_update_id");'); - this.addSql('drop index "query"."product_process_id_index";'); - this.addSql('alter table "query"."product" drop column "process_id";'); this.addSql('alter table "query"."settings" rename column "last_updated" to "last_modified";'); this.addSql('alter table "query"."product" rename column "last_updated" to "last_modified";'); this.addSql('alter table "query"."product" rename column "last_processed" to "last_updated";'); diff --git a/tsconfig.json b/tsconfig.json index 3e24617..fd76987 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -7,7 +7,7 @@ "experimentalDecorators": true, "esModuleInterop": true, "allowSyntheticDefaultImports": true, - "target": "es2020", + "target": "es2017", "sourceMap": true, "outDir": "./dist", "baseUrl": "./", From b02d68afde5847c83807856b80231932c7167bbb Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Thu, 31 Oct 2024 15:19:25 +0000 Subject: [PATCH 4/6] Revert "Switch to using last_updated_t from product opener" This reverts commit b34123b98a1af0f5e7695477b3f59f5347343573. --- src/domain/entities/product.ts | 8 ++-- src/domain/services/import.service.spec.ts | 44 +++++++++++----------- src/domain/services/import.service.ts | 38 +++++++++---------- src/domain/services/settings.service.ts | 6 +-- src/migrations/.snapshot-query.json | 8 ++-- src/migrations/Migration20241029165410.ts | 16 -------- 6 files changed, 49 insertions(+), 71 deletions(-) delete mode 100644 src/migrations/Migration20241029165410.ts diff --git a/src/domain/entities/product.ts b/src/domain/entities/product.ts index 56b56ed..c68dc53 100644 --- a/src/domain/entities/product.ts +++ b/src/domain/entities/product.ts @@ -21,7 +21,7 @@ export class Product { code?: string; @Property({ columnType: 'timestamptz' }) - lastUpdated?: Date; + lastModified?: Date; @Property({ index: true }) creator?: string; @@ -42,9 +42,8 @@ export class Product { @Property({ type: 'uuid', index: true }) lastUpdateId?: string; - // This is the last time off-query received the data @Property({ columnType: 'timestamptz' }) - lastProcessed?: Date; + lastUpdated?: Date; @Property() source?: ProductSource; @@ -58,8 +57,7 @@ export const MAPPED_FIELDS = [ 'product_name', 'creator', 'owners_tags', - 'last_modified_t', // Note we actually use last_updated_t for checks but not all products may have this - 'last_updated_t', + 'last_modified_t', 'ingredients_n', 'ingredients_without_ciqual_codes_n', 'ingredients', diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index f2a4af1..c3db30f 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -11,7 +11,7 @@ import { ProductSource } from '../enums/product-source'; import { SettingsService } from './settings.service'; import { ProductIngredient } from '../entities/product-ingredient'; -const lastUpdated = 1692032161; +const lastModified = 1692032161; function testProducts() { const productIdNew = randomCode(); @@ -20,14 +20,14 @@ function testProducts() { { // This one will be new code: productIdNew, - last_updated_t: lastUpdated, + last_modified_t: lastModified, ingredients_tags: ['test'], rev: 1, }, { // This one will already exist code: productIdExisting, - last_updated_t: lastUpdated, + last_modified_t: lastModified, ingredients_tags: ['new_ingredient', 'old_ingredient'], }, ]; @@ -125,7 +125,7 @@ describe('importFromMongo', () => { expect(productNew).toBeTruthy(); expect(productNew.lastUpdateId).toBe(updateId); expect(productNew.source).toBe(ProductSource.FULL_LOAD); - expect(productNew.lastProcessed.getTime()).toBeGreaterThanOrEqual(start); + expect(productNew.lastUpdated.getTime()).toBeGreaterThanOrEqual(start); const ingredientsNew = await em.find(ProductIngredientsTag, { product: productNew, }); @@ -186,13 +186,13 @@ describe('importFromMongo', () => { await createTestingModule([DomainModule], async (app) => { // GIVEN: Product with data that matches MongoDB const em = app.get(EntityManager); - const lastProcessed = new Date(2023, 1, 1); + const lastUpdated = new Date(2023, 1, 1); const { products, productIdExisting } = testProducts(); em.create(Product, { code: productIdExisting, source: ProductSource.EVENT, - lastProcessed: lastProcessed, - lastUpdated: new Date(lastUpdated * 1000), + lastUpdated: lastUpdated, + lastModified: new Date(lastModified * 1000), }); await em.flush(); const importService = app.get(ImportService); @@ -207,13 +207,13 @@ describe('importFromMongo', () => { }); expect(productExisting).toBeTruthy(); expect(productExisting.source).toBe(ProductSource.EVENT); - expect(productExisting.lastProcessed).toStrictEqual(lastProcessed); + expect(productExisting.lastUpdated).toStrictEqual(lastUpdated); }); }); it('should start importing from the last import', async () => { await createTestingModule([DomainModule], async (app) => { - // GIVEN: lastUpdated setting already set + // GIVEN: lastModified setting already set const settings = app.get(SettingsService); const startFrom = new Date(2023, 1, 1); await settings.setLastModified(startFrom); @@ -227,12 +227,12 @@ describe('importFromMongo', () => { // THEN: Mongo find is called with the setting as a parameter expect(findCalls).toHaveLength(2); // Called for normal an obsolete prodocuts - expect(findCalls[0][0].last_updated_t.$gt).toBe( + expect(findCalls[0][0].last_modified_t.$gt).toBe( Math.floor(startFrom.getTime() / 1000), ); expect(await settings.getLastModified()).toStrictEqual( - new Date(lastUpdated * 1000), + new Date(lastModified * 1000), ); }); }); @@ -245,7 +245,7 @@ describe('importFromMongo', () => { { // This one will be new code: productIdNew, - last_updated_t: 1692032161, + last_modified_t: 1692032161, ingredients_tags: ['test \u0000 test2 \u0000 end'], }, ]); @@ -263,7 +263,7 @@ describe('importFromMongo', () => { }); }); - it('should set last_updated correctly if one product has an invalid date', async () => { + it('should set last_modified correctly if one product has an invalid date', async () => { await createTestingModule([DomainModule], async (app) => { // GIVEN: products with invalid date const settings = app.get(SettingsService); @@ -272,7 +272,7 @@ describe('importFromMongo', () => { const { products } = testProducts(); const testData = [ products[0], - { ...products[1], last_updated_t: 'invalid' }, + { ...products[1], last_modified_t: 'invalid' }, ]; const importService = app.get(ImportService); @@ -282,7 +282,7 @@ describe('importFromMongo', () => { // THEN: The last modified date is set correctly expect(await settings.getLastModified()).toStrictEqual( - new Date(lastUpdated * 1000), + new Date(lastModified * 1000), ); }); }); @@ -336,13 +336,13 @@ describe('importFromMongo', () => { await createTestingModule([DomainModule], async (app) => { // GIVEN: Product with data that matches MongoDB const em = app.get(EntityManager); - const lastProcessed = new Date(2023, 1, 1); + const lastUpdated = new Date(2023, 1, 1); const { products, productIdExisting } = testProducts(); em.create(Product, { code: productIdExisting, source: ProductSource.INCREMENTAL_LOAD, - lastProcessed: lastProcessed, - lastUpdated: new Date(lastUpdated * 1000), + lastUpdated: lastUpdated, + lastModified: new Date(lastModified * 1000), }); await em.flush(); const importService = app.get(ImportService); @@ -360,7 +360,7 @@ describe('importFromMongo', () => { }); expect(productExisting).toBeTruthy(); expect(productExisting.source).toBe(ProductSource.EVENT); - expect(productExisting.lastProcessed).not.toStrictEqual(lastProcessed); + expect(productExisting.lastUpdated).not.toStrictEqual(lastUpdated); }); }); }); @@ -439,8 +439,8 @@ describe('importWithFilter', () => { const productToDelete = em.create(Product, { code: productIdToDelete, source: ProductSource.FULL_LOAD, - lastProcessed: new Date(2023, 1, 1), - lastUpdated: new Date(lastUpdated * 1000), + lastUpdated: new Date(2023, 1, 1), + lastModified: new Date(lastModified * 1000), }); em.create(ProductIngredientsTag, { product: productToDelete, @@ -465,7 +465,7 @@ describe('importWithFilter', () => { code: productIdExisting, }); expect(deletedProduct.lastUpdateId).toBe(updatedProduct.lastUpdateId); - expect(deletedProduct.lastProcessed.getTime()).toBeGreaterThanOrEqual( + expect(deletedProduct.lastUpdated.getTime()).toBeGreaterThanOrEqual( beforeImport, ); expect(deletedProduct.source).toBe(ProductSource.EVENT); diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts index b36df0a..0be7b22 100644 --- a/src/domain/services/import.service.ts +++ b/src/domain/services/import.service.ts @@ -49,7 +49,7 @@ export class ImportService { const filter = {}; if (from) { const fromTime = Math.floor(new Date(from).getTime() / 1000); - filter['last_updated_t'] = { $gt: fromTime }; + filter['last_modified_t'] = { $gt: fromTime }; this.logger.debug(`Starting import from ${from}`); } @@ -63,7 +63,7 @@ export class ImportService { } async importWithFilter(filter: any, source: ProductSource, skip?: number) { - let latestUpdated = 0; + let latestModified = 0; // The update id is unique to this run and is used later to run other // queries that should only affect products loaded in this import @@ -104,7 +104,7 @@ export class ImportService { // Now using postgres to help with transactions const connection = await sql.reserve(); - await connection`CREATE TEMP TABLE product_temp (id int PRIMARY KEY, last_updated timestamptz, data jsonb)`; + await connection`CREATE TEMP TABLE product_temp (id int PRIMARY KEY, last_modified timestamptz, data jsonb)`; // let sql: string; // const vars = []; for (const collection of Object.values(collections)) { @@ -128,29 +128,25 @@ export class ImportService { // Find the product if it exists let results = - await connection`select id, last_updated from product where code = ${data.code}`; + await connection`select id, last_modified from product where code = ${data.code}`; if (!results.length) { results = await connection`insert into product (code) values (${data.code}) returning id`; } const id = results[0].id; - const previousLastUpdated = results[0].last_updated; + const previousLastModified = results[0].last_modified; - let lastUpdated = new Date(data.last_updated_t * 1000); - if (isNaN(+lastUpdated)) { - // Fall back to last_modified_t if last_updated_t is not available - lastUpdated = new Date(data.last_modified_t * 1000); - } - if (isNaN(+lastUpdated)) { + let lastModified = new Date(data.last_modified_t * 1000); + if (isNaN(+lastModified)) { this.logger.warn( - `Product: ${data.code}. Invalid last_updated_t: ${data.last_updated_t}, or last_modified_t: ${data.last_modified_t}.`, + `Product: ${data.code}. Invalid last_modified_t: ${data.last_modified_t}.`, ); - lastUpdated = null; + lastModified = null; } // Skip product if nothing has changed and doing an incremental load if ( source === ProductSource.INCREMENTAL_LOAD && - lastUpdated?.getTime() === previousLastUpdated?.getTime() + lastModified?.getTime() === previousLastModified?.getTime() ) continue; @@ -174,11 +170,11 @@ export class ImportService { } results = - await connection`insert into product_temp (id, last_updated, data) values (${id}, ${lastUpdated}, ${ + await connection`insert into product_temp (id, last_modified, data) values (${id}, ${lastModified}, ${ data as unknown as SerializableParameter }) ON CONFLICT DO NOTHING`; - latestUpdated = Math.max(latestUpdated, lastUpdated?.getTime() ?? 0); + latestModified = Math.max(latestModified, lastModified?.getTime() ?? 0); if (!(i % this.importBatchSize)) { await this.applyProductChange(connection, obsolete, source, updateId); @@ -201,7 +197,7 @@ export class ImportService { const deletedProducts = await connection`UPDATE product SET obsolete = NULL, last_update_id = ${updateId}, - last_processed = ${new Date()}, + last_updated = ${new Date()}, source = ${source} WHERE code IN ${sql(missingProducts)} RETURNING id`; @@ -229,7 +225,7 @@ export class ImportService { `Imported ${collections.normal.count} Products and ${collections.obsolete.count} Obsolete Products from ${source}${deleteLog}`, ); - return latestUpdated; + return latestModified; } async applyProductChange( @@ -250,9 +246,9 @@ export class ImportService { obsolete = ${obsolete}, ingredients_count = (tp.data->>'ingredients_n')::numeric, ingredients_without_ciqual_codes_count = (tp.data->>'ingredients_without_ciqual_codes_n')::numeric, - last_updated = tp.last_updated, + last_modified = tp.last_modified, last_update_id = ${updateId}, - last_processed = ${new Date()}, + last_updated = ${new Date()}, source = ${source}, revision = (tp.data->>'rev')::int FROM product_temp tp @@ -365,7 +361,7 @@ export class ImportService { const deletedProducts = await connection`UPDATE product SET obsolete = NULL, last_update_id = ${updateId}, - last_processed = ${new Date()}, + last_updated = ${new Date()}, source = ${ProductSource.FULL_LOAD} WHERE last_update_id != ${updateId} OR last_update_id IS NULL RETURNING id`; diff --git a/src/domain/services/settings.service.ts b/src/domain/services/settings.service.ts index 0a1b1fa..6a24e83 100644 --- a/src/domain/services/settings.service.ts +++ b/src/domain/services/settings.service.ts @@ -11,11 +11,11 @@ export class SettingsService { } async getLastModified() { - return (await sql`SELECT last_updated FROM settings`)[0].last_updated; + return (await sql`SELECT last_modified FROM settings`)[0].last_modified; } - async setLastModified(lastUpdated: Date) { - await this.updateSetting({ last_updated: lastUpdated }); + async setLastModified(lastModified: Date) { + await this.updateSetting({ last_modified: lastModified }); } async getLastMessageId() { diff --git a/src/migrations/.snapshot-query.json b/src/migrations/.snapshot-query.json index cfcdde5..67a63df 100644 --- a/src/migrations/.snapshot-query.json +++ b/src/migrations/.snapshot-query.json @@ -61,8 +61,8 @@ "nullable": true, "mappedType": "text" }, - "last_updated": { - "name": "last_updated", + "last_modified": { + "name": "last_modified", "type": "timestamptz", "unsigned": false, "autoincrement": false, @@ -126,8 +126,8 @@ "nullable": true, "mappedType": "uuid" }, - "last_processed": { - "name": "last_processed", + "last_updated": { + "name": "last_updated", "type": "timestamptz", "unsigned": false, "autoincrement": false, diff --git a/src/migrations/Migration20241029165410.ts b/src/migrations/Migration20241029165410.ts deleted file mode 100644 index 5f71c1c..0000000 --- a/src/migrations/Migration20241029165410.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { Migration } from '@mikro-orm/migrations'; - -export class Migration20241029165410 extends Migration { - - async up(): Promise { - this.addSql('alter table "query"."product" rename column "last_updated" to "last_processed";'); - this.addSql('alter table "query"."product" rename column "last_modified" to "last_updated";'); - this.addSql('alter table "query"."settings" rename column "last_modified" to "last_updated";'); - } - - async down(): Promise { - this.addSql('alter table "query"."settings" rename column "last_updated" to "last_modified";'); - this.addSql('alter table "query"."product" rename column "last_updated" to "last_modified";'); - this.addSql('alter table "query"."product" rename column "last_processed" to "last_updated";'); - } -} From 9cb30207e55c8ce4c09d4c64a725d2c1837014b3 Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Thu, 31 Oct 2024 15:19:33 +0000 Subject: [PATCH 5/6] Revert "Always import updates that come from an event" This reverts commit 791427e034cf0d8be743e5ea998a77b6e149345e. --- src/domain/services/import.service.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts index 0be7b22..c78fb55 100644 --- a/src/domain/services/import.service.ts +++ b/src/domain/services/import.service.ts @@ -143,9 +143,9 @@ export class ImportService { ); lastModified = null; } - // Skip product if nothing has changed and doing an incremental load + // Skip product if nothing has changed and not doing a full load if ( - source === ProductSource.INCREMENTAL_LOAD && + source !== ProductSource.FULL_LOAD && lastModified?.getTime() === previousLastModified?.getTime() ) continue; From 9969b334206d11455fb7adb3ab3d277861777b0b Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Thu, 31 Oct 2024 15:19:41 +0000 Subject: [PATCH 6/6] Revert "Failing test" This reverts commit 9595ee049ce4d8f556778d08d2098d840213104d. --- src/domain/services/import.service.spec.ts | 32 ---------------------- 1 file changed, 32 deletions(-) diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index c3db30f..d61b6f1 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -331,38 +331,6 @@ describe('importFromMongo', () => { expect(ingredientsNew[0].ingredientText).toBe('test'); }); }); - - it('import from redis should always update product', async () => { - await createTestingModule([DomainModule], async (app) => { - // GIVEN: Product with data that matches MongoDB - const em = app.get(EntityManager); - const lastUpdated = new Date(2023, 1, 1); - const { products, productIdExisting } = testProducts(); - em.create(Product, { - code: productIdExisting, - source: ProductSource.INCREMENTAL_LOAD, - lastUpdated: lastUpdated, - lastModified: new Date(lastModified * 1000), - }); - await em.flush(); - const importService = app.get(ImportService); - - // WHEN: Doing an event import - mockMongoDB(products); - await importService.importWithFilter( - { code: { $in: [productIdExisting] } }, - ProductSource.EVENT, - ); - - // THEN: Source is updated - const productExisting = await em.findOne(Product, { - code: productIdExisting, - }); - expect(productExisting).toBeTruthy(); - expect(productExisting.source).toBe(ProductSource.EVENT); - expect(productExisting.lastUpdated).not.toStrictEqual(lastUpdated); - }); - }); }); describe('scheduledImportFromMongo', () => {