From e8f58bdce76dc2f3aefb6849abaf903e324bac02 Mon Sep 17 00:00:00 2001 From: Alexandre Paillier Date: Thu, 4 Aug 2022 18:25:02 +0200 Subject: [PATCH] Ragger tests now have EIP721 filtering --- tests/ragger/eip712/InputData.py | 45 ++++++----- .../11-complex_structs-filter.json | 9 +++ tests/ragger/ethereum_client.py | 76 +++++++++++++++---- tests/ragger/test_eip712.py | 31 +++++--- 4 files changed, 116 insertions(+), 45 deletions(-) create mode 100644 tests/ragger/eip712/input_files/11-complex_structs-filter.json diff --git a/tests/ragger/eip712/InputData.py b/tests/ragger/eip712/InputData.py index 7b66801..8f63188 100644 --- a/tests/ragger/eip712/InputData.py +++ b/tests/ragger/eip712/InputData.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +import os import json import sys import re @@ -9,6 +10,7 @@ from ecdsa import SigningKey from ecdsa.util import sigencode_der import pdb from ethereum_client import EthereumClient, EIP712FieldType +import base64 # global variables app_client: EthereumClient = None @@ -198,7 +200,7 @@ def send_struct_impl_field(value, field): data = encoding_functions[field["enum"]](value, field["typesize"]) - if False:#args.filtering: + if filtering_paths: path = ".".join(current_path) if path in filtering_paths.keys(): send_filtering_field_name(filtering_paths[path]) @@ -257,10 +259,10 @@ def send_sign(): #send_apdu(INS_SIGN, 0x00, P2_VERS_NEW, path_len + bip32path) print("send_apdu(INS_SIGN, 0x00, P2_VERS_NEW, path_len + bip32path)") -def send_filtering_activate(): - #send_apdu(INS_FILTERING, P1_ACTIVATE, 0x00, bytearray()) - print("send_apdu(INS_FILTERING, P1_ACTIVATE, 0x00, bytearray())") - +#def send_filtering_activate(): +# #send_apdu(INS_FILTERING, P1_ACTIVATE, 0x00, bytearray()) +# print("send_apdu(INS_FILTERING, P1_ACTIVATE, 0x00, bytearray())") +# def send_filtering_info(p1, display_name, sig): payload = bytearray() payload.append(len(display_name)) @@ -272,7 +274,7 @@ def send_filtering_info(p1, display_name, sig): print("send_apdu(INS_FILTERING, p1, 0x00, payload)") # ledgerjs doesn't actually sign anything, and instead uses already pre-computed signatures -def send_filtering_contract_name(display_name): +def send_filtering_contract_name(display_name: str): global sig_ctx msg = bytearray() @@ -283,7 +285,7 @@ def send_filtering_contract_name(display_name): msg.append(ord(char)) sig = sig_ctx["key"].sign_deterministic(msg, sigencode=sigencode_der) - send_filtering_info(P1_CONTRACT_NAME, display_name, sig) + app_client.eip712_filtering_send_contract_name(display_name, sig) # ledgerjs doesn't actually sign anything, and instead uses already pre-computed signatures def send_filtering_field_name(display_name): @@ -300,11 +302,11 @@ def send_filtering_field_name(display_name): for char in display_name: msg.append(ord(char)) sig = sig_ctx["key"].sign_deterministic(msg, sigencode=sigencode_der) - send_filtering_info(P1_FIELD_NAME, display_name, sig) + app_client.eip712_filtering_send_field_name(display_name, sig) -def read_filtering_file(domain, message): +def read_filtering_file(domain, message, filtering_file_path): data_json = None - with open("%s-filter.json" % (args.JSON_FILE)) as data: + with open(filtering_file_path) as data: data_json = json.load(data) return data_json @@ -319,8 +321,11 @@ def prepare_filtering(filtr_data, message): def init_signature_context(types, domain): global sig_ctx - with open(args.keypath, "r") as priv_file: - sig_ctx["key"] = SigningKey.from_pem(priv_file.read(), hashlib.sha256) + env_key = os.getenv("CAL_SIGNATURE_TEST_KEY") + if env_key: + key = base64.b64decode(env_key).decode() # base 64 string -> decode bytes -> string + print(key) + sig_ctx["key"] = SigningKey.from_pem(key, hashlib.sha256) caddr = domain["verifyingContract"] if caddr.startswith("0x"): caddr = caddr[2:] @@ -337,7 +342,7 @@ def init_signature_context(types, domain): return True return False -def process_file(aclient: EthereumClient, input_file_path: str, filtering = False) -> bool: +def process_file(aclient: EthereumClient, input_file_path: str, filtering_file_path = None) -> bool: global sig_ctx global app_client @@ -350,10 +355,10 @@ def process_file(aclient: EthereumClient, input_file_path: str, filtering = Fals domain = data_json["domain"] message = data_json["message"] - if filtering: + if filtering_file_path: if not init_signature_context(types, domain): return False - filtr = read_filtering_file(domain, message) + filtr = read_filtering_file(domain, message, filtering_file_path) # send types definition for key in types.keys(): @@ -362,8 +367,8 @@ def process_file(aclient: EthereumClient, input_file_path: str, filtering = Fals (f["type"], f["enum"], f["typesize"], f["array_lvls"]) = \ send_struct_def_field(f["type"], f["name"]) - if filtering: - send_filtering_activate() + if filtering_file_path: + app_client.eip712_filtering_activate() prepare_filtering(filtr, message) # send domain implementation @@ -371,17 +376,15 @@ def process_file(aclient: EthereumClient, input_file_path: str, filtering = Fals if not send_struct_impl(types, domain, domain_typename): return False - if filtering: + if filtering_file_path: if filtr and "name" in filtr: send_filtering_contract_name(filtr["name"]) else: - send_filtering_contract_name(sig_ctx["domain"]["name"]) + send_filtering_contract_name(domain["name"]) # send message implementation app_client.eip712_send_struct_impl_root_struct(message_typename) if not send_struct_impl(types, message, message_typename): return False - # sign - #send_sign() return True diff --git a/tests/ragger/eip712/input_files/11-complex_structs-filter.json b/tests/ragger/eip712/input_files/11-complex_structs-filter.json new file mode 100644 index 0000000..a38240d --- /dev/null +++ b/tests/ragger/eip712/input_files/11-complex_structs-filter.json @@ -0,0 +1,9 @@ +{ + "name": "Depthy Test", + "fields": { + "contents": "Message", + "from.name": "Sender", + "to.members.[].name": "Recipient", + "attach.list.[].name": "Attachment" + } +} diff --git a/tests/ragger/ethereum_client.py b/tests/ragger/ethereum_client.py index 81a1af6..68ce086 100644 --- a/tests/ragger/ethereum_client.py +++ b/tests/ragger/ethereum_client.py @@ -4,6 +4,7 @@ from typing import Iterator, Dict, List from ragger.backend import BackendInterface from ragger.utils import RAPDU import signal +import pdb class InsType(IntEnum): EIP712_SEND_STRUCT_DEF = 0x1a, @@ -13,14 +14,17 @@ class InsType(IntEnum): class P1Type(IntEnum): COMPLETE_SEND = 0x00, - PARTIAL_SEND = 0x01 + PARTIAL_SEND = 0x01, + FILTERING_ACTIVATE = 0x00, + FILTERING_CONTRACT_NAME = 0x0f, + FILTERING_FIELD_NAME = 0xff class P2Type(IntEnum): STRUCT_NAME = 0x00, STRUCT_FIELD = 0xff, ARRAY = 0x0f, LEGACY_IMPLEM = 0x00 - NEW_IMPLEM = 0x01, + NEW_IMPLEM = 0x01 class EIP712FieldType(IntEnum): CUSTOM = 0, @@ -63,14 +67,17 @@ class EthereumClientCmdBuilder: header.append(len(cdata)) return header + cdata - def eip712_send_struct_def_struct_name(self, name: str) -> bytes: + def _string_to_bytes(self, string: str) -> bytes: data = bytearray() - for char in name: + for char in string: data.append(ord(char)) + return data + + def eip712_send_struct_def_struct_name(self, name: str) -> bytes: return self._serialize(InsType.EIP712_SEND_STRUCT_DEF, P1Type.COMPLETE_SEND, P2Type.STRUCT_NAME, - data) + self._string_to_bytes(name)) def eip712_send_struct_def_struct_field(self, field_type: EIP712FieldType, @@ -86,8 +93,7 @@ class EthereumClientCmdBuilder: data.append(typedesc) if field_type == EIP712FieldType.CUSTOM: data.append(len(type_name)) - for char in type_name: - data.append(ord(char)) + data += self._string_to_bytes(type_name) if type_size != None: data.append(type_size) if len(array_levels) > 0: @@ -97,21 +103,17 @@ class EthereumClientCmdBuilder: if level != None: data.append(level) data.append(len(key_name)) - for char in key_name: - data.append(ord(char)) + data += self._string_to_bytes(key_name) return self._serialize(InsType.EIP712_SEND_STRUCT_DEF, P1Type.COMPLETE_SEND, P2Type.STRUCT_FIELD, data) def eip712_send_struct_impl_root_struct(self, name: str) -> bytes: - data = bytearray() - for char in name: - data.append(ord(char)) return self._serialize(InsType.EIP712_SEND_STRUCT_IMPL, P1Type.COMPLETE_SEND, P2Type.STRUCT_NAME, - data) + self._string_to_bytes(name)) def eip712_send_struct_impl_array(self, size: int) -> bytes: data = bytearray() @@ -163,6 +165,32 @@ class EthereumClientCmdBuilder: P2Type.LEGACY_IMPLEM, data) + def eip712_filtering_activate(self): + return self._serialize(InsType.EIP712_SEND_FILTERING, + P1Type.FILTERING_ACTIVATE, + 0x00, + bytearray()) + + def _eip712_filtering_send_name(self, name: str, sig: bytes) -> bytes: + data = bytearray() + data.append(len(name)) + data += self._string_to_bytes(name) + data.append(len(sig)) + data += sig + return data + + def eip712_filtering_send_contract_name(self, name: str, sig: bytes) -> bytes: + return self._serialize(InsType.EIP712_SEND_FILTERING, + P1Type.FILTERING_CONTRACT_NAME, + 0x00, + self._eip712_filtering_send_name(name, sig)) + + def eip712_filtering_send_field_name(self, name: str, sig: bytes) -> bytes: + return self._serialize(InsType.EIP712_SEND_FILTERING, + P1Type.FILTERING_FIELD_NAME, + 0x00, + self._eip712_filtering_send_name(name, sig)) + class EthereumResponseParser: def sign(self, data: bytes): @@ -195,6 +223,7 @@ class EthereumClient: ) } _click_delay = 1/4 + _eip712_filtering = False def __init__(self, client: BackendInterface, debug: bool = False): self._client = client @@ -262,7 +291,8 @@ class EthereumClient: def eip712_sign_new(self, bip32): with self._send(self._cmd_builder.eip712_sign_new(bip32)): - if not self._settings[SettingType.VERBOSE_EIP712].value: # need to skip the message hash + if not self._settings[SettingType.VERBOSE_EIP712].value and \ + not self._eip712_filtering: # need to skip the message hash self._client.right_click() self._client.right_click() self._client.both_click() # approve signature @@ -306,3 +336,21 @@ class EthereumClient: self._settings[enum].value = new_values[enum] self._client.right_click() self._client.both_click() + + def eip712_filtering_activate(self): + with self._send(self._cmd_builder.eip712_filtering_activate()): + pass + self._eip712_filtering = True + assert self._recv().status == 0x9000 + + def eip712_filtering_send_contract_name(self, name: str, sig: bytes): + #pdb.set_trace() + with self._send(self._cmd_builder.eip712_filtering_send_contract_name(name, sig)): + self._enable_click_until_response() + self._disable_click_until_response() + assert self._recv().status == 0x9000 + + def eip712_filtering_send_field_name(self, name: str, sig: bytes): + with self._send(self._cmd_builder.eip712_filtering_send_field_name(name, sig)): + pass + assert self._recv().status == 0x9000 diff --git a/tests/ragger/test_eip712.py b/tests/ragger/test_eip712.py index 1645013..f9b4a16 100644 --- a/tests/ragger/test_eip712.py +++ b/tests/ragger/test_eip712.py @@ -31,6 +31,10 @@ def input_file(request) -> str: def verbose(request) -> bool: return request.param +@pytest.fixture(params=[False, True]) +def filtering(request) -> bool: + return request.param + def test_eip712_legacy(app_client: EthereumClient): v, r, s = app_client.eip712_sign_legacy( @@ -49,6 +53,10 @@ def test_eip712_new(app_client: EthereumClient, input_file: Path, verbose: bool, if app_client._client.firmware.device != "nanos": test_path = "%s/%s" % (input_file.parent, "-".join(input_file.stem.split("-")[:-1])) conf_file = "%s.ini" % (test_path) + filter_file = None + + if filtering: + filter_file = "%s-filter.json" % (test_path) config = ConfigParser() config.read(conf_file) @@ -59,16 +67,19 @@ def test_eip712_new(app_client: EthereumClient, input_file: Path, verbose: bool, assert "r" in config["signature"] assert "s" in config["signature"] - if verbose: - app_client.settings_set({ - SettingType.VERBOSE_EIP712: True - }) + if not filtering or Path(filter_file).is_file(): + if verbose: + app_client.settings_set({ + SettingType.VERBOSE_EIP712: True + }) - InputData.process_file(app_client, input_file, False) - v, r, s = app_client.eip712_sign_new(bip32) + assert InputData.process_file(app_client, input_file, filter_file) == True + v, r, s = app_client.eip712_sign_new(bip32) - assert v == bytes.fromhex(config["signature"]["v"]) - assert r == bytes.fromhex(config["signature"]["r"]) - assert s == bytes.fromhex(config["signature"]["s"]) + assert v == bytes.fromhex(config["signature"]["v"]) + assert r == bytes.fromhex(config["signature"]["r"]) + assert s == bytes.fromhex(config["signature"]["s"]) + else: + print("No filter file found, skipping...") else: - print("Not supported by LNS") + print("Not supported by LNS, skipping...")