diff --git a/scripts/exporters/wallets.py b/scripts/exporters/wallets.py index fd681e015..47ed16322 100644 --- a/scripts/exporters/wallets.py +++ b/scripts/exporters/wallets.py @@ -8,12 +8,11 @@ from y import Network from y.time import closest_block_after_timestamp -from yearn import constants +from yearn import constants, utils from yearn.entities import UserTx from yearn.helpers.exporter import Exporter from yearn.outputs.postgres.utils import last_recorded_block from yearn.outputs.victoria.victoria import _post -from yearn.utils import run_in_thread from yearn.yearn import Yearn sentry_sdk.set_tag('script','wallet_exporter') @@ -44,7 +43,7 @@ def postgres_ready(snapshot: datetime) -> bool: class WalletExporter(Exporter): async def export_historical_snapshot(self, snapshot: datetime) -> None: """ Override a method on Exporter so we can add an additional check. """ - if await run_in_thread(postgres_ready, snapshot): + if await utils.threads.run(postgres_ready, snapshot): return await super().export_historical_snapshot(snapshot) exporter = WalletExporter( diff --git a/yearn/utils.py b/yearn/utils.py index 8e6412cca..d524afde5 100644 --- a/yearn/utils.py +++ b/yearn/utils.py @@ -2,27 +2,24 @@ import json import logging import threading -from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from functools import lru_cache from typing import AsyncGenerator, List +import a_sync +import dank_mids import eth_retry import pandas as pd from brownie import Contract, chain, interface, web3 from brownie.convert.datatypes import HexString from brownie.network.contract import _fetch_from_explorer, _resolve_address -from dank_mids.brownie_patch import patch_contract -from typing_extensions import ParamSpec from y.networks import Network -from y.utils.dank_mids import dank_w3 from yearn.typing import AddressOrContract logger = logging.getLogger(__name__) -threads = ThreadPoolExecutor(8) -run_in_thread = lambda fn, *args: asyncio.get_event_loop().run_in_executor(threads, fn, *args) +threads = a_sync.PruningThreadPoolExecutor(8) BINARY_SEARCH_BARRIER = { Network.Mainnet: 0, @@ -78,7 +75,7 @@ def contract(address: AddressOrContract) -> Contract: if address in PREFER_INTERFACE[chain.id]: _interface = PREFER_INTERFACE[chain.id][address] i = _interface(address) - return _squeeze(patch_contract(i, dank_w3)) + return _squeeze(dank_mids.patch_contract(i)) # autofetch-sources: false # Try to fetch the contract from the local sqlite db. @@ -89,7 +86,7 @@ def contract(address: AddressOrContract) -> Contract: c = _resolve_proxy(address) # Lastly, get rid of unnecessary memory-hog properties - return _squeeze(patch_contract(c, dank_w3)) + return _squeeze(dank_mids.patch_contract(c)) # These tokens have trouble when resolving the implementation via the chain. diff --git a/yearn/v1/registry.py b/yearn/v1/registry.py index 0a752eb28..01c8caac1 100644 --- a/yearn/v1/registry.py +++ b/yearn/v1/registry.py @@ -4,11 +4,10 @@ from typing import Dict, List, Optional from brownie import chain, interface -from dank_mids.brownie_patch import patch_contract +import dank_mids from y.contracts import contract_creation_block_async from y.decorators import stuck_coro_debugger from y.networks import Network -from y.utils.dank_mids import dank_w3 from yearn.exceptions import UnsupportedNetwork from yearn.multicall2 import fetch_multicall_async @@ -25,7 +24,7 @@ def __init__(self) -> None: raise UnsupportedNetwork("Vaults V1 registry is only available on Mainnet.") # TODO Fix ENS resolution for registry.ychad.eth - self.registry = patch_contract(interface.YRegistry("0x3eE41C098f9666ed2eA246f4D2558010e59d63A0"), dank_w3) + self.registry = dank_mids.patch_contract(interface.YRegistry("0x3eE41C098f9666ed2eA246f4D2558010e59d63A0")) @cached_property def vaults(self) -> List[VaultV1]: diff --git a/yearn/v1/vaults.py b/yearn/v1/vaults.py index 200eb8a86..baa46e681 100644 --- a/yearn/v1/vaults.py +++ b/yearn/v1/vaults.py @@ -3,14 +3,13 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Optional +import dank_mids from async_property import async_cached_property from brownie import ZERO_ADDRESS, interface from brownie.network.contract import InterfaceContainer -from dank_mids.brownie_patch import patch_contract from y import Contract, magic from y.decorators import stuck_coro_debugger from y.exceptions import PriceError, yPriceMagicError -from y.utils.dank_mids import dank_w3 from yearn import constants from yearn.common import Tvl @@ -107,7 +106,7 @@ async def describe(self, block=None): # guard historical queries where there are no vote_proxy and gauge # for block <= 10635293 (2020-08-11) if vote_proxy and gauge: - vote_proxy = patch_contract(interface.CurveYCRVVoter(vote_proxy), dank_w3) + vote_proxy = dank_mids.patch_contract(interface.CurveYCRVVoter(vote_proxy)) gauge = Contract(gauge) boost, _apy = await asyncio.gather( curve.calculate_boost(gauge, vote_proxy, block=block), @@ -121,7 +120,7 @@ async def describe(self, block=None): attrs["lifetime earned"] = [strategy, "earned"] # /scale if strategy._name == "StrategyYFIGovernance": - ygov = patch_contract(interface.YearnGovernance(await strategy.gov.coroutine()), dank_w3) + ygov = dank_mids.patch_contract(interface.YearnGovernance(await strategy.gov.coroutine())) attrs["earned"] = [ygov, "earned", strategy] attrs["reward rate"] = [ygov, "rewardRate"] attrs["ygov balance"] = [ygov, "balanceOf", strategy] diff --git a/yearn/v2/registry.py b/yearn/v2/registry.py index 679297823..2f3eb3941 100644 --- a/yearn/v2/registry.py +++ b/yearn/v2/registry.py @@ -4,23 +4,18 @@ import time from collections import OrderedDict from functools import cached_property -from logging import getLogger from typing import AsyncIterator, Awaitable, Dict, List, NoReturn, overload import a_sync +import dank_mids import inflection from async_property import async_cached_property, async_property from brownie import chain, web3 from brownie.network.event import _EventItem -from dank_mids.brownie_patch import patch_contract from web3._utils.abi import filter_by_name from web3._utils.events import construct_event_topic_set -from y import Contract +from y import Contract, Network, magic from y.decorators import stuck_coro_debugger -from y.exceptions import NodeNotSynced -from y.networks import Network -from y.prices import magic -from y.utils.dank_mids import dank_w3 from y.utils.events import Events, ProcessedEvents from yearn.decorators import set_exc, wait_or_exit_before @@ -87,7 +82,7 @@ async def registries(self) -> List[Contract]: events = Events(addresses=r, topics=[r.topics['ReleaseRegistryUpdated']]) for rr in set(await asyncio.gather(*[ asyncio.create_task(Contract.coroutine(list(event.values())[0])) - async for event in events.events(to_block=await dank_w3.eth.block_number) + async for event in events.events(to_block=await dank_mids.eth.block_number) ])): registries.append(rr) logger.debug("release registry %s found for registry %s", rr, r) @@ -110,7 +105,7 @@ async def load_from_ens(self): coro=Contract.coroutine(event['newAddress'].hex()), name=f"load registry {event['newAddress']}", ) - async for event in events.events(to_block = await dank_w3.eth.block_number) + async for event in events.events(to_block = await dank_mids.eth.block_number) ] if registries: registries = await asyncio.gather(*registries) @@ -146,7 +141,7 @@ async def watch_events(self) -> NoReturn: def done_callback(task: asyncio.Task) -> None: logger.info("loaded v2 registry in %.3fs", time.time() - start) self._done.set() - done_task = asyncio.create_task(events._lock.wait_for(await dank_w3.eth.block_number)) + done_task = asyncio.create_task(events._lock.wait_for(await dank_mids.eth.block_number)) done_task.add_done_callback(done_callback) async for _ in events: self._filter_vaults() @@ -205,14 +200,14 @@ def process_events(self, events): def vault_from_event(self, event): return Vault( - vault=patch_contract(Contract.from_abi("Vault", event["vault"], self.releases[event["api_version"]].abi), dank_w3), + vault=dank_mids.patch_contract(Contract.from_abi("Vault", event["vault"], self.releases[event["api_version"]].abi)), token=event["token"], api_version=event["api_version"], registry=self, ) @stuck_coro_debugger - async def describe(self, block=None) -> [VaultName, Dict]: + async def describe(self, block=None) -> Dict[VaultName, Dict]: return await a_sync.gather({ vault.name: asyncio.create_task(vault.describe(block=block)) async for vault in self.active_vaults_at(block, iter=True) @@ -289,6 +284,7 @@ def _remove_vault(self, address): class RegistryEvents(ProcessedEvents[_EventItem]): __slots__ = "_init_block", "_registry" def __init__(self, registry: Registry, registries: List[Contract]): + assert registries, registries self._init_block = chain.height self._registry = registry super().__init__(addresses=registries) diff --git a/yearn/v2/vaults.py b/yearn/v2/vaults.py index 584c51478..c0cf714a9 100644 --- a/yearn/v2/vaults.py +++ b/yearn/v2/vaults.py @@ -15,13 +15,10 @@ from eth_utils import encode_hex, event_abi_to_log_topic from multicall.utils import run_in_subprocess from semantic_version.base import Version -from y import ERC20, Contract, Network, magic +from y import ERC20, Contract, Network, magic, dank_w3 from y.contracts import contract_creation_block_async from y.decorators import stuck_coro_debugger from y.exceptions import PriceError, yPriceMagicError -from y.networks import Network -from y.prices import magic -from y.utils.dank_mids import dank_w3 from y.utils.events import ProcessedEvents from yearn.common import Tvl