diff --git a/src/lib/kafkaConsumerUtil.ts b/src/lib/kafkaConsumerUtil.ts index b928b4e..1ddda47 100644 --- a/src/lib/kafkaConsumerUtil.ts +++ b/src/lib/kafkaConsumerUtil.ts @@ -36,21 +36,26 @@ export class KafkaConsumerUtil { eachBatchAutoResolve: false, eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }: EachBatchPayload) => { const { topic, partition } = batch; - + for (const message of batch.messages) { if (!isRunning() || isStale()) return; - + logger.info({ topic, partition, offset: message.offset, value: message.value?.toString(), }); - - await eachMessageCallback(topic, partition, message) - - resolveOffset(message.offset); - await heartbeat(); + + try { + await eachMessageCallback(topic, partition, message); + resolveOffset(message.offset); + await heartbeat(); + } catch (error) { + logger.error(`Error processing message ${message.offset}: ${error}`); + // Do not resolve offset in case of error, so message will be retried + // Optionally: Handle retries or backoff strategy here + } } }, }); diff --git a/src/routes/lab-bw.ts b/src/routes/lab-bw.ts index c2761d1..b6b7f68 100644 --- a/src/routes/lab-bw.ts +++ b/src/routes/lab-bw.ts @@ -41,7 +41,7 @@ router.all('/', async (req: Request, res: Response) => { orderBundle.entry && resultBundle.entry.length == orderBundle.entry.length ) { - WorkflowHandler.handleBwLabOrder(orderBundle, resultBundle) + WorkflowHandler.handleLabOrder(orderBundle, resultBundle) return res.status(200).json(resultBundle) } else { return res.status(400).send(resultBundle) diff --git a/src/workflows/botswana/helpers.ts b/src/workflows/botswana/helpers.ts index 3f31955..03034d5 100644 --- a/src/workflows/botswana/helpers.ts +++ b/src/workflows/botswana/helpers.ts @@ -1,5 +1,7 @@ import { R4 } from "@ahryman40k/ts-fhir-types" import logger from "../../lib/winston" +import got, { HTTPError, OptionsOfTextResponseBody, RequestError } from "got" + @@ -44,7 +46,7 @@ export function setTaskStatus(labBundle: R4.IBundle, status: R4.TaskStatusKind): export function getBundleEntry( - entries: IBundle_Entry[], + entries: R4.IBundle_Entry[], type: string, id?: string, ): R4.IResource | undefined { @@ -58,7 +60,7 @@ export function getBundleEntry( } export function getBundleEntries( - entries: IBundle_Entry[], + entries: R4.IBundle_Entry[], type: string, id?: string, ): (R4.IResource | undefined)[] { @@ -71,4 +73,39 @@ export function getBundleEntries( .map(entry => { return entry.resource }) +} + + + +// Wrapper function that includes retry logic +export async function postWithRetry(crUrl: string, options: OptionsOfTextResponseBody, retryLimit: number = 5, timeout: number = 1000) { + + for (let attempt = 1; attempt <= retryLimit; attempt++) { + try { + const response = await got.post(crUrl, options).json(); + return response; // If request is successful, return the response + } catch (error) { + logger.error(`Attempt ${attempt} failed`); + + // Sleep for a given amount of time + await new Promise(resolve => setTimeout(resolve, timeout)); + + if (error instanceof HTTPError) { + // Handle HTTP errors (4xx and 5xx response codes) + console.error(`HTTP Error: ${error.response.statusCode}`); + } else if (error instanceof RequestError) { + // Handle network errors or other request issues + console.error(`Request Error: ${error.message}`); + } else { + // Handle any other errors that might occur + console.error(`Unknown Error: ${error}`); + } + + // If we are on the last attempt, re-throw the error + if (attempt === retryLimit) { + console.error('All retries failed'); + throw error; + } + } + } } \ No newline at end of file diff --git a/src/workflows/botswana/locationWorkflows.ts b/src/workflows/botswana/locationWorkflows.ts index b8d5de6..e2553cb 100644 --- a/src/workflows/botswana/locationWorkflows.ts +++ b/src/workflows/botswana/locationWorkflows.ts @@ -1,3 +1,4 @@ +import logger from "../../lib/winston" /** * @@ -7,11 +8,8 @@ export async function mapLocations(labBundle: R4.IBundle): Promise { logger.info('Mapping Locations!') - labBundle = await WorkflowHandler.addBwLocations(labBundle) - const response: R4.IBundle = await saveBundle(labBundle) + return await addBwLocations(labBundle) - await this.sendPayload({ bundle: labBundle }, topicList.SAVE_PIMS_PATIENT) - await this.sendPayload({ bundle: labBundle }, topicList.SEND_ADT_TO_IPMS) logger.debug(`Response: ${JSON.stringify(response)}`) return response diff --git a/src/workflows/botswana/patientIdentityWorkflows.ts b/src/workflows/botswana/patientIdentityWorkflows.ts index 197509a..a167c1f 100644 --- a/src/workflows/botswana/patientIdentityWorkflows.ts +++ b/src/workflows/botswana/patientIdentityWorkflows.ts @@ -1,3 +1,7 @@ +import { R4 } from "@ahryman40k/ts-fhir-types" +import config from "../../lib/config" +import { postWithRetry } from "./helpers" +import logger from "../../lib/winston" /** * updateCrPatient @@ -6,7 +10,7 @@ */ export async function updateCrPatient(bundle: R4.IBundle): Promise { const crUrl = `${config.get('clientRegistryUrl')}/Patient` - let pat: IPatient + let pat: R4.IPatient const patResult = bundle.entry!.find(entry => { return entry.resource && entry.resource.resourceType == 'Patient' @@ -24,7 +28,7 @@ export async function updateCrPatient(bundle: R4.IBundle): Promise { options.json = pat } - const crResult = await got.post(`${crUrl}`, options).json() + const crResult = await postWithRetry(crUrl, options, config.get('bwConfig:retryCount'), config.get('bwConfig:retryDelay')) logger.debug(`CR Patient Update Result: ${JSON.stringify(crResult)}`) @@ -38,7 +42,7 @@ export async function updateCrPatient(bundle: R4.IBundle): Promise { * @returns */ export async function savePimsPatient(labBundle: R4.IBundle): Promise { - const resultBundle = this.updateCrPatient(labBundle) + const resultBundle = updateCrPatient(labBundle) return resultBundle } @@ -50,10 +54,7 @@ export async function savePimsPatient(labBundle: R4.IBundle): Promise { // Save to CR - const resultBundle = this.updateCrPatient(registrationBundle) - - // Handle order entry - this.handleAdtFromIpms(registrationBundle) + const resultBundle = updateCrPatient(registrationBundle) return resultBundle } diff --git a/src/workflows/botswana/terminologyWorkflows.ts b/src/workflows/botswana/terminologyWorkflows.ts index 0fd721e..44398f6 100644 --- a/src/workflows/botswana/terminologyWorkflows.ts +++ b/src/workflows/botswana/terminologyWorkflows.ts @@ -1,3 +1,6 @@ +import { R4 } from "@ahryman40k/ts-fhir-types" +import logger from "../../lib/winston" + /** * * @param labBundle @@ -6,20 +9,11 @@ export async function mapConcepts(labBundle: R4.IBundle): Promise { logger.info('Mapping Concepts!') - labBundle = await WorkflowHandler.addBwCodings(labBundle) - - const response: R4.IBundle = await saveBundle(labBundle) - - await this.sendPayload({ bundle: labBundle }, topicList.MAP_LOCATIONS) - - return response + return await addAllCodings(labBundle) } -// Add coding mappings info to bundle - - - +// Add terminology mappings info to Bundle async function addAllCodings(labBundle: ILaboratoryBundle): Promise { try { for (const e of labBundle.entry!) { diff --git a/src/workflows/botswana/workflowHandler.ts b/src/workflows/botswana/workflowHandler.ts index 3b409f4..fa8b2e3 100644 --- a/src/workflows/botswana/workflowHandler.ts +++ b/src/workflows/botswana/workflowHandler.ts @@ -8,6 +8,10 @@ import { KafkaProducerUtil } from '../../lib/kafkaProducerUtil' import logger from '../../lib/winston' import { KafkaConfig, ProducerRecord } from 'kafkajs' import { logLevel } from 'kafkajs'; +import { handleAdtFromIpms, handleOruFromIpms, sendAdtToIpms, sendOrmToIpms } from './IpmsWorkflows' +import { mapConcepts } from './terminologyWorkflows' +import { mapLocations } from './locationWorkflows' +import { saveIpmsPatient, updateCrPatient } from './patientIdentityWorkflows' // eslint-disable-next-line @typescript-eslint/no-var-requires const hl7 = require('hl7') @@ -31,31 +35,7 @@ export const topicList = { HANDLE_ORU_FROM_IPMS: 'handle-oru-from-ipms', } -export class WorkflowHandler { - private static kafka = new KafkaProducerUtil(producerConfig, (report) => { - logger.info('Delivery report:', report); - }); - - // Static instance of the Kafka producer. - private static kafkaProducerInitialized = false; - - // Initialize Kafka producer when the class is first used. - public static async initKafkaProducer() { - if (!this.kafkaProducerInitialized) { - await this.kafka.init(); - this.kafkaProducerInitialized = true; - } - } - - // Shutdown Kafka producer when the application terminates. - public static async shutdownKafkaProducer() { - if (this.kafkaProducerInitialized) { - await this.kafka.shutdown(); - this.kafkaProducerInitialized = false; - } - } - - /** + /** * * To handle a lab order from the PIMS system (https://www.postman.com/itechuw/workspace/botswana-hie/collection/1525496-db80feab-8a77-42c8-aa7e-fd4beb0ae6a8) * @@ -75,43 +55,84 @@ export class WorkflowHandler { * 10. Translate to ORM message (postman request) * 11. Send ORM HL7 message to IPMS and get back ACK (Request Interceptor Needed - mllp interceptor?) * 12. Set Task status --> received / accepted / rejected (postman test) - * - * @param orderBundle - * @param resultBundle + * + * + * + * Ensuring Data Integrity and Consistency + * + * For the following key actions, all of the outlined indicators of success must be met; otherwise, the + * incoming package needs to be marked as "uncommited" and retried later. This is necessary for + * both incoming Bundles and HL7 messages. Basically, if the external client has connectivity to + * the HIE, and if the SHR is running and recieves the package, then the workflow must at some point + * run and result in an ADT message being sent. If the workflow fails at any point, the package must + * be marked as "uncommited" and retried until success, or until a notification is sent out. + * + * 1. Incoming Lab Bundle + * We need to ensure that once a Lab Bundle comes in either from PIMS or BotswanaEMR, + * that eventually an ADT message is sent to IPMS to begin the IPMS side of the workflow. + * + * - Bundle is saved into SHR + * - Patient is saved into CR + * - Bundle is translated to ADT message + * - ADT message is sent to IPMS */ +export class WorkflowHandler { + private static kafka = new KafkaProducerUtil(producerConfig, (report) => { + logger.info('Delivery report:', report); + }); - static async handleBwLabOrder(orderBundle: R4.IBundle, resultBundle: R4.IBundle) { - try { - await this.sendPayload({ bundle: orderBundle, response: resultBundle }, topicList.MAP_CONCEPTS) - } catch (e) { - logger.error(e) + // Static instance of the Kafka producer. + private static kafkaProducerInitialized = false; + + // Initialize Kafka producer when the class is first used. + public static async initKafkaProducer() { + if (!this.kafkaProducerInitialized) { + await this.kafka.init(); + this.kafkaProducerInitialized = true; } } + // Shutdown Kafka producer when the application terminates. + public static async shutdownKafkaProducer() { + if (this.kafkaProducerInitialized) { + await this.kafka.shutdown(); + this.kafkaProducerInitialized = false; + } + } + + static async executeTopicWorkflow(topic: string, val: any) { let res + let enrichedBundle + let origBundle try { switch (topic) { case topicList.MAP_CONCEPTS: - res = await WorkflowHandler.mapConcepts(JSON.parse(val).bundle) + enrichedBundle = await mapConcepts(JSON.parse(val).bundle) + await this.sendPayload({ bundle: enrichedBundle }, topicList.MAP_LOCATIONS) break case topicList.MAP_LOCATIONS: - res = await WorkflowHandler.mapLocations(JSON.parse(val).bundle) + enrichedBundle = await mapLocations(JSON.parse(val).bundle) + await this.sendPayload({ bundle: enrichedBundle }, topicList.SAVE_PIMS_PATIENT) + await this.sendPayload({ bundle: enrichedBundle }, topicList.SEND_ADT_TO_IPMS) break case topicList.SAVE_PIMS_PATIENT: - res = await WorkflowHandler.updateCrPatient(JSON.parse(val).bundle) + res = await updateCrPatient(JSON.parse(val).bundle) break case topicList.SEND_ADT_TO_IPMS: - res = await WorkflowHandler.sendAdtToIpms(JSON.parse(val).bundle) + res = await sendAdtToIpms(JSON.parse(val).bundle) break case topicList.SAVE_IPMS_PATIENT: - res = await WorkflowHandler.saveIpmsPatient(JSON.parse(val).bundle) + origBundle = JSON.parse(val).bundle + saveIpmsPatient(origBundle) + handleAdtFromIpms(origBundle) + break case topicList.SEND_ORM_TO_IPMS: - res = await WorkflowHandler.sendOrmToIpms(JSON.parse(val)) + res = await sendOrmToIpms(JSON.parse(val)) break case topicList.HANDLE_ORU_FROM_IPMS: - res = await WorkflowHandler.handleOruFromIpms(JSON.parse(val).bundle) + res = await handleOruFromIpms(JSON.parse(val).bundle) break default: break @@ -125,6 +146,7 @@ export class WorkflowHandler { } + /** * Sends a payload to a Kafka topic. * @param payload - The payload to send. @@ -150,5 +172,14 @@ export class WorkflowHandler { console.error('Failed to send message:', err); } } + + // Entrypoint wrapper function for Lab Order Workflows + static async handleLabOrder(orderBundle: R4.IBundle, resultBundle: R4.IBundle) { + try { + await this.sendPayload({ bundle: orderBundle, response: resultBundle }, topicList.MAP_CONCEPTS) + } catch (e) { + logger.error(e) + } + } }