Skip to content

Commit

Permalink
Merge pull request #21 from langchain-ai/async_streaming
Browse files Browse the repository at this point in the history
added async streaming for gemini-pro
  • Loading branch information
lkuligin authored Feb 21, 2024
2 parents aa6f9a9 + f34b794 commit 85886f1
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 4 deletions.
44 changes: 43 additions & 1 deletion libs/vertexai/langchain_google_vertexai/chat_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, List, Optional, Union, cast
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Union, cast

import proto # type: ignore[import-untyped]
from google.cloud.aiplatform_v1beta1.types.content import Part as GapicPart
Expand Down Expand Up @@ -539,6 +539,48 @@ def _stream(
),
)

async def _astream(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> AsyncIterator[ChatGenerationChunk]:
if not self._is_gemini_model:
raise NotImplementedError()
params = self._prepare_params(stop=stop, stream=True, **kwargs)
history_gemini = _parse_chat_history_gemini(
messages,
project=self.project,
convert_system_message_to_human=self.convert_system_message_to_human,
)
message = history_gemini.pop()
chat = self.client.start_chat(history=history_gemini)
raw_tools = params.pop("functions") if "functions" in params else None
tools = _format_tools_to_vertex_tool(raw_tools) if raw_tools else None
safety_settings = params.pop("safety_settings", None)
async for chunk in await chat.send_message_async(
message,
stream=True,
generation_config=params,
safety_settings=safety_settings,
tools=tools,
):
message = _parse_response_candidate(chunk.candidates[0])
if run_manager:
await run_manager.on_llm_new_token(message.content)
yield ChatGenerationChunk(
message=AIMessageChunk(
content=message.content,
additional_kwargs=message.additional_kwargs,
),
generation_info=get_generation_info(
chunk.candidates[0],
self._is_gemini_model,
usage_metadata=chunk.to_dict().get("usage_metadata"),
),
)

def _start_chat(
self, history: _ChatHistory, **kwargs: Any
) -> Union[ChatSession, CodeChatSession]:
Expand Down
40 changes: 37 additions & 3 deletions libs/vertexai/langchain_google_vertexai/llms.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from concurrent.futures import Executor
from typing import Any, ClassVar, Dict, Iterator, List, Optional, Union
from typing import Any, AsyncIterator, ClassVar, Dict, Iterator, List, Optional, Union

import vertexai # type: ignore[import-untyped]
from google.api_core.client_options import ClientOptions
Expand Down Expand Up @@ -95,6 +95,7 @@ async def _acompletion_with_retry(
llm: VertexAI,
prompt: str,
is_gemini: bool = False,
stream: bool = False,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Any:
Expand All @@ -105,17 +106,22 @@ async def _acompletion_with_retry(

@retry_decorator
async def _acompletion_with_retry_inner(
prompt: str, is_gemini: bool = False, **kwargs: Any
prompt: str, is_gemini: bool = False, stream: bool = False, **kwargs: Any
) -> Any:
if is_gemini:
return await llm.client.generate_content_async(
prompt,
generation_config=kwargs,
stream=stream,
safety_settings=kwargs.pop("safety_settings", None),
)
if stream:
raise ValueError("Async streaming is supported only for Gemini family!")
return await llm.client.predict_async(prompt, **kwargs)

return await _acompletion_with_retry_inner(prompt, is_gemini, **kwargs)
return await _acompletion_with_retry_inner(
prompt, is_gemini, stream=stream, **kwargs
)


class _VertexAIBase(BaseModel):
Expand Down Expand Up @@ -453,6 +459,34 @@ def _stream(
verbose=self.verbose,
)

async def _astream(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> AsyncIterator[GenerationChunk]:
params = self._prepare_params(stop=stop, stream=True, **kwargs)
if not self._is_gemini_model:
raise ValueError("Async streaming is supported only for Gemini family!")
async for chunk in await _acompletion_with_retry(
self,
prompt,
stream=True,
is_gemini=self._is_gemini_model,
run_manager=run_manager,
**params,
):
usage_metadata = chunk.to_dict().get("usage_metadata")
chunk = self._candidate_to_generation(
chunk.candidates[0], stream=True, usage_metadata=usage_metadata
)
yield chunk
if run_manager:
await run_manager.on_llm_new_token(
chunk.text, chunk=chunk, verbose=self.verbose
)


class VertexAIModelGarden(_VertexAIBase, BaseLLM):
"""Large language models served from Vertex AI Model Garden."""
Expand Down
8 changes: 8 additions & 0 deletions libs/vertexai/tests/integration_tests/test_chat_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ def test_vertexai_stream(model_name: str) -> None:
assert isinstance(chunk, AIMessageChunk)


async def test_vertexai_astream() -> None:
model = ChatVertexAI(temperature=0, model_name="gemini-pro")
message = HumanMessage(content="Hello")

async for chunk in model.astream([message]):
assert isinstance(chunk, AIMessageChunk)


def test_vertexai_single_call_with_context() -> None:
model = ChatVertexAI()
raw_context = (
Expand Down
2 changes: 2 additions & 0 deletions libs/vertexai/tests/integration_tests/test_image_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import pytest
from google.cloud import storage # type: ignore[attr-defined]
from google.cloud.exceptions import NotFound

from langchain_google_vertexai._image_utils import ImageBytesLoader


@pytest.mark.skip("CI testing not set up")
def test_image_utils():
base64_image = (
"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAABgAAAAYCAYAAADgdz34AAAA"
Expand Down
6 changes: 6 additions & 0 deletions libs/vertexai/tests/integration_tests/test_llms.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ async def test_vertex_consistency() -> None:
assert output.generations[0][0].text == async_output.generations[0][0].text


async def test_astream() -> None:
llm = VertexAI(temperature=0, model_name="gemini-pro")
async for token in llm.astream("I'm Pickle Rick"):
assert isinstance(token, str)


@pytest.mark.skip("CI testing not set up")
@pytest.mark.parametrize(
"endpoint_os_variable_name,result_arg",
Expand Down

0 comments on commit 85886f1

Please sign in to comment.