Skip to content

Commit

Permalink
feat: add broadcast to agents via protocol (#116)
Browse files Browse the repository at this point in the history
Co-authored-by: James Riehl <[email protected]>
  • Loading branch information
Archento and jrriehl authored Aug 24, 2023
1 parent ba7af5e commit b68ece2
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 3 deletions.
2 changes: 2 additions & 0 deletions python/src/uagents/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ class AgentNetwork(Enum):
AGENT_NETWORK = AgentNetwork.FETCHAI_TESTNET

AGENTVERSE_URL = "https://agentverse.ai"
ALMANAC_API_URL = AGENTVERSE_URL + "/v1/almanac/"
MAILBOX_POLL_INTERVAL_SECONDS = 1.0

DEFAULT_ENVELOPE_TIMEOUT_SECONDS = 30
DEFAULT_SEARCH_LIMIT = 100


def parse_endpoint_config(
Expand Down
67 changes: 64 additions & 3 deletions python/src/uagents/context.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,36 @@
from __future__ import annotations

import asyncio
import logging
import uuid
from dataclasses import dataclass
from time import time
from typing import Dict, Set, Optional, Callable, Any, Awaitable, Type, TYPE_CHECKING
from typing import (
Dict,
List,
Set,
Optional,
Callable,
Any,
Awaitable,
Type,
TYPE_CHECKING,
)

import aiohttp
import requests
from cosmpy.aerial.client import LedgerClient
from cosmpy.aerial.wallet import LocalWallet

from uagents.config import DEFAULT_ENVELOPE_TIMEOUT_SECONDS
from uagents.config import (
ALMANAC_API_URL,
DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
DEFAULT_SEARCH_LIMIT,
)
from uagents.crypto import Identity
from uagents.dispatch import JsonStr, dispatcher
from uagents.envelope import Envelope
from uagents.models import Model, ErrorMessage
from uagents.models import ErrorMessage, Model
from uagents.resolver import Resolver
from uagents.storage import KeyValueStore

Expand Down Expand Up @@ -97,6 +113,25 @@ def get_message_protocol(self, message_schema_digest) -> Optional[str]:
return protocol_digest
return None

def get_agents_by_protocol(
self, protocol_digest: str, limit: Optional[int] = None
) -> List[str]:
if not isinstance(protocol_digest, str) or not protocol_digest.startswith(
"proto:"
):
self.logger.error(f"Invalid protocol digest: {protocol_digest}")
raise ValueError("Invalid protocol digest")
response = requests.post(
url=ALMANAC_API_URL + "search",
json={"text": protocol_digest[6:]},
timeout=DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
)
if response.status_code == 200:
data = response.json()
agents = [agent["address"] for agent in data if agent["status"] == "local"]
return agents[:limit]
return []

async def send(
self,
destination: str,
Expand All @@ -112,6 +147,32 @@ async def send(
timeout=timeout,
)

async def experimental_broadcast(
self,
destination_protocol: str,
message: Model,
limit: Optional[int] = DEFAULT_SEARCH_LIMIT,
timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
):
agents = self.get_agents_by_protocol(destination_protocol, limit=limit)
if not agents:
self.logger.error(f"No active agents found for: {destination_protocol}")
return
schema_digest = Model.build_schema_digest(message)
futures = await asyncio.gather(
*[
self.send_raw(
address,
message.json(),
schema_digest,
message_type=type(message),
timeout=timeout,
)
for address in agents
]
)
self.logger.debug(f"Sent {len(futures)} messages")

async def send_raw(
self,
destination: str,
Expand Down

0 comments on commit b68ece2

Please sign in to comment.