# dispatch job #71
# NOTE (viewer banner, apparently not part of the workflow itself): This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters
# Manually dispatched workflow: the inputs below are read by the jobs through
# `github.event.inputs.*`.
name: dispatch job

on:
  workflow_dispatch:
    inputs:
      # Git URL of the recipe repository; a trailing `.git` is stripped when
      # the short repository name is derived for the job display names.
      repo:
        description: 'The https github url for the recipe feedstock'
        required: true
      ref:
        description: 'The tag or branch to target in your recipe repo'
        required: true
        default: 'main'
      feedstock_subdir:
        description: 'The subdir of the feedstock directory in the repo'
        required: true
        default: 'feedstock'
      bucket:
        description: 'This job runner leverages s3fs.S3FileSystem for your recipe cache and output. Choices currently are: "default"'
        required: true
        default: 'default'
      # NOTE(review): presumably '0' disables pruning and any other value
      # enables it; confirm against the code that consumes PRUNE_OPTION.
      prune:
        description: 'Only run the first two time steps'
        required: true
        default: '0'
      parallelism:
        description: 'Number of task managers to spin up'
        required: true
        default: '1'
jobs:
  # Derives a short repository name from the `repo` input so the two jobs
  # below can show it in their display names.
  name-job:
    runs-on: ubuntu-latest
    outputs:
      repo_name: ${{ steps.string_manipulation.outputs.result }}
    steps:
      - name: manipuluate strings
        id: string_manipulation
        env:
          # Hand the user-supplied input over through the environment instead
          # of interpolating it into the script text, so it can never be
          # parsed as shell code.
          REPO: ${{ github.event.inputs.repo }}
        run: |
          repo_name=$(basename -s .git "$REPO")
          echo "result=$repo_name" >> $GITHUB_OUTPUT

  # Installs the tooling, submits the recipe with `pangeo-forge-runner bake`
  # and publishes the job name / job id scraped from the captured output.
  run-job:
    name: kickoff job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }}
    needs: name-job
    outputs:
      job_name: ${{ steps.report_ids.outputs.job_name }}
      job_id: ${{ steps.report_ids.outputs.job_id }}
    runs-on: ubuntu-latest
    steps:
      - name: checkout repository
        uses: actions/checkout@v3
      - name: set up python 3.10
        uses: actions/setup-python@v3
        with:
          # quoted on purpose: a bare 3.10 would be read as the float 3.1
          python-version: '3.10'
      - name: echo server
        env:
          # inputs are passed as environment variables, never spliced into
          # the script body
          REPO: ${{ github.event.inputs.repo }}
          REF: ${{ github.event.inputs.ref }}
          S3_BUCKET: ${{ github.event.inputs.bucket }}
          PARALLELISM_OPTION: ${{ github.event.inputs.parallelism }}
          PRUNE_OPTION: ${{ github.event.inputs.prune }}
        run: |
          echo "Manually triggered workflow: \
          $REPO \
          $REF \
          $S3_BUCKET \
          $PARALLELISM_OPTION \
          $PRUNE_OPTION"
      - name: install deps
        run: |
          # TODO: move to requirements file
          python -m pip install --upgrade pip
          # Version specifiers are quoted: an unquoted `>=` is parsed by the
          # shell as an output redirection, which silently drops the
          # constraint and creates a stray file named `=<version>`.
          pip install \
            fsspec \
            s3fs \
            boto3 \
            "apache-beam==2.52.0" \
            "pangeo-forge-recipes>=0.10.0" \
            "pangeo-forge-runner>=0.9.1" \
            "python-cmr==0.9.0"
      - name: set up aws credentials for job runner user
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.GH_ACTIONS_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.GH_ACTIONS_AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ secrets.GH_ACTIONS_AWS_REGION }}
      - name: install kubectl
        run: |
          curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
          chmod +x ./kubectl
          sudo mv ./kubectl /usr/local/bin/kubectl
      - name: update kubeconfig with cluster
        run: |
          aws eks update-kubeconfig --name pangeo-forge-v3 --region ${{ secrets.GH_ACTIONS_AWS_REGION }}
      - name: execute recipe on k8s cluster
        id: executejob
        # a failure here must not stop the job: the cleanup step below keys
        # off this step's outcome
        continue-on-error: true
        env:
          REPO: ${{ github.event.inputs.repo }}
          REF: ${{ github.event.inputs.ref }}
          FEEDSTOCK_SUBDIR: ${{ github.event.inputs.feedstock_subdir }}
          PRUNE_OPTION: ${{ github.event.inputs.prune }}
          PARALLELISM_OPTION: ${{ github.event.inputs.parallelism }}
          S3_BUCKET: ${{ github.event.inputs.bucket }}
          S3_DEFAULT_AWS_ACCESS_KEY_ID: ${{ secrets.S3_DEFAULT_AWS_ACCESS_KEY_ID }}
          S3_DEFAULT_AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_DEFAULT_AWS_SECRET_ACCESS_KEY }}
        run: |
          # NOTE: we can't use `2>&1 | tee execute.log` b/c it hangs forever,
          # so stdout only goes to the file; it is printed below if the command fails
          #
          # The exit status is captured instead of letting the `-e` shell abort
          # right away: otherwise JOB_NAME is never exported on failure and the
          # cleanup step cannot find the flinkdeployment it has to delete.
          bake_status=0
          pangeo-forge-runner \
            bake \
            --repo="$REPO" \
            --ref="$REF" \
            -f .github/workflows/config.py > execute.log || bake_status=$?
          # export all the valuable information from the logs
          JOB_NAME=$(cat execute.log | grep -oP 'flinkdeployment\.flink\.apache\.org/\K[^ ]+' | head -n1)
          echo "JOB_NAME=$JOB_NAME" >> $GITHUB_ENV
          if [[ "$bake_status" -ne 0 ]]; then
            echo "pangeo-forge-runner exited with status $bake_status, captured output follows..."
            cat execute.log
            exit "$bake_status"
          fi
          JOB_ID=$(cat execute.log | grep -oP 'Started Flink job as \K[^ ]+')
          echo "JOB_ID=$JOB_ID" >> $GITHUB_ENV
          FLINK_DASH=$(cat execute.log | grep -oP "You can run '\K[^']+(?=')")
          echo "FLINK_DASH=$FLINK_DASH" >> $GITHUB_ENV
      - name: cleanup if "pangeo-forge-runner bake" failed
        if: steps.executejob.outcome == 'failure'
        run: |
          echo "The previous 'bake' command failed or timed out. Running cleanup logic..."
          # much easier to do in bash than in Python via subprocess
          echo "##################### OPERATOR ######################"
          kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
          # Only touch job resources when a name was actually captured: an
          # empty pattern would either make grep error out (unquoted) or match
          # every deployment (quoted).
          if [[ -n "$JOB_NAME" ]]; then
            echo "##################### JOB MANAGER ######################"
            kubectl get pod | grep -v manager | grep "$JOB_NAME" | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
            # delete the flinkdeployment so we don't have old failures hanging around
            kubectl get flinkdeployment --no-headers | grep "$JOB_NAME" | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{}
          else
            echo "no flinkdeployment name was captured from the bake output, nothing to delete"
          fi
          # force GH action to show failed result
          exit 128
      - name: report running job id for user
        id: report_ids
        run: |
          # TODO: we also need to report historyserver URL and flink dashboard URL
          # but this also requires us to think how we're going to have a thin
          # layer of authentication around these services so they aren't totally public
          echo '############ JOB NAME ################'
          echo $JOB_NAME
          echo "job_name=$JOB_NAME" >> $GITHUB_OUTPUT
          echo '############ JOB ID ################'
          echo $JOB_ID
          echo "job_id=$JOB_ID" >> $GITHUB_OUTPUT
          echo '############ FLINK DASHBOARD ################'
          echo $FLINK_DASH
          echo "flink_dash=$FLINK_DASH" >> $GITHUB_OUTPUT

  # Polls the job manager logs until the job leaves the RUNNING state, then
  # dumps logs and deletes the flinkdeployment if the job failed.
  monitor-job:
    runs-on: ubuntu-latest
    name: monitor job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }}
    needs: [name-job, run-job]
    steps:
      - name: set up aws credentials for job runner user
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.GH_ACTIONS_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.GH_ACTIONS_AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ secrets.GH_ACTIONS_AWS_REGION }}
      - name: install kubectl
        run: |
          curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
          chmod +x ./kubectl
          sudo mv ./kubectl /usr/local/bin/kubectl
      - name: update kubeconfig with cluster
        run: |
          aws eks update-kubeconfig --name pangeo-forge-v3 --region ${{ secrets.GH_ACTIONS_AWS_REGION }}
      - name: monitor logs of job manager
        id: monitorjob
        # upper bound for the polling loop below
        timeout-minutes: 120
        # a failure here must not stop the job: the cleanup step keys off
        # this step's outcome
        continue-on-error: true
        run: |
          # TODO: this needs to not check the logs but the historyserver status
          # but first we need think about authentication and a reverse proxy
          echo "find job status on the job manager logs..."
          while [[ -z "$(kubectl get pod --no-headers | grep -v manager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | grep 'ExecutionGraph.*Job BeamApp.*from state RUNNING.*' | head -n 1)" ]]; do
            echo "still waiting for a status on the job manager logs..."
            sleep 1
          done
          input_status=$(kubectl get pod --no-headers | grep -v manager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | grep 'ExecutionGraph.*Job BeamApp.*from state RUNNING.*' | head -n 1)
          echo "##### INPUT STATUS #####"
          echo $input_status
          # the status is the last word of the log line, right before the final '.'
          status=$(echo "$input_status" | grep -oP '\b\w+(?=\.$)')
          echo "##### STATUS #####"
          echo $status
          if [[ "$status" == "FAILING" || "$status" == "FAILED" ]]; then
            echo "job failed with '$status', will dump the logs now..."
            # force exit so we can move to next step
            exit 128
          fi
      - name: cleanup if monitor job fails
        if: steps.monitorjob.outcome == 'failure'
        run: |
          # much easier to do in bash than in Python via subprocess
          echo "##################### OPERATOR ######################"
          kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
          echo "##################### JOB MANAGER ######################"
          kubectl get pod | grep -v taskmanager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/jobmanager.log
          cat /tmp/jobmanager.log
          echo "##################### TASK MANAGER ######################"
          # depending on the `inputs.parallism` we can more than one taskmanager so grab the first one
          # TODO: loop through them all and dump the logs
          kubectl get pod | grep taskmanager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | head -n1 | xargs -I{} kubectl logs pod/{} -c flink-main-container > /tmp/taskmanager.log
          cat /tmp/taskmanager.log
          # delete the flinkdeployment so we don't have old failures hanging around
          kubectl get flinkdeployment --no-headers | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{}
          #################################################################
          # provide feedback about OOM errors where we've seen them before
          #################################################################
          RED='\033[0;31m'
          GREEN='\033[0;32m'
          NOCOLOR='\033[0m' # To reset the color
          # grok if taskmanager produced a JVM OOM error
          error=$(cat /tmp/taskmanager.log | grep "java.lang.OutOfMemoryError")
          if [[ "$error" ]]; then
            echo -e "${RED}################### ERROR ###########################${NOCOLOR}"
            echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be a JVM OOM error in the taskmanager logs...${NOCOLOR}"
            dump_error=$(cat /tmp/taskmanager.log | grep -a20 "java.lang.OutOfMemoryError")
            echo "$dump_error"
            echo -e "${RED}################### END ERROR ###########################${NOCOLOR}"
          fi
          # grok if this was OOM killed
          error=$(cat /tmp/jobmanager.log | grep "reason=OOMKilled")
          if [[ "$error" ]]; then
            echo -e "${RED}################### ERROR ###########################${NOCOLOR}"
            echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be an OOMKilled error in the jobmanager logs...${NOCOLOR}"
            dump_error=$(cat /tmp/jobmanager.log | grep -a20 "reason=OOMKilled")
            echo "$dump_error"
            echo -e "${RED}################### END ERROR ###########################${NOCOLOR}"
          fi
          #################################################################
          # end
          #################################################################
          # force GH action to show failed result
          exit 128