diff --git a/python/src/uagents/config.py b/python/src/uagents/config.py index c4867098..c66ae42c 100644 --- a/python/src/uagents/config.py +++ b/python/src/uagents/config.py @@ -28,9 +28,11 @@ class AgentNetwork(Enum): AGENT_NETWORK = AgentNetwork.FETCHAI_TESTNET AGENTVERSE_URL = "https://agentverse.ai" +ALMANAC_API_URL = AGENTVERSE_URL + "/v1/almanac/" MAILBOX_POLL_INTERVAL_SECONDS = 1.0 DEFAULT_ENVELOPE_TIMEOUT_SECONDS = 30 +DEFAULT_SEARCH_LIMIT = 100 def parse_endpoint_config( diff --git a/python/src/uagents/context.py b/python/src/uagents/context.py index b88f779b..685b50a3 100644 --- a/python/src/uagents/context.py +++ b/python/src/uagents/context.py @@ -1,20 +1,36 @@ from __future__ import annotations + import asyncio import logging import uuid from dataclasses import dataclass from time import time -from typing import Dict, Set, Optional, Callable, Any, Awaitable, Type, TYPE_CHECKING +from typing import ( + Dict, + List, + Set, + Optional, + Callable, + Any, + Awaitable, + Type, + TYPE_CHECKING, +) import aiohttp +import requests from cosmpy.aerial.client import LedgerClient from cosmpy.aerial.wallet import LocalWallet -from uagents.config import DEFAULT_ENVELOPE_TIMEOUT_SECONDS +from uagents.config import ( + ALMANAC_API_URL, + DEFAULT_ENVELOPE_TIMEOUT_SECONDS, + DEFAULT_SEARCH_LIMIT, +) from uagents.crypto import Identity from uagents.dispatch import JsonStr, dispatcher from uagents.envelope import Envelope -from uagents.models import Model, ErrorMessage +from uagents.models import ErrorMessage, Model from uagents.resolver import Resolver from uagents.storage import KeyValueStore @@ -97,6 +113,25 @@ def get_message_protocol(self, message_schema_digest) -> Optional[str]: return protocol_digest return None + def get_agents_by_protocol( + self, protocol_digest: str, limit: Optional[int] = None + ) -> List[str]: + if not isinstance(protocol_digest, str) or not protocol_digest.startswith( + "proto:" + ): + self.logger.error(f"Invalid protocol digest: {protocol_digest}") + raise ValueError("Invalid protocol digest") + response = requests.post( + url=ALMANAC_API_URL + "search", + json={"text": protocol_digest[6:]}, + timeout=DEFAULT_ENVELOPE_TIMEOUT_SECONDS, + ) + if response.status_code == 200: + data = response.json() + agents = [agent["address"] for agent in data if agent["status"] == "local"] + return agents[:limit] + return [] + async def send( self, destination: str, @@ -112,6 +147,32 @@ async def send( timeout=timeout, ) + async def experimental_broadcast( + self, + destination_protocol: str, + message: Model, + limit: Optional[int] = DEFAULT_SEARCH_LIMIT, + timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS, + ): + agents = self.get_agents_by_protocol(destination_protocol, limit=limit) + if not agents: + self.logger.error(f"No active agents found for: {destination_protocol}") + return + schema_digest = Model.build_schema_digest(message) + futures = await asyncio.gather( + *[ + self.send_raw( + address, + message.json(), + schema_digest, + message_type=type(message), + timeout=timeout, + ) + for address in agents + ] + ) + self.logger.debug(f"Sent {len(futures)} messages") + async def send_raw( self, destination: str,