Skip to content

Commit

Permalink
Migrate to PyFilesystem and support multiple input dirs (#352)
Browse files Browse the repository at this point in the history
- Replace all uses of FileHandler with new PyFilesystem wrapper classes.
- Update config key lookups to use new file matching util. This allows
specifying matching by either file name (already supported) or path
(new), with support for wildcards.
- Put input subdirectory traversal behind a config option
`includeInputSubdirs` which defaults to false.
  • Loading branch information
hqpho authored Nov 18, 2024
1 parent c3030d7 commit 98cd40c
Show file tree
Hide file tree
Showing 44 changed files with 1,110 additions and 731 deletions.
126 changes: 69 additions & 57 deletions simple/stats/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import re

from stats import constants
from stats.data import AggregationConfig
from stats.data import EntityType
Expand All @@ -23,6 +21,8 @@
from stats.data import Provenance
from stats.data import Source
from stats.data import StatVar
from util.file_match import match
from util.filesystem import File

_INPUT_FILES_FIELD = "inputFiles"
_IMPORT_TYPE_FIELD = "importType"
Expand Down Expand Up @@ -55,6 +55,7 @@
_GROUP_STAT_VARS_BY_PROPERTY = "groupStatVarsByProperty"
_GENERATE_TOPICS = "generateTopics"
_OBSERVATION_PROPERTIES = "observationProperties"
_INCLUDE_INPUT_SUBDIRS_PROPERTY = "includeInputSubdirs"


class Config:
Expand All @@ -67,11 +68,12 @@ def __init__(self, data: dict) -> None:
self.data = data
self._input_files_config: dict[str, dict] = self.data.get(
_INPUT_FILES_FIELD, {})
# If input file names are specified with wildcards - e.g. "foo*.csv",
# this dict maintains a mapping from actual file name to the wildcard key
# If input file paths are specified with wildcards - e.g. "gs://bucket/foo*.csv",
# this dict maintains a mapping from actual file path to the wildcard key
# for fast lookup.
# e.g. "foo1.csv" -> "foo*.csv", "foo2.csv" -> "foo*.csv", etc.
self._input_file_name_keys: dict[str, str] = {}
# e.g. "foo1.csv" -> "foo*.csv", "foo2.csv" -> "foo*.csv",
# "path/to/foo.csv" -> "**/foo.csv", etc.
self._config_key_by_full_path: dict[str, str] = {}
# dict from provenance name to Provenance
self.provenances: dict[str, Provenance] = {}
# dict from provenance name to Source
Expand All @@ -87,28 +89,29 @@ def data_download_urls(self) -> list[str]:
raise ValueError(
f"{_DATA_DOWNLOAD_URL_FIELD} can only be a list, found: {cfg}")

def import_type(self, input_file: File) -> ImportType:
  """Returns the import type configured for the given input file.

  Defaults to ImportType.OBSERVATIONS when the per-file config does not
  specify one.

  Raises:
    ValueError: if the configured import type string is not a valid
      ImportType value.
  """
  import_type_str = self._per_file_config(input_file).get(_IMPORT_TYPE_FIELD)
  if not import_type_str:
    return ImportType.OBSERVATIONS
  # Value-based membership check against the enum's values.
  if import_type_str not in iter(ImportType):
    raise ValueError(
        f"Unsupported import type: {import_type_str} ({input_file.full_path()})"
    )
  return ImportType(import_type_str)

def format(self, input_file: File) -> InputFileFormat | None:
  """Returns the input file format configured for the given input file.

  Returns None when the per-file config does not specify a format.
  Note: the return annotation is InputFileFormat (the body constructs an
  InputFileFormat), fixing a previously incorrect ImportType annotation.

  Raises:
    ValueError: if the configured format string is not a valid
      InputFileFormat value.
  """
  format_str = self._per_file_config(input_file).get(_FORMAT_FIELD)
  if not format_str:
    return None
  if format_str not in iter(InputFileFormat):
    raise ValueError(f"Unsupported format: {format_str} ({input_file})")
  return InputFileFormat(format_str)

def column_mappings(self, input_file: File) -> dict[str, str]:
  """Returns the column-name mappings configured for the given input file.

  Maps expected column names to the actual names used in the file; empty
  dict when none are configured.
  """
  return self._per_file_config(input_file).get(_COLUMN_MAPPINGS_FIELD, {})

def computed_variables(self, input_file: File) -> list[str]:
  """Returns the computed variables configured for the given input file.

  Empty list when none are configured.
  """
  return self._per_file_config(input_file).get(_COMPUTED_VARIABLES_FIELD, [])

def variable(self, variable_name: str) -> StatVar:
var_cfg = self.data.get(_VARIABLES_FIELD, {}).get(variable_name, {})
Expand All @@ -130,8 +133,8 @@ def aggregation(self, variable_name: str) -> AggregationConfig:
.get(_AGGREGATION_FIELD, {})
return AggregationConfig(**aggregation_cfg)

def event_type(self, input_file: File) -> str:
  """Returns the event type name configured for the given input file.

  Empty string when none is configured.
  """
  return self._per_file_config(input_file).get(_EVENT_TYPE_FIELD, "")

def event(self, event_type_name: str) -> EventType:
event_type_cfg = self.data.get(_EVENTS_FIELD, {}).get(event_type_name, {})
Expand All @@ -146,73 +149,82 @@ def entity(self, entity_type_name: str) -> EntityType:
entity_type_cfg.get(_NAME_FIELD, entity_type_name),
description=entity_type_cfg.get(_DESCRIPTION_FIELD, ""))

def id_column(self, input_file: File) -> str:
  """Returns the ID column name configured for the given input file.

  Empty string when none is configured.
  """
  return self._per_file_config(input_file).get(_ID_COLUMN_FIELD, "")

def entity_type(self, input_file: File) -> str:
  """Returns the entity type name configured for the given input file.

  Empty string when none is configured.
  """
  return self._per_file_config(input_file).get(_ENTITY_TYPE_FIELD, "")

def ignore_columns(self, input_file: File) -> list[str]:
  """Returns the columns to ignore for the given input file.

  Empty list when none are configured.
  """
  return self._per_file_config(input_file).get(_IGNORE_COLUMNS_FIELD, [])

def provenance_name(self, input_file: File) -> str:
  """Returns the provenance name configured for the given input file.

  Defaults to the file's path when no provenance is configured.
  """
  return self._per_file_config(input_file).get(_PROVENANCE_FIELD,
                                               input_file.path)

def row_entity_type(self, input_file: File) -> str:
  """Returns the row entity type name configured for the given input file.

  Empty string when none is configured.
  """
  return self._per_file_config(input_file).get(_ROW_ENTITY_TYPE_FIELD, "")

def entity_columns(self, input_file: File) -> list[str]:
  """Returns the entity columns configured for the given input file.

  Empty list when none are configured.
  """
  return self._per_file_config(input_file).get(_ENTITY_COLUMNS, [])

def observation_properties(self, input_file: File) -> dict[str, str]:
  """Returns the observation properties configured for the given input file.

  Empty dict when none are configured.
  """
  return self._per_file_config(input_file).get(_OBSERVATION_PROPERTIES, {})

def database(self) -> dict:
  """Returns the database section of the config, or None when absent."""
  database_config = self.data.get(_DATABASE_FIELD)
  return database_config

def generate_hierarchy(self) -> bool:
  """Whether the groupStatVarsByProperty option is set (defaults to False)."""
  flag = self.data.get(_GROUP_STAT_VARS_BY_PROPERTY)
  return flag or False

def include_input_subdirs(self) -> bool:
  """Whether the includeInputSubdirs option is set (defaults to False)."""
  flag = self.data.get(_INCLUDE_INPUT_SUBDIRS_PROPERTY)
  return flag or False

def special_files(self) -> dict[str, str]:
  """Returns a mapping from special file type to the configured file name.

  Only the special file types that are actually present (non-empty) in the
  config are included in the result.
  """
  special_files: dict[str, str] = {}
  for special_file_type in constants.SPECIAL_FILE_TYPES:
    special_file_name = self.data.get(special_file_type, "")
    if special_file_name:
      special_files[special_file_type] = special_file_name
  return special_files

def generate_topics(self) -> bool:
  """Whether the generateTopics option is set (defaults to False)."""
  flag = self.data.get(_GENERATE_TOPICS)
  return flag or False

def _per_file_config(self, input_file: File) -> dict:
  """Looks up the config for a given file.

  The lookup process is as follows:
  - If the file name exactly matches a config key, the config for that key
    is returned.
  - Else if the file's full path exactly matches a config key, the config
    for that key is returned.
  - Else, we attempt to match the file against each config key in order,
    returning the config for the first key that matches. Matches are
    checked with the match function in simple/util/file_match.py, and the
    winning key (or None) is cached per full path for fast repeated lookups.
  """
  for_exact_name = self._input_files_config.get(input_file.name(), {})
  if for_exact_name:
    return for_exact_name

  # Hoist the path lookup: it is used three times below.
  full_path = input_file.full_path()
  for_exact_full_path = self._input_files_config.get(full_path, {})
  if for_exact_full_path:
    return for_exact_full_path

  # Cache the pattern-match result, including misses (stored as None).
  if full_path not in self._config_key_by_full_path:
    self._config_key_by_full_path[full_path] = (
        self._find_first_matching_config_key(input_file))

  return self._input_files_config.get(
      self._config_key_by_full_path[full_path], {})

def _find_first_matching_config_key(self, input_file: File) -> str | None:
  """Returns the first config key that matches the given file, or None.

  Keys are checked in config order using util.file_match.match, which
  supports matching by file name or path, with wildcards.
  """
  for input_file_pattern in self._input_files_config.keys():
    if match(input_file, input_file_pattern):
      return input_file_pattern
  return None

def _input_file_pattern_to_regex(self, input_file_pattern: str) -> str:
"""
Transforms a string of the form "a*b.c" to the regex "a.*b\.c".
"""
return input_file_pattern.replace(".", r"\.").replace("*", ".*")

def _parse_provenances_and_sources(self):
sources_cfg = self.data.get(_SOURCES_FIELD, {})
for source_name, source_cfg in sources_cfg.items():
Expand Down
Loading

0 comments on commit 98cd40c

Please sign in to comment.