Skip to content

Commit

Permalink
fix: fix compositions sorting
Browse files Browse the repository at this point in the history
  • Loading branch information
JonathanGriffe committed Oct 29, 2021
1 parent 64e42fc commit 788ef47
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 14 deletions.
12 changes: 9 additions & 3 deletions src/messages/api.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { parseMessages } from '@/messages/parsing';
import utils from '@/messages/utils';

const axios = require('axios');

Expand All @@ -17,9 +18,14 @@ const parseArgs = rawMessage => {

const getMessages = args => {
const url = '/messages/states';
return axios
.post(url, args)
.then(res => ({ ...parseMessages(res.data.data), count: res.data.count }));
return axios.post(url, args).then(res => ({
...parseMessages(
res.data.data,
args.sort_column ? utils.underScoreToCamelCase(args.sort_column) : 'enqueuedDatetime',
args.sort_direction || 'desc'
),
count: res.data.count
}));
};

const cancelMessage = messageId => {
Expand Down
79 changes: 68 additions & 11 deletions src/messages/parsing.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,15 @@ function addDetails(composition) {
if (composition.compositionType === 'pipeline') {
composition.startedDatetime = composition.messages[0].startedDatetime;
} else {
const datetime = composition.messages.reduce(
(count, { startedDatetime }) => Math.min(count, startedDatetime || Infinity),
Infinity
);
const datetime = Math.min(...composition.messages.map(msg => msg.startedDatetime || Infinity));
if (datetime !== Infinity) {
composition.startedDatetime = new Date(datetime);
}
}
// add Enqueued datetime
composition.enqueuedDatetime = new Date(
Math.min(...composition.messages.map(msg => msg.enqueuedDatetime || Infinity))
);
// add Wait time
composition.waitTime = composition.messages.reduce(
(count, { waitTime }) => count + (waitTime || 0),
Expand Down Expand Up @@ -369,16 +370,70 @@ function assembleComposition(compositionMsgs) {
return group;
}

/**
* Sorts the assembled compositions by column
* @param array
* @param sortColumn
* @param sortDirection
* @returns {*}
*/
function sortByColumn(array, sortColumn, sortDirection) {
function compareByColumn(a, b) {
let value;
if (a[sortColumn] < b[sortColumn]) {
value = 1;
} else {
value = -1;
}
if (sortDirection === 'desc') {
value *= -1;
}
return value;
}
return array.sort(compareByColumn);
}

/**
* Merge the messages and compositions while keeping them sorted by a column
* @param messages
* @param compositions
* @param sortColumn
* @param sortDirection
* @returns {*}
*/
function insertByColumn(messages, compositions, sortColumn, sortDirection) {
let index = 0;
while (compositions.length > 0) {
if (
index === messages.length ||
(sortDirection === 'asc' &&
compositions[0][sortColumn] != null &&
(messages[index][sortColumn] == null ||
messages[index][sortColumn] > compositions[0][sortColumn])) ||
(sortDirection === 'desc' &&
(compositions[0][sortColumn] == null ||
(messages[index][sortColumn] != null &&
messages[index][sortColumn] < compositions[0][sortColumn])))
) {
messages.splice(index, 0, compositions.splice(0, 1)[0]);
}
index += 1;
}
return messages;
}

/**
* Parse Messages, regroup them into their compositions and add computed properties such as waitTime.
* @param data
* @param sortColumn
* @param sortDirection
* @returns {{loadDateTime: Date, messages}}
*/
export function parseMessages(data) {
export function parseMessages(data, sortColumn = null, sortDirection = null) {
const loadDatetime = new Date();

//Recursively parse messages to extract useful information
const messages = data.map(message => parseMessage(message, loadDatetime));
let messages = data.map(message => parseMessage(message, loadDatetime));

//group messages by composition_id
const compositions = {};
Expand All @@ -396,13 +451,15 @@ export function parseMessages(data) {
}

// adding not yet enqueued messages
Object.values(compositions).forEach(messages => {
messages.map(message => fillPipe(message, compositions));
Object.values(compositions).forEach(composition => {
composition.map(message => fillPipe(message, compositions));
});

// assemble compositions
Object.values(compositions).forEach(compositionMessages => {
messages.push(assembleComposition(compositionMessages));
});
let assembledCompositions = Object.values(compositions).map(assembleComposition);
// sort the compositions
assembledCompositions = sortByColumn(assembledCompositions, sortColumn, sortDirection);
// insert the compositions with the messages keeping them sorted by column
messages = insertByColumn(messages, assembledCompositions, sortColumn, sortDirection);
return { messages: messages, loadDateTime: loadDatetime };
}
14 changes: 14 additions & 0 deletions src/messages/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ const camelCaseToUnderScore = str => {
.toLowerCase();
};

const underScoreToCamelCase = str => {
return str
.split(/_/)
.map((s, index) => {
if (index !== 0) {
return s.charAt(0).toUpperCase() + s.slice(1);
} else {
return s;
}
})
.join('');
};

/**
* If item exist in list remove it
* otherwise add it.
Expand All @@ -50,6 +63,7 @@ const toggleItemFromList = (item, list) => {
export default {
getSortColumnAndDirection,
formatMillis,
underScoreToCamelCase,
camelCaseToUnderScore,
toggleItemFromList
};
105 changes: 105 additions & 0 deletions tests/unit/api.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,42 @@ describe('Composition started_time', () => {
});
});

test('Composition enqueued datetime', () => {
expect(
parseMessages([
{
message_id: 'id0',
enqueued_datetime: '2021-05-01 11:00:00',
options: {
composition_id: 'comp_id',
group_info: {
group_id: 'grp_id'
}
}
},
{
message_id: 'id1',
enqueued_datetime: '2021-05-01 10:00:00',
options: {
composition_id: 'comp_id',
group_info: {
group_id: 'grp_id'
}
}
},
{
message_id: 'id2',
options: {
composition_id: 'comp_id',
group_info: {
group_id: 'grp_id'
}
}
}
]).messages[0].enqueuedDatetime.getTime()
).toBe(new Date('2021-05-01 10:00:00').getTime());
});

test('Composition progress test', () => {
expect(
parseMessages([
Expand Down Expand Up @@ -872,3 +908,72 @@ test('Test group with a pipeline whose last message is a group', () => {
}
]);
});

describe('Composition sorting', () => {
test.each([
['ascending', 'asc'],
['descending', 'desc']
])('works when in %s order', (name, direction) => {
const target = [
{ messageId: 'id5', startedDatetime: new Date('2021-05-01 9:00:00') },
{ compositionType: 'group', startedDatetime: new Date('2021-05-01 10:00:00') },
{ messageId: 'id4' },
{ compositionType: 'pipeline' }
];
const messages = [
{
message_id: 'id5',
started_datetime: '2021-05-01 9:00:00'
},
{
message_id: 'id4'
}
];
if (direction === 'desc') {
target.reverse();
messages.reverse();
}
expect(
parseMessages(
[
{
message_id: 'id0',
options: {
composition_id: 'comp0_id',
pipe_target: [
{
message_id: 'id1',
options: {
composition_id: 'comp0_id'
}
}
]
}
},
{
message_id: 'id2',
started_datetime: '2021-05-01 10:00:00',
options: {
composition_id: 'comp1_id',
group_info: {
group_id: 'grp_id'
}
}
},
{
message_id: 'id3',
started_datetime: '2021-05-01 10:00:00',
options: {
composition_id: 'comp1_id',
group_info: {
group_id: 'grp_id'
}
}
}
].concat(messages),
'startedDatetime',
direction
).messages
).toMatchObject(target);
});
});

0 comments on commit 788ef47

Please sign in to comment.