Merge pull request #498 from LedgerHQ/feat/apa/python_client_improvements

Python client & tests improvements
This commit is contained in:
apaillier-ledger
2023-12-01 16:12:08 +01:00
committed by GitHub
17 changed files with 950 additions and 405 deletions

View File

@@ -28,8 +28,7 @@ dynamic = [ "version" ]
requires-python = ">=3.7"
dependencies = [
"ragger[speculos]",
"simple-rlp",
"pysha3",
"web3~=6.0",
]
[tools.setuptools]

View File

@@ -1 +1 @@
__version__ = "0.1.0"
__version__ = "0.2.0"

View File

@@ -2,25 +2,14 @@ import rlp
from enum import IntEnum
from ragger.backend import BackendInterface
from ragger.utils import RAPDU
from typing import List, Optional, Union
from typing import Optional
from .command_builder import CommandBuilder
from .eip712 import EIP712FieldType
from .keychain import sign_data, Key
from .tlv import format_tlv
WEI_IN_ETH = 1e+18
GWEI_IN_ETH = 1e+9
class TxData:
selector: bytes
parameters: list[bytes]
def __init__(self, selector: bytes, params: list[bytes]):
self.selector = selector
self.parameters = params
from web3 import Web3
class StatusWord(IntEnum):
@@ -64,7 +53,7 @@ class EthAppClient:
field_type: EIP712FieldType,
type_name: str,
type_size: int,
array_levels: List,
array_levels: list,
key_name: str):
return self._send(self._cmd_builder.eip712_send_struct_def_struct_field(
field_type,
@@ -86,7 +75,7 @@ class EthAppClient:
pass
return self._send(chunks[-1])
def eip712_sign_new(self, bip32_path: str, verbose: bool):
def eip712_sign_new(self, bip32_path: str):
return self._send(self._cmd_builder.eip712_sign_new(bip32_path))
def eip712_sign_legacy(self,
@@ -106,79 +95,26 @@ class EthAppClient:
def eip712_filtering_show_field(self, name: str, sig: bytes):
return self._send(self._cmd_builder.eip712_filtering_show_field(name, sig))
def _sign(self, bip32_path: str, raw_tx: bytes):
chunks = self._cmd_builder.sign(bip32_path, raw_tx)
def sign(self,
bip32_path: str,
tx_params: dict):
tx = Web3().eth.account.create().sign_transaction(tx_params).rawTransaction
prefix = bytes()
suffix = []
if tx[0] in [0x01, 0x02]:
prefix = tx[:1]
tx = tx[len(prefix):]
else: # legacy
if "chainId" in tx_params:
suffix = [int(tx_params["chainId"]), bytes(), bytes()]
decoded = rlp.decode(tx)[:-3] # remove already computed signature
tx = prefix + rlp.encode(decoded + suffix)
chunks = self._cmd_builder.sign(bip32_path, tx, suffix)
for chunk in chunks[:-1]:
with self._send(chunk):
pass
return self._send(chunks[-1])
def _data_to_payload(self, data: TxData) -> bytes:
payload = bytearray(data.selector)
for param in data.parameters:
payload += param.rjust(32, b'\x00')
return payload
def _sign_common(self,
tx: list,
gas_price: float,
gas_limit: int,
destination: bytes,
amount: float,
data: Optional[TxData]):
tx.append(int(gas_price * GWEI_IN_ETH))
tx.append(gas_limit)
tx.append(destination)
if amount > 0:
tx.append(int(amount * WEI_IN_ETH))
else:
tx.append(bytes())
if data is not None:
tx.append(self._data_to_payload(data))
else:
tx.append(bytes())
return tx
def sign_legacy(self,
bip32_path: str,
nonce: int,
gas_price: float,
gas_limit: int,
destination: bytes,
amount: float,
chain_id: int,
data: Optional[TxData] = None):
tx: List[Union[int, bytes]] = list()
tx.append(nonce)
tx = self._sign_common(tx, gas_price, gas_limit, destination, amount, data)
tx.append(chain_id)
tx.append(bytes())
tx.append(bytes())
return self._sign(bip32_path, rlp.encode(tx))
def sign_1559(self,
bip32_path: str,
chain_id: int,
nonce: int,
max_prio_gas_price: float,
max_gas_price: float,
gas_limit: int,
destination: bytes,
amount: float,
data: Optional[TxData] = None,
access_list=list()):
tx: List[Union[int, bytes]] = list()
tx.append(chain_id)
tx.append(nonce)
tx.append(int(max_prio_gas_price * GWEI_IN_ETH))
tx = self._sign_common(tx, max_gas_price, gas_limit, destination, amount, data)
tx.append(access_list)
tx.append(False)
tx.append(bytes())
tx.append(bytes())
# prefix with transaction type
return self._sign(bip32_path, b'\x02' + rlp.encode(tx))
def get_challenge(self):
return self._send(self._cmd_builder.get_challenge())
@@ -286,5 +222,12 @@ class EthAppClient:
tmp = self._cmd_builder.set_external_plugin(plugin_name, contract_address, method_selelector, bytes())
# skip APDU header & empty sig
sig = sign_data(Key.SET_PLUGIN, tmp[5:-1])
sig = sign_data(Key.CAL, tmp[5:])
return self._send(self._cmd_builder.set_external_plugin(plugin_name, contract_address, method_selelector, sig))
def personal_sign(self, path: str, msg: bytes):
chunks = self._cmd_builder.personal_sign(path, msg)
for chunk in chunks[:-1]:
with self._send(chunk):
pass
return self._send(chunks[-1])

View File

@@ -5,7 +5,6 @@ import struct
from enum import IntEnum
from typing import Optional
from ragger.bip import pack_derivation_path
from typing import List
from .eip712 import EIP712FieldType
@@ -13,6 +12,7 @@ from .eip712 import EIP712FieldType
class InsType(IntEnum):
GET_PUBLIC_ADDR = 0x02
SIGN = 0x04
PERSONAL_SIGN = 0x08
PROVIDE_NFT_INFORMATION = 0x14
SET_PLUGIN = 0x16
EIP712_SEND_STRUCT_DEF = 0x1a
@@ -75,7 +75,7 @@ class CommandBuilder:
field_type: EIP712FieldType,
type_name: str,
type_size: int,
array_levels: List,
array_levels: list,
key_name: str) -> bytes:
data = bytearray()
typedesc = 0
@@ -115,7 +115,7 @@ class CommandBuilder:
P2Type.ARRAY,
data)
def eip712_send_struct_impl_struct_field(self, data: bytearray) -> List[bytes]:
def eip712_send_struct_impl_struct_field(self, data: bytearray) -> list[bytes]:
chunks = list()
# Add a 16-bit integer with the data's byte length (network byte order)
data_w_length = bytearray()
@@ -195,17 +195,27 @@ class CommandBuilder:
0x00,
data)
def sign(self, bip32_path: str, rlp_data: bytes) -> list[bytes]:
def sign(self, bip32_path: str, rlp_data: bytes, vrs: list) -> list[bytes]:
apdus = list()
payload = pack_derivation_path(bip32_path)
payload += rlp_data
p1 = P1Type.SIGN_FIRST_CHUNK
while len(payload) > 0:
chunk_size = 0xff
# TODO: Fix the app & remove this, issue #409
if len(vrs) == 3:
if len(payload) > chunk_size:
import rlp
diff = len(rlp.encode(vrs)) - (len(payload) - chunk_size)
if diff > 0:
chunk_size -= diff
apdus.append(self._serialize(InsType.SIGN,
p1,
0x00,
payload[:0xff]))
payload = payload[0xff:]
payload[:chunk_size]))
payload = payload[chunk_size:]
p1 = P1Type.SIGN_SUBSQT_CHUNK
return apdus
@@ -284,3 +294,19 @@ class CommandBuilder:
payload.append(len(sig))
payload += sig
return self._serialize(InsType.PROVIDE_NFT_INFORMATION, 0x00, 0x00, payload)
def personal_sign(self, path: str, msg: bytes):
payload = pack_derivation_path(path)
payload += struct.pack(">I", len(msg))
payload += msg
chunks = list()
p1 = P1Type.SIGN_FIRST_CHUNK
while len(payload) > 0:
chunk_size = 0xff
chunks.append(self._serialize(InsType.PERSONAL_SIGN,
p1,
0x00,
payload[:chunk_size]))
payload = payload[chunk_size:]
p1 = P1Type.SIGN_SUBSQT_CHUNK
return chunks

View File

@@ -3,7 +3,8 @@ import json
import re
import signal
import sys
from typing import Any, Callable, Dict, List, Optional
import copy
from typing import Any, Callable, Optional
from ledger_app_clients.ethereum import keychain
from ledger_app_clients.ethereum.client import EthAppClient, EIP712FieldType
@@ -11,9 +12,9 @@ from ledger_app_clients.ethereum.client import EthAppClient, EIP712FieldType
# global variables
app_client: EthAppClient = None
filtering_paths: Dict = {}
current_path: List[str] = list()
sig_ctx: Dict[str, Any] = {}
filtering_paths: dict = {}
current_path: list[str] = list()
sig_ctx: dict[str, Any] = {}
def default_handler():
@@ -297,13 +298,6 @@ def send_filtering_show_field(display_name):
pass
def read_filtering_file(filtering_file_path: str):
data_json = None
with open(filtering_file_path) as data:
data_json = json.load(data)
return data_json
def prepare_filtering(filtr_data, message):
global filtering_paths
@@ -355,62 +349,61 @@ def disable_autonext():
signal.setitimer(signal.ITIMER_REAL, 0, 0)
def process_file(aclient: EthAppClient,
input_file_path: str,
filtering_file_path: Optional[str] = None,
def process_data(aclient: EthAppClient,
data_json: dict,
filters: Optional[dict] = None,
autonext: Optional[Callable] = None) -> bool:
global sig_ctx
global app_client
global autonext_handler
# deepcopy because this function modifies the dict
data_json = copy.deepcopy(data_json)
app_client = aclient
with open(input_file_path, "r") as data:
data_json = json.load(data)
domain_typename = "EIP712Domain"
message_typename = data_json["primaryType"]
types = data_json["types"]
domain = data_json["domain"]
message = data_json["message"]
domain_typename = "EIP712Domain"
message_typename = data_json["primaryType"]
types = data_json["types"]
domain = data_json["domain"]
message = data_json["message"]
if autonext:
autonext_handler = autonext
signal.signal(signal.SIGALRM, next_timeout)
if autonext:
autonext_handler = autonext
signal.signal(signal.SIGALRM, next_timeout)
if filtering_file_path:
init_signature_context(types, domain)
filtr = read_filtering_file(filtering_file_path)
if filters:
init_signature_context(types, domain)
# send types definition
for key in types.keys():
with app_client.eip712_send_struct_def_struct_name(key):
pass
for f in types[key]:
(f["type"], f["enum"], f["typesize"], f["array_lvls"]) = \
send_struct_def_field(f["type"], f["name"])
# send types definition
for key in types.keys():
with app_client.eip712_send_struct_def_struct_name(key):
pass
for f in types[key]:
(f["type"], f["enum"], f["typesize"], f["array_lvls"]) = \
send_struct_def_field(f["type"], f["name"])
if filtering_file_path:
with app_client.eip712_filtering_activate():
pass
prepare_filtering(filtr, message)
if filters:
with app_client.eip712_filtering_activate():
pass
prepare_filtering(filters, message)
# send domain implementation
with app_client.eip712_send_struct_impl_root_struct(domain_typename):
enable_autonext()
disable_autonext()
if not send_struct_impl(types, domain, domain_typename):
return False
# send domain implementation
with app_client.eip712_send_struct_impl_root_struct(domain_typename):
enable_autonext()
disable_autonext()
if not send_struct_impl(types, domain, domain_typename):
return False
if filtering_file_path:
if filtr and "name" in filtr:
send_filtering_message_info(filtr["name"], len(filtering_paths))
else:
send_filtering_message_info(domain["name"], len(filtering_paths))
if filters:
if filters and "name" in filters:
send_filtering_message_info(filters["name"], len(filtering_paths))
else:
send_filtering_message_info(domain["name"], len(filtering_paths))
# send message implementation
with app_client.eip712_send_struct_impl_root_struct(message_typename):
enable_autonext()
disable_autonext()
if not send_struct_impl(types, message, message_typename):
return False
# send message implementation
with app_client.eip712_send_struct_impl_root_struct(message_typename):
enable_autonext()
disable_autonext()
if not send_struct_impl(types, message, message_typename):
return False
return True

View File

@@ -3,7 +3,6 @@ import hashlib
from ecdsa import SigningKey
from ecdsa.util import sigencode_der
from enum import Enum, auto
from typing import Dict
# Private key PEM files have to be named the same (lowercase) as their corresponding enum entries
@@ -15,7 +14,7 @@ class Key(Enum):
NFT = auto()
_keys: Dict[Key, SigningKey] = dict()
_keys: dict[Key, SigningKey] = dict()
# Open the corresponding PEM file and load its key in the global dict

View File

@@ -49,4 +49,4 @@ def pk_addr(data: bytes, has_chaincode: bool = False):
if idx != len(data):
return None
return pk, addr.decode(), chaincode
return pk, bytes.fromhex(addr.decode()), chaincode

View File

@@ -1,7 +1,7 @@
from enum import Enum, auto
from ragger.firmware import Firmware
from ragger.navigator import Navigator, NavInsID, NavIns
from typing import List, Union
from typing import Union
class SettingID(Enum):
@@ -44,7 +44,7 @@ def get_setting_position(device: str, setting: Union[NavInsID, SettingID]) -> tu
def settings_toggle(fw: Firmware, nav: Navigator, to_toggle: list[SettingID]):
moves: List[Union[NavIns, NavInsID]] = list()
moves: list[Union[NavIns, NavInsID]] = list()
settings = get_device_settings(fw.device)
# Assume the app is on the home page
if fw.device.startswith("nano"):

View File

@@ -1,5 +1,41 @@
import sha3
from eth_account import Account
from eth_account.messages import encode_defunct, encode_typed_data
import rlp
def get_selector_from_function(fn: str) -> bytes:
return sha3.keccak_256(fn.encode()).digest()[0:4]
def get_selector_from_data(data: str) -> bytes:
raw_data = bytes.fromhex(data[2:])
return raw_data[:4]
def recover_message(msg, vrs: tuple) -> bytes:
if isinstance(msg, dict): # EIP-712
smsg = encode_typed_data(full_message=msg)
else: # EIP-191
smsg = encode_defunct(primitive=msg)
addr = Account.recover_message(smsg, vrs)
return bytes.fromhex(addr[2:])
# TODO: Figure out why it doesn't work for non-legacy transactions
def recover_transaction(tx_params, vrs: tuple) -> bytes:
raw_tx = Account.create().sign_transaction(tx_params).rawTransaction
prefix = bytes()
if raw_tx[0] in [0x01, 0x02]:
prefix = raw_tx[:1]
raw_tx = raw_tx[len(prefix):]
if prefix == bytes():
# v is returned on one byte only so it might have overflowed
# in that case, we will reconstruct it to its full value
if "chainId" in tx_params:
trunc_chain_id = tx_params["chainId"]
while trunc_chain_id.bit_length() > 32:
trunc_chain_id >>= 8
target = tx_params["chainId"] * 2 + 35
trunc_target = trunc_chain_id * 2 + 35
diff = vrs[0][0] - (trunc_target & 0xff)
vrs = (target + diff, vrs[1], vrs[2])
decoded = rlp.decode(raw_tx)
reencoded = rlp.encode(decoded[:-3] + list(vrs))
addr = Account.recover_transaction(prefix + reencoded)
return bytes.fromhex(addr[2:])

View File

@@ -48,7 +48,9 @@ void handleSetExternalPlugin(uint8_t p1,
workBuffer + payload_size,
dataLength - payload_size)) {
#ifndef HAVE_BYPASS_SIGNATURES
PRINTF("Invalid plugin signature %.*H\n", payload_size, workBuffer);
PRINTF("Invalid plugin signature %.*H\n",
dataLength - payload_size,
workBuffer + payload_size);
THROW(0x6A80);
#endif
}

View File

@@ -0,0 +1,276 @@
[
{
"anonymous" : false,
"inputs" : [
{
"indexed" : true,
"internalType" : "address",
"name" : "_owner",
"type" : "address"
},
{
"indexed" : true,
"internalType" : "address",
"name" : "_operator",
"type" : "address"
},
{
"indexed" : false,
"internalType" : "bool",
"name" : "_approved",
"type" : "bool"
}
],
"name" : "ApprovalForAll",
"type" : "event"
},
{
"anonymous" : false,
"inputs" : [
{
"indexed" : true,
"internalType" : "address",
"name" : "_operator",
"type" : "address"
},
{
"indexed" : true,
"internalType" : "address",
"name" : "_from",
"type" : "address"
},
{
"indexed" : true,
"internalType" : "address",
"name" : "_to",
"type" : "address"
},
{
"indexed" : false,
"internalType" : "uint256[]",
"name" : "_ids",
"type" : "uint256[]"
},
{
"indexed" : false,
"internalType" : "uint256[]",
"name" : "_values",
"type" : "uint256[]"
}
],
"name" : "TransferBatch",
"type" : "event"
},
{
"anonymous" : false,
"inputs" : [
{
"indexed" : true,
"internalType" : "address",
"name" : "_operator",
"type" : "address"
},
{
"indexed" : true,
"internalType" : "address",
"name" : "_from",
"type" : "address"
},
{
"indexed" : true,
"internalType" : "address",
"name" : "_to",
"type" : "address"
},
{
"indexed" : false,
"internalType" : "uint256",
"name" : "_id",
"type" : "uint256"
},
{
"indexed" : false,
"internalType" : "uint256",
"name" : "_value",
"type" : "uint256"
}
],
"name" : "TransferSingle",
"type" : "event"
},
{
"anonymous" : false,
"inputs" : [
{
"indexed" : false,
"internalType" : "string",
"name" : "_value",
"type" : "string"
},
{
"indexed" : true,
"internalType" : "uint256",
"name" : "_id",
"type" : "uint256"
}
],
"name" : "URI",
"type" : "event"
},
{
"inputs" : [
{
"internalType" : "address",
"name" : "_owner",
"type" : "address"
},
{
"internalType" : "uint256",
"name" : "_id",
"type" : "uint256"
}
],
"name" : "balanceOf",
"outputs" : [
{
"internalType" : "uint256",
"name" : "",
"type" : "uint256"
}
],
"stateMutability" : "view",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "address[]",
"name" : "_owners",
"type" : "address[]"
},
{
"internalType" : "uint256[]",
"name" : "_ids",
"type" : "uint256[]"
}
],
"name" : "balanceOfBatch",
"outputs" : [
{
"internalType" : "uint256[]",
"name" : "",
"type" : "uint256[]"
}
],
"stateMutability" : "view",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "address",
"name" : "_owner",
"type" : "address"
},
{
"internalType" : "address",
"name" : "_operator",
"type" : "address"
}
],
"name" : "isApprovedForAll",
"outputs" : [
{
"internalType" : "bool",
"name" : "",
"type" : "bool"
}
],
"stateMutability" : "view",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "address",
"name" : "_from",
"type" : "address"
},
{
"internalType" : "address",
"name" : "_to",
"type" : "address"
},
{
"internalType" : "uint256[]",
"name" : "_ids",
"type" : "uint256[]"
},
{
"internalType" : "uint256[]",
"name" : "_values",
"type" : "uint256[]"
},
{
"internalType" : "bytes",
"name" : "_data",
"type" : "bytes"
}
],
"name" : "safeBatchTransferFrom",
"outputs" : [],
"stateMutability" : "nonpayable",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "address",
"name" : "_from",
"type" : "address"
},
{
"internalType" : "address",
"name" : "_to",
"type" : "address"
},
{
"internalType" : "uint256",
"name" : "_id",
"type" : "uint256"
},
{
"internalType" : "uint256",
"name" : "_value",
"type" : "uint256"
},
{
"internalType" : "bytes",
"name" : "_data",
"type" : "bytes"
}
],
"name" : "safeTransferFrom",
"outputs" : [],
"stateMutability" : "nonpayable",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "address",
"name" : "_operator",
"type" : "address"
},
{
"internalType" : "bool",
"name" : "_approved",
"type" : "bool"
}
],
"name" : "setApprovalForAll",
"outputs" : [],
"stateMutability" : "nonpayable",
"type" : "function"
}
]

View File

@@ -0,0 +1,268 @@
[
{
"anonymous" : false,
"inputs" : [
{
"indexed" : true,
"internalType" : "address",
"name" : "_owner",
"type" : "address"
},
{
"indexed" : true,
"internalType" : "address",
"name" : "_approved",
"type" : "address"
},
{
"indexed" : true,
"internalType" : "uint256",
"name" : "_tokenId",
"type" : "uint256"
}
],
"name" : "Approval",
"type" : "event"
},
{
"anonymous" : false,
"inputs" : [
{
"indexed" : true,
"internalType" : "address",
"name" : "_owner",
"type" : "address"
},
{
"indexed" : true,
"internalType" : "address",
"name" : "_operator",
"type" : "address"
},
{
"indexed" : false,
"internalType" : "bool",
"name" : "_approved",
"type" : "bool"
}
],
"name" : "ApprovalForAll",
"type" : "event"
},
{
"anonymous" : false,
"inputs" : [
{
"indexed" : true,
"internalType" : "address",
"name" : "_from",
"type" : "address"
},
{
"indexed" : true,
"internalType" : "address",
"name" : "_to",
"type" : "address"
},
{
"indexed" : true,
"internalType" : "uint256",
"name" : "_tokenId",
"type" : "uint256"
}
],
"name" : "Transfer",
"type" : "event"
},
{
"inputs" : [
{
"internalType" : "address",
"name" : "_approved",
"type" : "address"
},
{
"internalType" : "uint256",
"name" : "_tokenId",
"type" : "uint256"
}
],
"name" : "approve",
"outputs" : [],
"stateMutability" : "payable",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "address",
"name" : "_owner",
"type" : "address"
}
],
"name" : "balanceOf",
"outputs" : [
{
"internalType" : "uint256",
"name" : "",
"type" : "uint256"
}
],
"stateMutability" : "view",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "uint256",
"name" : "_tokenId",
"type" : "uint256"
}
],
"name" : "getApproved",
"outputs" : [
{
"internalType" : "address",
"name" : "",
"type" : "address"
}
],
"stateMutability" : "view",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "address",
"name" : "_owner",
"type" : "address"
},
{
"internalType" : "address",
"name" : "_operator",
"type" : "address"
}
],
"name" : "isApprovedForAll",
"outputs" : [
{
"internalType" : "bool",
"name" : "",
"type" : "bool"
}
],
"stateMutability" : "view",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "uint256",
"name" : "_tokenId",
"type" : "uint256"
}
],
"name" : "ownerOf",
"outputs" : [
{
"internalType" : "address",
"name" : "",
"type" : "address"
}
],
"stateMutability" : "view",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "address",
"name" : "_from",
"type" : "address"
},
{
"internalType" : "address",
"name" : "_to",
"type" : "address"
},
{
"internalType" : "uint256",
"name" : "_tokenId",
"type" : "uint256"
}
],
"name" : "safeTransferFrom",
"outputs" : [],
"stateMutability" : "payable",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "address",
"name" : "_from",
"type" : "address"
},
{
"internalType" : "address",
"name" : "_to",
"type" : "address"
},
{
"internalType" : "uint256",
"name" : "_tokenId",
"type" : "uint256"
},
{
"internalType" : "bytes",
"name" : "data",
"type" : "bytes"
}
],
"name" : "safeTransferFrom",
"outputs" : [],
"stateMutability" : "payable",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "address",
"name" : "_operator",
"type" : "address"
},
{
"internalType" : "bool",
"name" : "_approved",
"type" : "bool"
}
],
"name" : "setApprovalForAll",
"outputs" : [],
"stateMutability" : "nonpayable",
"type" : "function"
},
{
"inputs" : [
{
"internalType" : "address",
"name" : "_from",
"type" : "address"
},
{
"internalType" : "address",
"name" : "_to",
"type" : "address"
},
{
"internalType" : "uint256",
"name" : "_tokenId",
"type" : "uint256"
}
],
"name" : "transferFrom",
"outputs" : [],
"stateMutability" : "payable",
"type" : "function"
}
]

View File

@@ -1,4 +1,3 @@
ragger[speculos]
pytest
ecdsa
./client/

View File

@@ -9,6 +9,8 @@ import ledger_app_clients.ethereum.response_parser as ResponseParser
from ledger_app_clients.ethereum.client import EthAppClient, StatusWord
from ledger_app_clients.ethereum.settings import SettingID, settings_toggle
from web3 import Web3
ROOT_SCREENSHOT_PATH = Path(__file__).parent
@@ -52,24 +54,26 @@ def test_send_fund(firmware: Firmware,
with app_client.provide_domain_name(challenge, NAME, ADDR):
pass
with app_client.sign_legacy(BIP32_PATH,
NONCE,
GAS_PRICE,
GAS_LIMIT,
ADDR,
AMOUNT,
CHAIN_ID):
with app_client.sign(BIP32_PATH,
{
"nonce": NONCE,
"gasPrice": Web3.to_wei(GAS_PRICE, "gwei"),
"gas": GAS_LIMIT,
"to": ADDR,
"value": Web3.to_wei(AMOUNT, "ether"),
"chainId": CHAIN_ID
}):
moves = list()
if firmware.device.startswith("nano"):
moves += [ NavInsID.RIGHT_CLICK ] * 4
moves += [NavInsID.RIGHT_CLICK] * 4
if verbose:
moves += [ NavInsID.RIGHT_CLICK ]
moves += [ NavInsID.BOTH_CLICK ]
moves += [NavInsID.RIGHT_CLICK]
moves += [NavInsID.BOTH_CLICK]
else:
moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 2
moves += [NavInsID.USE_CASE_REVIEW_TAP] * 2
if verbose:
moves += [ NavInsID.USE_CASE_REVIEW_TAP ]
moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ]
moves += [NavInsID.USE_CASE_REVIEW_TAP]
moves += [NavInsID.USE_CASE_REVIEW_CONFIRM]
navigator.navigate_and_compare(ROOT_SCREENSHOT_PATH,
"domain_name_verbose_" + str(verbose),
moves)
@@ -87,7 +91,7 @@ def test_send_fund_wrong_challenge(firmware: Firmware,
except ExceptionRAPDU as e:
assert e.status == StatusWord.INVALID_DATA
else:
assert False # An exception should have been raised
assert False # An exception should have been raised
def test_send_fund_wrong_addr(firmware: Firmware,
@@ -103,20 +107,22 @@ def test_send_fund_wrong_addr(firmware: Firmware,
addr = bytearray(ADDR)
addr.reverse()
with app_client.sign_legacy(BIP32_PATH,
NONCE,
GAS_PRICE,
GAS_LIMIT,
addr,
AMOUNT,
CHAIN_ID):
with app_client.sign(BIP32_PATH,
{
"nonce": NONCE,
"gasPrice": Web3.to_wei(GAS_PRICE, "gwei"),
"gas": GAS_LIMIT,
"to": bytes(addr),
"value": Web3.to_wei(AMOUNT, "ether"),
"chainId": CHAIN_ID
}):
moves = list()
if firmware.device.startswith("nano"):
moves += [ NavInsID.RIGHT_CLICK ] * 4
moves += [ NavInsID.BOTH_CLICK ]
moves += [NavInsID.RIGHT_CLICK] * 4
moves += [NavInsID.BOTH_CLICK]
else:
moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 2
moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ]
moves += [NavInsID.USE_CASE_REVIEW_TAP] * 2
moves += [NavInsID.USE_CASE_REVIEW_CONFIRM]
navigator.navigate_and_compare(ROOT_SCREENSHOT_PATH,
"domain_name_wrong_addr",
moves)
@@ -132,20 +138,22 @@ def test_send_fund_non_mainnet(firmware: Firmware,
with app_client.provide_domain_name(challenge, NAME, ADDR):
pass
with app_client.sign_legacy(BIP32_PATH,
NONCE,
GAS_PRICE,
GAS_LIMIT,
ADDR,
AMOUNT,
5):
with app_client.sign(BIP32_PATH,
{
"nonce": NONCE,
"gasPrice": Web3.to_wei(GAS_PRICE, "gwei"),
"gas": GAS_LIMIT,
"to": ADDR,
"value": Web3.to_wei(AMOUNT, "ether"),
"chainId": 5
}):
moves = list()
if firmware.device.startswith("nano"):
moves += [ NavInsID.RIGHT_CLICK ] * 5
moves += [ NavInsID.BOTH_CLICK ]
moves += [NavInsID.RIGHT_CLICK] * 5
moves += [NavInsID.BOTH_CLICK]
else:
moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 2
moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ]
moves += [NavInsID.USE_CASE_REVIEW_TAP] * 2
moves += [NavInsID.USE_CASE_REVIEW_CONFIRM]
navigator.navigate_and_compare(ROOT_SCREENSHOT_PATH,
"domain_name_non_mainnet",
moves)
@@ -161,20 +169,22 @@ def test_send_fund_unknown_chain(firmware: Firmware,
with app_client.provide_domain_name(challenge, NAME, ADDR):
pass
with app_client.sign_legacy(BIP32_PATH,
NONCE,
GAS_PRICE,
GAS_LIMIT,
ADDR,
AMOUNT,
9):
with app_client.sign(BIP32_PATH,
{
"nonce": NONCE,
"gasPrice": Web3.to_wei(GAS_PRICE, "gwei"),
"gas": GAS_LIMIT,
"to": ADDR,
"value": Web3.to_wei(AMOUNT, "ether"),
"chainId": 9
}):
moves = list()
if firmware.device.startswith("nano"):
moves += [ NavInsID.RIGHT_CLICK ] * 5
moves += [ NavInsID.BOTH_CLICK ]
moves += [NavInsID.RIGHT_CLICK] * 5
moves += [NavInsID.BOTH_CLICK]
else:
moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 3
moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ]
moves += [NavInsID.USE_CASE_REVIEW_TAP] * 3
moves += [NavInsID.USE_CASE_REVIEW_CONFIRM]
navigator.navigate_and_compare(ROOT_SCREENSHOT_PATH,
"domain_name_unknown_chain",
moves)
@@ -192,7 +202,7 @@ def test_send_fund_domain_too_long(firmware: Firmware,
except ExceptionRAPDU as e:
assert e.status == StatusWord.INVALID_DATA
else:
assert False # An exception should have been raised
assert False # An exception should have been raised
def test_send_fund_domain_invalid_character(firmware: Firmware,
@@ -207,7 +217,7 @@ def test_send_fund_domain_invalid_character(firmware: Firmware,
except ExceptionRAPDU as e:
assert e.status == StatusWord.INVALID_DATA
else:
assert False # An exception should have been raised
assert False # An exception should have been raised
def test_send_fund_uppercase(firmware: Firmware,
@@ -222,7 +232,7 @@ def test_send_fund_uppercase(firmware: Firmware,
except ExceptionRAPDU as e:
assert e.status == StatusWord.INVALID_DATA
else:
assert False # An exception should have been raised
assert False # An exception should have been raised
def test_send_fund_domain_non_ens(firmware: Firmware,
@@ -237,4 +247,4 @@ def test_send_fund_domain_non_ens(firmware: Firmware,
except ExceptionRAPDU as e:
assert e.status == StatusWord.INVALID_DATA
else:
assert False # An exception should have been raised
assert False # An exception should have been raised

View File

@@ -8,7 +8,7 @@ from pathlib import Path
from ragger.backend import BackendInterface
from ragger.firmware import Firmware
from ragger.navigator import Navigator, NavInsID
from typing import List
import json
import ledger_app_clients.ethereum.response_parser as ResponseParser
from ledger_app_clients.ethereum.client import EthAppClient
@@ -19,7 +19,7 @@ from ledger_app_clients.ethereum.settings import SettingID, settings_toggle
BIP32_PATH = "m/44'/60'/0'/0/0"
def input_files() -> List[str]:
def input_files() -> list[str]:
files = []
for file in os.scandir("%s/eip712_input_files" % (os.path.dirname(__file__))):
if fnmatch.fnmatch(file, "*-data.json"):
@@ -52,16 +52,16 @@ def test_eip712_legacy(firmware: Firmware,
bytes.fromhex('eb4221181ff3f1a83ea7313993ca9218496e424604ba9492bb4052c03d5c3df8')):
moves = list()
if firmware.device.startswith("nano"):
moves += [ NavInsID.RIGHT_CLICK ]
moves += [NavInsID.RIGHT_CLICK]
if firmware.device == "nanos":
screens_per_hash = 4
else:
screens_per_hash = 2
moves += [ NavInsID.RIGHT_CLICK ] * screens_per_hash * 2
moves += [ NavInsID.BOTH_CLICK ]
moves += [NavInsID.RIGHT_CLICK] * screens_per_hash * 2
moves += [NavInsID.BOTH_CLICK]
else:
moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * 2
moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ]
moves += [NavInsID.USE_CASE_REVIEW_TAP] * 2
moves += [NavInsID.USE_CASE_REVIEW_CONFIRM]
navigator.navigate(moves)
v, r, s = ResponseParser.signature(app_client.response().data)
@@ -74,9 +74,9 @@ def test_eip712_legacy(firmware: Firmware,
def autonext(fw: Firmware, nav: Navigator):
moves = list()
if fw.device.startswith("nano"):
moves = [ NavInsID.RIGHT_CLICK ]
moves = [NavInsID.RIGHT_CLICK]
else:
moves = [ NavInsID.USE_CASE_REVIEW_TAP ]
moves = [NavInsID.USE_CASE_REVIEW_TAP]
nav.navigate(moves, screen_change_before_first_instruction=False, screen_change_after_last_instruction=False)
@@ -92,10 +92,14 @@ def test_eip712_new(firmware: Firmware,
else:
test_path = "%s/%s" % (input_file.parent, "-".join(input_file.stem.split("-")[:-1]))
conf_file = "%s.ini" % (test_path)
filter_file = None
filters = None
if filtering:
filter_file = "%s-filter.json" % (test_path)
try:
with open("%s-filter.json" % (test_path)) as f:
filters = json.load(f)
except (IOError, json.decoder.JSONDecodeError) as e:
pytest.skip("Filter file error: %s" % (e.strerror))
config = ConfigParser()
config.read(conf_file)
@@ -106,34 +110,30 @@ def test_eip712_new(firmware: Firmware,
assert "r" in config["signature"]
assert "s" in config["signature"]
if not filtering or Path(filter_file).is_file():
if verbose:
settings_toggle(firmware, navigator, [SettingID.VERBOSE_EIP712])
if verbose:
settings_toggle(firmware, navigator, [SettingID.VERBOSE_EIP712])
with open(input_file) as file:
assert InputData.process_data(app_client,
json.load(file),
filters,
partial(autonext, firmware, navigator))
with app_client.eip712_sign_new(BIP32_PATH):
# tight on timing, needed by the CI otherwise might fail sometimes
time.sleep(0.5)
assert InputData.process_file(app_client,
input_file,
filter_file,
partial(autonext, firmware, navigator)) == True
with app_client.eip712_sign_new(BIP32_PATH, verbose):
time.sleep(0.5) # tight on timing, needed by the CI otherwise might fail sometimes
moves = list()
if firmware.device.startswith("nano"):
if not verbose and not filtering: # need to skip the message hash
moves = [ NavInsID.RIGHT_CLICK ] * 2
moves += [ NavInsID.BOTH_CLICK ]
if not verbose and not filtering: # need to skip the message hash
moves = [NavInsID.RIGHT_CLICK] * 2
moves += [NavInsID.BOTH_CLICK]
else:
if not verbose and not filtering: # need to skip the message hash
moves += [ NavInsID.USE_CASE_REVIEW_TAP ]
moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ]
if not verbose and not filtering: # need to skip the message hash
moves += [NavInsID.USE_CASE_REVIEW_TAP]
moves += [NavInsID.USE_CASE_REVIEW_CONFIRM]
navigator.navigate(moves)
v, r, s = ResponseParser.signature(app_client.response().data)
#print("[signature]")
#print("v = %s" % (v.hex()))
#print("r = %s" % (r.hex()))
#print("s = %s" % (s.hex()))
assert v == bytes.fromhex(config["signature"]["v"])
assert r == bytes.fromhex(config["signature"]["r"])
assert s == bytes.fromhex(config["signature"]["s"])
else:
pytest.skip("No filter file found")
assert v == bytes.fromhex(config["signature"]["v"])
assert r == bytes.fromhex(config["signature"]["r"])
assert s == bytes.fromhex(config["signature"]["s"])

View File

@@ -6,20 +6,22 @@ from ragger.firmware import Firmware
from ragger.backend import BackendInterface
from ragger.navigator import Navigator, NavInsID
from ledger_app_clients.ethereum.client import EthAppClient, StatusWord
from ledger_app_clients.ethereum.settings import SettingID, settings_toggle
import ledger_app_clients.ethereum.response_parser as ResponseParser
from ragger.bip import calculate_public_key_and_chaincode, CurveChoice
ROOT_SCREENSHOT_PATH = Path(__file__).parent
@pytest.fixture(params=[True, False])
def with_chaincode(request) -> bool:
return request.param
@pytest.fixture(params=[None, 1, 2, 5, 137])
def chain(request) -> Optional[int]:
return request.param
def get_moves(firmware: Firmware,
navigator: BackendInterface,
chain: Optional[int] = None,
@@ -27,25 +29,26 @@ def get_moves(firmware: Firmware,
moves = list()
if firmware.is_nano:
moves += [ NavInsID.RIGHT_CLICK ]
moves += [NavInsID.RIGHT_CLICK]
if firmware.device == "nanos":
moves += [ NavInsID.RIGHT_CLICK ] * 3
moves += [NavInsID.RIGHT_CLICK] * 3
else:
moves += [ NavInsID.RIGHT_CLICK ]
moves += [NavInsID.RIGHT_CLICK]
if reject:
moves += [ NavInsID.RIGHT_CLICK ]
moves += [ NavInsID.BOTH_CLICK ]
moves += [NavInsID.RIGHT_CLICK]
moves += [NavInsID.BOTH_CLICK]
else:
moves += [ NavInsID.USE_CASE_REVIEW_TAP ]
moves += [NavInsID.USE_CASE_REVIEW_TAP]
if chain is not None and chain > 1:
moves += [ NavInsID.USE_CASE_ADDRESS_CONFIRMATION_TAP ]
moves += [NavInsID.USE_CASE_ADDRESS_CONFIRMATION_TAP]
if reject:
moves += [ NavInsID.USE_CASE_ADDRESS_CONFIRMATION_CANCEL ]
moves += [NavInsID.USE_CASE_ADDRESS_CONFIRMATION_CANCEL]
else:
moves += [ NavInsID.USE_CASE_ADDRESS_CONFIRMATION_CONFIRM ]
moves += [NavInsID.USE_CASE_ADDRESS_CONFIRMATION_CONFIRM]
return moves
def test_get_pk_rejected(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator):
@@ -59,7 +62,8 @@ def test_get_pk_rejected(firmware: Firmware,
except ExceptionRAPDU as e:
assert e.status == StatusWord.CONDITION_NOT_SATISFIED
else:
assert False # An exception should have been raised
assert False # An exception should have been raised
def test_get_pk(firmware: Firmware,
backend: BackendInterface,

View File

@@ -1,17 +1,21 @@
import pytest
from typing import Optional, Any
from pathlib import Path
from typing import Callable
from ragger.error import ExceptionRAPDU
from ragger.firmware import Firmware
from ragger.backend import BackendInterface
from ragger.navigator import Navigator, NavInsID
from ledger_app_clients.ethereum.client import EthAppClient, TxData, StatusWord
from ledger_app_clients.ethereum.settings import SettingID, settings_toggle
from ledger_app_clients.ethereum.utils import get_selector_from_function
import struct
from ledger_app_clients.ethereum.client import EthAppClient, StatusWord
import ledger_app_clients.ethereum.response_parser as ResponseParser
from ledger_app_clients.ethereum.utils import get_selector_from_data, recover_transaction
from web3 import Web3
import json
import os
ROOT_SCREENSHOT_PATH = Path(__file__).parent
ABIS_FOLDER = "%s/abis" % (os.path.dirname(__file__))
BIP32_PATH = "m/44'/60'/0'/0/0"
NONCE = 21
@@ -19,51 +23,63 @@ GAS_PRICE = 13
GAS_LIMIT = 21000
FROM = bytes.fromhex("1122334455667788990011223344556677889900")
TO = bytes.fromhex("0099887766554433221100998877665544332211")
NFTS = [ (1, 3), (5, 2), (7, 4) ] # tuples of (token_id, amount)
NFTS = [(1, 3), (5, 2), (7, 4)] # tuples of (token_id, amount)
DATA = "Some data".encode()
DEVICE_ADDR: Optional[bytes] = None
class NFTCollection:
class NFTCollection:
addr: bytes
name: str
chain_id: int
def __init__(self, addr: bytes, name: str, chain_id: int):
def __init__(self, addr: bytes, name: str, chain_id: int, contract):
self.addr = addr
self.name = name
self.chain_id = chain_id
self.contract = contract
class Action:
fn: str
data_fn: Callable
class Action:
fn_name: str
fn_args: list[Any]
nav_fn: Callable
def __init__(self, fn: str, data_fn: Callable, nav_fn: Callable):
self.fn = fn
self.data_fn = data_fn
def __init__(self, fn_name: str, fn_args: list[Any], nav_fn: Callable):
self.fn_name = fn_name
self.fn_args = fn_args
self.nav_fn = nav_fn
def common_nav_nft(is_nano: bool, nano_steps: int, stax_steps: int, reject: bool) -> list[NavInsID]:
def common_nav_nft(is_nano: bool,
nano_steps: int,
stax_steps: int,
reject: bool) -> list[NavInsID]:
moves = list()
if is_nano:
moves += [ NavInsID.RIGHT_CLICK ] * nano_steps
moves += [NavInsID.RIGHT_CLICK] * nano_steps
if reject:
moves += [ NavInsID.RIGHT_CLICK ]
moves += [ NavInsID.BOTH_CLICK ]
moves += [NavInsID.RIGHT_CLICK]
moves += [NavInsID.BOTH_CLICK]
else:
moves += [ NavInsID.USE_CASE_REVIEW_TAP ] * stax_steps
moves += [NavInsID.USE_CASE_REVIEW_TAP] * stax_steps
if reject:
moves += [
NavInsID.USE_CASE_REVIEW_REJECT,
NavInsID.USE_CASE_CHOICE_CONFIRM
]
else:
moves += [ NavInsID.USE_CASE_REVIEW_CONFIRM ]
moves += [NavInsID.USE_CASE_REVIEW_CONFIRM]
return moves
def snapshot_test_name(nft_type: str, fn: str, chain_id: int, reject: bool) -> str:
name = "%s_%s_%s" % (nft_type, fn.split("(")[0], str(chain_id))
name = "%s_%s_%s" % (nft_type, fn, str(chain_id))
if reject:
name += "-rejected"
return name
def common_test_nft(fw: Firmware,
back: BackendInterface,
nav: Navigator,
@@ -71,34 +87,48 @@ def common_test_nft(fw: Firmware,
action: Action,
reject: bool,
plugin_name: str):
global DEVICE_ADDR
app_client = EthAppClient(back)
selector = get_selector_from_function(action.fn)
if app_client._client.firmware.name == "nanos":
pytest.skip("Not supported on LNS")
if DEVICE_ADDR is None: # to only have to request it once
with app_client.get_public_addr(display=False):
pass
_, DEVICE_ADDR, _ = ResponseParser.pk_addr(app_client.response().data)
data = collec.contract.encodeABI(action.fn_name, action.fn_args)
with app_client.set_plugin(plugin_name,
collec.addr,
selector,
1):
get_selector_from_data(data),
collec.chain_id):
pass
with app_client.provide_nft_metadata(collec.name, collec.addr, collec.chain_id):
pass
with app_client.sign_legacy(BIP32_PATH,
NONCE,
GAS_PRICE,
GAS_LIMIT,
collec.addr,
0,
collec.chain_id,
action.data_fn(action)):
tx_params = {
"nonce": NONCE,
"gasPrice": Web3.to_wei(GAS_PRICE, "gwei"),
"gas": GAS_LIMIT,
"to": collec.addr,
"value": 0,
"chainId": collec.chain_id,
"data": data,
}
with app_client.sign(BIP32_PATH, tx_params):
nav.navigate_and_compare(ROOT_SCREENSHOT_PATH,
snapshot_test_name(plugin_name.lower(),
action.fn,
action.fn_name,
collec.chain_id,
reject),
action.nav_fn(fw.is_nano,
collec.chain_id,
reject))
# verify signature
vrs = ResponseParser.signature(app_client.response().data)
addr = recover_transaction(tx_params, vrs)
assert addr == DEVICE_ADDR
def common_test_nft_reject(test_fn: Callable,
fw: Firmware,
@@ -111,53 +141,20 @@ def common_test_nft_reject(test_fn: Callable,
except ExceptionRAPDU as e:
assert e.status == StatusWord.CONDITION_NOT_SATISFIED
else:
assert False # An exception should have been raised
assert False # An exception should have been raised
# ERC-721
ERC721_PLUGIN = "ERC721"
ERC721_SAFE_TRANSFER_FROM_DATA = "safeTransferFrom(address,address,uint256,bytes)"
ERC721_SAFE_TRANSFER_FROM = "safeTransferFrom(address,address,uint256)"
ERC721_TRANSFER_FROM = "transferFrom(address,address,uint256)"
ERC721_APPROVE = "approve(address,uint256)"
ERC721_SET_APPROVAL_FOR_ALL = "setApprovalForAll(address,bool)"
## data formatting functions
def data_erc721_transfer_from(action: Action) -> TxData:
return TxData(
get_selector_from_function(action.fn),
[
FROM,
TO,
struct.pack(">H", NFTS[0][0])
]
with open("%s/erc721.json" % (ABIS_FOLDER)) as file:
contract_erc721 = Web3().eth.contract(
abi=json.load(file),
address=bytes(20)
)
def data_erc721_safe_transfer_from_data(action: Action) -> TxData:
txd = data_erc721_transfer_from(action)
txd.parameters += [ DATA ]
return txd
def data_erc721_approve(action: Action) -> TxData:
return TxData(
get_selector_from_function(action.fn),
[
TO,
struct.pack(">H", NFTS[0][0])
]
)
def data_erc721_set_approval_for_all(action: Action) -> TxData:
return TxData(
get_selector_from_function(action.fn),
[
TO,
struct.pack("b", False)
]
)
## ui nav functions
# ui nav functions
def nav_erc721_transfer_from(is_nano: bool,
chain_id: int,
@@ -169,6 +166,7 @@ def nav_erc721_transfer_from(is_nano: bool,
stax_steps += 1
return common_nav_nft(is_nano, nano_steps, stax_steps, reject)
def nav_erc721_approve(is_nano: bool,
chain_id: int,
reject: bool) -> list[NavInsID]:
@@ -179,6 +177,7 @@ def nav_erc721_approve(is_nano: bool,
stax_steps += 1
return common_nav_nft(is_nano, nano_steps, stax_steps, reject)
def nav_erc721_set_approval_for_all(is_nano: bool,
chain_id: int,
reject: bool) -> list[NavInsID]:
@@ -187,49 +186,56 @@ def nav_erc721_set_approval_for_all(is_nano: bool,
nano_steps += 1
return common_nav_nft(is_nano, nano_steps, 3, reject)
collecs_721 = [
NFTCollection(bytes.fromhex("bc4ca0eda7647a8ab7c2061c2e118a18a936f13d"),
"Bored Ape Yacht Club",
1),
"Bored Ape Yacht Club",
1,
contract_erc721),
NFTCollection(bytes.fromhex("670fd103b1a08628e9557cd66b87ded841115190"),
"y00ts",
137),
"y00ts",
137,
contract_erc721),
NFTCollection(bytes.fromhex("2909cf13e458a576cdd9aab6bd6617051a92dacf"),
"goerlirocks",
5)
"goerlirocks",
5,
contract_erc721),
]
actions_721 = [
Action(ERC721_SAFE_TRANSFER_FROM_DATA,
data_erc721_safe_transfer_from_data,
Action("safeTransferFrom",
[FROM, TO, NFTS[0][0], DATA],
nav_erc721_transfer_from),
Action(ERC721_SAFE_TRANSFER_FROM,
data_erc721_transfer_from,
Action("safeTransferFrom",
[FROM, TO, NFTS[0][0]],
nav_erc721_transfer_from),
Action(ERC721_TRANSFER_FROM,
data_erc721_transfer_from,
Action("transferFrom",
[FROM, TO, NFTS[0][0]],
nav_erc721_transfer_from),
Action(ERC721_APPROVE,
data_erc721_approve,
Action("approve",
[TO, NFTS[0][0]],
nav_erc721_approve),
Action(ERC721_SET_APPROVAL_FOR_ALL,
data_erc721_set_approval_for_all,
nav_erc721_set_approval_for_all)
Action("setApprovalForAll",
[TO, False],
nav_erc721_set_approval_for_all),
]
@pytest.fixture(params=collecs_721)
def collec_721(request) -> NFTCollection:
return request.param
@pytest.fixture(params=actions_721)
def action_721(request) -> Action:
return request.param
def test_erc721(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator,
collec_721: NFTCollection,
action_721: Action,
reject: bool = False):
backend: BackendInterface,
navigator: Navigator,
collec_721: NFTCollection,
action_721: Action,
reject: bool = False):
common_test_nft(firmware,
backend,
navigator,
@@ -238,9 +244,10 @@ def test_erc721(firmware: Firmware,
reject,
ERC721_PLUGIN)
def test_erc721_reject(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator):
backend: BackendInterface,
navigator: Navigator):
common_test_nft_reject(test_erc721,
firmware,
backend,
@@ -248,54 +255,19 @@ def test_erc721_reject(firmware: Firmware,
collecs_721[0],
actions_721[0])
# ERC-1155
ERC1155_PLUGIN = "ERC1155"
ERC1155_SAFE_TRANSFER_FROM = "safeTransferFrom(address,address,uint256,uint256,bytes)"
ERC1155_SAFE_BATCH_TRANSFER_FROM = "safeBatchTransferFrom(address,address,uint256[],uint256[],bytes)"
ERC1155_SET_APPROVAL_FOR_ALL = "setApprovalForAll(address,bool)"
## data formatting functions
def data_erc1155_safe_transfer_from(action: Action) -> TxData:
return TxData(
get_selector_from_function(action.fn),
[
FROM,
TO,
struct.pack(">H", NFTS[0][0]),
struct.pack(">H", NFTS[0][1]),
DATA
]
with open("%s/erc1155.json" % (ABIS_FOLDER)) as file:
contract_erc1155 = Web3().eth.contract(
abi=json.load(file),
address=bytes(20)
)
def data_erc1155_safe_batch_transfer_from(action: Action) -> TxData:
data = TxData(
get_selector_from_function(action.fn),
[
FROM,
TO
])
data.parameters += [ int(32 * 4).to_bytes(8, "big") ] # token_ids offset
data.parameters += [int(32 * (4 + len(NFTS) + 1)).to_bytes(8, "big") ] # amounts offset
data.parameters += [ int(len(NFTS)).to_bytes(8, "big") ] # token_ids length
for nft in NFTS:
data.parameters += [ struct.pack(">H", nft[0]) ] # token_id
data.parameters += [ int(len(NFTS)).to_bytes(8, "big") ] # amounts length
for nft in NFTS:
data.parameters += [ struct.pack(">H", nft[1]) ] # amount
return data
def data_erc1155_set_approval_for_all(action: Action) -> TxData:
return TxData(
get_selector_from_function(action.fn),
[
TO,
struct.pack("b", False)
]
)
## ui nav functions
# ui nav functions
def nav_erc1155_safe_transfer_from(is_nano: bool,
chain_id: int,
@@ -305,6 +277,7 @@ def nav_erc1155_safe_transfer_from(is_nano: bool,
nano_steps += 1
return common_nav_nft(is_nano, nano_steps, 4, reject)
def nav_erc1155_safe_batch_transfer_from(is_nano: bool,
chain_id: int,
reject: bool) -> list:
@@ -315,6 +288,7 @@ def nav_erc1155_safe_batch_transfer_from(is_nano: bool,
stax_steps += 1
return common_nav_nft(is_nano, nano_steps, stax_steps, reject)
def nav_erc1155_set_approval_for_all(is_nano: bool,
chain_id: int,
reject: bool) -> list:
@@ -323,35 +297,50 @@ def nav_erc1155_set_approval_for_all(is_nano: bool,
nano_steps += 1
return common_nav_nft(is_nano, nano_steps, 3, reject)
collecs_1155 = [
NFTCollection(bytes.fromhex("495f947276749ce646f68ac8c248420045cb7b5e"),
"OpenSea Shared Storefront",
1),
"OpenSea Shared Storefront",
1,
contract_erc1155),
NFTCollection(bytes.fromhex("2953399124f0cbb46d2cbacd8a89cf0599974963"),
"OpenSea Collections",
137),
"OpenSea Collections",
137,
contract_erc1155),
NFTCollection(bytes.fromhex("f4910c763ed4e47a585e2d34baa9a4b611ae448c"),
"OpenSea Collections",
5)
"OpenSea Collections",
5,
contract_erc1155),
]
actions_1155 = [
Action(ERC1155_SAFE_TRANSFER_FROM,
data_erc1155_safe_transfer_from,
Action("safeTransferFrom",
[FROM, TO, NFTS[0][0], NFTS[0][1], DATA],
nav_erc1155_safe_transfer_from),
Action(ERC1155_SAFE_BATCH_TRANSFER_FROM,
data_erc1155_safe_batch_transfer_from,
Action("safeBatchTransferFrom",
[
FROM,
TO,
list(map(lambda nft: nft[0], NFTS)),
list(map(lambda nft: nft[1], NFTS)),
DATA
],
nav_erc1155_safe_batch_transfer_from),
Action(ERC1155_SET_APPROVAL_FOR_ALL,
data_erc1155_set_approval_for_all,
nav_erc1155_set_approval_for_all)
Action("setApprovalForAll",
[TO, False],
nav_erc1155_set_approval_for_all),
]
@pytest.fixture(params=collecs_1155)
def collec_1155(request) -> bool:
return request.param
@pytest.fixture(params=actions_1155)
def action_1155(request) -> Action:
return request.param
def test_erc1155(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator,
@@ -366,6 +355,7 @@ def test_erc1155(firmware: Firmware,
reject,
ERC1155_PLUGIN)
def test_erc1155_reject(firmware: Firmware,
backend: BackendInterface,
navigator: Navigator):