Skip to content

Commit

Permalink
saving last sync time into elasticsearch
Browse files Browse the repository at this point in the history
  • Loading branch information
ashaban committed May 10, 2021
1 parent ebba5da commit 7943fc8
Show file tree
Hide file tree
Showing 7 changed files with 332 additions and 137 deletions.
21 changes: 14 additions & 7 deletions server/lib/dataSync.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,26 @@ function syncWorkflows (callback) {
}, () => {
syncStatus.syncWorkflows = 'not_running'
if (!processingError) {
mixin.updateConfigFile(['lastSync', 'syncWorkflows', 'time'], newRunsLastSync, () => {});
mixin.updateLastIndexingTime(newRunsLastSync, 'syncWorkflows')
}
return callback(processingError);
});
}

function syncContacts(callback) {
async function syncContacts(callback) {
if(syncStatus.syncContacts === 'running') {
return callback()
}
syncStatus.syncContacts = 'running'
let newRunsLastSync = moment().format('Y-MM-DDTHH:mm:ss');
let runsLastSync = config.get('lastSync:syncContacts:time');

let runsLastSync
await mixin.getLastIndexingTime('syncContacts', false).then((time) => {
runsLastSync = time
}).catch((time) => {
runsLastSync = moment('1970-01-01').format('Y-MM-DDTHH:mm:ss');
})

const isValid = moment(runsLastSync, 'Y-MM-DD').isValid();
if (!isValid) {
runsLastSync = moment('1970-01-01').format('Y-MM-DD');
Expand Down Expand Up @@ -119,7 +126,7 @@ function syncContacts(callback) {
}, () => {
syncStatus.syncContacts = 'not_running'
if(!processingError) {
mixin.updateConfigFile(['lastSync', 'syncContacts', 'time'], newRunsLastSync, () => {});
mixin.updateLastIndexingTime(newRunsLastSync, 'syncContacts')
cacheFHIR2ES(() => {});
logger.info('Contacts Sync Done');
return callback()
Expand Down Expand Up @@ -164,7 +171,7 @@ function syncContactsGroups(callback) {
}, () => {
syncStatus.syncContactsGroups = 'not_running'
if (!processingError) {
mixin.updateConfigFile(['lastSync', 'syncContactsGroups', 'time'], newRunsLastSync, () => {});
mixin.updateLastIndexingTime(newRunsLastSync, 'syncContactsGroups')
}
return callback(processingError);
});
Expand Down Expand Up @@ -194,7 +201,7 @@ function syncWorkflowRunMessages(callback) {
}, () => {
syncStatus.syncWorkflowRunMessages = 'not_running'
if (!processingError) {
mixin.updateConfigFile(['lastSync', 'syncWorkflowRunMessages', 'time'], newRunsLastSync, () => {});
mixin.updateLastIndexingTime(newRunsLastSync, 'syncWorkflowRunMessages')
}
return callback(processingError);
});
Expand All @@ -210,7 +217,7 @@ function syncFloipFlowResults(callback) {
syncStatus.syncFloipFlowResults = 'not_running'
logger.info("Done Synchronizing flow results from FLOIP server");
if(!err) {
mixin.updateConfigFile(['lastSync', 'syncFloipFlowResults', 'time'], newRunsLastSync, () => {});
mixin.updateLastIndexingTime(newRunsLastSync, 'syncFloipFlowResults')
}
return callback(err);
});
Expand Down
7 changes: 6 additions & 1 deletion server/lib/floip.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ async function flowResultsToQuestionnaire(callback) {
bundle.type = 'batch';
bundle.resourceType = 'Bundle';
bundle.entry = [];
let runsLastSync = config.get('lastSync:syncFloipFlowResults:time');
let runsLastSync
await mixin.getLastIndexingTime('syncFloipFlowResults', false).then((time) => {
runsLastSync = time
}).catch((time) => {
runsLastSync = moment('1970-01-01').format('Y-MM-DDTHH:mm:ss');
})
const isValid = moment(runsLastSync, 'Y-MM-DD HH:mm:ss').isValid();
if (!isValid) {
runsLastSync = moment('1970-01-01').format('Y-MM-DD HH:mm:ss');
Expand Down
139 changes: 139 additions & 0 deletions server/lib/mixin.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,147 @@
const fs = require('fs');
const medUtils = require('openhim-mediator-utils');
const request = require('request');
const axios = require('axios');
const URI = require('urijs');
const logger = require('./winston');
const config = require('./config');
const env = process.env.NODE_ENV || 'development';

const updateLastIndexingTime = (time, syncType) => {
return new Promise((resolve, reject) => {
logger.info('Updating lastIndexingTime')
axios({
url: URI(config.get("elastic:baseURL")).segment('emnuttsyncdata').segment("_doc").segment(syncType).toString(),
method: 'PUT',
auth: {
username: config.get("elastic:username"),
password: config.get("elastic:password")
},
data: {
"lastIndexingTime": time
}
}).then((response) => {
if(response.status < 200 && response.status > 299) {
logger.error('An error occured while updating lastIndexingTime')
return reject()
}
return resolve(false)
}).catch((err) => {
logger.error(err)
logger.error('An error occured while updating lastIndexingTime')
return reject(true)
})
})
}

const getLastIndexingTime = (syncType, reset) => {
return new Promise((resolve, reject) => {
logger.info('Getting lastIndexingTime')
let query = {
query: {
term: {
_id: syncType
}
}
}
axios({
method: "GET",
url: URI(config.get("elastic:baseURL")).segment('emnuttsyncdata').segment("_search").toString(),
data: query,
auth: {
username: config.get("elastic:username"),
password: config.get("elastic:password")
}
}).then((response) => {
if(reset) {
logger.info('Returning lastIndexingTime of 1970-01-01T00:00:00')
return resolve('1970-01-01T00:00:00')
}
if(response.data.hits.hits.length === 0) {
logger.info('Returning lastIndexingTime of 1970-01-01T00:00:00')
return resolve('1970-01-01T00:00:00')
}
logger.info('Returning lastIndexingTime of ' + response.data.hits.hits[0]._source.lastIndexingTime)
return resolve(response.data.hits.hits[0]._source.lastIndexingTime)
}).catch((err) => {
if (err.response && err.response.status && err.response.status === 404) {
logger.info('Index not found, creating index syncData');
let mappings = {
mappings: {
properties: {
lastIndexingTime: {
type: "text"
}
},
},
};
axios({
method: 'PUT',
url: URI(config.get("elastic:baseURL")).segment('emnuttsyncdata').toString(),
data: mappings,
auth: {
username: config.get("elastic:username"),
password: config.get("elastic:password")
}
})
.then(response => {
if (response.status !== 200) {
logger.error('Something went wrong and index was not created');
logger.error(response.data);
logger.info('Returning lastIndexingTime of 1970-01-01T00:00:00')
return reject()
} else {
logger.info('Index syncdata created successfully');
logger.info('Adding default lastIndexTime which is 1970-01-01T00:00:00')
axios({
method: 'PUT',
auth: {
username: config.get("elastic:username"),
password: config.get("elastic:password")
},
url: URI(config.get("elastic:baseURL")).segment('emnuttsyncdata').segment("_doc").segment(syncType).toString(),
data: {
"lastIndexingTime": "1970-01-01T00:00:00"
}
}).then((response) => {
if(response.status >= 200 && response.status <= 299) {
logger.info('Default lastIndexTime added')
} else {
logger.error('An error has occured while saving default lastIndexTime');
return reject("1970-01-01T00:00:00")
}
logger.info('Returning lastIndexingTime of 1970-01-01T00:00:00')
return resolve("1970-01-01T00:00:00")
}).catch((err) => {
logger.error('An error has occured while saving default lastIndexTime');
if (err.response && err.response.data) {
logger.error(err.response.data);
}
if (err.error) {
logger.error(err.error);
}
if (!err.response) {
logger.error(err);
}
return reject("1970-01-01T00:00:00")
})
}
})
.catch(err => {
logger.error('Error: ' + err);
logger.info('Returning lastIndexingTime of 1970-01-01T00:00:00')
return reject("1970-01-01T00:00:00")
});
} else {
logger.error('Error occured while getting last indexing time in ES');
logger.error(err);
logger.info('Returning lastIndexingTime of 1970-01-01T00:00:00')
return reject("1970-01-01T00:00:00")
}
})
})
}

const setNestedKey = (obj, path, value, callback) => {
if (path.length === 1) {
obj[path] = value;
Expand Down Expand Up @@ -130,6 +267,8 @@ const getNameFromResource = (resource) => {
}

module.exports = {
updateLastIndexingTime,
getLastIndexingTime,
updateConfigFile,
updateopenHIMConfig,
updatePhoneNumber,
Expand Down
37 changes: 28 additions & 9 deletions server/lib/rapidpro.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@ module.exports = function () {
* @param {Object} param0
* @param {*} callback
*/
syncWorkflows(callback) {
async syncWorkflows(callback) {
let processingError = false;
let runsLastSync = config.get('lastSync:syncWorkflows:time');
let runsLastSync
await mixin.getLastIndexingTime('syncWorkflows', false).then((time) => {
runsLastSync = time
}).catch((time) => {
runsLastSync = moment('1970-01-01').format('Y-MM-DDTHH:mm:ss');
})
const isValid = moment(runsLastSync, 'Y-MM-DDTHH:mm:ss').isValid();
if (!isValid) {
runsLastSync = moment('1970-01-01').format('Y-MM-DDTHH:mm:ss');
Expand All @@ -47,8 +52,13 @@ module.exports = function () {
}
);
},
syncWorkflowRunMessages(callback) {
let runsLastSync = moment('1970-01-01').format('Y-MM-DDTHH:mm:ss');
async syncWorkflowRunMessages(callback) {
let runsLastSync
await mixin.getLastIndexingTime('syncWorkflowRunMessages', false).then((time) => {
runsLastSync = time
}).catch((time) => {
runsLastSync = moment('1970-01-01').format('Y-MM-DDTHH:mm:ss');
})
let processingError = false;
let runBundle = {};
runBundle.type = 'batch';
Expand All @@ -60,7 +70,6 @@ module.exports = function () {
let nextRunURL = false
async.doWhilst(
(callback) => {
runsLastSync = config.get('lastSync:syncWorkflowRunMessages:time');
const isValid = moment(runsLastSync, 'Y-MM-DDTHH:mm:ss').isValid();
if (!isValid) {
runsLastSync = moment('1970-01-01').format('Y-MM-DDTHH:mm:ss');
Expand Down Expand Up @@ -448,10 +457,15 @@ module.exports = function () {
}
},

POSContactGroupsSync(callback) {
async POSContactGroupsSync(callback) {
let failed = false;
logger.info('Received a request to sync POS Contacts Groups');
let runsLastSync = config.get('lastSync:syncContactsGroups:time');
let runsLastSync
await mixin.getLastIndexingTime('syncContactsGroups', false).then((time) => {
runsLastSync = time
}).catch((time) => {
runsLastSync = moment('1970-01-01').format('Y-MM-DDTHH:mm:ss');
})
const isValid = moment(runsLastSync, 'Y-MM-DD').isValid();
if (!isValid) {
runsLastSync = moment('1970-01-01').format('Y-MM-DD');
Expand Down Expand Up @@ -635,8 +649,13 @@ module.exports = function () {
});
},

RPContactGroupsSync(callback) {
let runsLastSync = config.get('lastSync:syncContactsGroups:time');
async RPContactGroupsSync(callback) {
let runsLastSync
await mixin.getLastIndexingTime('syncContactsGroups', false).then((time) => {
runsLastSync = time
}).catch((time) => {
runsLastSync = moment('1970-01-01').format('Y-MM-DDTHH:mm:ss');
})
const isValid = moment(runsLastSync, 'Y-MM-DD').isValid();
if (!isValid) {
runsLastSync = moment('1970-01-01').format('Y-MM-DD');
Expand Down
2 changes: 1 addition & 1 deletion server/lib/routes/dataSync.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ router.post('/syncContacts', (req, res) => {
});

router.get('/syncContactsGroups', (req, res) => {
logger.info('Received a request to sync workflow messages');
logger.info('Received a request to sync contacts groups');
dataSync.syncContactsGroups((error) => {
if (error) {
return res.status(500).send('Some errors occured');
Expand Down
Loading

0 comments on commit 7943fc8

Please sign in to comment.