Azure Data Lake Storage Gen1 Sink connector

(asciinema demo recording)

Objective

Quickly test the Azure Data Lake Storage Gen1 Sink connector.

How to run

Simply run:

$ ./azure-data-lake-storage-gen1.sh

Details of what the script is doing

Logging in to Azure using the browser (or using environment variables AZ_USER and AZ_PASS if they are set)

$ az login
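
When AZ_USER and AZ_PASS are set, a non-interactive login along these lines can be used instead of the browser flow (a minimal sketch; the exact flags used by the script may differ):

$ az login -u "$AZ_USER" -p "$AZ_PASS"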

Creating resource group $AZURE_RESOURCE_GROUP in $AZURE_REGION

$ az group create \
    --name $AZURE_RESOURCE_GROUP \
    --location $AZURE_REGION
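
To verify that the resource group was created (a manual sanity check, not part of the script):

$ az group show --name $AZURE_RESOURCE_GROUP --query properties.provisioningState -o tsv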

Registering Active Directory app $AZURE_AD_APP_NAME

$ AZURE_DATALAKE_CLIENT_ID=$(az ad app create --display-name "$AZURE_AD_APP_NAME" --password mypassword --native-app false --available-to-other-tenants false --query appId -o tsv)

Creating the Service Principal associated with the app

$ SERVICE_PRINCIPAL_ID=$(az ad sp create --id $AZURE_DATALAKE_CLIENT_ID | jq -r '.objectId')
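
The $AZURE_DATALAKE_TOKEN_ENDPOINT used later in the connector configuration is the OAuth2 token endpoint of the Azure AD tenant. A sketch of how it can be derived (the AZURE_TENANT_ID variable is illustrative; the script may build the endpoint differently):

$ AZURE_TENANT_ID=$(az account show --query tenantId -o tsv)
$ AZURE_DATALAKE_TOKEN_ENDPOINT="https://login.microsoftonline.com/$AZURE_TENANT_ID/oauth2/token"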

Creating data lake $AZURE_DATALAKE_ACCOUNT_NAME in resource group $AZURE_RESOURCE_GROUP

$ az dls account create --account $AZURE_DATALAKE_ACCOUNT_NAME --resource-group $AZURE_RESOURCE_GROUP
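
As a hedged sanity check (not part of the script), the provisioning state of the new account can be inspected with:

$ az dls account show --account $AZURE_DATALAKE_ACCOUNT_NAME --query provisioningState -o tsv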

Giving the app $AZURE_AD_APP_NAME permission to access data lake $AZURE_DATALAKE_ACCOUNT_NAME

$ az dls fs access set-entry --account $AZURE_DATALAKE_ACCOUNT_NAME --acl-spec user:$SERVICE_PRINCIPAL_ID:rwx --path /
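
The resulting ACL on the root path can be inspected (a verification step, not part of the script):

$ az dls fs access show --account $AZURE_DATALAKE_ACCOUNT_NAME --path /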

The connector is created with:

$ docker exec -e AZURE_DATALAKE_CLIENT_ID="$AZURE_DATALAKE_CLIENT_ID" -e AZURE_DATALAKE_ACCOUNT_NAME="$AZURE_DATALAKE_ACCOUNT_NAME" -e AZURE_DATALAKE_TOKEN_ENDPOINT="$AZURE_DATALAKE_TOKEN_ENDPOINT" connect \
     curl -X PUT \
     -H "Content-Type: application/json" \
     --data '{
               "connector.class": "io.confluent.connect.azure.datalake.gen1.AzureDataLakeGen1StorageSinkConnector",
               "tasks.max": "1",
               "topics": "datalake_topic",
               "flush.size": "3",
               "azure.datalake.client.id": "'"$AZURE_DATALAKE_CLIENT_ID"'",
               "azure.datalake.client.key": "mypassword",
               "azure.datalake.account.name": "'"$AZURE_DATALAKE_ACCOUNT_NAME"'",
               "azure.datalake.token.endpoint": "'"$AZURE_DATALAKE_TOKEN_ENDPOINT"'",
               "format.class": "io.confluent.connect.azure.storage.format.avro.AvroFormat",
               "confluent.license": "",
               "confluent.topic.bootstrap.servers": "broker:9092",
               "confluent.topic.replication.factor": "1"
          }' \
     http://localhost:8083/connectors/azure-datalake-gen1-sink/config | jq .
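
Once created, the connector status can be checked through the Kafka Connect REST API (a typical verification step, not part of the original walkthrough):

$ docker exec connect curl -s http://localhost:8083/connectors/azure-datalake-gen1-sink/status | jq .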

Sending messages to topic datalake_topic

$ seq -f "{\"f1\": \"value%g\"}" 10 | docker exec -i connect kafka-avro-console-producer --broker-list broker:9092 --property schema.registry.url=http://schema-registry:8081 --topic datalake_topic --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
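
To double-check that the records landed in the topic, they can be read back with the Avro console consumer (a hedged verification step, not part of the script):

$ docker exec connect kafka-avro-console-consumer --bootstrap-server broker:9092 --property schema.registry.url=http://schema-registry:8081 --topic datalake_topic --from-beginning --max-messages 10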

Listing the content of the /topics path in data lake ${AZURE_DATALAKE_ACCOUNT_NAME}

$ az dls fs list --account "${AZURE_DATALAKE_ACCOUNT_NAME}" --path /topics

Downloading one of the Avro files locally and displaying its content with avro-tools

$ az dls fs download --account "${AZURE_DATALAKE_ACCOUNT_NAME}" --overwrite --source-path /topics/datalake_topic/partition=0/datalake_topic+0+0000000000.avro --destination-path /tmp/datalake_topic+0+0000000000.avro
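
The downloaded file can then be dumped to JSON; a sketch assuming avro-tools is available on the local machine (the script may invoke it differently, e.g. through a Docker image):

$ avro-tools tojson /tmp/datalake_topic+0+0000000000.avro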

Results:

{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}

N.B.: Control Center is reachable at http://127.0.0.1:9021