Skip to content

Commit

Permalink
Updates to Botswana workflows for lab order patient matching. (#104)
Browse files Browse the repository at this point in the history
* Version Bump

* Pilot work

* Updates for bdrs/immigration matching for Botswana

* HL7 retry DMQ and sender updates

* Build fixes

* Dep update
  • Loading branch information
pmanko authored Apr 27, 2024
1 parent 48d5346 commit 72aac33
Show file tree
Hide file tree
Showing 13 changed files with 431 additions and 537 deletions.
2 changes: 1 addition & 1 deletion config/config_docker.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"loincSystemUrl": "https://api.openconceptlab.org/orgs/Regenstrief/sources/LOINC/",
"omangSystemUrl": "http://moh.bw.org/ext/identifier/omang",
"brdsSystemUrl": "http://moh.bw.org/ext/identifier/bcn",
"immigrationSystemUrl": "http://moh.bw.org/ext/identifier/passportno",
"immigrationSystemUrl": "http://moh.bw.org/ext/identifier/ppn",
"oclUrl": "https://api.openconceptlab.org",
"facilityCodeSystemUrl": "http://moh.bw.org/ext/identifier/facility-code",
"ipmsProviderSystemUrl": "http://moh.bw.org/ext/ipms-provider",
Expand Down
Binary file modified config/ipms_facility_mappings.xlsx
Binary file not shown.
2 changes: 1 addition & 1 deletion debug.docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ services:
container_name: shr
restart: unless-stopped
hostname: shr
image: itechuw/shared-health-record:debug-1
image: itechuw/shared-health-record:dev
build:
context: ./
args:
Expand Down
3 changes: 3 additions & 0 deletions node-docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#! /usr/bin/env bash

docker run -it --rm -v ${PWD}:/usr/src/app -w /usr/src/app node:18-slim /bin/bash
41 changes: 6 additions & 35 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,58 +26,33 @@
"author": "Intrahealth International",
"license": "ISC",
"dependencies": {
"@ahryman40k/ts-fhir-types": "^4.0.34",
"@i-tech-uw/mllp-server": ">=3.3.1",
"@types/got": "^9.6.12",
"acorn": ">=7.1.1",
"async": "^3.2.1",
"axios": "^1.6.1",
"cookie-parser": "^1.4.5",
"damerau-levenshtein": "^1.0.5",
"double-metaphone": "^1.0.5",
"exceljs": "^4.3.0",
"express": "^4.17.1",
"fast-levenshtein": "^2.0.6",
"fhir": "^4.7.10",
"fhirclient": "^2.3.10",
"formidable": "^1.2.1",
"got": "^11.8.2",
"hl7": "^1.1.1",
"is-json": "^2.0.1",
"jaro-winkler": "^0.2.8",
"jest": "^29.7.0",
"jsonwebtoken": "^9.0.0",
"lodash": "^4.17.19",
"metaphone": "^1.0.6",
"minimist": ">=1.2.6",
"moment": "^2.29.2",
"nconf": "^0.11.4",
"openhim-mediator-utils": "^0.4.0",
"querystring": "^0.2.1",
"read-excel-file": "^5.6.1",
"rimraf": "^4.1.2",
"slashes": "^1.0.5",
"sleep-promise": "^9.1.0",
"soundex-code": "^1.0.4",
"sprintf-js": "^1.1.2",
"supertest": "^5.0.0-0",
"urijs": "^1.19.11",
"uuid": "^3.3.3",
"uuid4": "^1.1.4",
"winston": "^3.2.1",
"winston-daily-rotate-file": "^4.4.2"
},
"devDependencies": {
"@ahryman40k/ts-fhir-types": "^4.0.34",
"@i-tech-uw/mllp-server": ">=3.3.1",
"@types/async": "^3.2.3",
"@types/cookie-parser": "^1.4.2",
"@types/express": "^4.17.13",
"@types/got": "^9.6.12",
"@types/hapi": "^18.0.3",
"@types/ip": "^1.1.0",
"@types/jest": "^26.0.24",
"@types/lodash": "^4.14.162",
"@types/nconf": "^0.10.1",
"@types/newman": "^5.1.4",
"@types/nock": "^11.1.0",
"@types/node": "^14.11.10",
"@types/request": "^2.48.7",
"@types/sprintf-js": "^1.1.2",
Expand All @@ -88,24 +63,20 @@
"@types/uuid": "^9.0.5",
"@typescript-eslint/eslint-plugin": "^5.54.0",
"@typescript-eslint/parser": "^5.54.0",
"acorn": ">=7.1.1",
"cross-env": "^7.0.3",
"eslint": "^8.35.0",
"eslint-config-airbnb-base": "^14.0.0",
"eslint-config-standard": "^14.1.0",
"eslint-plugin-import": "^2.20.1",
"eslint-plugin-node": "^11.0.0",
"eslint-plugin-promise": "^4.2.1",
"eslint-plugin-standard": "^4.0.1",
"eslint-plugin-unused-imports": "^2.0.0",
"eslint-plugin-vue": "^6.1.2",
"husky": "^8.0.3",
"jest": "^29.7.0",
"kafkajs": "^1.15.0",
"lint-staged": "^14.0.0",
"minimist": ">=1.2.6",
"nock": "^13.1.1",
"openapi-types": "^9.1.0",
"prettier": "^2.8.4",
"rimraf": "^4.1.2",
"supertest": "^7.0.0-0",
"ts-jest": "^29.1.1",
"ts-node": "^9.0.0",
"typescript": "^4.0.3"
Expand Down
50 changes: 41 additions & 9 deletions src/lib/hl7MllpSender.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,56 @@
import { MllpServer } from '@i-tech-uw/mllp-server'
import logger from './winston'
import { WorkflowHandler, topicList } from '../workflows/botswana/workflowHandler'

export default class Hl7MllpSender {
targetIp: string
targetPort: number
mllpServer: MllpServer
retries: number
retryInterval: number

constructor(targetIp: string, targetPort: number) {
private static instance: Hl7MllpSender

constructor(targetIp: string, targetPort: number, retries = 3, retryInterval = 10000) {
this.targetPort = targetPort
this.targetIp = targetIp
this.retries = retries
this.retryInterval = retryInterval
this.mllpServer = new MllpServer(targetIp, targetPort, logger)
}

public static getInstance(targetIp: string, targetPort: number): Hl7MllpSender {
if (!Hl7MllpSender.instance) {
Hl7MllpSender.instance = new Hl7MllpSender(targetIp, targetPort)
}
return Hl7MllpSender.instance
}

/**
*
* @returns Promise
*/
send(message: string, retries = 10): any {
send(message: string, targetIp?: string, port?: number, retries?: number): any {
const targetIpToSend = targetIp || this.targetIp || '0.0.0.0'
const portToSend = port || this.targetPort || 3000
const retriesToSend = retries || this.retries || 3

message = message.replace(/[\n\r]/g, '\r')
const firstNewline = message.match(/\r/)
const header = firstNewline ? message.substring(0, firstNewline.index) : ''

return new Promise((resolve, reject) => {
this.mllpServer.send(this.targetIp, this.targetPort, message, (err: any, ackData: any) => {
this.mllpServer.send(targetIpToSend, portToSend, message, (err: any, ackData: any) => {
logger.info(
`!! Sending HL7 message ${header}!\n err: ${err ? err : ''}\n ackData: ${
`Sending HL7 message ${header}!\n err: ${err ? err : ''}\n ackData: ${
ackData ? ackData : ''
}`,
)
if (err) {
reject({ error: err, retries: retries })
} else {
logger.info(
`!! Successfully sent HL7 message ${header} \n with ${retries} retries left!`,
`Successfully sent HL7 message ${header} \n with ${retries} retries left!`,
)
resolve(ackData)
}
Expand All @@ -42,13 +60,27 @@ export default class Hl7MllpSender {
return ackData
})
.catch(e => {
if (e.retries > 0) {
logger.info(`Retrying... ${e.retries} retries left`)
return setTimeout(() => this.send(message, e.retries - 1), 2000)
if (retriesToSend > 0) {
logger.info(`Retrying... ${retriesToSend} retries left`)
return setTimeout(
() => this.send(message, targetIpToSend, portToSend, retriesToSend - 1),
this.retryInterval,
)
} else {
logger.error(`!! Failed to send HL7 message ${header}!`)
logger.error(`Failed to send HL7 message ${header}!`)

// Send to DMQ
WorkflowHandler.sendPayload(
{ message: message, targetIp: targetIpToSend, portToSend },
topicList.DMQ,
)

return e.error
}
})
}
}

const hl7Sender = Hl7MllpSender.getInstance('127.0.0.1', 3000)

export { hl7Sender }
8 changes: 5 additions & 3 deletions src/lib/kafkaConsumerUtil.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Consumer, EachBatchPayload, Kafka, KafkaConfig, Message } from 'kafkajs'
import logger from './winston'
import { WorkflowResult } from '../workflows/botswana/workflowHandler'
import { WorkflowHandler, WorkflowResult, topicList } from '../workflows/botswana/workflowHandler'

export type EachMessageCallback = (
topic: string,
Expand Down Expand Up @@ -87,11 +87,13 @@ export class KafkaConsumerUtil {
retryCount++
if (retryCount >= maxRetries) {
logger.error(
`Max retries reached for message ${message.offset}, sending to dead letter queue or similar.`,
`Max retries reached for message ${message.offset}, sending to dead message queue.`,
)
resolveOffset(message.offset)

// TODO: handle with DLQ
// Send to DMQ
WorkflowHandler.sendPayload({ topic: topic, partition: partition, message: message }, topicList.DMQ)

break
}
await new Promise(resolve => setTimeout(resolve, retryDelay))
Expand Down
53 changes: 2 additions & 51 deletions src/routes/hl7.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

import express, { Request, Response } from 'express'
import Hl7MllpSender from '../lib/hl7MllpSender'
import { hl7Sender } from '../lib/hl7MllpSender'

export const router = express.Router()

Expand All @@ -11,9 +11,7 @@ router.post('/forward/:targetIp/:targetPort', async (req: Request, res: Response
const targetIp: string = req.params.targetIp
const targetPort = Number(req.params.targetPort)

const sender = new Hl7MllpSender(targetIp, targetPort)

const ack = await sender.send(hl7Msg)
const ack = await hl7Sender.send(hl7Msg, targetIp, targetPort)

res.status(200)
res.send(ack)
Expand All @@ -23,51 +21,4 @@ router.post('/forward/:targetIp/:targetPort', async (req: Request, res: Response
}
})

// OUTDATED - for reference
// Save ORU message as a lab bundle
// router.post('/oru', async (req: Request, res: Response) => {
// try {
// let hl7Msg = req.body.trim()

// let resultBundle: R4.IBundle = await Hl7WorkflowsBw.saveOruMessage(hl7Msg)

// return res.status(200).json(resultBundle)

// } catch (error) {
// logger.error(`/oru: failed!\n${error}`)
// return res.status(500).json(error)
// }

// })

// // Get list of HL7 OBR messages targeted at a given facility
// // * Translates ServiceRequests that target a given facility and are active
// router.get('/obr/list/:facilityCode', async (req: Request, res: Response) => {

// try {
// // Get all active ServiceRequest Profiles targeting facility with facilityCode
// let uri = URI(config.get("fhirServer:baseUrl"))
// .segment("ServiceRequest")
// .addQuery('performer', req.params.facilityCode)
// .addQuery('status', 'active')

// uri = URI(config.get("fhirConverterUrl")).segment("convert").segment("fhir")

// // let hl7Msg = await got.post(uri.toString(), {
// // username: config.get("mediator:client:id"),
// // password: config.get("mediator:client:password"),
// // json: bundle
// // })

// return res.status(200).send("Not Implemented Yet!")
// } catch (error) {
// return res.status(500).json(error)
// }
// })

// // Get a single OBR representing a given service request.
// router.get('obr/:requestId', async (req: Request, res: Response) => {
// return res.status(501).send("Not Implemented")
// })

export default router
Loading

0 comments on commit 72aac33

Please sign in to comment.