From e192dbd82c167a48b0dbc31bed1a9a0c81b4567c Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Wed, 15 Nov 2023 14:58:16 +0000 Subject: [PATCH 1/3] Logging improvements for multiple senders and consistency fixes Signed-off-by: Aindriu Lavelle --- journalpump/daemon.py | 3 +-- journalpump/journalpump.py | 10 +++++----- journalpump/senders/elasticsearch_opensearch_sender.py | 6 +++--- journalpump/senders/kafka.py | 9 +++++---- journalpump/senders/rsyslog.py | 10 +++++++--- journalpump/senders/websocket.py | 7 ++++--- 6 files changed, 25 insertions(+), 20 deletions(-) diff --git a/journalpump/daemon.py b/journalpump/daemon.py index 1bb87ab..a66e88b 100644 --- a/journalpump/daemon.py +++ b/journalpump/daemon.py @@ -91,11 +91,10 @@ def reload_config(self): if file_ctime != self.config_file_ctime: daemon.notify("RELOADING=1") - self.log.info("%sloading configuration", "re" if self.config_file_ctime else "") + self.log.info("%soading configuration", "Rel" if self.config_file_ctime else "L") self.config_file_ctime = file_ctime with open(self.config_path) as fp: self.config = json.load(fp) - self.log.info("new config: %r", self.config) self.log_level = self.config.get("log_level", logging.INFO) self.configure_logging() self.handle_new_config() diff --git a/journalpump/journalpump.py b/journalpump/journalpump.py index a908f55..abbd9c3 100644 --- a/journalpump/journalpump.py +++ b/journalpump/journalpump.py @@ -273,7 +273,7 @@ def initialize_senders(self): if not isinstance(extra_field_values, dict): self.log.warning( - "extra_field_values: %r not a dictionary object, ignoring", + "Extra_field_values: %r not a dictionary object, ignoring", extra_field_values, ) extra_field_values = {} @@ -428,13 +428,13 @@ def get_reader(self, seek_to=None, reinit=False): if not (self.journald_reader.has_persistent_files() or self.journald_reader.has_runtime_files()): # If journal files are not ready (e.g. files with namespace are not yet created), reader won't fail, # it will silently not deliver anything. We don't want this - return None to re-create reader later - self.log.warning("journal files for %r are not yet available", self.name) + self.log.warning("Journal files for %r are not yet available", self.name) self.journald_reader.close() self.journald_reader = None return None except FileNotFoundError as ex: self.log.warning( - "journal for %r not available yet: %s: %s", + "Journal for %r not available yet: %s: %s", self.name, ex.__class__.__name__, ex, @@ -1002,7 +1002,7 @@ def register_for_poll(self, reader: JournalReader) -> bool: fd = reader.fileno() # fd in this case is anonymous inotify node if fd is not None and fd not in self.reader_by_fd: - self.log.info("Registered reader %r with fd %r", self.name, fd) + self.log.info("Registered reader %r with fd %r", reader.name, fd) self.poller.register(fd) self.reader_by_fd[fd] = reader return True @@ -1014,7 +1014,7 @@ def unregister_from_poll(self, reader: JournalReader) -> None: if fd is not None and fd in self.reader_by_fd: self.poller.unregister(fd) self.reader_by_fd[fd] = self._STALE_FD - self.log.info("Unregistered reader %r with fd %r", self.name, fd) + self.log.info("Unregistered reader %r with fd %r", reader.name, fd) def refresh_gauges(self) -> None: if self.stats: diff --git a/journalpump/senders/elasticsearch_opensearch_sender.py b/journalpump/senders/elasticsearch_opensearch_sender.py index 529c3d7..44270d8 100644 --- a/journalpump/senders/elasticsearch_opensearch_sender.py +++ b/journalpump/senders/elasticsearch_opensearch_sender.py @@ -130,7 +130,7 @@ def _load_indices(self) -> bool: self.stats.unexpected_exception(ex, where="es_pump_init_es_client") return False - self.log.info("Initialized %s HTTP connection", self._config.sender_type.value) + self.log.info("Initialized %s HTTP connection for %s", self._config.sender_type.value, self.name) self.mark_connected() return True @@ -175,7 +175,7 @@ def send_messages(self, *, messages, cursor) -> bool: es_available = self._load_indices() if not es_available: redacted_url = re.sub(self._INDICIES_URL_REDACTION_REGEXP, r"\1\2[REDACTED]\4", self._indices_url) - self.log.warning("Waiting for connection to %s", redacted_url) + self.log.warning("Waiting for connection to %s for %s", redacted_url, self.name) self._backoff() return False for msg in messages: @@ -239,7 +239,7 @@ def _message_header(self, index_name: str) -> Dict[str, Any]: def _create_index_and_mapping(self, *, index_name: str, message: Dict[str, Any]) -> None: try: - self.log.info("Creating index: %r", index_name) + self.log.info("Creating index: %r for %s", index_name, self.name) res = self._session.put( self._index_url(index_name), json=self._create_mapping(message), diff --git a/journalpump/senders/kafka.py b/journalpump/senders/kafka.py index 13a549c..57c9a48 100644 --- a/journalpump/senders/kafka.py +++ b/journalpump/senders/kafka.py @@ -70,7 +70,7 @@ def _generate_producer_config(self) -> dict: return producer_config def _init_kafka(self) -> None: - self.log.info("Initializing Kafka client, address: %r", self.config["kafka_address"]) + self.log.info("Initializing Kafka client, address: %r for %s", self.config["kafka_address"], self.name) if self.kafka_producer: self.kafka_producer.close() @@ -93,8 +93,9 @@ def _init_kafka(self) -> None: self._backoff() else: self.log.info( - "Initialized Kafka Client, address: %r", + "Initialized Kafka Client, address: %r for %s", self.config["kafka_address"], + self.name, ) self.kafka_producer = kafka_producer self.mark_connected() @@ -110,9 +111,9 @@ def _init_kafka(self) -> None: try: kafka_admin.create_topics([NewTopic(self.topic, num_partitions, replication_factor)]) except errors.TopicAlreadyExistsError: - self.log.info("Kafka topic %r already exists", self.topic) + self.log.info("Kafka topic %r already exists for %s", self.topic, self.name) else: - self.log.info("Create Kafka topic, address: %r", self.topic) + self.log.info("Create Kafka topic, address: %r for %s", self.topic, self.name) def send_messages(self, *, messages, cursor): if not self.kafka_producer: diff --git a/journalpump/senders/rsyslog.py b/journalpump/senders/rsyslog.py index c2f61e9..6c69e9b 100644 --- a/journalpump/senders/rsyslog.py +++ b/journalpump/senders/rsyslog.py @@ -17,7 +17,7 @@ def __init__(self, *, config, **kwargs): self.default_severity = 6 def _init_rsyslog_client(self): - self.log.info("Initializing Rsyslog Client") + self.log.info("Initializing Rsyslog Client %s", self.name) self.mark_disconnected() while self.running: try: @@ -41,7 +41,7 @@ def _init_rsyslog_client(self): certfile=self.config.get("client_cert"), log_format=self.config.get("logline"), ) - self.log.info("Initialized Rsyslog Client, server: %s, port: %d", server, port) + self.log.info("Initialized Rsyslog Client %s, server: %s, port: %d", self.name, server, port) self.mark_connected() break except RSYSLOG_CONN_ERRORS as ex: @@ -101,7 +101,11 @@ def send_messages(self, *, messages, cursor): self._init_rsyslog_client() except Exception as ex: # pylint: disable=broad-except self.mark_disconnected(ex) - self.log.exception("Unexpected exception during send to rsyslog") + self.log.exception( + "Unexpected exception during send to rsyslog", + ex.__class__.__name__, + ex, + ) self.stats.unexpected_exception(ex=ex, where="sender", tags=self.make_tags({"app": "journalpump"})) self._backoff() self._init_rsyslog_client() diff --git a/journalpump/senders/websocket.py b/journalpump/senders/websocket.py index abdaecd..ff1f1bd 100644 --- a/journalpump/senders/websocket.py +++ b/journalpump/senders/websocket.py @@ -300,7 +300,7 @@ def __init__(self, *, config, **kwargs): self.config = config def _init_websocket(self) -> None: - self.log.info("Initializing Websocket client, address: %r", self.config["websocket_uri"]) + self.log.info("Initializing Websocket client, address: %r for %s", self.config["websocket_uri"], self.name) if self.runner: self.runner.close() @@ -337,8 +337,9 @@ def _init_websocket(self) -> None: self._backoff() else: self.log.info( - "Initialized Websocket client, address: %r", + "Initialized Websocket client, address: %r for %s", self.config["websocket_uri"], + self.name ) self.runner = runner @@ -364,7 +365,7 @@ def send_messages(self, *, messages, cursor): return True except (CancelledError, asyncio.CancelledError) as ex: self.mark_disconnected(ex) - self.log.info("Send to websocket failed, connection was closed") + self.log.info("Send to websocket failed, connection was closed for %s", self.name) return False except Exception as ex: # pylint: disable=broad-except self.mark_disconnected(ex) From 412fa8dccf19f250c2a924a32714c8b00cb09e92 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Wed, 15 Nov 2023 15:31:55 +0000 Subject: [PATCH 2/3] Fixing Linting issue with too many args for logging format string. Signed-off-by: Aindriu Lavelle --- journalpump/senders/rsyslog.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/journalpump/senders/rsyslog.py b/journalpump/senders/rsyslog.py index 6c69e9b..3d0fe61 100644 --- a/journalpump/senders/rsyslog.py +++ b/journalpump/senders/rsyslog.py @@ -101,11 +101,7 @@ def send_messages(self, *, messages, cursor): self._init_rsyslog_client() except Exception as ex: # pylint: disable=broad-except self.mark_disconnected(ex) - self.log.exception( - "Unexpected exception during send to rsyslog", - ex.__class__.__name__, - ex, - ) + self.log.exception("Unexpected exception during send to rsyslog") self.stats.unexpected_exception(ex=ex, where="sender", tags=self.make_tags({"app": "journalpump"})) self._backoff() self._init_rsyslog_client() From 837fe81c1a4c7c579431e08ee2fa336a7c930b06 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Wed, 15 Nov 2023 15:45:26 +0000 Subject: [PATCH 3/3] Run black to reformat updated files. which failed the pipeline. Signed-off-by: Aindriu Lavelle --- journalpump/senders/elasticsearch_opensearch_sender.py | 2 -- journalpump/senders/websocket.py | 6 +----- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/journalpump/senders/elasticsearch_opensearch_sender.py b/journalpump/senders/elasticsearch_opensearch_sender.py index 44270d8..d377901 100644 --- a/journalpump/senders/elasticsearch_opensearch_sender.py +++ b/journalpump/senders/elasticsearch_opensearch_sender.py @@ -70,7 +70,6 @@ def create(*, sender_type: SenderType, config: Dict[str, Any]) -> "Config": class _EsOsLogSenderBase(LogSender): - _DEFAULT_MAX_SENDER_INTERVAL = 10.0 _INDICIES_URL_REDACTION_REGEXP = r"(\w*?://[A-Za-z0-9\-._~%!$&'()*+,;=]*)(:)([A-Za-z0-9\-._~%!$&'()*+,;=]*)(@)" @@ -301,7 +300,6 @@ def _message_fields(self, message: Dict[str, Any]) -> Dict[str, Any]: class ElasticsearchSender(_EsOsLogSenderBase): - _VERSION_WITH_MAPPING_TYPE_SUPPORT = 7 _LEGACY_TYPE = "journal_msg" diff --git a/journalpump/senders/websocket.py b/journalpump/senders/websocket.py index ff1f1bd..a48ec5c 100644 --- a/journalpump/senders/websocket.py +++ b/journalpump/senders/websocket.py @@ -336,11 +336,7 @@ def _init_websocket(self) -> None: ) self._backoff() else: - self.log.info( - "Initialized Websocket client, address: %r for %s", - self.config["websocket_uri"], - self.name - ) + self.log.info("Initialized Websocket client, address: %r for %s", self.config["websocket_uri"], self.name) self.runner = runner def request_stop(self):