Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add broadcast to agents via protocol #116

Merged
merged 9 commits into from
Aug 24, 2023
2 changes: 2 additions & 0 deletions python/src/uagents/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ class AgentNetwork(Enum):
AGENT_NETWORK = AgentNetwork.FETCHAI_TESTNET

AGENTVERSE_URL = "https://agentverse.ai"
ALMANAC_API_URL = AGENTVERSE_URL + "/v1/almanac/"
MAILBOX_POLL_INTERVAL_SECONDS = 1.0

DEFAULT_ENVELOPE_TIMEOUT_SECONDS = 30
DEFAULT_SEARCH_LIMIT = 100


def parse_endpoint_config(
Expand Down
67 changes: 64 additions & 3 deletions python/src/uagents/context.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,36 @@
from __future__ import annotations

import asyncio
import logging
import uuid
from dataclasses import dataclass
from time import time
from typing import Dict, Set, Optional, Callable, Any, Awaitable, Type, TYPE_CHECKING
from typing import (
Dict,
List,
Set,
Optional,
Callable,
Any,
Awaitable,
Type,
TYPE_CHECKING,
)

import aiohttp
import requests
from cosmpy.aerial.client import LedgerClient
from cosmpy.aerial.wallet import LocalWallet

from uagents.config import DEFAULT_ENVELOPE_TIMEOUT_SECONDS
from uagents.config import (
ALMANAC_API_URL,
DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
DEFAULT_SEARCH_LIMIT,
)
from uagents.crypto import Identity
from uagents.dispatch import JsonStr, dispatcher
from uagents.envelope import Envelope
from uagents.models import Model, ErrorMessage
from uagents.models import ErrorMessage, Model
from uagents.resolver import Resolver
from uagents.storage import KeyValueStore

Expand Down Expand Up @@ -97,6 +113,25 @@ def get_message_protocol(self, message_schema_digest) -> Optional[str]:
return protocol_digest
return None

def get_agents_by_protocol(
self, protocol_digest: str, limit: Optional[int] = None
) -> List[str]:
if not isinstance(protocol_digest, str) or not protocol_digest.startswith(
"proto:"
):
self.logger.error(f"Invalid protocol digest: {protocol_digest}")
raise ValueError("Invalid protocol digest")
response = requests.post(
url=ALMANAC_API_URL + "search",
json={"text": protocol_digest[6:]},
timeout=DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
)
if response.status_code == 200:
data = response.json()
agents = [agent["address"] for agent in data if agent["status"] == "local"]
return agents[:limit]
return []

async def send(
self,
destination: str,
Expand All @@ -112,6 +147,32 @@ async def send(
timeout=timeout,
)

async def experimental_broadcast(
self,
destination_protocol: str,
message: Model,
limit: Optional[int] = DEFAULT_SEARCH_LIMIT,
timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
):
agents = self.get_agents_by_protocol(destination_protocol, limit=limit)
if not agents:
self.logger.error(f"No active agents found for: {destination_protocol}")
return
schema_digest = Model.build_schema_digest(message)
futures = await asyncio.gather(
*[
self.send_raw(
address,
message.json(),
schema_digest,
message_type=type(message),
timeout=timeout,
)
for address in agents
]
)
self.logger.debug(f"Sent {len(futures)} messages")

async def send_raw(
self,
destination: str,
Expand Down