Skip to content

Commit

Permalink
feat: add healthcheck
Browse files Browse the repository at this point in the history
  • Loading branch information
0xArdi committed Nov 22, 2024
1 parent fd86736 commit 09f8e4d
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 19 deletions.
12 changes: 6 additions & 6 deletions packages/packages.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
"contract/valory/mech_marketplace/0.1.0": "bafybeigulghv7bvx4evl6kmogvv6gjxovyt22tkwlbdmeretmop2crdmem",
"connection/valory/websocket_client/0.1.0": "bafybeic4ag3gqc7kd3k2o3pucddj2odck5yrfbgmwh5veqny7zao5qayli",
"skill/valory/contract_subscription/0.1.0": "bafybeiefuemlp75obgpxrp6iuleb3hn6vcviwh5oetk5djbuprf4xsmgjy",
"skill/valory/mech_abci/0.1.0": "bafybeihshsjc2pngbaglptgeijgpcd6ns7darhhp75ryhfao5t4uuw3hym",
"skill/valory/task_submission_abci/0.1.0": "bafybeihzghxmvmylj2gehkfbleek5e6q3qyggfmnpve2sov437nyzvu5gy",
"skill/valory/task_execution/0.1.0": "bafybeiafc6rgv46qd2hfl5fxqdegjyyb6f2fxhqxe33g6j342lutbb6fsm",
"skill/valory/mech_abci/0.1.0": "bafybeihmuztss3v4n74bai24fqfnez5alcomvoicr4tjx6s2nwlpwc2sp4",
"skill/valory/task_submission_abci/0.1.0": "bafybeibr7xqf7qqjnu234e4benwxhqc22sxlv5orcvcynz67yfb5lbsypy",
"skill/valory/task_execution/0.1.0": "bafybeihfbzsgmmcg7l3ws2crjpm2clccha5dakag2f5cnlk5n3bdmdo56u",
"skill/valory/websocket_client/0.1.0": "bafybeif7rrvsu6z4evqkhblxj3u6wwv2eqou576hgkyoehxuj7cntw7o2m",
"skill/valory/subscription_abci/0.1.0": "bafybeifilanuxfvuypcccjku7nphurgp27i2iwncdmug3in6xuzfmslgaq",
"agent/valory/mech/0.1.0": "bafybeifvo3s5mjidovenei6wwozejatfh4kjcapqnipuou4q3v6ff6ycl4",
"service/valory/mech/0.1.0": "bafybeic7hnfs3gum33ndinhyzablfa4yf5y4lkvivuxg4bgvhz5zdxn6oa",
"service/valory/mech_quickstart/0.1.0": "bafybeiezswb5cbhuauk23kd6gsdbwnp2ndjysgc4uabk44k3vy2lmwz5mu"
"agent/valory/mech/0.1.0": "bafybeicfuco5l6klp5mpwl24wkua6aca5qz5tci4mitjlcg7bajirzdsoa",
"service/valory/mech/0.1.0": "bafybeignnlf5wtp7zuyep7rdxoalc3fittbtzvm4m23c2vw2nc74yxb5p4",
"service/valory/mech_quickstart/0.1.0": "bafybeicjnaaqsertmq7ycrmwne6et4ome2b7yayqhllbmrnyazvkpaaffe"
},
"third_party": {
"protocol/valory/default/1.0.0": "bafybeifqcqy5hfbnd7fjv4mqdjrtujh2vx3p2xhe33y67zoxa6ph7wdpaq",
Expand Down
9 changes: 5 additions & 4 deletions packages/valory/agents/mech/aea-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ skills:
- valory/abstract_abci:0.1.0:bafybeieo7pe5wqjphs5izpz5aujjbubymlxub62b3rhx6yglu65ibalffu
- valory/abstract_round_abci:0.1.0:bafybeibiw4oqwqvo4jccwz5fb73iardzychgvcl66tceiildohoju2ikti
- valory/contract_subscription:0.1.0:bafybeiefuemlp75obgpxrp6iuleb3hn6vcviwh5oetk5djbuprf4xsmgjy
- valory/mech_abci:0.1.0:bafybeihshsjc2pngbaglptgeijgpcd6ns7darhhp75ryhfao5t4uuw3hym
- valory/mech_abci:0.1.0:bafybeihmuztss3v4n74bai24fqfnez5alcomvoicr4tjx6s2nwlpwc2sp4
- valory/registration_abci:0.1.0:bafybeib3n6vqkfbrcubcbliebjnuwyywdinxkbzt76n6gbn2kg7ace47dq
- valory/reset_pause_abci:0.1.0:bafybeihkj6lmaypspyxe5qqrjgnolyck62pyvqoylr24ab6ue4steqcw7e
- valory/subscription_abci:0.1.0:bafybeifilanuxfvuypcccjku7nphurgp27i2iwncdmug3in6xuzfmslgaq
- valory/task_execution:0.1.0:bafybeiafc6rgv46qd2hfl5fxqdegjyyb6f2fxhqxe33g6j342lutbb6fsm
- valory/task_submission_abci:0.1.0:bafybeihzghxmvmylj2gehkfbleek5e6q3qyggfmnpve2sov437nyzvu5gy
- valory/task_execution:0.1.0:bafybeihfbzsgmmcg7l3ws2crjpm2clccha5dakag2f5cnlk5n3bdmdo56u
- valory/task_submission_abci:0.1.0:bafybeibr7xqf7qqjnu234e4benwxhqc22sxlv5orcvcynz67yfb5lbsypy
- valory/termination_abci:0.1.0:bafybeifi2uodnrjsrivj53g3sjutocmyusbx6mlsb6oanqdyt2mfbyvusy
- valory/transaction_settlement_abci:0.1.0:bafybeigh2vkt74jrad5gtsczrgqcuhcqe7jkgjy7jdw56yamlzwwnaymjy
- valory/websocket_client:0.1.0:bafybeif7rrvsu6z4evqkhblxj3u6wwv2eqou576hgkyoehxuj7cntw7o2m
Expand Down Expand Up @@ -237,4 +237,5 @@ type: connection
config:
host: ${str:0.0.0.0}
target_skill_id: valory/mech_abci:0.1.0
is_abstract: true
port: ${int:9999}
is_abstract: false
2 changes: 1 addition & 1 deletion packages/valory/services/mech/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license: Apache-2.0
fingerprint:
README.md: bafybeif7ia4jdlazy6745ke2k2x5yoqlwsgwr6sbztbgqtwvs3ndm2p7ba
fingerprint_ignore_patterns: []
agent: valory/mech:0.1.0:bafybeifvo3s5mjidovenei6wwozejatfh4kjcapqnipuou4q3v6ff6ycl4
agent: valory/mech:0.1.0:bafybeicfuco5l6klp5mpwl24wkua6aca5qz5tci4mitjlcg7bajirzdsoa
number_of_agents: 4
deployment:
agent:
Expand Down
2 changes: 1 addition & 1 deletion packages/valory/services/mech_quickstart/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license: Apache-2.0
fingerprint:
README.md: bafybeiaqaedhfzjxxdfxtygjulorvd4x2h3cbwtiw3xgbigjgsc6qfn7zy
fingerprint_ignore_patterns: []
agent: valory/mech:0.1.0:bafybeifvo3s5mjidovenei6wwozejatfh4kjcapqnipuou4q3v6ff6ycl4
agent: valory/mech:0.1.0:bafybeicfuco5l6klp5mpwl24wkua6aca5qz5tci4mitjlcg7bajirzdsoa
number_of_agents: 1
deployment:
agent:
Expand Down
69 changes: 69 additions & 0 deletions packages/valory/skills/mech_abci/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import json
import re
import time
from datetime import datetime
from enum import Enum
from typing import Callable, Dict, List, Optional, Tuple, Union, cast
Expand Down Expand Up @@ -67,6 +68,11 @@
TendermintHandler = BaseTendermintHandler
IpfsHandler = BaseIpfsHandler

# Keys of the skill context's shared state that the HTTP handler reads
# in order to build the health check response.
LAST_SUCCESSFUL_READ = "last_successful_read"
LAST_SUCCESSFUL_EXECUTED_TASK = "last_successful_executed_task"
WAS_LAST_READ_SUCCESSFUL = "was_last_read_successful"
LAST_TX = "last_tx"


class HttpCode(Enum):
"""Http codes"""
Expand All @@ -89,6 +95,32 @@ class HttpHandler(BaseHttpHandler):

SUPPORTED_PROTOCOL = HttpMessage.protocol_id

@property
def last_successful_read(self) -> Optional[Tuple[int, float]]:
    """
    Return the last successful read recorded in the shared state.

    :return: the value stored under the LAST_SUCCESSFUL_READ key, used by the
        health check as a (block number, timestamp) pair, or None when
        nothing has been stored yet.
    """
    stored_read = self.context.shared_state.get(LAST_SUCCESSFUL_READ)
    return cast(Optional[Tuple[int, float]], stored_read)

@property
def last_successful_executed_task(self) -> Optional[Tuple[int, float]]:
    """
    Return the last successfully executed task recorded in the shared state.

    :return: the value stored under the LAST_SUCCESSFUL_EXECUTED_TASK key,
        used by the health check as a (request id, timestamp) pair, or None
        when nothing has been stored yet.
    """
    stored_task = self.context.shared_state.get(LAST_SUCCESSFUL_EXECUTED_TASK)
    return cast(Optional[Tuple[int, float]], stored_task)

@property
def was_last_read_successful(self) -> bool:
    """
    Tell whether the last read is considered successful.

    :return: False only when the shared state explicitly holds False under
        the WAS_LAST_READ_SUCCESSFUL key; True otherwise, including when the
        key is missing.
    """
    read_flag = self.context.shared_state.get(WAS_LAST_READ_SUCCESSFUL)
    if read_flag is False:
        return False
    return True

@property
def last_tx(self) -> Optional[Tuple[str, float]]:
    """
    Return the last transaction recorded in the shared state.

    :return: the value stored under the LAST_TX key, used by the health check
        as a (tx hash, timestamp) pair, or None when nothing has been stored
        yet.
    """
    stored_tx = self.context.shared_state.get(LAST_TX)
    return cast(Optional[Tuple[str, float]], stored_tx)

def setup(self) -> None:
"""Implement the setup."""

Expand Down Expand Up @@ -308,6 +340,23 @@ def _handle_get_health(
r.round_id for r in round_sequence._abci_app._previous_rounds[-10:]
]

# ensure we are delivering
grace_period = 300 # 5 min
last_executed_task = (
self.last_successful_executed_task[1]
if self.last_successful_executed_task
else time.time() + grace_period * 2
)
last_tx_made = self.last_tx[1] if self.last_tx else time.time()
we_are_delivering = last_executed_task > last_tx_made + grace_period

# ensure we can get new reqs
last_successful_read = (
self.last_successful_read[1] if self.last_successful_read else time.time()
)
grace_period = 300 # 5 min
we_can_get_new_reqs = last_successful_read > time.time() - grace_period

data = {
"seconds_since_last_transition": seconds_since_last_transition,
"is_tm_healthy": not is_tm_unhealthy,
Expand All @@ -316,6 +365,26 @@ def _handle_get_health(
"current_round": current_round,
"previous_rounds": previous_rounds,
"is_transitioning_fast": is_transitioning_fast,
"last_successful_read": {
"block_number": self.last_successful_read[0],
"timestamp": self.last_successful_read[1],
}
if self.last_successful_read
else None,
"last_successful_executed_task": {
"request_id": self.last_successful_executed_task[0],
"timestamp": self.last_successful_executed_task[1],
}
if self.last_successful_executed_task
else None,
"was_last_read_successful": self.was_last_read_successful,
"last_tx": {
"tx_hash": self.last_tx[0],
"timestamp": self.last_tx[1],
}
if self.last_tx
else None,
"is_ok": (we_are_delivering and we_can_get_new_reqs),
}

self._send_ok_response(http_msg, http_dialogue, data)
4 changes: 2 additions & 2 deletions packages/valory/skills/mech_abci/skill.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fingerprint:
composition.py: bafybeiaorp75iva5xgl4ebk3lg7oenqmd6wg2dxlm33oserb7aszyujml4
dialogues.py: bafybeifhydd6xmstbh2jx5igj33upip5a3hhlcaxttfsc77heszqmru7ri
fsm_specification.yaml: bafybeib5yne2ke3oc4amgehhn75vajexr3sedehdzmuabhyrovfqpmuipe
handlers.py: bafybeibfsyvno2qgcftlftjmhj66aiurmcdqwfj2ac7jm44z7kwwk6illu
handlers.py: bafybeig2giscwsatvtzhnrhlll3q4stdwax5qbfmnphgcxparcemc2affi
models.py: bafybeigpimz5vhgzelhc7c3ipo56wh2o7d7whyqcjd2kjigtxos5d6bwqa
fingerprint_ignore_patterns: []
connections:
Expand All @@ -23,7 +23,7 @@ skills:
- valory/abstract_round_abci:0.1.0:bafybeibiw4oqwqvo4jccwz5fb73iardzychgvcl66tceiildohoju2ikti
- valory/registration_abci:0.1.0:bafybeib3n6vqkfbrcubcbliebjnuwyywdinxkbzt76n6gbn2kg7ace47dq
- valory/reset_pause_abci:0.1.0:bafybeihkj6lmaypspyxe5qqrjgnolyck62pyvqoylr24ab6ue4steqcw7e
- valory/task_submission_abci:0.1.0:bafybeihzghxmvmylj2gehkfbleek5e6q3qyggfmnpve2sov437nyzvu5gy
- valory/task_submission_abci:0.1.0:bafybeibr7xqf7qqjnu234e4benwxhqc22sxlv5orcvcynz67yfb5lbsypy
- valory/termination_abci:0.1.0:bafybeifi2uodnrjsrivj53g3sjutocmyusbx6mlsb6oanqdyt2mfbyvusy
- valory/transaction_settlement_abci:0.1.0:bafybeigh2vkt74jrad5gtsczrgqcuhcqe7jkgjy7jdw56yamlzwwnaymjy
- valory/subscription_abci:0.1.0:bafybeifilanuxfvuypcccjku7nphurgp27i2iwncdmug3in6xuzfmslgaq
Expand Down
10 changes: 10 additions & 0 deletions packages/valory/skills/task_execution/behaviours.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from packages.valory.protocols.ipfs import IpfsMessage
from packages.valory.protocols.ipfs.dialogues import IpfsDialogue
from packages.valory.protocols.ledger_api import LedgerApiMessage
from packages.valory.skills.task_execution.handlers import LAST_SUCCESSFUL_EXECUTED_TASK
from packages.valory.skills.task_execution.models import Params
from packages.valory.skills.task_execution.utils.apis import KeyChain
from packages.valory.skills.task_execution.utils.benchmarks import TokenCounterCallback
Expand Down Expand Up @@ -116,6 +117,13 @@ def request_id_to_num_timeouts(self) -> Dict[int, int]:
"""Maps the request id to the number of times it has timed out."""
return self.params.request_id_to_num_timeouts

def set_last_executed_task(self, request_id: int) -> None:
    """
    Record the given request as the last executed task.

    The request id is stored in the shared state together with the current
    time, under the LAST_SUCCESSFUL_EXECUTED_TASK key.

    :param request_id: the id of the request whose task has been executed.
    """
    executed_at = time.time()
    self.context.shared_state[LAST_SUCCESSFUL_EXECUTED_TASK] = (request_id, executed_at)

def count_timeout(self, request_id: int) -> None:
"""Increase the timeout for a request."""
self.request_id_to_num_timeouts[request_id] += 1
Expand Down Expand Up @@ -552,6 +560,8 @@ def _handle_store_response(self, message: IpfsMessage, dialogue: Dialogue) -> No
request_id=str(req_id),
data=ipfs_hash,
)
# for health check metrics
self.set_last_executed_task(req_id)
done_task = cast(Dict[str, Any], self._done_task)
task_result = to_multihash(ipfs_hash)
cost = get_cost_for_done_task(done_task)
Expand Down
20 changes: 19 additions & 1 deletion packages/valory/skills/task_execution/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

"""This package contains a scaffold of a handler."""
import threading
from typing import Any, Dict, List, cast
import time
from typing import Any, Dict, List, Optional, cast

from aea.protocols.base import Message
from aea.skills.base import Handler
Expand All @@ -37,6 +38,9 @@
PENDING_TASKS = "pending_tasks"
DONE_TASKS = "ready_tasks"
DONE_TASKS_LOCK = "lock"
# Keys of the skill context's shared state under which the health check
# metrics of this skill are stored.
LAST_SUCCESSFUL_READ = "last_successful_read"
LAST_SUCCESSFUL_EXECUTED_TASK = "last_successful_executed_task"
WAS_LAST_READ_SUCCESSFUL = "was_last_read_successful"

LEDGER_API_ADDRESS = str(LEDGER_CONNECTION_PUBLIC_ID)

Expand Down Expand Up @@ -131,6 +135,14 @@ def pending_tasks(self) -> List[Dict[str, Any]]:
"""Get pending_tasks."""
return self.context.shared_state[PENDING_TASKS]

def set_last_successful_read(self, block_number: Optional[int]) -> None:
    """
    Record a successful read in the shared state.

    The block number is stored together with the current time under the
    LAST_SUCCESSFUL_READ key, and the WAS_LAST_READ_SUCCESSFUL flag is set
    to True.

    :param block_number: the block number associated with the read, if any.
    """
    shared_state = self.context.shared_state
    shared_state[LAST_SUCCESSFUL_READ] = (block_number, time.time())
    # `handle` sets this flag to False when it receives an unexpected
    # performative; without resetting it here a single failure would be
    # reported by the health check forever, even after reads recover.
    shared_state[WAS_LAST_READ_SUCCESSFUL] = True

def set_was_last_read_successful(self, was_successful: bool) -> None:
    """
    Store whether the last read was successful.

    :param was_successful: the value to keep in the shared state under the
        WAS_LAST_READ_SUCCESSFUL key.
    """
    shared_state = self.context.shared_state
    shared_state[WAS_LAST_READ_SUCCESSFUL] = was_successful

def handle(self, message: Message) -> None:
"""
Implement the reaction to a contract message.
Expand All @@ -140,6 +152,8 @@ def handle(self, message: Message) -> None:
self.context.logger.info(f"Received message: {message}")
contract_api_msg = cast(ContractApiMessage, message)
if contract_api_msg.performative != ContractApiMessage.Performative.STATE:
# for healthcheck metrics
self.set_was_last_read_successful(False)
self.context.logger.warning(
f"Contract API Message performative not recognized: {contract_api_msg.performative}"
)
Expand All @@ -155,10 +169,14 @@ def _handle_get_undelivered_reqs(self, body: Dict[str, Any]) -> None:
"""Handle get undelivered reqs."""
reqs = body.get("data", [])
if len(reqs) == 0:
# for healthcheck metrics
self.set_last_successful_read(self.params.from_block)
return

self.params.from_block = max([req["block_number"] for req in reqs]) + 1
self.context.logger.info(f"Received {len(reqs)} new requests.")
# for healthcheck metrics
self.set_last_successful_read(self.params.from_block)
reqs = [
req
for req in reqs
Expand Down
4 changes: 2 additions & 2 deletions packages/valory/skills/task_execution/skill.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ license: Apache-2.0
aea_version: '>=1.0.0, <2.0.0'
fingerprint:
__init__.py: bafybeidqhvvlnthkbnmrdkdeyjyx2f2ab6z4xdgmagh7welqnh2v6wczx4
behaviours.py: bafybeihuja3eox24bl7kyym2hbyrkuktkhso5s7yt3cvrjn7ng73bzvvga
behaviours.py: bafybeigt442yaasazy4qlbcvyxswxvmgardufabnphknv4yrzyhauhbqae
dialogues.py: bafybeid4zxalqdlo5mw4yfbuf34hx4jp5ay5z6chm4zviwu4cj7fudtwca
handlers.py: bafybeidbt5ezj74cgfogk3w4uw4si2grlnk5g54veyumw7g5yh6gdscywu
handlers.py: bafybeiggofhcn4jc6gu6rtx2zxbmb6dvvuxuystmrl6rjvxpcvbv22wmiq
models.py: bafybeicohoprd4f6rxnt6zxgwzzb3djpyk4o72bepoty4lybnf7fdpkgbu
utils/__init__.py: bafybeiccdijaigu6e5p2iruwo5mkk224o7ywedc7nr6xeu5fpmhjqgk24e
utils/apis.py: bafybeigu73lfz3g3mc6iupisrvlsp3fyl4du3oqlyajgdpfvtqypddh3w4
Expand Down
9 changes: 9 additions & 0 deletions packages/valory/skills/task_submission_abci/behaviours.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
)
FILENAME = "usage"
ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"
# shared state key under which the last tx hash and the time it was stored are kept
LAST_TX = "last_tx"


class TaskExecutionBaseBehaviour(BaseBehaviour, ABC):
Expand Down Expand Up @@ -103,6 +104,12 @@ def done_tasks(self) -> List[Dict[str, Any]]:
done_tasks = deepcopy(self.context.shared_state.get(DONE_TASKS, []))
return cast(List[Dict[str, Any]], done_tasks)

def set_tx(self, last_tx: str) -> None:
    """
    Record the given transaction hash for the health check.

    The hash is stored in the shared state under the LAST_TX key, paired
    with the time at which it was stored.

    :param last_tx: the transaction hash to record.
    """
    # pair the tx hash with the moment it is being recorded
    tx_record = (last_tx, time.time())
    self.context.shared_state[LAST_TX] = tx_record

def done_tasks_lock(self) -> threading.Lock:
"""Get done_tasks_lock."""
return self.context.shared_state[DONE_TASKS_LOCK]
Expand Down Expand Up @@ -227,6 +234,8 @@ def check_last_tx_status(self) -> bool:
# ref: https://github.com/valory-xyz/open-autonomy/blob/main/packages/valory/skills/transaction_settlement_abci/rounds.py#L432-L434
try:
final_tx_hash = self.synchronized_data.final_tx_hash
# added for healthcheck purposes
self.set_tx(final_tx_hash)
except Exception as e:
self.context.logger.error(e)
return False
Expand Down
4 changes: 2 additions & 2 deletions packages/valory/skills/task_submission_abci/skill.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license: Apache-2.0
aea_version: '>=1.0.0, <2.0.0'
fingerprint:
__init__.py: bafybeiholqak7ltw6bbmn2c5tn3j7xgzkdlfzp3kcskiqsvmxoih6m4muq
behaviours.py: bafybeigsgqxuwoacedl5yz36riqcl2j2bdohhpkq3m2u5c66rbvsan77ee
behaviours.py: bafybeib6gtwgaodrwuam5wpm2sgdo3h2j5czfm37ct6ndupapq6jrw5sli
dialogues.py: bafybeibmac3m5u5h6ucoyjr4dazay72dyga656wvjl6z6saapluvjo54ne
fsm_specification.yaml: bafybeidtmsmpunr3t77pshd3k2s6dd6hlvhze6inu3gj7xyvlg4wi3tnuu
handlers.py: bafybeibe5n7my2vd2wlwo73sbma65epjqc7kxgtittewlylcmvnmoxtxzq
Expand All @@ -32,7 +32,7 @@ protocols:
skills:
- valory/abstract_round_abci:0.1.0:bafybeibiw4oqwqvo4jccwz5fb73iardzychgvcl66tceiildohoju2ikti
- valory/transaction_settlement_abci:0.1.0:bafybeigh2vkt74jrad5gtsczrgqcuhcqe7jkgjy7jdw56yamlzwwnaymjy
- valory/task_execution:0.1.0:bafybeiafc6rgv46qd2hfl5fxqdegjyyb6f2fxhqxe33g6j342lutbb6fsm
- valory/task_execution:0.1.0:bafybeihfbzsgmmcg7l3ws2crjpm2clccha5dakag2f5cnlk5n3bdmdo56u
behaviours:
main:
args: {}
Expand Down

0 comments on commit 09f8e4d

Please sign in to comment.