-
Notifications
You must be signed in to change notification settings - Fork 2
/
index.ts
85 lines (72 loc) · 2.21 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// SPDX-License-Identifier: CC0-1.0
import {
CompressionCodecs,
CompressionTypes,
Kafka as BaseKafka,
Partitioners,
} from 'kafkajs'
import type {
KafkaConfig as BaseKafkaConfig,
ConsumerConfig as BaseConsumerConfig,
ProducerConfig,
} from 'kafkajs'
import { Issuer } from 'openid-client'
import { randomUUID } from 'crypto'
import { compress, decompress } from '@mongodb-js/zstd'
CompressionCodecs[CompressionTypes.ZSTD] = () => {
return {
compress,
decompress,
}
}
type KafkaConfig = {
client_id: string
client_secret?: string
domain?: 'gcn.nasa.gov' | 'test.gcn.nasa.gov' | 'dev.gcn.nasa.gov'
} & Omit<BaseKafkaConfig, 'brokers'>
type ConsumerConfig = Omit<BaseConsumerConfig, 'groupId'> &
Partial<Pick<BaseConsumerConfig, 'groupId'>>
class Kafka extends BaseKafka {
constructor({
client_id,
client_secret,
domain = 'gcn.nasa.gov',
...config
}: KafkaConfig) {
const brokers = [`kafka.${domain}:9092`]
config.ssl ??= true
if (client_id && !config.sasl) {
const issuer = new Issuer({
issuer: domain,
token_endpoint: `https://auth.${domain}/oauth2/token`,
})
const client = new issuer.Client({ client_id, client_secret })
config.sasl = {
mechanism: 'oauthbearer',
oauthBearerProvider: async () => {
const { access_token } = await client.grant({
grant_type: 'client_credentials',
})
if (!access_token) {
throw new Error('response must contain access_token')
}
return { value: access_token }
},
}
}
super({ brokers, ...config })
}
consumer({ groupId, ...config }: ConsumerConfig = {}) {
groupId ??= randomUUID()
return super.consumer({ groupId, ...config })
}
producer({ createPartitioner, ...config }: ProducerConfig = {}) {
// Suppress default partitioner warning.
// FIXME: remove once KafkaJS has removed the warning.
// See https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner
createPartitioner ??= Partitioners.DefaultPartitioner
return super.producer({ createPartitioner, ...config })
}
}
export * from 'kafkajs'
export { Kafka, ConsumerConfig, KafkaConfig }