diff --git a/src/domain/entities/product.ts b/src/domain/entities/product.ts index 4058b5e..c68dc53 100644 --- a/src/domain/entities/product.ts +++ b/src/domain/entities/product.ts @@ -21,7 +21,7 @@ export class Product { code?: string; @Property({ columnType: 'timestamptz' }) - lastUpdated?: Date; + lastModified?: Date; @Property({ index: true }) creator?: string; @@ -39,12 +39,11 @@ export class Product { @Property() obsolete? = false; - @Property({ columnType: 'xid8', index: true }) - processId?: bigint; + @Property({ type: 'uuid', index: true }) + lastUpdateId?: string; - // This is the last time off-query received the data @Property({ columnType: 'timestamptz' }) - lastProcessed?: Date; + lastUpdated?: Date; @Property() source?: ProductSource; @@ -58,8 +57,7 @@ export const MAPPED_FIELDS = [ 'product_name', 'creator', 'owners_tags', - 'last_modified_t', // Note we actually use last_updated_t for checks but not all products may have this - 'last_updated_t', + 'last_modified_t', 'ingredients_n', 'ingredients_without_ciqual_codes_n', 'ingredients', diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index 4750342..d61b6f1 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -10,9 +10,8 @@ import { ProductTagMap } from '../entities/product-tag-map'; import { ProductSource } from '../enums/product-source'; import { SettingsService } from './settings.service'; import { ProductIngredient } from '../entities/product-ingredient'; -import sql from '../../db'; -const lastUpdated = 1692032161; +const lastModified = 1692032161; function testProducts() { const productIdNew = randomCode(); @@ -21,14 +20,14 @@ function testProducts() { { // This one will be new code: productIdNew, - last_updated_t: lastUpdated, + last_modified_t: lastModified, ingredients_tags: ['test'], rev: 1, }, { // This one will already exist code: productIdExisting, - last_updated_t: lastUpdated, + last_modified_t: lastModified, ingredients_tags: ['new_ingredient', 'old_ingredient'], }, ]; @@ -84,40 +83,23 @@ describe('importFromMongo', () => { // app.useLogger(new Logger()); const importService = app.get(ImportService); - // Mock the process id so it doesn't delete records from other tests - let currentProcessId = 0n; - importService.getProcessId = jest - .fn() - .mockImplementation(() => ++currentProcessId); + const deleteMock = (importService.deleteOtherProducts = jest.fn()); // GIVEN: Two existing products, one of which is in Mongo plus one new one in Mongo const em = app.get(EntityManager); const { products, productIdExisting, productIdNew } = testProducts(); - const productExisting = em.create(Product, { - code: productIdExisting, - processId: 0n, - }); + const productExisting = em.create(Product, { code: productIdExisting }); em.create(ProductIngredientsTag, { product: productExisting, value: 'old_ingredient', }); const productIdUnchanged = randomCode(); - const productUnchanged = em.create(Product, { - code: productIdUnchanged, - processId: 0n, - }); + const productUnchanged = em.create(Product, { code: productIdUnchanged }); em.create(ProductIngredientsTag, { product: productUnchanged, value: 'unchanged_ingredient', }); - - const productIdLater = randomCode(); - em.create(Product, { - code: productIdLater, - processId: 100n, // Simulate a product that was added after the full load started - }); - // Delete a tag to prove it is re-created await em.nativeDelete(LoadedTag, { id: 'teams_tags' }); await em.flush(); @@ -129,11 +111,21 @@ describe('importFromMongo', () => { await importService.importFromMongo(); // THEN: New product is added, updated product is updated and other product is unchanged + expect(deleteMock).toHaveBeenCalledTimes(1); + let updateId = deleteMock.mock.calls[0][1]; + // Re-format updateId the way Postgres provides it + updateId = `${updateId.substring(0, 8)}-${updateId.substring( + 8, + 12, + )}-${updateId.substring(12, 16)}-${updateId.substring( + 16, + 20, + )}-${updateId.substring(20)}`.toLowerCase(); const productNew = await em.findOne(Product, { code: productIdNew }); expect(productNew).toBeTruthy(); - expect(productNew.processId).toBe(currentProcessId.toString()); + expect(productNew.lastUpdateId).toBe(updateId); expect(productNew.source).toBe(ProductSource.FULL_LOAD); - expect(productNew.lastProcessed.getTime()).toBeGreaterThanOrEqual(start); + expect(productNew.lastUpdated.getTime()).toBeGreaterThanOrEqual(start); const ingredientsNew = await em.find(ProductIngredientsTag, { product: productNew, }); @@ -152,20 +144,12 @@ describe('importFromMongo', () => { ingredientsExisting.find((i) => i.value === 'new_ingredient'), ).toBeTruthy(); - // Check unchanged product has been "deleted" + // We have mocked the delete of other products so just check the other product + // does not have the same update id as those imported const foundOldProduct = await em.findOne(Product, { code: productIdUnchanged, }); - expect(foundOldProduct.obsolete).toBeNull(); - const ingredientsUnchanged = await em.find(ProductIngredientsTag, { - product: foundOldProduct, - }); - expect(ingredientsUnchanged[0].obsolete).toBeNull(); - - const foundLaterProduct = await em.findOne(Product, { - code: productIdLater, - }); - expect(foundLaterProduct.obsolete).toBe(false); + expect(foundOldProduct.lastUpdateId).not.toBe(updateId); const loadedTags = await app.get(TagService).getLoadedTags(); expect(loadedTags).toHaveLength( @@ -202,13 +186,13 @@ describe('importFromMongo', () => { await createTestingModule([DomainModule], async (app) => { // GIVEN: Product with data that matches MongoDB const em = app.get(EntityManager); - const lastProcessed = new Date(2023, 1, 1); + const lastUpdated = new Date(2023, 1, 1); const { products, productIdExisting } = testProducts(); em.create(Product, { code: productIdExisting, source: ProductSource.EVENT, - lastProcessed: lastProcessed, - lastUpdated: new Date(lastUpdated * 1000), + lastUpdated: lastUpdated, + lastModified: new Date(lastModified * 1000), }); await em.flush(); const importService = app.get(ImportService); @@ -223,13 +207,13 @@ describe('importFromMongo', () => { }); expect(productExisting).toBeTruthy(); expect(productExisting.source).toBe(ProductSource.EVENT); - expect(productExisting.lastProcessed).toStrictEqual(lastProcessed); + expect(productExisting.lastUpdated).toStrictEqual(lastUpdated); }); }); it('should start importing from the last import', async () => { await createTestingModule([DomainModule], async (app) => { - // GIVEN: lastUpdated setting already set + // GIVEN: lastModified setting already set const settings = app.get(SettingsService); const startFrom = new Date(2023, 1, 1); await settings.setLastModified(startFrom); @@ -243,12 +227,12 @@ describe('importFromMongo', () => { // THEN: Mongo find is called with the setting as a parameter expect(findCalls).toHaveLength(2); // Called for normal an obsolete prodocuts - expect(findCalls[0][0].last_updated_t.$gt).toBe( + expect(findCalls[0][0].last_modified_t.$gt).toBe( Math.floor(startFrom.getTime() / 1000), ); expect(await settings.getLastModified()).toStrictEqual( - new Date(lastUpdated * 1000), + new Date(lastModified * 1000), ); }); }); @@ -261,7 +245,7 @@ describe('importFromMongo', () => { { // This one will be new code: productIdNew, - last_updated_t: 1692032161, + last_modified_t: 1692032161, ingredients_tags: ['test \u0000 test2 \u0000 end'], }, ]); @@ -279,7 +263,7 @@ describe('importFromMongo', () => { }); }); - it('should set last_updated correctly if one product has an invalid date', async () => { + it('should set last_modified correctly if one product has an invalid date', async () => { await createTestingModule([DomainModule], async (app) => { // GIVEN: products with invalid date const settings = app.get(SettingsService); @@ -288,7 +272,7 @@ describe('importFromMongo', () => { const { products } = testProducts(); const testData = [ products[0], - { ...products[1], last_updated_t: 'invalid' }, + { ...products[1], last_modified_t: 'invalid' }, ]; const importService = app.get(ImportService); @@ -298,7 +282,7 @@ describe('importFromMongo', () => { // THEN: The last modified date is set correctly expect(await settings.getLastModified()).toStrictEqual( - new Date(lastUpdated * 1000), + new Date(lastModified * 1000), ); }); }); @@ -347,40 +331,6 @@ describe('importFromMongo', () => { expect(ingredientsNew[0].ingredientText).toBe('test'); }); }); - - it('import from redis should always update product', async () => { - await createTestingModule([DomainModule], async (app) => { - // GIVEN: Product with data that matches MongoDB - const em = app.get(EntityManager); - const lastProcessed = new Date(2023, 1, 1); - const { products, productIdExisting } = testProducts(); - em.create(Product, { - code: productIdExisting, - source: ProductSource.INCREMENTAL_LOAD, - processId: 10n, - lastProcessed: lastProcessed, - lastUpdated: new Date(lastUpdated * 1000), - }); - await em.flush(); - const importService = app.get(ImportService); - - // WHEN: Doing an event import - mockMongoDB(products); - await importService.importWithFilter( - { code: { $in: [productIdExisting] } }, - ProductSource.EVENT, - ); - - // THEN: Source is updated - const productExisting = await em.findOne(Product, { - code: productIdExisting, - }); - expect(productExisting).toBeTruthy(); - expect(productExisting.source).toBe(ProductSource.EVENT); - expect(productExisting.processId).not.toBe(10n.toString()); - expect(productExisting.lastProcessed).not.toStrictEqual(lastProcessed); - }); - }); }); describe('scheduledImportFromMongo', () => { @@ -457,9 +407,8 @@ describe('importWithFilter', () => { const productToDelete = em.create(Product, { code: productIdToDelete, source: ProductSource.FULL_LOAD, - processId: 10n, - lastProcessed: new Date(2023, 1, 1), - lastUpdated: new Date(lastUpdated * 1000), + lastUpdated: new Date(2023, 1, 1), + lastModified: new Date(lastModified * 1000), }); em.create(ProductIngredientsTag, { product: productToDelete, @@ -483,8 +432,8 @@ describe('importWithFilter', () => { const updatedProduct = await em.findOne(Product, { code: productIdExisting, }); - expect(deletedProduct.processId).toBe(updatedProduct.processId); - expect(deletedProduct.lastProcessed.getTime()).toBeGreaterThanOrEqual( + expect(deletedProduct.lastUpdateId).toBe(updatedProduct.lastUpdateId); + expect(deletedProduct.lastUpdated.getTime()).toBeGreaterThanOrEqual( beforeImport, ); expect(deletedProduct.source).toBe(ProductSource.EVENT); @@ -497,14 +446,3 @@ describe('importWithFilter', () => { }); }); }); - -describe('getProcessId', () => { - it('should return monotonically increasing numbers', async () => { - await createTestingModule([DomainModule], async (app) => { - const importService = app.get(ImportService); - const transactionId = await importService.getProcessId(); - - expect(await importService.getProcessId()).toBeGreaterThan(transactionId); - }); - }); -}); diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts index 9b10495..c78fb55 100644 --- a/src/domain/services/import.service.ts +++ b/src/domain/services/import.service.ts @@ -9,7 +9,7 @@ import { ProductSource } from '../enums/product-source'; import equal from 'fast-deep-equal'; import { SettingsService } from './settings.service'; import sql from '../../db'; -import { Fragment, Helper, Parameter, ReservedSql } from 'postgres'; +import { ReservedSql } from 'postgres'; import { SerializableParameter } from 'postgres'; @Injectable() @@ -49,7 +49,7 @@ export class ImportService { const filter = {}; if (from) { const fromTime = Math.floor(new Date(from).getTime() / 1000); - filter['last_updated_t'] = { $gt: fromTime }; + filter['last_modified_t'] = { $gt: fromTime }; this.logger.debug(`Starting import from ${from}`); } @@ -62,18 +62,12 @@ export class ImportService { } } - async getProcessId() { - return BigInt( - (await sql`SELECT pg_current_xact_id() transaction_id`)[0].transaction_id, - ); - } - async importWithFilter(filter: any, source: ProductSource, skip?: number) { - let latestUpdated = 0; + let latestModified = 0; // The update id is unique to this run and is used later to run other // queries that should only affect products loaded in this import - const processId = await this.getProcessId(); + const updateId = Ulid.generate().toRaw(); this.logger.debug('Connecting to MongoDB'); const client = new MongoClient(process.env.MONGO_URI); @@ -110,7 +104,7 @@ export class ImportService { // Now using postgres to help with transactions const connection = await sql.reserve(); - await connection`CREATE TEMP TABLE product_temp (id int PRIMARY KEY, last_updated timestamptz, data jsonb)`; + await connection`CREATE TEMP TABLE product_temp (id int PRIMARY KEY, last_modified timestamptz, data jsonb)`; // let sql: string; // const vars = []; for (const collection of Object.values(collections)) { @@ -134,29 +128,25 @@ export class ImportService { // Find the product if it exists let results = - await connection`select id, last_updated from product where code = ${data.code}`; + await connection`select id, last_modified from product where code = ${data.code}`; if (!results.length) { results = await connection`insert into product (code) values (${data.code}) returning id`; } const id = results[0].id; - const previousLastUpdated = results[0].last_updated; + const previousLastModified = results[0].last_modified; - let lastUpdated = new Date(data.last_updated_t * 1000); - if (isNaN(+lastUpdated)) { - // Fall back to last_modified_t if last_updated_t is not available - lastUpdated = new Date(data.last_modified_t * 1000); - } - if (isNaN(+lastUpdated)) { + let lastModified = new Date(data.last_modified_t * 1000); + if (isNaN(+lastModified)) { this.logger.warn( - `Product: ${data.code}. Invalid last_updated_t: ${data.last_updated_t}, or last_modified_t: ${data.last_modified_t}.`, + `Product: ${data.code}. Invalid last_modified_t: ${data.last_modified_t}.`, ); - lastUpdated = null; + lastModified = null; } - // Skip product if nothing has changed and doing an incremental load + // Skip product if nothing has changed and not doing a full load if ( - source === ProductSource.INCREMENTAL_LOAD && - lastUpdated?.getTime() === previousLastUpdated?.getTime() + source !== ProductSource.FULL_LOAD && + lastModified?.getTime() === previousLastModified?.getTime() ) continue; @@ -180,23 +170,18 @@ export class ImportService { } results = - await connection`insert into product_temp (id, last_updated, data) values (${id}, ${lastUpdated}, ${ + await connection`insert into product_temp (id, last_modified, data) values (${id}, ${lastModified}, ${ data as unknown as SerializableParameter }) ON CONFLICT DO NOTHING`; - latestUpdated = Math.max(latestUpdated, lastUpdated?.getTime() ?? 0); + latestModified = Math.max(latestModified, lastModified?.getTime() ?? 0); if (!(i % this.importBatchSize)) { - await this.applyProductChange( - connection, - obsolete, - source, - processId, - ); + await this.applyProductChange(connection, obsolete, source, updateId); await connection`begin`; } } - await this.applyProductChange(connection, obsolete, source, processId); + await this.applyProductChange(connection, obsolete, source, updateId); await cursor.close(); collection.count = i; } @@ -211,15 +196,15 @@ export class ImportService { if (missingProducts.length) { const deletedProducts = await connection`UPDATE product SET obsolete = NULL, - process_id = ${processId.toString()}, - last_processed = ${new Date()}, + last_update_id = ${updateId}, + last_updated = ${new Date()}, source = ${source} WHERE code IN ${sql(missingProducts)} RETURNING id`; await this.deleteProductTags( connection, - sql(deletedProducts.map((p) => p.id)), + deletedProducts.map((p) => p.id), ); deleteLog = `. Deleted ${deletedProducts.count}`; } @@ -230,7 +215,7 @@ export class ImportService { await this.tagService.addLoadedTags( Object.keys(ProductTagMap.MAPPED_TAGS), ); - await this.deleteOtherProducts(connection, processId); + await this.deleteOtherProducts(connection, updateId); } await connection`DROP TABLE product_temp`; @@ -240,14 +225,14 @@ export class ImportService { `Imported ${collections.normal.count} Products and ${collections.obsolete.count} Obsolete Products from ${source}${deleteLog}`, ); - return latestUpdated; + return latestModified; } async applyProductChange( connection: ReservedSql, obsolete: boolean, source: string, - processId: bigint, + updateId: string, ) { // Analyze table for best query performance await connection`ANALYZE product_temp`; @@ -261,9 +246,9 @@ export class ImportService { obsolete = ${obsolete}, ingredients_count = (tp.data->>'ingredients_n')::numeric, ingredients_without_ciqual_codes_count = (tp.data->>'ingredients_without_ciqual_codes_n')::numeric, - last_updated = tp.last_updated, - process_id = ${processId.toString()}, - last_processed = ${new Date()}, + last_modified = tp.last_modified, + last_update_id = ${updateId}, + last_updated = ${new Date()}, source = ${source}, revision = (tp.data->>'rev')::int FROM product_temp tp @@ -372,37 +357,33 @@ export class ImportService { await connection`commit`; } - async deleteOtherProducts(connection: ReservedSql, processId: bigint) { - // Note use < here to avoid deleting products from events that were received since a full - // import was started. - const filter = connection`process_id < ${processId.toString()} OR process_id IS NULL`; - - // Need to use a fragment here rather than a list of ids as could be a long list - await this.deleteProductTags( - connection, - sql`(SELECT id FROM product WHERE ${filter})`, - ); - + async deleteOtherProducts(connection: ReservedSql, updateId: string) { const deletedProducts = await connection`UPDATE product SET obsolete = NULL, - process_id = ${processId.toString()}, - last_processed = ${new Date()}, + last_update_id = ${updateId}, + last_updated = ${new Date()}, source = ${ProductSource.FULL_LOAD} - WHERE ${filter}`; + WHERE last_update_id != ${updateId} OR last_update_id IS NULL + RETURNING id`; this.logger.debug(`${deletedProducts.count} Products deleted`); + + await this.deleteProductTags( + connection, + deletedProducts.map((p) => p.id), + ); } private async deleteProductTags( connection: ReservedSql, - deletedProductIds: Helper | Fragment, + deletedProductIds: any[], ) { for (const entity of Object.values(ProductTagMap.MAPPED_TAGS)) { const tableName = this.em.getMetadata(entity).tableName; await connection`UPDATE ${sql(tableName)} SET obsolete = NULL - where product_id in ${deletedProductIds}`; + where product_id in ${sql(deletedProductIds)}`; } await connection`UPDATE product_ingredient SET obsolete = NULL - where product_id in ${deletedProductIds}`; + where product_id in ${sql(deletedProductIds)}`; } // Make sure to pause redis before calling this diff --git a/src/domain/services/query.service.spec.ts b/src/domain/services/query.service.spec.ts index f6c8b18..d663c69 100644 --- a/src/domain/services/query.service.spec.ts +++ b/src/domain/services/query.service.spec.ts @@ -24,11 +24,11 @@ describe('count', () => { // Create some dummy products with a specific tag const tagValue = randomCode(); em.create(ProductIngredientsTag, { - product: em.create(Product, { code: randomCode(), processId: 100n }), + product: em.create(Product, { code: randomCode() }), value: tagValue, }); em.create(ProductIngredientsTag, { - product: em.create(Product, { code: randomCode(), processId: 100n }), + product: em.create(Product, { code: randomCode() }), value: tagValue, }); await em.flush(); @@ -46,10 +46,7 @@ describe('count', () => { // Create some dummy products with a specific tag const tagValue = randomCode(); const notTagValue = randomCode(); - const productWithNotTag = em.create(Product, { - code: randomCode(), - processId: 100n, - }); + const productWithNotTag = em.create(Product, { code: randomCode() }); em.create(ProductBrandsTag, { product: productWithNotTag, value: tagValue, @@ -60,7 +57,7 @@ describe('count', () => { }); em.create(ProductBrandsTag, { - product: em.create(Product, { code: randomCode(), processId: 100n }), + product: em.create(Product, { code: randomCode() }), value: tagValue, }); await em.flush(); @@ -452,22 +449,16 @@ async function createTestTags(app) { const creatorValue = randomCode(); // Create some dummy products with a specific tag - const product1 = em.create(Product, { code: randomCode(), processId: 100n }); + const product1 = em.create(Product, { code: randomCode() }); const product2 = em.create(Product, { code: randomCode(), creator: creatorValue, - processId: 100n, }); const product3 = em.create(Product, { code: randomCode(), creator: creatorValue, - processId: 100n, - }); - const product4 = em.create(Product, { - code: randomCode(), - processId: 100n, - obsolete: true, }); + const product4 = em.create(Product, { code: randomCode(), obsolete: true }); // Matrix for testing // Product | Origin | AminoAcid | AminoAcid2 | Neucleotide | Obsolete | Creator diff --git a/src/domain/services/settings.service.ts b/src/domain/services/settings.service.ts index 0a1b1fa..6a24e83 100644 --- a/src/domain/services/settings.service.ts +++ b/src/domain/services/settings.service.ts @@ -11,11 +11,11 @@ export class SettingsService { } async getLastModified() { - return (await sql`SELECT last_updated FROM settings`)[0].last_updated; + return (await sql`SELECT last_modified FROM settings`)[0].last_modified; } - async setLastModified(lastUpdated: Date) { - await this.updateSetting({ last_updated: lastUpdated }); + async setLastModified(lastModified: Date) { + await this.updateSetting({ last_modified: lastModified }); } async getLastMessageId() { diff --git a/src/migrations/.snapshot-query.json b/src/migrations/.snapshot-query.json index caf76c7..67a63df 100644 --- a/src/migrations/.snapshot-query.json +++ b/src/migrations/.snapshot-query.json @@ -61,8 +61,8 @@ "nullable": true, "mappedType": "text" }, - "last_updated": { - "name": "last_updated", + "last_modified": { + "name": "last_modified", "type": "timestamptz", "unsigned": false, "autoincrement": false, @@ -117,17 +117,17 @@ "default": "false", "mappedType": "boolean" }, - "process_id": { - "name": "process_id", - "type": "xid8", + "last_update_id": { + "name": "last_update_id", + "type": "uuid", "unsigned": false, "autoincrement": false, "primary": false, "nullable": true, - "mappedType": "unknown" + "mappedType": "uuid" }, - "last_processed": { - "name": "last_processed", + "last_updated": { + "name": "last_updated", "type": "timestamptz", "unsigned": false, "autoincrement": false, @@ -187,10 +187,10 @@ }, { "columnNames": [ - "process_id" + "last_update_id" ], "composite": false, - "keyName": "product_process_id_index", + "keyName": "product_last_update_id_index", "primary": false, "unique": false }, diff --git a/src/migrations/Migration20241029165410.ts b/src/migrations/Migration20241029165410.ts deleted file mode 100644 index abb9433..0000000 --- a/src/migrations/Migration20241029165410.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { Migration } from '@mikro-orm/migrations'; - -export class Migration20241029165410 extends Migration { - - async up(): Promise { - this.addSql('alter table "query"."product" rename column "last_updated" to "last_processed";'); - this.addSql('alter table "query"."product" rename column "last_modified" to "last_updated";'); - this.addSql('alter table "query"."settings" rename column "last_modified" to "last_updated";'); - this.addSql('alter table "query"."product" add column "process_id" xid8 null;'); - this.addSql('create index "product_process_id_index" on "query"."product" ("process_id");'); - this.addSql('drop index "query"."product_last_update_id_index";'); - this.addSql('alter table "query"."product" drop column "last_update_id";'); - } - - async down(): Promise { - this.addSql('alter table "query"."product" add column "last_update_id" uuid null;'); - this.addSql('create index "product_last_update_id_index" on "query"."product" ("last_update_id");'); - this.addSql('drop index "query"."product_process_id_index";'); - this.addSql('alter table "query"."product" drop column "process_id";'); - this.addSql('alter table "query"."settings" rename column "last_updated" to "last_modified";'); - this.addSql('alter table "query"."product" rename column "last_updated" to "last_modified";'); - this.addSql('alter table "query"."product" rename column "last_processed" to "last_updated";'); - } -} diff --git a/tsconfig.json b/tsconfig.json index 3e24617..fd76987 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -7,7 +7,7 @@ "experimentalDecorators": true, "esModuleInterop": true, "allowSyntheticDefaultImports": true, - "target": "es2020", + "target": "es2017", "sourceMap": true, "outDir": "./dist", "baseUrl": "./",