diff --git a/diaspora_event_sdk/sdk/kafka_client.py b/diaspora_event_sdk/sdk/kafka_client.py index 6d0393e..cef0d76 100644 --- a/diaspora_event_sdk/sdk/kafka_client.py +++ b/diaspora_event_sdk/sdk/kafka_client.py @@ -27,7 +27,6 @@ def get_diaspora_config(extra_configs: Dict[str, Any] = {}) -> Dict[str, Any]: Merges default configurations with custom ones provided. """ - bootstrap_servers = None try: if ( "OCTOPUS_AWS_ACCESS_KEY_ID" not in os.environ @@ -37,14 +36,13 @@ def get_diaspora_config(extra_configs: Dict[str, Any] = {}) -> Dict[str, Any]: keys = Client().retrieve_key() os.environ["OCTOPUS_AWS_ACCESS_KEY_ID"] = keys["access_key"] os.environ["OCTOPUS_AWS_SECRET_ACCESS_KEY"] = keys["secret_key"] - bootstrap_servers = keys["endpoint"] - else: - bootstrap_servers = os.environ["OCTOPUS_BOOTSTRAP_SERVERS"] + os.environ["OCTOPUS_BOOTSTRAP_SERVERS"] = keys["endpoint"] + except Exception as e: raise RuntimeError("Failed to retrieve Kafka keys") from e conf = { - "bootstrap_servers": bootstrap_servers, + "bootstrap_servers": os.environ["OCTOPUS_BOOTSTRAP_SERVERS"], "security_protocol": "SASL_SSL", "sasl_mechanism": "OAUTHBEARER", "api_version": (3, 5, 1), @@ -100,8 +98,9 @@ def producer_connection_test(result): value={"message": "Synchronous message from Diaspora SDK"}, ) result["producer_connection_test"] = future.get(timeout=10) - except Exception: - pass + except Exception as e: + raise e + print(e) def consumer_connection_test(result): try: @@ -113,8 +112,9 @@ def consumer_connection_test(result): for msg in consumer: result["consumer_connection_test"] = msg break - except Exception: - pass + except Exception as e: + raise e + print(e) result, retry_count = {}, 0 start_time = time.time()