This repository has been archived by the owner on Dec 17, 2021. It is now read-only.

Commit

Molecules update (#427)
* Added gitignore

* Updated README

* Simplified and optimized parsing DoFn

* Updated versions

* Added copyright

* Added --region option

* Updated README

* Updated, cleaned up, and made Python 3 compatible
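
To make the last point concrete, here is a minimal, hedged sketch of the main Python 3 compatibility change visible in this commit (illustrative values, not the sample's code): binary data read from FTP now goes through io.BytesIO rather than the Python 2 only StringIO module.

from io import BytesIO  # Python 3 replacement for Python 2's StringIO.StringIO when handling bytes

# Write raw bytes into an in-memory buffer, then read them back.
memfile = BytesIO()
memfile.write(b'example raw bytes, e.g. a downloaded .sdf.gz chunk')
memfile.seek(0)          # rewind before reading
data = memfile.read()
assert isinstance(data, bytes)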
David Cavazos authored and nnegrey committed Jun 4, 2019
1 parent d8211b3 commit bb6fbcd
Showing 17 changed files with 421 additions and 217 deletions.
130 changes: 130 additions & 0 deletions .gitignore
@@ -0,0 +1,130 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# Visual Studio Code
.vscode/*
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
2 changes: 0 additions & 2 deletions molecules/.gitignore

This file was deleted.

199 changes: 122 additions & 77 deletions molecules/README.md

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions molecules/data-extractor.py
@@ -1,4 +1,6 @@
# Copyright 2018 Google Inc. All Rights Reserved. Licensed under the Apache
#!/usr/bin/env python

# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache
# License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
@@ -11,7 +13,8 @@

# This tool downloads SDF files from an FTP source.

import StringIO
from __future__ import absolute_import

import argparse
import ftplib
import multiprocessing as mp
@@ -21,6 +24,7 @@
import tempfile
import tensorflow as tf
import zlib
from io import BytesIO


# Regular expressions to parse an FTP URI.
@@ -71,7 +75,7 @@ def extract_data_file(ftp_file, data_dir):
if not tf.gfile.Exists(sdf_file):
# The `ftp` object cannot be pickled for multithreading, so we open a
# new connection here
memfile = StringIO.StringIO()
memfile = BytesIO()
ftp = ftplib.FTP(server, user, password)
ftp.retrbinary('RETR ' + path, memfile.write)
ftp.quit()
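
For context, the hunk above downloads a remote file into an in-memory buffer. A minimal standalone sketch of that pattern (server, credentials, and path are hypothetical placeholders, not values from the sample's --data-sources URI):

import ftplib
from io import BytesIO

# Hypothetical connection details; the sample parses these out of an FTP URI.
server, user, password = 'ftp.example.org', 'anonymous', 'guest'
path = 'pubchem/example.sdf.gz'

memfile = BytesIO()
ftp = ftplib.FTP(server, user, password)
ftp.retrbinary('RETR ' + path, memfile.write)  # each downloaded chunk is appended to the buffer
ftp.quit()
raw_bytes = memfile.getvalue()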
@@ -132,15 +136,12 @@ def run(data_sources, filter_regex, max_data_files, data_dir):

parser.add_argument(
'--work-dir',
type=str,
default=os.path.join(
tempfile.gettempdir(), 'cloudml-samples', 'molecules'),
required=True,
help='Directory for staging and working files. '
'This can be a Google Cloud Storage path.')

parser.add_argument(
'--data-sources',
type=str,
nargs='+',
default=['ftp://anonymous:guest@ftp.ncbi.nlm.nih.gov/'
'pubchem/Compound_3D/01_conf_per_cmpd/SDF'],
@@ -152,7 +153,6 @@ def run(data_sources, filter_regex, max_data_files, data_dir):

parser.add_argument(
'--filter-regex',
type=str,
default=r'\.sdf',
help='Regular expression to filter which files to use. '
'The regular expression will be searched on the full absolute path. '
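The two argparse hunks above drop the redundant type=str (argparse stores string values by default) and make --work-dir required instead of defaulting to a temp directory. A hedged, standalone sketch of the resulting pattern:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    '--work-dir',
    required=True,            # no default any more; the caller must pass a local or GCS path
    help='Directory for staging and working files.')
parser.add_argument(
    '--filter-regex',
    default=r'\.sdf',         # values are strings by default, so type=str is unnecessary
    help='Regular expression to filter which files to use.')

args = parser.parse_args(['--work-dir', 'gs://my-bucket/molecules'])
print(args.work_dir, args.filter_regex)
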
22 changes: 8 additions & 14 deletions molecules/predict.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
# Copyright 2018 Google Inc. All Rights Reserved. Licensed under the Apache

# Copyright 2019 Google Inc. All Rights Reserved. Licensed under the Apache
# License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
@@ -13,6 +13,7 @@

# This tool does either batch or streaming predictions on a trained model.

from __future__ import absolute_import
from __future__ import print_function

import argparse
@@ -31,7 +32,6 @@
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from tensorflow.python.framework import ops
from tensorflow.python.saved_model import loader


class Predict(beam.DoFn):
@@ -59,7 +59,7 @@ def process(self, inputs):
self.graph = ops.Graph()
with self.graph.as_default():
self.session = tf.Session()
metagraph_def = loader.load(
metagraph_def = tf.compat.v1.saved_model.load(
self.session, {self.meta_tag}, self.model_dir)
signature_def = metagraph_def.signature_def[self.meta_signature]

@@ -91,6 +91,7 @@ def process(self, inputs):

# [START dataflow_molecules_run_definition]
def run(model_dir, feature_extraction, sink, beam_options=None):
print('Listening...')
with beam.Pipeline(options=beam_options) as p:
_ = (p
| 'Feature extraction' >> feature_extraction
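
The run() definition above wires a pluggable feature-extraction transform into a prediction pipeline. A hedged, generic sketch of that shape (the ParDo and sink here stand in for the sample's real Predict DoFn and chosen sink):

import apache_beam as beam

def run_pipeline(feature_extraction, predict_dofn, sink, beam_options=None):
    # Compose: extract features, run predictions in parallel, write to the chosen sink
    # (text files for batch runs, Pub/Sub for streaming runs).
    with beam.Pipeline(options=beam_options) as p:
        _ = (p
             | 'Feature extraction' >> feature_extraction
             | 'Predict' >> beam.ParDo(predict_dofn)
             | 'Write predictions' >> sink)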
@@ -107,15 +108,12 @@ def run(model_dir, feature_extraction, sink, beam_options=None):

parser.add_argument(
'--work-dir',
type=str,
default=os.path.join(
tempfile.gettempdir(), 'cloudml-samples', 'molecules'),
required=True,
help='Directory for temporary files and preprocessed datasets to. '
'This can be a Google Cloud Storage path.')

parser.add_argument(
'--model-dir',
type=str,
required=True,
help='Path to the exported TensorFlow model. '
'This can be a Google Cloud Storage path.')
@@ -124,27 +122,23 @@ def run(model_dir, feature_extraction, sink, beam_options=None):
batch_verb = verbs.add_parser('batch', help='Batch prediction')
batch_verb.add_argument(
'--inputs-dir',
type=str,
required=True,
help='Input directory where SDF data files are read from. '
'This can be a Google Cloud Storage path.')
batch_verb.add_argument(
'--outputs-dir',
type=str,
required=True,
help='Directory to store prediction results. '
'This can be a Google Cloud Storage path.')

stream_verb = verbs.add_parser('stream', help='Streaming prediction')
stream_verb.add_argument(
'--inputs-topic',
type=str,
default='molecules-inputs',
help='PubSub topic to subscribe for molecules.')

stream_verb.add_argument(
'--outputs-topic',
type=str,
default='molecules-predictions',
help='PubSub topic to publish predictions.')

@@ -159,7 +153,7 @@ def run(model_dir, feature_extraction, sink, beam_options=None):
if args.verb == 'batch':
data_files_pattern = os.path.join(args.inputs_dir, '*.sdf')
results_prefix = os.path.join(args.outputs_dir, 'part')
source = beam.io.Read(pubchem.ParseSDF(data_files_pattern))
source = pubchem.ParseSDF(data_files_pattern)
sink = beam.io.WriteToText(results_prefix)

elif args.verb == 'stream':
@@ -171,7 +165,7 @@ def run(model_dir, feature_extraction, sink, beam_options=None):
beam_options.view_as(StandardOptions).streaming = True
source = beam.io.ReadFromPubSub(topic='projects/{}/topics/{}'.format(
project, args.inputs_topic))
sink = beam.io.WriteStringsToPubSub(topic='projects/{}/topics/{}'.format(
sink = beam.io.WriteToPubSub(topic='projects/{}/topics/{}'.format(
project, args.outputs_topic))
# [END dataflow_molecules_batch_or_stream]
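
The last two hunks pick the pipeline's source and sink per mode: batch uses the pubchem.ParseSDF transform directly (no longer wrapped in beam.io.Read) with a text sink, while streaming turns on the streaming option and uses Pub/Sub, where WriteToPubSub replaces the removed WriteStringsToPubSub. A hedged sketch with illustrative project, bucket, and topic names:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

project = 'my-gcp-project'           # illustrative
beam_options = PipelineOptions()

verb = 'stream'                      # or 'batch'
if verb == 'batch':
    # The source would be the sample's pubchem.ParseSDF(data_files_pattern), used directly as a transform.
    sink = beam.io.WriteToText('gs://my-bucket/predictions/part')
else:
    beam_options.view_as(StandardOptions).streaming = True
    source = beam.io.ReadFromPubSub(
        topic='projects/{}/topics/{}'.format(project, 'molecules-inputs'))
    sink = beam.io.WriteToPubSub(    # publishes bytes; replaces WriteStringsToPubSub
        topic='projects/{}/topics/{}'.format(project, 'molecules-predictions'))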


