diff --git a/opensearchpy/_async/helpers/test.py b/opensearchpy/_async/helpers/test.py
index 9f9cbc4d..ad13a0c8 100644
--- a/opensearchpy/_async/helpers/test.py
+++ b/opensearchpy/_async/helpers/test.py
@@ -6,9 +6,8 @@
 #
 # Modifications Copyright OpenSearch Contributors. See
 # GitHub history for details.
 
-
+import asyncio
 import os
-import time
 from typing import Any
 from unittest import SkipTest
@@ -37,7 +36,7 @@ async def get_test_client(nowait: bool = False, **kwargs: Any) -> Any:
             await client.cluster.health(wait_for_status="yellow")
             return client
         except ConnectionError:
-            time.sleep(0.1)
+            await asyncio.sleep(0.1)
     else:  # timeout
         raise SkipTest("OpenSearch failed to start.")
diff --git a/opensearchpy/_async/http_aiohttp.py b/opensearchpy/_async/http_aiohttp.py
index 1e3da465..ed1d26e7 100644
--- a/opensearchpy/_async/http_aiohttp.py
+++ b/opensearchpy/_async/http_aiohttp.py
@@ -137,6 +137,7 @@ def __init__(
             url_prefix=url_prefix,
             timeout=timeout,
             use_ssl=use_ssl,
+            maxsize=maxsize,
             headers=headers,
             http_compress=http_compress,
             opaque_id=opaque_id,
@@ -219,6 +220,10 @@ def __init__(
         self.loop = loop
         self.session = None
 
+        # Align with the sync interface: accept pool_maxsize as an alias for maxsize.
+        if "pool_maxsize" in kwargs:
+            maxsize = kwargs.pop("pool_maxsize")
+
         # Parameters for creating an aiohttp.ClientSession later.
         self._limit = maxsize
         self._http_auth = http_auth
diff --git a/opensearchpy/_async/transport.py b/opensearchpy/_async/transport.py
index 2d631ee7..70ad43d8 100644
--- a/opensearchpy/_async/transport.py
+++ b/opensearchpy/_async/transport.py
@@ -74,6 +74,7 @@ def __init__(
         serializers: Any = None,
         default_mimetype: str = "application/json",
         max_retries: int = 3,
+        pool_maxsize: Optional[int] = None,
         retry_on_status: Any = (502, 503, 504),
         retry_on_timeout: bool = False,
         send_get_body_as: str = "GET",
@@ -102,6 +103,8 @@ def __init__(
         :arg default_mimetype: when no mimetype is specified by the server
             response assume this mimetype, defaults to `'application/json'`
         :arg max_retries: maximum number of retries before an exception is propagated
+        :arg pool_maxsize: maximum connection pool size used by the pool manager
+            for custom connection pooling on the current session
         :arg retry_on_status: set of HTTP status codes on which we should retry
             on a different node. defaults to ``(502, 503, 504)``
         :arg retry_on_timeout: should timeout trigger a retry on different
@@ -134,6 +137,7 @@ def __init__(
             serializers=serializers,
             default_mimetype=default_mimetype,
             max_retries=max_retries,
+            pool_maxsize=pool_maxsize,
             retry_on_status=retry_on_status,
             retry_on_timeout=retry_on_timeout,
             send_get_body_as=send_get_body_as,
diff --git a/opensearchpy/connection/http_async.py b/opensearchpy/connection/http_async.py
index d0490878..9add4785 100644
--- a/opensearchpy/connection/http_async.py
+++ b/opensearchpy/connection/http_async.py
@@ -142,6 +142,10 @@ def __init__(
         self.loop = loop
         self.session = None
 
+        # Align with the sync interface: accept pool_maxsize as an alias for maxsize.
+        if "pool_maxsize" in kwargs:
+            maxsize = kwargs.pop("pool_maxsize")
+
         # Parameters for creating an aiohttp.ClientSession later.
         self._limit = maxsize
         self._http_auth = http_auth
diff --git a/test_opensearchpy/test_async/test_aiohttp.py b/test_opensearchpy/test_async/test_aiohttp.py
new file mode 100644
index 00000000..0fb4b5b0
--- /dev/null
+++ b/test_opensearchpy/test_async/test_aiohttp.py
@@ -0,0 +1,76 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+# Modifications Copyright OpenSearch Contributors. See
+# GitHub history for details.
+import os
+from typing import Type
+
+import pytest
+from pytest import MarkDecorator
+
+from opensearchpy import (
+    AIOHttpConnection,
+    AsyncConnection,
+    AsyncHttpConnection,
+    AsyncOpenSearch,
+)
+from opensearchpy._async.helpers.test import get_test_client
+
+pytestmark: MarkDecorator = pytest.mark.asyncio
+
+
+class TestAIOHttp:
+
+    def test_default(self) -> None:
+        client = AsyncOpenSearch()
+        assert client.transport.connection_class == AIOHttpConnection
+        assert client.transport.pool_maxsize is None
+
+    def test_connection_class(self) -> None:
+        client = AsyncOpenSearch(connection_class=AsyncHttpConnection)
+        assert client.transport.connection_class == AsyncHttpConnection
+        assert client.transport.pool_maxsize is None
+
+    def test_pool_maxsize(self) -> None:
+        client = AsyncOpenSearch(connection_class=AsyncHttpConnection, pool_maxsize=42)
+        assert client.transport.connection_class == AsyncHttpConnection
+        assert client.transport.pool_maxsize == 42
+
+    @pytest.mark.parametrize(  # type: ignore[misc]
+        "connection_class", [AIOHttpConnection, AsyncHttpConnection]
+    )
+    async def test_default_limit(self, connection_class: Type[AsyncConnection]) -> None:
+        client = await get_test_client(
+            connection_class=connection_class,
+            verify_certs=False,
+            http_auth=("admin", os.getenv("OPENSEARCH_PASSWORD", "admin")),
+        )
+        assert isinstance(
+            client.transport.connection_pool.connections[0], connection_class
+        )
+        assert (
+            client.transport.connection_pool.connections[0].session.connector.limit  # type: ignore[attr-defined]
+            == 10
+        )
+
+    @pytest.mark.parametrize(  # type: ignore[misc]
+        "connection_class", [AIOHttpConnection, AsyncHttpConnection]
+    )
+    async def test_custom_limit(self, connection_class: Type[AsyncConnection]) -> None:
+        client = await get_test_client(
+            connection_class=connection_class,
+            verify_certs=False,
+            pool_maxsize=42,
+            http_auth=("admin", os.getenv("OPENSEARCH_PASSWORD", "admin")),
+        )
+        assert isinstance(
+            client.transport.connection_pool.connections[0], connection_class
+        )
+        assert (
+            client.transport.connection_pool.connections[0].session.connector.limit  # type: ignore[attr-defined]
+            == 42
+        )
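
Usage note: a minimal sketch of how the new pool_maxsize option can be passed to the
async client once this change is applied. The host URL, credentials, and verify_certs
setting below are illustrative assumptions, not part of the diff.

# usage_sketch.py -- hypothetical example, not part of this change.
import asyncio

from opensearchpy import AsyncHttpConnection, AsyncOpenSearch


async def main() -> None:
    # pool_maxsize is forwarded through AsyncTransport to the aiohttp-based
    # connection classes, where it caps the connector's connection limit
    # (mirroring the sync client's pool_maxsize).
    client = AsyncOpenSearch(
        hosts=["https://localhost:9200"],  # assumed local cluster
        http_auth=("admin", "admin"),      # assumed credentials
        verify_certs=False,
        connection_class=AsyncHttpConnection,
        pool_maxsize=42,
    )
    try:
        info = await client.info()
        print(info["version"]["number"])
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())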