Hi,

The combination of sla_miss_callback and S3KeySensor looks inconsistent.

We have a scenario where we check whether upstream data is available by using an S3KeySensor to look for a marker file in S3. If the marker appears within the SLA (say, 3 minutes), the downstream tasks should run; if the dependency is not met by the SLA, we want to send an email or print a notification.

We currently use the code below, but it only works intermittently, and in another environment it never works at all.
"""
A simple example showing the basics of using a custom SLA notification response.
"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.s3_key_sensor import S3KeySensor


# Our notification function, which simply prints some of the information
# passed to the SLA miss notification.
def print_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    print('SLA was missed on DAG %(dag)s by task id %(blocking_tis)s with '
          'task list %(task_list)s which are blocking '
          '%(blocking_task_list)s' % locals())
# Set up some simple DAG args. Note that email is set to None to avoid the
# missed SLA from creating a failure in the DAG: SLA emails are sent if the
# SLA is missed and an email is defined, and there is currently no flag to
# disable them.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 1, 1),
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}
# Create a basic DAG with our args
dag = DAG(
    'basic_sla_s3',
    default_args=default_args,
    # Register our method as the SLA miss callback
    sla_miss_callback=print_sla_miss,
    # A short interval so the job fires frequently when we run it
    schedule_interval=timedelta(minutes=2),
    catchup=False,
    max_active_runs=1,
)
# A trivial upstream task; its own SLA is disabled so only the sensor's SLA
# is exercised.
t1 = BashOperator(
    task_id='start',
    # sla=timedelta(seconds=10),
    bash_command='sleep 1',
    # Do not retry, so the SLA miss fires after the first execution
    retries=0,
    dag=dag,
)
# Check for the marker file in S3; the 30-second SLA should trigger
# print_sla_miss if the key does not appear in time.
s3_dependency = S3KeySensor(
    task_id='check_for_files_in_s3_v3',
    bucket_key='slatest/',
    wildcard_match=True,
    bucket_name='my-test',
    timeout=120,
    poke_interval=10,
    sla=timedelta(seconds=30),
    dag=dag,
)
t1 >> s3_dependency
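Since sla_miss_callback runs in the scheduler and its interaction with sensors has been unreliable for us, one workaround we considered is to lean on the sensor's own timeout instead: set timeout to the SLA window and attach an on_failure_callback, which runs on the worker when the sensor times out. A minimal sketch of such a callback (the name notify_missing_marker is hypothetical; task_instance and execution_date are the standard keys of the Airflow task context dict):

```python
def notify_missing_marker(context):
    """on_failure_callback: runs when the sensor fails, e.g. on timeout.

    `context` is the Airflow task context dict; `task_instance` and
    `execution_date` are standard keys in it.
    """
    ti = context["task_instance"]
    msg = ("S3 marker did not appear in time: dag=%s task=%s execution_date=%s"
           % (ti.dag_id, ti.task_id, context["execution_date"]))
    print(msg)  # or send an email / Slack message here instead
    return msg
```

It could then be wired to the sensor in place of the SLA, e.g. S3KeySensor(..., timeout=180, on_failure_callback=notify_missing_marker), so the sensor timeout itself drives the notification rather than the scheduler-side SLA machinery.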