Skip to content

Commit

Permalink
Enable more parameters in dag_args (#26)
Browse files Browse the repository at this point in the history
* Enable more parameters in dag_args

* change import order to pass CI

* make post_dump for dagrun timeout configurable
  • Loading branch information
everglory99 authored and mchalek committed Apr 28, 2019
1 parent 945b32f commit 9b09956
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 2 deletions.
3 changes: 2 additions & 1 deletion boundary_layer/builders/primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def preamble(self):

template = self.get_jinja_template('primary_preamble.j2')

dag_args_dumped = DagArgsSchema().dump(self.dag.get('dag_args', {}))
dag_args_dumped = DagArgsSchema(context={'for_dag_output': True}).dump(
self.dag.get('dag_args', {}))
if dag_args_dumped.errors:
# should not happen because the schema was validated upon load,
# but we should check
Expand Down
77 changes: 76 additions & 1 deletion boundary_layer/schemas/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from datetime import timedelta
import semver
from marshmallow import fields, validates_schema, ValidationError
from marshmallow import fields, validates_schema, ValidationError, post_dump
from boundary_layer import VERSION, MIN_SUPPORTED_VERSION
from boundary_layer.schemas.base import StrictSchema

Expand Down Expand Up @@ -99,6 +101,79 @@ class DagArgsSchema(StrictSchema):
# schedule_interval argument supports cron strings (e.g. 0 * * * *),
# '@hourly/daily/etc', or numeric (seconds)
schedule_interval = fields.String(allow_none=True)
description = fields.String()
# Integer number of seconds; the post_dump hook dagrun_timeout_to_timedelta
# replaces it with a timedelta when the 'for_dag_output' context flag is set.
dagrun_timeout = fields.Integer()
# Restricted by validate_default_view to one of:
# 'tree', 'graph', 'duration', 'gantt', 'landing_times'.
default_view = fields.String()
# Restricted by validate_orientation to one of: 'LR', 'TB', 'RL', 'BT'.
orientation = fields.String()
# The three callback fields are checked by validate_callbacks and must be
# verbatim strings of the form <<...>>.
sla_miss_callback = fields.String()
on_success_callback = fields.String()
on_failure_callback = fields.String()
params = fields.Dict()
full_filepath = fields.String()
template_searchpath = fields.List(fields.String())
# Checked by validate_template_undefined; must be a verbatim string <<...>>.
template_undefined = fields.String()
user_defined_macros = fields.Dict()
user_defined_filters = fields.Dict()
doc_md = fields.String()
access_control = fields.Dict()

@validates_schema
def validate_callbacks(self, data):
    """Require every supplied callback argument to be a verbatim string.

    Checks `sla_miss_callback`, `on_success_callback` and
    `on_failure_callback`, in that order.  Fields that are absent from
    `data` are ignored.

    Raises:
        ValidationError: for the first callback whose value does not
            start with a `<<...>>` expression; the error is attached to
            that field name.
    """
    # Built once up front; the same pattern applies to every callback.
    verbatim_pattern = re.compile('<<.+>>')
    callback_fields = (
        'sla_miss_callback',
        'on_success_callback',
        'on_failure_callback',
    )

    for field_name in callback_fields:
        if (field_name in data
                and verbatim_pattern.match(data[field_name]) is None):
            raise ValidationError(
                '{} must be a verbatim string like <<...>>'.format(
                    field_name),
                [field_name])

@validates_schema
def validate_default_view(self, data):
    """Reject a `default_view` that is not one of the permitted view names.

    Nothing is checked when `default_view` is absent from `data`.

    Raises:
        ValidationError: attached to `default_view` when its value is not
            in the allowed set.
    """
    allowed_values = ('tree', 'graph', 'duration', 'gantt',
                      'landing_times')

    if ('default_view' in data
            and data['default_view'] not in allowed_values):
        raise ValidationError(
            'Value must be one of {}'.format(allowed_values),
            ['default_view'])

@validates_schema
def validate_orientation(self, data):
    """Reject an `orientation` that is not one of the permitted codes.

    Nothing is checked when `orientation` is absent from `data`.

    Raises:
        ValidationError: attached to `orientation` when its value is not
            in the allowed set.
    """
    allowed_values = ('LR', 'TB', 'RL', 'BT')

    if ('orientation' in data
            and data['orientation'] not in allowed_values):
        raise ValidationError(
            'Value must be one of {}'.format(allowed_values),
            ['orientation'])

@validates_schema
def validate_template_undefined(self, data):
    """Require `template_undefined`, when supplied, to be a verbatim string.

    Nothing is checked when `template_undefined` is absent from `data`.

    Raises:
        ValidationError: attached to `template_undefined` when its value
            does not start with a `<<...>>` expression.
    """
    if 'template_undefined' in data:
        verbatim_pattern = re.compile('<<.+>>')
        if verbatim_pattern.match(data['template_undefined']) is None:
            message = (
                'template_undefined must be a verbatim string like <<...>>')
            raise ValidationError(message, ['template_undefined'])

@post_dump
def dagrun_timeout_to_timedelta(self, data):
    """Swap the dumped `dagrun_timeout` seconds for a `timedelta`.

    The conversion only happens when the schema context carries a truthy
    `for_dag_output` entry and `dagrun_timeout` is present in `data`;
    otherwise `data` is returned untouched.  When it does happen, `data`
    is updated in place and returned.

    Raises:
        Exception: if `timedelta` rejects the value with a `TypeError`.
    """
    convert_requested = self.context.get('for_dag_output')
    if not convert_requested or 'dagrun_timeout' not in data:
        return data

    try:
        timeout = timedelta(seconds=data['dagrun_timeout'])
    except TypeError as err:
        raise Exception(
            'Error in making dagrun_timeout into timedelta object : {}'
            .format(str(err)))
    else:
        # Assign outside the try body so only the timedelta construction
        # is covered by the TypeError handler.
        data['dagrun_timeout'] = timeout
    return data


class PrimaryDagSchema(BaseDagSchema):
Expand Down

0 comments on commit 9b09956

Please sign in to comment.