Skip to content

Commit

Permalink
AB#143 feat: emit the event when run state changes
Browse files Browse the repository at this point in the history
  • Loading branch information
giovannibaratta committed Mar 2, 2024
1 parent d21886b commit b60d4ef
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 12 deletions.
3 changes: 2 additions & 1 deletion service/.env.local
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DATABASE_URL=postgresql://developer:Safe1!@localhost:5432/terraapprove?schema=public
DATABASE_URL=postgresql://developer:Safe1!@localhost:5432/terraapprove?schema=public
KAFKA_BROKERS=localhost:9092
3 changes: 2 additions & 1 deletion service/.env.test
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DATABASE_URL=postgresql://developer:Safe1!@localhost:5433/terraapprove?schema=public
DATABASE_URL=postgresql://developer:Safe1!@localhost:5433/terraapprove?schema=public
KAFKA_BROKERS=localhost:9092
8 changes: 8 additions & 0 deletions service/libs/external/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {Injectable} from "@nestjs/common"
@Injectable()
export class Config {
private dbConnectionUrl: string
readonly kafkaBrokers: string[]

constructor() {
const connectionUrl = process.env.DATABASE_URL
Expand All @@ -11,6 +12,13 @@ export class Config {
throw new Error("DATABASE_URL is not defined")

this.dbConnectionUrl = connectionUrl

const rawKafkaBrokers = process.env.KAFKA_BROKERS

if (rawKafkaBrokers === undefined)
throw new Error("KAFKA_BROKERS is not defined")

this.kafkaBrokers = rawKafkaBrokers.split(",")
}

getDbConnectionUrl(): string {
Expand Down
29 changes: 23 additions & 6 deletions service/libs/external/src/external.module.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import {PLAN_REPOSITORY_TOKEN} from "@libs/service/interfaces/plan.interfaces"
import {
RUN_EVENT_PUBLISHER_TOKEN,
RUN_REPOSITORY_TOKEN
} from "@libs/service/interfaces/run.interfaces"
import {SOURCE_CODE_REPOSITORY_TOKEN} from "@libs/service/interfaces/source-code.interfaces"
import {Module} from "@nestjs/common"
import {Config} from "./config/config"
import {DatabaseClient} from "./db/database-client"
import {SOURCE_CODE_REPOSITORY_TOKEN} from "@libs/service/interfaces/source-code.interfaces"
import {SourceCodeDbRepository} from "./db/source-code.repository"
import {PLAN_REPOSITORY_TOKEN} from "@libs/service/interfaces/plan.interfaces"
import {PlanDbRepository} from "./db/plan.repository"
import {Config} from "./config/config"
import {RUN_REPOSITORY_TOKEN} from "@libs/service/interfaces/run.interfaces"
import {RunDbRepository} from "./db/run.repository"
import {SourceCodeDbRepository} from "./db/source-code.repository"
import {KafkaPublisher} from "./kafka/kafka-publisher"
import {RunKafkaEventPublisher} from "./kafka/run.event-publisher"

const sourceCodeRepository = {
provide: SOURCE_CODE_REPOSITORY_TOKEN,
Expand All @@ -23,15 +28,27 @@ const runRepository = {
useClass: RunDbRepository
}

const runEventPublisher = {
provide: RUN_EVENT_PUBLISHER_TOKEN,
useClass: RunKafkaEventPublisher
}

@Module({
imports: [],
providers: [
sourceCodeRepository,
planRepository,
runRepository,
runEventPublisher,
DatabaseClient,
KafkaPublisher,
Config
],
exports: [sourceCodeRepository, planRepository, runRepository]
exports: [
sourceCodeRepository,
planRepository,
runRepository,
runEventPublisher
]
})
export class ExternalModule {}
31 changes: 31 additions & 0 deletions service/libs/external/src/kafka/kafka-publisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import {Injectable} from "@nestjs/common"
import {Kafka} from "kafkajs"
import {Config} from "../config/config"

@Injectable()
export class KafkaPublisher {
private readonly kafka: Kafka

constructor(readonly config: Config) {
this.kafka = new Kafka({
clientId: "terraapprove-publisher",
brokers: config.kafkaBrokers,
retry: {
retries: 2
},
connectionTimeout: 5000
})
}

async publish(topic: string, message: string) {
/* Possible improvements: cache the producers instead of creating a new
one for each message to publish */
const producer = this.kafka.producer()
await producer.connect()
await producer.send({
topic,
messages: [{value: message}]
})
await producer.disconnect()
}
}
43 changes: 43 additions & 0 deletions service/libs/external/src/kafka/run.event-publisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import {
RunEventPublisher,
RunState
} from "@libs/service/interfaces/run.interfaces"
import {Injectable, Logger} from "@nestjs/common"
import {TaskEither} from "fp-ts/lib/TaskEither"
import {KafkaPublisher} from "./kafka-publisher"
import * as TE from "fp-ts/lib/TaskEither"
import {pipe} from "fp-ts/lib/function"

@Injectable()
export class RunKafkaEventPublisher implements RunEventPublisher {
constructor(private readonly kafkaPublisher: KafkaPublisher) {}

publishRunState(runState: RunState): TaskEither<never, void> {
const result = pipe(
runState,
TE.right,
TE.map(mapToEvent),
TE.chainW(this.emitRunStateEvent())
)

return result
}

private emitRunStateEvent(): (event: string) => TaskEither<never, void> {
return event =>
TE.tryCatchK(
() => this.kafkaPublisher.publish("run-state-changed", event),
error => {
Logger.error("Error while publishing run state event")
throw error
}
)()
}
}

function mapToEvent(runState: RunState): string {
return JSON.stringify({
...runState,
revision: runState.revision.toString()
})
}
28 changes: 25 additions & 3 deletions service/libs/service/src/run.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import {randomUUID} from "crypto"
import {Either} from "fp-ts/lib/Either"
import {pipe} from "fp-ts/lib/function"
import * as TE from "fp-ts/lib/TaskEither"
import {RunEventPublisher} from "./interfaces/run.interfaces"

@Injectable()
export class RunService {
constructor(
@Inject("RUN_REPOSITORY_TOKEN")
private readonly runRepository: RunDbRepository
private readonly runRepository: RunDbRepository,
@Inject("RUN_EVENT_PUBLISHER_TOKEN")
private readonly runEventPublisher: RunEventPublisher
) {}

async createRun(
Expand All @@ -32,12 +35,24 @@ export class RunService {
}
})

const emitRunStateEvent = (run: BaseRun) =>
pipe(
run,
TE.right,
TE.chainW(value => this.runEventPublisher.publishRunState(value)),
TE.map(() => run)
)

/* The emitting of the event could fail and the event could be lost forever but for now we are
ignoring the issue in order not to overcomplicate the code at this stage. */
const result = await pipe(
request,
TE.right,
TE.chainW(persistRun),
TE.chainW((result: BaseRun) => logCreateResult(result, request)),
TE.map(result => result.id)
TE.chainW(emitRunStateEvent),
TE.chainW(logEmitEventResult),
TE.map(value => value.id)
)()

return result
Expand All @@ -46,7 +61,14 @@ export class RunService {

const logCreateResult = (result: BaseRun, conxtext: CreateRun) => {
Logger.log(
`Created run with id ${result} for source code ${conxtext.sourceCodeId} and plan ${conxtext.planId}`
`Created run with id ${result.id} for source code ${conxtext.sourceCodeId} and plan ${conxtext.planId}`
)
return TE.right(result)
}

const logEmitEventResult = (result: BaseRun) => {
Logger.log(
`Published run state event for run ${result.id} with state ${result.state}`
)
return TE.right(result)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ describe("POST /runs", () => {
})
.overrideProvider(Config)
.useValue({
getDbConnectionUrl: () => isolatedDb
getDbConnectionUrl: () => isolatedDb,
kafkaBrokers: ["localhost:9092"]
})
.compile()
app = module.createNestApplication()
Expand Down

0 comments on commit b60d4ef

Please sign in to comment.