Added async streaming for gemini-pro #21

Merged
merged 2 commits into from Feb 21, 2024
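The change builds on the Vertex AI SDK's async streaming entry points (send_message_async and generate_content_async with stream=True). A minimal standalone sketch of that underlying primitive, not part of this PR, assuming a recent google-cloud-aiplatform install; "my-project" is a placeholder project ID and the exact response attributes may vary by SDK version:

import asyncio

import vertexai
from vertexai.generative_models import GenerativeModel


async def main() -> None:
    # Placeholder project/location; configure for your own environment.
    vertexai.init(project="my-project", location="us-central1")
    model = GenerativeModel("gemini-pro")
    # With stream=True the awaited call returns an async iterator of chunks.
    responses = await model.generate_content_async("Say hello.", stream=True)
    async for chunk in responses:
        print(chunk.text, end="", flush=True)


asyncio.run(main())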
44 changes: 43 additions & 1 deletion libs/vertexai/langchain_google_vertexai/chat_models.py
@@ -4,7 +4,7 @@
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, List, Optional, Union, cast
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Union, cast

import proto # type: ignore[import-untyped]
from google.cloud.aiplatform_v1beta1.types.content import Part as GapicPart
@@ -539,6 +539,48 @@ def _stream(
),
)

    async def _astream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        if not self._is_gemini_model:
            raise NotImplementedError()
        params = self._prepare_params(stop=stop, stream=True, **kwargs)
        history_gemini = _parse_chat_history_gemini(
            messages,
            project=self.project,
            convert_system_message_to_human=self.convert_system_message_to_human,
        )
        message = history_gemini.pop()
        chat = self.client.start_chat(history=history_gemini)
        raw_tools = params.pop("functions") if "functions" in params else None
        tools = _format_tools_to_vertex_tool(raw_tools) if raw_tools else None
        safety_settings = params.pop("safety_settings", None)
        async for chunk in await chat.send_message_async(
            message,
            stream=True,
            generation_config=params,
            safety_settings=safety_settings,
            tools=tools,
        ):
            message = _parse_response_candidate(chunk.candidates[0])
            if run_manager:
                await run_manager.on_llm_new_token(message.content)
            yield ChatGenerationChunk(
                message=AIMessageChunk(
                    content=message.content,
                    additional_kwargs=message.additional_kwargs,
                ),
                generation_info=get_generation_info(
                    chunk.candidates[0],
                    self._is_gemini_model,
                    usage_metadata=chunk.to_dict().get("usage_metadata"),
                ),
            )

    def _start_chat(
        self, history: _ChatHistory, **kwargs: Any
    ) -> Union[ChatSession, CodeChatSession]:
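For reference, a minimal sketch (not part of this PR) of how the new chat-model code path can be consumed from user code, assuming langchain_google_vertexai is installed and Google Cloud credentials are configured:

import asyncio

from langchain_core.messages import HumanMessage
from langchain_google_vertexai import ChatVertexAI


async def main() -> None:
    # _astream raises NotImplementedError for non-Gemini models,
    # so a Gemini model name is required here.
    chat = ChatVertexAI(model_name="gemini-pro", temperature=0)
    async for chunk in chat.astream([HumanMessage(content="Tell me a short joke")]):
        print(chunk.content, end="", flush=True)


asyncio.run(main())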
40 changes: 37 additions & 3 deletions libs/vertexai/langchain_google_vertexai/llms.py
@@ -1,7 +1,7 @@
from __future__ import annotations

from concurrent.futures import Executor
from typing import Any, ClassVar, Dict, Iterator, List, Optional, Union
from typing import Any, AsyncIterator, ClassVar, Dict, Iterator, List, Optional, Union

import vertexai # type: ignore[import-untyped]
from google.api_core.client_options import ClientOptions
@@ -95,6 +95,7 @@ async def _acompletion_with_retry(
    llm: VertexAI,
    prompt: str,
    is_gemini: bool = False,
    stream: bool = False,
    run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
    **kwargs: Any,
) -> Any:
@@ -105,17 +106,22 @@ async def _acompletion_with_retry(

    @retry_decorator
    async def _acompletion_with_retry_inner(
        prompt: str, is_gemini: bool = False, **kwargs: Any
        prompt: str, is_gemini: bool = False, stream: bool = False, **kwargs: Any
    ) -> Any:
        if is_gemini:
            return await llm.client.generate_content_async(
                prompt,
                generation_config=kwargs,
                stream=stream,
                safety_settings=kwargs.pop("safety_settings", None),
            )
        if stream:
            raise ValueError("Async streaming is supported only for Gemini family!")
        return await llm.client.predict_async(prompt, **kwargs)

    return await _acompletion_with_retry_inner(prompt, is_gemini, **kwargs)
    return await _acompletion_with_retry_inner(
        prompt, is_gemini, stream=stream, **kwargs
    )


class _VertexAIBase(BaseModel):
@@ -453,6 +459,34 @@ def _stream(
verbose=self.verbose,
)

    async def _astream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[GenerationChunk]:
        params = self._prepare_params(stop=stop, stream=True, **kwargs)
        if not self._is_gemini_model:
            raise ValueError("Async streaming is supported only for Gemini family!")
        async for chunk in await _acompletion_with_retry(
            self,
            prompt,
            stream=True,
            is_gemini=self._is_gemini_model,
            run_manager=run_manager,
            **params,
        ):
            usage_metadata = chunk.to_dict().get("usage_metadata")
            chunk = self._candidate_to_generation(
                chunk.candidates[0], stream=True, usage_metadata=usage_metadata
            )
            yield chunk
            if run_manager:
                await run_manager.on_llm_new_token(
                    chunk.text, chunk=chunk, verbose=self.verbose
                )


class VertexAIModelGarden(_VertexAIBase, BaseLLM):
"""Large language models served from Vertex AI Model Garden."""
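Likewise, a small illustrative sketch of the new LLM-level entry point (as the diff shows, non-Gemini models raise ValueError for async streaming):

import asyncio

from langchain_google_vertexai import VertexAI


async def main() -> None:
    llm = VertexAI(model_name="gemini-pro", temperature=0)
    tokens = []
    async for token in llm.astream("Write a haiku about streaming APIs"):
        tokens.append(token)  # each item is a plain string chunk
    print("".join(tokens))


asyncio.run(main())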
8 changes: 8 additions & 0 deletions libs/vertexai/tests/integration_tests/test_chat_models.py
@@ -86,6 +86,14 @@ def test_vertexai_stream(model_name: str) -> None:
assert isinstance(chunk, AIMessageChunk)


async def test_vertexai_astream() -> None:
    model = ChatVertexAI(temperature=0, model_name="gemini-pro")
    message = HumanMessage(content="Hello")

    async for chunk in model.astream([message]):
        assert isinstance(chunk, AIMessageChunk)


def test_vertexai_single_call_with_context() -> None:
model = ChatVertexAI()
raw_context = (
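If stronger checks were wanted later, a possible variant (illustrative only, not part of this PR) could aggregate the chunks, relying on AIMessageChunk supporting `+`:

async def test_vertexai_astream_aggregated() -> None:
    model = ChatVertexAI(temperature=0, model_name="gemini-pro")
    message = HumanMessage(content="Hello")

    full = None
    async for chunk in model.astream([message]):
        assert isinstance(chunk, AIMessageChunk)
        full = chunk if full is None else full + chunk  # concatenate streamed chunks
    assert full is not None and full.content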
2 changes: 2 additions & 0 deletions libs/vertexai/tests/integration_tests/test_image_utils.py
@@ -1,9 +1,11 @@
import pytest
from google.cloud import storage # type: ignore[attr-defined]
from google.cloud.exceptions import NotFound

from langchain_google_vertexai._image_utils import ImageBytesLoader


@pytest.mark.skip("CI testing not set up")
def test_image_utils():
base64_image = (
"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAABgAAAAYCAYAAADgdz34AAAA"
6 changes: 6 additions & 0 deletions libs/vertexai/tests/integration_tests/test_llms.py
@@ -113,6 +113,12 @@ async def test_vertex_consistency() -> None:
assert output.generations[0][0].text == async_output.generations[0][0].text


async def test_astream() -> None:
    llm = VertexAI(temperature=0, model_name="gemini-pro")
    async for token in llm.astream("I'm Pickle Rick"):
        assert isinstance(token, str)


@pytest.mark.skip("CI testing not set up")
@pytest.mark.parametrize(
"endpoint_os_variable_name,result_arg",
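A similar illustrative variant for the LLM test could join the streamed tokens and check that something was produced:

async def test_astream_joined() -> None:
    llm = VertexAI(temperature=0, model_name="gemini-pro")
    tokens = [token async for token in llm.astream("I'm Pickle Rick")]
    assert all(isinstance(token, str) for token in tokens)
    assert "".join(tokens)  # at least some text was streamed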