Skip to content

Commit

Permalink
Copy demo workflows to Airflow Home
Browse files Browse the repository at this point in the history
Works around the bug where Docker cannot
mount folders from /usr/local/lib/ on
macOS
  • Loading branch information
michael-kotliar committed Aug 6, 2018
1 parent 6b1fc5f commit 44a0763
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,10 @@ Common errors due to different Python version and different ways to install it
- Docker is unable to pull images from the Internet.

If you are using proxy, your Docker should be configured properly too.
Refer to the official [documentation](https://docs.docker.com/config/daemon/systemd/#httphttps-proxy)
Refer to the official [documentation](https://docs.docker.com/config/daemon/systemd/#httphttps-proxy)

- Docker is unable to mount a directory.

On macOS, Docker has a list of directories that it is allowed to mount by default. If your input files are located in
directories that are not included in this list, you are better off either changing the location of the
input files and updating your Job file, or adding these directories to the Docker configuration under *Preferences / File Sharing*.
4 changes: 3 additions & 1 deletion cwl_airflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
get_airflow_default_args,
clean_jobs_folder,
get_webserver_url,
asset_conf)
asset_conf,
copy_demo_workflows)
from cwl_airflow.utils.utils import (get_workflow_output,
normalize_args,
exit_if_unsupported_feature,
Expand Down Expand Up @@ -123,6 +124,7 @@ def run_init(args):
update_config(args)
create_folders()
export_dags()
copy_demo_workflows()
logging.info("Init Airflow DB")
with Mute():
initdb(argparse.Namespace())
Expand Down
32 changes: 22 additions & 10 deletions cwl_airflow/utils/func.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from airflow import conf as conf
from airflow.models import DagRun
from airflow.utils.state import State
from airflow.settings import DAGS_FOLDER
from airflow.settings import DAGS_FOLDER, AIRFLOW_HOME
from airflow.bin.cli import get_dag, CLIFactory, scheduler
from airflow.exceptions import AirflowConfigException
from cwl_airflow.utils.utils import (set_logger,
Expand All @@ -31,8 +31,8 @@


def get_demo_workflow(target_wf=None, job_ext=".json"):
workflows = get_files(norm_path(os.path.join(os.path.dirname(os.path.abspath(os.path.join(__file__, "../"))), "tests/cwl/workflows")))
jobs = get_files(norm_path(os.path.join(os.path.dirname(os.path.abspath(os.path.join(__file__, "../"))), "tests/job")))
workflows = get_files(os.path.join(AIRFLOW_HOME, "demo/cwl/workflows"))
jobs = get_files(os.path.join(AIRFLOW_HOME, "demo/job"))
combined_data = []
for wf_name, wf_path in workflows.items():
job_name = os.path.splitext(wf_name)[0] + job_ext
Expand Down Expand Up @@ -193,8 +193,7 @@ def config():
conf.get('cwl', 'jobs')
conf.get('cwl', 'limit')

def paths():
items = [conf.get('cwl', 'jobs'), DAGS_FOLDER, os.path.join(DAGS_FOLDER, "cwl_dag.py")]
def paths(items=[]):
for item in items:
if not os.path.exists(item):
raise OSError(item)
Expand All @@ -204,9 +203,8 @@ def docker():
subprocess.check_call("docker -v", shell=True, stdout=devnull)

def docker_demo_mount():
tests = norm_path(os.path.join(os.path.dirname(os.path.abspath(os.path.join(__file__, "../"))), "tests/data"))
with open(os.devnull, 'w') as devnull:
subprocess.check_call("docker run --rm -v {}:/tmp hello-world".format(tests), shell=True, stdout=devnull)
subprocess.check_call("docker run --rm -v {}:/tmp hello-world".format(AIRFLOW_HOME), shell=True, stdout=devnull)

def docker_pull():
with open(os.devnull, 'w') as devnull:
Expand All @@ -216,10 +214,16 @@ def airflow():
with open(os.devnull, 'w') as devnull:
subprocess.check_call("airflow -h", shell=True, stdout=devnull)

def demo_paths():
    """Check the demo sub-folders ("cwl", "data", "job") under <AIRFLOW_HOME>/demo via paths()."""
    required = []
    for folder in ["cwl", "data", "job"]:
        required.append(os.path.join(AIRFLOW_HOME, "demo", folder))
    paths(required)

def general_paths():
    """Check the configured jobs folder, the DAGs folder and the cwl_dag.py file via paths()."""
    jobs_folder = conf.get('cwl', 'jobs')
    dag_file = os.path.join(DAGS_FOLDER, "cwl_dag.py")
    paths([jobs_folder, DAGS_FOLDER, dag_file])

check_set = {
"init": [airflow, docker],
"demo": [airflow, docker, docker_pull, docker_demo_mount, paths, config],
None: [airflow, docker, paths, config]
"demo": [airflow, docker, docker_pull, docker_demo_mount, general_paths, demo_paths, config],
None: [airflow, docker, general_paths, config]
}

for check_criteria in check_set[mode]:
Expand All @@ -240,4 +244,12 @@ def airflow():


def get_webserver_url():
    """Return "<host>:<port>" built from the [webserver] section of the Airflow configuration."""
    host = conf.get('webserver', 'WEB_SERVER_HOST')
    port = conf.get('webserver', 'WEB_SERVER_PORT')
    return "{}:{}".format(host, port)


def copy_demo_workflows():
    """Replace <AIRFLOW_HOME>/demo with a fresh copy of the "tests" folder.

    The source folder is resolved relative to this module's location and
    passed through norm_path(). An existing destination tree is removed first
    (removal errors are ignored), then the source tree is copied in its place.
    """
    base_dir = os.path.dirname(os.path.abspath(os.path.join(__file__, "../")))
    source_dir = norm_path(os.path.join(base_dir, "tests"))
    target_dir = os.path.join(AIRFLOW_HOME, "demo")
    message = "Copy demo workflows\n- from: {}\n- to: {}".format(source_dir, target_dir)
    logging.info(message)
    # copytree() refuses to copy into an existing directory, so clear it first
    shutil.rmtree(target_dir, ignore_errors=True)
    shutil.copytree(source_dir, target_dir)

0 comments on commit 44a0763

Please sign in to comment.