Quickly test the Azure Data Lake Storage Gen2 Sink connector.
Simply run:
$ ./azure-data-lake-storage-gen2.sh
Or, using 2-way SSL authentication:
$ ./azure-data-lake-storage-gen2-2way-ssl.sh
Logging in to Azure using the browser (or using the environment variables AZ_USER and AZ_PASS, if set)
$ az login
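If AZ_USER and AZ_PASS are set, the login can be done non-interactively instead of opening a browser; a minimal sketch of that flow (note that username/password login does not work for accounts with MFA enabled):
$ az login -u "$AZ_USER" -p "$AZ_PASS"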
Adding the CLI extension for Azure Data Lake Gen2
$ az extension add --name storage-preview
Creating resource group $AZURE_RESOURCE_GROUP in $AZURE_REGION
$ az group create \
--name $AZURE_RESOURCE_GROUP \
--location $AZURE_REGION
Registering Active Directory app $AZURE_AD_APP_NAME
$ AZURE_DATALAKE_CLIENT_ID=$(az ad app create --display-name "$AZURE_AD_APP_NAME" --password mypassword --native-app false --available-to-other-tenants false --query appId -o tsv)
Creating the Service Principal associated with the app
$ SERVICE_PRINCIPAL_ID=$(az ad sp create --id $AZURE_DATALAKE_CLIENT_ID | jq -r '.objectId')
$ AZURE_TENANT_ID=$(az account list | jq -r '.[].tenantId')
$ AZURE_DATALAKE_TOKEN_ENDPOINT="https://login.microsoftonline.com/$AZURE_TENANT_ID/oauth2/token"
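To sanity-check the App registration before wiring it into the connector, you can request a token directly from the endpoint using the OAuth2 client_credentials grant; a sketch (https://storage.azure.com/ is the resource ADLS Gen2 tokens are issued for, and mypassword is the client secret set above):
$ curl -s -X POST "$AZURE_DATALAKE_TOKEN_ENDPOINT" \
    -d "grant_type=client_credentials" \
    -d "client_id=$AZURE_DATALAKE_CLIENT_ID" \
    -d "client_secret=mypassword" \
    -d "resource=https://storage.azure.com/" | jq -r '.access_token'
A non-empty access token indicates the App registration and secret are valid.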
Creating data lake storage account $AZURE_DATALAKE_ACCOUNT_NAME in resource group $AZURE_RESOURCE_GROUP
$ az storage account create \
--name $AZURE_DATALAKE_ACCOUNT_NAME \
--resource-group $AZURE_RESOURCE_GROUP \
--location $AZURE_REGION \
--sku Standard_LRS \
--kind StorageV2 \
--hierarchical-namespace true
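To confirm the account really has the hierarchical namespace enabled (required for Data Lake Gen2), a quick check:
$ az storage account show \
    --name $AZURE_DATALAKE_ACCOUNT_NAME \
    --resource-group $AZURE_RESOURCE_GROUP \
    --query isHnsEnabled -o tsv
This should return True.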
Assigning Storage Blob Data Owner role to Service Principal $SERVICE_PRINCIPAL_ID
$ az role assignment create --assignee $SERVICE_PRINCIPAL_ID --role "Storage Blob Data Owner"
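Role assignments can take a short while to propagate; to verify the assignment is in place, a sketch:
$ az role assignment list --assignee $SERVICE_PRINCIPAL_ID --query '[].roleDefinitionName' -o tsv
The output should include Storage Blob Data Owner.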
The connector is created with:
$ curl -X PUT \
-H "Content-Type: application/json" \
--data '{
"connector.class": "io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
"tasks.max": "1",
"topics": "datalake_topic",
"flush.size": "3",
"azure.datalake.gen2.client.id": "'"$AZURE_DATALAKE_CLIENT_ID"'",
"azure.datalake.gen2.client.key": "mypassword",
"azure.datalake.gen2.account.name": "'"$AZURE_DATALAKE_ACCOUNT_NAME"'",
"azure.datalake.gen2.token.endpoint": "'"$AZURE_DATALAKE_TOKEN_ENDPOINT"'",
"format.class": "io.confluent.connect.azure.storage.format.avro.AvroFormat",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "broker:9092",
"confluent.topic.replication.factor": "1"
}' \
http://localhost:8083/connectors/azure-datalake-gen2-sink/config | jq .
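Once created, the connector can be checked with the Connect REST API status endpoint; for example:
$ curl -s http://localhost:8083/connectors/azure-datalake-gen2-sink/status | jq .
Both the connector and its task should report a RUNNING state.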
Sending messages to topic datalake_topic
$ seq -f "{\"f1\": \"value%g\"}" 10 | docker exec -i connect kafka-avro-console-producer --broker-list broker:9092 --property schema.registry.url=http://schema-registry:8081 --topic datalake_topic --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
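To verify the records actually reached the topic before inspecting the sink output, you can consume them back (the connect container ships the Avro console tools, as used for the producer above); a sketch:
$ docker exec connect kafka-avro-console-consumer --bootstrap-server broker:9092 --property schema.registry.url=http://schema-registry:8081 --topic datalake_topic --from-beginning --max-messages 10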
Listing blobs in container topics of account ${AZURE_DATALAKE_ACCOUNT_NAME} in Azure Blob Storage
$ az storage blob list --account-name "${AZURE_DATALAKE_ACCOUNT_NAME}" --container-name topics
Getting one of the Avro files locally and displaying its content with avro-tools
$ az storage blob download --container-name topics --name datalake_topic/partition=0/datalake_topic+0+0000000000.avro --file /tmp/datalake_topic+0+0000000000.avro --account-name "${AZURE_DATALAKE_ACCOUNT_NAME}"
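The avro-tools invocation itself is not shown in the script output above; assuming an avro-tools jar is available locally (the jar path and version below are illustrative), the file can be dumped to JSON with:
$ java -jar avro-tools-1.11.1.jar tojson /tmp/datalake_topic+0+0000000000.avro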
Results:
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
With 2-way SSL authentication, the connector is created with:
$ docker exec -e AZURE_DATALAKE_CLIENT_ID="$AZURE_DATALAKE_CLIENT_ID" -e AZURE_DATALAKE_ACCOUNT_NAME="$AZURE_DATALAKE_ACCOUNT_NAME" -e AZURE_DATALAKE_TOKEN_ENDPOINT="$AZURE_DATALAKE_TOKEN_ENDPOINT" connect \
curl -X PUT \
--cert /etc/kafka/secrets/connect.certificate.pem --key /etc/kafka/secrets/connect.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \
-H "Content-Type: application/json" \
--data '{
"connector.class": "io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
"tasks.max": "1",
"topics": "datalake_topic",
"flush.size": "3",
"azure.datalake.gen2.client.id": "'"$AZURE_DATALAKE_CLIENT_ID"'",
"azure.datalake.gen2.client.key": "mypassword",
"azure.datalake.gen2.account.name": "'"$AZURE_DATALAKE_ACCOUNT_NAME"'",
"azure.datalake.gen2.token.endpoint": "'"$AZURE_DATALAKE_TOKEN_ENDPOINT"'",
"format.class": "io.confluent.connect.azure.storage.format.avro.AvroFormat",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "broker:9092",
"confluent.topic.replication.factor": "1",
"confluent.topic.ssl.keystore.location" : "/etc/kafka/secrets/kafka.connect.keystore.jks",
"confluent.topic.ssl.keystore.password" : "confluent",
"confluent.topic.ssl.key.password" : "confluent",
"confluent.topic.ssl.truststore.location" : "/etc/kafka/secrets/kafka.connect.truststore.jks",
"confluent.topic.ssl.truststore.password" : "confluent",
"confluent.topic.ssl.keystore.type" : "JKS",
"confluent.topic.ssl.truststore.type" : "JKS",
"confluent.topic.security.protocol" : "SSL"
}' \
https://localhost:8083/connectors/azure-datalake-gen2-sink/config | jq .
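As with the plain HTTP listener, the connector status can be checked afterwards, this time over HTTPS with the same client certificate; a sketch:
$ docker exec connect curl -s \
    --cert /etc/kafka/secrets/connect.certificate.pem --key /etc/kafka/secrets/connect.key --tlsv1.2 --cacert /etc/kafka/secrets/snakeoil-ca-1.crt \
    https://localhost:8083/connectors/azure-datalake-gen2-sink/status | jq .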
Notes:
Broker config has KAFKA_SSL_PRINCIPAL_MAPPING_RULES: RULE:^CN=(.*?),OU=TEST.*$$/$$1/,DEFAULT (the doubled $$ is docker-compose escaping for a literal $). This is because we don't want to set the user CN=connect,OU=TEST,O=CONFLUENT,L=PaloAlto,ST=Ca,C=US as super user. See the Kafka broker configuration documentation for ssl.principal.mapping.rules.
The script certs-create.sh has:
keytool -noprompt -destkeystore kafka.$i.truststore.jks -importkeystore -srckeystore $(find $JAVA_HOME -follow -name cacerts) -srcstorepass changeit -deststorepass confluent
This is because we set the following for the connect service:
KAFKA_OPTS: -Djavax.net.ssl.trustStore=/etc/kafka/secrets/kafka.connect.truststore.jks
-Djavax.net.ssl.trustStorePassword=confluent
-Djavax.net.ssl.keyStore=/etc/kafka/secrets/kafka.connect.keystore.jks
-Djavax.net.ssl.keyStorePassword=confluent
This applies to every Java component running on that JVM; for instance, on Connect, every connector will then use the given truststore.
One example: if you use an AWS connector (S3, Kinesis, etc.) or a GCP connector (GCS, Pub/Sub, etc.) and do not have the corresponding certificate chain in the given truststore, the connector won't work and will raise an exception. The workaround is to import the regular Java CA certificates into our truststore, which is what the keytool command above does.
Results:
{"f1":"This is a message sent with SSL authentication 1"}
{"f1":"This is a message sent with SSL authentication 2"}
{"f1":"This is a message sent with SSL authentication 3"}
N.B.: Control Center is reachable at http://127.0.0.1:9021