From b086e6d142cbfcddfced250617fed7e651fce919 Mon Sep 17 00:00:00 2001
From: Dan Fuchs
Date: Wed, 20 Nov 2024 16:18:26 -0600
Subject: [PATCH] DM-47721: Improved schema manager logging

* `PydanticSchemaManager` takes an optional structlog `BoundLogger`
* Event manager `make_manager` passes its `logger` arg through to the
  `PydanticSchemaManager` that it makes
* More detailed logging when schema registration fails

Schema manager logging isn't a breaking change
---
 ...39_danfuchs_event_manager_test_failures.md |  4 ++++
 safir/src/safir/kafka/_manager.py             | 21 ++++++++++++++++---
 .../safir/kafka/_schema_registry_config.py    |  9 ++++++--
 safir/src/safir/metrics/_config.py            |  2 +-
 4 files changed, 30 insertions(+), 6 deletions(-)
 create mode 100644 changelog.d/20241120_161839_danfuchs_event_manager_test_failures.md

diff --git a/changelog.d/20241120_161839_danfuchs_event_manager_test_failures.md b/changelog.d/20241120_161839_danfuchs_event_manager_test_failures.md
new file mode 100644
index 00000000..0acaa8c7
--- /dev/null
+++ b/changelog.d/20241120_161839_danfuchs_event_manager_test_failures.md
@@ -0,0 +1,4 @@
+### New features
+
+- `safir.kafka.PydanticSchemaManager` takes an optional structlog `BoundLogger`. If not provided, the default logger is a `BoundLogger` rather than a `logging.Logger`.
+- The `logger` argument to `safir.metrics.KafkaMetricsConfiguration.make_manager` is passed through to the `PydanticSchemaManager` instance that it creates.
diff --git a/safir/src/safir/kafka/_manager.py b/safir/src/safir/kafka/_manager.py
index 87f21509..98a8c09a 100644
--- a/safir/src/safir/kafka/_manager.py
+++ b/safir/src/safir/kafka/_manager.py
@@ -8,17 +8,19 @@
 """

 import inspect
-import logging
 import re
 from dataclasses import dataclass
 from enum import StrEnum
 from typing import Any, TypeVar

+import structlog
 from dataclasses_avroschema.pydantic import AvroBaseModel
 from schema_registry.client import AsyncSchemaRegistryClient
+from schema_registry.client.errors import ClientError
 from schema_registry.serializers.message_serializer import (
     AsyncAvroMessageSerializer,
 )
+from structlog.stdlib import BoundLogger

 from ._exceptions import (
     IncompatibleSchemaError,
@@ -84,18 +86,22 @@ class PydanticSchemaManager:
         compatibility continuity of a production subject.

         For production, it's best to not set a suffix.
+    logger
+        Logger to use for internal logging. If not given, the
+        ``safir.kafka.manager`` logger will be used.
     """

     def __init__(
         self,
         registry: AsyncSchemaRegistryClient,
         suffix: str = "",
+        logger: BoundLogger | None = None,
     ) -> None:
         self._registry = registry
         self._serializer = AsyncAvroMessageSerializer(self._registry)
         self._suffix = suffix

-        self._logger = logging.getLogger(__name__)
+        self._logger = logger or structlog.get_logger("safir.kafka.manager")

         # A mapping of subjects to registered schema ids.
         self._subjects_to_ids: dict[str, int] = {}
@@ -164,7 +170,16 @@ async def register_model(
             if result["is_compatible"] is False:
                 raise IncompatibleSchemaError(result["messages"])

-        schema_id = await self._registry.register(subject, schema)
+        try:
+            schema_id = await self._registry.register(subject, schema)
+        except ClientError as e:
+            self._logger.exception(
+                "schema registry ClientError",
+                msg=e.message,
+                http_code=e.http_code,
+                server_traceback=e.server_traceback,
+            )
+            raise
         self._subjects_to_ids[subject] = schema_id

         return SchemaInfo(schema=schema, schema_id=schema_id, subject=subject)
diff --git a/safir/src/safir/kafka/_schema_registry_config.py b/safir/src/safir/kafka/_schema_registry_config.py
index 99ee47f8..4a8b969f 100644
--- a/safir/src/safir/kafka/_schema_registry_config.py
+++ b/safir/src/safir/kafka/_schema_registry_config.py
@@ -7,6 +7,7 @@
 from pydantic import AliasChoices, AnyUrl, Field
 from pydantic_settings import BaseSettings, SettingsConfigDict
 from schema_registry.client import AsyncSchemaRegistryClient
+from structlog.stdlib import BoundLogger

 from safir.kafka._manager import PydanticSchemaManager

@@ -58,7 +59,11 @@
         """Make a dict of params to construct an AsyncSchemaRegistryClient."""
         return {"url": str(self.registry_url)}

-    def make_manager(self) -> PydanticSchemaManager:
+    def make_manager(
+        self, logger: BoundLogger | None = None
+    ) -> PydanticSchemaManager:
         """Construct a PydanticSchemaManager from the fields of this model."""
         registry = AsyncSchemaRegistryClient(**self.to_registry_params())
-        return PydanticSchemaManager(registry=registry, suffix=self.suffix)
+        return PydanticSchemaManager(
+            registry=registry, suffix=self.suffix, logger=logger
+        )
diff --git a/safir/src/safir/metrics/_config.py b/safir/src/safir/metrics/_config.py
index 5a5e029f..2d7b8dbb 100644
--- a/safir/src/safir/metrics/_config.py
+++ b/safir/src/safir/metrics/_config.py
@@ -190,7 +190,7 @@ def make_manager(
             client_id=f"{ADMIN_CLIENT_PREFIX}-{self.application}",
             **self.kafka.to_aiokafka_params(),
         )
-        schema_manager = self.schema_manager.make_manager()
+        schema_manager = self.schema_manager.make_manager(logger=logger)

         return KafkaEventManager(
             application=self.application,
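
Usage note: the sketch below is illustrative and not part of the patch; it shows one way the new ``logger`` parameter could be used. The logger name ("myapp") and registry URL are placeholder values, not anything defined in Safir.

    # Illustrative sketch, not part of this patch. Placeholder logger name
    # and registry URL.
    import structlog
    from schema_registry.client import AsyncSchemaRegistryClient

    from safir.kafka import PydanticSchemaManager

    # Any structlog BoundLogger works; if ``logger`` is omitted, the manager
    # now falls back to the ``safir.kafka.manager`` structlog logger instead
    # of a stdlib logging.Logger.
    logger = structlog.get_logger("myapp")
    registry = AsyncSchemaRegistryClient(url="https://registry.example.com")
    manager = PydanticSchemaManager(registry=registry, logger=logger)

The same logger can instead be supplied via ``make_manager(logger=...)``, which is what the metrics configuration now does when it builds its ``PydanticSchemaManager``.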