Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sync or async exchange calls in ethereum client #555

Merged
merged 1 commit into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 62 additions & 58 deletions client/src/ledger_app_clients/ethereum/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,60 +40,64 @@ def __init__(self, client: BackendInterface):
self._client = client
self._cmd_builder = CommandBuilder()

def _send(self, payload: bytes):
def _exchange_async(self, payload: bytes):
return self._client.exchange_async_raw(payload)

def _exchange(self, payload: bytes):
return self._client.exchange_raw(payload)

def response(self) -> Optional[RAPDU]:
return self._client.last_async_response

def eip712_send_struct_def_struct_name(self, name: str):
return self._send(self._cmd_builder.eip712_send_struct_def_struct_name(name))
return self._exchange_async(self._cmd_builder.eip712_send_struct_def_struct_name(name))

def eip712_send_struct_def_struct_field(self,
field_type: EIP712FieldType,
type_name: str,
type_size: int,
array_levels: list,
key_name: str):
return self._send(self._cmd_builder.eip712_send_struct_def_struct_field(
return self._exchange_async(self._cmd_builder.eip712_send_struct_def_struct_field(
field_type,
type_name,
type_size,
array_levels,
key_name))

def eip712_send_struct_impl_root_struct(self, name: str):
return self._send(self._cmd_builder.eip712_send_struct_impl_root_struct(name))
return self._exchange_async(self._cmd_builder.eip712_send_struct_impl_root_struct(name))

def eip712_send_struct_impl_array(self, size: int):
return self._send(self._cmd_builder.eip712_send_struct_impl_array(size))
return self._exchange_async(self._cmd_builder.eip712_send_struct_impl_array(size))

def eip712_send_struct_impl_struct_field(self, raw_value: bytes):
chunks = self._cmd_builder.eip712_send_struct_impl_struct_field(bytearray(raw_value))
for chunk in chunks[:-1]:
with self._send(chunk):
pass
return self._send(chunks[-1])
self._exchange(chunk)
return self._exchange_async(chunks[-1])

def eip712_sign_new(self, bip32_path: str):
return self._send(self._cmd_builder.eip712_sign_new(bip32_path))
return self._exchange_async(self._cmd_builder.eip712_sign_new(bip32_path))

def eip712_sign_legacy(self,
bip32_path: str,
domain_hash: bytes,
message_hash: bytes):
return self._send(self._cmd_builder.eip712_sign_legacy(bip32_path,
domain_hash,
message_hash))
return self._exchange_async(self._cmd_builder.eip712_sign_legacy(bip32_path,
domain_hash,
message_hash))

def eip712_filtering_activate(self):
return self._send(self._cmd_builder.eip712_filtering_activate())
return self._exchange_async(self._cmd_builder.eip712_filtering_activate())

def eip712_filtering_message_info(self, name: str, filters_count: int, sig: bytes):
return self._send(self._cmd_builder.eip712_filtering_message_info(name, filters_count, sig))
return self._exchange_async(self._cmd_builder.eip712_filtering_message_info(name,
filters_count,
sig))

def eip712_filtering_show_field(self, name: str, sig: bytes):
return self._send(self._cmd_builder.eip712_filtering_show_field(name, sig))
return self._exchange_async(self._cmd_builder.eip712_filtering_show_field(name, sig))

def sign(self,
bip32_path: str,
Expand All @@ -111,24 +115,23 @@ def sign(self,
tx = prefix + rlp.encode(decoded + suffix)
chunks = self._cmd_builder.sign(bip32_path, tx, suffix)
for chunk in chunks[:-1]:
with self._send(chunk):
pass
return self._send(chunks[-1])
self._exchange(chunk)
return self._exchange_async(chunks[-1])

def get_challenge(self):
return self._send(self._cmd_builder.get_challenge())
return self._exchange(self._cmd_builder.get_challenge())

def get_public_addr(self,
display: bool = True,
chaincode: bool = False,
bip32_path: str = "m/44'/60'/0'/0/0",
chain_id: Optional[int] = None):
return self._send(self._cmd_builder.get_public_addr(display,
chaincode,
bip32_path,
chain_id))
chain_id: Optional[int] = None) -> RAPDU:
return self._exchange_async(self._cmd_builder.get_public_addr(display,
chaincode,
bip32_path,
chain_id))

def provide_domain_name(self, challenge: int, name: str, addr: bytes):
def provide_domain_name(self, challenge: int, name: str, addr: bytes) -> RAPDU:
payload = format_tlv(DomainNameTag.STRUCTURE_TYPE, 3) # TrustedDomainName
payload += format_tlv(DomainNameTag.STRUCTURE_VERSION, 1)
payload += format_tlv(DomainNameTag.SIGNER_KEY_ID, 0) # test key
Expand All @@ -142,9 +145,8 @@ def provide_domain_name(self, challenge: int, name: str, addr: bytes):

chunks = self._cmd_builder.provide_domain_name(payload)
for chunk in chunks[:-1]:
with self._send(chunk):
pass
return self._send(chunks[-1])
self._exchange(chunk)
return self._exchange(chunks[-1])

def set_plugin(self,
plugin_name: str,
Expand All @@ -155,7 +157,7 @@ def set_plugin(self,
version: int = 1,
key_id: int = 2,
algo_id: int = 1,
sig: Optional[bytes] = None):
sig: Optional[bytes] = None) -> RAPDU:
if sig is None:
# Temporarily get a command with an empty signature to extract the payload and
# compute the signature on it
Expand All @@ -170,15 +172,15 @@ def set_plugin(self,
bytes())
# skip APDU header & empty sig
sig = sign_data(Key.SET_PLUGIN, tmp[5:-1])
return self._send(self._cmd_builder.set_plugin(type_,
version,
plugin_name,
contract_addr,
selector,
chain_id,
key_id,
algo_id,
sig))
return self._exchange(self._cmd_builder.set_plugin(type_,
version,
plugin_name,
contract_addr,
selector,
chain_id,
key_id,
algo_id,
sig))

def provide_nft_metadata(self,
collection: str,
Expand All @@ -188,7 +190,7 @@ def provide_nft_metadata(self,
version: int = 1,
key_id: int = 1,
algo_id: int = 1,
sig: Optional[bytes] = None):
sig: Optional[bytes] = None) -> RAPDU:
if sig is None:
# Temporarily get a command with an empty signature to extract the payload and
# compute the signature on it
Expand All @@ -202,42 +204,44 @@ def provide_nft_metadata(self,
bytes())
# skip APDU header & empty sig
sig = sign_data(Key.NFT, tmp[5:-1])
return self._send(self._cmd_builder.provide_nft_information(type_,
version,
collection,
addr,
chain_id,
key_id,
algo_id,
sig))
return self._exchange(self._cmd_builder.provide_nft_information(type_,
version,
collection,
addr,
chain_id,
key_id,
algo_id,
sig))

def set_external_plugin(self,
plugin_name: str,
contract_address: bytes,
method_selelector: bytes,
sig: Optional[bytes] = None):
sig: Optional[bytes] = None) -> RAPDU:
if sig is None:
# Temporarily get a command with an empty signature to extract the payload and
# compute the signature on it
tmp = self._cmd_builder.set_external_plugin(plugin_name, contract_address, method_selelector, bytes())

# skip APDU header & empty sig
sig = sign_data(Key.CAL, tmp[5:])
return self._send(self._cmd_builder.set_external_plugin(plugin_name, contract_address, method_selelector, sig))
return self._exchange(self._cmd_builder.set_external_plugin(plugin_name,
contract_address,
method_selelector,
sig))

def personal_sign(self, path: str, msg: bytes):
chunks = self._cmd_builder.personal_sign(path, msg)
for chunk in chunks[:-1]:
with self._send(chunk):
pass
return self._send(chunks[-1])
self._exchange(chunk)
return self._exchange_async(chunks[-1])

def provide_token_metadata(self,
ticker: str,
addr: bytes,
decimals: int,
chain_id: int,
sig: Optional[bytes] = None):
sig: Optional[bytes] = None) -> RAPDU:
if sig is None:
# Temporarily get a command with an empty signature to extract the payload and
# compute the signature on it
Expand All @@ -248,8 +252,8 @@ def provide_token_metadata(self,
bytes())
# skip APDU header & empty sig
sig = sign_data(Key.CAL, tmp[6:])
return self._send(self._cmd_builder.provide_erc20_token_information(ticker,
addr,
decimals,
chain_id,
sig))
return self._exchange(self._cmd_builder.provide_erc20_token_information(ticker,
addr,
decimals,
chain_id,
sig))
10 changes: 4 additions & 6 deletions tests/ragger/test_blind_sign.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import json
import pytest
from ragger.backend import BackendInterface
from ragger.firmware import Firmware
from ragger.navigator import Navigator, NavInsID
from ragger.error import ExceptionRAPDU
from ledger_app_clients.ethereum.client import EthAppClient
from ledger_app_clients.ethereum.client import EthAppClient, StatusWord
from web3 import Web3
from constants import ROOT_SNAPSHOT_PATH, ABIS_FOLDER

Expand Down Expand Up @@ -35,13 +36,10 @@ def test_blind_sign(firmware: Firmware,
"data": data,
"chainId": 1
}
try:
with pytest.raises(ExceptionRAPDU) as e:
with app_client.sign("m/44'/60'/0'/0/0", tx_params):
pass
except ExceptionRAPDU:
pass
else:
assert False
assert e.value.status == StatusWord.INVALID_DATA

moves = list()
if firmware.device.startswith("nano"):
Expand Down
Loading
Loading