Skip to content

Commit

Permalink
use strimzi kafka on beam_utility cluster (#30967)
Browse files Browse the repository at this point in the history
  • Loading branch information
volatilemolotov authored Apr 24, 2024
1 parent 2db9b80 commit e784799
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions .github/workflows/beam_PerformanceTests_xlang_KafkaIO_Python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,26 @@ jobs:
- name: Set k8s access
uses: ./.github/actions/setup-k8s-access
with:
cluster_name: io-datastores-for-tests
cluster_name: beam-utility
k8s_namespace: ${{ matrix.job_name }}-${{ github.run_id }}
cluster_zone: us-central1
- name: Install Kafka
id: install_kafka
run: |
cd ${{ github.workspace }}/.test-infra/kubernetes/kafka-cluster/
kubectl apply -R -f .
- name: Get Kafka IP
id: kafka_ip
kubectl apply -k ${{ github.workspace }}/.test-infra/kafka/strimzi/02-kafka-persistent/overlays/gke-internal-load-balanced
kubectl wait kafka beam-testing-cluster --for=condition=Ready --timeout=1800s
- name: Set up Kafka brokers
id: set_brokers
run: |
kubectl wait svc/outside-0 --for=jsonpath='{.status.loadBalancer.ingress[0].ip}' --timeout=120s
kubectl wait svc/outside-1 --for=jsonpath='{.status.loadBalancer.ingress[0].ip}' --timeout=120s
kubectl wait svc/outside-2 --for=jsonpath='{.status.loadBalancer.ingress[0].ip}' --timeout=120s
KAFKA_BROKER_0_IP=$(kubectl get svc outside-0 -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
KAFKA_BROKER_1_IP=$(kubectl get svc outside-1 -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
KAFKA_BROKER_2_IP=$(kubectl get svc outside-2 -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo KAFKA_BROKER_0=$KAFKA_BROKER_0_IP >> $GITHUB_OUTPUT
echo KAFKA_BROKER_1=$KAFKA_BROKER_1_IP >> $GITHUB_OUTPUT
echo KAFKA_BROKER_2=$KAFKA_BROKER_2_IP >> $GITHUB_OUTPUT
declare -a kafka_service_brokers
declare -a kafka_service_brokers_ports
for INDEX in {0..2}; do
kubectl wait svc/beam-testing-cluster-kafka-${INDEX} --for=jsonpath='{.status.loadBalancer.ingress[0].ip}' --timeout=1200s
kafka_service_brokers[$INDEX]=$(kubectl get svc beam-testing-cluster-kafka-${INDEX} -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
kafka_service_brokers_ports[$INDEX]=$(kubectl get svc beam-testing-cluster-kafka-${INDEX} -o jsonpath='{.spec.ports[0].port}')
echo "KAFKA_SERVICE_BROKER_${INDEX}=${kafka_service_brokers[$INDEX]}" >> $GITHUB_OUTPUT
echo "KAFKA_SERVICE_BROKER_PORTS_${INDEX}=${kafka_service_brokers_ports[$INDEX]}" >> $GITHUB_OUTPUT
done
- name: Prepare test arguments
uses: ./.github/actions/test-arguments-action
with:
Expand All @@ -110,7 +111,7 @@ jobs:
${{ github.workspace }}/.github/workflows/performance-tests-pipeline-options/xlang_KafkaIO_Python.txt
arguments: |
--filename_prefix=gs://temp-storage-for-perf-tests/${{ matrix.job_name }}/${{github.run_id}}/
--bootstrap_servers=${{ steps.kafka_ip.outputs.KAFKA_BROKER_0 }}:32400,${{ steps.kafka_ip.outputs.KAFKA_BROKER_1 }}:32400,${{ steps.kafka_ip.outputs.KAFKA_BROKER_2 }}:32400
--bootstrap_servers=${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_0 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_0 }},${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_1 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_1 }},${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_2 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_2 }}
- name: run shadowJar
uses: ./.github/actions/gradle-command-self-hosted-action
with:
Expand Down

0 comments on commit e784799

Please sign in to comment.