Merge pull request #161 from Aiven-Open/aindriu-improve-logging-for-multiple-senders

Logging improvements for multiple senders and consistency fixes
jlprat authored Nov 20, 2023
2 parents 495d96f + 837fe81 commit 0de7dd3
Showing 6 changed files with 19 additions and 24 deletions.
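
The recurring change across these files is adding the sender's (or reader's) name to log messages, so that a journalpump instance running several senders produces log lines that can be attributed to a specific sender. A minimal sketch of the pattern, where the ExampleSender class and its attributes are illustrative and not code from this repository:

```python
import logging


class ExampleSender:
    """Hypothetical sender showing the logging pattern applied in this commit."""

    def __init__(self, name: str) -> None:
        self.name = name  # e.g. the sender's key from the journalpump config
        self.log = logging.getLogger(self.__class__.__name__)

    def connect(self, address: str) -> None:
        # Before: no sender name, so with multiple senders configured the
        # origin of this line was ambiguous:
        #   self.log.info("Initializing client, address: %r", address)
        # After: the sender name is appended, as in the diffs below.
        self.log.info("Initializing client, address: %r for %s", address, self.name)


logging.basicConfig(level=logging.INFO)
ExampleSender("kafka_primary").connect("kafka.example.com:9092")
```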
3 changes: 1 addition & 2 deletions journalpump/daemon.py
@@ -91,11 +91,10 @@ def reload_config(self):

if file_ctime != self.config_file_ctime:
daemon.notify("RELOADING=1")
self.log.info("%sloading configuration", "re" if self.config_file_ctime else "")
self.log.info("%soading configuration", "Rel" if self.config_file_ctime else "L")
self.config_file_ctime = file_ctime
with open(self.config_path) as fp:
self.config = json.load(fp)
self.log.info("new config: %r", self.config)
self.log_level = self.config.get("log_level", logging.INFO)
self.configure_logging()
self.handle_new_config()
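For context, the reworked daemon.py message keeps a single format string but moves the capital letter into the substituted prefix, so the line reads "Loading configuration" on first load and "Reloading configuration" on reload. A standalone illustration, not code from the repository:

```python
def config_message(already_loaded: bool) -> str:
    # Old form: "%sloading configuration" % ("re" if already_loaded else "")
    # produced lowercase "reloading configuration" / "loading configuration".
    # New form capitalizes both variants:
    return "%soading configuration" % ("Rel" if already_loaded else "L")


assert config_message(False) == "Loading configuration"
assert config_message(True) == "Reloading configuration"
```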
10 changes: 5 additions & 5 deletions journalpump/journalpump.py
@@ -273,7 +273,7 @@ def initialize_senders(self):

if not isinstance(extra_field_values, dict):
self.log.warning(
"extra_field_values: %r not a dictionary object, ignoring",
"Extra_field_values: %r not a dictionary object, ignoring",
extra_field_values,
)
extra_field_values = {}
@@ -428,13 +428,13 @@ def get_reader(self, seek_to=None, reinit=False):
if not (self.journald_reader.has_persistent_files() or self.journald_reader.has_runtime_files()):
# If journal files are not ready (e.g. files with namespace are not yet created), reader won't fail,
# it will silently not deliver anything. We don't want this - return None to re-create reader later
self.log.warning("journal files for %r are not yet available", self.name)
self.log.warning("Journal files for %r are not yet available", self.name)
self.journald_reader.close()
self.journald_reader = None
return None
except FileNotFoundError as ex:
self.log.warning(
"journal for %r not available yet: %s: %s",
"Journal for %r not available yet: %s: %s",
self.name,
ex.__class__.__name__,
ex,
@@ -1002,7 +1002,7 @@ def register_for_poll(self, reader: JournalReader) -> bool:
fd = reader.fileno()
# fd in this case is anonymous inotify node
if fd is not None and fd not in self.reader_by_fd:
self.log.info("Registered reader %r with fd %r", self.name, fd)
self.log.info("Registered reader %r with fd %r", reader.name, fd)
self.poller.register(fd)
self.reader_by_fd[fd] = reader
return True
@@ -1014,7 +1014,7 @@ def unregister_from_poll(self, reader: JournalReader) -> None:
if fd is not None and fd in self.reader_by_fd:
self.poller.unregister(fd)
self.reader_by_fd[fd] = self._STALE_FD
self.log.info("Unregistered reader %r with fd %r", self.name, fd)
self.log.info("Unregistered reader %r with fd %r", reader.name, fd)

def refresh_gauges(self) -> None:
if self.stats:
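The register_for_poll/unregister_from_poll changes above are a correctness fix rather than a wording change: the message previously logged self.name, the name of the object doing the registration, instead of the name of the reader being registered. A minimal sketch of the distinction, with illustrative class names that are not taken from the repository:

```python
import logging


class Reader:
    def __init__(self, name: str) -> None:
        self.name = name


class Poller:
    name = "poller"  # the poller's own name, shared by every registration

    def __init__(self) -> None:
        self.log = logging.getLogger("Poller")

    def register(self, reader: Reader, fd: int) -> None:
        # Logging self.name would print "poller" for every reader;
        # logging reader.name identifies which reader was actually registered.
        self.log.info("Registered reader %r with fd %r", reader.name, fd)
```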
8 changes: 3 additions & 5 deletions journalpump/senders/elasticsearch_opensearch_sender.py
@@ -70,7 +70,6 @@ def create(*, sender_type: SenderType, config: Dict[str, Any]) -> "Config":


class _EsOsLogSenderBase(LogSender):
-
_DEFAULT_MAX_SENDER_INTERVAL = 10.0

_INDICIES_URL_REDACTION_REGEXP = r"(\w*?://[A-Za-z0-9\-._~%!$&'()*+,;=]*)(:)([A-Za-z0-9\-._~%!$&'()*+,;=]*)(@)"
@@ -130,7 +129,7 @@ def _load_indices(self) -> bool:
self.stats.unexpected_exception(ex, where="es_pump_init_es_client")
return False

self.log.info("Initialized %s HTTP connection", self._config.sender_type.value)
self.log.info("Initialized %s HTTP connection for %s", self._config.sender_type.value, self.name)
self.mark_connected()
return True

@@ -175,7 +174,7 @@ def send_messages(self, *, messages, cursor) -> bool:
es_available = self._load_indices()
if not es_available:
redacted_url = re.sub(self._INDICIES_URL_REDACTION_REGEXP, r"\1\2[REDACTED]\4", self._indices_url)
self.log.warning("Waiting for connection to %s", redacted_url)
self.log.warning("Waiting for connection to %s for %s", redacted_url, self.name)
self._backoff()
return False
for msg in messages:
@@ -239,7 +238,7 @@ def _message_header(self, index_name: str) -> Dict[str, Any]:

def _create_index_and_mapping(self, *, index_name: str, message: Dict[str, Any]) -> None:
try:
self.log.info("Creating index: %r", index_name)
self.log.info("Creating index: %r for %s", index_name, self.name)
res = self._session.put(
self._index_url(index_name),
json=self._create_mapping(message),
@@ -301,7 +300,6 @@ def _message_fields(self, message: Dict[str, Any]) -> Dict[str, Any]:


class ElasticsearchSender(_EsOsLogSenderBase):
-
_VERSION_WITH_MAPPING_TYPE_SUPPORT = 7

_LEGACY_TYPE = "journal_msg"
9 changes: 5 additions & 4 deletions journalpump/senders/kafka.py
@@ -70,7 +70,7 @@ def _generate_producer_config(self) -> dict:
return producer_config

def _init_kafka(self) -> None:
self.log.info("Initializing Kafka client, address: %r", self.config["kafka_address"])
self.log.info("Initializing Kafka client, address: %r for %s", self.config["kafka_address"], self.name)

if self.kafka_producer:
self.kafka_producer.close()
@@ -93,8 +93,9 @@ def _init_kafka(self) -> None:
self._backoff()
else:
self.log.info(
"Initialized Kafka Client, address: %r",
"Initialized Kafka Client, address: %r for %s",
self.config["kafka_address"],
self.name,
)
self.kafka_producer = kafka_producer
self.mark_connected()
@@ -110,9 +111,9 @@ def _init_kafka(self) -> None:
try:
kafka_admin.create_topics([NewTopic(self.topic, num_partitions, replication_factor)])
except errors.TopicAlreadyExistsError:
self.log.info("Kafka topic %r already exists", self.topic)
self.log.info("Kafka topic %r already exists for %s", self.topic, self.name)
else:
self.log.info("Create Kafka topic, address: %r", self.topic)
self.log.info("Create Kafka topic, address: %r for %s", self.topic, self.name)

def send_messages(self, *, messages, cursor):
if not self.kafka_producer:
4 changes: 2 additions & 2 deletions journalpump/senders/rsyslog.py
@@ -17,7 +17,7 @@ def __init__(self, *, config, **kwargs):
self.default_severity = 6

def _init_rsyslog_client(self):
self.log.info("Initializing Rsyslog Client")
self.log.info("Initializing Rsyslog Client %s", self.name)
self.mark_disconnected()
while self.running:
try:
@@ -41,7 +41,7 @@ def _init_rsyslog_client(self):
certfile=self.config.get("client_cert"),
log_format=self.config.get("logline"),
)
self.log.info("Initialized Rsyslog Client, server: %s, port: %d", server, port)
self.log.info("Initialized Rsyslog Client %s, server: %s, port: %d", self.name, server, port)
self.mark_connected()
break
except RSYSLOG_CONN_ERRORS as ex:
9 changes: 3 additions & 6 deletions journalpump/senders/websocket.py
@@ -300,7 +300,7 @@ def __init__(self, *, config, **kwargs):
self.config = config

def _init_websocket(self) -> None:
self.log.info("Initializing Websocket client, address: %r", self.config["websocket_uri"])
self.log.info("Initializing Websocket client, address: %r for %s", self.config["websocket_uri"], self.name)

if self.runner:
self.runner.close()
@@ -336,10 +336,7 @@ def _init_websocket(self) -> None:
)
self._backoff()
else:
- self.log.info(
-     "Initialized Websocket client, address: %r",
-     self.config["websocket_uri"],
- )
+ self.log.info("Initialized Websocket client, address: %r for %s", self.config["websocket_uri"], self.name)
self.runner = runner

def request_stop(self):
@@ -364,7 +361,7 @@ def send_messages(self, *, messages, cursor):
return True
except (CancelledError, asyncio.CancelledError) as ex:
self.mark_disconnected(ex)
self.log.info("Send to websocket failed, connection was closed")
self.log.info("Send to websocket failed, connection was closed for %s", self.name)
return False
except Exception as ex: # pylint: disable=broad-except
self.mark_disconnected(ex)
