diff --git a/e2e-tests/test_kubernetes_application_ha.sh b/e2e-tests/test_kubernetes_application_ha.sh
index d6a48b5e5b..2b2ffde1fc 100755
--- a/e2e-tests/test_kubernetes_application_ha.sh
+++ b/e2e-tests/test_kubernetes_application_ha.sh
@@ -23,29 +23,12 @@ CLUSTER_ID="flink-example-statemachine"
 APPLICATION_YAML="e2e-tests/data/flinkdep-cr.yaml"
 TIMEOUT=300
 
-function cleanup_and_exit() {
-    if [ $TRAPPED_EXIT_CODE != 0 ];then
-      debug_and_show_logs
-    fi
-
-    kubectl delete -f $APPLICATION_YAML
-    kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
-    kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability"
-}
-
-on_exit cleanup_and_exit
+on_exit cleanup_and_exit $APPLICATION_YAML $TIMEOUT $CLUSTER_ID
 
 retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
 
-retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1
-
-kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1
-jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
-
-echo "Waiting for jobmanager pod ${jm_pod_name} ready."
-kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s pod/$jm_pod_name || exit 1
-
-wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1
+wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
+jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
 
 wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
diff --git a/e2e-tests/test_last_state_upgrade.sh b/e2e-tests/test_last_state_upgrade.sh
index ca04923ac9..a024c457c0 100755
--- a/e2e-tests/test_last_state_upgrade.sh
+++ b/e2e-tests/test_last_state_upgrade.sh
@@ -23,50 +23,17 @@ CLUSTER_ID="flink-example-statemachine"
 APPLICATION_YAML="e2e-tests/data/flinkdep-cr.yaml"
 TIMEOUT=300
 
-function cleanup_and_exit() {
-    if [ $TRAPPED_EXIT_CODE != 0 ];then
-      debug_and_show_logs
-    fi
-
-    kubectl delete -f $APPLICATION_YAML
-    kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
-    kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability"
-}
-
-function wait_for_jobmanager_running() {
-    retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1
-
-    kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1
-    jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
-
-    echo "Waiting for jobmanager pod ${jm_pod_name} ready."
-    kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s pod/$jm_pod_name || exit 1
-
-    wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1
-}
-
-function assert_available_slots() {
-  expected=$1
-  ip=$(minikube ip)
-  curl http://$ip/default/${CLUSTER_ID}/overview
-  actual=$(curl http://$ip/default/${CLUSTER_ID}/overview 2>/dev/null | grep -E -o '"slots-available":[0-9]+' | awk -F':' '{print $2}')
-  if [[ expected -ne actual ]]; then
-    echo "Expected available slots: $expected, actual: $actual"
-    exit 1
-  fi
-  echo "Successfully assert available slots"
-}
-
-on_exit cleanup_and_exit
+on_exit cleanup_and_exit $APPLICATION_YAML $TIMEOUT $CLUSTER_ID
 
 retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
 
-wait_for_jobmanager_running
+wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
+jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
 
 wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
-assert_available_slots 0
+assert_available_slots 0 $CLUSTER_ID
 
 job_id=$(kubectl logs $jm_pod_name | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
 
@@ -74,14 +41,15 @@ job_id=$(kubectl logs $jm_pod_name | grep -E -o 'Job [a-z0-9]+ is submitted' | a
 kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }'
 
 kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
-wait_for_jobmanager_running
+wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
+jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
 
 # Check the new JobManager recovering from latest successful checkpoint
 wait_for_logs $jm_pod_name "Restoring job $job_id from Checkpoint" ${TIMEOUT} || exit 1
 wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
-assert_available_slots 1
+assert_available_slots 1 $CLUSTER_ID
 
 echo "Successfully run the last-state upgrade test"
 
diff --git a/e2e-tests/test_sessionjob_ha.sh b/e2e-tests/test_sessionjob_ha.sh
index 5b8107860d..0a1f74462a 100755
--- a/e2e-tests/test_sessionjob_ha.sh
+++ b/e2e-tests/test_sessionjob_ha.sh
@@ -24,29 +24,12 @@ TIMEOUT=300
 SESSION_CLUSTER_IDENTIFIER="flinkdep/session-cluster-1"
 SESSION_JOB_IDENTIFIER="sessionjob/flink-example-statemachine"
 
-function cleanup_and_exit() {
-    if [ $TRAPPED_EXIT_CODE != 0 ];then
-      debug_and_show_logs
-    fi
-
-    kubectl delete -f $APPLICATION_YAML
-    kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
-    kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability"
-}
-
-on_exit cleanup_and_exit
+on_exit cleanup_and_exit $APPLICATION_YAML $TIMEOUT $CLUSTER_ID
 
 retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
 
-retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1
-
-kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1
-jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
-
-echo "Waiting for jobmanager pod ${jm_pod_name} ready."
-kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s pod/$jm_pod_name || exit 1
-
-wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1
+wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
+jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
 
 wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
diff --git a/e2e-tests/test_sessionjob_savepoint_upgrade.sh b/e2e-tests/test_sessionjob_savepoint_upgrade.sh
index e08766e4f4..9e34c5cf32 100755
--- a/e2e-tests/test_sessionjob_savepoint_upgrade.sh
+++ b/e2e-tests/test_sessionjob_savepoint_upgrade.sh
@@ -22,54 +22,21 @@ source "$(dirname "$0")"/utils.sh
 CLUSTER_ID="session-cluster-1"
 APPLICATION_YAML="e2e-tests/data/sessionjob-cr.yaml"
 TIMEOUT=300
-SESSION_CLUSTER_IDENTIFIER="flinkdep/session-cluster-1"
-SESSION_JOB_IDENTIFIER="sessionjob/flink-example-statemachine"
+SESSION_CLUSTER_IDENTIFIER="flinkdep/$CLUSTER_ID"
 SESSION_JOB_NAME="flink-example-statemachine"
+SESSION_JOB_IDENTIFIER="sessionjob/$SESSION_JOB_NAME"
 
-function cleanup_and_exit() {
-    if [ $TRAPPED_EXIT_CODE != 0 ];then
-      debug_and_show_logs
-    fi
-
-    kubectl delete -f $APPLICATION_YAML
-    kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
-    kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability"
-}
-
-function wait_for_jobmanager_running() {
-    retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1
-
-    kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1
-    jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
-
-    echo "Waiting for jobmanager pod ${jm_pod_name} ready."
-    kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s pod/$jm_pod_name || exit 1
-
-    wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1
-}
-
-function assert_available_slots() {
-  expected=$1
-  ip=$(minikube ip)
-  curl http://$ip/default/${CLUSTER_ID}/overview
-  actual=$(curl http://$ip/default/${CLUSTER_ID}/overview 2>/dev/null | grep -E -o '"slots-available":[0-9]+' | awk -F':' '{print $2}')
-  if [[ expected -ne actual ]]; then
-    echo "Expected available slots: $expected, actual: $actual"
-    exit 1
-  fi
-  echo "Successfully assert available slots"
-}
-
-on_exit cleanup_and_exit
+on_exit cleanup_and_exit $APPLICATION_YAML $TIMEOUT $CLUSTER_ID
 
 retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
 
-wait_for_jobmanager_running
+wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
+jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
 
 wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
-assert_available_slots 0
+assert_available_slots 0 $CLUSTER_ID
 
 # Update the FlinkSessionJob and trigger the savepoint upgrade
 kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }'
@@ -79,7 +46,7 @@ wait_for_logs $jm_pod_name "Restoring job .* from Savepoint" ${TIMEOUT} || exit
 wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
-assert_available_slots 1
+assert_available_slots 1 $CLUSTER_ID
 
 echo "Successfully run the sessionjob savepoint upgrade test"
 
diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
index 8a6c181abb..d49ef13180 100755
--- a/e2e-tests/utils.sh
+++ b/e2e-tests/utils.sh
@@ -57,6 +57,54 @@ function wait_for_status {
   exit 1
 }
 
+# Asserts that the cluster's /overview REST response reports the expected
+# number of available slots. Exits the script with status 1 on a mismatch.
+# Arguments: $1 - expected number of available slots
+#            $2 - cluster id, used as a path segment of the overview URL
+function assert_available_slots() {
+  local expected=$1
+  local cluster_id=$2
+  local ip actual
+  ip=$(minikube ip)
+  actual=$(curl "http://${ip}/default/${cluster_id}/overview" 2>/dev/null | grep -E -o '"slots-available":[0-9]+' | awk -F':' '{print $2}')
+  # Fail when nothing could be parsed: an empty operand of -ne evaluates to 0
+  # inside [[ ]], so an unreachable endpoint would otherwise pass for "0".
+  if [[ -z "${actual}" || "${expected}" -ne "${actual}" ]]; then
+    echo "Expected available slots: $expected, actual: $actual"
+    exit 1
+  fi
+  echo "Successfully assert available slots"
+}
+
+# Blocks until the JobManager of a cluster is up: the deployment exists and is
+# Available, its pod is Ready and its log contains "Rest endpoint listening at".
+# Exits the script with status 1 as soon as one of these steps fails.
+# Arguments: $1 - cluster id (deployment name and value of the "app" label)
+#            $2 - timeout handed to every wait step (seconds for kubectl wait)
+function wait_for_jobmanager_running() {
+    local cluster_id=$1
+    local timeout=$2
+    local jm_pod_name
+    retry_times 30 3 "kubectl get deploy/${cluster_id}" || exit 1
+
+    kubectl wait --for=condition=Available --timeout=${timeout}s deploy/${cluster_id} || exit 1
+    jm_pod_name=$(get_jm_pod_name "${cluster_id}")
+
+    echo "Waiting for jobmanager pod ${jm_pod_name} ready."
+    kubectl wait --for=condition=Ready --timeout=${timeout}s pod/$jm_pod_name || exit 1
+
+    wait_for_logs $jm_pod_name "Rest endpoint listening at" ${timeout} || exit 1
+}
+
+# Prints the name of the JobManager pod(s) of a cluster to stdout.
+# Arguments: $1 - cluster id (value of the "app" label)
+function get_jm_pod_name() {
+    local cluster_id=$1
+    local jm_pod_name
+    jm_pod_name=$(kubectl get pods --selector="app=${cluster_id},component=jobmanager" -o jsonpath='{..metadata.name}')
+    echo "${jm_pod_name}"
+}
+
 function retry_times() {
     local retriesNumber=$1
     local backoff=$2
@@ -131,6 +179,25 @@ function stop_minikube {
     fi
 }
 
+# Exit handler shared by the e2e tests: deletes the resources of a test run.
+# When TRAPPED_EXIT_CODE is non-zero it first calls debug_and_show_logs.
+# Arguments: $1 - manifest passed to "kubectl delete -f"
+#            $2 - timeout in seconds for the pod deletion wait
+#            $3 - cluster id (value of the "app" label)
+function cleanup_and_exit() {
+    if [[ "${TRAPPED_EXIT_CODE:-0}" != 0 ]]; then
+      debug_and_show_logs
+    fi
+
+    local application_yaml=$1
+    local timeout=$2
+    local cluster_id=$3
+
+    kubectl delete -f "${application_yaml}"
+    kubectl wait --for=delete pod --timeout=${timeout}s --selector="app=${cluster_id}"
+    kubectl delete cm --selector="app=${cluster_id},configmap-type=high-availability"
+}
+
 function _on_exit_callback {
   # Export the exit code so that it could be used by the callback commands
   export TRAPPED_EXIT_CODE=$?
@@ -154,5 +221,6 @@ function on_exit {
   local command="$1"
 
   # Keep commands in reverse order, so commands would be executed in LIFO order.
-  _on_exit_commands=("${command}" "${_on_exit_commands[@]-}")
+  # Arguments after the command name are appended to it, separated by spaces.
+  _on_exit_commands=("${command} ${*:2}" "${_on_exit_commands[@]-}")
 }