Skip to content

Commit

Permalink
Add prediction feature
Browse files Browse the repository at this point in the history
  • Loading branch information
Nguyen-Duc-Khai committed Nov 18, 2023
1 parent f13e4f2 commit 6f45c9c
Show file tree
Hide file tree
Showing 15 changed files with 9,832 additions and 6,358 deletions.
15,297 changes: 9,160 additions & 6,137 deletions backend/package-lock.json

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
"author": "",
"license": "ISC",
"dependencies": {
"@hokify/agenda": "^6.3.0",
"agendash": "^4.0.0",
"axios": "^1.6.2",
"cors": "^2.8.5",
"cron-time-generator": "^2.0.1",
"dotenv": "^16.3.1",
"express": "^4.18.2",
"helmet": "^7.0.0",
Expand Down
22 changes: 7 additions & 15 deletions backend/src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ const express = require("express");
const helmet = require("helmet");
const cors = require("cors");
const logger = require("./config/logger");
const AirQualityModel = require("./models/AirQuality");
const moment = require("moment-timezone");
const { getData } = require("./helper/getData");
const webpush = require("web-push");
const config = require("./config/config");
const { captureScreenshot } = require("./puppeteer/puppeteer");
// const { captureScreenshot } = require("./puppeteer/puppeteer");
const { calcAQI } = require("./helper/calculateTotalAQI");
const Agendash = require("agendash");

const start = async () => {
const start = async (agenda) => {
const app = express();

webpush.setVapidDetails(
Expand All @@ -31,6 +29,9 @@ const start = async () => {
app.use(cors());
app.options("*", cors());

// Agenda jobs dashboard
app.use("/jobs", Agendash(agenda));

return app;
};

Expand All @@ -51,16 +52,7 @@ const apiRoutes = (app, io, mqtt) => {
pm10:
so2:
no2:
},
{
co:,
o3:,
tvoc:
pm25:
pm10:
so2:
no2:
}
},
]
*/

Expand Down
4 changes: 4 additions & 0 deletions backend/src/config/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const envVarsSchema = Joi.object()
MQTT_PASSWORD: Joi.string(),
VAPID_PUBLIC_KEY: Joi.string(),
VAPID_PRIVATE_KEY: Joi.string(),
AQI_API_TOKEN: Joi.string(),
PREDICTION_SERVICE_URL: Joi.string(),
})
.unknown();

Expand Down Expand Up @@ -58,4 +60,6 @@ module.exports = {
},
vapidPublicKey: envVars.VAPID_PUBLIC_KEY,
vapidPrivateKey: envVars.VAPID_PRIVATE_KEY,
aqiApiToken: envVars.AQI_API_TOKEN,
predictionServiceUrl: envVars.PREDICTION_SERVICE_URL,
};
142 changes: 79 additions & 63 deletions backend/src/index.js
Original file line number Diff line number Diff line change
@@ -1,63 +1,79 @@
const http = require("http");
const mongoose = require("mongoose");
const { start, apiRoutes } = require("./app");
const config = require("./config/config");
const logger = require("./config/logger");
const socketio = require("./websocket/socketio");
const mqtt = require("./mqtt/mqtt");

let server;

mongoose
.connect(config.mongoose.url, config.mongoose.options)
.then(async () => {
try {
logger.info("Connected to MongoDB");

// migration
// await migrate();

app = await start();
const httpServer = http.createServer(app);

var { message, io } = await socketio(httpServer);
logger.info(message);

var { message, client } = await mqtt(io);
logger.info(message);

apiRoutes(app, io, client);

server = httpServer.listen(config.port, () => {
logger.info(`Listening to port ${config.port}`);
});
} catch (error) {
logger.error(error.message);
}
});

const exitHandler = () => {
if (server) {
server.close(() => {
logger.info("Server closed");
process.exit(1);
});
} else {
process.exit(1);
}
};

const unexpectedErrorHandler = (error) => {
logger.error(error);
exitHandler();
};

process.on("uncaughtException", unexpectedErrorHandler);
process.on("unhandledRejection", unexpectedErrorHandler);

process.on("SIGTERM", () => {
logger.info("SIGTERM received");
if (server) {
server.close();
}
});
const http = require("http");
const mongoose = require("mongoose");
const { start, apiRoutes } = require("./app");
const config = require("./config/config");
const logger = require("./config/logger");
const socketio = require("./websocket/socketio");
const mqtt = require("./mqtt/mqtt");
const { agenda } = require("./jobs");
const { CronTime } = require("cron-time-generator");

let server;

const startAgenda = async () => {
await agenda.start();

/** For testing
* every 5 minutes: cronTime.every(5).minutes()
* every 5 seconds: cronTime.everyMinute()
* every sunday at 00:00 : cronTime.everySundayAt(0, 0)
* */

await agenda.every(CronTime.everyDayAt(16, 28), "retrieveDailyAqi");
// agenda.now("retrieveDailyAqi");
};

mongoose
.connect(config.mongoose.url, config.mongoose.options)
.then(async () => {
try {
logger.info("Connected to MongoDB");

// migration
// await migrate();

app = await start(agenda);
const httpServer = http.createServer(app);

var { message, io } = await socketio(httpServer);
logger.info(message);

var { message, client } = await mqtt(io);
logger.info(message);

apiRoutes(app, io, client);
await startAgenda();

server = httpServer.listen(config.port, () => {
logger.info(`Listening to port ${config.port}`);
});
} catch (error) {
logger.error(error.message);
}
});

const exitHandler = () => {
if (server) {
server.close(() => {
logger.info("Server closed");
process.exit(1);
});
} else {
process.exit(1);
}
};

const unexpectedErrorHandler = (error) => {
logger.error(error);
exitHandler();
};

process.on("uncaughtException", unexpectedErrorHandler);
process.on("unhandledRejection", unexpectedErrorHandler);

process.on("SIGTERM", () => {
logger.info("SIGTERM received");
if (server) {
server.close();
}
});
19 changes: 19 additions & 0 deletions backend/src/jobs/definitions/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
const { retrieveDailyAqi } = require("./retrieveDailyAqi");
const logger = require("../../config/logger");

let definitions = [retrieveDailyAqi];

const loadDefinitions = async (agenda) => {
try {
console.log()
for (let definition of definitions) {
await definition(agenda);
}
} catch (error) {
logger.error("Error when load agenda job definitions : " + error);
}
};

module.exports = {
loadDefinitions,
};
53 changes: 53 additions & 0 deletions backend/src/jobs/definitions/retrieveDailyAqi.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
const { aqiApiToken, predictionServiceUrl } = require("../../config/config");
const axios = require("axios");
const logger = require("../../config/logger");

const AQI_URL = "http://api.waqi.info";

const retrieveDailyAqi = async (agenda) => {
try {
// define(jobName, fn, [options])
agenda.define(
"retrieveDailyAqi",
async function (job, done) {
try {
var response = await axios({
method: "get",
url: `${AQI_URL}/feed/@1583/?token=${aqiApiToken}`,
});

response = response.data;

logger.info(
`New AQI retrieved for date ${response.data.time.iso} from ${AQI_URL}: ${response.data.aqi}`
);

var requestBody = {
newAqi: response.data.aqi,
date: response.data.time.iso.substring(0, 10),
};

console.log(requestBody);

await axios({
method: "post",
url: `${predictionServiceUrl}/new/aqi`,
data: requestBody,
});

done();
} catch (error) {
logger.error(
"Failed to update new daily aqi in job [agenda.retrieveDailyAqi] : " +
error.message
);
}
},
{ priority: "highest", concurrency: 20 }
);
} catch (error) {
logger.error("Error in job [agenda.retrieveDailyAqi] : " + error);
}
};

module.exports = { retrieveDailyAqi };
36 changes: 36 additions & 0 deletions backend/src/jobs/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
const config = require("../config/config");
const { Agenda } = require("@hokify/agenda");
const logger = require("../config/logger");
const { loadDefinitions } = require("./definitions/index.js");

var mongoConnectionString = config.mongoose.url;
const processEvery = "60 seconds";

/**
* If having trouble with "Agenda is not a constructor".
* Follow this instruction: https://github.com/agenda/agenda/issues/1293
*/
var agenda = new Agenda({
db: {
address: mongoConnectionString,
options: { useNewUrlParser: true },
collection: "jobs",
},
useUnifiedTopology: true,
});

// Specifies the frequency at which agenda will query the database looking for jobs that need to be processed
agenda.processEvery(processEvery);

// listen for the ready or error event.
agenda
.on("ready", () => logger.info("Agenda started!"))
.on("error", () => logger.error("Agenda connection error!"));

// Define agenda definitions
loadDefinitions(agenda);

// Logs all registered jobs
console.log({ jobs: agenda.definitions });

module.exports = { agenda };
2 changes: 1 addition & 1 deletion backend/src/mqtt/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const { calcAQI } = require("../helper/calculateTotalAQI");

const mqttClient = (io) => {
return new Promise((resolve) => {
const clientId = "mqtt_nodejs";
const clientId = "mqtt_nodejs_local";
const connectUrl = `${config.mqttProtocol}://${config.mqttHost}:${config.mqttPort}`;

const client = mqtt.connect(connectUrl, {
Expand Down
Loading

0 comments on commit 6f45c9c

Please sign in to comment.