From 27a5eb6642e1c5c79f1dc13b506e1fb935e9fdef Mon Sep 17 00:00:00 2001 From: Mohamedelfatih Mohamedkhair Date: Thu, 22 Feb 2024 03:10:46 -0800 Subject: [PATCH] No public description PiperOrigin-RevId: 609308154 --- requirements.txt | 6 +- src/create_labeled_dataset.py | 10 +- src/create_labeling_examples.py | 8 - src/skai/cloud_labeling.py | 3 - src/skai/labeling.py | 202 ++++++++---------- src/skai/model/train.py | 2 +- .../model/xm_launch_single_model_vertex.py | 34 ++- .../model/xmanager_external_metric_logger.py | 2 +- src/skai/open_street_map.py | 2 +- src/tools/examples_to_csv.py | 81 +++---- 10 files changed, 146 insertions(+), 204 deletions(-) diff --git a/requirements.txt b/requirements.txt index b5b57f1c..550fcfb2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,14 +1,14 @@ absl-py>=0.12.0 -apache_beam==2.54.0 +apache_beam==2.53.0 earthengine-api gcsfs -geopandas>=0.8 +geopandas google_apitools google-cloud-aiplatform>=1.35.0 ml-collections numpy opencv-python -pandas>=2 +pandas<2.0.0 pillow pyproj pytest diff --git a/src/create_labeled_dataset.py b/src/create_labeled_dataset.py index 317fb593..f9e47f42 100644 --- a/src/create_labeled_dataset.py +++ b/src/create_labeled_dataset.py @@ -18,7 +18,6 @@ merged with the original Tensorflow Examples in order to create a labeled training and test set. """ -import multiprocessing import random from absl import app @@ -62,11 +61,6 @@ True, 'If true, starts multiple processes to run task.', ) -flags.DEFINE_integer( - 'max_processes', - multiprocessing.cpu_count(), - 'If using multiprocessing, the maximum number of processes to use.', -) def main(unused_argv): @@ -95,9 +89,7 @@ def main(unused_argv): FLAGS.train_output_path, FLAGS.test_output_path, FLAGS.connecting_distance_meters, - FLAGS.use_multiprocessing, - None, - FLAGS.max_processes) + FLAGS.use_multiprocessing) if __name__ == '__main__': diff --git a/src/create_labeling_examples.py b/src/create_labeling_examples.py index 2a30ba18..d220ece5 100644 --- a/src/create_labeling_examples.py +++ b/src/create_labeling_examples.py @@ -30,7 +30,6 @@ """ # pylint: enable=line-too-long -import multiprocessing import sys from absl import app @@ -60,11 +59,6 @@ True, 'If true, starts multiple processes to run task.', ) -flags.DEFINE_integer( - 'max_processes', - multiprocessing.cpu_count(), - 'If using multiprocessing, the maximum number of processes to use.', -) flags.DEFINE_float( 'buffered_sampling_radius', 70.0, @@ -90,8 +84,6 @@ def main(unused_argv): FLAGS.exclude_import_file_patterns, FLAGS.output_dir, FLAGS.use_multiprocessing, - None, - FLAGS.max_processes, FLAGS.buffered_sampling_radius, ) diff --git a/src/skai/cloud_labeling.py b/src/skai/cloud_labeling.py index 3f83072f..88e74722 100644 --- a/src/skai/cloud_labeling.py +++ b/src/skai/cloud_labeling.py @@ -885,9 +885,6 @@ def _merge_examples_and_labels( for result in results: all_labeled_examples.extend(result) - if not all_labeled_examples: - raise ValueError('No examples found matching labels.') - train_examples, test_examples = _split_examples( all_labeled_examples, test_fraction, connecting_distance_meters ) diff --git a/src/skai/labeling.py b/src/skai/labeling.py index c3f27619..0db2d36a 100644 --- a/src/skai/labeling.py +++ b/src/skai/labeling.py @@ -18,8 +18,9 @@ import functools import multiprocessing import os +import queue import random -from typing import Any, Dict, Iterable, List, Optional, Set, Tuple +from typing import Dict, Iterable, List, Optional, Set, Tuple from absl import logging import geopandas as gpd @@ 
-212,8 +213,6 @@ def create_labeling_images( excluded_import_file_patterns: List[str], output_dir: str, use_multiprocessing: bool, - multiprocessing_context: Any, - max_processes: int, buffered_sampling_radius: float, ) -> Tuple[int, Optional[str]]: """Creates PNGs used for labeling from TFRecords. @@ -232,9 +231,6 @@ def create_labeling_images( output_dir: Output directory. use_multiprocessing: If true, create multiple processes to create labeling images. - multiprocessing_context: Context to spawn processes with when using - multiprocessing. - max_processes: Maximum number of processes. buffered_sampling_radius: The minimum distance between two examples for the two examples to be in the labeling task. @@ -248,11 +244,21 @@ def create_labeling_images( f'Example pattern {examples_pattern} did not match any files.' ) + excluded_example_ids = set() + if excluded_import_file_patterns: + for pattern in excluded_import_file_patterns: + for path in tf.io.gfile.glob(pattern): + logging.info('Excluding example ids from "%s"', path) + excluded_example_ids.update(_read_example_ids_from_import_file(path)) + logging.info('Excluding %d example ids', len(excluded_example_ids)) + + allowed_example_ids = None if allowed_example_ids_path: with tf.io.gfile.GFile(allowed_example_ids_path, 'r') as f: allowed_example_ids = set(line.strip() for line in f) logging.info('Allowing %d example ids', len(allowed_example_ids)) else: + df_metadata = None metadata_path = str( os.path.join( '/'.join(examples_pattern.split('/')[:-2]), @@ -270,17 +276,6 @@ def create_labeling_images( ' PATH_DIR/examples/' ) from error - if excluded_import_file_patterns: - excluded_example_ids = set() - for pattern in excluded_import_file_patterns: - for path in tf.io.gfile.glob(pattern): - logging.info('Excluding example ids from "%s"', path) - excluded_example_ids.update(_read_example_ids_from_import_file(path)) - logging.info('Excluding %d example ids', len(excluded_example_ids)) - df_metadata = df_metadata[ - df_metadata['example_id'].isin(excluded_example_ids) - ] - logging.info( 'Randomly searching for buffered samples with buffer radius %.2f' ' metres...', @@ -301,93 +296,73 @@ def create_labeling_images( len(allowed_example_ids), buffered_sampling_radius, ) + max_images = len(allowed_example_ids) - all_images = [] if use_multiprocessing: - def accumulate(images: list[tuple[int, str, str, str]]) -> None: - all_images.extend(images) + labeling_images_info_queue = multiprocessing.Manager().Queue( + maxsize=max_images + ) + num_workers = min(multiprocessing.cpu_count(), len(example_files)) - num_workers = min( - multiprocessing.cpu_count(), len(example_files), max_processes) - if multiprocessing_context: - pool = multiprocessing_context.Pool(num_workers) - else: - pool = multiprocessing.Pool(num_workers) + arg_list = [] for example_file in example_files: - pool.apply_async( + arg_list.append(( + example_file, + output_dir, + allowed_example_ids, + excluded_example_ids, + labeling_images_info_queue, + )) + + with multiprocessing.Pool(num_workers) as pool_executor: + _ = pool_executor.starmap( _create_labeling_images_from_example_file, - args=(example_file, output_dir, allowed_example_ids), - callback=accumulate, - error_callback=print, + arg_list, ) - pool.close() - pool.join() else: + labeling_images_info_queue = queue.Queue(maxsize=max_images) for example_file in example_files: - all_images.extend( - _create_labeling_images_from_example_file( - example_file, - output_dir, - allowed_example_ids, - ) + 
_create_labeling_images_from_example_file( + example_file, + output_dir, + allowed_example_ids, + excluded_example_ids, + labeling_images_info_queue, ) - if not all_images: + if labeling_images_info_queue.empty(): return 0, None image_metadata_csv = os.path.join( output_dir, 'image_metadata.csv' ) - num_images = len(all_images) + num_images = labeling_images_info_queue.qsize() with tf.io.gfile.GFile(image_metadata_csv, 'w') as f: - f.write( - 'id,int64_id,example_id,image,image_source_path,tfrecord_source_path\n' - ) - for int64_id, example_id, image_path, tfrecord_source_path in all_images: + while not labeling_images_info_queue.empty(): + if labeling_images_info_queue.qsize() == num_images: + f.write( + 'id,int64_id,example_id,image,' + + 'image_source_path,tfrecord_source_path\n' + ) + int64_id, example_id, image_path, tfrecord_source_path = ( + labeling_images_info_queue.get() + ) f.write( f'{int64_id},{int64_id},{example_id},' + f'file://{image_path},{image_path},{tfrecord_source_path}\n' ) - return num_images, image_metadata_csv - - -def _tfrecord_iterator(path: str) -> tf.train.Example: - """Creates an iterator over TFRecord files. - - Supports both eager and non-eager execution. - Args: - path: Path to TFRecord file. - - Yields: - Examples from the TFRecord file. - """ - ds = tf.data.TFRecordDataset([path]).prefetch(tf.data.AUTOTUNE) - if tf.executing_eagerly(): - for record in ds: - example = tf.train.Example() - example.ParseFromString(record.numpy()) - yield example - else: - iterator = tf.compat.v1.data.make_one_shot_iterator(ds) - next_element = iterator.get_next() - with tf.compat.v1.Session() as sess: - while True: - try: - value = sess.run(next_element) - except tf.errors.OutOfRangeError: - return - example = tf.train.Example() - example.ParseFromString(value) - yield example + return num_images, image_metadata_csv def _create_labeling_images_from_example_file( example_file: str, output_dir: str, allowed_example_ids: Set[str], -) -> list[tuple[int, str, str, str]]: + excluded_example_ids: Set[str], + labeling_images_info_queue: queue.Queue[tuple[int, str, str, str]], +) -> None: """Creates PNGs used for labeling from TFRecords for a single example_file. Also writes an import file in CSV format that is used to upload the images @@ -398,12 +373,13 @@ def _create_labeling_images_from_example_file( output_dir: Output directory. allowed_example_ids: Set of example_id from which a subset will be used in creating labeling task. - - Returns: - List of tuples of int64_id, example_id, image path, example path. + excluded_example_ids: Set of example_id to be excluded. + labeling_images_info_queue: List of tuples int64_id, example_id, paths to + images created for labeling task. 
""" - images = [] - for example in _tfrecord_iterator(example_file): + for record in tf.data.TFRecordDataset([example_file]): + example = Example() + example.ParseFromString(record.numpy()) if 'example_id' in example.features.feature: example_id = ( example.features.feature['example_id'].bytes_list.value[0].decode() @@ -432,7 +408,12 @@ def _create_labeling_images_from_example_file( else: plus_code = 'unknown' - if example_id not in allowed_example_ids: + if (allowed_example_ids is not None + and example_id not in allowed_example_ids): + continue + + if example_id in excluded_example_ids: + logging.info('"%s" excluded', example_id) continue before_image = utils.deserialize_image( @@ -449,11 +430,15 @@ def _create_labeling_images_from_example_file( labeling_image_bytes = utils.serialize_image(labeling_image, 'png') path = os.path.join(output_dir, f'{example_id}.png') - with tf.io.gfile.GFile(path, 'wb') as writer: - writer.write(labeling_image_bytes) - images.append((int64_id, str(example_id), path, example_file)) + try: + _ = labeling_images_info_queue.put_nowait( + [int64_id, str(example_id), path, example_file] + ) + with tf.io.gfile.GFile(path, 'w') as writer: + writer.write(labeling_image_bytes) - return images + except queue.Full: + break def _write_tfrecord(examples: Iterable[Example], path: str) -> None: @@ -631,7 +616,9 @@ def _merge_single_example_file_and_labels( List of TF examples merged with labels for a single example_file. """ labeled_examples = [] - for example in _tfrecord_iterator(example_file): + for record in tf.data.TFRecordDataset([example_file]): + example = Example() + example.ParseFromString(record.numpy()) if 'example_id' in example.features.feature: example_id = ( example.features.feature['example_id'].bytes_list.value[0].decode() @@ -684,8 +671,6 @@ def _merge_examples_and_labels( test_output_path: str, connecting_distance_meters: float, use_multiprocessing: bool, - multiprocessing_context: Any, - max_processes: int, ) -> None: """Merges examples with labels into train and test TFRecords. @@ -699,9 +684,6 @@ def _merge_examples_and_labels( connecting_distance_meters: Maximum distance for two points to be connected. use_multiprocessing: If true, create multiple processes to create labeled examples. - multiprocessing_context: Context to spawn processes with when using - multiprocessing. - max_processes: Maximum number of processes. 
""" example_files = tf.io.gfile.glob(examples_pattern) @@ -714,21 +696,15 @@ def _merge_examples_and_labels( ) if use_multiprocessing: - num_workers = min( - multiprocessing.cpu_count(), len(example_files), max_processes - ) - if multiprocessing_context: - pool = multiprocessing_context.Pool(num_workers) - else: - pool = multiprocessing.Pool(num_workers) - - logging.info('Using multiprocessing with %d processes.', num_workers) - results = pool.map( - functools.partial( - _merge_single_example_file_and_labels, labels=labels - ), - example_files, - ) + num_workers = min(multiprocessing.cpu_count(), len(example_files)) + with multiprocessing.Pool(num_workers) as pool_executor: + logging.info('Using multiprocessing with %d processes.', num_workers) + results = pool_executor.map( + functools.partial( + _merge_single_example_file_and_labels, labels=labels + ), + example_files, + ) else: logging.info('Not using multiprocessing.') results = [ @@ -740,9 +716,6 @@ def _merge_examples_and_labels( for result in results: all_labeled_examples.extend(result) - if not all_labeled_examples: - raise ValueError('No examples found matching labels.') - train_examples, test_examples = _split_examples( all_labeled_examples, test_fraction, connecting_distance_meters ) @@ -787,9 +760,7 @@ def create_labeled_examples( train_output_path: str, test_output_path: str, connecting_distance_meters: float, - use_multiprocessing: bool, - multiprocessing_context: Any, - max_processes: int) -> None: + use_multiprocessing: bool) -> None: """Creates a labeled dataset by merging cloud labels and unlabeled examples. Args: @@ -803,9 +774,6 @@ def create_labeled_examples( connecting_distance_meters: Maximum distance for two points to be connected. use_multiprocessing: If true, create multiple processes to create labeled examples. - multiprocessing_context: Context to spawn processes with when using - multiprocessing. - max_processes: Maximum number of processes. 
""" string_to_numeric_map = {} for string_to_numeric_label in string_to_numeric_labels: @@ -846,6 +814,4 @@ def create_labeled_examples( test_output_path, connecting_distance_meters, use_multiprocessing, - multiprocessing_context, - max_processes ) diff --git a/src/skai/model/train.py b/src/skai/model/train.py index c0a08828..85bf8fc1 100644 --- a/src/skai/model/train.py +++ b/src/skai/model/train.py @@ -66,7 +66,7 @@ def get_model_dir(root_dir: str) -> str: - if FLAGS.is_vertex and FLAGS.trial_name: + if FLAGS.is_vertex: basename = FLAGS.trial_name.split('/')[-1] else: # TODO(skai) - Maybe change directory name in case vertex ai is not used in diff --git a/src/skai/model/xm_launch_single_model_vertex.py b/src/skai/model/xm_launch_single_model_vertex.py index 43ea0e9a..097fbe8f 100644 --- a/src/skai/model/xm_launch_single_model_vertex.py +++ b/src/skai/model/xm_launch_single_model_vertex.py @@ -233,29 +233,19 @@ def main(_) -> None: if FLAGS.cloud_location is None: raise ValueError('Google Cloud location is either None or invalid.') + vizier_cloud.VizierExploration( + experiment=experiment, + job=xm.Job( + executable=train_executable, executor=executor, args=job_args + ), + study_factory=vizier_cloud.NewStudy( + study_config=get_study_config(), + location=FLAGS.cloud_location + ), - if FLAGS.use_vizier: - vizier_cloud.VizierExploration( - experiment=experiment, - job=xm.Job( - executable=train_executable, executor=executor, args=job_args - ), - study_factory=vizier_cloud.NewStudy( - study_config=get_study_config(), - location=FLAGS.cloud_location - ), - - num_trials_total=100, - num_parallel_trial_runs=3, - ).launch() - else: - experiment.add( - xm.Job( - executable=train_executable, - executor=executor, - args=job_args, - ) - ) + num_trials_total=100, + num_parallel_trial_runs=3, + ).launch() if __name__ == '__main__': diff --git a/src/skai/model/xmanager_external_metric_logger.py b/src/skai/model/xmanager_external_metric_logger.py index d25faaca..acd59abd 100644 --- a/src/skai/model/xmanager_external_metric_logger.py +++ b/src/skai/model/xmanager_external_metric_logger.py @@ -59,7 +59,7 @@ def log_scalar_metric( if xm_label == 'epoch_main_aucpr_1_vs_rest_val': self.worker.add_trial_measurement(step, {xm_label: metric_value}) else: - with self._get_summary_writer(is_val_metric)[0].as_default(): + with self._get_summary_writer(is_val_metric).as_default(): tf.summary.scalar(metric_label, metric_value, step=step) def _get_summary_writer( diff --git a/src/skai/open_street_map.py b/src/skai/open_street_map.py index 97142d87..9d1886da 100644 --- a/src/skai/open_street_map.py +++ b/src/skai/open_street_map.py @@ -138,5 +138,5 @@ def get_building_centroids_in_regions( for region in regions: polygons.extend(get_buildings_in_region(region, overpass_url)) buildings.write_buildings_file( - gpd.GeoDataFrame(geometry=polygons, crs=4326), output_path + gpd.GeoDataFrame(geometry=polygons), output_path ) diff --git a/src/tools/examples_to_csv.py b/src/tools/examples_to_csv.py index ddcf0374..f4ca1e1c 100644 --- a/src/tools/examples_to_csv.py +++ b/src/tools/examples_to_csv.py @@ -17,11 +17,13 @@ Useful for performing analysis on the examples. 
""" -import collections +from collections.abc import Sequence +import multiprocessing from absl import app from absl import flags import pandas as pd +from skai import utils import tensorflow as tf import tqdm @@ -29,53 +31,56 @@ FLAGS = flags.FLAGS flags.DEFINE_string('examples_pattern', None, 'Examples pattern', required=True) flags.DEFINE_string('output_path', None, 'Output path.', required=True) +flags.DEFINE_bool('parallel', False, 'Read TFRecords in parallel.') -def _single_parse_function(example): - features = { - 'example_id': tf.io.FixedLenFeature(1, tf.string), - 'plus_code': tf.io.FixedLenFeature(1, tf.string), - 'pre_image_id': tf.io.FixedLenFeature(1, tf.string), - 'post_image_id': tf.io.FixedLenFeature(1, tf.string), - 'encoded_coordinates': tf.io.FixedLenFeature(1, tf.string), - 'string_label': tf.io.FixedLenFeature(1, tf.string), - 'coordinates': tf.io.FixedLenFeature((2,), tf.float32), - } - return tf.io.parse_single_example(example, features=features) - - -def read_tfrecords(pattern: str) -> pd.DataFrame: - """Reads TFRecords and returns Pandas DataFrame with metadata. +def read_single_tfrecord(path: str) -> pd.DataFrame: + """Reads example properties from a single TFRecord. Args: - pattern: File pattern for TFRecords. + path: TFRecord path. Returns: - DataFrame with example metadata. + DataFrame with example properties. """ - paths = tf.io.gfile.glob(pattern) - ds = tf.data.Dataset.from_tensor_slices(paths) - ds = ds.interleave( - tf.data.TFRecordDataset, - cycle_length=len(paths), - num_parallel_calls=tf.data.AUTOTUNE, - deterministic=False, - ) - ds = ds.map(_single_parse_function, num_parallel_calls=tf.data.AUTOTUNE) - ds = ds.prefetch(tf.data.AUTOTUNE) - feature_values = collections.defaultdict(list) - for x in tqdm.tqdm(ds.as_numpy_iterator(), smoothing=0): - for feature, values_array in x.items(): - if feature == 'coordinates': - feature_values['longitude'].append(values_array[0]) - feature_values['latitude'].append(values_array[1]) - else: - feature_values[feature].append(values_array[0].decode()) - return pd.DataFrame(feature_values) + example_properties = [] + for record in tf.data.TFRecordDataset([path]).as_numpy_iterator(): + example = tf.train.Example() + example.ParseFromString(record) + longitude, latitude = utils.get_float_feature(example, 'coordinates') + properties = { + 'longitude': longitude, + 'latitude': latitude, + } + for string_prop in [ + 'example_id', + 'plus_code', + 'pre_image_id', + 'post_image_id', + 'encoded_coordinates', + 'string_label', + ]: + properties[string_prop] = utils.get_bytes_feature(example, string_prop)[ + 0 + ].decode() + example_properties.append(properties) + return pd.DataFrame(example_properties) + + +def read_tfrecords(paths: Sequence[str]) -> pd.DataFrame: + if FLAGS.parallel: + num_workers = min(multiprocessing.cpu_count(), len(paths)) + with multiprocessing.Pool(num_workers) as executor: + results = tqdm.tqdm( + executor.imap(read_single_tfrecord, paths), total=len(paths) + ) + else: + results = [read_single_tfrecord(p) for p in tqdm.tqdm(paths)] + return pd.concat(results) def main(_) -> None: - df = read_tfrecords(FLAGS.examples_pattern) + df = read_tfrecords(tf.io.gfile.glob(FLAGS.examples_pattern)) with tf.io.gfile.GFile(FLAGS.output_path, 'w') as f: df.to_csv(f, index=False)