Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename and segment avro_client functionality #29

Merged
merged 16 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ __pycache__/
*env/
*.py[cod]
*$py.class
*.pytest_cache
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.9.16
3.9.16
14 changes: 11 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "nypl_py_utils"
version = "1.1.6"
version = "1.2.0"
authors = [
{ name="Aaron Friedman", email="[email protected]" },
]
Expand All @@ -23,7 +23,7 @@ dependencies = []
"Bug Tracker" = "https://github.com/NYPL/python-utils/issues"

[project.optional-dependencies]
avro-encoder = [
avro-client = [
fatimarahman marked this conversation as resolved.
Show resolved Hide resolved
"avro>=1.11.1",
"requests>=2.28.1"
]
Expand Down Expand Up @@ -67,11 +67,19 @@ research-catalog-identifier-helper = [
"requests>=2.28.1"
]
development = [
"nypl_py_utils[avro-encoder,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,postgresql-pool-client,redshift-client,s3-client,config-helper,obfuscation-helper,research-catalog-identifier-helper]",
"nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,postgresql-pool-client,redshift-client,s3-client,config-helper,obfuscation-helper,research-catalog-identifier-helper]",
"flake8>=6.0.0",
"freezegun>=1.2.2",
"mock>=4.0.3",
"pytest>=7.2.0",
"pytest-mock>=3.10.0",
"requests-mock>=1.10.0"
]

[tool.pytest.ini_options]
minversion = "8.0"
addopts = "-ra -q"
fatimarahman marked this conversation as resolved.
Show resolved Hide resolved
pythonpath = "src"
testpaths = [
"tests"
]
162 changes: 162 additions & 0 deletions src/nypl_py_utils/classes/avro_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import avro.schema
import requests

from avro.errors import AvroException
from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
from io import BytesIO
from nypl_py_utils.functions.log_helper import create_log
from requests.exceptions import JSONDecodeError, RequestException


class AvroClient:
    """
    Base class for Avro schema interaction. Takes as input the
    Platform API endpoint from which to fetch the schema in JSON format.

    On construction the schema is fetched from the endpoint and parsed
    with avro.schema.parse; the parsed schema is stored on self.schema.
    """

    def __init__(self, platform_schema_url):
        # NOTE(review): the logger is still named "avro_encoder" although
        # this class lives in avro_client.py -- confirm whether anything
        # filters on the old name before renaming it.
        self.logger = create_log("avro_encoder")
        self.schema = avro.schema.parse(
            self.get_json_schema(platform_schema_url))

    def get_json_schema(self, platform_schema_url):
        """
        Fetches a JSON response from the input Platform API endpoint and
        interprets it as an Avro schema.

        Returns the value stored under the "data" -> "schema" keys of the
        JSON response body.

        Raises an AvroClientError if the request fails, if the endpoint
        returns an error status, if the body is not valid JSON, or if the
        body does not have the expected "data" -> "schema" structure.
        """
        self.logger.info(
            "Fetching Avro schema from {}".format(platform_schema_url))
        try:
            # NOTE(review): no timeout is passed, so requests will wait
            # indefinitely on an unresponsive endpoint -- consider adding
            # one.
            response = requests.get(platform_schema_url)
            response.raise_for_status()
        except RequestException as e:
            message = "Failed to retrieve schema from {url}: {error}".format(
                url=platform_schema_url, error=e
            )
            self.logger.error(message)
            # "from None" hides the underlying requests traceback; the
            # relevant details are already part of the message.
            raise AvroClientError(message) from None

        try:
            json_response = response.json()
            return json_response["data"]["schema"]
        except (JSONDecodeError, KeyError, TypeError) as e:
            # TypeError covers valid JSON of the wrong shape, e.g. a
            # top-level list or {"data": null}, where subscripting with a
            # string key fails without raising KeyError.
            message = (
                "Retrieved schema is malformed: {errorType} {errorMessage}"
                .format(errorType=type(e), errorMessage=e)
            )
            self.logger.error(message)
            raise AvroClientError(message) from None


class AvroEncoder(AvroClient):
    """
    Encodes records with the Avro schema fetched by AvroClient. Takes as
    input the Platform API endpoint from which to fetch the schema in
    JSON format.
    """

    def encode_record(self, record):
        """
        Encodes one record against the client's Avro schema.

        Returns the encoded record as a byte string. Raises an
        AvroClientError if the Avro writer rejects the record.
        """
        debug_message = "Encoding record using {schema} schema".format(
            schema=self.schema.name)
        self.logger.debug(debug_message)

        writer = DatumWriter(self.schema)
        with BytesIO() as buffer:
            encoder = BinaryEncoder(buffer)
            try:
                writer.write(record, encoder)
            except AvroException as e:
                failure = "Failed to encode record: {}".format(e)
                self.logger.error(failure)
                raise AvroClientError(failure) from None
            else:
                return buffer.getvalue()

    def encode_batch(self, record_list):
        """
        Encodes every record in a list against the client's Avro schema.

        Returns a list of byte strings, one encoded record per input
        record, in input order. Raises an AvroClientError on the first
        record the Avro writer rejects.
        """
        info_message = (
            "Encoding ({num_rec}) records using {schema} schema".format(
                num_rec=len(record_list), schema=self.schema.name
            )
        )
        self.logger.info(info_message)

        writer = DatumWriter(self.schema)
        results = []
        with BytesIO() as buffer:
            encoder = BinaryEncoder(buffer)
            for item in record_list:
                try:
                    writer.write(item, encoder)
                except AvroException as e:
                    failure = "Failed to encode record: {}".format(e)
                    self.logger.error(failure)
                    raise AvroClientError(failure) from None
                else:
                    results.append(buffer.getvalue())
                    # One buffer is shared by all records, so empty it
                    # before the next record is written.
                    buffer.seek(0)
                    buffer.truncate(0)
        return results


class AvroDecoder(AvroClient):
    """
    Class for decoding records using an Avro schema. Takes as input the
    Platform API endpoint from which to fetch the schema in JSON format.
    """

    def decode_record(self, record):
        """
        Decodes a single Avro-encoded record using the given Avro schema.

        The record must be a bytes-like object: it is wrapped directly in
        a BytesIO, so a base64 string has to be decoded to bytes by the
        caller first (passing a str raises a TypeError).

        Returns the decoded datum; for a record schema this is a
        dictionary where each key is a field in the schema.

        Raises an AvroClientError if the record cannot be read with the
        schema.
        """
        # Logged at debug level, like encode_record: this runs once per
        # record and the message includes the full raw payload.
        self.logger.debug(
            "Decoding {rec} using {schema} schema".format(
                rec=record, schema=self.schema.name
            )
        )
        datum_reader = DatumReader(self.schema)
        with BytesIO(record) as input_stream:
            decoder = BinaryDecoder(input_stream)
            try:
                return datum_reader.read(decoder)
            except Exception as e:
                # Deliberately broad: any failure while reading the bytes
                # is reported to callers as a single AvroClientError.
                message = "Failed to decode record: {}".format(e)
                self.logger.error(message)
                raise AvroClientError(message) from None

    def decode_batch(self, record_list):
        """
        Decodes a list of Avro-encoded records using the given Avro
        schema.

        Returns a list of decoded records, one per input record and in
        the same order. Raises an AvroClientError on the first record
        that cannot be decoded.
        """
        self.logger.info(
            "Decoding ({num_rec}) records using {schema} schema".format(
                num_rec=len(record_list), schema=self.schema.name
            )
        )
        return [self.decode_record(record) for record in record_list]


class AvroClientError(Exception):
    """
    Error raised by the Avro client classes when a schema cannot be
    retrieved or a record cannot be encoded or decoded.

    The text passed in is available as the `message` attribute and as
    the string form of the exception.
    """

    def __init__(self, message=None):
        # Forward the message to Exception so str(error) and error.args
        # are populated even when it is passed by keyword. Nothing is
        # forwarded for None, so a bare AvroClientError() still renders
        # as an empty string.
        if message is None:
            super().__init__()
        else:
            super().__init__(message)
        self.message = message
118 changes: 0 additions & 118 deletions src/nypl_py_utils/classes/avro_encoder.py

This file was deleted.

Loading
Loading