From 9b0995625e12a9401788b5f22d78df28e94221ea Mon Sep 17 00:00:00 2001 From: "Haoran(Richard) Cai" Date: Sun, 28 Apr 2019 11:00:03 -0700 Subject: [PATCH] Enable more parameters in dag_args (#26) * Enable more parameters in dag_args * change import order to pass CI * make post_dump for dagrun timeout configurable --- boundary_layer/builders/primary.py | 3 +- boundary_layer/schemas/dag.py | 77 +++++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/boundary_layer/builders/primary.py b/boundary_layer/builders/primary.py index 368f824..91d2876 100644 --- a/boundary_layer/builders/primary.py +++ b/boundary_layer/builders/primary.py @@ -29,7 +29,8 @@ def preamble(self): template = self.get_jinja_template('primary_preamble.j2') - dag_args_dumped = DagArgsSchema().dump(self.dag.get('dag_args', {})) + dag_args_dumped = DagArgsSchema(context={'for_dag_output': True}).dump( + self.dag.get('dag_args', {})) if dag_args_dumped.errors: # should not happen because the schema was validated upon load, # but we should check diff --git a/boundary_layer/schemas/dag.py b/boundary_layer/schemas/dag.py index fc3a3cc..b741b83 100644 --- a/boundary_layer/schemas/dag.py +++ b/boundary_layer/schemas/dag.py @@ -13,8 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re +from datetime import timedelta import semver -from marshmallow import fields, validates_schema, ValidationError +from marshmallow import fields, validates_schema, ValidationError, post_dump from boundary_layer import VERSION, MIN_SUPPORTED_VERSION from boundary_layer.schemas.base import StrictSchema @@ -99,6 +101,79 @@ class DagArgsSchema(StrictSchema): # schedule_interval argument supports cron strings (e.g. 0 * * * *), # '@hourly/daily/etc', or numeric (seconds) schedule_interval = fields.String(allow_none=True) + description = fields.String() + dagrun_timeout = fields.Integer() + default_view = fields.String() + orientation = fields.String() + sla_miss_callback = fields.String() + on_success_callback = fields.String() + on_failure_callback = fields.String() + params = fields.Dict() + full_filepath = fields.String() + template_searchpath = fields.List(fields.String()) + template_undefined = fields.String() + user_defined_macros = fields.Dict() + user_defined_filters = fields.Dict() + doc_md = fields.String() + access_control = fields.Dict() + + @validates_schema + def validate_callbacks(self, data): + callbacks = ['sla_miss_callback', 'on_success_callback', + 'on_failure_callback'] + for cb in callbacks: + if cb not in data: + continue + if not re.compile('<<.+>>').match(data[cb]): + raise ValidationError( + '{} must be a verbatim string like <<...>>'.format(cb), + [cb]) + + @validates_schema + def validate_default_view(self, data): + if 'default_view' not in data: + return + + allowed_values = ('tree', 'graph', 'duration', 'gantt', + 'landing_times') + if data['default_view'] not in allowed_values: + raise ValidationError( + 'Value must be one of {}'.format(allowed_values), + ['default_view']) + + @validates_schema + def validate_orientation(self, data): + if 'orientation' not in data: + return + + allowed_values = ('LR', 'TB', 'RL', 'BT') + if data['orientation'] not in allowed_values: + raise ValidationError( + 'Value must be one of {}'.format(allowed_values), + ['orientation']) + + @validates_schema + def validate_template_undefined(self, data): + if 'template_undefined' not in data: + return + if not re.compile('<<.+>>').match(data['template_undefined']): + raise ValidationError( + 'template_undefined must be a verbatim string like <<...>>', + ['template_undefined']) + + @post_dump + def dagrun_timeout_to_timedelta(self, data): + if not self.context.get('for_dag_output'): + return data + if 'dagrun_timeout' in data: + try: + delta = timedelta(seconds=data['dagrun_timeout']) + except TypeError as e: + raise Exception( + 'Error in making dagrun_timeout into timedelta object : {}' + .format(str(e))) + data['dagrun_timeout'] = delta + return data class PrimaryDagSchema(BaseDagSchema):