diff --git a/package.json b/package.json index 59c5a76..85645f6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "custom-metrics", - "version": "0.7.1", + "version": "0.7.2", "description": "Low cost custom metrics for AWS", "main": "dist/cjs/index.js", "module": "dist/mjs/index.js", diff --git a/src/index.ts b/src/index.ts index 875bcc9..3f2b1c1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -95,8 +95,7 @@ export type MetricEmitOptions = { } export type MetricListOptions = { - fields?: string[] - log?: boolean + log?: boolean // Enable OneTable find logging limit?: number next?: object owner?: string @@ -133,9 +132,14 @@ var Instances: InstanceMap = {} /* On exit, flush any buffered metrics */ -process.on('SIGTERM', async () => { - await CustomMetrics.terminate() -}) +process.on( + 'SIGTERM', + /* istanbul ignore next */ + async () => { + /* istanbul ignore next */ + await CustomMetrics.terminate() + } +) export class CustomMetrics { private buffer: MetricBufferOptions | undefined @@ -153,22 +157,13 @@ export class CustomMetrics { constructor(options: MetricOptions = {}) { if (options.log == true) { - this.log = { - info: (message: string, context: {}) => null, - error: (message: string, context: {}) => console.log('ERROR: ' + message, context), - } + this.log = {info: this.nop, error: this.error} } else if (options.log == 'verbose') { - this.log.info = (message: string, context: {}) => console.log('INFO: ' + message, context) + this.log = {info: this.info, error: this.error} } else if (options.log) { this.log = options.log } else { - this.log = { - info: (message: string, context: {}) => null, - error: (message: string, context: {}) => null, - } - } - if (!options.owner) { - throw new Error('Missing required "owner" in options') + this.log = {info: this.nop, error: this.nop} } if (options.ttl && typeof options.ttl != 'number') { throw new Error('Bad type for "ttl" option') @@ -198,7 +193,9 @@ export class CustomMetrics { try { this.MetricModel = this.db.getModel('Metric') } catch (err) { + /* istanbul ignore next */ this.db.addModel('Metric', Schema.models.Metric) + /* istanbul ignore next */ this.MetricModel = this.db.getModel('Metric') } } else { @@ -256,25 +253,23 @@ export class CustomMetrics { let point: Point let buffer = options.buffer || this.buffer if (buffer && Buffering) { - point = this.bufferMetric(namespace, metricName, value, dimensionsList, buffer) - if (point.timestamp === 0) { - // Buffer signal. Return a partial metric with only the buffered point. - let result = { - spans: [{points: [point]}], - metric: metricName, - namespace: namespace, - owner: options.owner || this.owner, - version: Version, - } - return result as Metric - } - } else { - point = {count: 1, sum: value} + return await this.bufferMetric(namespace, metricName, value, dimensionsList, options) } + point = {count: 1, sum: value} + return await this.emitDimensions(namespace, metricName, point, dimensionsList, options) + } + + private async emitDimensions( + namespace: string, + metricName: string, + point: Point, + dimensionsList: MetricDimensionsList, + options: MetricEmitOptions + ): Promise { let result: Metric for (let dimensions of dimensionsList) { let dimString = this.makeDimensionString(dimensions) - result = await this.emitDimensionMetric(namespace, metricName, point, dimString, options) + result = await this.emitDimensionedMetric(namespace, metricName, point, dimString, options) } return result! } @@ -282,7 +277,7 @@ export class CustomMetrics { /* Emit a metric for specific dimensions */ - private async emitDimensionMetric( + private async emitDimensionedMetric( namespace: string, metricName: string, point: Point, @@ -310,6 +305,7 @@ export class CustomMetrics { point.timestamp will be set for buffered metrics which may be in the past */ let si = metric.spans.findIndex((s) => s.end - s.period <= point.timestamp && point.timestamp < s.end) + /* istanbul ignore else */ if (si >= 0) { this.addValue(metric, point.timestamp, point, si) } else { @@ -322,69 +318,79 @@ export class CustomMetrics { await this.updateMetric(metric, point, ttl) break } catch (err: any) { + /* istanbul ignore next */ if (err.code != 'ConditionalCheckFailedException') { this.log.info(`Emit exception code ${err.code} message ${err.message}`, err) throw err } } + /* istanbul ignore next */ if (retries == 0) { this.log.error(`Metric has too many retries`, {namespace, metricName, dimensions}) break } + /* istanbul ignore next */ this.log.info(`Retry metric update`, {retries}) } while (retries-- > 0) return metric } - bufferMetric( + async bufferMetric( namespace: string, metricName: string, value: number, dimensionsList: MetricDimensionsList, - options: MetricBufferOptions - ): Point { + options: MetricEmitOptions + ): Promise { + let buffer = options.buffer || this.buffer let interval = this.spans[0].period / this.spans[0].samples let key = `${namespace}|${metricName}|${JSON.stringify(dimensionsList)}` let elt: BufferElt = (this.buffers[key] = this.buffers[key] || { count: 0, sum: 0, - timestamp: this.timestamp + (options.elapsed || interval), + timestamp: this.timestamp + (buffer.elapsed || interval), namespace: namespace, metric: metricName, dimensions: dimensionsList, }) - elt.sum += value - elt.count++ if ( - options.force || - (options.sum && elt.sum >= options.sum) || - (options.count && elt.count >= options.count) || + buffer.force || + (buffer.sum && elt.sum >= buffer.sum) || + (buffer.count && elt.count >= buffer.count) || this.timestamp >= elt.timestamp ) { - // Emit buffered values now - delete this.buffers[key] + // Remove this trace soon this.log.info( `Emit buffered metric ${namespace}/${metricName} = ${value}, sum ${elt.sum} count ${ elt.count - } remaining ${elt.timestamp - this.timestamp}`, - {elt, buffers: this.buffers} + } remaining ${elt.timestamp - this.timestamp}` ) - return {count: elt.count, sum: elt.sum, timestamp: elt.timestamp} + let point = {count: elt.count, sum: elt.sum, timestamp: elt.timestamp} + await this.emitDimensions(namespace, metricName, point, dimensionsList, options) } - CustomMetrics.saveInstance({key}, this) + elt.count++ + elt.sum += value + + // Remove this trace soon this.log.info( `Buffer metric ${namespace}/${metricName} = ${value}, sum ${elt.sum} count ${elt.count}, remaining ${ elt.timestamp - this.timestamp }` ) - // Buffer and don't emit. Use timestamp == 0 as a signal to not emit - return {count: elt.count, sum: elt.sum, timestamp: 0} + CustomMetrics.saveInstance({key}, this) + return { + spans: [{points: [{count: elt.count, sum: elt.sum}]}], + metric: metricName, + namespace: namespace, + owner: options.owner || this.owner, + version: Version, + } as Metric } static async terminate() { - let start = Date.now() + // let start = Date.now() await CustomMetrics.flushAll() - console.log(`Lambda terminating, metric flush took ${(Date.now() - start) / 1000} ms`) + // console.log(`Lambda terminating, metric flush took ${(Date.now() - start) / 1000} ms`) } static async flushAll() { @@ -399,7 +405,7 @@ export class CustomMetrics { for (let elt of Object.values(this.buffers)) { let point = {count: elt.count, sum: elt.sum} for (let dimensions of elt.dimensions) { - await this.emitDimensionMetric(elt.namespace, elt.metric, point, this.makeDimensionString(dimensions)) + await this.emitDimensionedMetric(elt.namespace, elt.metric, point, this.makeDimensionString(dimensions)) } } this.buffers = {} @@ -417,6 +423,7 @@ export class CustomMetrics { options: MetricQueryOptions = {} ): Promise { this.timestamp = Math.floor((options.timestamp || Date.now()) / 1000) + this.log.info(`Query metrics ${namespace}/${metricName}`, {dimensions}) let owner = options.owner || this.owner @@ -450,6 +457,8 @@ export class CustomMetrics { span = metric.spans.at(-1) period = span.period } + + // Remove this trace soon this.log.info(`Query ${namespace} ${metricName} ${dimString} ${period} ${statistic}`, { owner, metric, @@ -465,6 +474,8 @@ export class CustomMetrics { this.addValue(metric, this.timestamp, {count: 0, sum: 0}, 0, period) let result: MetricQueryResult + + /* istanbul ignore else */ if (metric && span) { if (options.accumulate) { result = this.accumulateMetric(metric, span, statistic, owner) @@ -472,10 +483,13 @@ export class CustomMetrics { result = this.calculateSeries(metric, span, statistic, owner) } } else { + // Should never happen as spans are created for the desired period result = {dimensions, metric: metricName, namespace, period, points: [], owner} } + + // Remove this trace soon this.log.info( - `Metric query ${namespace}, ${metricName}, ${this.makeDimensionString(dimensions)}, ` + + `Metric query ${namespace}, ${metricName}, ${this.makeDimensionString(dimensions) || '[]'}, ` + `period ${period}, statistic "${statistic}"`, {result} ) @@ -520,16 +534,14 @@ export class CustomMetrics { } } else if (statistic == 'sum') { value += point.sum - count += point.count } else if (statistic == 'count') { value += point.count - count += point.count } else if (statistic.match(/^p[0-9]+/)) { - pvalues = pvalues.concat(point.pvalues || []) + pvalues = pvalues.concat(point.pvalues) } /* avg */ else { value += point.sum - count += point.count } + count += point.count } if (statistic.match(/^p[0-9]+/)) { let p = parseInt(statistic.slice(1)) @@ -537,7 +549,7 @@ export class CustomMetrics { let nth = Math.min(Math.round((pvalues.length * p) / 100 + 1), pvalues.length - 1) value = pvalues[nth] } else if (statistic == 'avg') { - value /= count || 1 + value /= Math.max(count, 1) } return { dimensions: this.makeDimensionObject(metric.dimensions), @@ -562,9 +574,9 @@ export class CustomMetrics { Also, the last point may have an end beyond the current time, so limit. */ let timestamp = span.end - span.points.length * interval + let value: number | undefined let i = 0 for (let point of span.points) { - let value: number | undefined if (point.count > 0) { if (statistic == 'max') { if (point.max != undefined) { @@ -588,12 +600,12 @@ export class CustomMetrics { value = point.count } else if (statistic.match(/^p[0-9]+/)) { let p = parseInt(statistic.slice(1)) - let pvalues = point.pvalues || [] + let pvalues = point.pvalues pvalues.sort((a, b) => a - b) let nth = Math.min(Math.round((pvalues.length * p) / 100 + 1), pvalues.length - 1) value = pvalues[nth] } /* avg */ else { - value = point.sum / (point.count || 1) + value = point.sum / point.count } } // Want timestamp to be the end of the point bucket @@ -617,11 +629,7 @@ export class CustomMetrics { */ private makeDimensionString(dimensions: MetricDimensions): string { let result: string[] = [] - for (let [name, value] of Object.entries(dimensions || {})) { - if (Array.isArray(value)) { - /* istanbul ignore next */ - this.log.error(`Dimension is an array`, {value, dimensions}) - } + for (let [name, value] of Object.entries(dimensions)) { result.push(`${name}=${value}`) } return result.join(',') @@ -642,7 +650,8 @@ export class CustomMetrics { } /* - Add a point value to the metric at a desired timestamp and/or span index + Add a point value to the metric at a desired timestamp (and/or span index). + We aggregate aged out points to upper spans as required. For queries, a queryPeriod nominates the desired span, all lower span values are aggregated up. */ private addValue( @@ -660,49 +669,64 @@ export class CustomMetrics { let points = span.points let start = span.end - points.length * interval - // Stop recursing once the desired span period is reached - let period = span.period < queryPeriod ? queryPeriod : 0 + // Aggregate points to higher spans if not querying or if querying and not yet at desired period + let aggregate = !queryPeriod || span.period < queryPeriod ? true : false + + // Just for safety + /* istanbul ignore next */ + while (points.length > span.samples) { + points.shift() + } + /* + Aggregate points. Calculate how many points have aged, or if querying, how many must be aggregated. + */ if (points.length) { /* istanbul ignore next */ if (timestamp < start) { - this.log.error('Bad span', {metric, point, timestamp, start, span}) + this.log.error('Bad metric span', {metric, point, timestamp, start, span}) return } let shift = 0 - if (queryPeriod) { - if (period) { - // Propagate all points if not on the target period - shift = points.length - } + if (queryPeriod && aggregate) { + // Querying and not yet on the target period, so aggregate all points + shift = points.length } else if (point.count) { - // Adding one more to make room for this point being added - shift = (timestamp - start) / interval - span.samples + 1 + // Point to add and either querying and not on the target period or normal emit. + // Add one more to make room for this point being added incase points[] is full. + shift = Math.floor((timestamp - start) / interval) - span.samples + 1 + } else if (queryPeriod) { + // Querying and on target period so, shift out all points that have aged out. + shift = Math.floor((timestamp - start) / interval) - span.samples } shift = Math.max(0, Math.min(shift, points.length)) this.assert(0 <= shift && shift <= points.length) + /* + Shift out aged points to make room. Propagate up to higher spans. + */ while (shift-- > 0) { let p = points.shift() - // Recurse and add the point to the next metric - if (p.count && si + 1 < metric.spans.length) { - this.addValue(metric, start, p, si + 1, period) + // Recurse and add the point to the next metric. If querying, only to this if recursing + if (aggregate && p.count && si + 1 < metric.spans.length) { + this.addValue(metric, start, p, si + 1, queryPeriod) } start += interval } } - if (period) { - if (si + 1 < metric.spans.length) { - this.addValue(metric, timestamp, point, si + 1, period) - } + if (aggregate && queryPeriod && si + 1 < metric.spans.length) { + // Querying and must recurse to aggregate all periods up to the target + this.addValue(metric, timestamp, point, si + 1, queryPeriod) } else if (point.count) { if (points.length == 0) { start = span.end = this.getTimestamp(span, timestamp) } + // Desired time period pre-dates span points while (timestamp < start) { points.unshift({count: 0, sum: 0}) start -= interval } + // Must always push one point space for the current point while (timestamp >= span.end) { points.push({count: 0, sum: 0}) span.end += interval @@ -712,7 +736,6 @@ export class CustomMetrics { let index = Math.floor((timestamp - start) / interval) this.assert(0 <= index && index < points.length) this.setPoint(span, index, point) - // span.points[index].when = timestamp } } @@ -724,8 +747,8 @@ export class CustomMetrics { let points = span.points this.assert(0 <= index && index < points.length) let point = points.at(index)! + /* istanbul ignore next */ if (!point) { - /* istanbul ignore next */ this.log.error(`Metric null point`, {span, index, add}) return } @@ -745,7 +768,7 @@ export class CustomMetrics { if (this.pResolution) { point.pvalues = point.pvalues || [] if (add.pvalues) { - point.pvalues.push(...(add.pvalues || [add.sum / add.count])) + point.pvalues.push(...add.pvalues) } else { point.pvalues.push(add.sum / add.count) } @@ -773,9 +796,10 @@ export class CustomMetrics { } let where: string | undefined = undefined if (metric.seq != undefined) { + /* istanbul ignore next */ let seq = (metric.seq = metric.seq || 0) + /* istanbul ignore next */ if (metric.seq++ >= MaxSeq) { - /* istanbul ignore next */ metric.seq = 0 } where = `\${seq} = {${seq}}` @@ -793,6 +817,7 @@ export class CustomMetrics { stats, where, }) + /* KEEP let size = JSON.stringify(result).length this.log.info( `Emit metric: ${metric.namespace}/${metric.metric}/${metric.dimensions} = ${point.sum}, msize ${size}`, @@ -801,7 +826,7 @@ export class CustomMetrics { point, stats, } - ) + ) */ } /* @@ -812,16 +837,18 @@ export class CustomMetrics { async getMetricList( namespace: string = undefined, metric: string = undefined, - options: MetricListOptions = {fields: ['pk', 'sk'], limit: MetricListLimit} + options: MetricListOptions = {/* fields: ['pk', 'sk'], */ limit: MetricListLimit} ): Promise { + /* if (!options.fields) { options.fields = ['pk', 'sk'] - } + } */ let map = {} as any let next: object | undefined let owner = options.owner || this.owner do { options.next = next + /* istanbul ignore next */ options.log = this.options.log == 'verbose' ? true : false let list = await this.db.find('Metric', {owner, namespace, version: Version}, options) if (list.length) { @@ -935,6 +962,17 @@ export class CustomMetrics { this.log.error(`Assertion failed ${msg.stack}`) } } -} -export {Schema, Version} + /* istanbul ignore next */ + private info(message: string, context = {}) { + console.log('INFO: ' + message, context) + } + + /* istanbul ignore next */ + private error(message: string, context = {}) { + console.log('ERROR: ' + message, context) + } + + /* istanbul ignore next */ + private nop() {} +}