Skip to content

Commit

Permalink
Consumer updates to ensure data integrity when services are down
Browse files Browse the repository at this point in the history
  • Loading branch information
pmanko committed Nov 6, 2023
1 parent 042e3e9 commit a1a7d6a
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 72 deletions.
19 changes: 12 additions & 7 deletions src/lib/kafkaConsumerUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,26 @@ export class KafkaConsumerUtil {
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }: EachBatchPayload) => {
const { topic, partition } = batch;

for (const message of batch.messages) {
if (!isRunning() || isStale()) return;

logger.info({
topic,
partition,
offset: message.offset,
value: message.value?.toString(),
});

await eachMessageCallback(topic, partition, message)

resolveOffset(message.offset);
await heartbeat();

try {
await eachMessageCallback(topic, partition, message);
resolveOffset(message.offset);
await heartbeat();
} catch (error) {
logger.error(`Error processing message ${message.offset}: ${error}`);
// Do not resolve offset in case of error, so message will be retried
// Optionally: Handle retries or backoff strategy here
}
}
},
});
Expand Down
2 changes: 1 addition & 1 deletion src/routes/lab-bw.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ router.all('/', async (req: Request, res: Response) => {
orderBundle.entry &&
resultBundle.entry.length == orderBundle.entry.length
) {
WorkflowHandler.handleBwLabOrder(orderBundle, resultBundle)
WorkflowHandler.handleLabOrder(orderBundle, resultBundle)
return res.status(200).json(resultBundle)
} else {
return res.status(400).send(resultBundle)
Expand Down
41 changes: 39 additions & 2 deletions src/workflows/botswana/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { R4 } from "@ahryman40k/ts-fhir-types"
import logger from "../../lib/winston"
import got, { HTTPError, OptionsOfTextResponseBody, RequestError } from "got"




Expand Down Expand Up @@ -44,7 +46,7 @@ export function setTaskStatus(labBundle: R4.IBundle, status: R4.TaskStatusKind):


export function getBundleEntry(
entries: IBundle_Entry[],
entries: R4.IBundle_Entry[],
type: string,
id?: string,
): R4.IResource | undefined {
Expand All @@ -58,7 +60,7 @@ export function getBundleEntry(
}

export function getBundleEntries(
entries: IBundle_Entry[],
entries: R4.IBundle_Entry[],
type: string,
id?: string,
): (R4.IResource | undefined)[] {
Expand All @@ -71,4 +73,39 @@ export function getBundleEntries(
.map(entry => {
return entry.resource
})
}



// Wrapper function that includes retry logic
export async function postWithRetry(crUrl: string, options: OptionsOfTextResponseBody, retryLimit: number = 5, timeout: number = 1000) {

for (let attempt = 1; attempt <= retryLimit; attempt++) {
try {
const response = await got.post(crUrl, options).json();
return response; // If request is successful, return the response
} catch (error) {
logger.error(`Attempt ${attempt} failed`);

// Sleep for a given amount of time
await new Promise(resolve => setTimeout(resolve, timeout));

if (error instanceof HTTPError) {
// Handle HTTP errors (4xx and 5xx response codes)
console.error(`HTTP Error: ${error.response.statusCode}`);
} else if (error instanceof RequestError) {
// Handle network errors or other request issues
console.error(`Request Error: ${error.message}`);
} else {
// Handle any other errors that might occur
console.error(`Unknown Error: ${error}`);
}

// If we are on the last attempt, re-throw the error
if (attempt === retryLimit) {
console.error('All retries failed');
throw error;
}
}
}
}
6 changes: 2 additions & 4 deletions src/workflows/botswana/locationWorkflows.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logger from "../../lib/winston"

/**
*
Expand All @@ -7,11 +8,8 @@
export async function mapLocations(labBundle: R4.IBundle): Promise<R4.IBundle> {
logger.info('Mapping Locations!')

labBundle = await WorkflowHandler.addBwLocations(labBundle)
const response: R4.IBundle = await saveBundle(labBundle)
return await addBwLocations(labBundle)

await this.sendPayload({ bundle: labBundle }, topicList.SAVE_PIMS_PATIENT)
await this.sendPayload({ bundle: labBundle }, topicList.SEND_ADT_TO_IPMS)

logger.debug(`Response: ${JSON.stringify(response)}`)
return response
Expand Down
15 changes: 8 additions & 7 deletions src/workflows/botswana/patientIdentityWorkflows.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import { R4 } from "@ahryman40k/ts-fhir-types"
import config from "../../lib/config"
import { postWithRetry } from "./helpers"
import logger from "../../lib/winston"

/**
* updateCrPatient
Expand All @@ -6,7 +10,7 @@
*/
export async function updateCrPatient(bundle: R4.IBundle): Promise<R4.IBundle> {
const crUrl = `${config.get('clientRegistryUrl')}/Patient`
let pat: IPatient
let pat: R4.IPatient

const patResult = bundle.entry!.find(entry => {
return entry.resource && entry.resource.resourceType == 'Patient'
Expand All @@ -24,7 +28,7 @@ export async function updateCrPatient(bundle: R4.IBundle): Promise<R4.IBundle> {
options.json = pat
}

const crResult = await got.post(`${crUrl}`, options).json()
const crResult = await postWithRetry(crUrl, options, config.get('bwConfig:retryCount'), config.get('bwConfig:retryDelay'))

logger.debug(`CR Patient Update Result: ${JSON.stringify(crResult)}`)

Expand All @@ -38,7 +42,7 @@ export async function updateCrPatient(bundle: R4.IBundle): Promise<R4.IBundle> {
* @returns
*/
export async function savePimsPatient(labBundle: R4.IBundle): Promise<R4.IBundle> {
const resultBundle = this.updateCrPatient(labBundle)
const resultBundle = updateCrPatient(labBundle)

return resultBundle
}
Expand All @@ -50,10 +54,7 @@ export async function savePimsPatient(labBundle: R4.IBundle): Promise<R4.IBundle
*/
export async function saveIpmsPatient(registrationBundle: R4.IBundle): Promise<R4.IBundle> {
// Save to CR
const resultBundle = this.updateCrPatient(registrationBundle)

// Handle order entry
this.handleAdtFromIpms(registrationBundle)
const resultBundle = updateCrPatient(registrationBundle)

return resultBundle
}
Expand Down
16 changes: 5 additions & 11 deletions src/workflows/botswana/terminologyWorkflows.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { R4 } from "@ahryman40k/ts-fhir-types"
import logger from "../../lib/winston"

/**
*
* @param labBundle
Expand All @@ -6,20 +9,11 @@
export async function mapConcepts(labBundle: R4.IBundle): Promise<R4.IBundle> {
logger.info('Mapping Concepts!')

labBundle = await WorkflowHandler.addBwCodings(labBundle)

const response: R4.IBundle = await saveBundle(labBundle)

await this.sendPayload({ bundle: labBundle }, topicList.MAP_LOCATIONS)

return response
return await addAllCodings(labBundle)
}


// Add coding mappings info to bundle



// Add terminology mappings info to Bundle
async function addAllCodings(labBundle: ILaboratoryBundle): Promise<ILaboratoryBundle> {
try {
for (const e of labBundle.entry!) {
Expand Down
111 changes: 71 additions & 40 deletions src/workflows/botswana/workflowHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import { KafkaProducerUtil } from '../../lib/kafkaProducerUtil'
import logger from '../../lib/winston'
import { KafkaConfig, ProducerRecord } from 'kafkajs'
import { logLevel } from 'kafkajs';
import { handleAdtFromIpms, handleOruFromIpms, sendAdtToIpms, sendOrmToIpms } from './IpmsWorkflows'
import { mapConcepts } from './terminologyWorkflows'
import { mapLocations } from './locationWorkflows'
import { saveIpmsPatient, updateCrPatient } from './patientIdentityWorkflows'

// eslint-disable-next-line @typescript-eslint/no-var-requires
const hl7 = require('hl7')
Expand All @@ -31,31 +35,7 @@ export const topicList = {
HANDLE_ORU_FROM_IPMS: 'handle-oru-from-ipms',
}

export class WorkflowHandler {
private static kafka = new KafkaProducerUtil(producerConfig, (report) => {
logger.info('Delivery report:', report);
});

// Static instance of the Kafka producer.
private static kafkaProducerInitialized = false;

// Initialize Kafka producer when the class is first used.
public static async initKafkaProducer() {
if (!this.kafkaProducerInitialized) {
await this.kafka.init();
this.kafkaProducerInitialized = true;
}
}

// Shutdown Kafka producer when the application terminates.
public static async shutdownKafkaProducer() {
if (this.kafkaProducerInitialized) {
await this.kafka.shutdown();
this.kafkaProducerInitialized = false;
}
}

/**
/**
*
* To handle a lab order from the PIMS system (https://www.postman.com/itechuw/workspace/botswana-hie/collection/1525496-db80feab-8a77-42c8-aa7e-fd4beb0ae6a8)
*
Expand All @@ -75,43 +55,84 @@ export class WorkflowHandler {
* 10. Translate to ORM message (postman request)
* 11. Send ORM HL7 message to IPMS and get back ACK (Request Interceptor Needed - mllp interceptor?)
* 12. Set Task status --> received / accepted / rejected (postman test)
*
* @param orderBundle
* @param resultBundle
*
*
*
* Ensuring Data Integrity and Consistency
*
* For the following key actions, all of the outlined indicators of success must be met; otherwise, the
* incoming package needs to be marked as "uncommited" and retried later. This is necessary for
* both incoming Bundles and HL7 messages. Basically, if the external client has connectivity to
* the HIE, and if the SHR is running and recieves the package, then the workflow must at some point
* run and result in an ADT message being sent. If the workflow fails at any point, the package must
* be marked as "uncommited" and retried until success, or until a notification is sent out.
*
* 1. Incoming Lab Bundle
* We need to ensure that once a Lab Bundle comes in either from PIMS or BotswanaEMR,
* that eventually an ADT message is sent to IPMS to begin the IPMS side of the workflow.
*
* - Bundle is saved into SHR
* - Patient is saved into CR
* - Bundle is translated to ADT message
* - ADT message is sent to IPMS
*/
export class WorkflowHandler {
private static kafka = new KafkaProducerUtil(producerConfig, (report) => {
logger.info('Delivery report:', report);
});

static async handleBwLabOrder(orderBundle: R4.IBundle, resultBundle: R4.IBundle) {
try {
await this.sendPayload({ bundle: orderBundle, response: resultBundle }, topicList.MAP_CONCEPTS)
} catch (e) {
logger.error(e)
// Static instance of the Kafka producer.
private static kafkaProducerInitialized = false;

// Initialize Kafka producer when the class is first used.
public static async initKafkaProducer() {
if (!this.kafkaProducerInitialized) {
await this.kafka.init();
this.kafkaProducerInitialized = true;
}
}

// Shutdown Kafka producer when the application terminates.
public static async shutdownKafkaProducer() {
if (this.kafkaProducerInitialized) {
await this.kafka.shutdown();
this.kafkaProducerInitialized = false;
}
}


static async executeTopicWorkflow(topic: string, val: any) {
let res
let enrichedBundle
let origBundle
try {
switch (topic) {
case topicList.MAP_CONCEPTS:
res = await WorkflowHandler.mapConcepts(JSON.parse(val).bundle)
enrichedBundle = await mapConcepts(JSON.parse(val).bundle)
await this.sendPayload({ bundle: enrichedBundle }, topicList.MAP_LOCATIONS)
break
case topicList.MAP_LOCATIONS:
res = await WorkflowHandler.mapLocations(JSON.parse(val).bundle)
enrichedBundle = await mapLocations(JSON.parse(val).bundle)
await this.sendPayload({ bundle: enrichedBundle }, topicList.SAVE_PIMS_PATIENT)
await this.sendPayload({ bundle: enrichedBundle }, topicList.SEND_ADT_TO_IPMS)
break
case topicList.SAVE_PIMS_PATIENT:
res = await WorkflowHandler.updateCrPatient(JSON.parse(val).bundle)
res = await updateCrPatient(JSON.parse(val).bundle)
break
case topicList.SEND_ADT_TO_IPMS:
res = await WorkflowHandler.sendAdtToIpms(JSON.parse(val).bundle)
res = await sendAdtToIpms(JSON.parse(val).bundle)
break
case topicList.SAVE_IPMS_PATIENT:
res = await WorkflowHandler.saveIpmsPatient(JSON.parse(val).bundle)
origBundle = JSON.parse(val).bundle
saveIpmsPatient(origBundle)
handleAdtFromIpms(origBundle)

break
case topicList.SEND_ORM_TO_IPMS:
res = await WorkflowHandler.sendOrmToIpms(JSON.parse(val))
res = await sendOrmToIpms(JSON.parse(val))
break
case topicList.HANDLE_ORU_FROM_IPMS:
res = await WorkflowHandler.handleOruFromIpms(JSON.parse(val).bundle)
res = await handleOruFromIpms(JSON.parse(val).bundle)
break
default:
break
Expand All @@ -125,6 +146,7 @@ export class WorkflowHandler {
}



/**
* Sends a payload to a Kafka topic.
* @param payload - The payload to send.
Expand All @@ -150,5 +172,14 @@ export class WorkflowHandler {
console.error('Failed to send message:', err);
}
}

// Entrypoint wrapper function for Lab Order Workflows
static async handleLabOrder(orderBundle: R4.IBundle, resultBundle: R4.IBundle) {
try {
await this.sendPayload({ bundle: orderBundle, response: resultBundle }, topicList.MAP_CONCEPTS)
} catch (e) {
logger.error(e)
}
}
}

0 comments on commit a1a7d6a

Please sign in to comment.