Quantyca - Build with Confluent - Quality gates through centralized computational policy enforcement
- Exploit a wider syntax than the one currently supported by Schema Registry’s domain rules
- Reference a central repository for policy definitions
- Apply policy enforcement as close as possible to data producers
- Define custom behaviour to handle bad data
- Advanced Stream Governance enables usage of domain rules
- Schema Registry associates policies defined externally to topics’ schemas
- Open Data Mesh Platform provides the service for central policy management
- Data Product Descriptor Specification defines the data product ingredients to bind everything together
The key advantages of the solution include:
- Central, tech-agnostic repository for policy management: avoids redundancy and facilitates integration.
- Proactive quality gate enablement: data is verified before publication, enabling effective quality management by leveraging the Stream Governance Advanced package of the Schema Registry.
- A Confluent Cloud account
- Maven installed in your environment (ref)
- A tool such as curl (or Postman) to send REST requests
Let’s imagine a data product designed to handle real-time order production for an e-commerce platform. Orders must go through validation to ensure compliance with specified criteria, and this validation needs to occur before the orders are propagated to the data warehouse for further analysis. The following step-by-step process explains how to build a demo with Confluent Cloud and the ODM Framework.
Steps to try the demo:
- Create a Confluent Cloud Account.
- Sign up for a Confluent Cloud account here.
- Once you have signed up and logged in, click the menu icon in the upper right-hand corner, click “Billing & payment”, then enter payment details under “Payment details & contacts”. A screenshot of the billing UI is included below.
- Launch an OPA Server.
- In this solution, the OPA server is the centralized place where all the policies are stored. To launch a local OPA server using Docker, you can use the following command:
docker run --name odmopa-opa-server -d -p 8181:8181 \
  openpolicyagent/opa:latest-rootless run --server --log-level=info \
  --log-format=json-pretty
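To verify that the server is up before moving on, you can query OPA's health endpoint (a quick sanity check against the port mapped above):
# Returns HTTP 200 with an empty JSON body when OPA is healthy
curl -i http://localhost:8181/health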
- Configure the Policy Service of the ODM Framework
- Clone the repository from the official GitHub page:
git clone https://github.com/opendatamesh-initiative/odm-platform-up-services-policy-opa.git
- Follow the README file inside the repository to run the application locally. At the end, the ODM Policy Service will be listening on port 9001.
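To check that the service is reachable, you can list the registered policies (a sketch, assuming the service exposes a GET on the same path used for policy creation later in this guide):
# Expects an empty result on a fresh installation
curl http://localhost:9001/api/v1/planes/utility/policy-services/opa/policies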
- Log into Confluent Cloud and enter your email and password.
- If you are logging in for the first time, you will see a self-guided wizard that walks you through spinning up a cluster.
An environment contains clusters and their deployed components, such as Apache Flink, Connectors, ksqlDB, and Schema Registry. You can create different environments based on your company's requirements. For example, you can use environments to separate Development/Testing, Pre-Production, and Production clusters.
- Click + Add Environment. Specify an Environment Name and click Create.
Note: There is a default environment ready in your account upon account creation. You can use this default environment for the purpose of this demo if you do not wish to create an additional environment.
When asked to upgrade for more advanced Stream Governance capabilities, click Upgrade to Stream Governance Advanced starting at $1/hour. Note that with this option there will be costs based on the time your environment is active. Currently there is no way to go back to Stream Governance Essentials, therefore the only way to stop paying is to delete the environment (ref).
- Click Create Cluster.
Note: Confluent Cloud clusters are available in 4 types: Basic, Standard, Enterprise, and Dedicated. Basic is intended for development use cases, so you will use that for the demo. Basic clusters only support single-zone availability. Standard and Dedicated clusters are intended for production use and support multi-zone deployments. If you are interested in learning more about the different types of clusters and their associated features and limits, refer to this documentation.
- Choose the Basic cluster type.
- Click Begin Configuration.
- Select AWS as the provider and eu-west-2 as the region, with single zone.
- Specify a Cluster Name. For the purpose of this lab, any name will work here.
Note: If you didn't enter credit card information at the beginning, you may be asked to enter it at the Set Payment step. When you sign up for a Confluent Cloud account, you will get free credits to use in Confluent Cloud.
- View the associated Configuration & Cost, Usage Limits, and Uptime SLA information before launching.
- Click Launch Cluster.
- On the navigation menu, you will see Cluster Overview.
Note: This section shows Cluster Metrics, such as Throughput and Storage. This page also shows the number of Topics, Partitions, Connectors, and ksqlDB Applications. Below is an example of the metrics dashboard once you have data flowing through Confluent Cloud.
- Click on Cluster Settings. This is where you can find your Cluster ID, Bootstrap Server, Cloud Details, Cluster Type, and Capacity Limits.
- On the same navigation menu, select Topics and click Create Topic.
- Enter orders as the topic name, 3 as the number of partitions, and then click Create with defaults.
- On the next page, click Create Schema to create a Data Contract for the value of the topic.
- Copy the text inside the file src/main/avro/order.avsc and paste it into the form on the schema page. Then click Create.
- Repeat step 4 to create a second topic named bad-orders, again with 3 partitions. Don't set any Data Contract for this topic. (A CLI alternative for creating both topics is sketched after this list.)
Note: Topics have many configurable parameters. A complete list of those configurations for Confluent Cloud can be found here. If you are interested in viewing the default configurations, you can view them in the Topic Summary on the right side.
- After topic creation, the Topics UI allows you to monitor production and consumption throughput metrics and the configuration parameters for your topics. When you begin sending messages to Confluent Cloud, you will be able to view those messages and message schemas.
- Below is a look at the topic, orders, but you need to send data to this topic before you see any metrics.
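If you prefer the command line, the two topics from the previous steps can also be created with the Confluent CLI (assuming it is installed and you have run confluent login and selected your environment and cluster):
# Create the orders topic and its dead-letter counterpart
confluent kafka topic create orders --partitions 3
confluent kafka topic create bad-orders --partitions 3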
- Click API Keys on the navigation menu.
- Click Create Key in order to create your first API Key. If you have an existing API Key, select + Add Key to create another API Key.
- Select Global Access and then click Next.
- Copy or save your API Key and Secret somewhere. You will need these later on in the lab; you will not be able to view the secret again once you close this dialogue.
- After creating and saving the API key, you will see this API key in the Confluent Cloud UI in the API Keys section. If you don’t see the API key populate right away, refresh the browser.
- Click your environment name on the resources path on the top.
- On the right side of the screen click Add key under the Stream Governance API description.
- Click Create Key in order to create your first Schema Registry API Key. If you have an existing API Key, select + Add Key to create another API Key.
- Copy or save your Schema Registry API Key and Secret somewhere. You will need these later on in the lab; you will not be able to view the secret again once you close this dialogue.
Use a tool like Postman or curl to POST a policy to the ODM Policy Service. In the src/main/resources/policies folder you can find some sample policies to use for the demo. Use the following commands to create the two policies that are used in the demo:
curl -X POST localhost:9001/api/v1/planes/utility/policy-services/opa/policies \
-H "Content-Type: application/json" -d @src/main/resources/policies/order_value.json
curl -X POST localhost:9001/api/v1/planes/utility/policy-services/opa/policies \
-H "Content-Type: application/json" -d @src/main/resources/policies/order_status.json
- Clone this repository
git clone ...
- Set up the properties file (src/main/resources/data-contracts.properties), filling the following placeholders:
- bootstrap_servers
- cluster_api_key
- cluster_api_secret
- schema_registry_endpoint
- schema_registry_basic_auth
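A filled-in file might look like the following (all values are placeholders, and the key names are assumed to match the placeholders above; keep any other properties already present in the file):
# Example values only: substitute your own endpoints and credentials
bootstrap_servers=pkc-xxxxx.eu-west-2.aws.confluent.cloud:9092
cluster_api_key=<cluster-api-key>
cluster_api_secret=<cluster-api-secret>
schema_registry_endpoint=https://psrc-xxxxx.eu-west-2.aws.confluent.cloud
schema_registry_basic_auth=<sr-api-key>:<sr-api-secret>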
- Set all the needed environment variables:
export EMAIL_USER=<mail-address>
export EMAIL_PASS=<password>
- Open the file containing the order data contract, located in **, and modify the following fields:
- owner_email: the owner of the data contract. A notification email will be sent to this address if there's a validation error.
- mail.smtp.host: the host of the SMTP server used to send the mail. The mail address and password set in the previous step will authenticate to this server.
- mail.smtp.port: the port of the SMTP server used to send the mail.
- Update the schema of the orders topic to include the metadata and domain rules:
curl -X POST http://<schema-registry-endpoint>/subjects/orders-value/versions \
  -u "<schema-registry-api-key>:<schema-registry-api-secret>" \
  -H "Content-Type: application/json" \
  -d @src/main/resources/order-data-contract.json
To enforce validation of the policies stored in the OPA server, the solution makes use of domain rules. These rules can be used to specify integrity constraints or data policies in a data contract. For a practical explanation of the possibilities offered by domain rules, we suggest reading the following blog post and the official documentation.
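To give an idea of the shape such a payload can take, here is a minimal sketch following Confluent's data contract format, using the built-in CEL condition type with a DLQ action (the rule name, expression, and schema are illustrative assumptions; the actual rules for this demo are defined in src/main/resources/order-data-contract.json):
{
  "schemaType": "AVRO",
  "schema": "...",
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkOrderValue",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.amount > 0",
        "onFailure": "DLQ",
        "params": { "dlq.topic": "bad-orders" }
      }
    ]
  }
}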
Start the application to produce the invalid message:
mvn exec:java -D"exec.mainClass"="org.opendatamesh.platform.up.policy.confluent.adapter.util.ProducerApp" -D"exec.args"="./src/main/resources/data-contracts.properties client-1"
In the terminal, the logs will show an error stating that the message failed the validation of the order_value policy. There will be two results:
- An email will be sent to the owner's address
- The message will be written to the DLQ topic (bad-orders)
Note: The Kafka producer is defined in the org.opendatamesh.platform.up.policy.confluent.adapter.util.ProducerApp class. As configured, the producer will try to send only one message, the one defined in the src/main/resources/order.json file. You can change the code to send messages continuously or to change the order being sent.
Deleting the resources you created during this demo will prevent you from incurring additional charges.
- Under Environment Settings, select the Delete Environment button at the bottom of the left menu. Enter the Environment Name and select Continue.
- Stop the Policy Service of the ODM framework from the terminal where it was launched.
- Stop the Docker container of the OPA server running on your local machine with the following command:
docker stop odmopa-opa-server