From 44a07634ad6440957d45196ef5b63b265adcdcdb Mon Sep 17 00:00:00 2001 From: Michael Kotliar Date: Mon, 6 Aug 2018 15:09:05 -0400 Subject: [PATCH] Copy demo workflows to Airflow Home Allow to fix the bug when Docker cannot mount folders from /usr/local/lib/ on macOS --- README.md | 8 +++++++- cwl_airflow/main.py | 4 +++- cwl_airflow/utils/func.py | 32 ++++++++++++++++++++++---------- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 4aad27a..a4e40b7 100644 --- a/README.md +++ b/README.md @@ -263,4 +263,10 @@ Common errors due to different Python version and different ways to install it - Docker is unable to pull images from the Internet. If you are using proxy, your Docker should be configured properly too. - Refer to the official [documentation](https://docs.docker.com/config/daemon/systemd/#httphttps-proxy) \ No newline at end of file + Refer to the official [documentation](https://docs.docker.com/config/daemon/systemd/#httphttps-proxy) + +- Docker is unable to mount directory. + + For macOS docker has a list of directories that it's allowed to mount by default. If your input files are located in + the directories that are not included in this list, you are better of either changing the location of + input files and updating your Job file or adding this directories into Docker configuration *Preferences / File Sharing*. \ No newline at end of file diff --git a/cwl_airflow/main.py b/cwl_airflow/main.py index 8256000..fc6a828 100755 --- a/cwl_airflow/main.py +++ b/cwl_airflow/main.py @@ -21,7 +21,8 @@ get_airflow_default_args, clean_jobs_folder, get_webserver_url, - asset_conf) + asset_conf, + copy_demo_workflows) from cwl_airflow.utils.utils import (get_workflow_output, normalize_args, exit_if_unsupported_feature, @@ -123,6 +124,7 @@ def run_init(args): update_config(args) create_folders() export_dags() + copy_demo_workflows() logging.info("Init Airflow DB") with Mute(): initdb(argparse.Namespace()) diff --git a/cwl_airflow/utils/func.py b/cwl_airflow/utils/func.py index f337b7b..4cc5d75 100644 --- a/cwl_airflow/utils/func.py +++ b/cwl_airflow/utils/func.py @@ -13,7 +13,7 @@ from airflow import conf as conf from airflow.models import DagRun from airflow.utils.state import State -from airflow.settings import DAGS_FOLDER +from airflow.settings import DAGS_FOLDER, AIRFLOW_HOME from airflow.bin.cli import get_dag, CLIFactory, scheduler from airflow.exceptions import AirflowConfigException from cwl_airflow.utils.utils import (set_logger, @@ -31,8 +31,8 @@ def get_demo_workflow(target_wf=None, job_ext=".json"): - workflows = get_files(norm_path(os.path.join(os.path.dirname(os.path.abspath(os.path.join(__file__, "../"))), "tests/cwl/workflows"))) - jobs = get_files(norm_path(os.path.join(os.path.dirname(os.path.abspath(os.path.join(__file__, "../"))), "tests/job"))) + workflows = get_files(os.path.join(AIRFLOW_HOME, "demo/cwl/workflows")) + jobs = get_files(os.path.join(AIRFLOW_HOME, "demo/job")) combined_data = [] for wf_name, wf_path in workflows.items(): job_name = os.path.splitext(wf_name)[0] + job_ext @@ -193,8 +193,7 @@ def config(): conf.get('cwl', 'jobs') conf.get('cwl', 'limit') - def paths(): - items = [conf.get('cwl', 'jobs'), DAGS_FOLDER, os.path.join(DAGS_FOLDER, "cwl_dag.py")] + def paths(items=[]): for item in items: if not os.path.exists(item): raise OSError(item) @@ -204,9 +203,8 @@ def docker(): subprocess.check_call("docker -v", shell=True, stdout=devnull) def docker_demo_mount(): - tests = norm_path(os.path.join(os.path.dirname(os.path.abspath(os.path.join(__file__, "../"))), "tests/data")) with open(os.devnull, 'w') as devnull: - subprocess.check_call("docker run --rm -v {}:/tmp hello-world".format(tests), shell=True, stdout=devnull) + subprocess.check_call("docker run --rm -v {}:/tmp hello-world".format(AIRFLOW_HOME), shell=True, stdout=devnull) def docker_pull(): with open(os.devnull, 'w') as devnull: @@ -216,10 +214,16 @@ def airflow(): with open(os.devnull, 'w') as devnull: subprocess.check_call("airflow -h", shell=True, stdout=devnull) + def demo_paths(): + paths([os.path.join(AIRFLOW_HOME, "demo", name) for name in ["cwl", "data", "job"]]) + + def general_paths(): + paths([conf.get('cwl', 'jobs'), DAGS_FOLDER, os.path.join(DAGS_FOLDER, "cwl_dag.py")]) + check_set = { "init": [airflow, docker], - "demo": [airflow, docker, docker_pull, docker_demo_mount, paths, config], - None: [airflow, docker, paths, config] + "demo": [airflow, docker, docker_pull, docker_demo_mount, general_paths, demo_paths, config], + None: [airflow, docker, general_paths, config] } for check_criteria in check_set[mode]: @@ -240,4 +244,12 @@ def airflow(): def get_webserver_url(): - return "{}:{}".format(conf.get('webserver', 'WEB_SERVER_HOST'), conf.get('webserver', 'WEB_SERVER_PORT')) \ No newline at end of file + return "{}:{}".format(conf.get('webserver', 'WEB_SERVER_HOST'), conf.get('webserver', 'WEB_SERVER_PORT')) + + +def copy_demo_workflows(): + src = norm_path(os.path.join(os.path.dirname(os.path.abspath(os.path.join(__file__, "../"))), "tests")) + dst = os.path.join(AIRFLOW_HOME, "demo") + logging.info("Copy demo workflows\n- from: {}\n- to: {}".format(src, dst)) + shutil.rmtree(dst, ignore_errors=True) + shutil.copytree(src, dst) \ No newline at end of file