Skip to content

Commit

Permalink
Configurable time gap between ttl expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
exoego committed Aug 22, 2021
1 parent fe88aad commit 051a13f
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 39 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ Options:
--deleteTableMs <ms> Amount of time tables stay in DELETING state (default: 500)
--updateTableMs <ms> Amount of time tables stay in UPDATING state (default: 500)
--maxItemSizeKb <kb> Maximum item size (default: 400)
--ttlCheckEvery <kb> Time gap between TTL expiration background job (default: 60)
0 or negative disable TTL expiration

Report bugs at github.com/mhart/dynalite/issues
```
Expand Down
2 changes: 2 additions & 0 deletions cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ if (argv.help) {
'--deleteTableMs <ms> Amount of time tables stay in DELETING state (default: 500)',
'--updateTableMs <ms> Amount of time tables stay in UPDATING state (default: 500)',
'--maxItemSizeKb <kb> Maximum item size (default: 400)',
'--ttlCheckEvery <s> Time gap between TTL expiration background job (default: 60)' +
' 0 or negative disable TTL expiration',
'',
'Report bugs at github.com/mhart/dynalite/issues',
].join('\n'))
Expand Down
84 changes: 46 additions & 38 deletions db/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ function create(options) {
if (options.deleteTableMs == null) options.deleteTableMs = 500
if (options.updateTableMs == null) options.updateTableMs = 500
if (options.maxItemSizeKb == null) options.maxItemSizeKb = exports.MAX_SIZE / 1024
if (options.ttlCheckEvery == null) options.ttlCheckEvery = 60
options.maxItemSize = options.maxItemSizeKb * 1024

var db = levelup(options.path ? require('leveldown')(options.path) : memdown()),
Expand Down Expand Up @@ -128,48 +129,55 @@ function create(options) {
})
}

var timerIdTtlScanner = setInterval(function() {
var currentUnixSeconds = Math.round(Date.now() / 1000)
function logError(err, result) {
if (err) console.error("@@@", err)
}
lazyStream(tableDb.createKeyStream({}), logError)
.join(function(tableNames) {
tableNames.forEach(function(name) {
getTable(name, false, function(err, table) {
if (err) return
if (!table.TimeToLiveDescription || table.TimeToLiveDescription.TimeToLiveStatus !== 'ENABLED') return

var keyAttrNames = table.KeySchema.map(function(i) {
return i.AttributeName
})
var source = {
getItemDb: getItemDb,
getIndexDb: getIndexDb,
}
var itemDb = getItemDb(table.TableName)
var kvStream = lazyStream(itemDb.createReadStream({}), logError())
kvStream = kvStream.filter(function(item){
var ttl = item.value[table.TimeToLiveDescription.AttributeName]
return ttl && typeof ttl.N === 'string' && currentUnixSeconds > Number(ttl.N)
})
kvStream.join(function(kvs){
kvs.forEach(function(kv) {
var itemKey = keyAttrNames.reduce(function(key, attrName) {
key[attrName] = kv.value[attrName]
return key
}, {})
var data = { TableName: name, Key: itemKey}
var cb = function(err) {
// Noop ?
}
deleteItem(source, data, table, itemDb, kv.key, cb)
var timerIdTtlScanner = null
if (typeof options.ttlCheckEvery === 'number' && options.ttlCheckEvery > 0) {
var ttlScannerInterval = options.ttlCheckEvery * 1000

timerIdTtlScanner = setInterval(function () {
var currentUnixSeconds = Math.round(Date.now() / 1000)

function logError(err, result) {
if (err) console.error("@@@", err)
}

lazyStream(tableDb.createKeyStream({}), logError)
.join(function (tableNames) {
tableNames.forEach(function (name) {
getTable(name, false, function (err, table) {
if (err) return
if (!table.TimeToLiveDescription || table.TimeToLiveDescription.TimeToLiveStatus !== 'ENABLED') return

var keyAttrNames = table.KeySchema.map(function (i) {
return i.AttributeName
})
var source = {
getItemDb: getItemDb,
getIndexDb: getIndexDb,
}
var itemDb = getItemDb(table.TableName)
var kvStream = lazyStream(itemDb.createReadStream({}), logError())
kvStream = kvStream.filter(function (item) {
var ttl = item.value[table.TimeToLiveDescription.AttributeName]
return ttl && typeof ttl.N === 'string' && currentUnixSeconds > Number(ttl.N)
})
kvStream.join(function (kvs) {
kvs.forEach(function (kv) {
var itemKey = keyAttrNames.reduce(function (key, attrName) {
key[attrName] = kv.value[attrName]
return key
}, {})
var data = {TableName: name, Key: itemKey}
var cb = function (err) {
// Noop ?
}
deleteItem(source, data, table, itemDb, kv.key, cb)
})
})
})
})
})
})
}, 1000)
}, ttlScannerInterval)
}

function stopBackgroundJobs() {
clearInterval(timerIdTtlScanner)
Expand Down
6 changes: 5 additions & 1 deletion test/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ var port = 10000 + Math.round(Math.random() * 10000),
requestOpts = process.env.REMOTE ?
{host: 'dynamodb.' + exports.awsRegion + '.amazonaws.com', method: 'POST'} :
{host: '127.0.0.1', port: port, method: 'POST'}
var ttlCheckEvery = 1

var dynaliteServer = dynalite({path: process.env.DYNALITE_PATH})
var dynaliteServer = dynalite({
path: process.env.DYNALITE_PATH,
ttlCheckEvery: ttlCheckEvery,
})

var CREATE_REMOTE_TABLES = true
var DELETE_REMOTE_TABLES = true
Expand Down

0 comments on commit 051a13f

Please sign in to comment.