Skip to content

BIE Kafka Client

Ponnia edited this page May 1, 2024 · 50 revisions

Intro

Status: Currently, this page is a Work-In-Progress Tech Spec. Once implementation is completed, update and transition this page to become documentation for the BIE Kafka Client microservice.

Epic and tickets: Consume BIP contention event streams #1642

Goal: Implement a BIE Kafka Client microservice that other VRO components can use to subscribe to and handle BIE's Event Stream

Diagram

graph TD

subgraph Kafka
    subgraph BIE[BIE env]
        schema-registry(Schema Registry)
    end

    subgraph BIE[BIE env]
        bie-kafka[TST_CONTENTION_BIE_CONTENTION_ASSOCIATED_TO_CLAIM_V02\nTST_CONTENTION_BIE_CONTENTION_UPDATED_V02\nTST_CONTENTION_BIE_CONTENTION_CLASSIFIED_V02\nTST_CONTENTION_BIE_CONTENTION_COMPLETED_V02\nTST_CONTENTION_BIE_CONTENTION_DELETED_V02]
    end
end

BIE -.->|event| kafka-client


subgraph VRO

    subgraph test_env[Test env]
        mock-kafka[mock-kafka-server]
        mock-schema-registry[mock-schema-registry]
    end
    
    subgraph svc-bie-kafka
        kafka-client -.-> kafkaEvent-notifier
    end
    subgraph MQ["(Platform) MQ"]
        kafkaEventQ(bie-events-contention-associated)
        kafkaEventQ2(bie-events-contention-updated)
        kafkaEventQ3(bie-events-contention-classified)
        kafkaEventQ4(bie-events-contention-completed)
        kafkaEventQ5(bie-events-contention-deleted)
    end
    

    kafkaEvent-notifier -.-> MQ -.-> event-handler
    saveToDB -.-> DB
    DB[("Postgres DB")]
    subgraph xample-workflow
        event-handler(kafkaEvent-handler) -.-> saveToDB
    end
end
test_env -.->|Test event| kafka-client
event-handler[kafkaEvent-handler] -.-> Datadog


style svc-bie-kafka fill:#AAF,stroke-width:2px,stroke:#777
style xample-workflow fill:#FAA,stroke-width:2px,stroke:#777
style test_env fill:#AAA,stroke-width:2px,stroke:#777
style Kafka fill:#EEE,stroke-width:2px,stroke:#777
style MQ fill:#AA7,stroke-width:2px,stroke:#997
Loading
  • Dotted arrows represent many events flowing, as a result of the topic subscription.
  • There may be one kafkaEvent-handler to handle multiple Kafka event exchanges; or one kafkaEvent-handlers per exchange.

When VRO is deployed to LHDI, VRO uses the respective BIE env (TBD in Establish connectivity between VRO and BIE kafka service environments #1674). When VRO is run locally or as part of integration tests, the mock-kafka-server will be used instead.

Verify connection to Kafka clusters using kcat (a.k.a kafkacat)

  1. visit Lightkeeper-tool and setup lighthouse and kubectl cli tool and get the config file, set an alias for dev env alias kcdev=va-abd-rrd-dev
  2. run kcdev apply -f sleep-pod.yaml to deploy a k8s pod to dev namespace using a manifest file like this one (run kcdev get pods you should see the pod from the list)
apiVersion: v1
kind: Pod
metadata:
  name: sleep-pod
  namespace: va-abd-rrd-dev
spec:
  containers:
    - name: sleep
      image: debian:latest
      command: ["/bin/sleep", "3650d"]
      resources:
        limits:
          cpu: "1"
          memory: "1Gi"
  1. run kcdev exec -it sleep-pod -- /bin/sh to access the pod
  2. run apt-get update && apt-get install kafkacat to install kcat, then exit the pod
  3. copy CA, private key, certificate from here (in VA network) and kafka_config.txt local files into the pod tmp folder e.g. kcdev cp /test.vro.bip.va.gov.pem sleep-pod:/tmp

kafka_config.txt content

bootstrap.servers=kafka.dev.bip.va.gov:443
security.protocol=SSL
ssl.key.location=test.vro.bip.va.gov.pem
ssl.certificate.location=test.vro.bip.va.gov.crt.pem
ssl.ca.location=VACACerts.pem
  1. access the pod again and cd to tmp folder and run kcat -F kafkacat_config.txt -L
  2. can access topic message by running kcat -F kafkacat_config.txt -L TST_CONTENTION_BIE_CONTENTION_ASSOCIATED_TO_CLAIM_V02 -C

Create keystore and truststore files

Read the instruction and run the script

All VA internal S2 certificates download here

If done correctly, load the truststore file in Keystore Explorer, the setting should look like this image http://aia.pki.va.gov/PKI/AIA/VA/

Kafka Cluster Info Per Environment

ENV Bootstrap Server Schema Registry Permission Type ENV Infix
DEV bip-kafka.dev.bip.va.gov https://bip-schemaregistry.dev.bip.va.gov READ Topic BIA_SERVICES_BIE_CATALOG_TST_
QA bip-kafka.stage.bip.va.gov https://bip-schemaregistry.stage.bip.va.gov READ Topic BIA_SERVICES_BIE_CATALOG_UAT
Sandbox bip-kafka.stage.bip.va.gov https://bip-schemaregistry.stage.bip.va.gov READ Topic BIA_SERVICES_BIE_CATALOG_IVV

NOTE: More Envs to be added as we get the credentials

Kafka Cluster Environments OUTDATED

image

NOTE: PreProd environment urls have been updated to kafka.preprod.bip.va.gov:443 and https://schemaregistry.preprod.bip.va.gov:443

svc-bie-kafka

  • When this microservice starts up, it will immediately subscribe to Kafka topics based on a configuration settings.
    • Authenticate to mock BIE's Kafka service; only this microservice holds the credentials and is responsible for authenticating (microservice clients don't need to provide BIE credentials).
  • When Kafka-topic events come in, send a RabbitMQ message into a RabbitMQ exchange with a payload as follows:
# exchange: 'bie-events-contention-associated'
  (async, one-way) payload = {
    "topic": "TST_CONTENTION_BIE_CONTENTION_ASSOCIATED_TO_CLAIM_V02",
    "notifiedAt": "2023-06-12T19:29 UTC+00:00", // when `svc-bie-kafka` received the event
    "event": { ...entire Kafka event object }
  }
# exchange: 'bie-events-contention-deleted'
  (async, one-way) payload = {
    "topic": "TST_CONTENTION_BIE_CONTENTION_DELETED_V02",
    "notifiedAt": "2023-06-12T19:30 UTC+00:00",
    "event": { ...entire Kafka event object }
  }
  • The RabbitMQ exchange name will be apriori constants, so the microservice and its clients know which exchange to use for each Kafka topic.
  • For the RabbitMQ exchange, create a topic exchange (or fanout exchange if desired), which will allow multiple svc-bie-kafka clients to subscribe to the topic.
    • Using routing keys was considered but we decided to implement a more straightforward 1-to-1 mapping between Kafka topic to RabbitMQ exchange.

xample-workflow

  • Implement an example VRO component that subscribes to RabbitMQ exchange(s) upon startup.
    • If using Java, it's encouraged to add a new Camel route to domain-xample/xample-workflows/ that does event-handling when a msg is available in the RabbitMQ exchange.
  • When a Kafka event comes through the RabbitMQ topic exchange, log the event and save it to the DB.
    • Initially, structure the DB entity generically. Iteratively add more specificity when we know what event fields should be stored as DB columns.
-- table: 'bie_event' columns: 
- uuid: "PK: Primary key used to reference this DB record"
- created_at: "Creation time of this DB record"
- notified_at: "When the event was received by VRO"
- event_type (indexed): "Enum: contention-associated, contention-deleted, ..."
- event_details: "JSON text with PII removed"
- event_at: "Event timestamp extracted from event_details"
- benefit_claim_id (indexed): "Benefit Claim ID extracted from event_details"
- contention_id (indexed): "Contention ID extracted from event_details"
- classification (indexed): "Classification of contention extracted from event_details"

-- table: 'diagnostic_code'
- code: "PK: Diagnostic Codes extracted from event_details"

-- many-to-many association table: 'bie_event_to_diagnostic_code'
- bie_event_uuid: "FK: Primary key used to reference this DB record"
- diagnostic_code: "FK: Diagnostic Codes extracted from event_details"
# Follow guidance at https://stackoverflow.com/a/9790225
  • Timestamps without a time zone should be in GMT
  • Create an integration test that registers a Kafka topic and publishes events to the topic, then checks if expected DB entry exists.

Future Improvements

The following possible improvements are not currently part of the Consume BIP contention event streams #1642 epic unless there is sufficient reason to include one or more of them.

  • Resilience: Store Kafka subscription state in Redis for observability.
  • Correctness when scaling: When multiple svc-bie-kafka instances, ensure there are no duplicate notification events in the RabbitMQ exchange.
Clone this wiki locally