Skip to content

Commit

Permalink
[FLINK-27160] Add e2e tests for session job
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Apr 24, 2022
1 parent a1bbca9 commit 7320512
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 4 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ COPY docker-entrypoint.sh /
RUN chown -R flink:flink $FLINK_HOME && \
chown flink:flink $OPERATOR_JAR && \
chown flink:flink $WEBHOOK_JAR && \
chown flink:flink $FLINK_KUBERNETES_SHADED_JAR && \
chown flink:flink docker-entrypoint.sh

USER flink
Expand Down
File renamed without changes.
108 changes: 108 additions & 0 deletions e2e-tests/data/sessionjob-cr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
namespace: default
name: session-cluster-1
spec:
image: flink:1.14.3
flinkVersion: v1_14
ingress:
template: "/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
annotations:
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
serviceAccount: flink
podTemplate:
apiVersion: v1
kind: Pod
metadata:
name: pod-template
spec:
containers:
# Do not change the main container name
- name: flink-main-container
resources:
requests:
ephemeral-storage: 2048Mi
limits:
ephemeral-storage: 2048Mi
volumeMounts:
- mountPath: /opt/flink/volume
name: flink-volume
volumes:
- name: flink-volume
persistentVolumeClaim:
claimName: session-cluster-1-pvc
jobManager:
replicas: 1
resource:
memory: "1024m"
cpu: 0.5
taskManager:
resource:
memory: "1024m"
cpu: 0.5

---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
namespace: default
name: flink-example-statemachine
spec:
clusterId: session-cluster-1
job:
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.3/flink-examples-streaming_2.12-1.14.3.jar
parallelism: 2
upgradeMode: savepoint
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: session-cluster-1-pvc
spec:
accessModes:
- ReadWriteOnce
volumeMode: Filesystem
resources:
requests:
storage: 1Gi

---
apiVersion: networking.k8s.io/v1
kind: IngressClass
metadata:
annotations:
ingressclass.kubernetes.io/is-default-class: "true"
labels:
app.kubernetes.io/component: controller
name: nginx
spec:
controller: k8s.io/ingress-nginx

5 changes: 3 additions & 2 deletions e2e-tests/test_kubernetes_application_ha.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,22 @@
source "$(dirname "$0")"/utils.sh

CLUSTER_ID="flink-example-statemachine"
APPLICATION_YAML="e2e-tests/data/flinkdep-cr.yaml"
TIMEOUT=300

function cleanup_and_exit() {
if [ $TRAPPED_EXIT_CODE != 0 ];then
debug_and_show_logs
fi

kubectl delete -f e2e-tests/data/cr.yaml
kubectl delete -f $APPLICATION_YAML
kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability"
}

on_exit cleanup_and_exit

retry_times 5 30 "kubectl apply -f e2e-tests/data/cr.yaml" || exit 1
retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1

retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1

Expand Down
6 changes: 4 additions & 2 deletions e2e-tests/test_last_state_upgrade.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
source "$(dirname "$0")"/utils.sh

CLUSTER_ID="flink-example-statemachine"
APPLICATION_YAML="e2e-tests/data/flinkdep-cr.yaml"
TIMEOUT=300

function cleanup_and_exit() {
if [ $TRAPPED_EXIT_CODE != 0 ];then
debug_and_show_logs
fi

kubectl delete -f e2e-tests/data/cr.yaml
kubectl delete -f $APPLICATION_YAML
kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability"
}
Expand All @@ -47,6 +48,7 @@ function wait_for_jobmanager_running() {
function assert_available_slots() {
expected=$1
ip=$(minikube ip)
curl http://$ip/default/${CLUSTER_ID}/overview
actual=$(curl http://$ip/default/${CLUSTER_ID}/overview 2>/dev/null | grep -E -o '"slots-available":[0-9]+' | awk -F':' '{print $2}')
if [[ expected -ne actual ]]; then
echo "Expected available slots: $expected, actual: $actual"
Expand All @@ -57,7 +59,7 @@ function assert_available_slots() {

on_exit cleanup_and_exit

retry_times 5 30 "kubectl apply -f e2e-tests/data/cr.yaml" || exit 1
retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1

wait_for_jobmanager_running

Expand Down
68 changes: 68 additions & 0 deletions e2e-tests/test_sessionjob_ha.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

source "$(dirname "$0")"/utils.sh

CLUSTER_ID="session-cluster-1"
APPLICATION_YAML="e2e-tests/data/sessionjob-cr.yaml"
TIMEOUT=300
SESSION_CLUSTER_IDENTIFIER="flinkdep/session-cluster-1"
SESSION_JOB_IDENTIFIER="sessionjob/flink-example-statemachine"

function cleanup_and_exit() {
if [ $TRAPPED_EXIT_CODE != 0 ];then
debug_and_show_logs
fi

kubectl delete -f $APPLICATION_YAML
kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability"
}

on_exit cleanup_and_exit

retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1

retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1

kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1
jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')

echo "Waiting for jobmanager pod ${jm_pod_name} ready."
kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s pod/$jm_pod_name || exit 1

wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1

wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1

job_id=$(kubectl logs $jm_pod_name | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')

# Kill the JobManager
echo "Kill the $jm_pod_name"
kubectl exec $jm_pod_name -- /bin/sh -c "kill 1"

# Check the new JobManager recovering from latest successful checkpoint
wait_for_logs $jm_pod_name "Restoring job $job_id from Checkpoint" ${TIMEOUT} || exit 1
wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1

echo "Successfully run the Flink Session Job HA test"

85 changes: 85 additions & 0 deletions e2e-tests/test_sessionjob_savepoint_upgrade.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

source "$(dirname "$0")"/utils.sh

CLUSTER_ID="session-cluster-1"
APPLICATION_YAML="e2e-tests/data/sessionjob-cr.yaml"
TIMEOUT=300
SESSION_CLUSTER_IDENTIFIER="flinkdep/session-cluster-1"
SESSION_JOB_IDENTIFIER="sessionjob/flink-example-statemachine"
SESSION_JOB_NAME="flink-example-statemachine"

function cleanup_and_exit() {
if [ $TRAPPED_EXIT_CODE != 0 ];then
debug_and_show_logs
fi

kubectl delete -f $APPLICATION_YAML
kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
kubectl delete cm --selector="app=${CLUSTER_ID},configmap-type=high-availability"
}

function wait_for_jobmanager_running() {
retry_times 30 3 "kubectl get deploy/${CLUSTER_ID}" || exit 1

kubectl wait --for=condition=Available --timeout=${TIMEOUT}s deploy/${CLUSTER_ID} || exit 1
jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')

echo "Waiting for jobmanager pod ${jm_pod_name} ready."
kubectl wait --for=condition=Ready --timeout=${TIMEOUT}s pod/$jm_pod_name || exit 1

wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1
}

function assert_available_slots() {
expected=$1
ip=$(minikube ip)
curl http://$ip/default/${CLUSTER_ID}/overview
actual=$(curl http://$ip/default/${CLUSTER_ID}/overview 2>/dev/null | grep -E -o '"slots-available":[0-9]+' | awk -F':' '{print $2}')
if [[ expected -ne actual ]]; then
echo "Expected available slots: $expected, actual: $actual"
exit 1
fi
echo "Successfully assert available slots"
}

on_exit cleanup_and_exit

retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1

wait_for_jobmanager_running

wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
assert_available_slots 0

# Update the FlinkSessionJob and trigger the savepoint upgrade
kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }'

# Check the new JobManager recovering from savepoint
wait_for_logs $jm_pod_name "Restoring job .* from Savepoint" ${TIMEOUT} || exit 1
wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
assert_available_slots 1

echo "Successfully run the sessionjob savepoint upgrade test"

Empty file modified e2e-tests/utils.sh
100644 → 100755
Empty file.

0 comments on commit 7320512

Please sign in to comment.