Skip to content

Commit

Permalink
(feat) Added again support for chain stream V2
Browse files Browse the repository at this point in the history
  • Loading branch information
aarmoa committed Nov 19, 2024
1 parent 0e6a4ca commit 52c798f
Show file tree
Hide file tree
Showing 16 changed files with 1,045 additions and 129 deletions.
2 changes: 1 addition & 1 deletion buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ inputs:
# tag: v1.13.0
# subdir: proto
- git_repo: https://github.com/InjectiveLabs/injective-core
branch: feat/update_chain_stream_for_exchange_v2
branch: feat/add_exchange_v1_compatibility_to_chain_stream
subdir: proto
- directory: proto
22 changes: 11 additions & 11 deletions examples/chain_client/7_ChainStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,29 @@ async def main() -> None:
inj_usdt_market = "0x0611780ba69656949525013d947713300f56c37b6175e02f26bffa495c3208fe"
inj_usdt_perp_market = "0x17ef48032cb24375ba7c2e39f384e56433bcab20cbee9a7357e4cba2eb00abe6"

bank_balances_filter = composer.chain_stream_bank_balances_filter(
bank_balances_filter = composer.chain_stream_bank_balances_v2_filter(
accounts=["inj1hkhdaj2a2clmq5jq6mspsggqs32vynpk228q3r"]
)
subaccount_deposits_filter = composer.chain_stream_subaccount_deposits_filter(subaccount_ids=[subaccount_id])
spot_trades_filter = composer.chain_stream_trades_filter(subaccount_ids=["*"], market_ids=[inj_usdt_market])
derivative_trades_filter = composer.chain_stream_trades_filter(
subaccount_deposits_filter = composer.chain_stream_subaccount_deposits_v2_filter(subaccount_ids=[subaccount_id])
spot_trades_filter = composer.chain_stream_trades_v2_filter(subaccount_ids=["*"], market_ids=[inj_usdt_market])
derivative_trades_filter = composer.chain_stream_trades_v2_filter(
subaccount_ids=["*"], market_ids=[inj_usdt_perp_market]
)
spot_orders_filter = composer.chain_stream_orders_filter(
spot_orders_filter = composer.chain_stream_orders_v2_filter(
subaccount_ids=[subaccount_id], market_ids=[inj_usdt_market]
)
derivative_orders_filter = composer.chain_stream_orders_filter(
derivative_orders_filter = composer.chain_stream_orders_v2_filter(
subaccount_ids=[subaccount_id], market_ids=[inj_usdt_perp_market]
)
spot_orderbooks_filter = composer.chain_stream_orderbooks_filter(market_ids=[inj_usdt_market])
derivative_orderbooks_filter = composer.chain_stream_orderbooks_filter(market_ids=[inj_usdt_perp_market])
positions_filter = composer.chain_stream_positions_filter(
spot_orderbooks_filter = composer.chain_stream_orderbooks_v2_filter(market_ids=[inj_usdt_market])
derivative_orderbooks_filter = composer.chain_stream_orderbooks_v2_filter(market_ids=[inj_usdt_perp_market])
positions_filter = composer.chain_stream_positions_v2_filter(
subaccount_ids=[subaccount_id], market_ids=[inj_usdt_perp_market]
)
oracle_price_filter = composer.chain_stream_oracle_price_filter(symbols=["INJ", "USDT"])
oracle_price_filter = composer.chain_stream_oracle_price_v2_filter(symbols=["INJ", "USDT"])

task = asyncio.get_event_loop().create_task(
client.listen_chain_stream_updates(
client.listen_chain_stream_v2_updates(
callback=chain_stream_event_processor,
on_end_callback=stream_closed_processor,
on_status_callback=stream_error_processor,
Expand Down
38 changes: 38 additions & 0 deletions pyinjective/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
tendermint_pb2 as ibc_tendermint,
)
from pyinjective.proto.injective.stream.v1beta1 import query_pb2 as chain_stream_query
from pyinjective.proto.injective.stream.v2 import query_pb2 as chain_stream_v2_query
from pyinjective.proto.injective.types.v1beta1 import account_pb2
from pyinjective.utils.logger import LoggerProvider

Expand Down Expand Up @@ -2481,6 +2482,11 @@ async def listen_chain_stream_updates(
positions_filter: Optional[chain_stream_query.PositionsFilter] = None,
oracle_price_filter: Optional[chain_stream_query.OraclePriceFilter] = None,
):
"""
This method is deprecated and will be removed soon. Please use `listen_chain_stream_v2_updates` instead
"""
warn("This method is deprecated. Use listen_chain_stream_v2_updates instead", DeprecationWarning, stacklevel=2)

return await self.chain_stream_api.stream(
callback=callback,
on_end_callback=on_end_callback,
Expand All @@ -2497,6 +2503,38 @@ async def listen_chain_stream_updates(
oracle_price_filter=oracle_price_filter,
)

async def listen_chain_stream_v2_updates(
self,
callback: Callable,
on_end_callback: Optional[Callable] = None,
on_status_callback: Optional[Callable] = None,
bank_balances_filter: Optional[chain_stream_v2_query.BankBalancesFilter] = None,
subaccount_deposits_filter: Optional[chain_stream_v2_query.SubaccountDepositsFilter] = None,
spot_trades_filter: Optional[chain_stream_v2_query.TradesFilter] = None,
derivative_trades_filter: Optional[chain_stream_v2_query.TradesFilter] = None,
spot_orders_filter: Optional[chain_stream_v2_query.OrdersFilter] = None,
derivative_orders_filter: Optional[chain_stream_v2_query.OrdersFilter] = None,
spot_orderbooks_filter: Optional[chain_stream_v2_query.OrderbookFilter] = None,
derivative_orderbooks_filter: Optional[chain_stream_v2_query.OrderbookFilter] = None,
positions_filter: Optional[chain_stream_v2_query.PositionsFilter] = None,
oracle_price_filter: Optional[chain_stream_v2_query.OraclePriceFilter] = None,
):
return await self.chain_stream_api.stream_v2(
callback=callback,
on_end_callback=on_end_callback,
on_status_callback=on_status_callback,
bank_balances_filter=bank_balances_filter,
subaccount_deposits_filter=subaccount_deposits_filter,
spot_trades_filter=spot_trades_filter,
derivative_trades_filter=derivative_trades_filter,
spot_orders_filter=spot_orders_filter,
derivative_orders_filter=derivative_orders_filter,
spot_orderbooks_filter=spot_orderbooks_filter,
derivative_orderbooks_filter=derivative_orderbooks_filter,
positions_filter=positions_filter,
oracle_price_filter=oracle_price_filter,
)

# region IBC Transfer module
async def fetch_denom_trace(self, hash: str) -> Dict[str, Any]:
return await self.ibc_transfer_api.fetch_denom_trace(hash=hash)
Expand Down
42 changes: 42 additions & 0 deletions pyinjective/client/chain/grpc_stream/chain_grpc_chain_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@

from pyinjective.core.network import CookieAssistant
from pyinjective.proto.injective.stream.v1beta1 import query_pb2 as chain_stream_pb, query_pb2_grpc as chain_stream_grpc
from pyinjective.proto.injective.stream.v2 import (
query_pb2 as chain_stream_v2_pb,
query_pb2_grpc as chain_stream_v2_grpc,
)
from pyinjective.utils.grpc_api_stream_assistant import GrpcApiStreamAssistant


class ChainGrpcChainStream:
def __init__(self, channel: Channel, cookie_assistant: CookieAssistant):
self._stub = chain_stream_grpc.StreamStub(channel)
self._stub_v2 = chain_stream_v2_grpc.StreamStub(channel)
self._assistant = GrpcApiStreamAssistant(cookie_assistant=cookie_assistant)

async def stream(
Expand Down Expand Up @@ -48,3 +53,40 @@ async def stream(
on_end_callback=on_end_callback,
on_status_callback=on_status_callback,
)

async def stream_v2(
self,
callback: Callable,
on_end_callback: Optional[Callable] = None,
on_status_callback: Optional[Callable] = None,
bank_balances_filter: Optional[chain_stream_v2_pb.BankBalancesFilter] = None,
subaccount_deposits_filter: Optional[chain_stream_v2_pb.SubaccountDepositsFilter] = None,
spot_trades_filter: Optional[chain_stream_v2_pb.TradesFilter] = None,
derivative_trades_filter: Optional[chain_stream_v2_pb.TradesFilter] = None,
spot_orders_filter: Optional[chain_stream_v2_pb.OrdersFilter] = None,
derivative_orders_filter: Optional[chain_stream_v2_pb.OrdersFilter] = None,
spot_orderbooks_filter: Optional[chain_stream_v2_pb.OrderbookFilter] = None,
derivative_orderbooks_filter: Optional[chain_stream_v2_pb.OrderbookFilter] = None,
positions_filter: Optional[chain_stream_v2_pb.PositionsFilter] = None,
oracle_price_filter: Optional[chain_stream_v2_pb.OraclePriceFilter] = None,
):
request = chain_stream_v2_pb.StreamRequest(
bank_balances_filter=bank_balances_filter,
subaccount_deposits_filter=subaccount_deposits_filter,
spot_trades_filter=spot_trades_filter,
derivative_trades_filter=derivative_trades_filter,
spot_orders_filter=spot_orders_filter,
derivative_orders_filter=derivative_orders_filter,
spot_orderbooks_filter=spot_orderbooks_filter,
derivative_orderbooks_filter=derivative_orderbooks_filter,
positions_filter=positions_filter,
oracle_price_filter=oracle_price_filter,
)

await self._assistant.listen_stream(
call=self._stub_v2.StreamV2,
request=request,
callback=callback,
on_end_callback=on_end_callback,
on_status_callback=on_status_callback,
)
105 changes: 105 additions & 0 deletions pyinjective/composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
tx_pb2 as injective_permissions_tx_pb,
)
from pyinjective.proto.injective.stream.v1beta1 import query_pb2 as chain_stream_query
from pyinjective.proto.injective.stream.v2 import query_pb2 as chain_stream_v2_query
from pyinjective.proto.injective.tokenfactory.v1beta1 import tx_pb2 as token_factory_tx_pb
from pyinjective.proto.injective.wasmx.v1 import tx_pb2 as wasmx_tx_pb
from pyinjective.utils.denom import Denom
Expand Down Expand Up @@ -2703,13 +2704,30 @@ def MsgVote(
def chain_stream_bank_balances_filter(
self, accounts: Optional[List[str]] = None
) -> chain_stream_query.BankBalancesFilter:
"""
This method is deprecated and will be removed soon. Please use `chain_stream_bank_balances_v2_filter` instead
"""
warn(
"This method is deprecated. Use chain_stream_bank_balances_v2_filter instead",
DeprecationWarning,
stacklevel=2,
)
accounts = accounts or ["*"]
return chain_stream_query.BankBalancesFilter(accounts=accounts)

def chain_stream_subaccount_deposits_filter(
self,
subaccount_ids: Optional[List[str]] = None,
) -> chain_stream_query.SubaccountDepositsFilter:
"""
This method is deprecated and will be removed soon.
Please use `chain_stream_subaccount_deposits_v2_filter` instead
"""
warn(
"This method is deprecated. Use chain_stream_subaccount_deposits_v2_filter instead",
DeprecationWarning,
stacklevel=2,
)
subaccount_ids = subaccount_ids or ["*"]
return chain_stream_query.SubaccountDepositsFilter(subaccount_ids=subaccount_ids)

Expand All @@ -2718,6 +2736,11 @@ def chain_stream_trades_filter(
subaccount_ids: Optional[List[str]] = None,
market_ids: Optional[List[str]] = None,
) -> chain_stream_query.TradesFilter:
"""
This method is deprecated and will be removed soon. Please use `chain_stream_trades_v2_filter` instead
"""
warn("This method is deprecated. Use chain_stream_trades_v2_filter instead", DeprecationWarning, stacklevel=2)

subaccount_ids = subaccount_ids or ["*"]
market_ids = market_ids or ["*"]
return chain_stream_query.TradesFilter(subaccount_ids=subaccount_ids, market_ids=market_ids)
Expand All @@ -2727,6 +2750,11 @@ def chain_stream_orders_filter(
subaccount_ids: Optional[List[str]] = None,
market_ids: Optional[List[str]] = None,
) -> chain_stream_query.OrdersFilter:
"""
This method is deprecated and will be removed soon. Please use `chain_stream_orders_v2_filter` instead
"""
warn("This method is deprecated. Use chain_stream_orders_v2_filter instead", DeprecationWarning, stacklevel=2)

subaccount_ids = subaccount_ids or ["*"]
market_ids = market_ids or ["*"]
return chain_stream_query.OrdersFilter(subaccount_ids=subaccount_ids, market_ids=market_ids)
Expand All @@ -2735,6 +2763,13 @@ def chain_stream_orderbooks_filter(
self,
market_ids: Optional[List[str]] = None,
) -> chain_stream_query.OrderbookFilter:
"""
This method is deprecated and will be removed soon. Please use `chain_stream_orderbooks_v2_filter` instead
"""
warn(
"This method is deprecated. Use chain_stream_orderbooks_v2_filter instead", DeprecationWarning, stacklevel=2
)

market_ids = market_ids or ["*"]
return chain_stream_query.OrderbookFilter(market_ids=market_ids)

Expand All @@ -2743,6 +2778,13 @@ def chain_stream_positions_filter(
subaccount_ids: Optional[List[str]] = None,
market_ids: Optional[List[str]] = None,
) -> chain_stream_query.PositionsFilter:
"""
This method is deprecated and will be removed soon. Please use `chain_stream_positions_v2_filter` instead
"""
warn(
"This method is deprecated. Use chain_stream_positions_v2_filter instead", DeprecationWarning, stacklevel=2
)

subaccount_ids = subaccount_ids or ["*"]
market_ids = market_ids or ["*"]
return chain_stream_query.PositionsFilter(subaccount_ids=subaccount_ids, market_ids=market_ids)
Expand All @@ -2751,9 +2793,72 @@ def chain_stream_oracle_price_filter(
self,
symbols: Optional[List[str]] = None,
) -> chain_stream_query.PositionsFilter:
"""
This method is deprecated and will be removed soon. Please use `chain_stream_oracle_price_v2_filter` instead
"""
warn(
"This method is deprecated. Use chain_stream_oracle_price_v2_filter instead",
DeprecationWarning,
stacklevel=2,
)

symbols = symbols or ["*"]
return chain_stream_query.OraclePriceFilter(symbol=symbols)

def chain_stream_bank_balances_v2_filter(
self, accounts: Optional[List[str]] = None
) -> chain_stream_v2_query.BankBalancesFilter:
accounts = accounts or ["*"]
return chain_stream_v2_query.BankBalancesFilter(accounts=accounts)

def chain_stream_subaccount_deposits_v2_filter(
self,
subaccount_ids: Optional[List[str]] = None,
) -> chain_stream_v2_query.SubaccountDepositsFilter:
subaccount_ids = subaccount_ids or ["*"]
return chain_stream_v2_query.SubaccountDepositsFilter(subaccount_ids=subaccount_ids)

def chain_stream_trades_v2_filter(
self,
subaccount_ids: Optional[List[str]] = None,
market_ids: Optional[List[str]] = None,
) -> chain_stream_v2_query.TradesFilter:
subaccount_ids = subaccount_ids or ["*"]
market_ids = market_ids or ["*"]
return chain_stream_v2_query.TradesFilter(subaccount_ids=subaccount_ids, market_ids=market_ids)

def chain_stream_orders_v2_filter(
self,
subaccount_ids: Optional[List[str]] = None,
market_ids: Optional[List[str]] = None,
) -> chain_stream_v2_query.OrdersFilter:
subaccount_ids = subaccount_ids or ["*"]
market_ids = market_ids or ["*"]
return chain_stream_v2_query.OrdersFilter(subaccount_ids=subaccount_ids, market_ids=market_ids)

def chain_stream_orderbooks_v2_filter(
self,
market_ids: Optional[List[str]] = None,
) -> chain_stream_v2_query.OrderbookFilter:
market_ids = market_ids or ["*"]
return chain_stream_v2_query.OrderbookFilter(market_ids=market_ids)

def chain_stream_positions_v2_filter(
self,
subaccount_ids: Optional[List[str]] = None,
market_ids: Optional[List[str]] = None,
) -> chain_stream_v2_query.PositionsFilter:
subaccount_ids = subaccount_ids or ["*"]
market_ids = market_ids or ["*"]
return chain_stream_v2_query.PositionsFilter(subaccount_ids=subaccount_ids, market_ids=market_ids)

def chain_stream_oracle_price_v2_filter(
self,
symbols: Optional[List[str]] = None,
) -> chain_stream_v2_query.PositionsFilter:
symbols = symbols or ["*"]
return chain_stream_v2_query.OraclePriceFilter(symbol=symbols)

# endregion

# ------------------------------------------------
Expand Down
Loading

0 comments on commit 52c798f

Please sign in to comment.