From fb13a1424492f2bd81454eeb7d5efc7947c8634f Mon Sep 17 00:00:00 2001 From: Aitozi Date: Sun, 24 Apr 2022 14:08:06 +0800 Subject: [PATCH] [FLINK-27160] Add e2e tests for session job This closes #179. --- Dockerfile | 1 + e2e-tests/data/{cr.yaml => flinkdep-cr.yaml} | 0 e2e-tests/data/sessionjob-cr.yaml | 108 ++++++++++++++++++ e2e-tests/test_kubernetes_application_ha.sh | 26 +---- e2e-tests/test_last_state_upgrade.sh | 48 ++------ e2e-tests/test_sessionjob_ha.sh | 51 +++++++++ .../test_sessionjob_savepoint_upgrade.sh | 52 +++++++++ e2e-tests/utils.sh | 48 +++++++- examples/basic-session-job.yaml | 4 +- 9 files changed, 275 insertions(+), 63 deletions(-) rename e2e-tests/data/{cr.yaml => flinkdep-cr.yaml} (100%) create mode 100644 e2e-tests/data/sessionjob-cr.yaml create mode 100755 e2e-tests/test_sessionjob_ha.sh create mode 100755 e2e-tests/test_sessionjob_savepoint_upgrade.sh mode change 100644 => 100755 e2e-tests/utils.sh diff --git a/Dockerfile b/Dockerfile index 85eed80918..8b7bf03b52 100644 --- a/Dockerfile +++ b/Dockerfile @@ -62,6 +62,7 @@ COPY docker-entrypoint.sh / RUN chown -R flink:flink $FLINK_HOME && \ chown flink:flink $OPERATOR_JAR && \ chown flink:flink $WEBHOOK_JAR && \ + chown flink:flink $FLINK_KUBERNETES_SHADED_JAR && \ chown flink:flink docker-entrypoint.sh USER flink diff --git a/e2e-tests/data/cr.yaml b/e2e-tests/data/flinkdep-cr.yaml similarity index 100% rename from e2e-tests/data/cr.yaml rename to e2e-tests/data/flinkdep-cr.yaml diff --git a/e2e-tests/data/sessionjob-cr.yaml b/e2e-tests/data/sessionjob-cr.yaml new file mode 100644 index 0000000000..3df051b236 --- /dev/null +++ b/e2e-tests/data/sessionjob-cr.yaml @@ -0,0 +1,108 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + namespace: default + name: session-cluster-1 +spec: + image: flink:1.14.3 + flinkVersion: v1_14 + ingress: + template: "/{{namespace}}/{{name}}(/|$)(.*)" + className: "nginx" + annotations: + nginx.ingress.kubernetes.io/rewrite-target: "/$2" + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory + high-availability.storageDir: file:///opt/flink/volume/flink-ha + state.checkpoints.dir: file:///opt/flink/volume/flink-cp + state.savepoints.dir: file:///opt/flink/volume/flink-sp + serviceAccount: flink + podTemplate: + apiVersion: v1 + kind: Pod + metadata: + name: pod-template + spec: + containers: + # Do not change the main container name + - name: flink-main-container + resources: + requests: + ephemeral-storage: 2048Mi + limits: + ephemeral-storage: 2048Mi + volumeMounts: + - mountPath: /opt/flink/volume + name: flink-volume + volumes: + - name: flink-volume + persistentVolumeClaim: + claimName: session-cluster-1-pvc + jobManager: + replicas: 1 + resource: + memory: "1024m" + cpu: 0.5 + taskManager: + resource: + memory: "1024m" + cpu: 0.5 + +--- +apiVersion: flink.apache.org/v1beta1 +kind: FlinkSessionJob +metadata: + namespace: default + name: flink-example-statemachine +spec: + deploymentName: session-cluster-1 + job: + jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.3/flink-examples-streaming_2.12-1.14.3.jar + parallelism: 2 + upgradeMode: savepoint + entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample + +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: session-cluster-1-pvc +spec: + accessModes: + - ReadWriteOnce + volumeMode: Filesystem + resources: + requests: + storage: 1Gi + +--- +apiVersion: networking.k8s.io/v1 +kind: IngressClass +metadata: + annotations: + ingressclass.kubernetes.io/is-default-class: "true" + labels: + app.kubernetes.io/component: controller + name: nginx +spec: + controller: k8s.io/ingress-nginx + diff --git a/e2e-tests/test_kubernetes_application_ha.sh b/e2e-tests/test_kubernetes_application_ha.sh index acd854f7ab..2b2ffde1fc 100755 --- a/e2e-tests/test_kubernetes_application_ha.sh +++ b/e2e-tests/test_kubernetes_application_ha.sh @@ -20,31 +20,15 @@ source "$(dirname "$0")"/utils.sh CLUSTER_ID="flink-example-statemachine" +APPLICATION_YAML="e2e-tests/data/flinkdep-cr.yaml" TIMEOUT=300 -function cleanup_and_exit() { - if [ $TRAPPED_EXIT_CODE != 0 ];then - debug_and_show_logs - fi +on_exit cleanup_and_exit $APPLICATION_YAML $TIMEOUT $CLUSTER_ID - kubectl delete -f e2e-tests/data/cr.yaml - kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}" - kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability" -} +retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1 -on_exit cleanup_and_exit - -retry_times 5 30 "kubectl apply -f e2e-tests/data/cr.yaml" || exit 1 - -retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1 - -kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1 -jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}') - -echo "Waiting for jobmanager pod ${jm_pod_name} ready." -kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s pod/$jm_pod_name || exit 1 - -wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1 +wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT +jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 diff --git a/e2e-tests/test_last_state_upgrade.sh b/e2e-tests/test_last_state_upgrade.sh index dd1ffc9fe4..a024c457c0 100755 --- a/e2e-tests/test_last_state_upgrade.sh +++ b/e2e-tests/test_last_state_upgrade.sh @@ -20,51 +20,20 @@ source "$(dirname "$0")"/utils.sh CLUSTER_ID="flink-example-statemachine" +APPLICATION_YAML="e2e-tests/data/flinkdep-cr.yaml" TIMEOUT=300 -function cleanup_and_exit() { - if [ $TRAPPED_EXIT_CODE != 0 ];then - debug_and_show_logs - fi +on_exit cleanup_and_exit $APPLICATION_YAML $TIMEOUT $CLUSTER_ID - kubectl delete -f e2e-tests/data/cr.yaml - kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}" - kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability" -} +retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1 -function wait_for_jobmanager_running() { - retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1 - - kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1 - jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}') - - echo "Waiting for jobmanager pod ${jm_pod_name} ready." - kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s pod/$jm_pod_name || exit 1 - - wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1 -} - -function assert_available_slots() { - expected=$1 - ip=$(minikube ip) - actual=$(curl http://$ip/default/${CLUSTER_ID}/overview 2>/dev/null | grep -E -o '"slots-available":[0-9]+' | awk -F':' '{print $2}') - if [[ expected -ne actual ]]; then - echo "Expected available slots: $expected, actual: $actual" - exit 1 - fi - echo "Successfully assert available slots" -} - -on_exit cleanup_and_exit - -retry_times 5 30 "kubectl apply -f e2e-tests/data/cr.yaml" || exit 1 - -wait_for_jobmanager_running +wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT +jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 -assert_available_slots 0 +assert_available_slots 0 $CLUSTER_ID job_id=$(kubectl logs $jm_pod_name | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}') @@ -72,14 +41,15 @@ job_id=$(kubectl logs $jm_pod_name | grep -E -o 'Job [a-z0-9]+ is submitted' | a kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }' kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}" -wait_for_jobmanager_running +wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT +jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) # Check the new JobManager recovering from latest successful checkpoint wait_for_logs $jm_pod_name "Restoring job $job_id from Checkpoint" ${TIMEOUT} || exit 1 wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 -assert_available_slots 1 +assert_available_slots 1 $CLUSTER_ID echo "Successfully run the last-state upgrade test" diff --git a/e2e-tests/test_sessionjob_ha.sh b/e2e-tests/test_sessionjob_ha.sh new file mode 100755 index 0000000000..0a1f74462a --- /dev/null +++ b/e2e-tests/test_sessionjob_ha.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source "$(dirname "$0")"/utils.sh + +CLUSTER_ID="session-cluster-1" +APPLICATION_YAML="e2e-tests/data/sessionjob-cr.yaml" +TIMEOUT=300 +SESSION_CLUSTER_IDENTIFIER="flinkdep/session-cluster-1" +SESSION_JOB_IDENTIFIER="sessionjob/flink-example-statemachine" + +on_exit cleanup_and_exit $APPLICATION_YAML $TIMEOUT $CLUSTER_ID + +retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1 + +wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT +jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) + +wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 +wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 +wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 + +job_id=$(kubectl logs $jm_pod_name | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}') + +# Kill the JobManager +echo "Kill the $jm_pod_name" +kubectl exec $jm_pod_name -- /bin/sh -c "kill 1" + +# Check the new JobManager recovering from latest successful checkpoint +wait_for_logs $jm_pod_name "Restoring job $job_id from Checkpoint" ${TIMEOUT} || exit 1 +wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 +wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 +wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 + +echo "Successfully run the Flink Session Job HA test" + diff --git a/e2e-tests/test_sessionjob_savepoint_upgrade.sh b/e2e-tests/test_sessionjob_savepoint_upgrade.sh new file mode 100755 index 0000000000..9e34c5cf32 --- /dev/null +++ b/e2e-tests/test_sessionjob_savepoint_upgrade.sh @@ -0,0 +1,52 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +source "$(dirname "$0")"/utils.sh + +CLUSTER_ID="session-cluster-1" +APPLICATION_YAML="e2e-tests/data/sessionjob-cr.yaml" +TIMEOUT=300 +SESSION_CLUSTER_IDENTIFIER="flinkdep/$CLUSTER_ID" +SESSION_JOB_NAME="flink-example-statemachine" +SESSION_JOB_IDENTIFIER="sessionjob/$SESSION_JOB_NAME" + +on_exit cleanup_and_exit $APPLICATION_YAML $TIMEOUT $CLUSTER_ID + +retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1 + +wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT +jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) + +wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 +wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 +wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 +assert_available_slots 0 $CLUSTER_ID + +# Update the FlinkSessionJob and trigger the savepoint upgrade +kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }' + +# Check the new JobManager recovering from savepoint +wait_for_logs $jm_pod_name "Restoring job .* from Savepoint" ${TIMEOUT} || exit 1 +wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 +wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 +wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 +assert_available_slots 1 $CLUSTER_ID + +echo "Successfully run the sessionjob savepoint upgrade test" + diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh old mode 100644 new mode 100755 index 8a6c181abb..d49ef13180 --- a/e2e-tests/utils.sh +++ b/e2e-tests/utils.sh @@ -57,6 +57,38 @@ function wait_for_status { exit 1 } +function assert_available_slots() { + expected=$1 + CLUSTER_ID=$2 + ip=$(minikube ip) + actual=$(curl http://$ip/default/${CLUSTER_ID}/overview 2>/dev/null | grep -E -o '"slots-available":[0-9]+' | awk -F':' '{print $2}') + if [[ expected -ne actual ]]; then + echo "Expected available slots: $expected, actual: $actual" + exit 1 + fi + echo "Successfully assert available slots" +} + +function wait_for_jobmanager_running() { + CLUSTER_ID=$1 + TIMEOUT=$2 + retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1 + + kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1 + jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) + + echo "Waiting for jobmanager pod ${jm_pod_name} ready." + kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s pod/$jm_pod_name || exit 1 + + wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1 +} + +function get_jm_pod_name() { + CLUSTER_ID=$1 + jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}') + echo $jm_pod_name +} + function retry_times() { local retriesNumber=$1 local backoff=$2 @@ -131,6 +163,20 @@ function stop_minikube { fi } +function cleanup_and_exit() { + if [ $TRAPPED_EXIT_CODE != 0 ];then + debug_and_show_logs + fi + + APPLICATION_YAML=$1 + TIMEOUT=$2 + CLUSTER_ID=$3 + + kubectl delete -f $APPLICATION_YAML + kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}" + kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability" +} + function _on_exit_callback { # Export the exit code so that it could be used by the callback commands export TRAPPED_EXIT_CODE=$? @@ -154,5 +200,5 @@ function on_exit { local command="$1" # Keep commands in reverse order, so commands would be executed in LIFO order. - _on_exit_commands=("${command}" "${_on_exit_commands[@]-}") + _on_exit_commands=("${command} `echo "${@:2}"`" "${_on_exit_commands[@]-}") } diff --git a/examples/basic-session-job.yaml b/examples/basic-session-job.yaml index 68dc36bd4d..e7937b05bb 100644 --- a/examples/basic-session-job.yaml +++ b/examples/basic-session-job.yaml @@ -61,7 +61,7 @@ metadata: namespace: default name: basic-session-ha-job-example spec: - clusterId: basic-session-cluster-with-ha + deploymentName: basic-session-cluster-with-ha job: jarURI: file:///opt/flink/artifacts/TopSpeedWindowing.jar parallelism: 4 @@ -75,7 +75,7 @@ metadata: namespace: default name: basic-session-ha-job-example2 spec: - clusterId: basic-session-cluster-with-ha + deploymentName: basic-session-cluster-with-ha job: jarURI: file:///opt/flink/artifacts/flink-examples-streaming_2.12-1.14.3.jar parallelism: 2