Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add LocalRuntime and rename EventStreamRuntime to DockerRuntime #5284

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
149f60a
feat: Add LocalRuntime and rename EventStreamRuntime to LocalDockerRu…
openhands-agent Nov 26, 2024
39911f4
test: Add LocalRuntime to test configuration
openhands-agent Nov 26, 2024
1c9b6ca
fix: Fix mypy errors in LocalRuntime
openhands-agent Nov 26, 2024
52d6c60
Fix pr #5284: feat: Add LocalRuntime and rename EventStreamRuntime to…
openhands-agent Nov 26, 2024
000b749
Merge branch 'main' into add-local-runtime
enyst Nov 29, 2024
e1ac042
Fix pr #5284: feat: Add LocalRuntime and rename EventStreamRuntime to…
openhands-agent Nov 29, 2024
5ce262b
Delete openhands/runtime/impl/eventstream/__init__.py
enyst Nov 29, 2024
24f29c6
Delete openhands/runtime/impl/eventstream/eventstream_runtime.py
enyst Nov 29, 2024
910cc68
Update openhands/runtime/impl/modal/modal_runtime.py
enyst Nov 29, 2024
60839c4
Update openhands/runtime/impl/runloop/runloop_runtime.py
enyst Nov 29, 2024
c6aae8b
Update tests/runtime/conftest.py
enyst Nov 29, 2024
7fc2c87
Update tests/runtime/conftest.py
enyst Nov 29, 2024
3dec4cd
Update tests/runtime/conftest.py
enyst Nov 29, 2024
18987b8
Fix pr #5284: feat: Add LocalRuntime and rename EventStreamRuntime to…
openhands-agent Nov 29, 2024
0b1e1aa
Update openhands/runtime/__init__.py
enyst Nov 30, 2024
a136ab1
Update openhands/runtime/__init__.py
enyst Nov 30, 2024
af56cea
Merge branch 'main' into add-local-runtime
enyst Nov 30, 2024
e3560ae
Update openhands/runtime/README.md
xingyaoww Nov 30, 2024
598112c
Update openhands/runtime/README.md
xingyaoww Nov 30, 2024
f2c2311
Fix pr #5284: feat: Add LocalRuntime and rename EventStreamRuntime to…
openhands-agent Nov 30, 2024
fe1c50c
Merge branch 'main' into add-local-runtime
enyst Dec 1, 2024
724c1b0
Fix pr #5284: feat: Add LocalRuntime and rename EventStreamRuntime to…
openhands-agent Dec 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions openhands/runtime/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Introduction

The OpenHands Runtime folder contains the core components responsible for executing actions and managing the runtime environment for the OpenHands project. This README provides an overview of the main components and their interactions.
You can learn more about how the runtime works in the [EventStream Runtime](https://docs.all-hands.dev/modules/usage/architecture/runtime) documentation.
You can learn more about how the runtime works in the [Runtime Architecture](https://docs.all-hands.dev/modules/usage/architecture/runtime) documentation.

## Main Components

Expand Down Expand Up @@ -76,9 +76,9 @@ Key features of the `ActionExecutor` class:

## Runtime Types

### EventStream Runtime
### Local Docker Runtime
xingyaoww marked this conversation as resolved.
Show resolved Hide resolved

The EventStream Runtime is designed for local execution using Docker containers:
The Local Docker Runtime (formerly EventStream Runtime) is designed for local execution using Docker containers:
xingyaoww marked this conversation as resolved.
Show resolved Hide resolved

- Creates and manages a Docker container for each session
- Executes actions within the container
Expand All @@ -89,9 +89,27 @@ Key features:
- Real-time logging and debugging capabilities
- Direct access to the local file system
- Faster execution due to local resources
- Container isolation for security

This is the default runtime used within OpenHands.

### Local Runtime

The Local Runtime is designed for direct execution on the local machine:

- Runs the action_execution_server directly on the host
- No Docker container overhead
- Direct access to local system resources
- Ideal for development and testing when Docker is not available or desired

Key features:
- Minimal setup required
- Direct access to local resources
- No container overhead
- Fastest execution speed

Important: This runtime provides no isolation as it runs directly on the host machine. All actions are executed with the same permissions as the user running OpenHands. For secure execution with proper isolation, use the Local Docker Runtime instead.

### Remote Runtime

The Remote Runtime is designed for execution in a remote environment:
Expand Down
4 changes: 4 additions & 0 deletions openhands/runtime/impl/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from openhands.runtime.impl.docker import DockerRuntime
from openhands.runtime.impl.local import LocalRuntime

__all__ = ["DockerRuntime", "LocalRuntime"]
3 changes: 3 additions & 0 deletions openhands/runtime/impl/docker/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from openhands.runtime.impl.docker.docker_runtime import DockerRuntime

__all__ = ["DockerRuntime"]
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@
RuntimeNotFoundError,
)
from openhands.runtime.builder import DockerRuntimeBuilder
from openhands.runtime.impl.eventstream.containers import remove_all_containers
from openhands.runtime.impl.docker.containers import remove_all_containers
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.log_buffer import LogBuffer
from openhands.runtime.utils.request import send_request
from openhands.runtime.utils.runtime_build import build_runtime_image
from openhands.utils.async_utils import call_sync_from_async
Expand All @@ -57,72 +58,9 @@ def remove_all_runtime_containers():

atexit.register(remove_all_runtime_containers)


class LogBuffer:
"""Synchronous buffer for Docker container logs.

This class provides a thread-safe way to collect, store, and retrieve logs
from a Docker container. It uses a list to store log lines and provides methods
for appending, retrieving, and clearing logs.
"""

def __init__(self, container: docker.models.containers.Container, logFn: Callable):
self.init_msg = 'Runtime client initialized.'

self.buffer: list[str] = []
self.lock = threading.Lock()
self._stop_event = threading.Event()
self.log_generator = container.logs(stream=True, follow=True)
self.log_stream_thread = threading.Thread(target=self.stream_logs)
self.log_stream_thread.daemon = True
self.log_stream_thread.start()
self.log = logFn

def append(self, log_line: str):
with self.lock:
self.buffer.append(log_line)

def get_and_clear(self) -> list[str]:
with self.lock:
logs = list(self.buffer)
self.buffer.clear()
return logs

def stream_logs(self):
"""Stream logs from the Docker container in a separate thread.

This method runs in its own thread to handle the blocking
operation of reading log lines from the Docker SDK's synchronous generator.
"""
try:
for log_line in self.log_generator:
if self._stop_event.is_set():
break
if log_line:
decoded_line = log_line.decode('utf-8').rstrip()
self.append(decoded_line)
except Exception as e:
self.log('error', f'Error streaming docker logs: {e}')

def __del__(self):
if self.log_stream_thread.is_alive():
self.log(
'warn',
"LogBuffer was not properly closed. Use 'log_buffer.close()' for clean shutdown.",
)
self.close(timeout=5)

def close(self, timeout: float = 5.0):
self._stop_event.set()
self.log_stream_thread.join(timeout)
# Close the log generator to release the file descriptor
if hasattr(self.log_generator, 'close'):
self.log_generator.close()


class EventStreamRuntime(Runtime):
"""This runtime will subscribe the event stream.
When receive an event, it will send the event to runtime-client which run inside the docker environment.
class DockerRuntime(Runtime):
"""This runtime runs the action_execution_server inside a Docker container.
When receiving an event, it will send the event to the server via HTTP.

Args:
config (AppConfig): The application configuration.
Expand All @@ -133,7 +71,7 @@ class EventStreamRuntime(Runtime):
"""

# Need to provide this method to allow inheritors to init the Runtime
# without initting the EventStreamRuntime.
# without initting the LocalDockerRuntime.
def init_base_runtime(
self,
config: AppConfig,
Expand Down
3 changes: 3 additions & 0 deletions openhands/runtime/impl/local/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from openhands.runtime.impl.local.local_runtime import LocalRuntime

__all__ = ["LocalRuntime"]
192 changes: 192 additions & 0 deletions openhands/runtime/impl/local/local_runtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
"""
This runtime runs the action_execution_server directly on the local machine without Docker.
"""

import os
import subprocess
import threading
from functools import lru_cache
from pathlib import Path
from typing import Callable, Optional

import requests
import tenacity
from requests import Response

from openhands.core.config import AppConfig
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.events.action import Action
from openhands.events.observation import ErrorObservation, Observation
from openhands.events.serialization import event_to_dict, observation_from_dict
from openhands.runtime.base import Runtime, RuntimeDisconnectedError
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils import find_available_tcp_port
from openhands.runtime.utils.request import send_request
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.tenacity_stop import stop_if_should_exit


class LocalRuntime(Runtime):
"""This runtime will run the action_execution_server directly on the local machine.
When receiving an event, it will send the event to the server via HTTP.

Args:
config (AppConfig): The application configuration.
event_stream (EventStream): The event stream to subscribe to.
sid (str, optional): The session ID. Defaults to 'default'.
plugins (list[PluginRequirement] | None, optional): List of plugin requirements. Defaults to None.
env_vars (dict[str, str] | None, optional): Environment variables to set. Defaults to None.
"""

def __init__(
self,
config: AppConfig,
event_stream: EventStream,
sid: str = "default",
plugins: list[PluginRequirement] | None = None,
env_vars: dict[str, str] | None = None,
status_callback: Callable | None = None,
attach_to_existing: bool = False,
headless_mode: bool = True,
):
self.config = config
self._host_port = 30000 # initial dummy value
self._runtime_initialized: bool = False
self.api_url = f"{self.config.sandbox.local_runtime_url}:{self._host_port}"
self.session = requests.Session()
self.status_callback = status_callback
self.server_process: Optional[subprocess.Popen[str]] = None
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time

super().__init__(
config,
event_stream,
sid,
plugins,
env_vars,
status_callback,
attach_to_existing,
headless_mode,
)

async def connect(self):
"""Start the action_execution_server on the local machine."""
self.send_status_message("STATUS$STARTING_RUNTIME")

self._host_port = self._find_available_port()
self.api_url = f"{self.config.sandbox.local_runtime_url}:{self._host_port}"

plugin_arg = ""
if self.plugins is not None and len(self.plugins) > 0:
plugin_arg = f"--plugins {' '.join([plugin.name for plugin in self.plugins])} "

if self.config.sandbox.browsergym_eval_env is not None:
browsergym_arg = f"--browsergym-eval-env {self.config.sandbox.browsergym_eval_env}"
else:
browsergym_arg = ""

# Start the server process
cmd = (
f"python -u -m openhands.runtime.action_execution_server {self._host_port} "
f"--working-dir {self.config.workspace_mount_path_in_sandbox} "
f"{plugin_arg}"
f"--username {'openhands' if self.config.run_as_openhands else 'root'} "
f"--user-id {self.config.sandbox.user_id} "
f"{browsergym_arg}"
)

self.log("debug", f"Starting server with command: {cmd}")
self.server_process = subprocess.Popen(
cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1,
)

# Start a thread to read and log server output
def log_output():
if self.server_process and self.server_process.stdout:
while True:
line = self.server_process.stdout.readline()
if not line:
break
self.log("debug", f"Server: {line.strip()}")

log_thread = threading.Thread(target=log_output, daemon=True)
log_thread.start()

self.log("info", f"Waiting for server to become ready at {self.api_url}...")
self.send_status_message("STATUS$WAITING_FOR_CLIENT")

await call_sync_from_async(self._wait_until_alive)

if not self.attach_to_existing:
await call_sync_from_async(self.setup_initial_env)

self.log(
"debug",
f"Server initialized with plugins: {[plugin.name for plugin in self.plugins]}",
)
if not self.attach_to_existing:
self.send_status_message(" ")
self._runtime_initialized = True

def _find_available_port(self) -> int:
"""Find an available port to use for the server."""
return find_available_tcp_port()

@tenacity.retry(
wait=tenacity.wait_exponential(multiplier=0.1, min=0.1, max=1),
stop=stop_if_should_exit,
before_sleep=lambda retry_state: logger.debug(
f"Waiting for server to be ready... (attempt {retry_state.attempt_number})"
),
)
def _wait_until_alive(self):
"""Wait until the server is ready to accept requests."""
if self.server_process and self.server_process.poll() is not None:
raise RuntimeError("Server process died")

try:
response = self.session.get(f"{self.api_url}/health")
response.raise_for_status()
return True
except Exception as e:
self.log("debug", f"Server not ready yet: {e}")
raise

async def execute_action(self, action: Action) -> Observation:
"""Execute an action by sending it to the server."""
if not self._runtime_initialized:
return ErrorObservation("Runtime not initialized")

if self.server_process is None or self.server_process.poll() is not None:
return ErrorObservation("Server process died")

with self.action_semaphore:
try:
response = await call_sync_from_async(
lambda: self.session.post(
f"{self.api_url}/action",
json={"action": event_to_dict(action)},
)
)
return observation_from_dict(response.json())
except requests.exceptions.ConnectionError:
raise RuntimeDisconnectedError("Server connection lost")
except requests.exceptions.RequestException as e:
return ErrorObservation(f"Failed to execute action: {e}")

def close(self):
"""Stop the server process."""
if self.server_process:
self.server_process.terminate()
try:
self.server_process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.server_process.kill()
self.server_process = None

super().close()
6 changes: 2 additions & 4 deletions openhands/runtime/impl/modal/modal_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@

from openhands.core.config import AppConfig
from openhands.events import EventStream
from openhands.runtime.impl.eventstream.eventstream_runtime import (
EventStreamRuntime,
LogBuffer,
)
from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime
enyst marked this conversation as resolved.
Show resolved Hide resolved
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.command import get_remote_startup_command
from openhands.runtime.utils.log_buffer import LogBuffer
from openhands.runtime.utils.runtime_build import (
BuildFromImageType,
prep_build_folder,
Expand Down
6 changes: 2 additions & 4 deletions openhands/runtime/impl/runloop/runloop_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
from openhands.core.config import AppConfig
from openhands.core.logger import openhands_logger as logger
from openhands.events import EventStream
from openhands.runtime.impl.eventstream.eventstream_runtime import (
EventStreamRuntime,
LogBuffer,
)
from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime
enyst marked this conversation as resolved.
Show resolved Hide resolved
from openhands.runtime.plugins import PluginRequirement
from openhands.runtime.utils.command import get_remote_startup_command
from openhands.runtime.utils.log_buffer import LogBuffer
from openhands.runtime.utils.request import send_request
from openhands.utils.tenacity_stop import stop_if_should_exit

Expand Down
Loading
Loading