From 728952223afb498aadab784da92a6282fd783081 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 31 Oct 2024 18:27:09 +0800 Subject: [PATCH] Fix failed AsyncioTest.test_send_failure and clean up tests (#231) After https://github.com/apache/pulsar/pull/23291, which is included in Pulsar 4.0.0, when the tenant does not exist, the broker will respond with `BrokerMetadataError`, which is retryable. Before that, the error code is `AuthorizationError`, which is not retryable so that `create_producer` will fail immediately. This patch fixes the `test_send_failure` to assert the error is `Timeout`. Additional, separate some tests from `pulsar_test.py`: 1. debug logger tests will affect other tests so that all tests will print debug logs 2. running `schema_test` in `pulsar_test` might have unexpected failures like ``` Failed to create ConsumerImpl for persistent://public/default/my-python-pattern-consumer-3-partition-0: Failed to create steady_timer: kqueue: Too many open files [system:24] Failed when subscribed to topic persistent://public/default/my-python-pattern-consumer-3 in TopicsConsumer. Error - ConnectError Unable to create Consumer - [Muti Topics Consumer: TopicName - persistent://public/default/my-python-pattern-consumer.* - Subscription - my-pattern-consumer-sub] Error - ConnectError Failed to retry lookup for get-partition-metadata-persistent://public/default/my-v2-topic-producer-consumer: Failed to create steady_timer: kqueue: Too many open files [system:24] Error Checking/Getting Partition Metadata while Subscribing on persistent://public/default/my-v2-topic-producer-consumer -- ConnectError Failed to retry lookup for get-partition-metadata-persistent://public/default/my-v2-topic-producer-consumer: Failed to create steady_timer: kqueue: Too many open files [system:24] Error Checking/Getting Partition Metadata while Subscribing on persistent://public/default/my-v2-topic-producer-consumer -- ConnectError Failed to retry lookup for get-partition-metadata-persistent://public/default/test_has_message_available_after_seek-1730263910.78957: Failed to create steady_timer: kqueue: Too many open files [system:24] Error Checking/Getting Partition Metadata while creating producer on persistent://public/default/test_has_message_available_after_seek-1730263910.78957 -- ConnectError Failed to retry lookup for get-partition-metadata-persistent://public/default/test_seek_latest_message_id-1730263910.789991: Failed to create steady_timer: kqueue: Too many open files [system:24] ``` --- pkg/manylinux2014/Dockerfile | 2 +- pkg/manylinux_musl/Dockerfile | 2 +- tests/asyncio_test.py | 5 ++-- tests/debug_logger_test.py | 51 +++++++++++++++++++++++++++++++++++ tests/pulsar_test.py | 26 ------------------ tests/run-unit-tests.sh | 3 +++ tests/schema_test.py | 5 ---- 7 files changed, 59 insertions(+), 35 deletions(-) create mode 100644 tests/debug_logger_test.py diff --git a/pkg/manylinux2014/Dockerfile b/pkg/manylinux2014/Dockerfile index c646f40..c5c4cf0 100644 --- a/pkg/manylinux2014/Dockerfile +++ b/pkg/manylinux2014/Dockerfile @@ -32,7 +32,7 @@ ENV PATH="/opt/python/${PYTHON_SPEC}/bin:${PATH}" ENV PYTHON_INCLUDE_DIR /opt/python/${PYTHON_SPEC}/include ENV PYTHON_LIBRARIES /opt/python/${PYTHON_SPEC}/lib/python${PYTHON_VERSION} -RUN pip3 install pyyaml +RUN pip3 install pyyaml setuptools ADD .build/dependencies.yaml / ADD .build/dep-version.py /usr/local/bin diff --git a/pkg/manylinux_musl/Dockerfile b/pkg/manylinux_musl/Dockerfile index 0a19799..3002194 100644 --- a/pkg/manylinux_musl/Dockerfile +++ b/pkg/manylinux_musl/Dockerfile @@ -33,7 +33,7 @@ ENV PATH="/opt/python/${PYTHON_SPEC}/bin:${PATH}" ENV PYTHON_INCLUDE_DIR /opt/python/${PYTHON_SPEC}/include ENV PYTHON_LIBRARIES /opt/python/${PYTHON_SPEC}/lib/python${PYTHON_VERSION} -RUN pip install pyyaml +RUN pip install pyyaml setuptools RUN apk add cmake diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 5478b60..fe6877f 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -34,7 +34,8 @@ class AsyncioTest(IsolatedAsyncioTestCase): async def asyncSetUp(self) -> None: - self._client = Client(service_url) + self._client = Client(service_url, + operation_timeout_seconds=5) async def asyncTearDown(self) -> None: await self._client.close() @@ -62,7 +63,7 @@ async def test_create_producer_failure(self): await self._client.create_producer('tenant/ns/awaitio-test-send-failure') self.fail() except PulsarException as e: - self.assertEqual(e.error(), pulsar.Result.AuthorizationError) + self.assertEqual(e.error(), pulsar.Result.Timeout) async def test_send_failure(self): producer = await self._client.create_producer('awaitio-test-send-failure') diff --git a/tests/debug_logger_test.py b/tests/debug_logger_test.py new file mode 100644 index 0000000..ebc8e59 --- /dev/null +++ b/tests/debug_logger_test.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from unittest import TestCase, main +import pulsar + +class DebugLoggerTest(TestCase): + + def test_configure_log_level(self): + client = pulsar.Client( + service_url="pulsar://localhost:6650", + logger=pulsar.ConsoleLogger(pulsar.LoggerLevel.Debug) + ) + + producer = client.create_producer( + topic='test_log_level' + ) + + producer.send(b'hello') + + def test_configure_log_to_file(self): + client = pulsar.Client( + service_url="pulsar://localhost:6650", + logger=pulsar.FileLogger(pulsar.LoggerLevel.Debug, 'test.log') + ) + + producer = client.create_producer( + topic='test_log_to_file' + ) + + producer.send(b'hello') + +if __name__ == "__main__": + main() diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index bb62d99..a062bc1 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -51,9 +51,6 @@ from _pulsar import ProducerConfiguration, ConsumerConfiguration, RegexSubscriptionMode -from schema_test import * -from reader_test import * - from urllib.request import urlopen, Request TM = 10000 # Do not wait forever in tests @@ -1427,29 +1424,6 @@ def test_json_schema_encode(self): second_encode = schema.encode(record) self.assertEqual(first_encode, second_encode) - def test_configure_log_level(self): - client = pulsar.Client( - service_url="pulsar://localhost:6650", - logger=pulsar.ConsoleLogger(pulsar.LoggerLevel.Debug) - ) - - producer = client.create_producer( - topic='test_log_level' - ) - - producer.send(b'hello') - - def test_configure_log_to_file(self): - client = pulsar.Client( - service_url="pulsar://localhost:6650", - logger=pulsar.FileLogger(pulsar.LoggerLevel.Debug, 'test.log') - ) - - producer = client.create_producer( - topic='test_log_to_file' - ) - - producer.send(b'hello') def test_logger_thread_leaks(self): def _do_connect(close): diff --git a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh index ea0b450..0d6fabf 100755 --- a/tests/run-unit-tests.sh +++ b/tests/run-unit-tests.sh @@ -24,6 +24,9 @@ ROOT_DIR=$(git rev-parse --show-toplevel) cd $ROOT_DIR/tests python3 custom_logger_test.py +python3 debug_logger_test.py python3 interrupted_test.py python3 pulsar_test.py +python3 schema_test.py +python3 reader_test.py python3 asyncio_test.py diff --git a/tests/schema_test.py b/tests/schema_test.py index acba03e..9d031d1 100755 --- a/tests/schema_test.py +++ b/tests/schema_test.py @@ -19,7 +19,6 @@ # import math -import logging import requests from typing import List from unittest import TestCase, main @@ -31,10 +30,6 @@ import json from fastavro.schema import load_schema -logging.basicConfig(level=logging.INFO, - format='%(asctime)s %(levelname)-5s %(message)s') - - class ExampleRecord(Record): str_field = String() int_field = Integer()