Merge pull request #331 from lsst-sqre/tickets/DM-47721/event-manager-test-failures

DM-47721: Improved schema manager logging
fajpunk authored Nov 21, 2024
2 parents ab9e5e6 + b086e6d commit 4e418d8
Showing 4 changed files with 30 additions and 6 deletions.
@@ -0,0 +1,4 @@
+### New features
+
+- `safir.kafka.PydanticSchemaManager` now takes an optional structlog `BoundLogger`. If one is not provided, the default logger is a structlog `BoundLogger` rather than a `logging.Logger`.
+- The `logger` value passed to `safir.metrics.KafkaMetricsConfiguration.make_manager` is passed through to the `PydanticSchemaManager` instance that it creates.
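For illustration, a minimal sketch of passing a structlog logger to the schema manager directly. The registry URL and logger name below are examples, not values from this commit:

```python
import structlog
from schema_registry.client import AsyncSchemaRegistryClient

from safir.kafka import PydanticSchemaManager

# Example registry URL and logger name; substitute your own values.
registry = AsyncSchemaRegistryClient(url="http://localhost:8081")
logger = structlog.get_logger("myapp")

# If logger were omitted, the manager would fall back to
# structlog.get_logger("safir.kafka.manager").
manager = PydanticSchemaManager(registry=registry, logger=logger)
```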
21 changes: 18 additions & 3 deletions safir/src/safir/kafka/_manager.py
@@ -8,17 +8,19 @@
 """

 import inspect
-import logging
 import re
 from dataclasses import dataclass
 from enum import StrEnum
 from typing import Any, TypeVar

+import structlog
 from dataclasses_avroschema.pydantic import AvroBaseModel
 from schema_registry.client import AsyncSchemaRegistryClient
+from schema_registry.client.errors import ClientError
 from schema_registry.serializers.message_serializer import (
     AsyncAvroMessageSerializer,
 )
+from structlog.stdlib import BoundLogger

 from ._exceptions import (
     IncompatibleSchemaError,
@@ -84,18 +86,22 @@ class PydanticSchemaManager:
         compatibility continuity of a production subject.
         For production, it's best to not set a suffix.
+    logger
+        Logger to use for internal logging. If not given, the
+        ``safir.kafka.manager`` logger will be used.
     """

     def __init__(
         self,
         registry: AsyncSchemaRegistryClient,
         suffix: str = "",
+        logger: BoundLogger | None = None,
     ) -> None:
         self._registry = registry
         self._serializer = AsyncAvroMessageSerializer(self._registry)
         self._suffix = suffix

-        self._logger = logging.getLogger(__name__)
+        self._logger = logger or structlog.get_logger("safir.kafka.manager")

         # A mapping of subjects to registered schema ids.
         self._subjects_to_ids: dict[str, int] = {}
@@ -164,7 +170,16 @@ async def register_model(
         if result["is_compatible"] is False:
             raise IncompatibleSchemaError(result["messages"])

-        schema_id = await self._registry.register(subject, schema)
+        try:
+            schema_id = await self._registry.register(subject, schema)
+        except ClientError as e:
+            self._logger.exception(
+                "schema registry ClientError",
+                msg=e.message,
+                http_code=e.http_code,
+                server_traceback=e.server_traceback,
+            )
+            raise
         self._subjects_to_ids[subject] = schema_id

         return SchemaInfo(schema=schema, schema_id=schema_id, subject=subject)
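A hedged sketch of how the new error logging surfaces during registration. The event model, logger name, and registry URL are illustrative, and `register_model` is assumed to take the Avro model class, as elsewhere in Safir's schema manager API:

```python
import structlog
from dataclasses_avroschema.pydantic import AvroBaseModel
from schema_registry.client import AsyncSchemaRegistryClient
from schema_registry.client.errors import ClientError

from safir.kafka import PydanticSchemaManager


class ExampleEvent(AvroBaseModel):  # illustrative model, not part of this commit
    message: str


async def register_example() -> None:
    # Context bound here appears in the structured error log emitted by the
    # manager's new ClientError handler before the exception is re-raised.
    logger = structlog.get_logger("myapp").bind(component="schema-registration")
    registry = AsyncSchemaRegistryClient(url="http://localhost:8081")  # example URL
    manager = PydanticSchemaManager(registry=registry, logger=logger)
    try:
        await manager.register_model(ExampleEvent)
    except ClientError:
        # The manager has already logged msg, http_code, and server_traceback.
        raise
```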
9 changes: 7 additions & 2 deletions safir/src/safir/kafka/_schema_registry_config.py
@@ -7,6 +7,7 @@
 from pydantic import AliasChoices, AnyUrl, Field
 from pydantic_settings import BaseSettings, SettingsConfigDict
 from schema_registry.client import AsyncSchemaRegistryClient
+from structlog.stdlib import BoundLogger

 from safir.kafka._manager import PydanticSchemaManager

@@ -58,7 +59,11 @@ def to_registry_params(self) -> SchemaRegistryClientParams:
         """Make a dict of params to construct an AsyncSchemaRegistryClient."""
         return {"url": str(self.registry_url)}

-    def make_manager(self) -> PydanticSchemaManager:
+    def make_manager(
+        self, logger: BoundLogger | None = None
+    ) -> PydanticSchemaManager:
         """Construct a PydanticSchemaManager from the fields of this model."""
         registry = AsyncSchemaRegistryClient(**self.to_registry_params())
-        return PydanticSchemaManager(registry=registry, suffix=self.suffix)
+        return PydanticSchemaManager(
+            registry=registry, suffix=self.suffix, logger=logger
+        )
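For illustration, a sketch of the new `logger` pass-through on `make_manager`. The settings class name (`SchemaManagerSettings`) and the direct keyword construction are assumptions for this sketch; the class name is not visible in this diff and the model is typically populated from environment variables:

```python
import structlog

from safir.kafka import SchemaManagerSettings  # assumed export; not shown in this diff

# Assumed field-name construction; the real model may expect an environment
# variable alias such as SCHEMA_MANAGER_REGISTRY_URL instead.
config = SchemaManagerSettings(registry_url="http://localhost:8081")

# The logger is forwarded to the PydanticSchemaManager that make_manager builds.
manager = config.make_manager(logger=structlog.get_logger("myapp"))
```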
2 changes: 1 addition & 1 deletion safir/src/safir/metrics/_config.py
@@ -190,7 +190,7 @@ def make_manager(
             client_id=f"{ADMIN_CLIENT_PREFIX}-{self.application}",
             **self.kafka.to_aiokafka_params(),
         )
-        schema_manager = self.schema_manager.make_manager()
+        schema_manager = self.schema_manager.make_manager(logger=logger)

         return KafkaEventManager(
             application=self.application,
