diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..d6f5fd7fe --- /dev/null +++ b/.gitignore @@ -0,0 +1,130 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Visual Studio Code +.vscode/* +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json \ No newline at end of file diff --git a/molecules/.gitignore b/molecules/.gitignore deleted file mode 100644 index 7fdea582c..000000000 --- a/molecules/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -*.pyc -*.egg-info diff --git a/molecules/README.md b/molecules/README.md index 6b0fe649c..1f933df87 100644 --- a/molecules/README.md +++ b/molecules/README.md @@ -3,7 +3,9 @@ For more details, see [Machine Learning with Apache Beam and TensorFlow](https:/ This sample shows how to create, train, evaluate, and make predictions on a machine learning model, using [Apache Beam](https://beam.apache.org/), [Google Cloud Dataflow](https://cloud.google.com/dataflow/), [TensorFlow](https://www.tensorflow.org/), and [AI Platform](https://cloud.google.com/ai-platform/). -The dataset for this sample is extracted from the [National Center for Biotechnology Information](https://www.ncbi.nlm.nih.gov/) ([FTP source](ftp://ftp.ncbi.nlm.nih.gov/pubchem/Compound_3D/01_conf_per_cmpd/SDF)). The file format is [`SDF`](https://en.wikipedia.org/wiki/Chemical_table_file#SDF). Here's a more detailed description of the [MDL/SDF file format](http://c4.cabrillo.edu/404/ctfile.pdf). +The dataset for this sample is extracted from the [National Center for Biotechnology Information](https://www.ncbi.nlm.nih.gov/) ([FTP source](ftp://ftp.ncbi.nlm.nih.gov/pubchem/Compound_3D/01_conf_per_cmpd/SDF)). +The file format is [`SDF`](https://en.wikipedia.org/wiki/Chemical_table_file#SDF). 
+Here's a more detailed description of the [MDL/SDF file format](http://c4.cabrillo.edu/404/ctfile.pdf). These are the general steps: 1. Data extraction @@ -12,7 +14,6 @@ These are the general steps: 4. Doing predictions ## Initial setup -> NOTE: This requires `python2`, Apache Beam does not currently support `python3`. ### Getting the source code You can clone the github repository and then navigate to the `molecules` sample directory. @@ -23,24 +24,19 @@ cd cloudml-samples/molecules ``` ### Python virtual environment -Using [virtualenv](https://virtualenv.pypa.io/en/stable/) to isolate your dependencies is recommended. -To set up, make sure you have the `virtualenv` package installed. -```bash -pip install --user virtualenv -``` -To create and activate a new virtual environment, run the following commands: -```bash -python -m virtualenv env -source env/bin/activate -``` +> NOTE: It is recommended to run on `python2`. +> Support for using the Apache Beam SDK for Python 3 is in a prerelease state [alpha](https://cloud.google.com/products/?hl=EN#product-launch-stages) and might change or have limited support. + +Install a [Python virtual environment](https://packaging.python.org/guides/installing-using-pip-and-virtual-environments). -To deactivate the virtual environment, run: +Run the following to set up and activate a new virtual environment: ```bash -deactivate +python2.7 -m virtualenv env +source env/bin/activate ``` -See [virtualenv](https://virtualenv.pypa.io/en/stable/installation/) for details. +Once you are done with the tutorial, you can deactivate the virtual environment by running `deactivate`. ### Installing requirements You can use the `requirements.txt` to install the dependencies. @@ -55,7 +51,6 @@ We'll start by running the end-to-end script locally. To run simply run the [`ru ``` The script requires a working directory, which is where all the temporary files and other intermediate data will be stored throughout the full run. By default it will use `/tmp/cloudml-samples/molecules` as the working directory. To specify a different working directory, you can specify it via the `--work-dir` option. - ```bash # To use a different local path ./run-local --work-dir ~/cloudml-samples/molecules @@ -64,49 +59,61 @@ The script requires a working directory, which is where all the temporary files ./run-local --work-dir gs:///cloudml-samples/molecules ``` -Each SDF file contains data for 25,000 molecules. The script will download only 5 SDF files to the working directory by default. To use a different number of data files, you can use the `--max-data-files` -option. - +Each SDF file contains data for 25,000 molecules. +The script will download only 5 SDF files to the working directory by default. +To use a different number of data files, you can use the `--max-data-files` option. ```bash # To use 10 data files ./run-local --max-data-files 10 ``` -To run on Google Cloud Platform, all the files must reside in Google Cloud Storage. To run use the [`run-cloud`](run-cloud) command. +To run on Google Cloud Platform, all the files must reside in Google Cloud Storage, so you will have to specify a Google Cloud Storage path as the working directory. +To run use the [`run-cloud`](run-cloud) command. > NOTE: this will incur charges on your Google Cloud Platform project. 
```bash # This will use only 5 data files by default -./run-cloud --work-dir gs:///cloudml-samples/molecules +./run-cloud --work-dir gs:///cloudml-samples/molecules ``` -## Data Extraction -Source code: [`data-extractor.py`](data-extractor.py) +## Manual run + +### Data Extraction +> Source code: [`data-extractor.py`](data-extractor.py) -This is a data extraction tool to download SDF files from the specified FTP source. The data files will be stored within a `data` subdirectory inside the working directory. +This is a data extraction tool to download SDF files from the specified FTP source. +The data files will be stored within a `data` subdirectory inside the working directory. To store data files locally: ```bash -# The default --work-dir is /tmp/cloudml-samples/molecules -python data-extractor.py --max-data-files 5 +WORK_DIR=/tmp/cloudml-samples/molecules +python data-extractor.py --work-dir $WORK_DIR --max-data-files 5 ``` To store data files to a Google Cloud Storage location: > NOTE: this will incur charges on your Google Cloud Platform project. See [Storage pricing](https://cloud.google.com/storage/pricing). ```bash -WORK_DIR=gs:///cloudml-samples/molecules +WORK_DIR=gs:///cloudml-samples/molecules python data-extractor.py --work-dir $WORK_DIR --max-data-files 5 ``` -## Preprocessing -Source code: [`preprocess.py`](preprocess.py) +### Preprocessing +> Source code: [`preprocess.py`](preprocess.py) -This is an [Apache Beam](https://beam.apache.org/) pipeline that will do all the preprocessing necessary to train a Machine Learning model. It uses [tf.Transform](https://github.com/tensorflow/transform), which is part of [TensorFlow Extended](https://www.tensorflow.org/tfx/), to do any processing that requires a full pass over the dataset. +This is an [Apache Beam](https://beam.apache.org/) pipeline that will do all the preprocessing necessary to train a Machine Learning model. +It uses [tf.Transform](https://github.com/tensorflow/transform), which is part of [TensorFlow Extended](https://www.tensorflow.org/tfx/), to do any processing that requires a full pass over the dataset. -For this sample, we're doing a very simple feature extraction. It uses Apache Beam to parse the SDF files and count how many Carbon, Hydrogen, Oxygen, and Nitrogen atoms a molecule has. To create more complex models we would need to extract more sophisticated features, such as [Coulomb Matrices](https://journals.aps.org/prl/abstract/10.1103/PhysRevLett.108.058301). +For this sample, we're doing a very simple feature extraction. +It uses Apache Beam to parse the SDF files and count how many Carbon, Hydrogen, Oxygen, and Nitrogen atoms a molecule has. +To create more complex models we would need to extract more sophisticated features, such as [Coulomb Matrices](https://journals.aps.org/prl/abstract/10.1103/PhysRevLett.108.058301). -We will eventually train a [Neural Network](https://skymind.ai/wiki/neural-network) to do the predictions. Neural Networks are more stable when dealing with small values, so it's always a good idea to [normalize](https://en.wikipedia.org/wiki/Feature_scaling) the inputs to a small range (typically from 0 to 1). Since there's no maximum number of atoms a molecule can have, we have to go through the entire dataset to find the minimum and maximum counts. Fortunately, tf.Transform integrates with our Apache Beam pipeline and does that for us. +We will eventually train a [Neural Network](https://skymind.ai/wiki/neural-network) to do the predictions. 
+Neural Networks are more stable when dealing with small values, so it's always a good idea to [normalize](https://en.wikipedia.org/wiki/Feature_scaling) the inputs to a small range (typically from 0 to 1). +Since there's no maximum number of atoms a molecule can have, we have to go through the entire dataset to find the minimum and maximum counts. +Fortunately, tf.Transform integrates with our Apache Beam pipeline and does that for us. -After preprocessing our dataset, we also want to split it into a training and evaluation dataset. The training dataset will be used to train the model. The evaluation dataset contains elements that the training has never seen, and since we also know the "answers" (the molecular energy), we'll use these to validate that the training accuracy roughly matches the accuracy on unseen elements. +After preprocessing our dataset, we also want to split it into a training and evaluation dataset. +The training dataset will be used to train the model. +The evaluation dataset contains elements that the training has never seen, and since we also know the "answers" (the molecular energy), we'll use these to validate that the training accuracy roughly matches the accuracy on unseen elements. These are the general steps: 1) Parse the SDF files @@ -114,21 +121,23 @@ These are the general steps: 3) *Normalization (normalize counts to 0 to 1) 4) Split into 80% training data and 20% evaluation data -> (*) During the normalization step, the Beam pipeline doesn't actually apply the tf.Transform function to our data. It analyzes the whole dataset to find the values it needs (in this case the minimums and maximums), and with that it creates a TensorFlow graph of operations with those values as constants. This graph of operations will be applied by TensorFlow itself, allowing us to pass the unnormalized data as inputs rather than having to normalize them ourselves during prediction. +> (*) During the normalization step, the Beam pipeline doesn't actually apply the tf.Transform function to our data. +It analyzes the whole dataset to find the values it needs (in this case the minimums and maximums), and with that it creates a TensorFlow graph of operations with those values as constants. +This graph of operations will be applied by TensorFlow itself, allowing us to pass the unnormalized data as inputs rather than having to normalize them ourselves during prediction. The `preprocess.py` script will preprocess all the data files it finds under `$WORK_DIR/data/`, which is the path where `data-extractor.py` stores the files. If your training data is small enough, it will be faster to run locally. ```bash -# The default --work-dir is /tmp/cloudml-samples/molecules -python preprocess.py +WORK_DIR=/tmp/cloudml-samples/molecules +python preprocess.py --work-dir $WORK_DIR ``` As you want to preprocess a larger amount of data files, it will scale better using [Cloud Dataflow](https://cloud.google.com/dataflow/). > NOTE: this will incur charges on your Google Cloud Platform project. See [Dataflow pricing](https://cloud.google.com/dataflow/pricing). 
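To make the normalization step concrete, here is a rough sketch of what a tf.Transform preprocessing function for this sample could look like. This is an illustrative simplification, not the sample's actual code (the real implementation is the `pubchem.normalize_inputs` function passed as `feature_scaling` in `preprocess.py`); it assumes the `TotalC`/`TotalH`/`TotalO`/`TotalN` counts and the `Energy` label from the feature spec.

```python
# Simplified, hypothetical sketch of a tf.Transform preprocessing function;
# the sample's real implementation is pubchem.normalize_inputs.
import tensorflow as tf
import tensorflow_transform as tft

FEATURES = ['TotalC', 'TotalH', 'TotalO', 'TotalN']
LABELS = ['Energy']

def preprocessing_fn(inputs):
  """Scales each atom count to [0, 1]; the label passes through unchanged."""
  outputs = {}
  for name in FEATURES:
    # tft.scale_to_0_1 analyzes the whole dataset to find the minimum and
    # maximum, then emits TensorFlow ops that compute (x - min) / (max - min).
    outputs[name] = tft.scale_to_0_1(tf.cast(inputs[name], tf.float32))
  for name in LABELS:
    outputs[name] = inputs[name]
  return outputs
```

To run the preprocessing pipeline on Cloud Dataflow: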
```bash PROJECT=$(gcloud config get-value project) -WORK_DIR=gs:///cloudml-samples/molecules +WORK_DIR=gs:///cloudml-samples/molecules python preprocess.py \ --project $PROJECT \ --runner DataflowRunner \ @@ -137,41 +146,44 @@ python preprocess.py \ --work-dir $WORK_DIR ``` -## Training the Model -Source code: [`trainer/task.py`](trainer/task.py) +### Training the Model +> Source code: [`trainer/task.py`](trainer/task.py) -We'll train a [Deep Neural Network Regressor](https://www.tensorflow.org/api_docs/python/tf/estimator/DNNRegressor) in [TensorFlow](https://www.tensorflow.org/). This will use the preprocessed data stored within the working directory. During the preprocessing stage, the Apache Beam pipeline transformed extracted all the features (counts of elements) and tf.Transform generated a graph of operations to normalize those features. +We'll train a [Deep Neural Network Regressor](https://www.tensorflow.org/api_docs/python/tf/estimator/DNNRegressor) in [TensorFlow](https://www.tensorflow.org/). +This will use the preprocessed data stored within the working directory. +During the preprocessing stage, the Apache Beam pipeline transformed extracted all the features (counts of elements) and tf.Transform generated a graph of operations to normalize those features. The TensorFlow model actually takes the unnormalized inputs (counts of elements), applies the tf.Transform's graph of operations to normalize the data, and then feeds that into our DNN regressor. For small datasets it will be faster to run locally. ```bash -# The default --work-dir is /tmp/cloudml-samples/molecules -python trainer/task.py +WORK_DIR=/tmp/cloudml-samples/molecules +python trainer/task.py --work-dir $WORK_DIR # To get the path of the trained model -EXPORT_DIR=/tmp/cloudml-samples/molecules/model/export/final +EXPORT_DIR=$WORK_DIR/model/export/final MODEL_DIR=$(ls -d -1 $EXPORT_DIR/* | sort -r | head -n 1) ``` -If the training dataset is too large, it will scale better to train on [AI Platform](https://cloud.google.com/ml-engine/). +If the training dataset is too large, it will scale better to train on [AI Platform](https://cloud.google.com/ai-platform/). > NOTE: this will incur charges on your Google Cloud Platform project. See [AI Platform pricing](https://cloud.google.com/ml-engine/docs/pricing). ```bash JOB="cloudml_samples_molecules_$(date +%Y%m%d_%H%M%S)" -BUCKET=gs:// -WORK_DIR=$BUCKET/cloudml-samples/molecules -gcloud ml-engine jobs submit training $JOB \ +BUCKET=gs:// +WORK_DIR=gs:///cloudml-samples/molecules +gcloud ai-platform jobs submit training $JOB \ --module-name trainer.task \ --package-path trainer \ --staging-bucket $BUCKET \ - --runtime-version 1.8 \ + --runtime-version 1.13 \ + --region us-central1 \ --stream-logs \ -- \ --work-dir $WORK_DIR # To get the path of the trained model EXPORT_DIR=$WORK_DIR/model/export/final -MODEL_DIR=$(gsutil ls -d $EXPORT_DIR/* | sort -r | head -n 1) +MODEL_DIR=$(gsutil ls -d "$EXPORT_DIR/*" | sort -r | head -n 1) ``` To visualize the training job, we can use [TensorBoard](https://www.tensorflow.org/guide/summaries_and_tensorboard). @@ -179,29 +191,36 @@ To visualize the training job, we can use [TensorBoard](https://www.tensorflow.o tensorboard --logdir $WORK_DIR/model ``` -You can access the results at `localhost:6006`. +You can access the results at [`localhost:6006`](localhost:6006). -## Predictions -### Option 1: Batch Predictions -Source code: [`predict.py`](predict.py) +### Predictions -Batch predictions are optimized for throughput rather than latency. 
These work best if there's a large amount of predictions to make and you can wait for all of them to finish before having the results. +#### Option 1: Batch Predictions +> Source code: [`predict.py`](predict.py) + +Batch predictions are optimized for throughput rather than latency. +These work best if there's a large amount of predictions to make and you can wait for all of them to finish before having the results. For batches with a small number of data files, it will be faster to run locally. ```bash # For simplicity, we'll use the same files we used for training +WORK_DIR=/tmp/cloudml-samples/molecules python predict.py \ + --work-dir $WORK_DIR \ --model-dir $MODEL_DIR \ batch \ - --inputs-dir /tmp/cloudml-samples/molecules/data \ - --outputs-dir /tmp/cloudml-samples/molecules/predictions + --inputs-dir $WORK_DIR/data \ + --outputs-dir $WORK_DIR/predictions + +# To check the outputs +head -n 10 $WORK_DIR/predictions/* ``` For batches with a large number of data files, it will scale better using [Cloud Dataflow](https://cloud.google.com/dataflow/). ```bash # For simplicity, we'll use the same files we used for training PROJECT=$(gcloud config get-value project) -WORK_DIR=gs:///cloudml-samples/molecules +WORK_DIR=gs:///cloudml-samples/molecules python predict.py \ --work-dir $WORK_DIR \ --model-dir $MODEL_DIR \ @@ -212,14 +231,19 @@ python predict.py \ --setup_file ./setup.py \ --inputs-dir $WORK_DIR/data \ --outputs-dir $WORK_DIR/predictions + +# To check the outputs +gsutil cat "$WORK_DIR/predictions/*" | head -n 10 ``` -### Option 2: Streaming Predictions -Source code: [`predict.py`](predict.py) +#### Option 2: Streaming Predictions +> Source code: [`predict.py`](predict.py) -Streaming predictions are optimized for latency rather than throughput. These work best if you are sending sporadic predictions, but want to get the results as soon as possible. +Streaming predictions are optimized for latency rather than throughput. +These work best if you are sending sporadic predictions, but want to get the results as soon as possible. -This streaming service will receive molecules from a PubSub topic and publish the prediction results to another PubSub topic. We'll have to create the topics first. +This streaming service will receive molecules from a PubSub topic and publish the prediction results to another PubSub topic. +We'll have to create the topics first. ```bash # To create the inputs topic gcloud pubsub topics create molecules-inputs @@ -232,19 +256,20 @@ For testing purposes, we can start the online streaming prediction service local ```bash # Run on terminal 1 PROJECT=$(gcloud config get-value project) +WORK_DIR=/tmp/cloudml-samples/molecules python predict.py \ + --work-dir $WORK_DIR \ --model-dir $MODEL_DIR \ stream \ - --project $PROJECT + --project $PROJECT \ --inputs-topic molecules-inputs \ --outputs-topic molecules-predictions ``` For a highly available and scalable service, it will scale better using [Cloud Dataflow](https://cloud.google.com/dataflow/). ```bash -# Run on terminal 1 PROJECT=$(gcloud config get-value project) -WORK_DIR=gs:///cloudml-samples/molecules +WORK_DIR=gs:///cloudml-samples/molecules python predict.py \ --work-dir $WORK_DIR \ --model-dir $MODEL_DIR \ @@ -262,6 +287,7 @@ Now that we have the prediction service running, we want to run a publisher to s For convenience, we provided a sample [`publisher.py`](publisher.py) and [`subscriber.py`](subscriber.py) to show how to implement one. 
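If you just want to poke at the service without Beam, a single message can also be published directly with the Cloud Pub/Sub client library. The snippet below is only a hedged sketch: the project ID and payload are placeholders, and it assumes the `google-cloud-pubsub` client (pulled in by `apache-beam[gcp]`) is available.

```python
# Hypothetical minimal publisher using the Cloud Pub/Sub client library
# directly; the provided publisher.py does the same job as a Beam pipeline.
from google.cloud import pubsub_v1

PROJECT = 'your-project-id'   # placeholder: your GCP project ID

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT, 'molecules-inputs')

# Placeholder payload: one molecule, serialized the same way publisher.py
# serializes the elements produced by pubchem.ParseSDF.
payload = b'...'
future = publisher.publish(topic_path, payload)
print('Published message {}'.format(future.result()))
```

The provided `publisher.py` and `subscriber.py` implement this pattern as small Apache Beam pipelines.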
These will have to be run as different processes concurrently, so you'll need to have a different terminal running each command. + > NOTE: remember to activate the `virtualenv` on each terminal. We'll first run the subscriber, which will listen for prediction results and log them. @@ -272,7 +298,8 @@ python subscriber.py \ --topic molecules-predictions ``` -We'll then run the publisher, which will parse SDF files from a directory and publish them to the inputs topic. For convenience, we'll use the same SDF files we used for training. +We'll then run the publisher, which will parse SDF files from a directory and publish them to the inputs topic. +For convenience, we'll use the same SDF files we used for training. ```bash # Run on terminal 3 python publisher.py \ @@ -283,7 +310,8 @@ python publisher.py \ Once the publisher starts parsing and publishing molecules, we'll start seeing predictions from the subscriber. -### Option 3: AI Platform Predictions +#### Option 3: AI Platform Predictions + If you have a different way to extract the features (in this case the atom counts) that is not through our existing preprocessing pipeline for SDF files, it might be easier to build a JSON file with one request per line and make the predictions on AI Platform. We've included the [`sample-requests.json`](sample-requests.json) file with an example of how these requests look like. Here are the contents of the file: @@ -300,14 +328,14 @@ Before creating the model in AI Platform, it is a good idea to test our model's EXPORT_DIR=$WORK_DIR/model/export/final if [[ $EXPORT_DIR == gs://* ]]; then # If it's a GCS path, use gsutil - MODEL_DIR=$(gsutil ls -d $EXPORT_DIR/* | sort -r | head -n 1) + MODEL_DIR=$(gsutil ls -d "$EXPORT_DIR/*" | sort -r | head -n 1) else # If it's a local path, use ls MODEL_DIR=$(ls -d -1 $EXPORT_DIR/* | sort -r | head -n 1) fi # To do the local predictions -gcloud ml-engine local predict \ +gcloud ai-platform local predict \ --model-dir $MODEL_DIR \ --json-instances sample-requests.json ``` @@ -337,22 +365,39 @@ fi # Now create the model and a version in AI Platform and set it as default MODEL=molecules -REGION=$(gcloud config get-value compute/region) -gcloud ml-engine models create $MODEL \ - --regions $REGION +REGION=us-central1 +gcloud ai-platform models create $MODEL --regions $REGION VERSION="${MODEL}_$(date +%Y%m%d_%H%M%S)" -gcloud ml-engine versions create $VERSION \ +gcloud ai-platform versions create $VERSION \ --model $MODEL \ --origin $MODEL_DIR \ - --runtime-version 1.8 + --runtime-version 1.13 -gcloud ml-engine versions set-default $VERSION \ - --model $MODEL +gcloud ai-platform versions set-default $VERSION --model $MODEL -# Finally, we can request predictions via gcloud ml-engine -gcloud ml-engine predict \ +# Finally, we can request predictions via `gcloud ai-platform` +gcloud ai-platform predict \ --model $MODEL \ --version $VERSION \ --json-instances sample-requests.json ``` + +## Cleanup + +Finally, let's clean up all the Google Cloud resources used, if any. 
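Before tearing these resources down, note that a deployed version can also be called from Python with the Google API client, not just via `gcloud`. The sketch below is hypothetical: the project ID, version name, and example counts are placeholders, and it assumes the serving signature takes the raw atom counts (`TotalC`, `TotalH`, `TotalO`, `TotalN`).

```python
# Hypothetical sketch: online prediction against the AI Platform model created
# above, using the Google API Python client. Placeholders: project and version.
from googleapiclient import discovery

project = 'your-project-id'             # placeholder
model = 'molecules'
version = 'molecules_YYYYMMDD_HHMMSS'   # placeholder: the version created above

service = discovery.build('ml', 'v1')
name = 'projects/{}/models/{}/versions/{}'.format(project, model, version)
response = service.projects().predict(
    name=name,
    # Example counts only; the real requests follow sample-requests.json.
    body={'instances': [{'TotalC': 9, 'TotalH': 17, 'TotalO': 4, 'TotalN': 1}]},
).execute()
print(response.get('predictions') or response.get('error'))
```

The following commands remove the resources created in this tutorial: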
+ +```sh +# To delete the model and version +gcloud ai-platform versions delete $VERSION --model $MODEL +gcloud ai-platform models delete $MODEL + +# To delete the inputs topic +gcloud pubsub topics delete molecules-inputs + +# To delete the outputs topic +gcloud pubsub topics delete molecules-predictions + +# To delete the working directory +gsutil -m rm -rf $WORK_DIR +``` diff --git a/molecules/data-extractor.py b/molecules/data-extractor.py index f606ddeef..ec32bcd67 100644 --- a/molecules/data-extractor.py +++ b/molecules/data-extractor.py @@ -1,4 +1,6 @@ -# Copyright 2018 Google Inc. All Rights Reserved. Licensed under the Apache +#!/usr/bin/env python + +# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache # License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 @@ -11,7 +13,8 @@ # This tool downloads SDF files from an FTP source. -import StringIO +from __future__ import absolute_import + import argparse import ftplib import multiprocessing as mp @@ -21,6 +24,7 @@ import tempfile import tensorflow as tf import zlib +from io import BytesIO # Regular expressions to parse an FTP URI. @@ -71,7 +75,7 @@ def extract_data_file(ftp_file, data_dir): if not tf.gfile.Exists(sdf_file): # The `ftp` object cannot be pickled for multithreading, so we open a # new connection here - memfile = StringIO.StringIO() + memfile = BytesIO() ftp = ftplib.FTP(server, user, password) ftp.retrbinary('RETR ' + path, memfile.write) ftp.quit() @@ -132,15 +136,12 @@ def run(data_sources, filter_regex, max_data_files, data_dir): parser.add_argument( '--work-dir', - type=str, - default=os.path.join( - tempfile.gettempdir(), 'cloudml-samples', 'molecules'), + required=True, help='Directory for staging and working files. ' 'This can be a Google Cloud Storage path.') parser.add_argument( '--data-sources', - type=str, nargs='+', default=['ftp://anonymous:guest@ftp.ncbi.nlm.nih.gov/' 'pubchem/Compound_3D/01_conf_per_cmpd/SDF'], @@ -152,7 +153,6 @@ def run(data_sources, filter_regex, max_data_files, data_dir): parser.add_argument( '--filter-regex', - type=str, default=r'\.sdf', help='Regular expression to filter which files to use. ' 'The regular expression will be searched on the full absolute path. ' diff --git a/molecules/predict.py b/molecules/predict.py index df825e716..9a66d7bec 100644 --- a/molecules/predict.py +++ b/molecules/predict.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# -# Copyright 2018 Google Inc. All Rights Reserved. Licensed under the Apache + +# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache # License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 @@ -13,6 +13,7 @@ # This tool does either batch or streaming predictions on a trained model. 
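# Module overview: the Predict DoFn below loads the exported SavedModel into a
# tf.Session from within process(), resolves the requested signature to find
# the input and output tensors, and yields the model's outputs for every
# element it receives. run() then wires a feature-extraction transform, the
# Predict step, and a sink (text files for batch mode, Pub/Sub for streaming
# mode) into a single Beam pipeline.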
+from __future__ import absolute_import from __future__ import print_function import argparse @@ -31,7 +32,6 @@ from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from tensorflow.python.framework import ops -from tensorflow.python.saved_model import loader class Predict(beam.DoFn): @@ -59,7 +59,7 @@ def process(self, inputs): self.graph = ops.Graph() with self.graph.as_default(): self.session = tf.Session() - metagraph_def = loader.load( + metagraph_def = tf.compat.v1.saved_model.load( self.session, {self.meta_tag}, self.model_dir) signature_def = metagraph_def.signature_def[self.meta_signature] @@ -91,6 +91,7 @@ def process(self, inputs): # [START dataflow_molecules_run_definition] def run(model_dir, feature_extraction, sink, beam_options=None): + print('Listening...') with beam.Pipeline(options=beam_options) as p: _ = (p | 'Feature extraction' >> feature_extraction @@ -107,15 +108,12 @@ def run(model_dir, feature_extraction, sink, beam_options=None): parser.add_argument( '--work-dir', - type=str, - default=os.path.join( - tempfile.gettempdir(), 'cloudml-samples', 'molecules'), + required=True, help='Directory for temporary files and preprocessed datasets to. ' 'This can be a Google Cloud Storage path.') parser.add_argument( '--model-dir', - type=str, required=True, help='Path to the exported TensorFlow model. ' 'This can be a Google Cloud Storage path.') @@ -124,13 +122,11 @@ def run(model_dir, feature_extraction, sink, beam_options=None): batch_verb = verbs.add_parser('batch', help='Batch prediction') batch_verb.add_argument( '--inputs-dir', - type=str, required=True, help='Input directory where SDF data files are read from. ' 'This can be a Google Cloud Storage path.') batch_verb.add_argument( '--outputs-dir', - type=str, required=True, help='Directory to store prediction results. ' 'This can be a Google Cloud Storage path.') @@ -138,13 +134,11 @@ def run(model_dir, feature_extraction, sink, beam_options=None): stream_verb = verbs.add_parser('stream', help='Streaming prediction') stream_verb.add_argument( '--inputs-topic', - type=str, default='molecules-inputs', help='PubSub topic to subscribe for molecules.') stream_verb.add_argument( '--outputs-topic', - type=str, default='molecules-predictions', help='PubSub topic to publish predictions.') @@ -159,7 +153,7 @@ def run(model_dir, feature_extraction, sink, beam_options=None): if args.verb == 'batch': data_files_pattern = os.path.join(args.inputs_dir, '*.sdf') results_prefix = os.path.join(args.outputs_dir, 'part') - source = beam.io.Read(pubchem.ParseSDF(data_files_pattern)) + source = pubchem.ParseSDF(data_files_pattern) sink = beam.io.WriteToText(results_prefix) elif args.verb == 'stream': @@ -171,7 +165,7 @@ def run(model_dir, feature_extraction, sink, beam_options=None): beam_options.view_as(StandardOptions).streaming = True source = beam.io.ReadFromPubSub(topic='projects/{}/topics/{}'.format( project, args.inputs_topic)) - sink = beam.io.WriteStringsToPubSub(topic='projects/{}/topics/{}'.format( + sink = beam.io.WriteToPubSub(topic='projects/{}/topics/{}'.format( project, args.outputs_topic)) # [END dataflow_molecules_batch_or_stream] diff --git a/molecules/preprocess.py b/molecules/preprocess.py index 5af2ca8ba..dc0987f56 100644 --- a/molecules/preprocess.py +++ b/molecules/preprocess.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# -# Copyright 2018 Google Inc. All Rights Reserved. Licensed under the Apache + +# Copyright 2019 Google Inc. All Rights Reserved. 
Licensed under the Apache # License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 @@ -13,8 +13,11 @@ # This tool preprocesses and extracts features from SDF files. +from __future__ import absolute_import + import argparse import dill as pickle +import logging import os import random import tempfile @@ -27,7 +30,6 @@ from apache_beam.io import tfrecordio from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.options.pipeline_options import SetupOptions from tensorflow_transform.beam.tft_beam_io import transform_fn_io from tensorflow_transform.coders import example_proto_coder from tensorflow_transform.tf_metadata import dataset_metadata @@ -62,26 +64,26 @@ def load(filename): class ValidateInputData(beam.DoFn): """This DoFn validates that every element matches the metadata given.""" - def __init__(self, input_metadata): + def __init__(self, feature_spec): super(ValidateInputData, self).__init__() - self.schema_keys = set(input_metadata.schema.column_schemas.keys()) + self.feature_names = set(feature_spec.keys()) def process(self, elem): if not isinstance(elem, dict): raise ValueError( 'Element must be a dict(str, value). ' 'Given: {} {}'.format(elem, type(elem))) - elem_keys = set(elem.keys()) - if not self.schema_keys.issubset(elem_keys): + elem_features = set(elem.keys()) + if not self.feature_names.issubset(elem_features): raise ValueError( - "Element keys are missing from schema keys. " - 'Given: {}; Schema: {}'.format( - list(elem_keys), list(self.schema_keys))) + "Element features are missing from feature_spec keys. " + 'Given: {}; Features: {}'.format( + list(elem_features), list(self.feature_names))) yield elem def run( - input_schema, + input_feature_spec, labels, feature_extraction, feature_scaling=None, @@ -134,9 +136,6 @@ def run( if tf.gfile.Exists(transform_fn_dir): tf.gfile.DeleteRecursively(transform_fn_dir) - input_metadata = dataset_metadata.DatasetMetadata( - dataset_schema.Schema(input_schema)) - # [START dataflow_molecules_create_pipeline] # Build and run a Beam Pipeline with beam.Pipeline(options=beam_options) as p, \ @@ -150,11 +149,15 @@ def run( | 'Feature extraction' >> feature_extraction # [END dataflow_molecules_feature_extraction] # [START dataflow_molecules_validate_inputs] - | 'Validate inputs' >> beam.ParDo(ValidateInputData(input_metadata))) + | 'Validate inputs' >> beam.ParDo(ValidateInputData( + input_feature_spec))) # [END dataflow_molecules_validate_inputs] # [START dataflow_molecules_analyze_and_transform_dataset] # Apply the tf.Transform preprocessing_fn + input_metadata = dataset_metadata.DatasetMetadata( + dataset_schema.from_feature_spec(input_feature_spec)) + dataset_and_metadata, transform_fn = ( (dataset, input_metadata) | 'Feature scaling' >> beam_impl.AnalyzeAndTransformDataset( @@ -194,7 +197,7 @@ def run( # [END dataflow_molecules_write_tfrecords] return PreprocessData( - input_metadata.schema.as_feature_spec(), + input_feature_spec, labels, train_dataset_prefix + '*', eval_dataset_prefix + '*') @@ -204,27 +207,20 @@ def run( """Main function""" parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument( '--work-dir', - type=str, - default=os.path.join( - tempfile.gettempdir(), 'cloudml-samples', 'molecules'), + required=True, help='Directory for staging and working files. 
' 'This can be a Google Cloud Storage path.') - args, pipeline_args = parser.parse_known_args() - beam_options = PipelineOptions(pipeline_args) - beam_options.view_as(SetupOptions).save_main_session = True - data_files_pattern = os.path.join(args.work_dir, 'data', '*.sdf') + beam_options = PipelineOptions(pipeline_args, save_main_session=True) preprocess_data = run( - pubchem.INPUT_SCHEMA, + pubchem.FEATURE_SPEC, pubchem.LABELS, # [START dataflow_molecules_feature_extraction_transform] - pubchem.SimpleFeatureExtraction( - beam.io.Read(pubchem.ParseSDF(data_files_pattern))), + pubchem.SimpleFeatureExtraction(pubchem.ParseSDF(data_files_pattern)), # [END dataflow_molecules_feature_extraction_transform] feature_scaling=pubchem.normalize_inputs, beam_options=beam_options, diff --git a/molecules/pubchem/__init__.py b/molecules/pubchem/__init__.py index 8f243ae16..17f3ef491 100644 --- a/molecules/pubchem/__init__.py +++ b/molecules/pubchem/__init__.py @@ -1,2 +1,15 @@ -from pipeline import * -from sdf import * +# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache +# License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +from __future__ import absolute_import + +from .pipeline import * +from .sdf import * \ No newline at end of file diff --git a/molecules/pubchem/pipeline.py b/molecules/pubchem/pipeline.py index 9720d5647..0a108c2e8 100644 --- a/molecules/pubchem/pipeline.py +++ b/molecules/pubchem/pipeline.py @@ -1,4 +1,4 @@ -# Copyright 2018 Google Inc. All Rights Reserved. Licensed under the Apache +# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache # License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. 
You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 @@ -11,7 +11,7 @@ # This file contains the shared functionality for preprocess.py and predict.py -import sdf +from __future__ import absolute_import import json import logging @@ -24,52 +24,43 @@ from apache_beam.io import filebasedsource from tensorflow_transform.tf_metadata import dataset_schema +from pubchem import sdf -INPUT_SCHEMA = { + +FEATURE_SPEC = { # Features (inputs) - 'TotalC': dataset_schema.ColumnSchema( - tf.int64, [], dataset_schema.FixedColumnRepresentation()), - 'TotalH': dataset_schema.ColumnSchema( - tf.int64, [], dataset_schema.FixedColumnRepresentation()), - 'TotalO': dataset_schema.ColumnSchema( - tf.int64, [], dataset_schema.FixedColumnRepresentation()), - 'TotalN': dataset_schema.ColumnSchema( - tf.int64, [], dataset_schema.FixedColumnRepresentation()), + 'TotalC': tf.io.FixedLenFeature([], tf.int64), + 'TotalH': tf.io.FixedLenFeature([], tf.int64), + 'TotalO': tf.io.FixedLenFeature([], tf.int64), + 'TotalN': tf.io.FixedLenFeature([], tf.int64), # Labels (outputs/predictions) - 'Energy': dataset_schema.ColumnSchema( - tf.float32, [], dataset_schema.FixedColumnRepresentation()), + 'Energy': tf.io.FixedLenFeature([], tf.float32), } LABELS = ['Energy'] -class ParseSDF(filebasedsource.FileBasedSource): - def __init__(self, file_pattern): - super(ParseSDF, self).__init__(file_pattern, splittable=False) +class ParseSDF(beam.PTransform): + def __init__(self, file_patterns): + super(ParseSDF, self).__init__() + if isinstance(file_patterns, str): + file_patterns = [file_patterns] + self.file_patterns = file_patterns - def in_range(self, range_tracker, position): - """This helper method tries to set the position to where the file left off. - Since this is a non-splittable source, we have to call - `set_current_position`, but if it tries to set the position to a split - point, we still have to call `try_claim`, otherwise an error will be raised. - """ - try: - return range_tracker.set_current_position(position) - except ValueError: - return range_tracker.try_claim(position) - - def read_records(self, filename, range_tracker): - """This yields a dictionary with the sections of the file that we're - interested in. The `range_tracker` allows us to mark the position where - possibly another worker left, so we make sure to start from there. 
- """ - with self.open_file(filename) as f: - f.seek(range_tracker.start_position() or 0) - while self.in_range(range_tracker, f.tell()): + def expand(self, pcollection): + def parse_molecules(filename): + with tf.gfile.Open(filename) as f: for json_molecule in sdf.parse_molecules(f): yield json_molecule + return ( + pcollection + | 'Create file patterns' >> beam.Create(self.file_patterns) + | 'Expand file patterns' >> beam.FlatMap(tf.gfile.Glob) + | 'Parse molecules' >> beam.ParDo(parse_molecules) + ) + class FormatMolecule(beam.DoFn): def process(self, json_molecule): @@ -148,17 +139,20 @@ def count_by_atom_symbol(self, molecule, atom_symbol): for atom in molecule['atoms']) def process(self, molecule): - uid = int(molecule[''][0]) - label = float(molecule[''][0]) - result = { - 'ID': uid, - 'TotalC': self.count_by_atom_symbol(molecule, 'C'), - 'TotalH': self.count_by_atom_symbol(molecule, 'H'), - 'TotalO': self.count_by_atom_symbol(molecule, 'O'), - 'TotalN': self.count_by_atom_symbol(molecule, 'N'), - 'Energy': label, - } - yield result + try: + uid = int(molecule[''][0]) + label = float(molecule[''][0]) + result = { + 'ID': uid, + 'TotalC': self.count_by_atom_symbol(molecule, 'C'), + 'TotalH': self.count_by_atom_symbol(molecule, 'H'), + 'TotalO': self.count_by_atom_symbol(molecule, 'O'), + 'TotalN': self.count_by_atom_symbol(molecule, 'N'), + 'Energy': label, + } + yield result + except Exception as e: + logging.info('Invalid molecule, skipping: {}'.format(molecule), exc_info=True) # [START dataflow_molecules_simple_feature_extraction] diff --git a/molecules/pubchem/sdf.py b/molecules/pubchem/sdf.py index f5b708a57..34cae03f9 100644 --- a/molecules/pubchem/sdf.py +++ b/molecules/pubchem/sdf.py @@ -1,4 +1,4 @@ -# Copyright 2018 Google Inc. All Rights Reserved. Licensed under the Apache +# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache # License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 @@ -11,6 +11,8 @@ # This contains a simple parser for SDF files. +from __future__ import absolute_import + import json MOLECULE_START = '-OEChem-' @@ -23,6 +25,8 @@ def parse_molecules(raw_lines): molecule = None section = None for raw_line in raw_lines: + if isinstance(raw_line, bytes): + raw_line = raw_line.decode('utf-8') line = raw_line.lstrip('>').strip() if not line: continue diff --git a/molecules/publisher.py b/molecules/publisher.py index e494ec120..9a814b7ef 100644 --- a/molecules/publisher.py +++ b/molecules/publisher.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# -# Copyright 2018 Google Inc. All Rights Reserved. Licensed under the Apache + +# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache # License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 @@ -13,6 +13,9 @@ # This is a sample publisher for the streaming predictions service. 
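# Pipeline overview: this script reads SDF files from --inputs-dir with
# pubchem.ParseSDF, prints a short preview of each element, and publishes
# every molecule to the --topic Pub/Sub topic through beam.io.WriteToPubSub.
# A Google Cloud project must be resolvable from the Beam pipeline options
# (for example via --project), since it is used to build the full topic path.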
+from __future__ import absolute_import +from __future__ import print_function + import argparse import os import sys @@ -23,8 +26,6 @@ from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.options.pipeline_options import SetupOptions -from apache_beam.options.pipeline_options import StandardOptions if __name__ == '__main__': @@ -34,22 +35,22 @@ parser.add_argument( '--topic', - type=str, default='molecules-inputs', help='PubSub topic to publish molecules.') parser.add_argument( '--inputs-dir', - type=str, required=True, help='Input directory where SDF data files are read from. ' 'This can be a Google Cloud Storage path.') args, pipeline_args = parser.parse_known_args() - beam_options = PipelineOptions(pipeline_args) - beam_options.view_as(SetupOptions).save_main_session = True - beam_options.view_as(StandardOptions).streaming = True + beam_options = PipelineOptions( + pipeline_args, + save_main_session=True, + streaming=True, + ) project = beam_options.view_as(GoogleCloudOptions).project if not project: @@ -61,5 +62,6 @@ topic_path = 'projects/{}/topics/{}'.format(project, args.topic) with beam.Pipeline(options=beam_options) as p: _ = (p - | 'Read SDF files' >> beam.io.Read(pubchem.ParseSDF(data_files_pattern)) - | 'Publish molecules' >> beam.io.WriteStringsToPubSub(topic=topic_path)) + | 'Read SDF files' >> pubchem.ParseSDF(data_files_pattern) + | 'Print element' >> beam.Map(lambda elem: print(str(elem)[:70] + '...') or elem) + | 'Publish molecules' >> beam.io.WriteToPubSub(topic=topic_path)) diff --git a/molecules/requirements.txt b/molecules/requirements.txt index 6fc994391..388a8e914 100644 --- a/molecules/requirements.txt +++ b/molecules/requirements.txt @@ -1,3 +1,3 @@ -apache-beam[gcp]==2.5 -tensorflow-transform==0.8 -tensorflow==1.8 +apache-beam[gcp]==2.11.* +tensorflow-transform==0.13.* +tensorflow==1.13.* diff --git a/molecules/run-cloud b/molecules/run-cloud index 65f93e4f3..60d22658c 100755 --- a/molecules/run-cloud +++ b/molecules/run-cloud @@ -1,11 +1,23 @@ #!/bin/bash +# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache +# License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. 
+ set -e # Parse command line arguments unset WORK_DIR MAX_DATA_FILES=5 PROJECT=$(gcloud config get-value project || echo $PROJECT) +REGION=us-central1 while [[ $# -gt 0 ]]; do case $1 in --work-dir) @@ -20,6 +32,10 @@ while [[ $# -gt 0 ]]; do PROJECT=$2 shift ;; + --region) + REGION=$2 + shift + ;; *) echo "error: unrecognized argument $1" exit 1 @@ -70,13 +86,13 @@ echo '' # Train and evaluate the model in AI Platform echo '>> Training' JOB="cloudml_samples_molecules_$(date +%Y%m%d_%H%M%S)" -BUCKET=$(echo $WORK_DIR | egrep -o gs://[-_.a-zA-Z0-9]+) -RUNTIME=1.8 -run gcloud ml-engine jobs submit training $JOB \ +BUCKET=$(echo $WORK_DIR | egrep -o gs://[^/]+) +run gcloud ai-platform jobs submit training $JOB \ --module-name trainer.task \ --package-path trainer \ --staging-bucket $BUCKET \ - --runtime-version $RUNTIME \ + --runtime-version 1.13 \ + --region $REGION \ --stream-logs \ -- \ --work-dir $WORK_DIR @@ -84,7 +100,7 @@ echo '' # Get the model path EXPORT_DIR=$WORK_DIR/model/export/final -MODEL_DIR=$(gsutil ls -d $EXPORT_DIR/* | sort -r | head -n 1) +MODEL_DIR=$(gsutil ls -d "$EXPORT_DIR/*" | sort -r | head -n 1) echo "Model: $MODEL_DIR" echo '' diff --git a/molecules/run-local b/molecules/run-local index fc7300e5c..52d9b99c3 100755 --- a/molecules/run-local +++ b/molecules/run-local @@ -1,5 +1,16 @@ #!/bin/bash +# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache +# License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + set -e # Parse command line arguments @@ -51,7 +62,7 @@ echo '' # Get the model path EXPORT_DIR=$WORK_DIR/model/export/final if [[ $EXPORT_DIR == gs://* ]]; then - MODEL_DIR=$(gsutil ls -d $EXPORT_DIR/* | sort -r | head -n 1) + MODEL_DIR=$(gsutil ls -d "$EXPORT_DIR/*" | sort -r | head -n 1) else MODEL_DIR=$(ls -d -1 $EXPORT_DIR/* | sort -r | head -n 1) fi diff --git a/molecules/setup.py b/molecules/setup.py index b554f90f6..622c6f8b5 100644 --- a/molecules/setup.py +++ b/molecules/setup.py @@ -1,4 +1,4 @@ -# Copyright 2018 Google Inc. All Rights Reserved. Licensed under the Apache +# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache # License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 @@ -16,9 +16,9 @@ # cloud runs. REQUIRED_PACKAGES = [ - 'apache-beam[gcp]==2.5', - 'tensorflow-transform==0.8', - 'tensorflow==1.8', + 'apache-beam[gcp]==2.11.*', + 'tensorflow-transform==0.13.*', + 'tensorflow==1.13.*', ] setuptools.setup( diff --git a/molecules/subscriber.py b/molecules/subscriber.py index 4f9e31d87..c3f768372 100644 --- a/molecules/subscriber.py +++ b/molecules/subscriber.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# -# Copyright 2018 Google Inc. All Rights Reserved. Licensed under the Apache + +# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache # License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. 
You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 @@ -13,6 +13,8 @@ # This is a sample subscriber for the streaming predictions service. +from __future__ import absolute_import + import argparse import json import logging @@ -22,8 +24,6 @@ from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.options.pipeline_options import SetupOptions -from apache_beam.options.pipeline_options import StandardOptions if __name__ == '__main__': @@ -33,15 +33,16 @@ parser.add_argument( '--topic', - type=str, default='molecules-predictions', help='PubSub topic to subscribe for predictions.') args, pipeline_args = parser.parse_known_args() - beam_options = PipelineOptions(pipeline_args) - beam_options.view_as(SetupOptions).save_main_session = True - beam_options.view_as(StandardOptions).streaming = True + beam_options = PipelineOptions( + pipeline_args, + save_main_session=True, + streaming=True, + ) project = beam_options.view_as(GoogleCloudOptions).project if not project: @@ -51,6 +52,7 @@ # We'll just log the results logging.basicConfig(level=logging.INFO) + logging.info('Listening...') topic_path = 'projects/{}/topics/{}'.format(project, args.topic) with beam.Pipeline(options=beam_options) as p: _ = (p diff --git a/molecules/trainer/__init__.py b/molecules/trainer/__init__.py index 90092ff99..e69de29bb 100644 --- a/molecules/trainer/__init__.py +++ b/molecules/trainer/__init__.py @@ -1 +0,0 @@ -from task import * diff --git a/molecules/trainer/task.py b/molecules/trainer/task.py index 902948f49..20f767462 100644 --- a/molecules/trainer/task.py +++ b/molecules/trainer/task.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# -# Copyright 2018 Google Inc. All Rights Reserved. Licensed under the Apache + +# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache # License, Version 2.0 (the "License"); you may not use this file except in # compliance with the License. You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 @@ -13,6 +13,8 @@ # This tool trains an ML model on preprocessed data. +from __future__ import absolute_import + import argparse import dill as pickle import multiprocessing as mp @@ -37,13 +39,13 @@ def decode(elem): # For more information, check: # https://www.tensorflow.org/performance/datasets_performance files = tf.data.Dataset.list_files(file_pattern) - dataset = files.apply(tf.contrib.data.parallel_interleave( + dataset = files.apply(tf.data.experimental.parallel_interleave( tf.data.TFRecordDataset, cycle_length=mp.cpu_count())) dataset = dataset.map(decode, num_parallel_calls=mp.cpu_count()) dataset = dataset.take(-1) if mode == tf.estimator.ModeKeys.TRAIN: if shuffle: - dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat( + dataset = dataset.apply(tf.data.experimental.shuffle_and_repeat( batch_size * 8)) else: dataset = dataset.cache() @@ -179,9 +181,7 @@ def train_and_evaluate( parser.add_argument( '--work-dir', - type=str, - default=os.path.join( - tempfile.gettempdir(), 'cloudml-samples', 'molecules'), + required=True, help='Directory for staging and working files. ' 'This can be a Google Cloud Storage path.')