Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add max timeout limit #134

Merged
merged 2 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/packages.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
"dev": {
"connection/valory/websocket_client/0.1.0": "bafybeia3lo6hyhom2ht56gzgkv4gdsul5s4qoelljyks2wrcfvx5dtvsiq",
"skill/valory/contract_subscription/0.1.0": "bafybeigl52zsiduaabccpeqtii2f7uorzdnb6xgpoimo3mudvbudifmwtu",
"agent/valory/mech/0.1.0": "bafybeicijidwsryabdhaof3t3xir5scducd6c2i53pwx7mk4b22e7kezca",
"agent/valory/mech/0.1.0": "bafybeigqhambbevdcqvaadx6mqmjnfnle434nucd6322ystalpgdgdrzre",
"skill/valory/mech_abci/0.1.0": "bafybeih3gz2kphxkshag2x7zk53m3zqq6djcalt6surtquyhamnok5mnwy",
"contract/valory/agent_mech/0.1.0": "bafybeidaavljfscd5s6lp3d3e7ulsshp6gayo6b56sl5h4leclujc7spui",
"service/valory/mech/0.1.0": "bafybeiaqbhr44lcvmmqdhgwib4sizkjqnyhmpbpjxwam7tg4rwobjtorgu",
"service/valory/mech/0.1.0": "bafybeibaofedo7nudeihnp7ppzcmdzuohavjvoe7fnnlvpinccrbphviem",
"protocol/valory/acn_data_share/0.1.0": "bafybeih5ydonnvrwvy2ygfqgfabkr47s4yw3uqxztmwyfprulwfsoe7ipq",
"protocol/valory/default/1.0.0": "bafybeifqcqy5hfbnd7fjv4mqdjrtujh2vx3p2xhe33y67zoxa6ph7wdpaq",
"skill/valory/task_submission_abci/0.1.0": "bafybeifo5acrwmojpepocaphucesn4w5poxdzcjfovmcyij2ptcaozbgbe",
"skill/valory/task_execution/0.1.0": "bafybeibnudzqxomqsadcq3wqv3b6nab3aikbm5txq64dp4ftgpn3u2ogpi",
"skill/valory/task_execution/0.1.0": "bafybeibn5v2sdub3h6ube3wykwda7hyb2g3b52drc7anea2gxsc3awu5ke",
"skill/valory/reset_pause_abci/0.1.0": "bafybeify27qvpxb2pkr7fmgf3vvxnolorunl7losg55mmxgrobxp5ny5te",
"skill/valory/registration_abci/0.1.0": "bafybeiafeoigktmz6u4g2btrlp5ssutclaek6no4sz2zzc67fbfotsnn3a",
"skill/valory/abstract_round_abci/0.1.0": "bafybeih7nivpffgvqvribkxxejuhnkrhypadoqrfd2ca5xhc2ujhyuualu",
Expand Down
3 changes: 2 additions & 1 deletion packages/valory/agents/mech/aea-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ skills:
- valory/abstract_round_abci:0.1.0:bafybeih7nivpffgvqvribkxxejuhnkrhypadoqrfd2ca5xhc2ujhyuualu
- valory/contract_subscription:0.1.0:bafybeigl52zsiduaabccpeqtii2f7uorzdnb6xgpoimo3mudvbudifmwtu
- valory/mech_abci:0.1.0:bafybeih3gz2kphxkshag2x7zk53m3zqq6djcalt6surtquyhamnok5mnwy
- valory/task_execution:0.1.0:bafybeibnudzqxomqsadcq3wqv3b6nab3aikbm5txq64dp4ftgpn3u2ogpi
- valory/task_execution:0.1.0:bafybeibn5v2sdub3h6ube3wykwda7hyb2g3b52drc7anea2gxsc3awu5ke
- valory/registration_abci:0.1.0:bafybeiafeoigktmz6u4g2btrlp5ssutclaek6no4sz2zzc67fbfotsnn3a
- valory/reset_pause_abci:0.1.0:bafybeify27qvpxb2pkr7fmgf3vvxnolorunl7losg55mmxgrobxp5ny5te
- valory/task_submission_abci:0.1.0:bafybeifo5acrwmojpepocaphucesn4w5poxdzcjfovmcyij2ptcaozbgbe
Expand Down Expand Up @@ -191,6 +191,7 @@ models:
agent_index: ${int:0}
num_agents: ${int:4}
from_block_range: ${int:50000}
timeout_limit: ${int:3}
---
public_id: valory/ledger:0.19.0
type: connection
Expand Down
22 changes: 13 additions & 9 deletions packages/valory/services/mech/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license: Apache-2.0
fingerprint:
README.md: bafybeif7ia4jdlazy6745ke2k2x5yoqlwsgwr6sbztbgqtwvs3ndm2p7ba
fingerprint_ignore_patterns: []
agent: valory/mech:0.1.0:bafybeicijidwsryabdhaof3t3xir5scducd6c2i53pwx7mk4b22e7kezca
agent: valory/mech:0.1.0:bafybeigqhambbevdcqvaadx6mqmjnfnle434nucd6322ystalpgdgdrzre
number_of_agents: 4
deployment:
agent:
Expand Down Expand Up @@ -43,7 +43,7 @@ type: skill
tendermint_p2p_url: ${TM_P2P_ENDPOINT_NODE_0:str:node0:26656}
termination_sleep: ${TERMINATION_SLEEP:int:900}
use_termination: ${USE_TERMINATION:bool:false}
agent_mech_contract_address: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
agent_mech_contract_addresses: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
reset_period_count: ${RESET_PERIOD_COUNT:int:1000}
use_slashing: ${USE_SLASHING:bool:false}
slash_cooldown_hours: ${SLASH_COOLDOWN_HOURS:int:3}
Expand All @@ -70,7 +70,7 @@ type: skill
tendermint_p2p_url: ${TM_P2P_ENDPOINT_NODE_0:str:node0:26656}
termination_sleep: ${TERMINATION_SLEEP:int:900}
use_termination: ${USE_TERMINATION:bool:false}
agent_mech_contract_address: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
agent_mech_contract_addresses: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
reset_period_count: ${RESET_PERIOD_COUNT:int:1000}
use_slashing: ${USE_SLASHING:bool:false}
slash_cooldown_hours: ${SLASH_COOLDOWN_HOURS:int:3}
Expand All @@ -97,7 +97,7 @@ type: skill
tendermint_p2p_url: ${TM_P2P_ENDPOINT_NODE_0:str:node0:26656}
termination_sleep: ${TERMINATION_SLEEP:int:900}
use_termination: ${USE_TERMINATION:bool:false}
agent_mech_contract_address: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
agent_mech_contract_addresses: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
reset_period_count: ${RESET_PERIOD_COUNT:int:1000}
use_slashing: ${USE_SLASHING:bool:false}
slash_cooldown_hours: ${SLASH_COOLDOWN_HOURS:int:3}
Expand All @@ -124,7 +124,7 @@ type: skill
tendermint_p2p_url: ${TM_P2P_ENDPOINT_NODE_0:str:node0:26656}
termination_sleep: ${TERMINATION_SLEEP:int:900}
use_termination: ${USE_TERMINATION:bool:false}
agent_mech_contract_address: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
agent_mech_contract_addresses: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
reset_period_count: ${RESET_PERIOD_COUNT:int:1000}
use_slashing: ${USE_SLASHING:bool:false}
slash_cooldown_hours: ${SLASH_COOLDOWN_HOURS:int:3}
Expand All @@ -141,46 +141,50 @@ type: skill
models:
params:
args:
agent_mech_contract_address: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
agent_mech_contract_addresses: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
task_deadline: ${TASK_DEADLINE:float:240.0}
file_hash_to_tools_json: ${FILE_HASH_TO_TOOLS:list:[]}
api_keys_json: ${API_KEYS:list:[]}
polling_interval: ${POLLING_INTERVAL:float:30.0}
agent_index: ${AGENT_INDEX_0:int:0}
num_agents: ${NUM_AGENTS:int:4}
timeout_limit: ${TIMEOUT_LIMIT:int:3}
1:
models:
params:
args:
agent_mech_contract_address: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
agent_mech_contract_addresses: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
task_deadline: ${TASK_DEADLINE:float:240.0}
file_hash_to_tools_json: ${FILE_HASH_TO_TOOLS:list:[]}
api_keys_json: ${API_KEYS:list:[]}
polling_interval: ${POLLING_INTERVAL:float:30.0}
agent_index: ${AGENT_INDEX_1:int:1}
num_agents: ${NUM_AGENTS:int:4}
timeout_limit: ${TIMEOUT_LIMIT:int:3}
2:
models:
params:
args:
agent_mech_contract_address: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
agent_mech_contract_addresses: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
task_deadline: ${TASK_DEADLINE:float:240.0}
file_hash_to_tools_json: ${FILE_HASH_TO_TOOLS:list:[]}
api_keys_json: ${API_KEYS:list:[]}
polling_interval: ${POLLING_INTERVAL:float:30.0}
agent_index: ${AGENT_INDEX_2:int:2}
num_agents: ${NUM_AGENTS:int:4}
timeout_limit: ${TIMEOUT_LIMIT:int:3}
3:
models:
params:
args:
agent_mech_contract_address: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
agent_mech_contract_addresses: ${AGENT_MECH_CONTRACT_ADDRESSES:list:["0xFf82123dFB52ab75C417195c5fDB87630145ae81","0x77af31De935740567Cf4fF1986D04B2c964A786a"]}
task_deadline: ${TASK_DEADLINE:float:240.0}
file_hash_to_tools_json: ${FILE_HASH_TO_TOOLS:list:[]}
api_keys_json: ${API_KEYS:list:[]}
polling_interval: ${POLLING_INTERVAL:float:30.0}
agent_index: ${AGENT_INDEX_3:int:3}
num_agents: ${NUM_AGENTS:int:4}
timeout_limit: ${TIMEOUT_LIMIT:int:3}
---
public_id: valory/ledger:0.19.0
type: connection
Expand Down
47 changes: 40 additions & 7 deletions packages/valory/skills/task_execution/behaviours.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ def params(self) -> Params:
"""Get the parameters."""
return cast(Params, self.context.params)

@property
def request_id_to_num_timeouts(self) -> Dict[int, int]:
"""Maps the request id to the number of times it has timed out."""
return self.params.request_id_to_num_timeouts

def count_timeout(self, request_id: int) -> None:
"""Increase the timeout for a request."""
self.request_id_to_num_timeouts[request_id] += 1

def timeout_limit_reached(self, request_id: int) -> bool:
"""Check if the timeout limit has been reached."""
return self.params.timeout_limit <= self.request_id_to_num_timeouts[request_id]

@property
def pending_tasks(self) -> List[Dict[str, Any]]:
"""Get pending_tasks."""
Expand Down Expand Up @@ -227,7 +240,8 @@ def _execute_task(self) -> None:

if self._executing_task is not None:
if self._is_executing_task_ready() or self._invalid_request:
self._handle_done_task()
task_result = self._get_executing_task_result()
self._handle_done_task(task_result)
elif self._has_executing_task_timed_out():
self._handle_timeout_task()
return
Expand Down Expand Up @@ -255,12 +269,11 @@ def send_message(
self.params.req_to_callback[nonce] = callback
self.params.in_flight_req = True

def _handle_done_task(self) -> None:
def _handle_done_task(self, task_result: Any) -> None:
"""Handle done tasks"""
executing_task = cast(Dict[str, Any], self._executing_task)
req_id = executing_task.get("requestId", None)
mech_address = executing_task.get("contract_address", None)
task_result = self._get_executing_task_result()
response = {"requestId": req_id, "result": "Invalid response"}
self._done_task = {"request_id": req_id, "mech_address": mech_address}
if task_result is not None:
Expand All @@ -279,12 +292,30 @@ def _handle_timeout_task(self) -> None:
"""Handle timeout tasks"""
executing_task = cast(Dict[str, Any], self._executing_task)
req_id = executing_task.get("requestId", None)
self.count_timeout(req_id)
self.context.logger.info(f"Task timed out for request {req_id}")
# added to end of queue
self.pending_tasks.append(executing_task)
self.context.logger.info(
f"Task {req_id} has timed out {self.request_id_to_num_timeouts[req_id]} times"
)
async_result = cast(Future, self._async_result)
async_result.cancel()
self._executing_task = None
if not self.timeout_limit_reached(req_id):
# added to end of queue
self.context.logger.info(f"Adding task {req_id} to the end of the queue")
self.pending_tasks.append(executing_task)
self._executing_task = None
return None

self.context.logger.info(
f"Task {req_id} has reached the timeout limit of{self.params.timeout_limit}. "
f"It won't be added to the end of the queue again."
)
task_result = (
f"Task timed out {self.params.timeout_limit} times during execution. ",
"",
None,
Comment on lines +315 to +316
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that the prompt will not be in the response in this case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we cannot know the prompt, as the task never returns in such instances.

)
self._handle_done_task(task_result)

def _handle_get_task(self, message: IpfsMessage, dialogue: Dialogue) -> None:
"""Handle the response from ipfs for a task request."""
Expand Down Expand Up @@ -386,8 +417,10 @@ def _handle_store_response(self, message: IpfsMessage, dialogue: Dialogue) -> No
executing_task["requestId"],
executing_task["sender"],
)
self.context.logger.info(f"Response for request {req_id} stored on IPFS.")
ipfs_hash = to_v1(message.ipfs_hash)
self.context.logger.info(
f"Response for request {req_id} stored on IPFS with hash {ipfs_hash}."
)
self.send_data_via_acn(
sender_address=sender,
request_id=str(req_id),
Expand Down
5 changes: 5 additions & 0 deletions packages/valory/skills/task_execution/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# ------------------------------------------------------------------------------

"""This module contains the shared state for the abci skill of Mech."""
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, cast

from aea.exceptions import enforce
Expand Down Expand Up @@ -59,6 +60,10 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
enforce(self.agent_index is not None, "agent_index must be set!")
self.from_block_range = kwargs.get("from_block_range", None)
enforce(self.from_block_range is not None, "from_block_range must be set!")
self.timeout_limit = kwargs.get("timeout_limit", None)
enforce(self.timeout_limit is not None, "timeout_limit must be set!")
# maps the request id to the number of times it has timed out
self.request_id_to_num_timeouts: Dict[int, int] = defaultdict(lambda: 0)
super().__init__(*args, **kwargs)

def _nested_list_todict_workaround(
Expand Down
5 changes: 3 additions & 2 deletions packages/valory/skills/task_execution/skill.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ license: Apache-2.0
aea_version: '>=1.0.0, <2.0.0'
fingerprint:
__init__.py: bafybeidqhvvlnthkbnmrdkdeyjyx2f2ab6z4xdgmagh7welqnh2v6wczx4
behaviours.py: bafybeibrvfpdvcdtel3juuxjcbtvrwazgdg3na5h4fzh2akfldk2eipudq
behaviours.py: bafybeie4h4m4py7u3ah4mcotiah32umi7lb5xnxmzpvvhikushaedcglkq
dialogues.py: bafybeid4zxalqdlo5mw4yfbuf34hx4jp5ay5z6chm4zviwu4cj7fudtwca
handlers.py: bafybeidbt5ezj74cgfogk3w4uw4si2grlnk5g54veyumw7g5yh6gdscywu
models.py: bafybeiavbz7un34qpxbmi3bmvk7yogc4w7d5wd3eymonelsqep5li222y4
models.py: bafybeihc2kmymmh5oousjddbc7xujqbk5niermuqak2dhtgryukzq5wxeq
utils/__init__.py: bafybeiccdijaigu6e5p2iruwo5mkk224o7ywedc7nr6xeu5fpmhjqgk24e
utils/ipfs.py: bafybeicuaj23qrcdv6ly4j7yo6il2r5plozhd6mwvcp5acwqbjxb2t3u2i
utils/task.py: bafybeiakokty64m5cqp72drrpvfckhruldlwcge5hcc2bsy2ujk6nnrazq
Expand Down Expand Up @@ -87,6 +87,7 @@ models:
polling_interval: 30.0
task_deadline: 240.0
use_slashing: false
timeout_limit: 3
slash_cooldown_hours: 3
slash_threshold_amount: 10000000000000000
light_slash_unit_amount: 5000000000000000
Expand Down
Loading