Skip to content

Commit

Permalink
Change get last state code and redis connection (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
willemarcel authored Apr 15, 2024
1 parent 089479f commit 6b62d62
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 17 deletions.
3 changes: 1 addition & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ const { postTagChanges } = require('./lib/tagChanges');
const { formatReplicationKey } = require('./util/format-replication-key');
const { request } = require('./util/request');
const { s3 } = require('./lib/s3-client');

const REPLICATION_BUCKET = process.env.ReplicationBucket || 'osm-planet-us-west-2';
const { REPLICATION_BUCKET } = require('./lib/constants');

process.on('unhandledRejection', (up) => { throw up; });
process.on('exit', (code) => {
Expand Down
3 changes: 3 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const OSMCHA_URL = process.env.OsmchaUrl || 'https://osmcha.org/api/v1';
const OVERPASS_PRIMARY_URL = process.env.OverpassPrimaryUrl || 'https://overpass.osmcha.org';
const OVERPASS_SECONDARY_URL = process.env.OverpassSecondaryUrl || 'https://overpass-api.de';
const REDIS_SERVER = process.env.RedisServer;
const REPLICATION_BUCKET = process.env.ReplicationBucket || 'osm-planet-us-west-2';
const OVERPASS_DELAY = process.env.OverpassServerDelay || 5;

module.exports = {
S3_MAX_RETRIES,
Expand All @@ -26,4 +28,5 @@ module.exports = {
OVERPASS_PRIMARY_URL,
OVERPASS_SECONDARY_URL,
REDIS_SERVER,
REPLICATION_BUCKET
}
1 change: 1 addition & 0 deletions test/test-get-changesets.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ test('test get changeset processes changeset', async (assert) => {
'85063116'
];
assert.deepEqual(Object.keys(results), changesets);
assert.end();
});
25 changes: 25 additions & 0 deletions test/test-get-states.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
const test = require('tape');
const { parseSequenceNumber, getOverpassDelay, getOverpassTimestamp } = require('../util/get-states');

test('test getOverpassTimestamp', async (assert) => {
const timestamp = await getOverpassTimestamp();
const date = new Date(timestamp);
assert.equal(typeof date, 'object');
assert.end();
});

test('test getOverpassDelay', async (assert) => {
const delay = await getOverpassDelay();
assert.equal(typeof delay, 'number');
assert.ok(delay > 0);
assert.end();
});

test('test parseSequenceNumber', async (assert) => {
const text = `
#Mon Apr 15 12:31:39 UTC 2024
sequenceNumber=6048654
timestamp=2024-04-15T12\\:31\\:15Z`;
assert.equal(parseSequenceNumber(text), 6048654);
assert.end();
});
5 changes: 2 additions & 3 deletions update.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
'use strict';

const { getBothStates, getOverpassTimestamp } = require('./util/get-states');
const { getPlanetTimestamp } = require('./util/get-states');
const { getLastProcessedState, setProcessedState } = require('./util/redis-client');
const { range } = require('./util/range');
const run = require('./index');

const process = async () => {
const timestamp = await getOverpassTimestamp();
const planetState = getBothStates(timestamp.replace('\n', '')).planet;
const planetState = await getPlanetTimestamp();

let lastProcessedState = await getLastProcessedState();

Expand Down
42 changes: 32 additions & 10 deletions util/get-states.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
'use strict';
const moment = require('moment');
const zlib = require('zlib');
const util = require('util');
const gunzip = util.promisify(zlib.gunzip);
const { REPLICATION_BUCKET, OVERPASS_DELAY, OVERPASS_PRIMARY_URL, OVERPASS_SECONDARY_URL } = require('../lib/constants');
const { s3 } = require('../lib/s3-client');
const { request } = require('./request');
const { OVERPASS_PRIMARY_URL, OVERPASS_SECONDARY_URL } = require('../lib/constants');


const getStates = (created_at, closed_at) => {
Expand Down Expand Up @@ -36,18 +40,36 @@ const getOverpassTimestamp = async () => {
}
}

// return both the OSM Planet replication file and overpass state for a given minute
const getBothStates = (minute) => {
minute = moment.utc(minute);
const start = minute.startOf('minute');
const overpassState = (start.unix()/60) - 22457216;
const osmState = overpassState - 46836;
return { overpass: overpassState, planet: osmState };
};
const getOverpassDelay = async () => {
const timestamp = await getOverpassTimestamp();
return moment.utc().diff(new Date(timestamp.replace('\n', '')), 'minutes') + 1;
}

const getPlanetTimestamp = async () => {
const overpassDelay = await getOverpassDelay();
const { Body } = await s3.getObject({
Bucket: REPLICATION_BUCKET,
Key: 'planet/replication/minute/state.txt'
}).promise();
const state = Body.toString();
return parseSequenceNumber(state) - overpassDelay;
}

const parseSequenceNumber = (content) => {
let sequenceNumber;
content.split('\n').forEach(line => {
if (line.startsWith('sequenceNumber=')) {
sequenceNumber = parseInt(line.split('=')[1], 10);
}
});
return sequenceNumber;
}

module.exports = {
getStates,
getStateForMinute,
getBothStates,
getOverpassDelay,
getOverpassTimestamp,
getPlanetTimestamp,
parseSequenceNumber,
};
6 changes: 4 additions & 2 deletions util/redis-client.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
const { createClient } = require('redis');
const { REDIS_SERVER } = require('../lib/constants');

const REDIS_CONFIG = REDIS_SERVER ? { url: REDIS_SERVER } : null;

const setProcessedState = async (value) => {
const client = await createClient({ url: REDIS_SERVER })
const client = await createClient(REDIS_CONFIG)
.on('error', err => console.log('Redis Client Error', err))
.connect();

Expand All @@ -11,7 +13,7 @@ const setProcessedState = async (value) => {
};

const getLastProcessedState = async () => {
const client = await createClient({ url: REDIS_SERVER })
const client = await createClient(REDIS_CONFIG)
.on('error', err => console.log('Redis Client Error', err))
.connect();

Expand Down

0 comments on commit 6b62d62

Please sign in to comment.