From 38dfb48e5389dffd3c3807bfac994395adffc06d Mon Sep 17 00:00:00 2001 From: hudy9x Date: Thu, 14 Mar 2024 18:01:37 +0700 Subject: [PATCH] update code --- packages/be-gateway/src/events/index.ts | 10 ++ .../be-gateway/src/events/reminder.event.ts | 22 +++ .../src/exceptions/InternalErrorException.ts | 7 + packages/be-gateway/src/lib/redis.ts | 16 +- packages/be-gateway/src/main.ts | 4 +- packages/be-gateway/src/routes/task/index.ts | 103 ++---------- packages/be-gateway/src/routes/test/index.ts | 49 +++++- .../be-gateway/src/services/task.service.ts | 153 ++++++++++++++++++ packages/be-scheduler/src/cronJob.ts | 14 +- packages/be-scheduler/src/main.ts | 12 +- packages/be-scheduler/src/scheduler.ts | 6 + packages/shared-libs/src/lib/date.ts | 16 +- packages/shared-pubsub/src/lib/publisher.ts | 13 +- 13 files changed, 314 insertions(+), 111 deletions(-) create mode 100644 packages/be-gateway/src/events/reminder.event.ts create mode 100644 packages/be-gateway/src/exceptions/InternalErrorException.ts create mode 100644 packages/be-gateway/src/services/task.service.ts diff --git a/packages/be-gateway/src/events/index.ts b/packages/be-gateway/src/events/index.ts index 2f1cc044..0c84d79e 100644 --- a/packages/be-gateway/src/events/index.ts +++ b/packages/be-gateway/src/events/index.ts @@ -1,8 +1,10 @@ import { connectSubClient } from '@shared/pubsub' import { NotificationEvent } from './notification.event' +import { ReminderEvent } from './reminder.event' export const CHANNEL_SCHEDULER_ACTION_NOTIFY = 'scheduler:action-notify' export const CHANNEL_SCHEDULER_CREATE = 'scheduler:create' +export const CHANNEL_RUN_EVERY_MINUTE = 'fixed:run-every-minute' export const EVENT = { SCHEDULER_DELETE: 'scheduler:delete' } @@ -11,14 +13,22 @@ connectSubClient((err, redis) => { if (err) { return } + // We must subscribe channels first redis.subscribe(CHANNEL_SCHEDULER_ACTION_NOTIFY, (err, count) => { console.log('subscribed', CHANNEL_SCHEDULER_ACTION_NOTIFY) }) + redis.subscribe(CHANNEL_RUN_EVERY_MINUTE) + + // After that, we can listen messages from them redis.on('message', async (channel: string, data: string) => { if (channel === CHANNEL_SCHEDULER_ACTION_NOTIFY) { const event = new NotificationEvent() event.run(data) } + if (channel === CHANNEL_RUN_EVERY_MINUTE) { + const reminder = new ReminderEvent() + reminder.run() + } }) }) diff --git a/packages/be-gateway/src/events/reminder.event.ts b/packages/be-gateway/src/events/reminder.event.ts new file mode 100644 index 00000000..d170a6a8 --- /dev/null +++ b/packages/be-gateway/src/events/reminder.event.ts @@ -0,0 +1,22 @@ +import { notifyToWebUsers } from '../lib/buzzer' +import { findCache, getJSONCache } from '../lib/redis' +import { extracDatetime, padZero } from '@shared/libs' +export class ReminderEvent { + async run() { + const { y, m, d, hour, min } = extracDatetime(new Date()) + + const key = [ + `remind-${y}-${padZero(m)}-${padZero(d)}-${padZero(hour)}:${padZero(min)}` + ] + + const results = await findCache(key) + + console.log('keys list', results) + if (!results.length) return + + results.forEach(k => { + const res = getJSONCache([k]) + console.log(res) + }) + } +} diff --git a/packages/be-gateway/src/exceptions/InternalErrorException.ts b/packages/be-gateway/src/exceptions/InternalErrorException.ts new file mode 100644 index 00000000..c3daac1e --- /dev/null +++ b/packages/be-gateway/src/exceptions/InternalErrorException.ts @@ -0,0 +1,7 @@ +export default class InternalErrorException extends Error { + status: number + constructor(message?: string) { + super(message || 'INTERNAL_SERVER_ERROR') + this.status = 500 + } +} diff --git a/packages/be-gateway/src/lib/redis.ts b/packages/be-gateway/src/lib/redis.ts index 0ceeb526..4d5918b8 100644 --- a/packages/be-gateway/src/lib/redis.ts +++ b/packages/be-gateway/src/lib/redis.ts @@ -93,20 +93,27 @@ export const genKeyFromSource = (source: { [key: string]: unknown }) => { export const setJSONCache = ( key: CACHE_KEY, - value: RedisJSONValue | RedisJSONValue[] + value: RedisJSONValue | RedisJSONValue[], + expired?: number ) => { if (!connected) { return null } try { const cacheKey = genKey(key) + console.log('cachekey', cacheKey, expired) redis.set(cacheKey, JSON.stringify(value)) - redis.expire(cacheKey, DAY) + redis.expire(cacheKey, expired || DAY) } catch (error) { console.log('set redis cache error') } } +export const setCacheExpire = (key: CACHE_KEY, expired: number) => { + const cacheKey = genKey(key) + redis.expire(cacheKey, expired) +} + export const getJSONCache = async (key: CACHE_KEY) => { if (!connected) { return null @@ -147,10 +154,11 @@ export const delMultiCache = async (keys: CACHE_KEY[]) => { await pipeline.exec() } -export const findCache = async (key: CACHE_KEY) => { +export const findCache = async (key: CACHE_KEY, abs = false) => { try { const newKey = genKey(key) - const results = await redis.keys(newKey + '*') + const asterisk = abs ? '' : '*' + const results = await redis.keys(newKey + asterisk) return results } catch (error) { console.log('find cache key error', error) diff --git a/packages/be-gateway/src/main.ts b/packages/be-gateway/src/main.ts index ba62eeb9..5ed22e1b 100644 --- a/packages/be-gateway/src/main.ts +++ b/packages/be-gateway/src/main.ts @@ -12,7 +12,9 @@ import './events' import Routes from './routes' // import { Log } from './lib/log' -connectPubClient() +connectPubClient(err => { + console.log(err) +}) const app: Application = express() app.get('/check-health', (req, res) => { diff --git a/packages/be-gateway/src/routes/task/index.ts b/packages/be-gateway/src/routes/task/index.ts index d7f4adb4..7bbe009a 100644 --- a/packages/be-gateway/src/routes/task/index.ts +++ b/packages/be-gateway/src/routes/task/index.ts @@ -39,6 +39,7 @@ import { } from '../../services/todo.counter' import ActivityService from '../../services/activity.service' import { createModuleLog } from '../../lib/log' +import TaskService from '../../services/task.service' const logging = createModuleLog('ProjectTask') // import { Log } from '../../lib/log' @@ -258,100 +259,17 @@ router.post('/project/task/make-cover', async (req: AuthRequest, res) => { // It means POST:/api/example router.post('/project/task', async (req: AuthRequest, res) => { - const body = req.body as Task - const activityService = new ActivityService() - const { - desc, - visionId, - assigneeIds, - title, - dueDate, - projectId, - priority, - progress - } = req.body as Task - let taskStatusId = body.taskStatusId - const { id } = req.authen - - const key = [CKEY.TASK_QUERY, projectId] - const counterKey = [CKEY.PROJECT_TASK_COUNTER, projectId] - try { - const doneStatus = await mdTaskStatusWithDoneType(projectId) - - const done = doneStatus && doneStatus.id === taskStatusId ? true : false - - if (!taskStatusId) { - const todoStatus = await mdTaskStatusWithTodoType(projectId) - taskStatusId = todoStatus.id - } - - const order = await incrCache(counterKey) - const result = await mdTaskAdd({ - title, - cover: null, - order: order, - startDate: null, - dueDate: dueDate || null, - plannedStartDate: dueDate || null, - plannedDueDate: dueDate || null, - assigneeIds, - desc, - done, - fileIds: [], - projectId, - priority, - taskStatusId: taskStatusId, - tagIds: [], - visionId: visionId || null, - parentTaskId: null, - taskPoint: null, - createdBy: id, - createdAt: new Date(), - updatedAt: null, - updatedBy: null, - progress + const taskService = new TaskService() + const result = await taskService.createNewTask({ + uid: req.authen.id, + body: req.body }) - activityService.createTask({ - id: result.id, - userId: id - }) - - const processes = [] - - // delete todo counter - if (assigneeIds && assigneeIds[0]) { - processes.push(deleteTodoCounter([assigneeIds[0], projectId])) - } - - // delete all cached tasks - processes.push(findNDelCaches(key)) - - // run all process - await Promise.allSettled(processes) - - if (result.assigneeIds && result.assigneeIds.length) { - // if created user and assigned user are the same person - // do not send notification - if (result.assigneeIds[0] !== id) { - const project = await mdProjectGet(result.projectId) - const taskLink = genFrontendUrl( - `${project.organizationId}/project/${projectId}?mode=task&taskId=${result.id}` - ) - - notifyToWebUsers(result.assigneeIds, { - title: 'Got a new task', - body: `${result.title}`, - deep_link: taskLink - }) - } - } - res.json({ status: 200, data: result }) } catch (error) { console.log(error) - res.json({ status: 500, error }) + res.status(500).send(error) } }) @@ -581,9 +499,13 @@ router.put('/project/task', async (req: AuthRequest, res) => { await Promise.allSettled(processes) const getWatchers = async () => { - const watchers = await projectSettingRepo.getAllNotifySettings(result.projectId) + const watchers = await projectSettingRepo.getAllNotifySettings( + result.projectId + ) // merge watchers and make sure that do not send it to user who updated this task - const watcherList = [...result.assigneeIds, ...watchers].filter(uid => uid !== userId) + const watcherList = [...result.assigneeIds, ...watchers].filter( + uid => uid !== userId + ) return watcherList } @@ -617,7 +539,6 @@ router.put('/project/task', async (req: AuthRequest, res) => { body: `From ${oldProgress} => ${result.progress} on "${result.title}"`, deep_link: taskLink }) - } res.json({ status: 200, data: result }) diff --git a/packages/be-gateway/src/routes/test/index.ts b/packages/be-gateway/src/routes/test/index.ts index f81ec645..9b28205b 100644 --- a/packages/be-gateway/src/routes/test/index.ts +++ b/packages/be-gateway/src/routes/test/index.ts @@ -9,7 +9,14 @@ import { Res } from '../../core' import { CounterType } from '@prisma/client' -import { CKEY, incrCache, setCache } from '../../lib/redis' +import { + CKEY, + delCache, + findCache, + incrCache, + setCache, + setJSONCache +} from '../../lib/redis' import { TaskQueue, getTaskQueueInstance } from '../../queues' @@ -22,6 +29,46 @@ export class TestController extends BaseController { this.taskQueue = getTaskQueueInstance() } + calculateSecondBetween2Date() { + const d1 = new Date() + const d2 = new Date( + d1.getFullYear(), + d1.getMonth(), + d1.getDate(), + d1.getHours(), + d1.getMinutes() + 2 + ) + + return (d2.getTime() - d1.getTime()) / 1000 + } + + @Get('/hello') + async sayHello() { + const { taskId } = this.req.query as { taskId: string } + console.log('hello to test/ api ') + + console.log( + 'calculate second between 2 dates', + this.calculateSecondBetween2Date() + ) + + const key = [`remind-${taskId}-24-03-14-14:45`] + const result = await findCache(key, true) + + console.log('result:', result) + + if (!result.length) { + await setJSONCache(key, { + title: 'Just 15p to this meeting' + }) + } else { + console.log('delete cache') + delCache(key) + } + + return 1111 + } + @Get('/bullmq') async runQueue() { // await this.taskQueue.addJob('name', { diff --git a/packages/be-gateway/src/services/task.service.ts b/packages/be-gateway/src/services/task.service.ts new file mode 100644 index 00000000..3ad6569a --- /dev/null +++ b/packages/be-gateway/src/services/task.service.ts @@ -0,0 +1,153 @@ +import { Task } from '@prisma/client' +import ActivityService from './activity.service' +import { CKEY, findNDelCaches, incrCache, setJSONCache } from '../lib/redis' +import { + mdProjectGet, + mdTaskAdd, + mdTaskStatusWithDoneType, + mdTaskStatusWithTodoType +} from '@shared/models' +import { deleteTodoCounter } from './todo.counter' +import { genFrontendUrl } from '../lib/url' +import { notifyToWebUsers } from '../lib/buzzer' +import InternalErrorException from '../exceptions/InternalErrorException' + +export default class TaskService { + activityService: ActivityService + constructor() { + this.activityService = new ActivityService() + } + async createNewTask({ uid, body }: { uid: string; body: Task }) { + { + const activityService = this.activityService + const { + desc, + visionId, + assigneeIds, + title, + dueDate, + projectId, + priority, + progress + } = body + let taskStatusId = body.taskStatusId + + const key = [CKEY.TASK_QUERY, projectId] + const counterKey = [CKEY.PROJECT_TASK_COUNTER, projectId] + + try { + const doneStatus = await mdTaskStatusWithDoneType(projectId) + const done = doneStatus && doneStatus.id === taskStatusId ? true : false + + if (!taskStatusId) { + const todoStatus = await mdTaskStatusWithTodoType(projectId) + taskStatusId = todoStatus.id + } + + const order = await incrCache(counterKey) + const result = await mdTaskAdd({ + title, + cover: null, + order: order, + startDate: null, + dueDate: dueDate || null, + plannedStartDate: dueDate || null, + plannedDueDate: dueDate || null, + assigneeIds, + desc, + done, + fileIds: [], + projectId, + priority, + taskStatusId: taskStatusId, + tagIds: [], + visionId: visionId || null, + parentTaskId: null, + taskPoint: null, + createdBy: uid, + createdAt: new Date(), + updatedAt: null, + updatedBy: null, + progress + }) + + activityService.createTask({ + id: result.id, + userId: uid + }) + + const processes = [] + + // delete todo counter + if (assigneeIds && assigneeIds[0]) { + processes.push(deleteTodoCounter([assigneeIds[0], projectId])) + } + + // delete all cached tasks + processes.push(findNDelCaches(key)) + + // run all process + await Promise.allSettled(processes) + + this.notifyNewTaskToAssignee({ uid, task: result }) + this.createTaskReminder(result) + + return result + } catch (error) { + console.log(error) + throw new InternalErrorException(error) + } + } + } + + async createTaskReminder(task: Task) { + const dueDate = task.dueDate + const d1 = new Date(dueDate) + const now = new Date() + + if (d1 < now) { + console.log('can not create reminder for past tasks') + return + } + + const expired = (d1.getTime() - now.getTime()) / 1000 + + const y = d1.getFullYear() + const m = d1.getMonth() + const d = d1.getDate() + const hour = d1.getHours() + const min = d1.getMinutes() + const pZero = n => (n < 10 ? '0' + n : n) + + const key = [ + `remind-${y}-${pZero(m)}-${pZero(d)}-${pZero(hour)}:${pZero(min)}-${ + task.id + }` + ] + console.log(key) + + setJSONCache(key, task, Math.ceil(expired)) + } + + async notifyNewTaskToAssignee({ uid, task }: { uid: string; task: Task }) { + const assigneeIds = task.assigneeIds + if (!assigneeIds.length) return + + // if creator and assignee is the same person + // do not send notification + const filtered = assigneeIds.filter(assignee => assignee !== uid) + + if (!filtered.length) return + + const project = await mdProjectGet(task.projectId) + const taskLink = genFrontendUrl( + `${project.organizationId}/project/${task.projectId}?mode=task&taskId=${task.id}` + ) + + notifyToWebUsers(filtered, { + title: 'Got a new task', + body: `${task.title}`, + deep_link: taskLink + }) + } +} diff --git a/packages/be-scheduler/src/cronJob.ts b/packages/be-scheduler/src/cronJob.ts index 8daa3aa9..e18945e7 100644 --- a/packages/be-scheduler/src/cronJob.ts +++ b/packages/be-scheduler/src/cronJob.ts @@ -1,9 +1,10 @@ import cron, { ScheduledTask } from 'node-cron' import { randomUUID } from 'crypto' -import { Scheduler } from '@prisma/client' export type TTrigger = { every: + | 'minute' + | 'hour' | 'day' | 'weekday' | 'mon' @@ -13,21 +14,20 @@ export type TTrigger = { | 'fri' | 'sat' | 'sun' - at: { hour: number; minute: number; period: 'am' | 'pm' } -} - -type TActionNotifyConfig = { - [key: string]: unknown + at?: { hour: number; minute: number; period: 'am' | 'pm' } } const cronList = new Map() const generateCronPattern = (trigger: TTrigger) => { const { every, at } = trigger - const { hour, minute, period } = at + const { hour, minute, period } = at || { hour: 1, minute: 1, period: 'am' } let time: string[] = [] + if (every === 'minute') return '* * * * *' + if (every === 'hour') return '0 * * * *' + if (hour && minute && period) { time.push(minute + '') time.push(hour + (period === 'pm' ? 12 : 0) + '') diff --git a/packages/be-scheduler/src/main.ts b/packages/be-scheduler/src/main.ts index a5131803..8812559f 100644 --- a/packages/be-scheduler/src/main.ts +++ b/packages/be-scheduler/src/main.ts @@ -2,15 +2,25 @@ import 'dotenv/config' import { connectPubClient, connectSubClient } from '@shared/pubsub' import { SchedulerAction } from './scheduler' import { NotificationAction } from './actions/NotificationAction' +import { cronJob } from './cronJob' +connectPubClient((err, redis) => { + if (err) return + // for fixed cronjobs + const fixedCronId = 'fixed-cron-id' + cronJob.create(fixedCronId, { every: 'minute' }, () => { + const CHANNEL_RUN_EVERY_MINUTE = 'fixed:run-every-minute' + redis.publish(CHANNEL_RUN_EVERY_MINUTE, 'Hello') + }) +}) -connectPubClient() connectSubClient((err, redis) => { if (err) { console.log(err) return } + // for automation scheduler that was setting up be each project const schedulerAction = new SchedulerAction(redis) schedulerAction.register(new NotificationAction()) schedulerAction.run() diff --git a/packages/be-scheduler/src/scheduler.ts b/packages/be-scheduler/src/scheduler.ts index 6853351b..0c8dab89 100644 --- a/packages/be-scheduler/src/scheduler.ts +++ b/packages/be-scheduler/src/scheduler.ts @@ -68,6 +68,7 @@ export class SchedulerAction { const rd = this.rd const repo = this.repo + // after subscribing channels, now we can listen from them rd.on('message', (channel, message) => { if (channel === EVENT.SCHEDULER_CREATE) { const data = JSON.parse(message) as Scheduler @@ -90,6 +91,7 @@ export class SchedulerAction { subscribeChannel() { const rd = this.rd + // just subscribe channel by name, and do nothing rd.subscribe(EVENT.SCHEDULER_DELETE) rd.subscribe(EVENT.SCHEDULER_CREATE, (err, count) => { if (err) { @@ -100,8 +102,12 @@ export class SchedulerAction { } run() { + // get all scheduler from databases this.fetchNRunAllScheduler() + // create channels first this.subscribeChannel() + // after create channels, now we can listen from them + // note: you must create channels first then listeners must work this.listenMessage() } } diff --git a/packages/shared-libs/src/lib/date.ts b/packages/shared-libs/src/lib/date.ts index 8e55411f..54c8ca11 100644 --- a/packages/shared-libs/src/lib/date.ts +++ b/packages/shared-libs/src/lib/date.ts @@ -1,4 +1,4 @@ -import { format } from "date-fns" +import { format } from 'date-fns' export function getDatesInMonth(date: Date) { console.log('a') @@ -8,6 +8,20 @@ export function dateFormat(date: number | Date, formatString: string) { return format(date, formatString) } +export function padZero(n: number) { + return n < 10 ? '0' + n : n +} + +export function extracDatetime(dt: Date) { + const y = dt.getFullYear() + const m = dt.getMonth() + const d = dt.getDate() + const hour = dt.getHours() + const min = dt.getMinutes() + + return { y, m, d, hour, min } +} + export function isDateEqual(dateLeft: Date, dateRight: Date): boolean { const d1 = [ dateLeft.getFullYear(), diff --git a/packages/shared-pubsub/src/lib/publisher.ts b/packages/shared-pubsub/src/lib/publisher.ts index 02aeba10..1e4b5e1f 100644 --- a/packages/shared-pubsub/src/lib/publisher.ts +++ b/packages/shared-pubsub/src/lib/publisher.ts @@ -1,30 +1,33 @@ -import Redis from "ioredis"; +import Redis from 'ioredis' let redis: Redis let error = false -export const connectPubClient = () => { +export const connectPubClient = ( + cb: (error: boolean, redis?: Redis) => void +) => { try { redis = new Redis(process.env.REDIS_HOST) redis.once('connect', () => { error = false console.log('redis publisher connection established') + cb(false, redis) }) redis.on('error', err => { if (error) return console.log('redis publisher connection error') error = true + cb(true) // console.log(error) }) } catch (error) { + cb(true) console.log('redis publisher connection error') } - } - export const publish = (channel: string, message: unknown) => { console.log('pubblished channel', channel) - redis.publish(channel, JSON.stringify(message)); + redis.publish(channel, JSON.stringify(message)) }