Skip to content

Commit

Permalink
Merge pull request #54 from NASA-IMPACT/gc/emr
Browse files Browse the repository at this point in the history
WIP: Draft 1 Changes Toward EMR Serverless
  • Loading branch information
ranchodeluxe authored May 9, 2024
2 parents e55c5ff + ddf0f6d commit c29622d
Show file tree
Hide file tree
Showing 30 changed files with 1,119 additions and 250 deletions.
272 changes: 50 additions & 222 deletions .github/workflows/job-runner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,13 @@ on:
description: 'The subdir of the feedstock directory in the repo'
required: true
default: 'feedstock'
prune:
description: 'Only run the first two time steps'
required: true
default: '0'
parallelism:
description: 'Number of workers to run in parallel'
description: 'Number of partitions to divide the the Spark RDD into (usually equals [num-of-executors]*[num-of-vcpus])'
required: true
default: '1'
auth_mode:
description: 'What auth mode (edl or iamrole) to use when accessing files.'
required: false
default: 'iamrole'
default: '1280'
job_name:
description: 'A unique job name (no other existing filnk deployment can have it) so we can inspect metrics easier in Grafana. job_name must match the regex ^[a-z][-_0-9a-z]{0,62}$.'
description: 'Name the EMR job'
required: true
resource_profile:
description: 'jobs have different memory requirements so choose (small[7824_MiB], medium[9824_MiB], large[11824_MiB], xlarge[13824_MiB])'
required: false
default: 'small'

permissions:
id-token: write # This is required for requesting the JWT
Expand Down Expand Up @@ -81,225 +69,65 @@ jobs:
echo "Manually triggered workflow: \
${{ github.event.inputs.repo }} \
${{ github.event.inputs.ref }} \
${{ github.event.inputs.parallelism }} \
${{ github.event.inputs.prune }}"
- name: install deps
run: |
python -m pip install --upgrade pip
pip install pangeo-forge-runner>=0.10.0
- name: install kubectl
run: |
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x ./kubectl
sudo mv ./kubectl /usr/local/bin/kubectl
${{ github.event.inputs.feedstock_subdir}} \
${{ github.event.inputs.parallelism }}
- name: update kubeconfig with cluster
run: |
aws eks update-kubeconfig --name ${{ vars.EKS_CLUSTER_NAME }} --region us-west-2
- name: execute recipe on k8s cluster
- name: submit job to EMR serverless
id: executejob
continue-on-error: true
run: |
# NOTE: we can't use `2>&1 | tee execute.log` b/c it hangs forever
# so if the command fails (for example b/c it doesn't have the right requirements)
# then we wont' be able to see the errors until we run it without redirecting output
pangeo-forge-runner \
bake \
--repo=${{ github.event.inputs.repo }} \
--ref=${{ github.event.inputs.ref }} \
--Bake.job_name="${{ github.event.inputs.job_name }}" \
-f .github/workflows/config.py > execute.log
# export all the valuable information from the logs
RECIPE_JOB_NAME=$(cat execute.log | grep -oP 'Job name is \K[^ ]+' | head -n1)
echo "RECIPE_JOB_NAME=$RECIPE_JOB_NAME" >> $GITHUB_ENV
JOB_NAME=$(cat execute.log | grep -oP 'flinkdeployment\.flink\.apache\.org/\K[^ ]+' | head -n1)
echo "JOB_NAME=$JOB_NAME" >> $GITHUB_ENV
JOB_ID=$(cat execute.log | grep -oP 'Started Flink job as \K[^ ]+')
echo "JOB_ID=$JOB_ID" >> $GITHUB_ENV
FLINK_DASH=$(cat execute.log | grep -oP "You can run '\K[^']+(?=')")
echo "FLINK_DASH=$FLINK_DASH" >> $GITHUB_ENV
# TODO: make submit_spark_job.py or some other config.py checkout, build env and package on s3
# before submission
python submit_spark_job.py \
--name=${{ github.event.inputs.job_name }} \
--application-id="00firgpmjusj5e0l" \
--execution-role-arn="arn:aws:iam::444055461661:role/veda-data-reader-dev" \
--entry-point="s3://veda-pforge-emr-input-scripts-v3/runwrapper.py" \
--entry-point-arguments="${{ github.event.inputs.repo }} ${{ github.event.inputs.ref }} ${{ github.event.inputs.feedstock_subdir }}" \
--spark-submit-parameters="--conf spark.executor.cores=16 --conf spark.executor.memory=60G --conf spark.executor.memoryOverhead=60G --conf spark.driver.memory=10G --conf spark.driver.memoryOverhead=4G --conf spark.shuffle.file.buffer=64k --conf spark.default.parallelism=${{ github.event.inputs.ref }} --conf spark.emr-serverless.executor.disk=200G"
env:
EARTHDATA_USERNAME: ${{ secrets.EARTHDATA_USERNAME }}
EARTHDATA_PASSWORD: ${{ secrets.EARTHDATA_PASSWORD }}
REPO: ${{ github.event.inputs.repo }}
REF: ${{ github.event.inputs.ref }}
FEEDSTOCK_SUBDIR: ${{ github.event.inputs.feedstock_subdir }}
PRUNE_OPTION: ${{ github.event.inputs.prune }}
PARALLELISM_OPTION: ${{ github.event.inputs.parallelism }}
OUTPUT_BUCKET: ${{ vars.OUTPUT_BUCKET }}
AUTH_MODE: ${{ github.event.inputs.auth_mode }}
AWS_ROLE_ARN: ${{ vars.AWS_ROLE_ARN }}
RESOURCE_PROFILE: ${{ github.event.inputs.resource_profile }}
JOB_NAME: ${{ github.event.inputs.job_name }}

- name: cleanup if "pangeo-forge-runner bake" failed
- name: cleanup if submission failed
if: steps.executejob.outcome == 'failure'
run: |
echo "The previous 'bake' command failed or timed out. Running cleanup logic..."
# much easier to do in bash than in Python via subprocess
echo "##################### OPERATOR ######################"
kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/operator.log
cat /tmp/operator.log
echo "##################### JOB MANAGER ######################"
kubectl get pod | grep -v manager | grep $JOB_NAME | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/jobmanager.log
cat /tmp/jobmanager.log
#################################################################
# provide feedback about OOM errors where we've seen them before
#################################################################
RED='\033[0;31m'
GREEN='\033[0;32m'
NOCOLOR='\033[0m' # To reset the color
# grok if operator logs produced a error that makes things unable to schedule
error=$(cat /tmp/operator.log | grep "ReconciliationException")
if [[ "$error" ]]; then
echo -e "${RED}################### ERROR ###########################${NOCOLOR}"
echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be a ReconciliationException in the operator logs...${NOCOLOR}"
dump_error=$(cat /tmp/operator.log | grep -a20 "ReconciliationException")
echo "$dump_error"
echo -e "${RED}################### END ERROR ###########################${NOCOLOR}"
fi
#################################################################
# end
#################################################################
# delete the flinkdeployment so we don't have old failures hanging around
kubectl get flinkdeployment --no-headers | grep $JOB_NAME | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{}
echo "The previous command failed. Running cleanup logic..."
# force GH action to show failed result
exit 128
- name: echo JobID, JobName, FlinkDashboard to user
id: report_ids
run: |
# TODO: we also need to report historyserver URL and flink dashboard URL
# but this also requires us to think how we're going to have a thin
# layer of authentication around these services so they aren't totally public
echo '############ RECIPE JOB NAME ################'
echo $RECIPE_JOB_NAME
echo '############ FLINK JOB NAME ################'
echo $JOB_NAME
echo "job_name=$JOB_NAME" >> $GITHUB_OUTPUT
echo '############ JOB ID ################'
echo $JOB_ID
echo "job_id=$JOB_ID" >> $GITHUB_OUTPUT
echo '############ FLINK DASHBOARD ################'
echo $FLINK_DASH
echo "flink_dash=$FLINK_DASH" >> $GITHUB_OUTPUT
monitor-job:
runs-on: ubuntu-latest
name: monitor job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }}
environment: veda-smce
needs: [name-job, run-job]
steps:
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v3
with:
role-to-assume: arn:aws:iam::444055461661:role/github-actions-role-eodc
role-session-name: veda-pforge-monitor-job
role-duration-seconds: 43200 # note this has to match our timeout-minutes below for monitoring
aws-region: us-west-2

- name: install kubectl
run: |
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x ./kubectl
sudo mv ./kubectl /usr/local/bin/kubectl
- name: update kubeconfig with cluster
run: |
aws eks update-kubeconfig --name ${{ vars.EKS_CLUSTER_NAME }} --region us-west-2
# - name: Setup upterm session
# uses: lhotari/action-upterm@v1
#
- name: monitor logs of job manager and report final status
id: monitorjob
timeout-minutes: 720
continue-on-error: true
run: |
# TODO: this needs to not check the logs but the historyserver status
# but first we need think about authentication and a reverse proxy
echo "find job status on the job manager logs..."
while [[ -z "$(kubectl get pod --no-headers | grep -v manager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | grep 'ExecutionGraph.*Job BeamApp.*from state RUNNING.*' | head -n 1)" ]]; do
echo "still waiting for a status on the job manager logs..."
sleep 1
done
input_status=$(kubectl get pod --no-headers | grep -v manager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | grep 'ExecutionGraph.*Job BeamApp.*from state RUNNING.*' | head -n 1)
echo "##### INPUT STATUS #####"
echo $input_status
status=$(echo "$input_status" | grep -oP '\b\w+(?=\.$)')
echo "##### STATUS #####"
echo $status
if [[ "$status" == "FAILING" || "$status" == "FAILED" ]]; then
echo "job failed with '$status', will dump the logs now..."
# force exit so we can move to next step
exit 128
fi
- name: dump logs
if: steps.monitorjob.outcome == 'failure'
run: |
# much easier to do in bash than in Python via subprocess
echo "##################### OPERATOR ######################"
kubectl get pod | grep operator | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} | tail -n 1000
echo "##################### JOB MANAGER ######################"
kubectl get pod | grep -v taskmanager | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl logs pod/{} > /tmp/jobmanager.log
cat /tmp/jobmanager.log
echo "##################### TASK MANAGER ######################"
# depending on the `inputs.parallism` we can have more than one taskmanager
parallelism_input="${{ github.event.inputs.parallelism }}"
iterations=$(expr $parallelism_input + 0) # cast to integer
for (( i = 1; i <= iterations; i++ )); do
echo "echo #### taskmanager-$i ####"
kubectl get pod | grep ${{ needs.run-job.outputs.job_name }} | grep taskmanager-1-$i | cut -d' ' -f1 | head -n1 | xargs -I{} kubectl logs pod/{} -c flink-main-container > /tmp/taskmanager.log
cat /tmp/taskmanager.log
done
# NOTE: we actually want the failed flink deployments to stick around b/c we might want to inspect the flink dashboard
# kubectl get flinkdeployment --no-headers | grep ${{ needs.run-job.outputs.job_name }} | cut -d' ' -f1 | xargs -I{} kubectl delete flinkdeployment/{}
#################################################################
# provide feedback about OOM errors where we've seen them before
#################################################################
RED='\033[0;31m'
GREEN='\033[0;32m'
NOCOLOR='\033[0m' # To reset the color
# grok if taskmanager produced a JVM OOM error
error=$(cat /tmp/taskmanager.log | grep "java.lang.OutOfMemoryError")
if [[ "$error" ]]; then
echo -e "${RED}################### ERROR ###########################${NOCOLOR}"
echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be a JVM OOM error in the taskmanager logs...${NOCOLOR}"
dump_error=$(cat /tmp/taskmanager.log | grep -a20 "java.lang.OutOfMemoryError")
echo "$dump_error"
echo -e "${RED}################### END ERROR ###########################${NOCOLOR}"
fi
# grok if this was OOM killed
error=$(cat /tmp/jobmanager.log | grep "reason=OOMKilled")
if [[ "$error" ]]; then
echo -e "${RED}################### ERROR ###########################${NOCOLOR}"
echo -e "${RED}ERROR: ${NOCOLOR}${GREEN}There seems to be an OOMKilled error in the jobmanager logs...${NOCOLOR}"
dump_error=$(cat /tmp/jobmanager.log | grep -a20 "reason=OOMKilled")
echo "$dump_error"
echo -e "${RED}################### END ERROR ###########################${NOCOLOR}"
fi
#################################################################
# end
#################################################################
# force GH action to show failed result
exit 128
# - name: echo JobID, JobName, FlinkDashboard to user
# id: report_ids
# run: |
# # TODO: we also need to report historyserver URL and flink dashboard URL
# # but this also requires us to think how we're going to have a thin
# # layer of authentication around these services so they aren't totally public
# echo '############ RECIPE JOB NAME ################'
# echo $RECIPE_JOB_NAME
# echo '############ FLINK JOB NAME ################'
# echo $JOB_NAME
# echo "job_name=$JOB_NAME" >> $GITHUB_OUTPUT
# echo '############ JOB ID ################'
# echo $JOB_ID
# echo "job_id=$JOB_ID" >> $GITHUB_OUTPUT
# echo '############ FLINK DASHBOARD ################'
# echo $FLINK_DASH
# echo "flink_dash=$FLINK_DASH" >> $GITHUB_OUTPUT

# monitor-job:
# runs-on: ubuntu-latest
# name: monitor job ${{ needs.name-job.outputs.repo_name }}@${{ github.event.inputs.ref }}
# environment: veda-smce
# needs: [name-job, run-job]
# steps:
# - name: Configure AWS credentials
# uses: aws-actions/configure-aws-credentials@v3
# with:
# role-to-assume: arn:aws:iam::444055461661:role/github-actions-role-eodc
# role-session-name: veda-pforge-monitor-job
# role-duration-seconds: 43200 # note this has to match our timeout-minutes below for monitoring
# aws-region: us-west-2
57 changes: 57 additions & 0 deletions .github/workflows/submit_spark_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import argparse
import boto3
import json

def start_emr_job(application_id, execution_role_arn, entry_point, entry_point_arguments, spark_submit_params, configuration_overrides, tags, execution_timeout, name):
client = boto3.client('emr-serverless')

job_driver = {
'sparkSubmit': {
'entryPoint': entry_point,
'entryPointArguments': entry_point_arguments.split(),
'sparkSubmitParameters': spark_submit_params
}
}

response = client.start_job_run(
applicationId=application_id,
clientToken='token', # Generate a unique token here if needed
executionRoleArn=execution_role_arn,
jobDriver=job_driver,
configurationOverrides=configuration_overrides,
tags=tags,
executionTimeoutMinutes=execution_timeout,
name=name
)
return response


if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Start a Spark job on EMR Serverless.')

parser.add_argument('--application-id', required=True, help='Application ID for the EMR Serverless application')
parser.add_argument('--execution-role-arn', required=True, help='Execution role ARN')
parser.add_argument('--entry-point', required=True, help='Entry point for the Spark job (e.g., s3://bucket/script.py)')
parser.add_argument('--entry-point-arguments', default='', help='Space-separated entry point arguments')
parser.add_argument('--spark-submit-parameters', default='', help='Spark submit parameters')
parser.add_argument('--configuration-overrides', type=json.loads, default={}, help='JSON string for configuration overrides')
parser.add_argument('--tags', type=json.loads, default={}, help='JSON string for tags')
parser.add_argument('--execution-timeout', type=int, default=123, help='Execution timeout in minutes')
parser.add_argument('--name', required=True, help='Name for the job run')

args = parser.parse_args()

response = start_emr_job(
application_id=args.application_id,
execution_role_arn=args.execution_role_arn,
entry_point=args.entry_point,
entry_point_arguments=args.entry_point_arguments,
spark_submit_params=args.spark_submit_parameters,
configuration_overrides=args.configuration_overrides,
tags=args.tags,
execution_timeout=args.execution_timeout,
name=args.name
)

print("Job started successfully. Response:")
print(response)
Loading

0 comments on commit c29622d

Please sign in to comment.