dispatch job #280
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Manually dispatched workflow. The inputs below identify the recipe
# repository/ref to run and tune how the resulting job is executed.
name: dispatch job

on:
  workflow_dispatch:
    inputs:
      # Where the recipe lives.
      repo:
        description: 'The https github url for the recipe feedstock'
        required: true
      ref:
        description: 'The tag or branch to target in your recipe repo'
        required: true
        default: 'main'
      feedstock_subdir:
        description: 'The subdir of the feedstock directory in the repo'
        required: true
        default: 'feedstock'
      # How the job should run. Defaults are quoted so they stay strings.
      prune:
        description: 'Only run the first two time steps'
        required: true
        default: '0'
      parallelism:
        description: 'Number of workers to run in parallel'
        required: true
        default: '1'
      auth_mode:
        description: 'What auth mode (edl or iamrole) to use when accessing files.'
        required: false
        default: 'iamrole'
      job_name:
        description: 'A unique job name (no other existing flink deployment can have it) so we can inspect metrics easier in Grafana.'
        required: true
      resource_profile:
        description: 'jobs have different memory requirements so choose (small[7824_MiB], medium[9824_MiB], large[11824_MiB], xlarge[13824_MiB])'
        required: false
        default: 'small'
# Token scopes granted to every job in this workflow.
permissions:
  # Needed by actions/checkout to read the repository.
  contents: read
  # Needed to request the JWT.
  id-token: write
# Three jobs run in sequence:
#   name-job    -> derives a short repo name used in the other jobs' titles
#   run-job     -> submits the recipe with `pangeo-forge-runner bake`
#   monitor-job -> polls the job manager logs until the job leaves RUNNING
#
# All `run:` scripts use the runner's default shell (`bash -e`), so any
# failing simple command aborts the script. User-supplied inputs are handed
# to scripts through `env:` instead of being interpolated into the script
# text, so a crafted input cannot inject shell commands.
jobs:
  name-job:
    runs-on: ubuntu-latest
    outputs:
      repo_name: ${{ steps.string_manipulation.outputs.result }}
    steps:
      - name: manipulate strings
        id: string_manipulation
        env:
          REPO: ${{ github.event.inputs.repo }}
        run: |
          # strip the path and a trailing ".git" from the repo url
          repo_name=$(basename -s .git "$REPO")
          echo "result=$repo_name" >> "$GITHUB_OUTPUT"

  run-job:
    # fromJSON turns the list into a real array so `contains` does an exact
    # membership test; against a plain string it would be a substring match
    # and any login that is a fragment of the list text would be let through.
    if: contains(fromJSON('["ranchodeluxe", "abarciauskas-bgse", "norlandrhagen", "sharkinsspatial", "moradology", "thodson-usgs"]'), github.actor)
    name: kickoff job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }}
    needs: name-job
    environment: veda-smce
    outputs:
      job_name: ${{ steps.report_ids.outputs.job_name }}
      job_id: ${{ steps.report_ids.outputs.job_id }}
      flink_dash: ${{ steps.report_ids.outputs.flink_dash }}
    runs-on: ubuntu-latest
    steps:
      - name: checkout repository
        uses: actions/checkout@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v3
        with:
          role-to-assume: arn:aws:iam::444055461661:role/github-actions-role-eodc
          role-session-name: veda-pforge-run-job
          role-duration-seconds: 3600
          aws-region: us-west-2

      - name: set up python 3.10
        uses: actions/setup-python@v3
        with:
          python-version: '3.10'

      - name: echo inputs to user
        env:
          REPO: ${{ github.event.inputs.repo }}
          REF: ${{ github.event.inputs.ref }}
          PARALLELISM: ${{ github.event.inputs.parallelism }}
          PRUNE: ${{ github.event.inputs.prune }}
        run: |
          echo "Manually triggered workflow: $REPO $REF $PARALLELISM $PRUNE"

      - name: install deps
        run: |
          python -m pip install --upgrade pip
          # the requirement must be quoted: unquoted, the shell treats `>` as
          # a redirect and pip never sees the version constraint
          pip install "pangeo-forge-runner>=0.10.0"

      - name: install kubectl
        run: |
          curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
          chmod +x ./kubectl
          sudo mv ./kubectl /usr/local/bin/kubectl

      - name: update kubeconfig with cluster
        run: |
          aws eks update-kubeconfig --name "${{ vars.EKS_CLUSTER_NAME }}" --region us-west-2

      - name: execute recipe on k8s cluster
        id: executejob
        # a failure here must not stop the job: the cleanup step below keys
        # off this step's outcome
        continue-on-error: true
        run: |
          # NOTE: we can't use `2>&1 | tee execute.log` b/c it hangs forever,
          # so stdout is redirected to execute.log instead. The exit code is
          # captured (rather than letting `bash -e` abort here) so the names
          # below are still exported for the cleanup step, and the log is
          # printed when the command fails.
          bake_status=0
          pangeo-forge-runner \
            bake \
            --repo="$REPO" \
            --ref="$REF" \
            --Bake.job_name="$BAKE_JOB_NAME" \
            -f .github/workflows/config.py > execute.log || bake_status=$?

          # export all the valuable information from the logs
          # (`|| true` keeps a non-matching grep from aborting under `bash -e`)
          RECIPE_JOB_NAME=$(grep -oP 'Job name is \K[^ ]+' execute.log | head -n1 || true)
          echo "RECIPE_JOB_NAME=$RECIPE_JOB_NAME" >> "$GITHUB_ENV"
          JOB_NAME=$(grep -oP 'flinkdeployment\.flink\.apache\.org/\K[^ ]+' execute.log | head -n1 || true)
          echo "JOB_NAME=$JOB_NAME" >> "$GITHUB_ENV"
          JOB_ID=$(grep -oP 'Started Flink job as \K[^ ]+' execute.log || true)
          echo "JOB_ID=$JOB_ID" >> "$GITHUB_ENV"
          FLINK_DASH=$(grep -oP "You can run '\K[^']+(?=')" execute.log || true)
          echo "FLINK_DASH=$FLINK_DASH" >> "$GITHUB_ENV"

          if [[ "$bake_status" -ne 0 ]]; then
            echo "'pangeo-forge-runner bake' exited with status $bake_status, captured output follows:"
            cat execute.log
            exit "$bake_status"
          fi
          # a run that never reported a job id or dashboard command is still
          # treated as a failure
          if [[ -z "$JOB_ID" || -z "$FLINK_DASH" ]]; then
            echo "could not find the flink job id or dashboard command in the bake output, captured output follows:"
            cat execute.log
            exit 1
          fi
        env:
          EARTHDATA_USERNAME: ${{ secrets.EARTHDATA_USERNAME }}
          EARTHDATA_PASSWORD: ${{ secrets.EARTHDATA_PASSWORD }}
          REPO: ${{ github.event.inputs.repo }}
          REF: ${{ github.event.inputs.ref }}
          BAKE_JOB_NAME: ${{ github.event.inputs.job_name }}
          FEEDSTOCK_SUBDIR: ${{ github.event.inputs.feedstock_subdir }}
          PRUNE_OPTION: ${{ github.event.inputs.prune }}
          PARALLELISM_OPTION: ${{ github.event.inputs.parallelism }}
          OUTPUT_BUCKET: ${{ vars.OUTPUT_BUCKET }}
          AUTH_MODE: ${{ github.event.inputs.auth_mode }}
          AWS_ROLE_ARN: ${{ vars.AWS_ROLE_ARN }}
          RESOURCE_PROFILE: ${{ github.event.inputs.resource_profile }}

      - name: cleanup if "pangeo-forge-runner bake" failed
        if: steps.executejob.outcome == 'failure'
        run: |
          echo "The previous 'bake' command failed or timed out. Running cleanup logic..."
          # much easier to do in bash than in Python via subprocess
          # log collection is best effort (`|| true`) so that a failing
          # `kubectl logs` cannot prevent the deletion further down
          echo "##################### OPERATOR ######################"
          kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/operator.log || true
          cat /tmp/operator.log
          echo "##################### JOB MANAGER ######################"
          if [[ -n "${JOB_NAME:-}" ]]; then
            kubectl get pod | grep -v manager | grep -- "$JOB_NAME" | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/jobmanager.log || true
            cat /tmp/jobmanager.log
          else
            echo "no flink deployment name was found in the bake output, skipping job manager logs"
          fi
          #################################################################
          # provide feedback about OOM errors where we've seen them before
          #################################################################
          RED='\033[0;31m'
          GREEN='\033[0;32m'
          NOCOLOR='\033[0m' # To reset the color
          # grok if operator logs produced a error that makes things unable to schedule
          # (tested with `if grep -q` so a missing match does not abort the script)
          if grep -q "ReconciliationException" /tmp/operator.log; then
            echo -e "${RED}################### ERROR ###########################${NOCOLOR}"
            echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be a ReconciliationException in the operator logs...${NOCOLOR}"
            # NOTE(review): `-a20` appears to be read by GNU grep as `-a` plus
            # 20 lines of context; `-A20` may have been intended -- confirm
            grep -a20 "ReconciliationException" /tmp/operator.log
            echo -e "${RED}################### END ERROR ###########################${NOCOLOR}"
          fi
          #################################################################
          # end
          #################################################################
          # delete the flinkdeployment so we don't have old failures hanging around
          if [[ -n "${JOB_NAME:-}" ]]; then
            kubectl get flinkdeployment --no-headers | grep -- "$JOB_NAME" | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{}
          else
            echo "no flink deployment name was found in the bake output, nothing to delete"
          fi
          # force GH action to show failed result
          exit 128

      - name: echo JobID, JobName, FlinkDashboard to user
        id: report_ids
        run: |
          # TODO: we also need to report historyserver URL and flink dashboard URL
          # but this also requires us to think how we're going to have a thin
          # layer of authentication around these services so they aren't totally public
          echo '############ RECIPE JOB NAME ################'
          echo "$RECIPE_JOB_NAME"
          echo '############ FLINK JOB NAME ################'
          echo "$JOB_NAME"
          echo "job_name=$JOB_NAME" >> "$GITHUB_OUTPUT"
          echo '############ JOB ID ################'
          echo "$JOB_ID"
          echo "job_id=$JOB_ID" >> "$GITHUB_OUTPUT"
          echo '############ FLINK DASHBOARD ################'
          echo "$FLINK_DASH"
          echo "flink_dash=$FLINK_DASH" >> "$GITHUB_OUTPUT"

  monitor-job:
    runs-on: ubuntu-latest
    name: monitor job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }}
    environment: veda-smce
    needs: [name-job, run-job]
    steps:
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v3
        with:
          role-to-assume: arn:aws:iam::444055461661:role/github-actions-role-eodc
          role-session-name: veda-pforge-monitor-job
          role-duration-seconds: 43200 # note this has to match our timeout-minutes below for monitoring
          aws-region: us-west-2

      - name: install kubectl
        run: |
          curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
          chmod +x ./kubectl
          sudo mv ./kubectl /usr/local/bin/kubectl

      - name: update kubeconfig with cluster
        run: |
          aws eks update-kubeconfig --name "${{ vars.EKS_CLUSTER_NAME }}" --region us-west-2

      # - name: Setup upterm session
      #   uses: lhotari/action-upterm@v1
      #
      - name: monitor logs of job manager and report final status
        id: monitorjob
        timeout-minutes: 720
        # a failure (or timeout) here is handled by the "dump logs" step
        continue-on-error: true
        env:
          JOB_NAME: ${{ needs.run-job.outputs.job_name }}
        run: |
          # TODO: this needs to not check the logs but the historyserver status
          # but first we need think about authentication and a reverse proxy

          # without a name the pod filter below can never match, so fail now
          # instead of polling until the timeout
          if [[ -z "$JOB_NAME" ]]; then
            echo "run-job did not report a flink job name, nothing to monitor"
            exit 128
          fi

          # print the first job manager log line that records the job leaving RUNNING
          find_transition() {
            kubectl get pod --no-headers | grep -v manager | grep -- "$JOB_NAME" | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | grep 'ExecutionGraph.*Job BeamApp.*from state RUNNING.*' | head -n 1
          }

          echo "find job status on the job manager logs..."
          input_status=$(find_transition || true)
          while [[ -z "$input_status" ]]; do
            echo "still waiting for a status on the job manager logs..."
            sleep 1
            input_status=$(find_transition || true)
          done
          echo "##### INPUT STATUS #####"
          echo "$input_status"
          # the state name is the last word on the line, right before the final "."
          status=$(echo "$input_status" | grep -oP '\b\w+(?=\.$)' || true)
          echo "##### STATUS #####"
          echo "$status"
          if [[ -z "$status" ]]; then
            echo "could not parse a final state from the job manager log line, will dump the logs now..."
            exit 128
          fi
          if [[ "$status" == "FAILING" || "$status" == "FAILED" ]]; then
            echo "job failed with '$status', will dump the logs now..."
            # force exit so we can move to next step
            exit 128
          fi

      - name: dump logs
        if: steps.monitorjob.outcome == 'failure'
        env:
          JOB_NAME: ${{ needs.run-job.outputs.job_name }}
          PARALLELISM: ${{ github.event.inputs.parallelism }}
        run: |
          # much easier to do in bash than in Python via subprocess
          # log collection is best effort (`|| true`) so one failing
          # `kubectl logs` does not hide the remaining diagnostics
          echo "##################### OPERATOR ######################"
          kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000 || true

          # start from empty files so the checks below only see this run's logs
          : > /tmp/jobmanager.log
          : > /tmp/taskmanager.log

          echo "##################### JOB MANAGER ######################"
          if [[ -n "$JOB_NAME" ]]; then
            kubectl get pod | grep -v taskmanager | grep -- "$JOB_NAME" | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/jobmanager.log || true
            cat /tmp/jobmanager.log
          else
            echo "run-job did not report a flink job name, skipping job manager logs"
          fi

          echo "##################### TASK MANAGER ######################"
          # depending on the `inputs.parallelism` we can have more than one taskmanager
          if [[ "$PARALLELISM" =~ ^[0-9]+$ ]]; then
            iterations=$((10#$PARALLELISM)) # force base 10 so e.g. "08" is accepted
          else
            echo "parallelism input '$PARALLELISM' is not an integer, only looking for one taskmanager"
            iterations=1
          fi
          if [[ -n "$JOB_NAME" ]]; then
            for (( i = 1; i <= iterations; i++ )); do
              echo "#### taskmanager-$i ####"
              # append (tee -a) so the OOM check below sees every taskmanager,
              # not just the last one
              kubectl get pod | grep -- "$JOB_NAME" | grep -w "taskmanager-1-$i" | cut -d' ' -f1 | head -n1 | xargs -I{} kubectl logs pod/{} -c flink-main-container | tee -a /tmp/taskmanager.log || true
            done
          else
            echo "run-job did not report a flink job name, skipping task manager logs"
          fi
          # NOTE: we actually want the failed flink deployments to stick around b/c we might want to inspect the flink dashboard
          # kubectl get flinkdeployment --no-headers | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{}
          #################################################################
          # provide feedback about OOM errors where we've seen them before
          #################################################################
          RED='\033[0;31m'
          GREEN='\033[0;32m'
          NOCOLOR='\033[0m' # To reset the color
          # grok if taskmanager produced a JVM OOM error
          # (tested with `if grep -q` so a missing match does not abort the
          # script before the OOMKilled check runs)
          if grep -q "java.lang.OutOfMemoryError" /tmp/taskmanager.log; then
            echo -e "${RED}################### ERROR ###########################${NOCOLOR}"
            echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be a JVM OOM error in the taskmanager logs...${NOCOLOR}"
            grep -a20 "java.lang.OutOfMemoryError" /tmp/taskmanager.log
            echo -e "${RED}################### END ERROR ###########################${NOCOLOR}"
          fi
          # grok if this was OOM killed
          if grep -q "reason=OOMKilled" /tmp/jobmanager.log; then
            echo -e "${RED}################### ERROR ###########################${NOCOLOR}"
            echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be an OOMKilled error in the jobmanager logs...${NOCOLOR}"
            grep -a20 "reason=OOMKilled" /tmp/jobmanager.log
            echo -e "${RED}################### END ERROR ###########################${NOCOLOR}"
          fi
          #################################################################
          # end
          #################################################################
          # force GH action to show failed result
          exit 128