Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename and segment avro_client functionality #29

Merged
merged 16 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ __pycache__/
*env/
*.py[cod]
*$py.class
*.pytest_cache
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.9.16
3.9.16
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Changelog
## v1.1.6 6/26/24
fatimarahman marked this conversation as resolved.
Show resolved Hide resolved
- Generalized Avro functions and separated encoding/decoding behavior.

## v1.1.5 6/6/24
- Use executemany instead of execute when appropriate in RedshiftClient.execute_transaction

Expand Down
12 changes: 10 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies = []
"Bug Tracker" = "https://github.com/NYPL/python-utils/issues"

[project.optional-dependencies]
avro-encoder = [
avro-client = [
fatimarahman marked this conversation as resolved.
Show resolved Hide resolved
"avro>=1.11.1",
"requests>=2.28.1"
]
Expand Down Expand Up @@ -67,11 +67,19 @@ research-catalog-identifier-helper = [
"requests>=2.28.1"
]
development = [
"nypl_py_utils[avro-encoder,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,postgresql-pool-client,redshift-client,s3-client,config-helper,obfuscation-helper,research-catalog-identifier-helper]",
"nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,postgresql-pool-client,redshift-client,s3-client,config-helper,obfuscation-helper,research-catalog-identifier-helper]",
"flake8>=6.0.0",
"freezegun>=1.2.2",
"mock>=4.0.3",
"pytest>=7.2.0",
"pytest-mock>=3.10.0",
"requests-mock>=1.10.0"
]

[tool.pytest.ini_options]
minversion = "8.0"
addopts = "-ra -q"
fatimarahman marked this conversation as resolved.
Show resolved Hide resolved
pythonpath = "src"
testpaths = [
"tests"
]
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import avro.schema
import base64
import json
import requests

from avro.errors import AvroException
Expand All @@ -8,16 +10,52 @@
from requests.exceptions import JSONDecodeError, RequestException


class AvroEncoder:
class AvroClient:
"""
Class for encoding records using an Avro schema. Takes as input the
Base class for Avro schema interaction. Takes as input the
Platform API endpoint from which to fetch the schema in JSON format.
"""

def __init__(self, platform_schema_url):
self.logger = create_log('avro_encoder')
self.schema = avro.schema.parse(
self._get_json_schema(platform_schema_url))
self.get_json_schema(platform_schema_url))

def get_json_schema(self, platform_schema_url):
"""
Fetches a JSON response from the input Platform API endpoint and
interprets it as an Avro schema.
"""
self.logger.info('Fetching Avro schema from {}'.format(
platform_schema_url))
try:
response = requests.get(platform_schema_url)
response.raise_for_status()
except RequestException as e:
self.logger.error(
'Failed to retrieve schema from {url}: {error}'.format(
url=platform_schema_url, error=e))
raise AvroClientError(
'Failed to retrieve schema from {url}: {error}'.format(
url=platform_schema_url, error=e)) from None

try:
json_response = response.json()
return json_response['data']['schema']
except (JSONDecodeError, KeyError) as e:
self.logger.error(
'Retrieved schema is malformed: {errorType} {errorMessage}'
.format(errorType=type(e), errorMessage=e))
raise AvroClientError(
'Retrieved schema is malformed: {errorType} {errorMessage}'
.format(errorType=type(e), errorMessage=e)) from None


class AvroEncoder(AvroClient):
fatimarahman marked this conversation as resolved.
Show resolved Hide resolved
"""
Class for encoding records using an Avro schema. Takes as input the
Platform API endpoint from which to fetch the schema in JSON format.
"""

def encode_record(self, record):
"""
Expand All @@ -27,7 +65,7 @@ def encode_record(self, record):
"""
self.logger.debug(
'Encoding record using {schema} schema'.format(
schema=self.schema.name))
schema=self.schema.name))
datum_writer = DatumWriter(self.schema)
with BytesIO() as output_stream:
encoder = BinaryEncoder(output_stream)
Expand All @@ -36,7 +74,7 @@ def encode_record(self, record):
return output_stream.getvalue()
except AvroException as e:
self.logger.error('Failed to encode record: {}'.format(e))
raise AvroEncoderError(
raise AvroClientError(
'Failed to encode record: {}'.format(e)) from None

def encode_batch(self, record_list):
Expand All @@ -60,59 +98,63 @@ def encode_batch(self, record_list):
output_stream.truncate(0)
except AvroException as e:
self.logger.error('Failed to encode record: {}'.format(e))
raise AvroEncoderError(
raise AvroClientError(
'Failed to encode record: {}'.format(e)) from None
return encoded_records

def decode_record(self, record):

class AvroDecoder(AvroClient):
"""
Class for decoding records using an Avro schema. Takes as input the
Platform API endpoint from which to fetch the schema in JSON format.
"""

def decode_record(self, record, encoding="binary"):
fatimarahman marked this conversation as resolved.
Show resolved Hide resolved
"""
Decodes a single record represented as a byte string using the given
Avro schema.
Decodes a single record represented either as a byte or
fatimarahman marked this conversation as resolved.
Show resolved Hide resolved
base64 string, using the given Avro schema.

Returns a dictionary where each key is a field in the schema.
"""
self.logger.debug('Decoding {rec} using {schema} schema'.format(
rec=record, schema=self.schema.name))
self.logger.info('Decoding {rec} of type {type} using {schema} schema'
.format(rec=record, type=encoding,
schema=self.schema.name))

if encoding == "base64":
return self._decode_base64(record)
elif encoding == "binary":
return self._decode_binary(record)
else:
self.logger.error(
'Failed to decode record due to encoding type: {}'
.format(encoding))
raise AvroClientError(
'Invalid encoding type: {}'.format(encoding))

def _decode_base64(self, record):
fatimarahman marked this conversation as resolved.
Show resolved Hide resolved
decoded_data = base64.b64decode(record).decode("utf-8")
try:
return json.loads(decoded_data)
except Exception as e:
if isinstance(decoded_data, bytes):
self._decode_binary(decoded_data)
else:
self.logger.error('Failed to decode record: {}'.format(e))
raise AvroClientError(
'Failed to decode record: {}'.format(e)) from None

def _decode_binary(self, record):
fatimarahman marked this conversation as resolved.
Show resolved Hide resolved
datum_reader = DatumReader(self.schema)
with BytesIO(record) as input_stream:
decoder = BinaryDecoder(input_stream)
try:
return datum_reader.read(decoder)
except Exception as e:
self.logger.error('Failed to decode record: {}'.format(e))
raise AvroEncoderError(
raise AvroClientError(
'Failed to decode record: {}'.format(e)) from None

def _get_json_schema(self, platform_schema_url):
"""
Fetches a JSON response from the input Platform API endpoint and
interprets it as an Avro schema.
"""
self.logger.info('Fetching Avro schema from {}'.format(
platform_schema_url))
try:
response = requests.get(platform_schema_url)
response.raise_for_status()
except RequestException as e:
self.logger.error(
'Failed to retrieve schema from {url}: {error}'.format(
url=platform_schema_url, error=e))
raise AvroEncoderError(
'Failed to retrieve schema from {url}: {error}'.format(
url=platform_schema_url, error=e)) from None

try:
json_response = response.json()
return json_response['data']['schema']
except (JSONDecodeError, KeyError) as e:
self.logger.error(
'Retrieved schema is malformed: {errorType} {errorMessage}'
.format(errorType=type(e), errorMessage=e))
raise AvroEncoderError(
'Retrieved schema is malformed: {errorType} {errorMessage}'
.format(errorType=type(e), errorMessage=e)) from None


class AvroEncoderError(Exception):
class AvroClientError(Exception):
def __init__(self, message=None):
self.message = message
111 changes: 111 additions & 0 deletions tests/test_avro_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import json
import pytest

from nypl_py_utils.classes.avro_client import (
AvroDecoder, AvroEncoder, AvroClientError)
fatimarahman marked this conversation as resolved.
Show resolved Hide resolved
from requests.exceptions import ConnectTimeout

_TEST_SCHEMA = {'data': {'schema': json.dumps({
'name': 'TestSchema',
'type': 'record',
'fields': [
{
'name': 'patron_id',
'type': 'int'
},
{
'name': 'library_branch',
'type': ['null', 'string']
}
]
})}}


class TestAvroClient:

@pytest.fixture
def test_avro_encoder_instance(self, requests_mock):
requests_mock.get(
'https://test_schema_url', text=json.dumps(_TEST_SCHEMA))
return AvroEncoder('https://test_schema_url')

@pytest.fixture
def test_avro_decoder_instance(self, requests_mock):
requests_mock.get(
'https://test_schema_url', text=json.dumps(_TEST_SCHEMA))
return AvroDecoder('https://test_schema_url')

def test_get_json_schema(self, test_avro_encoder_instance,
test_avro_decoder_instance):
assert test_avro_encoder_instance.schema == _TEST_SCHEMA['data'][
'schema']
assert test_avro_decoder_instance.schema == _TEST_SCHEMA['data'][
'schema']

def test_request_error(self, requests_mock):
requests_mock.get('https://test_schema_url', exc=ConnectTimeout)
with pytest.raises(AvroClientError):
AvroEncoder('https://test_schema_url')

def test_bad_json_error(self, requests_mock):
requests_mock.get(
'https://test_schema_url', text='bad json')
with pytest.raises(AvroClientError):
AvroEncoder('https://test_schema_url')

def test_missing_key_error(self, requests_mock):
requests_mock.get(
'https://test_schema_url', text=json.dumps({'field': 'value'}))
with pytest.raises(AvroClientError):
AvroEncoder('https://test_schema_url')

def test_encode_record(self, test_avro_encoder_instance,
test_avro_decoder_instance):
TEST_RECORD = {'patron_id': 123, 'library_branch': 'aa'}
encoded_record = test_avro_encoder_instance.encode_record(TEST_RECORD)
assert type(encoded_record) is bytes
assert test_avro_decoder_instance.decode_record(
encoded_record) == TEST_RECORD

def test_encode_record_error(self, test_avro_encoder_instance):
TEST_RECORD = {'patron_id': 123, 'bad_field': 'bad'}
with pytest.raises(AvroClientError):
test_avro_encoder_instance.encode_record(TEST_RECORD)

def test_encode_batch(self, test_avro_encoder_instance,
test_avro_decoder_instance):
TEST_BATCH = [
{'patron_id': 123, 'library_branch': 'aa'},
{'patron_id': 456, 'library_branch': None},
{'patron_id': 789, 'library_branch': 'bb'}]
encoded_records = test_avro_encoder_instance.encode_batch(TEST_BATCH)
assert len(encoded_records) == len(TEST_BATCH)
for i in range(3):
assert type(encoded_records[i]) is bytes
assert test_avro_decoder_instance.decode_record(
encoded_records[i]) == TEST_BATCH[i]

def test_encode_batch_error(self, test_avro_encoder_instance):
BAD_BATCH = [
{'patron_id': 123, 'library_branch': 'aa'},
{'patron_id': 456, 'bad_field': 'bad'}]
with pytest.raises(AvroClientError):
test_avro_encoder_instance.encode_batch(BAD_BATCH)

def test_decode_record_binary(self, test_avro_decoder_instance):
TEST_DECODED_RECORD = {"patron_id": 123, "library_branch": "aa"}
TEST_ENCODED_RECORD = b'\xf6\x01\x02\x04aa'
aaronfriedman6 marked this conversation as resolved.
Show resolved Hide resolved
assert test_avro_decoder_instance.decode_record(
TEST_ENCODED_RECORD) == TEST_DECODED_RECORD

def test_decode_record_b64(self, test_avro_decoder_instance):
TEST_DECODED_RECORD = {"patron_id'": 123, "library_branch": "aa"}
TEST_ENCODED_RECORD = (
"eyJwYXRyb25faWQnIjogMTIzLCAibGlicmFyeV9icmFuY2giOiAiYWEifQ==")
assert test_avro_decoder_instance.decode_record(
TEST_ENCODED_RECORD, "base64") == TEST_DECODED_RECORD

def test_decode_record_error(self, test_avro_decoder_instance):
TEST_ENCODED_RECORD = b'bad-encoding'
with pytest.raises(AvroClientError):
test_avro_decoder_instance.decode_record(TEST_ENCODED_RECORD)
Loading
Loading