Skip to content

Commit

Permalink
Merge pull request #108 from openfoodfacts/103-always-update-product-…
Browse files Browse the repository at this point in the history
…for-redis-events

fix: Re-implement changes from PR #104
  • Loading branch information
john-gom authored Oct 31, 2024
2 parents f43b9c8 + 8e55993 commit 963a4ae
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 103 deletions.
13 changes: 8 additions & 5 deletions src/domain/entities/product.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class Product {
code?: string;

@Property({ columnType: 'timestamptz' })
lastModified?: Date;
lastUpdated?: Date;

@Property({ index: true })
creator?: string;
Expand All @@ -39,11 +39,13 @@ export class Product {
@Property()
obsolete? = false;

@Property({ type: 'uuid', index: true })
lastUpdateId?: string;
// Note need to switch to xid8 when we upgrade PostgreSQL
@Property({ columnType: 'bigint', index: true })
processId?: bigint;

// This is the last time off-query received the data
@Property({ columnType: 'timestamptz' })
lastUpdated?: Date;
lastProcessed?: Date;

@Property()
source?: ProductSource;
Expand All @@ -57,7 +59,8 @@ export const MAPPED_FIELDS = [
'product_name',
'creator',
'owners_tags',
'last_modified_t',
'last_modified_t', // Note we actually use last_updated_t for checks but not all products may have this
'last_updated_t',
'ingredients_n',
'ingredients_without_ciqual_codes_n',
'ingredients',
Expand Down
133 changes: 97 additions & 36 deletions src/domain/services/import.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { ProductSource } from '../enums/product-source';
import { SettingsService } from './settings.service';
import { ProductIngredient } from '../entities/product-ingredient';

const lastModified = 1692032161;
const lastUpdated = 1692032161;

function testProducts() {
const productIdNew = randomCode();
Expand All @@ -20,14 +20,14 @@ function testProducts() {
{
// This one will be new
code: productIdNew,
last_modified_t: lastModified,
last_updated_t: lastUpdated,
ingredients_tags: ['test'],
rev: 1,
},
{
// This one will already exist
code: productIdExisting,
last_modified_t: lastModified,
last_updated_t: lastUpdated,
ingredients_tags: ['new_ingredient', 'old_ingredient'],
},
];
Expand Down Expand Up @@ -83,23 +83,40 @@ describe('importFromMongo', () => {
// app.useLogger(new Logger());

const importService = app.get(ImportService);
const deleteMock = (importService.deleteOtherProducts = jest.fn());
// Mock the process id so it doesn't delete records from other tests
let currentProcessId = 0n;
importService.getProcessId = jest
.fn()
.mockImplementation(() => ++currentProcessId);

// GIVEN: Two existing products, one of which is in Mongo plus one new one in Mongo
const em = app.get(EntityManager);
const { products, productIdExisting, productIdNew } = testProducts();
const productExisting = em.create(Product, { code: productIdExisting });
const productExisting = em.create(Product, {
code: productIdExisting,
processId: 0n,
});
em.create(ProductIngredientsTag, {
product: productExisting,
value: 'old_ingredient',
});

const productIdUnchanged = randomCode();
const productUnchanged = em.create(Product, { code: productIdUnchanged });
const productUnchanged = em.create(Product, {
code: productIdUnchanged,
processId: 0n,
});
em.create(ProductIngredientsTag, {
product: productUnchanged,
value: 'unchanged_ingredient',
});

const productIdLater = randomCode();
em.create(Product, {
code: productIdLater,
processId: 100n, // Simulate a product that was added after the full load started
});

// Delete a tag to prove it is re-created
await em.nativeDelete(LoadedTag, { id: 'teams_tags' });
await em.flush();
Expand All @@ -111,21 +128,11 @@ describe('importFromMongo', () => {
await importService.importFromMongo();

// THEN: New product is added, updated product is updated and other product is unchanged
expect(deleteMock).toHaveBeenCalledTimes(1);
let updateId = deleteMock.mock.calls[0][1];
// Re-format updateId the way Postgres provides it
updateId = `${updateId.substring(0, 8)}-${updateId.substring(
8,
12,
)}-${updateId.substring(12, 16)}-${updateId.substring(
16,
20,
)}-${updateId.substring(20)}`.toLowerCase();
const productNew = await em.findOne(Product, { code: productIdNew });
expect(productNew).toBeTruthy();
expect(productNew.lastUpdateId).toBe(updateId);
expect(productNew.processId).toBe(currentProcessId.toString());
expect(productNew.source).toBe(ProductSource.FULL_LOAD);
expect(productNew.lastUpdated.getTime()).toBeGreaterThanOrEqual(start);
expect(productNew.lastProcessed.getTime()).toBeGreaterThanOrEqual(start);
const ingredientsNew = await em.find(ProductIngredientsTag, {
product: productNew,
});
Expand All @@ -144,12 +151,20 @@ describe('importFromMongo', () => {
ingredientsExisting.find((i) => i.value === 'new_ingredient'),
).toBeTruthy();

// We have mocked the delete of other products so just check the other product
// does not have the same update id as those imported
// Check unchanged product has been "deleted"
const foundOldProduct = await em.findOne(Product, {
code: productIdUnchanged,
});
expect(foundOldProduct.lastUpdateId).not.toBe(updateId);
expect(foundOldProduct.obsolete).toBeNull();
const ingredientsUnchanged = await em.find(ProductIngredientsTag, {
product: foundOldProduct,
});
expect(ingredientsUnchanged[0].obsolete).toBeNull();

const foundLaterProduct = await em.findOne(Product, {
code: productIdLater,
});
expect(foundLaterProduct.obsolete).toBe(false);

const loadedTags = await app.get(TagService).getLoadedTags();
expect(loadedTags).toHaveLength(
Expand Down Expand Up @@ -186,13 +201,13 @@ describe('importFromMongo', () => {
await createTestingModule([DomainModule], async (app) => {
// GIVEN: Product with data that matches MongoDB
const em = app.get(EntityManager);
const lastUpdated = new Date(2023, 1, 1);
const lastProcessed = new Date(2023, 1, 1);
const { products, productIdExisting } = testProducts();
em.create(Product, {
code: productIdExisting,
source: ProductSource.EVENT,
lastUpdated: lastUpdated,
lastModified: new Date(lastModified * 1000),
lastProcessed: lastProcessed,
lastUpdated: new Date(lastUpdated * 1000),
});
await em.flush();
const importService = app.get(ImportService);
Expand All @@ -207,13 +222,13 @@ describe('importFromMongo', () => {
});
expect(productExisting).toBeTruthy();
expect(productExisting.source).toBe(ProductSource.EVENT);
expect(productExisting.lastUpdated).toStrictEqual(lastUpdated);
expect(productExisting.lastProcessed).toStrictEqual(lastProcessed);
});
});

it('should start importing from the last import', async () => {
await createTestingModule([DomainModule], async (app) => {
// GIVEN: lastModified setting already set
// GIVEN: lastUpdated setting already set
const settings = app.get(SettingsService);
const startFrom = new Date(2023, 1, 1);
await settings.setLastModified(startFrom);
Expand All @@ -227,12 +242,12 @@ describe('importFromMongo', () => {

// THEN: Mongo find is called with the setting as a parameter
expect(findCalls).toHaveLength(2); // Called for normal an obsolete prodocuts
expect(findCalls[0][0].last_modified_t.$gt).toBe(
expect(findCalls[0][0].last_updated_t.$gt).toBe(
Math.floor(startFrom.getTime() / 1000),
);

expect(await settings.getLastModified()).toStrictEqual(
new Date(lastModified * 1000),
new Date(lastUpdated * 1000),
);
});
});
Expand All @@ -245,7 +260,7 @@ describe('importFromMongo', () => {
{
// This one will be new
code: productIdNew,
last_modified_t: 1692032161,
last_updated_t: 1692032161,
ingredients_tags: ['test \u0000 test2 \u0000 end'],
},
]);
Expand All @@ -263,7 +278,7 @@ describe('importFromMongo', () => {
});
});

it('should set last_modified correctly if one product has an invalid date', async () => {
it('should set last_updated correctly if one product has an invalid date', async () => {
await createTestingModule([DomainModule], async (app) => {
// GIVEN: products with invalid date
const settings = app.get(SettingsService);
Expand All @@ -272,7 +287,7 @@ describe('importFromMongo', () => {
const { products } = testProducts();
const testData = [
products[0],
{ ...products[1], last_modified_t: 'invalid' },
{ ...products[1], last_updated_t: 'invalid' },
];
const importService = app.get(ImportService);

Expand All @@ -282,7 +297,7 @@ describe('importFromMongo', () => {

// THEN: The last modified date is set correctly
expect(await settings.getLastModified()).toStrictEqual(
new Date(lastModified * 1000),
new Date(lastUpdated * 1000),
);
});
});
Expand Down Expand Up @@ -331,6 +346,40 @@ describe('importFromMongo', () => {
expect(ingredientsNew[0].ingredientText).toBe('test');
});
});

it('import from redis should always update product', async () => {
await createTestingModule([DomainModule], async (app) => {
// GIVEN: Product with data that matches MongoDB
const em = app.get(EntityManager);
const lastProcessed = new Date(2023, 1, 1);
const { products, productIdExisting } = testProducts();
em.create(Product, {
code: productIdExisting,
source: ProductSource.INCREMENTAL_LOAD,
processId: 10n,
lastProcessed: lastProcessed,
lastUpdated: new Date(lastUpdated * 1000),
});
await em.flush();
const importService = app.get(ImportService);

// WHEN: Doing an event import
mockMongoDB(products);
await importService.importWithFilter(
{ code: { $in: [productIdExisting] } },
ProductSource.EVENT,
);

// THEN: Source is updated
const productExisting = await em.findOne(Product, {
code: productIdExisting,
});
expect(productExisting).toBeTruthy();
expect(productExisting.source).toBe(ProductSource.EVENT);
expect(productExisting.processId).not.toBe(10n.toString());
expect(productExisting.lastProcessed).not.toStrictEqual(lastProcessed);
});
});
});

describe('scheduledImportFromMongo', () => {
Expand Down Expand Up @@ -407,8 +456,9 @@ describe('importWithFilter', () => {
const productToDelete = em.create(Product, {
code: productIdToDelete,
source: ProductSource.FULL_LOAD,
lastUpdated: new Date(2023, 1, 1),
lastModified: new Date(lastModified * 1000),
processId: 10n,
lastProcessed: new Date(2023, 1, 1),
lastUpdated: new Date(lastUpdated * 1000),
});
em.create(ProductIngredientsTag, {
product: productToDelete,
Expand All @@ -432,8 +482,8 @@ describe('importWithFilter', () => {
const updatedProduct = await em.findOne(Product, {
code: productIdExisting,
});
expect(deletedProduct.lastUpdateId).toBe(updatedProduct.lastUpdateId);
expect(deletedProduct.lastUpdated.getTime()).toBeGreaterThanOrEqual(
expect(deletedProduct.processId).toBe(updatedProduct.processId);
expect(deletedProduct.lastProcessed.getTime()).toBeGreaterThanOrEqual(
beforeImport,
);
expect(deletedProduct.source).toBe(ProductSource.EVENT);
Expand All @@ -446,3 +496,14 @@ describe('importWithFilter', () => {
});
});
});

describe('getProcessId', () => {
it('should return monotonically increasing numbers', async () => {
await createTestingModule([DomainModule], async (app) => {
const importService = app.get(ImportService);
const transactionId = await importService.getProcessId();

expect(await importService.getProcessId()).toBeGreaterThan(transactionId);
});
});
});
Loading

0 comments on commit 963a4ae

Please sign in to comment.