diff --git a/client/pyproject.toml b/client/pyproject.toml index 7b1e8f645..92295741b 100644 --- a/client/pyproject.toml +++ b/client/pyproject.toml @@ -28,8 +28,7 @@ dynamic = [ "version" ] requires-python = ">=3.7" dependencies = [ "ragger[speculos]", - "simple-rlp", - "pysha3", + "web3~=6.0", ] [tools.setuptools] diff --git a/client/src/ledger_app_clients/ethereum/__init__.py b/client/src/ledger_app_clients/ethereum/__init__.py index 3dc1f76bc..d3ec452c3 100644 --- a/client/src/ledger_app_clients/ethereum/__init__.py +++ b/client/src/ledger_app_clients/ethereum/__init__.py @@ -1 +1 @@ -__version__ = "0.1.0" +__version__ = "0.2.0" diff --git a/client/src/ledger_app_clients/ethereum/client.py b/client/src/ledger_app_clients/ethereum/client.py index bd7544788..0833486f6 100644 --- a/client/src/ledger_app_clients/ethereum/client.py +++ b/client/src/ledger_app_clients/ethereum/client.py @@ -2,25 +2,14 @@ from enum import IntEnum from ragger.backend import BackendInterface from ragger.utils import RAPDU -from typing import List, Optional, Union +from typing import Optional from .command_builder import CommandBuilder from .eip712 import EIP712FieldType from .keychain import sign_data, Key from .tlv import format_tlv - -WEI_IN_ETH = 1e+18 -GWEI_IN_ETH = 1e+9 - - -class TxData: - selector: bytes - parameters: list[bytes] - - def __init__(self, selector: bytes, params: list[bytes]): - self.selector = selector - self.parameters = params +from web3 import Web3 class StatusWord(IntEnum): @@ -64,7 +53,7 @@ def eip712_send_struct_def_struct_field(self, field_type: EIP712FieldType, type_name: str, type_size: int, - array_levels: List, + array_levels: list, key_name: str): return self._send(self._cmd_builder.eip712_send_struct_def_struct_field( field_type, @@ -86,7 +75,7 @@ def eip712_send_struct_impl_struct_field(self, raw_value: bytes): pass return self._send(chunks[-1]) - def eip712_sign_new(self, bip32_path: str, verbose: bool): + def eip712_sign_new(self, bip32_path: str): return self._send(self._cmd_builder.eip712_sign_new(bip32_path)) def eip712_sign_legacy(self, @@ -106,79 +95,26 @@ def eip712_filtering_message_info(self, name: str, filters_count: int, sig: byte def eip712_filtering_show_field(self, name: str, sig: bytes): return self._send(self._cmd_builder.eip712_filtering_show_field(name, sig)) - def _sign(self, bip32_path: str, raw_tx: bytes): - chunks = self._cmd_builder.sign(bip32_path, raw_tx) + def sign(self, + bip32_path: str, + tx_params: dict): + tx = Web3().eth.account.create().sign_transaction(tx_params).rawTransaction + prefix = bytes() + suffix = [] + if tx[0] in [0x01, 0x02]: + prefix = tx[:1] + tx = tx[len(prefix):] + else: # legacy + if "chainId" in tx_params: + suffix = [int(tx_params["chainId"]), bytes(), bytes()] + decoded = rlp.decode(tx)[:-3] # remove already computed signature + tx = prefix + rlp.encode(decoded + suffix) + chunks = self._cmd_builder.sign(bip32_path, tx, suffix) for chunk in chunks[:-1]: with self._send(chunk): pass return self._send(chunks[-1]) - def _data_to_payload(self, data: TxData) -> bytes: - payload = bytearray(data.selector) - for param in data.parameters: - payload += param.rjust(32, b'\x00') - return payload - - def _sign_common(self, - tx: list, - gas_price: float, - gas_limit: int, - destination: bytes, - amount: float, - data: Optional[TxData]): - tx.append(int(gas_price * GWEI_IN_ETH)) - tx.append(gas_limit) - tx.append(destination) - if amount > 0: - tx.append(int(amount * WEI_IN_ETH)) - else: - tx.append(bytes()) - if data is not None: - tx.append(self._data_to_payload(data)) - else: - tx.append(bytes()) - return tx - - def sign_legacy(self, - bip32_path: str, - nonce: int, - gas_price: float, - gas_limit: int, - destination: bytes, - amount: float, - chain_id: int, - data: Optional[TxData] = None): - tx: List[Union[int, bytes]] = list() - tx.append(nonce) - tx = self._sign_common(tx, gas_price, gas_limit, destination, amount, data) - tx.append(chain_id) - tx.append(bytes()) - tx.append(bytes()) - return self._sign(bip32_path, rlp.encode(tx)) - - def sign_1559(self, - bip32_path: str, - chain_id: int, - nonce: int, - max_prio_gas_price: float, - max_gas_price: float, - gas_limit: int, - destination: bytes, - amount: float, - data: Optional[TxData] = None, - access_list=list()): - tx: List[Union[int, bytes]] = list() - tx.append(chain_id) - tx.append(nonce) - tx.append(int(max_prio_gas_price * GWEI_IN_ETH)) - tx = self._sign_common(tx, max_gas_price, gas_limit, destination, amount, data) - tx.append(access_list) - tx.append(False) - tx.append(bytes()) - tx.append(bytes()) - # prefix with transaction type - return self._sign(bip32_path, b'\x02' + rlp.encode(tx)) - def get_challenge(self): return self._send(self._cmd_builder.get_challenge()) @@ -286,5 +222,12 @@ def set_external_plugin(self, tmp = self._cmd_builder.set_external_plugin(plugin_name, contract_address, method_selelector, bytes()) # skip APDU header & empty sig - sig = sign_data(Key.SET_PLUGIN, tmp[5:-1]) + sig = sign_data(Key.CAL, tmp[5:]) return self._send(self._cmd_builder.set_external_plugin(plugin_name, contract_address, method_selelector, sig)) + + def personal_sign(self, path: str, msg: bytes): + chunks = self._cmd_builder.personal_sign(path, msg) + for chunk in chunks[:-1]: + with self._send(chunk): + pass + return self._send(chunks[-1]) diff --git a/client/src/ledger_app_clients/ethereum/command_builder.py b/client/src/ledger_app_clients/ethereum/command_builder.py index 63ab0ae7b..cba06a5c9 100644 --- a/client/src/ledger_app_clients/ethereum/command_builder.py +++ b/client/src/ledger_app_clients/ethereum/command_builder.py @@ -5,7 +5,6 @@ from enum import IntEnum from typing import Optional from ragger.bip import pack_derivation_path -from typing import List from .eip712 import EIP712FieldType @@ -13,6 +12,7 @@ class InsType(IntEnum): GET_PUBLIC_ADDR = 0x02 SIGN = 0x04 + PERSONAL_SIGN = 0x08 PROVIDE_NFT_INFORMATION = 0x14 SET_PLUGIN = 0x16 EIP712_SEND_STRUCT_DEF = 0x1a @@ -75,7 +75,7 @@ def eip712_send_struct_def_struct_field(self, field_type: EIP712FieldType, type_name: str, type_size: int, - array_levels: List, + array_levels: list, key_name: str) -> bytes: data = bytearray() typedesc = 0 @@ -115,7 +115,7 @@ def eip712_send_struct_impl_array(self, size: int) -> bytes: P2Type.ARRAY, data) - def eip712_send_struct_impl_struct_field(self, data: bytearray) -> List[bytes]: + def eip712_send_struct_impl_struct_field(self, data: bytearray) -> list[bytes]: chunks = list() # Add a 16-bit integer with the data's byte length (network byte order) data_w_length = bytearray() @@ -195,17 +195,27 @@ def set_external_plugin(self, plugin_name: str, contract_address: bytes, selecto 0x00, data) - def sign(self, bip32_path: str, rlp_data: bytes) -> list[bytes]: + def sign(self, bip32_path: str, rlp_data: bytes, vrs: list) -> list[bytes]: apdus = list() payload = pack_derivation_path(bip32_path) payload += rlp_data p1 = P1Type.SIGN_FIRST_CHUNK while len(payload) > 0: + chunk_size = 0xff + + # TODO: Fix the app & remove this, issue #409 + if len(vrs) == 3: + if len(payload) > chunk_size: + import rlp + diff = len(rlp.encode(vrs)) - (len(payload) - chunk_size) + if diff > 0: + chunk_size -= diff + apdus.append(self._serialize(InsType.SIGN, p1, 0x00, - payload[:0xff])) - payload = payload[0xff:] + payload[:chunk_size])) + payload = payload[chunk_size:] p1 = P1Type.SIGN_SUBSQT_CHUNK return apdus @@ -284,3 +294,19 @@ def provide_nft_information(self, payload.append(len(sig)) payload += sig return self._serialize(InsType.PROVIDE_NFT_INFORMATION, 0x00, 0x00, payload) + + def personal_sign(self, path: str, msg: bytes): + payload = pack_derivation_path(path) + payload += struct.pack(">I", len(msg)) + payload += msg + chunks = list() + p1 = P1Type.SIGN_FIRST_CHUNK + while len(payload) > 0: + chunk_size = 0xff + chunks.append(self._serialize(InsType.PERSONAL_SIGN, + p1, + 0x00, + payload[:chunk_size])) + payload = payload[chunk_size:] + p1 = P1Type.SIGN_SUBSQT_CHUNK + return chunks diff --git a/client/src/ledger_app_clients/ethereum/eip712/InputData.py b/client/src/ledger_app_clients/ethereum/eip712/InputData.py index ac0877cf6..1d0263af3 100644 --- a/client/src/ledger_app_clients/ethereum/eip712/InputData.py +++ b/client/src/ledger_app_clients/ethereum/eip712/InputData.py @@ -3,7 +3,8 @@ import re import signal import sys -from typing import Any, Callable, Dict, List, Optional +import copy +from typing import Any, Callable, Optional from ledger_app_clients.ethereum import keychain from ledger_app_clients.ethereum.client import EthAppClient, EIP712FieldType @@ -11,9 +12,9 @@ # global variables app_client: EthAppClient = None -filtering_paths: Dict = {} -current_path: List[str] = list() -sig_ctx: Dict[str, Any] = {} +filtering_paths: dict = {} +current_path: list[str] = list() +sig_ctx: dict[str, Any] = {} def default_handler(): @@ -297,13 +298,6 @@ def send_filtering_show_field(display_name): pass -def read_filtering_file(filtering_file_path: str): - data_json = None - with open(filtering_file_path) as data: - data_json = json.load(data) - return data_json - - def prepare_filtering(filtr_data, message): global filtering_paths @@ -355,62 +349,61 @@ def disable_autonext(): signal.setitimer(signal.ITIMER_REAL, 0, 0) -def process_file(aclient: EthAppClient, - input_file_path: str, - filtering_file_path: Optional[str] = None, +def process_data(aclient: EthAppClient, + data_json: dict, + filters: Optional[dict] = None, autonext: Optional[Callable] = None) -> bool: global sig_ctx global app_client global autonext_handler + # deepcopy because this function modifies the dict + data_json = copy.deepcopy(data_json) app_client = aclient - with open(input_file_path, "r") as data: - data_json = json.load(data) - domain_typename = "EIP712Domain" - message_typename = data_json["primaryType"] - types = data_json["types"] - domain = data_json["domain"] - message = data_json["message"] - - if autonext: - autonext_handler = autonext - signal.signal(signal.SIGALRM, next_timeout) - - if filtering_file_path: - init_signature_context(types, domain) - filtr = read_filtering_file(filtering_file_path) - - # send types definition - for key in types.keys(): - with app_client.eip712_send_struct_def_struct_name(key): - pass - for f in types[key]: - (f["type"], f["enum"], f["typesize"], f["array_lvls"]) = \ - send_struct_def_field(f["type"], f["name"]) - - if filtering_file_path: - with app_client.eip712_filtering_activate(): - pass - prepare_filtering(filtr, message) - - # send domain implementation - with app_client.eip712_send_struct_impl_root_struct(domain_typename): - enable_autonext() - disable_autonext() - if not send_struct_impl(types, domain, domain_typename): - return False + domain_typename = "EIP712Domain" + message_typename = data_json["primaryType"] + types = data_json["types"] + domain = data_json["domain"] + message = data_json["message"] + + if autonext: + autonext_handler = autonext + signal.signal(signal.SIGALRM, next_timeout) + + if filters: + init_signature_context(types, domain) + + # send types definition + for key in types.keys(): + with app_client.eip712_send_struct_def_struct_name(key): + pass + for f in types[key]: + (f["type"], f["enum"], f["typesize"], f["array_lvls"]) = \ + send_struct_def_field(f["type"], f["name"]) - if filtering_file_path: - if filtr and "name" in filtr: - send_filtering_message_info(filtr["name"], len(filtering_paths)) - else: - send_filtering_message_info(domain["name"], len(filtering_paths)) - - # send message implementation - with app_client.eip712_send_struct_impl_root_struct(message_typename): - enable_autonext() - disable_autonext() - if not send_struct_impl(types, message, message_typename): - return False + if filters: + with app_client.eip712_filtering_activate(): + pass + prepare_filtering(filters, message) + + # send domain implementation + with app_client.eip712_send_struct_impl_root_struct(domain_typename): + enable_autonext() + disable_autonext() + if not send_struct_impl(types, domain, domain_typename): + return False + + if filters: + if filters and "name" in filters: + send_filtering_message_info(filters["name"], len(filtering_paths)) + else: + send_filtering_message_info(domain["name"], len(filtering_paths)) + + # send message implementation + with app_client.eip712_send_struct_impl_root_struct(message_typename): + enable_autonext() + disable_autonext() + if not send_struct_impl(types, message, message_typename): + return False return True diff --git a/client/src/ledger_app_clients/ethereum/keychain.py b/client/src/ledger_app_clients/ethereum/keychain.py index 433d65234..0fc6712b1 100644 --- a/client/src/ledger_app_clients/ethereum/keychain.py +++ b/client/src/ledger_app_clients/ethereum/keychain.py @@ -3,7 +3,6 @@ from ecdsa import SigningKey from ecdsa.util import sigencode_der from enum import Enum, auto -from typing import Dict # Private key PEM files have to be named the same (lowercase) as their corresponding enum entries @@ -15,7 +14,7 @@ class Key(Enum): NFT = auto() -_keys: Dict[Key, SigningKey] = dict() +_keys: dict[Key, SigningKey] = dict() # Open the corresponding PEM file and load its key in the global dict diff --git a/client/src/ledger_app_clients/ethereum/response_parser.py b/client/src/ledger_app_clients/ethereum/response_parser.py index 641e1bbc9..a50ae0184 100644 --- a/client/src/ledger_app_clients/ethereum/response_parser.py +++ b/client/src/ledger_app_clients/ethereum/response_parser.py @@ -49,4 +49,4 @@ def pk_addr(data: bytes, has_chaincode: bool = False): if idx != len(data): return None - return pk, addr.decode(), chaincode + return pk, bytes.fromhex(addr.decode()), chaincode diff --git a/client/src/ledger_app_clients/ethereum/settings.py b/client/src/ledger_app_clients/ethereum/settings.py index d9d3ed597..6bd73e91e 100644 --- a/client/src/ledger_app_clients/ethereum/settings.py +++ b/client/src/ledger_app_clients/ethereum/settings.py @@ -1,7 +1,7 @@ from enum import Enum, auto from ragger.firmware import Firmware from ragger.navigator import Navigator, NavInsID, NavIns -from typing import List, Union +from typing import Union class SettingID(Enum): @@ -44,7 +44,7 @@ def get_setting_position(device: str, setting: Union[NavInsID, SettingID]) -> tu def settings_toggle(fw: Firmware, nav: Navigator, to_toggle: list[SettingID]): - moves: List[Union[NavIns, NavInsID]] = list() + moves: list[Union[NavIns, NavInsID]] = list() settings = get_device_settings(fw.device) # Assume the app is on the home page if fw.device.startswith("nano"): diff --git a/client/src/ledger_app_clients/ethereum/utils.py b/client/src/ledger_app_clients/ethereum/utils.py index e6177455e..f538403c7 100644 --- a/client/src/ledger_app_clients/ethereum/utils.py +++ b/client/src/ledger_app_clients/ethereum/utils.py @@ -1,5 +1,41 @@ -import sha3 +from eth_account import Account +from eth_account.messages import encode_defunct, encode_typed_data +import rlp -def get_selector_from_function(fn: str) -> bytes: - return sha3.keccak_256(fn.encode()).digest()[0:4] +def get_selector_from_data(data: str) -> bytes: + raw_data = bytes.fromhex(data[2:]) + return raw_data[:4] + + +def recover_message(msg, vrs: tuple) -> bytes: + if isinstance(msg, dict): # EIP-712 + smsg = encode_typed_data(full_message=msg) + else: # EIP-191 + smsg = encode_defunct(primitive=msg) + addr = Account.recover_message(smsg, vrs) + return bytes.fromhex(addr[2:]) + + +# TODO: Figure out why it doesn't work for non-legacy transactions +def recover_transaction(tx_params, vrs: tuple) -> bytes: + raw_tx = Account.create().sign_transaction(tx_params).rawTransaction + prefix = bytes() + if raw_tx[0] in [0x01, 0x02]: + prefix = raw_tx[:1] + raw_tx = raw_tx[len(prefix):] + if prefix == bytes(): + # v is returned on one byte only so it might have overflowed + # in that case, we will reconstruct it to its full value + if "chainId" in tx_params: + trunc_chain_id = tx_params["chainId"] + while trunc_chain_id.bit_length() > 32: + trunc_chain_id >>= 8 + target = tx_params["chainId"] * 2 + 35 + trunc_target = trunc_chain_id * 2 + 35 + diff = vrs[0][0] - (trunc_target & 0xff) + vrs = (target + diff, vrs[1], vrs[2]) + decoded = rlp.decode(raw_tx) + reencoded = rlp.encode(decoded[:-3] + list(vrs)) + addr = Account.recover_transaction(prefix + reencoded) + return bytes.fromhex(addr[2:]) diff --git a/src_features/setExternalPlugin/cmd_setExternalPlugin.c b/src_features/setExternalPlugin/cmd_setExternalPlugin.c index 4a9435600..49e827a87 100644 --- a/src_features/setExternalPlugin/cmd_setExternalPlugin.c +++ b/src_features/setExternalPlugin/cmd_setExternalPlugin.c @@ -48,7 +48,9 @@ void handleSetExternalPlugin(uint8_t p1, workBuffer + payload_size, dataLength - payload_size)) { #ifndef HAVE_BYPASS_SIGNATURES - PRINTF("Invalid plugin signature %.*H\n", payload_size, workBuffer); + PRINTF("Invalid plugin signature %.*H\n", + dataLength - payload_size, + workBuffer + payload_size); THROW(0x6A80); #endif } diff --git a/tests/ragger/abis/erc1155.json b/tests/ragger/abis/erc1155.json new file mode 100644 index 000000000..3c53ad8c2 --- /dev/null +++ b/tests/ragger/abis/erc1155.json @@ -0,0 +1,276 @@ +[ + { + "anonymous" : false, + "inputs" : [ + { + "indexed" : true, + "internalType" : "address", + "name" : "_owner", + "type" : "address" + }, + { + "indexed" : true, + "internalType" : "address", + "name" : "_operator", + "type" : "address" + }, + { + "indexed" : false, + "internalType" : "bool", + "name" : "_approved", + "type" : "bool" + } + ], + "name" : "ApprovalForAll", + "type" : "event" + }, + { + "anonymous" : false, + "inputs" : [ + { + "indexed" : true, + "internalType" : "address", + "name" : "_operator", + "type" : "address" + }, + { + "indexed" : true, + "internalType" : "address", + "name" : "_from", + "type" : "address" + }, + { + "indexed" : true, + "internalType" : "address", + "name" : "_to", + "type" : "address" + }, + { + "indexed" : false, + "internalType" : "uint256[]", + "name" : "_ids", + "type" : "uint256[]" + }, + { + "indexed" : false, + "internalType" : "uint256[]", + "name" : "_values", + "type" : "uint256[]" + } + ], + "name" : "TransferBatch", + "type" : "event" + }, + { + "anonymous" : false, + "inputs" : [ + { + "indexed" : true, + "internalType" : "address", + "name" : "_operator", + "type" : "address" + }, + { + "indexed" : true, + "internalType" : "address", + "name" : "_from", + "type" : "address" + }, + { + "indexed" : true, + "internalType" : "address", + "name" : "_to", + "type" : "address" + }, + { + "indexed" : false, + "internalType" : "uint256", + "name" : "_id", + "type" : "uint256" + }, + { + "indexed" : false, + "internalType" : "uint256", + "name" : "_value", + "type" : "uint256" + } + ], + "name" : "TransferSingle", + "type" : "event" + }, + { + "anonymous" : false, + "inputs" : [ + { + "indexed" : false, + "internalType" : "string", + "name" : "_value", + "type" : "string" + }, + { + "indexed" : true, + "internalType" : "uint256", + "name" : "_id", + "type" : "uint256" + } + ], + "name" : "URI", + "type" : "event" + }, + { + "inputs" : [ + { + "internalType" : "address", + "name" : "_owner", + "type" : "address" + }, + { + "internalType" : "uint256", + "name" : "_id", + "type" : "uint256" + } + ], + "name" : "balanceOf", + "outputs" : [ + { + "internalType" : "uint256", + "name" : "", + "type" : "uint256" + } + ], + "stateMutability" : "view", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "address[]", + "name" : "_owners", + "type" : "address[]" + }, + { + "internalType" : "uint256[]", + "name" : "_ids", + "type" : "uint256[]" + } + ], + "name" : "balanceOfBatch", + "outputs" : [ + { + "internalType" : "uint256[]", + "name" : "", + "type" : "uint256[]" + } + ], + "stateMutability" : "view", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "address", + "name" : "_owner", + "type" : "address" + }, + { + "internalType" : "address", + "name" : "_operator", + "type" : "address" + } + ], + "name" : "isApprovedForAll", + "outputs" : [ + { + "internalType" : "bool", + "name" : "", + "type" : "bool" + } + ], + "stateMutability" : "view", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "address", + "name" : "_from", + "type" : "address" + }, + { + "internalType" : "address", + "name" : "_to", + "type" : "address" + }, + { + "internalType" : "uint256[]", + "name" : "_ids", + "type" : "uint256[]" + }, + { + "internalType" : "uint256[]", + "name" : "_values", + "type" : "uint256[]" + }, + { + "internalType" : "bytes", + "name" : "_data", + "type" : "bytes" + } + ], + "name" : "safeBatchTransferFrom", + "outputs" : [], + "stateMutability" : "nonpayable", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "address", + "name" : "_from", + "type" : "address" + }, + { + "internalType" : "address", + "name" : "_to", + "type" : "address" + }, + { + "internalType" : "uint256", + "name" : "_id", + "type" : "uint256" + }, + { + "internalType" : "uint256", + "name" : "_value", + "type" : "uint256" + }, + { + "internalType" : "bytes", + "name" : "_data", + "type" : "bytes" + } + ], + "name" : "safeTransferFrom", + "outputs" : [], + "stateMutability" : "nonpayable", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "address", + "name" : "_operator", + "type" : "address" + }, + { + "internalType" : "bool", + "name" : "_approved", + "type" : "bool" + } + ], + "name" : "setApprovalForAll", + "outputs" : [], + "stateMutability" : "nonpayable", + "type" : "function" + } +] diff --git a/tests/ragger/abis/erc721.json b/tests/ragger/abis/erc721.json new file mode 100644 index 000000000..e00d5ca3f --- /dev/null +++ b/tests/ragger/abis/erc721.json @@ -0,0 +1,268 @@ +[ + { + "anonymous" : false, + "inputs" : [ + { + "indexed" : true, + "internalType" : "address", + "name" : "_owner", + "type" : "address" + }, + { + "indexed" : true, + "internalType" : "address", + "name" : "_approved", + "type" : "address" + }, + { + "indexed" : true, + "internalType" : "uint256", + "name" : "_tokenId", + "type" : "uint256" + } + ], + "name" : "Approval", + "type" : "event" + }, + { + "anonymous" : false, + "inputs" : [ + { + "indexed" : true, + "internalType" : "address", + "name" : "_owner", + "type" : "address" + }, + { + "indexed" : true, + "internalType" : "address", + "name" : "_operator", + "type" : "address" + }, + { + "indexed" : false, + "internalType" : "bool", + "name" : "_approved", + "type" : "bool" + } + ], + "name" : "ApprovalForAll", + "type" : "event" + }, + { + "anonymous" : false, + "inputs" : [ + { + "indexed" : true, + "internalType" : "address", + "name" : "_from", + "type" : "address" + }, + { + "indexed" : true, + "internalType" : "address", + "name" : "_to", + "type" : "address" + }, + { + "indexed" : true, + "internalType" : "uint256", + "name" : "_tokenId", + "type" : "uint256" + } + ], + "name" : "Transfer", + "type" : "event" + }, + { + "inputs" : [ + { + "internalType" : "address", + "name" : "_approved", + "type" : "address" + }, + { + "internalType" : "uint256", + "name" : "_tokenId", + "type" : "uint256" + } + ], + "name" : "approve", + "outputs" : [], + "stateMutability" : "payable", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "address", + "name" : "_owner", + "type" : "address" + } + ], + "name" : "balanceOf", + "outputs" : [ + { + "internalType" : "uint256", + "name" : "", + "type" : "uint256" + } + ], + "stateMutability" : "view", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "uint256", + "name" : "_tokenId", + "type" : "uint256" + } + ], + "name" : "getApproved", + "outputs" : [ + { + "internalType" : "address", + "name" : "", + "type" : "address" + } + ], + "stateMutability" : "view", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "address", + "name" : "_owner", + "type" : "address" + }, + { + "internalType" : "address", + "name" : "_operator", + "type" : "address" + } + ], + "name" : "isApprovedForAll", + "outputs" : [ + { + "internalType" : "bool", + "name" : "", + "type" : "bool" + } + ], + "stateMutability" : "view", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "uint256", + "name" : "_tokenId", + "type" : "uint256" + } + ], + "name" : "ownerOf", + "outputs" : [ + { + "internalType" : "address", + "name" : "", + "type" : "address" + } + ], + "stateMutability" : "view", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "address", + "name" : "_from", + "type" : "address" + }, + { + "internalType" : "address", + "name" : "_to", + "type" : "address" + }, + { + "internalType" : "uint256", + "name" : "_tokenId", + "type" : "uint256" + } + ], + "name" : "safeTransferFrom", + "outputs" : [], + "stateMutability" : "payable", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "address", + "name" : "_from", + "type" : "address" + }, + { + "internalType" : "address", + "name" : "_to", + "type" : "address" + }, + { + "internalType" : "uint256", + "name" : "_tokenId", + "type" : "uint256" + }, + { + "internalType" : "bytes", + "name" : "data", + "type" : "bytes" + } + ], + "name" : "safeTransferFrom", + "outputs" : [], + "stateMutability" : "payable", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "address", + "name" : "_operator", + "type" : "address" + }, + { + "internalType" : "bool", + "name" : "_approved", + "type" : "bool" + } + ], + "name" : "setApprovalForAll", + "outputs" : [], + "stateMutability" : "nonpayable", + "type" : "function" + }, + { + "inputs" : [ + { + "internalType" : "address", + "name" : "_from", + "type" : "address" + }, + { + "internalType" : "address", + "name" : "_to", + "type" : "address" + }, + { + "internalType" : "uint256", + "name" : "_tokenId", + "type" : "uint256" + } + ], + "name" : "transferFrom", + "outputs" : [], + "stateMutability" : "payable", + "type" : "function" + } +] diff --git a/tests/ragger/requirements.txt b/tests/ragger/requirements.txt index b493e4864..1149a7528 100644 --- a/tests/ragger/requirements.txt +++ b/tests/ragger/requirements.txt @@ -1,4 +1,3 @@ -ragger[speculos] pytest ecdsa ./client/ diff --git a/tests/ragger/test_domain_name.py b/tests/ragger/test_domain_name.py index 074051481..36637a368 100644 --- a/tests/ragger/test_domain_name.py +++ b/tests/ragger/test_domain_name.py @@ -9,6 +9,8 @@ from ledger_app_clients.ethereum.client import EthAppClient, StatusWord from ledger_app_clients.ethereum.settings import SettingID, settings_toggle +from web3 import Web3 + ROOT_SCREENSHOT_PATH = Path(__file__).parent @@ -52,24 +54,26 @@ def test_send_fund(firmware: Firmware, with app_client.provide_domain_name(challenge, NAME, ADDR): pass - with app_client.sign_legacy(BIP32_PATH, - NONCE, - GAS_PRICE, - GAS_LIMIT, - ADDR, - AMOUNT, - CHAIN_ID): + with app_client.sign(BIP32_PATH, + { + "nonce": NONCE, + "gasPrice": Web3.to_wei(GAS_PRICE, "gwei"), + "gas": GAS_LIMIT, + "to": ADDR, + "value": Web3.to_wei(AMOUNT, "ether"), + "chainId": CHAIN_ID + }): moves = list() if firmware.device.startswith("nano"): - moves += [ NavInsID.RIGHT_CLICK ] * 4 + moves += [NavInsID.RIGHT_CLICK] * 4 if verbose: - moves += [ NavInsID.RIGHT_CLICK ] - moves += [ NavInsID.BOTH_CLICK ] + moves += [NavInsID.RIGHT_CLICK] + moves += [NavInsID.BOTH_CLICK] else: - moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 2 + moves += [NavInsID.USE_CASE_REVIEW_TAP] * 2 if verbose: - moves += [ NavInsID.USE_CASE_REVIEW_TAP ] - moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ] + moves += [NavInsID.USE_CASE_REVIEW_TAP] + moves += [NavInsID.USE_CASE_REVIEW_CONFIRM] navigator.navigate_and_compare(ROOT_SCREENSHOT_PATH, "domain_name_verbose_" + str(verbose), moves) @@ -87,7 +91,7 @@ def test_send_fund_wrong_challenge(firmware: Firmware, except ExceptionRAPDU as e: assert e.status == StatusWord.INVALID_DATA else: - assert False # An exception should have been raised + assert False # An exception should have been raised def test_send_fund_wrong_addr(firmware: Firmware, @@ -103,20 +107,22 @@ def test_send_fund_wrong_addr(firmware: Firmware, addr = bytearray(ADDR) addr.reverse() - with app_client.sign_legacy(BIP32_PATH, - NONCE, - GAS_PRICE, - GAS_LIMIT, - addr, - AMOUNT, - CHAIN_ID): + with app_client.sign(BIP32_PATH, + { + "nonce": NONCE, + "gasPrice": Web3.to_wei(GAS_PRICE, "gwei"), + "gas": GAS_LIMIT, + "to": bytes(addr), + "value": Web3.to_wei(AMOUNT, "ether"), + "chainId": CHAIN_ID + }): moves = list() if firmware.device.startswith("nano"): - moves += [ NavInsID.RIGHT_CLICK ] * 4 - moves += [ NavInsID.BOTH_CLICK ] + moves += [NavInsID.RIGHT_CLICK] * 4 + moves += [NavInsID.BOTH_CLICK] else: - moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 2 - moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ] + moves += [NavInsID.USE_CASE_REVIEW_TAP] * 2 + moves += [NavInsID.USE_CASE_REVIEW_CONFIRM] navigator.navigate_and_compare(ROOT_SCREENSHOT_PATH, "domain_name_wrong_addr", moves) @@ -132,20 +138,22 @@ def test_send_fund_non_mainnet(firmware: Firmware, with app_client.provide_domain_name(challenge, NAME, ADDR): pass - with app_client.sign_legacy(BIP32_PATH, - NONCE, - GAS_PRICE, - GAS_LIMIT, - ADDR, - AMOUNT, - 5): + with app_client.sign(BIP32_PATH, + { + "nonce": NONCE, + "gasPrice": Web3.to_wei(GAS_PRICE, "gwei"), + "gas": GAS_LIMIT, + "to": ADDR, + "value": Web3.to_wei(AMOUNT, "ether"), + "chainId": 5 + }): moves = list() if firmware.device.startswith("nano"): - moves += [ NavInsID.RIGHT_CLICK ] * 5 - moves += [ NavInsID.BOTH_CLICK ] + moves += [NavInsID.RIGHT_CLICK] * 5 + moves += [NavInsID.BOTH_CLICK] else: - moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 2 - moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ] + moves += [NavInsID.USE_CASE_REVIEW_TAP] * 2 + moves += [NavInsID.USE_CASE_REVIEW_CONFIRM] navigator.navigate_and_compare(ROOT_SCREENSHOT_PATH, "domain_name_non_mainnet", moves) @@ -161,20 +169,22 @@ def test_send_fund_unknown_chain(firmware: Firmware, with app_client.provide_domain_name(challenge, NAME, ADDR): pass - with app_client.sign_legacy(BIP32_PATH, - NONCE, - GAS_PRICE, - GAS_LIMIT, - ADDR, - AMOUNT, - 9): + with app_client.sign(BIP32_PATH, + { + "nonce": NONCE, + "gasPrice": Web3.to_wei(GAS_PRICE, "gwei"), + "gas": GAS_LIMIT, + "to": ADDR, + "value": Web3.to_wei(AMOUNT, "ether"), + "chainId": 9 + }): moves = list() if firmware.device.startswith("nano"): - moves += [ NavInsID.RIGHT_CLICK ] * 5 - moves += [ NavInsID.BOTH_CLICK ] + moves += [NavInsID.RIGHT_CLICK] * 5 + moves += [NavInsID.BOTH_CLICK] else: - moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 3 - moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ] + moves += [NavInsID.USE_CASE_REVIEW_TAP] * 3 + moves += [NavInsID.USE_CASE_REVIEW_CONFIRM] navigator.navigate_and_compare(ROOT_SCREENSHOT_PATH, "domain_name_unknown_chain", moves) @@ -192,7 +202,7 @@ def test_send_fund_domain_too_long(firmware: Firmware, except ExceptionRAPDU as e: assert e.status == StatusWord.INVALID_DATA else: - assert False # An exception should have been raised + assert False # An exception should have been raised def test_send_fund_domain_invalid_character(firmware: Firmware, @@ -207,7 +217,7 @@ def test_send_fund_domain_invalid_character(firmware: Firmware, except ExceptionRAPDU as e: assert e.status == StatusWord.INVALID_DATA else: - assert False # An exception should have been raised + assert False # An exception should have been raised def test_send_fund_uppercase(firmware: Firmware, @@ -222,7 +232,7 @@ def test_send_fund_uppercase(firmware: Firmware, except ExceptionRAPDU as e: assert e.status == StatusWord.INVALID_DATA else: - assert False # An exception should have been raised + assert False # An exception should have been raised def test_send_fund_domain_non_ens(firmware: Firmware, @@ -237,4 +247,4 @@ def test_send_fund_domain_non_ens(firmware: Firmware, except ExceptionRAPDU as e: assert e.status == StatusWord.INVALID_DATA else: - assert False # An exception should have been raised + assert False # An exception should have been raised diff --git a/tests/ragger/test_eip712.py b/tests/ragger/test_eip712.py index 081695890..ec4a22af1 100644 --- a/tests/ragger/test_eip712.py +++ b/tests/ragger/test_eip712.py @@ -8,7 +8,7 @@ from ragger.backend import BackendInterface from ragger.firmware import Firmware from ragger.navigator import Navigator, NavInsID -from typing import List +import json import ledger_app_clients.ethereum.response_parser as ResponseParser from ledger_app_clients.ethereum.client import EthAppClient @@ -19,7 +19,7 @@ BIP32_PATH = "m/44'/60'/0'/0/0" -def input_files() -> List[str]: +def input_files() -> list[str]: files = [] for file in os.scandir("%s/eip712_input_files" % (os.path.dirname(__file__))): if fnmatch.fnmatch(file, "*-data.json"): @@ -52,16 +52,16 @@ def test_eip712_legacy(firmware: Firmware, bytes.fromhex('eb4221181ff3f1a83ea7313993ca9218496e424604ba9492bb4052c03d5c3df8')): moves = list() if firmware.device.startswith("nano"): - moves += [ NavInsID.RIGHT_CLICK ] + moves += [NavInsID.RIGHT_CLICK] if firmware.device == "nanos": screens_per_hash = 4 else: screens_per_hash = 2 - moves += [ NavInsID.RIGHT_CLICK ] * screens_per_hash * 2 - moves += [ NavInsID.BOTH_CLICK ] + moves += [NavInsID.RIGHT_CLICK] * screens_per_hash * 2 + moves += [NavInsID.BOTH_CLICK] else: - moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 2 - moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ] + moves += [NavInsID.USE_CASE_REVIEW_TAP] * 2 + moves += [NavInsID.USE_CASE_REVIEW_CONFIRM] navigator.navigate(moves) v, r, s = ResponseParser.signature(app_client.response().data) @@ -74,9 +74,9 @@ def test_eip712_legacy(firmware: Firmware, def autonext(fw: Firmware, nav: Navigator): moves = list() if fw.device.startswith("nano"): - moves = [ NavInsID.RIGHT_CLICK ] + moves = [NavInsID.RIGHT_CLICK] else: - moves = [ NavInsID.USE_CASE_REVIEW_TAP ] + moves = [NavInsID.USE_CASE_REVIEW_TAP] nav.navigate(moves, screen_change_before_first_instruction=False, screen_change_after_last_instruction=False) @@ -92,10 +92,14 @@ def test_eip712_new(firmware: Firmware, else: test_path = "%s/%s" % (input_file.parent, "-".join(input_file.stem.split("-")[:-1])) conf_file = "%s.ini" % (test_path) - filter_file = None + filters = None if filtering: - filter_file = "%s-filter.json" % (test_path) + try: + with open("%s-filter.json" % (test_path)) as f: + filters = json.load(f) + except (IOError, json.decoder.JSONDecodeError) as e: + pytest.skip("Filter file error: %s" % (e.strerror)) config = ConfigParser() config.read(conf_file) @@ -106,34 +110,30 @@ def test_eip712_new(firmware: Firmware, assert "r" in config["signature"] assert "s" in config["signature"] - if not filtering or Path(filter_file).is_file(): - if verbose: - settings_toggle(firmware, navigator, [SettingID.VERBOSE_EIP712]) + if verbose: + settings_toggle(firmware, navigator, [SettingID.VERBOSE_EIP712]) + + with open(input_file) as file: + assert InputData.process_data(app_client, + json.load(file), + filters, + partial(autonext, firmware, navigator)) + with app_client.eip712_sign_new(BIP32_PATH): + # tight on timing, needed by the CI otherwise might fail sometimes + time.sleep(0.5) - assert InputData.process_file(app_client, - input_file, - filter_file, - partial(autonext, firmware, navigator)) == True - with app_client.eip712_sign_new(BIP32_PATH, verbose): - time.sleep(0.5) # tight on timing, needed by the CI otherwise might fail sometimes moves = list() if firmware.device.startswith("nano"): - if not verbose and not filtering: # need to skip the message hash - moves = [ NavInsID.RIGHT_CLICK ] * 2 - moves += [ NavInsID.BOTH_CLICK ] + if not verbose and not filtering: # need to skip the message hash + moves = [NavInsID.RIGHT_CLICK] * 2 + moves += [NavInsID.BOTH_CLICK] else: - if not verbose and not filtering: # need to skip the message hash - moves += [ NavInsID.USE_CASE_REVIEW_TAP ] - moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ] + if not verbose and not filtering: # need to skip the message hash + moves += [NavInsID.USE_CASE_REVIEW_TAP] + moves += [NavInsID.USE_CASE_REVIEW_CONFIRM] navigator.navigate(moves) v, r, s = ResponseParser.signature(app_client.response().data) - #print("[signature]") - #print("v = %s" % (v.hex())) - #print("r = %s" % (r.hex())) - #print("s = %s" % (s.hex())) - - assert v == bytes.fromhex(config["signature"]["v"]) - assert r == bytes.fromhex(config["signature"]["r"]) - assert s == bytes.fromhex(config["signature"]["s"]) - else: - pytest.skip("No filter file found") + + assert v == bytes.fromhex(config["signature"]["v"]) + assert r == bytes.fromhex(config["signature"]["r"]) + assert s == bytes.fromhex(config["signature"]["s"]) diff --git a/tests/ragger/test_get_address.py b/tests/ragger/test_get_address.py index 33ab5f8df..46c290a1b 100644 --- a/tests/ragger/test_get_address.py +++ b/tests/ragger/test_get_address.py @@ -6,20 +6,22 @@ from ragger.backend import BackendInterface from ragger.navigator import Navigator, NavInsID from ledger_app_clients.ethereum.client import EthAppClient, StatusWord -from ledger_app_clients.ethereum.settings import SettingID, settings_toggle import ledger_app_clients.ethereum.response_parser as ResponseParser from ragger.bip import calculate_public_key_and_chaincode, CurveChoice ROOT_SCREENSHOT_PATH = Path(__file__).parent + @pytest.fixture(params=[True, False]) def with_chaincode(request) -> bool: return request.param + @pytest.fixture(params=[None, 1, 2, 5, 137]) def chain(request) -> Optional[int]: return request.param + def get_moves(firmware: Firmware, navigator: BackendInterface, chain: Optional[int] = None, @@ -27,25 +29,26 @@ def get_moves(firmware: Firmware, moves = list() if firmware.is_nano: - moves += [ NavInsID.RIGHT_CLICK ] + moves += [NavInsID.RIGHT_CLICK] if firmware.device == "nanos": - moves += [ NavInsID.RIGHT_CLICK ] * 3 + moves += [NavInsID.RIGHT_CLICK] * 3 else: - moves += [ NavInsID.RIGHT_CLICK ] + moves += [NavInsID.RIGHT_CLICK] if reject: - moves += [ NavInsID.RIGHT_CLICK ] - moves += [ NavInsID.BOTH_CLICK ] + moves += [NavInsID.RIGHT_CLICK] + moves += [NavInsID.BOTH_CLICK] else: - moves += [ NavInsID.USE_CASE_REVIEW_TAP ] + moves += [NavInsID.USE_CASE_REVIEW_TAP] if chain is not None and chain > 1: - moves += [ NavInsID.USE_CASE_ADDRESS_CONFIRMATION_TAP ] + moves += [NavInsID.USE_CASE_ADDRESS_CONFIRMATION_TAP] if reject: - moves += [ NavInsID.USE_CASE_ADDRESS_CONFIRMATION_CANCEL ] + moves += [NavInsID.USE_CASE_ADDRESS_CONFIRMATION_CANCEL] else: - moves += [ NavInsID.USE_CASE_ADDRESS_CONFIRMATION_CONFIRM ] + moves += [NavInsID.USE_CASE_ADDRESS_CONFIRMATION_CONFIRM] return moves + def test_get_pk_rejected(firmware: Firmware, backend: BackendInterface, navigator: Navigator): @@ -59,7 +62,8 @@ def test_get_pk_rejected(firmware: Firmware, except ExceptionRAPDU as e: assert e.status == StatusWord.CONDITION_NOT_SATISFIED else: - assert False # An exception should have been raised + assert False # An exception should have been raised + def test_get_pk(firmware: Firmware, backend: BackendInterface, diff --git a/tests/ragger/test_nft.py b/tests/ragger/test_nft.py index bba13cf3a..17430c1fc 100644 --- a/tests/ragger/test_nft.py +++ b/tests/ragger/test_nft.py @@ -1,17 +1,21 @@ import pytest +from typing import Optional, Any from pathlib import Path from typing import Callable from ragger.error import ExceptionRAPDU from ragger.firmware import Firmware from ragger.backend import BackendInterface from ragger.navigator import Navigator, NavInsID -from ledger_app_clients.ethereum.client import EthAppClient, TxData, StatusWord -from ledger_app_clients.ethereum.settings import SettingID, settings_toggle -from ledger_app_clients.ethereum.utils import get_selector_from_function -import struct +from ledger_app_clients.ethereum.client import EthAppClient, StatusWord +import ledger_app_clients.ethereum.response_parser as ResponseParser +from ledger_app_clients.ethereum.utils import get_selector_from_data, recover_transaction +from web3 import Web3 +import json +import os ROOT_SCREENSHOT_PATH = Path(__file__).parent +ABIS_FOLDER = "%s/abis" % (os.path.dirname(__file__)) BIP32_PATH = "m/44'/60'/0'/0/0" NONCE = 21 @@ -19,51 +23,63 @@ GAS_LIMIT = 21000 FROM = bytes.fromhex("1122334455667788990011223344556677889900") TO = bytes.fromhex("0099887766554433221100998877665544332211") -NFTS = [ (1, 3), (5, 2), (7, 4) ] # tuples of (token_id, amount) +NFTS = [(1, 3), (5, 2), (7, 4)] # tuples of (token_id, amount) DATA = "Some data".encode() +DEVICE_ADDR: Optional[bytes] = None -class NFTCollection: + +class NFTCollection: addr: bytes name: str chain_id: int - def __init__(self, addr: bytes, name: str, chain_id: int): + + def __init__(self, addr: bytes, name: str, chain_id: int, contract): self.addr = addr self.name = name self.chain_id = chain_id + self.contract = contract + -class Action: - fn: str - data_fn: Callable +class Action: + fn_name: str + fn_args: list[Any] nav_fn: Callable - def __init__(self, fn: str, data_fn: Callable, nav_fn: Callable): - self.fn = fn - self.data_fn = data_fn + + def __init__(self, fn_name: str, fn_args: list[Any], nav_fn: Callable): + self.fn_name = fn_name + self.fn_args = fn_args self.nav_fn = nav_fn -def common_nav_nft(is_nano: bool, nano_steps: int, stax_steps: int, reject: bool) -> list[NavInsID]: + +def common_nav_nft(is_nano: bool, + nano_steps: int, + stax_steps: int, + reject: bool) -> list[NavInsID]: moves = list() if is_nano: - moves += [ NavInsID.RIGHT_CLICK ] * nano_steps + moves += [NavInsID.RIGHT_CLICK] * nano_steps if reject: - moves += [ NavInsID.RIGHT_CLICK ] - moves += [ NavInsID.BOTH_CLICK ] + moves += [NavInsID.RIGHT_CLICK] + moves += [NavInsID.BOTH_CLICK] else: - moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * stax_steps + moves += [NavInsID.USE_CASE_REVIEW_TAP] * stax_steps if reject: moves += [ NavInsID.USE_CASE_REVIEW_REJECT, NavInsID.USE_CASE_CHOICE_CONFIRM ] else: - moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ] + moves += [NavInsID.USE_CASE_REVIEW_CONFIRM] return moves + def snapshot_test_name(nft_type: str, fn: str, chain_id: int, reject: bool) -> str: - name = "%s_%s_%s" % (nft_type, fn.split("(")[0], str(chain_id)) + name = "%s_%s_%s" % (nft_type, fn, str(chain_id)) if reject: name += "-rejected" return name + def common_test_nft(fw: Firmware, back: BackendInterface, nav: Navigator, @@ -71,34 +87,48 @@ def common_test_nft(fw: Firmware, action: Action, reject: bool, plugin_name: str): + global DEVICE_ADDR app_client = EthAppClient(back) - selector = get_selector_from_function(action.fn) if app_client._client.firmware.name == "nanos": pytest.skip("Not supported on LNS") + + if DEVICE_ADDR is None: # to only have to request it once + with app_client.get_public_addr(display=False): + pass + _, DEVICE_ADDR, _ = ResponseParser.pk_addr(app_client.response().data) + + data = collec.contract.encodeABI(action.fn_name, action.fn_args) with app_client.set_plugin(plugin_name, collec.addr, - selector, - 1): + get_selector_from_data(data), + collec.chain_id): pass with app_client.provide_nft_metadata(collec.name, collec.addr, collec.chain_id): pass - with app_client.sign_legacy(BIP32_PATH, - NONCE, - GAS_PRICE, - GAS_LIMIT, - collec.addr, - 0, - collec.chain_id, - action.data_fn(action)): + tx_params = { + "nonce": NONCE, + "gasPrice": Web3.to_wei(GAS_PRICE, "gwei"), + "gas": GAS_LIMIT, + "to": collec.addr, + "value": 0, + "chainId": collec.chain_id, + "data": data, + } + with app_client.sign(BIP32_PATH, tx_params): nav.navigate_and_compare(ROOT_SCREENSHOT_PATH, snapshot_test_name(plugin_name.lower(), - action.fn, + action.fn_name, collec.chain_id, reject), action.nav_fn(fw.is_nano, collec.chain_id, reject)) + # verify signature + vrs = ResponseParser.signature(app_client.response().data) + addr = recover_transaction(tx_params, vrs) + assert addr == DEVICE_ADDR + def common_test_nft_reject(test_fn: Callable, fw: Firmware, @@ -111,53 +141,20 @@ def common_test_nft_reject(test_fn: Callable, except ExceptionRAPDU as e: assert e.status == StatusWord.CONDITION_NOT_SATISFIED else: - assert False # An exception should have been raised + assert False # An exception should have been raised # ERC-721 -ERC721_PLUGIN = "ERC721" -ERC721_SAFE_TRANSFER_FROM_DATA = "safeTransferFrom(address,address,uint256,bytes)" -ERC721_SAFE_TRANSFER_FROM = "safeTransferFrom(address,address,uint256)" -ERC721_TRANSFER_FROM = "transferFrom(address,address,uint256)" -ERC721_APPROVE = "approve(address,uint256)" -ERC721_SET_APPROVAL_FOR_ALL = "setApprovalForAll(address,bool)" - -## data formatting functions - -def data_erc721_transfer_from(action: Action) -> TxData: - return TxData( - get_selector_from_function(action.fn), - [ - FROM, - TO, - struct.pack(">H", NFTS[0][0]) - ] - ) -def data_erc721_safe_transfer_from_data(action: Action) -> TxData: - txd = data_erc721_transfer_from(action) - txd.parameters += [ DATA ] - return txd - -def data_erc721_approve(action: Action) -> TxData: - return TxData( - get_selector_from_function(action.fn), - [ - TO, - struct.pack(">H", NFTS[0][0]) - ] - ) +ERC721_PLUGIN = "ERC721" -def data_erc721_set_approval_for_all(action: Action) -> TxData: - return TxData( - get_selector_from_function(action.fn), - [ - TO, - struct.pack("b", False) - ] +with open("%s/erc721.json" % (ABIS_FOLDER)) as file: + contract_erc721 = Web3().eth.contract( + abi=json.load(file), + address=bytes(20) ) -## ui nav functions +# ui nav functions def nav_erc721_transfer_from(is_nano: bool, chain_id: int, @@ -169,6 +166,7 @@ def nav_erc721_transfer_from(is_nano: bool, stax_steps += 1 return common_nav_nft(is_nano, nano_steps, stax_steps, reject) + def nav_erc721_approve(is_nano: bool, chain_id: int, reject: bool) -> list[NavInsID]: @@ -179,6 +177,7 @@ def nav_erc721_approve(is_nano: bool, stax_steps += 1 return common_nav_nft(is_nano, nano_steps, stax_steps, reject) + def nav_erc721_set_approval_for_all(is_nano: bool, chain_id: int, reject: bool) -> list[NavInsID]: @@ -187,49 +186,56 @@ def nav_erc721_set_approval_for_all(is_nano: bool, nano_steps += 1 return common_nav_nft(is_nano, nano_steps, 3, reject) + collecs_721 = [ NFTCollection(bytes.fromhex("bc4ca0eda7647a8ab7c2061c2e118a18a936f13d"), - "Bored Ape Yacht Club", - 1), + "Bored Ape Yacht Club", + 1, + contract_erc721), NFTCollection(bytes.fromhex("670fd103b1a08628e9557cd66b87ded841115190"), - "y00ts", - 137), + "y00ts", + 137, + contract_erc721), NFTCollection(bytes.fromhex("2909cf13e458a576cdd9aab6bd6617051a92dacf"), - "goerlirocks", - 5) + "goerlirocks", + 5, + contract_erc721), ] actions_721 = [ - Action(ERC721_SAFE_TRANSFER_FROM_DATA, - data_erc721_safe_transfer_from_data, + Action("safeTransferFrom", + [FROM, TO, NFTS[0][0], DATA], nav_erc721_transfer_from), - Action(ERC721_SAFE_TRANSFER_FROM, - data_erc721_transfer_from, + Action("safeTransferFrom", + [FROM, TO, NFTS[0][0]], nav_erc721_transfer_from), - Action(ERC721_TRANSFER_FROM, - data_erc721_transfer_from, + Action("transferFrom", + [FROM, TO, NFTS[0][0]], nav_erc721_transfer_from), - Action(ERC721_APPROVE, - data_erc721_approve, + Action("approve", + [TO, NFTS[0][0]], nav_erc721_approve), - Action(ERC721_SET_APPROVAL_FOR_ALL, - data_erc721_set_approval_for_all, - nav_erc721_set_approval_for_all) + Action("setApprovalForAll", + [TO, False], + nav_erc721_set_approval_for_all), ] @pytest.fixture(params=collecs_721) def collec_721(request) -> NFTCollection: return request.param + + @pytest.fixture(params=actions_721) def action_721(request) -> Action: return request.param + def test_erc721(firmware: Firmware, - backend: BackendInterface, - navigator: Navigator, - collec_721: NFTCollection, - action_721: Action, - reject: bool = False): + backend: BackendInterface, + navigator: Navigator, + collec_721: NFTCollection, + action_721: Action, + reject: bool = False): common_test_nft(firmware, backend, navigator, @@ -238,9 +244,10 @@ def test_erc721(firmware: Firmware, reject, ERC721_PLUGIN) + def test_erc721_reject(firmware: Firmware, - backend: BackendInterface, - navigator: Navigator): + backend: BackendInterface, + navigator: Navigator): common_test_nft_reject(test_erc721, firmware, backend, @@ -248,54 +255,19 @@ def test_erc721_reject(firmware: Firmware, collecs_721[0], actions_721[0]) + # ERC-1155 ERC1155_PLUGIN = "ERC1155" -ERC1155_SAFE_TRANSFER_FROM = "safeTransferFrom(address,address,uint256,uint256,bytes)" -ERC1155_SAFE_BATCH_TRANSFER_FROM = "safeBatchTransferFrom(address,address,uint256[],uint256[],bytes)" -ERC1155_SET_APPROVAL_FOR_ALL = "setApprovalForAll(address,bool)" - -## data formatting functions - -def data_erc1155_safe_transfer_from(action: Action) -> TxData: - return TxData( - get_selector_from_function(action.fn), - [ - FROM, - TO, - struct.pack(">H", NFTS[0][0]), - struct.pack(">H", NFTS[0][1]), - DATA - ] - ) -def data_erc1155_safe_batch_transfer_from(action: Action) -> TxData: - data = TxData( - get_selector_from_function(action.fn), - [ - FROM, - TO - ]) - data.parameters += [ int(32 * 4).to_bytes(8, "big") ] # token_ids offset - data.parameters += [int(32 * (4 + len(NFTS) + 1)).to_bytes(8, "big") ] # amounts offset - data.parameters += [ int(len(NFTS)).to_bytes(8, "big") ] # token_ids length - for nft in NFTS: - data.parameters += [ struct.pack(">H", nft[0]) ] # token_id - data.parameters += [ int(len(NFTS)).to_bytes(8, "big") ] # amounts length - for nft in NFTS: - data.parameters += [ struct.pack(">H", nft[1]) ] # amount - return data - -def data_erc1155_set_approval_for_all(action: Action) -> TxData: - return TxData( - get_selector_from_function(action.fn), - [ - TO, - struct.pack("b", False) - ] +with open("%s/erc1155.json" % (ABIS_FOLDER)) as file: + contract_erc1155 = Web3().eth.contract( + abi=json.load(file), + address=bytes(20) ) -## ui nav functions + +# ui nav functions def nav_erc1155_safe_transfer_from(is_nano: bool, chain_id: int, @@ -305,6 +277,7 @@ def nav_erc1155_safe_transfer_from(is_nano: bool, nano_steps += 1 return common_nav_nft(is_nano, nano_steps, 4, reject) + def nav_erc1155_safe_batch_transfer_from(is_nano: bool, chain_id: int, reject: bool) -> list: @@ -315,6 +288,7 @@ def nav_erc1155_safe_batch_transfer_from(is_nano: bool, stax_steps += 1 return common_nav_nft(is_nano, nano_steps, stax_steps, reject) + def nav_erc1155_set_approval_for_all(is_nano: bool, chain_id: int, reject: bool) -> list: @@ -323,35 +297,50 @@ def nav_erc1155_set_approval_for_all(is_nano: bool, nano_steps += 1 return common_nav_nft(is_nano, nano_steps, 3, reject) + collecs_1155 = [ NFTCollection(bytes.fromhex("495f947276749ce646f68ac8c248420045cb7b5e"), - "OpenSea Shared Storefront", - 1), + "OpenSea Shared Storefront", + 1, + contract_erc1155), NFTCollection(bytes.fromhex("2953399124f0cbb46d2cbacd8a89cf0599974963"), - "OpenSea Collections", - 137), + "OpenSea Collections", + 137, + contract_erc1155), NFTCollection(bytes.fromhex("f4910c763ed4e47a585e2d34baa9a4b611ae448c"), - "OpenSea Collections", - 5) + "OpenSea Collections", + 5, + contract_erc1155), ] actions_1155 = [ - Action(ERC1155_SAFE_TRANSFER_FROM, - data_erc1155_safe_transfer_from, + Action("safeTransferFrom", + [FROM, TO, NFTS[0][0], NFTS[0][1], DATA], nav_erc1155_safe_transfer_from), - Action(ERC1155_SAFE_BATCH_TRANSFER_FROM, - data_erc1155_safe_batch_transfer_from, + Action("safeBatchTransferFrom", + [ + FROM, + TO, + list(map(lambda nft: nft[0], NFTS)), + list(map(lambda nft: nft[1], NFTS)), + DATA + ], nav_erc1155_safe_batch_transfer_from), - Action(ERC1155_SET_APPROVAL_FOR_ALL, - data_erc1155_set_approval_for_all, - nav_erc1155_set_approval_for_all) + Action("setApprovalForAll", + [TO, False], + nav_erc1155_set_approval_for_all), ] + + @pytest.fixture(params=collecs_1155) def collec_1155(request) -> bool: return request.param + + @pytest.fixture(params=actions_1155) def action_1155(request) -> Action: return request.param + def test_erc1155(firmware: Firmware, backend: BackendInterface, navigator: Navigator, @@ -366,6 +355,7 @@ def test_erc1155(firmware: Firmware, reject, ERC1155_PLUGIN) + def test_erc1155_reject(firmware: Firmware, backend: BackendInterface, navigator: Navigator):