Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add broadcast to agents via protocol #116

Merged
merged 9 commits into from
Aug 24, 2023
1 change: 1 addition & 0 deletions src/uagents/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class AgentNetwork(Enum):
AGENT_NETWORK = AgentNetwork.FETCHAI_TESTNET

AGENTVERSE_URL = "https://agentverse.ai"
ALMANAC_API_URL = "https://staging.agentverse.ai/v1/almanac/"
jrriehl marked this conversation as resolved.
Show resolved Hide resolved
MAILBOX_POLL_INTERVAL_SECONDS = 1.0

DEFAULT_ENVELOPE_TIMEOUT_SECONDS = 30
Expand Down
49 changes: 45 additions & 4 deletions src/uagents/context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations

import asyncio
import logging
import uuid
Expand All @@ -7,14 +8,15 @@
from typing import Dict, Set, Optional, Callable, Any, Awaitable, Type, TYPE_CHECKING

import aiohttp
import requests
from cosmpy.aerial.client import LedgerClient
from cosmpy.aerial.wallet import LocalWallet

from uagents.config import DEFAULT_ENVELOPE_TIMEOUT_SECONDS
from uagents.config import ALMANAC_API_URL, DEFAULT_ENVELOPE_TIMEOUT_SECONDS
from uagents.crypto import Identity
from uagents.dispatch import JsonStr, dispatcher
from uagents.envelope import Envelope
from uagents.models import Model, ErrorMessage
from uagents.models import ErrorMessage, Model
from uagents.resolver import Resolver
from uagents.storage import KeyValueStore

Expand Down Expand Up @@ -97,22 +99,61 @@ def get_message_protocol(self, message_schema_digest) -> Optional[str]:
return protocol_digest
return None

def get_agents_by_protocol(self, protocol_digest: str) -> list:
Archento marked this conversation as resolved.
Show resolved Hide resolved
if not isinstance(protocol_digest, str) or not protocol_digest.startswith(
"proto:"
):
self.logger.error(f"Invalid protocol digest: {protocol_digest}")
return []
Archento marked this conversation as resolved.
Show resolved Hide resolved
response = requests.post(
url=ALMANAC_API_URL + "search",
json={"text": protocol_digest[6:]},
timeout=DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
)
if response.status_code == 200:
data = response.json()
return [agent["address"] for agent in data if agent["status"] == "local"]
return []

async def send(
self,
destination: str,
message: Model,
timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
):
schema_digest = Model.build_schema_digest(message)
await self.send_raw(
await self._send_raw(
jrriehl marked this conversation as resolved.
Show resolved Hide resolved
destination,
message.json(),
schema_digest,
message_type=type(message),
timeout=timeout,
)

async def send_raw(
async def broadcast(
self,
destination: str,
message: Model,
timeout: Optional[int] = DEFAULT_ENVELOPE_TIMEOUT_SECONDS,
):
Archento marked this conversation as resolved.
Show resolved Hide resolved
agents = self.get_agents_by_protocol(destination)
if not agents:
self.logger.error(f"No active agents found for: {destination}")
return
schema_digest = Model.build_schema_digest(message)
for address in agents:
try:
await self._send_raw(
address,
message.json(),
schema_digest,
message_type=type(message),
timeout=timeout,
)
except Exception as e:
self.logger.error(f"Error sending message to {address}: {e}")

async def _send_raw(
self,
destination: str,
json_message: JsonStr,
Expand Down