[boundary finder] add support for newlines in single quotes (Recidiviz/recidiviz-data#34236)

## Description of the change

Adds support for safely splitting files along row-safe boundaries
when newlines are quoted. If we are splitting a quoted file, the control
flow is:

(1) find the next single, unescaped quote in the block
(2) classify that unescaped quote's state (`UnescapedQuoteState`)
(3) see if we can use that quote to determine our place in the CSV; if
we can't, return to (1) and try again
(4) walk the CSV to find the next newline that is not in a quoted block
(a sketch of this step follows below)
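
A minimal sketch of step (4), assuming we already know our quote state at the
start of the buffer (the hard part, steps (1)-(3), is what the
`UnescapedQuoteState` classification handles in the real code;
`find_row_safe_newline` is a hypothetical name, not the actual implementation):

```python
def find_row_safe_newline(
    buf: bytes, quote: bytes = b'"', newline: bytes = b"\n"
) -> int | None:
    """Return the offset of the first newline outside a quoted cell, or None.

    Simplifying assumption: |buf| starts outside any quoted cell. The real
    finder lands mid-file, so it must first locate an unescaped quote and
    classify it before it can walk forward like this.
    """
    in_quotes = False
    i = 0
    while i < len(buf):
        c = buf[i : i + 1]
        if c == quote:
            if in_quotes and buf[i + 1 : i + 2] == quote:
                i += 2  # "" is an escaped quote inside a quoted cell
                continue
            in_quotes = not in_quotes
        elif c == newline and not in_quotes:
            return i
        i += 1
    return None


# the quoted newline at offset 15 is skipped; offset 34 ends the row
assert find_row_safe_newline(b'4567,"This line\nis split in two",x\n') == 34
```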

## Type of change

> All pull requests must have at least one of the following labels applied (otherwise the PR will fail):

| Label | Description |
|-------|-------------|
| Type: Bug | non-breaking change that fixes an issue |
| Type: Feature | non-breaking change that adds functionality |
| Type: Breaking Change | fix or feature that would cause existing functionality to not work as expected |
| Type: Non-breaking refactor | change addresses some tech debt item or prepares for a later change, but does not change functionality |
| Type: Configuration Change | adjusts configuration to achieve some end related to functionality, development, performance, or security |
| Type: Dependency Upgrade | upgrades a project dependency - these changes are not included in release notes |

## Related issues

Part of Recidiviz/recidiviz-data#30505

## Checklists

### Development

**This box MUST be checked by the submitter prior to merging**:
- [x] **Double- and triple-checked that there is no Personally
Identifiable Information (PII) being mistakenly added in this pull
request**

These boxes should be checked by the submitter prior to merging:
- [x] Tests have been written to cover the code changed/added as part of
this pull request

### Code review

These boxes should be checked by reviewers prior to merging:

- [ ] This pull request has a descriptive title and information useful
to a reviewer
- [ ] Potential security implications or infrastructural changes have
been considered, if relevant

GitOrigin-RevId: 2d56abe4a5943d11022dc2c34a1af26740334a39
ethan-oro authored and Helper Bot committed Nov 2, 2024
1 parent 5dad113 commit 3a963e8
Showing 11 changed files with 953 additions and 73 deletions.
184 changes: 163 additions & 21 deletions recidiviz/cloud_storage/gcsfs_csv_chunk_boundary_finder.py
@@ -15,22 +15,26 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# =============================================================================
"""Code for finding byte offsets for row-safe chunk boundaries for Google Cloud Storage CSV files. """
import csv
import io
import json
import logging
from typing import Annotated, List, Optional

import attr
from annotated_types import Gt

from recidiviz.cloud_storage.gcs_file_system import GCSFileSystem
from recidiviz.cloud_storage.gcsfs_path import GcsfsFilePath
from recidiviz.common.constants.csv import (
    DEFAULT_CSV_ENCODING,
    DEFAULT_CSV_LINE_TERMINATOR,
from recidiviz.common.constants.csv import DEFAULT_CSV_QUOTE_CHAR, VALID_QUOTE_CHARS
from recidiviz.utils.quoted_csv_line_terminator_finder import (
    find_line_terminator_for_quoted_csv,
)

DEFAULT_CHUNK_SIZE = 100 * 1024 * 1024 # TODO(#28653) configure a sensible default
DEFAULT_PEEK_SIZE = 300
DEFAULT_PEEK_SIZE_NO_QUOTES = 600
DEFAULT_PEEK_SIZE_QUOTES = 1024 * 512
DEFAULT_MAX_CELL_SIZE = 1024 * 1024


@attr.define
@@ -68,13 +72,19 @@ class GcsfsCsvChunkBoundaryFinder:

    def __init__(
        self,
        *,
        fs: GCSFileSystem,
        encoding: str,
        line_terminator: str,
        separator: str,
        quoting_mode: int,
        quote_char: str = DEFAULT_CSV_QUOTE_CHAR,
        chunk_size: Optional[Annotated[int, Gt(0)]] = None,
        peek_size: Optional[Annotated[int, Gt(0)]] = None,
        line_terminator: Optional[str] = None,
        encoding: Optional[str] = None,
        max_cell_size: Optional[int] = None,
    ) -> None:
        self._fs = fs

        # approximately how large each individual chunk will be
        self._chunk_size = chunk_size if chunk_size is not None else DEFAULT_CHUNK_SIZE

@@ -83,15 +93,105 @@ def __init__(

        # how much we will "peek" past the chunk size boundary to look for the next
        # newline char
        self._peek_size = peek_size if peek_size is not None else DEFAULT_PEEK_SIZE
        self._peek_size = self._get_peek_size(peek_size, quoting_mode)

        if self._peek_size <= 0:
            raise ValueError("peek_size must be at least 1 byte")

# byte-encoded newline terminator
self._line_terminator = bytes(
line_terminator or DEFAULT_CSV_LINE_TERMINATOR,
encoding or DEFAULT_CSV_ENCODING,
if self._chunk_size < self._peek_size:
raise ValueError("chunk_size should never be smaller than peek_size")

        max_cell_size = (
            max_cell_size if max_cell_size is not None else DEFAULT_MAX_CELL_SIZE
        )

        if max_cell_size <= 0:
            raise ValueError("max_cell_size must be at least 1 byte")

        # we will, at most, read twice the max cell size to ensure that we will always
        # capture the entirety of a quoted field
        self._max_read_size = max_cell_size * 2
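        # (illustrative note, not in the original diff) if every quoted cell
        # fits in max_cell_size bytes, a quoted cell that opened just before
        # our read window can extend at most max_cell_size bytes into it,
        # leaving a full max_cell_size of unambiguous context to classify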

        self._quoting_mode = quoting_mode

        if quote_char is not None and quote_char not in VALID_QUOTE_CHARS:
            raise ValueError(
                f"Found invalid quote character [{quote_char}], expected one of: [{VALID_QUOTE_CHARS}]"
            )

        self._encoding = encoding
        self._line_terminator = bytes(line_terminator, self._encoding)
        self._quote_char = bytes(quote_char, self._encoding)
        self._separator = bytes(separator, self._encoding)

    @staticmethod
    def _get_peek_size(peek_size: Optional[int], quoting: int) -> int:
        """If |peek_size| is provided, use the provided value. If it is not and we are
        using quoting, we will want to read large chunks at a time as we are looking
        for quoted fields, not just line terminators.
        """
        if peek_size is not None:
            return peek_size

        return (
            DEFAULT_PEEK_SIZE_NO_QUOTES
            if quoting == csv.QUOTE_NONE
            else DEFAULT_PEEK_SIZE_QUOTES
        )

    def _find_line_terminator_for_quoted_csv(
        self,
        buffer: bytes,
        buffer_byte_start: int,
    ) -> int | None:
        """Searches |buffer| in an attempt to find a non-quoted newline. We will keep
        on reading until |buffer| is bigger than self._max_read_size. At that point,
        we assume we are in a minimally quoted file that has no usable quotes, so we
        will warn, declare bankruptcy, and return the first newline we can find.
        """
        line_term_index = find_line_terminator_for_quoted_csv(
            buffer=buffer,
            buffer_byte_start=buffer_byte_start,
            quote_char=self._quote_char,
            separator=self._separator,
            line_terminator=self._line_terminator,
        )

        if line_term_index is not None:
            return line_term_index

        # if we haven't been able to determine our place in the file, let's declare
        # bankruptcy and guess
        if len(buffer) >= self._max_read_size:
            logging.warning(
                "[Warning] encountered chunk between byte offsets [%s] and [%s] without any "
                "usable quotes so we think we (1) are in a quote minimal file that has "
                "no quotes and (2) each quoted cell is no larger than [%s] bytes so we "
                "are returning the first newline we find",
                buffer_byte_start,
                buffer_byte_start + len(buffer),
                self._max_read_size,
            )
            first_line_term = buffer.find(self._line_terminator)
            return first_line_term if first_line_term != -1 else None

        # if we haven't read to our max read size, let's keep looking
        return None

    def _find_line_term_quote_safe(
        self,
        buffer: bytes,
        buffer_byte_start: int,
    ) -> int | None:
        """Searches |buffer| for the first quote-safe line terminator. If the file is
        not quoted, returns the first line terminator we find.
        """
        if self._quoting_mode == csv.QUOTE_NONE:
            first_line_term = buffer.find(self._line_terminator)
            return first_line_term if first_line_term != -1 else None

        return self._find_line_terminator_for_quoted_csv(
            buffer=buffer, buffer_byte_start=buffer_byte_start
        )

    def get_chunks_for_gcs_path(self, path: GcsfsFilePath) -> List[CsvChunkBoundary]:
@@ -112,30 +212,61 @@ def get_chunks_for_gcs_path(self, path: GcsfsFilePath) -> List[CsvChunkBoundary]
        where M is the average row length in the file.
        """
        _size = self._fs.get_file_size(path)
        if _size is None:
        path_size = self._fs.get_file_size(path)
        if path_size is None:
            raise ValueError(
                "Cannot find chunk boundaries for path without a file size"
            )

        logging.info(
            "reading [%s] w quoting [%s]",
            path.file_name,
            "Off" if self._quoting_mode == csv.QUOTE_NONE else "On",
        )

        with self._fs.open(path, mode="rb", verifiable=False) as f:

            at_eof = False
            chunks: List[CsvChunkBoundary] = []
            cursor = 0  # file pointer, our current place in the file (more or less)

            while not (at_eof := cursor == _size):
            while not (at_eof := cursor == path_size):

                chunk_start = cursor
                cursor += self._chunk_size

                peeked_bytes = b""
                lineterm_index = -1
                line_term_index: int | None = None

                # until we find the end of the file or a line terminator, keep reading
                while (
                    not at_eof
                    and (lineterm_index := peeked_bytes.find(self._line_terminator))
                    == -1
                    and (
                        line_term_index := self._find_line_term_quote_safe(
                            peeked_bytes, cursor
                        )
                    )
                    is None
                ):
                    if len(peeked_bytes) >= self._max_read_size:
                        raise ValueError(
                            f"Could not find line terminator between byte offsets [{cursor}] "
                            f"and [{cursor + len(peeked_bytes)}] without exceeding our "
                            f"max_read_size of [{self._max_read_size}] for [{path.uri()}]. "
                            f"Please ensure that the encoding [{self._encoding}], "
                            f"separator [{self._separator!r}], line terminator [{self._line_terminator!r}], "
                            f"quote char [{self._quote_char!r}] and quoting mode [{self._quoting_mode}] "
                            f"are accurate for this file. If they are, consider "
                            f"increasing the [max_cell_size] for this file."
                        )

                    logging.info(
                        "\t reading from [%s]: from byte offset [%s] -> [%s]",
                        path.file_name,
                        cursor + len(peeked_bytes),
                        cursor + len(peeked_bytes) + self._peek_size,
                    )

                    f.seek(cursor + len(peeked_bytes), io.SEEK_SET)
                    end_of_chunk_peek = f.read(self._peek_size)

@@ -146,10 +277,21 @@ def get_chunks_for_gcs_path(self, path: GcsfsFilePath) -> List[CsvChunkBoundary]
                    # byte and gets split between two reads
                    peeked_bytes += end_of_chunk_peek

                cursor = (
                    _size
                    if at_eof
                    else cursor + lineterm_index + len(self._line_terminator)
                )
                if at_eof:
                    cursor = path_size
                elif line_term_index is None:
                    raise ValueError(
                        "Should always have a non-None line_term_index if we have not "
                        "reached the end of the file."
                    )
                else:
                    cursor = cursor + line_term_index + len(self._line_terminator)

                logging.info(
                    "\t\t[%s]: found CSV line boundary for [%s] @ byte offset [%s]",
                    len(chunks),
                    path.file_name,
                    cursor,
                )
                chunks.append(
                    CsvChunkBoundary(
@@ -116,7 +116,13 @@ def _extract_file_chunks(
        region_raw_file_config, requires_pre_import_normalization_file_path
    )

    chunker = GcsfsCsvChunkBoundaryFinder(fs)
    chunker = GcsfsCsvChunkBoundaryFinder(
        fs=fs,
        line_terminator=raw_file_config.line_terminator,
        separator=raw_file_config.separator,
        encoding=raw_file_config.encoding,
        quoting_mode=raw_file_config.quoting_mode,
    )
    chunks = chunker.get_chunks_for_gcs_path(
        requires_pre_import_normalization_file_path,
    )
@@ -17,12 +17,13 @@
"""Code for normalizing any csv file into a BigQuery-readable file."""
import csv
import io
import logging
from types import ModuleType
from typing import IO, Optional, Union

from recidiviz.cloud_storage.bytes_chunk_reader import BytesChunkReader
from recidiviz.cloud_storage.gcs_file_system import GCSFileSystem
from recidiviz.cloud_storage.gcsfs_path import GcsfsFilePath
from recidiviz.cloud_storage.gcsfs_path import GcsfsDirectoryPath, GcsfsFilePath
from recidiviz.cloud_storage.read_only_csv_normalizing_stream import (
    ReadOnlyCsvNormalizingStream,
)
@@ -56,11 +57,13 @@ def __init__(
        state_code: StateCode,
        region_module_override: Optional[ModuleType] = None,
        read_chunk_size: int = DEFAULT_READ_CHUNK_SIZE,
        temp_output_dir: Optional[GcsfsDirectoryPath] = None,
    ) -> None:
        self._fs = fs
        self._state_code = state_code
        # TODO(#29013) allow for sandbox name here
        self._temp_output_dir = gcsfs_direct_ingest_temporary_output_directory_path()
        self._temp_output_dir = (
            temp_output_dir or gcsfs_direct_ingest_temporary_output_directory_path()
        )
        self._region_config = get_region_raw_file_config(
            self._state_code.value.lower(), region_module_override
        )
Expand Down Expand Up @@ -112,6 +115,14 @@ def normalize_chunk_for_import(
        config = self._region_config.raw_file_configs[path_parts.file_tag]
        output_path = self.output_path_for_chunk(chunk)

        logging.info(
            "normalizing [%s]: chunk [%s] (%s -> %s)",
            chunk.path.file_name,
            chunk.chunk_boundary.chunk_num,
            chunk.chunk_boundary.start_inclusive,
            chunk.chunk_boundary.end_exclusive,
        )

        # then, execute the actual normalization step
        with self._fs.open(
            chunk.path, mode="rb", chunk_size=self._read_chunk_size, verifiable=False
5 changes: 5 additions & 0 deletions recidiviz/ingest/direct/raw_data/raw_file_configs.py
@@ -15,6 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# =============================================================================
"""Contains all classes related to raw file configs."""
import csv
import os
import re
from collections import defaultdict
@@ -668,6 +669,10 @@ def primary_key_str(self) -> str:
"""A comma-separated string representation of the primary keys"""
return ", ".join(self.primary_key_cols)

@property
def quoting_mode(self) -> int:
return csv.QUOTE_NONE if self.ignore_quotes else csv.QUOTE_MINIMAL
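
    # (illustrative note, not in the original diff) for the stdlib reader this
    # flag decides whether the quote char is interpreted or treated as data:
    #   csv.reader(io.StringIO('1,"a\nb"\n'), quoting=csv.QUOTE_MINIMAL)
    #     -> [['1', 'a\nb']]         (quoted newline stays inside one field)
    #   csv.reader(io.StringIO('1,"a\nb"\n'), quoting=csv.QUOTE_NONE)
    #     -> [['1', '"a'], ['b"']]   (quote char is ordinary data; two rows)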

    # TODO(#28239) remove this once raw data import dag is fully rolled out
    def encodings_to_try(self) -> List[str]:
        """Returns an ordered list of encodings we should try for this file."""
@@ -0,0 +1,5 @@
numerical_col,string_col_1,string_col_2
1234,"Hello, world",I'm Anna
4567,"This line
is split in two","This word is ""quoted"""
7890,"""quoted value""",
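
This fixture exercises the cases the boundary finder must now handle: a quoted
cell containing a line terminator and `""`-escaped quotes. As a quick sanity
check (not part of the diff), Python's stdlib reader parses the five physical
lines into four logical rows:

```python
import csv
import io

fixture = (
    "numerical_col,string_col_1,string_col_2\n"
    '1234,"Hello, world",I\'m Anna\n'
    '4567,"This line\nis split in two","This word is ""quoted"""\n'
    '7890,"""quoted value""",\n'
)
rows = list(csv.reader(io.StringIO(fixture)))
assert len(rows) == 4  # the quoted newline does not start a new row
assert rows[2][1] == "This line\nis split in two"
assert rows[3] == ["7890", '"quoted value"', ""]
```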