diff --git a/src/libs/bootstrap.js b/src/libs/bootstrap.js index cb461e6..7fb748b 100644 --- a/src/libs/bootstrap.js +++ b/src/libs/bootstrap.js @@ -106,7 +106,7 @@ async function initDatabase(app) { app.db = new Database(app.conf); await app.db.init(); - app.userDb = new Users(app.db); + app.userDb = new Users(app); app.db.users = app.userDb; app.db.factories.Network = require('../libs/dataModels/network').factory(app.db, app.crypt); diff --git a/src/worker/messagestores/index.js b/src/worker/messagestores/index.js index 48dc868..776c2f6 100644 --- a/src/worker/messagestores/index.js +++ b/src/worker/messagestores/index.js @@ -77,6 +77,12 @@ class MessageStores { await store.storeMessage(...args); }); } + + async purgeMessages(...args) { + this.stores.filter(s => s.supportsWrite).forEach(async store => { + await store.purgeMessages(...args); + }); + } } module.exports = MessageStores; diff --git a/src/worker/messagestores/sqlite.js b/src/worker/messagestores/sqlite.js index e13b179..13ef476 100644 --- a/src/worker/messagestores/sqlite.js +++ b/src/worker/messagestores/sqlite.js @@ -24,6 +24,13 @@ class SqliteMessageStore { max: 50 * 1000 * 1000, // very roughly 50mb cache length: (entry, key) => key.length, }); + + this.purgerRunning = false; + this.purgerTimeout = 0; + // Max number of records to delete in one loop + this.purgerLimit = 1000; + // Delay in ms between each loop + this.purgerDelay = 1000; } async init() { @@ -49,6 +56,13 @@ class SqliteMessageStore { data BLOB UNIQUE )`); + this.db.exec(` + CREATE TABLE IF NOT EXISTS logs_deletions ( + id INTEGER PRIMARY KEY, + user_id INTEGER, + before_time INTEGER + )`); + this.stmtInsertData = this.db.prepare("INSERT INTO data(data) values(?)"); this.stmtInsertLogWithId = this.db.prepare(` INSERT INTO logs ( @@ -65,6 +79,9 @@ class SqliteMessageStore { ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `); this.stmtGetExistingDataId = this.db.prepare("SELECT id FROM data WHERE data = ?"); + + // Start purging any messages defined in logs_deletions + this.startMessagePurger(); } // Insert a chunk of data into the data table if it doesn't already exist, returning its ID @@ -398,6 +415,83 @@ class SqliteMessageStore { this.storeQueue.push({message, upstreamCon, clientCon}); this.storeMessageLoop(); } + + async purgeMessages(userId, beforeTime) { + if (!userId && !beforeTime) { + return; + } + + let stmt = this.db.prepare(` + INSERT INTO logs_deletions ( + user_id, + before_time + ) VALUES (?, ?) + `); + + await stmt.run(userId, beforeTime); + await this.startMessagePurger(); + } + + async startMessagePurger() { + if (this.purgerRunning) { + return; + } + this.purgerRunning = true; + this.messagePurger(); + } + + async messagePurger() { + const selectJob = this.db.prepare(` + SELECT + id, + user_id, + before_time + FROM logs_deletions + ORDER BY id ASC + LIMIT 1 + `); + + const job = selectJob.get(); + if (!job) { + this.purgerRunning = false; + return; + } + + l('Purging messages matching: [user_id: ' + job.user_id + ' && before_time: ' + job.before_time + ']'); + + const whereCond = []; + if (job.user_id) { + whereCond.push('user_id = :userId'); + } + if (job.before_time) { + whereCond.push('time < :beforeTime'); + } + + const deleteLogs = this.db.prepare(` + DELETE FROM logs + WHERE ${whereCond.join(' AND ')} + AND msgid IN ( + SELECT msgid + FROM logs + WHERE ${whereCond.join(' AND ')} + ORDER BY time ASC + LIMIT :limit + )`); + const result = deleteLogs.run({ + userId: job.user_id, + beforeTime: job.before_time, + limit: this.purgerLimit + }); + + if (result.changes < this.purgerLimit) { + // Last query's changes less than limit, job complete + const deleteJob = this.db.prepare('DELETE FROM logs_deletions WHERE id = :jobId'); + deleteJob.run({jobId: job.id}); + l('Purging messages complete'); + } + + this.purgerTimeout = setTimeout(() => { this.messagePurger() }, this.purgerDelay); + } } module.exports = SqliteMessageStore; diff --git a/src/worker/users.js b/src/worker/users.js index f53caf1..7b637ac 100644 --- a/src/worker/users.js +++ b/src/worker/users.js @@ -4,8 +4,9 @@ const { BncError } = require('../libs/errors'); const tokens = require('../libs/tokens'); class Users { - constructor(db) { - this.db = db; + constructor(app) { + this.app = app; + this.db = app.db; } async authUserNetwork(username, password, network) { @@ -197,9 +198,32 @@ class Users { }; async deleteUser(user_id) { - await this.db.factories.User.query().where('id', user_id).delete(); - await this.db.factories.Network.query().where('user_id', user_id).delete(); + // remove user tokens await this.db.db('user_tokens').where('user_id', user_id).delete(); + + if (this.app.cons) { + // app is running disconnect and destroy connected networks + const userOutCons = await this.app.cons.findUsersOutgoingConnection(user_id); + userOutCons.forEach((con) => { + con.close(); + con.destroy(); + }); + + const userInCons = await this.app.cons.findAllUsersClients(user_id); + userInCons.forEach((con) => { + con.close(); + con.destroy(); + }); + } else { + // app is not running remove user's connections from db + await this.db.dbConnections('connections').where('auth_user_id', user_id).delete(); + } + + // delete networks + await this.db.factories.Network.query().where('user_id', user_id).delete(); + + // delete user + await this.db.factories.User.query().where('id', user_id).delete(); } async changeUserPassword(id, password) {