-
Notifications
You must be signed in to change notification settings - Fork 1
/
client.ts
123 lines (106 loc) · 4.15 KB
/
client.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import { DataSource, EntityManager, LessThan, MongoRepository } from 'typeorm';
import { PendingOrder } from './model';
import cron, { ScheduledTask } from 'node-cron';
import { ProcessorConfig } from '../chains/interfaces/processorConfig';
import { getChain } from '../chains';
import dayjs from 'dayjs';
import { createLogger, Logger } from '@subsquid/logger';
import { TgLoggerMessage } from './model/tgLoggerMessage';
/**
 * Singleton wrapper around a MongoDB-backed TypeORM `DataSource` that stores
 * `PendingOrder` and `TgLoggerMessage` entities, plus per-order cron tasks
 * that delete pending orders.
 *
 * Obtain the instance with {@link ServiceLocalStorage.getInstance} and call
 * {@link ServiceLocalStorage.init} before touching `dataSource` or
 * `repository`.
 */
export class ServiceLocalStorage {
  private static instance: ServiceLocalStorage;

  private ds!: DataSource;
  private initialized = false; // TODO should be refactored to native DataSource check
  private readonly chainConfig: ProcessorConfig;
  private readonly sqdLogger: Logger = createLogger('service-local-storage');
  /** Scheduled clean-up tasks keyed by pending order ID. */
  private readonly dmnRegPendingOrdersCleanTasks: Map<string, ScheduledTask> =
    new Map();

  constructor() {
    this.chainConfig = getChain().config;
  }

  /**
   * The underlying TypeORM data source.
   *
   * @throws Error if the data source has not been created or initialized yet.
   */
  get dataSource(): DataSource {
    if (!this.ds || !this.ds.isInitialized)
      throw new Error('DataSource is not available yet.');
    return this.ds;
  }

  /**
   * Mongo repositories for the entities managed by this storage.
   *
   * @throws Error if the data source is not available yet (same guard as
   * `dataSource`).
   */
  get repository(): {
    pendingOrder: MongoRepository<PendingOrder>;
    tgLoggerMessage: MongoRepository<TgLoggerMessage>;
  } {
    // Go through the guarded getter so that a call made before init() fails
    // with a descriptive error rather than a TypeError on `undefined`.
    const ds = this.dataSource;
    return {
      pendingOrder: ds.getMongoRepository(PendingOrder),
      tgLoggerMessage: ds.getMongoRepository(TgLoggerMessage)
    };
  }

  /** Returns the shared instance, creating it on first use. */
  static getInstance(): ServiceLocalStorage {
    if (!ServiceLocalStorage.instance) {
      ServiceLocalStorage.instance = new ServiceLocalStorage();
    }
    return ServiceLocalStorage.instance;
  }

  /**
   * Connects to MongoDB using `sellerIndexer.serviceLocalStorageDbUrl` from
   * the chain config and then removes pending orders that are already
   * expired. Subsequent calls after a successful connection are no-ops.
   *
   * @returns This instance, for chaining.
   * @throws Error if the DB URL is missing or empty.
   */
  async init(): Promise<this> {
    if (this.initialized) return this;

    // this.ds = new DataSource({
    //   type: 'better-sqlite3',
    //   // database: ':memory:',
    //   database: 'src/serviceLocalStorageClient/db/serviceLSDb.sql',
    //   dropSchema: false,
    //   synchronize: true,
    //   logging: false,
    //   entities: [PendingOrder, TgLoggerMessage]
    // });

    const dbUrl = this.chainConfig.sellerIndexer.serviceLocalStorageDbUrl;
    // A falsy check also covers the empty string.
    if (!dbUrl) throw new Error('ServiceLocalStorageDbUrl is undefined');

    this.ds = new DataSource({
      type: 'mongodb',
      url: dbUrl,
      useNewUrlParser: true,
      synchronize: true,
      logging: false,
      entities: [PendingOrder, TgLoggerMessage],
      migrations: [],
      subscribers: []
    });
    await this.ds.initialize();
    this.initialized = true;

    await this.deletePendingOrderWhenExpInitial();
    return this;
  }

  /**
   * Deletes the pending order with the given ID and cancels the clean-up
   * task scheduled for it, if any.
   *
   * @param orderId ID of the pending order to delete.
   */
  async deletePendingOrderById(orderId: string): Promise<void> {
    this.sqdLogger.info(`>>>>> deletePendingOrderById - ${orderId}`);
    await this.repository.pendingOrder.deleteOne({ id: { $eq: orderId } });
    // The order is gone, so a still-armed cron task for it would only fire a
    // no-op delete later; stop it now.
    this.cancelCleanTask(orderId);
  }

  /**
   * Configured pending-order expiration expressed in minutes. The config
   * value is treated as milliseconds (it is divided by 60 000).
   */
  private get pendingOrderExpMinutes(): number {
    return this.chainConfig.sellerIndexer.dmnRegPendingOrderExpTime / 60 / 1000;
  }

  /** Stops and forgets the clean-up task registered for `orderId`, if any. */
  private cancelCleanTask(orderId: string): void {
    const task = this.dmnRegPendingOrdersCleanTasks.get(orderId);
    if (!task) return;
    task.stop();
    this.dmnRegPendingOrdersCleanTasks.delete(orderId);
  }

  /**
   * Removes every pending order whose `timestamp` is older than the
   * configured expiration time and logs the removed IDs. Run once from
   * `init()`.
   */
  private async deletePendingOrderWhenExpInitial(): Promise<void> {
    const cutoff = dayjs().subtract(this.pendingOrderExpMinutes, 'm').toDate();
    const expiredPendingOrders = await this.repository.pendingOrder.findBy({
      timestamp: { $lt: cutoff }
    });

    const idsToDelete = expiredPendingOrders.map((order) => order.id);
    if (idsToDelete.length === 0) return;

    await this.repository.pendingOrder.deleteMany({ id: { $in: idsToDelete } });
    this.sqdLogger.info(
      `Next Pending Orders have been deleted automatically due to reaching expiration time: ${idsToDelete.join(
        ', '
      )}`
    );
  }

  /**
   * Schedules a cron task that deletes the given pending order the first
   * time it fires and then stops itself. Scheduling again for the same
   * `orderId` replaces (and stops) the previous task.
   *
   * @param orderId ID of the pending order to delete.
   * @param schedule Optional cron expression. When omitted or empty, the
   * task runs on `*\/N * * * *`, where N is the configured expiration time
   * in minutes.
   */
  public cronDeletePendingOrderWhenExp(orderId: string, schedule?: string): void {
    const intervalMinutes = this.pendingOrderExpMinutes;

    // Stop any task already registered for this order. Overwriting the map
    // entry without stopping it would leave the old task running with no
    // remaining handle to stop it.
    this.cancelCleanTask(orderId);

    // NOTE(review): a `*/N` minute step fires on clock minutes divisible by N,
    // not N minutes after scheduling, and is only meaningful for integer N in
    // 1..59 - confirm this matches the intended expiration semantics.
    const cronExpression = schedule || `*/${intervalMinutes} * * * *`;

    const task: ScheduledTask = cron.schedule(cronExpression, async () => {
      try {
        await this.deletePendingOrderById(orderId);
        this.sqdLogger.info(
          `Pending Order with ID: "${orderId}" has been automatically deleted by cron job in ${intervalMinutes} minutes after creation.`
        );
      } catch (err: unknown) {
        // Keep the task armed so the delete is retried on the next tick, but
        // never let the rejection escape as an unhandled promise rejection.
        const reason = err instanceof Error ? err.message : String(err);
        this.sqdLogger.error(
          `Failed to automatically delete Pending Order with ID: "${orderId}": ${reason}`
        );
        return;
      }

      // Always stop *this* task, even if the map entry was replaced meanwhile.
      task.stop();
      if (this.dmnRegPendingOrdersCleanTasks.get(orderId) === task) {
        this.dmnRegPendingOrdersCleanTasks.delete(orderId);
      }
    });

    this.dmnRegPendingOrdersCleanTasks.set(orderId, task);
  }
}