From 194f4d5c18cad6c51175bc9692aba5ad83dba096 Mon Sep 17 00:00:00 2001 From: Coline Date: Mon, 11 Jul 2022 13:44:26 +0200 Subject: [PATCH] fix: apdu iterator for eth personal sign --- .../speculos/ethereum_client/ethereum_cmd.py | 27 +++++++----- .../ethereum_client/ethereum_cmd_builder.py | 41 +++++++++++++++---- tests/speculos/test_eip191.py | 28 ++++--------- 3 files changed, 57 insertions(+), 39 deletions(-) diff --git a/tests/speculos/ethereum_client/ethereum_cmd.py b/tests/speculos/ethereum_client/ethereum_cmd.py index 0d05a3d..aa51f79 100644 --- a/tests/speculos/ethereum_client/ethereum_cmd.py +++ b/tests/speculos/ethereum_client/ethereum_cmd.py @@ -1,6 +1,7 @@ from ast import List from contextlib import contextmanager import struct +from time import sleep from typing import Tuple from speculos.client import SpeculosClient, ApduException @@ -152,6 +153,8 @@ class EthereumCommand: except ApduException as error: raise DeviceException(error_code=error.sw, ins=InsType.INS_SIGN_TX) + + @contextmanager def simple_sign_tx(self, bip32_path: str, transaction: Transaction, result: List = list()) -> None: try: @@ -196,24 +199,28 @@ class EthereumCommand: result.append(v) result.append(r) result.append(s) - + @contextmanager - def simple_personal_sign_tx(self, bip32_path: str, transaction: PersonalTransaction, result: List = list()) -> None: + def personal_sign_tx(self, bip32_path: str, transaction: PersonalTransaction, result: List = list()) -> None: + """ + Does the same thing as simple_personal_sign but allows to create a batch of cmd apdu + """ try: - chunk: bytes = self.builder.simple_personal_sign_tx(bip32_path=bip32_path, transaction=transaction) - - with self.client.apdu_exchange_nowait(cla=chunk[0], ins=chunk[1], - p1=chunk[2], p2=chunk[3], - data=chunk[5:]) as exchange: - yield exchange - response: bytes = exchange.receive() + for islast_apdu, apdu in self.builder.personal_sign_tx(bip32_path=bip32_path, transaction=transaction): + if islast_apdu: + with self.client.apdu_exchange_nowait(cla=apdu[0], ins=apdu[1], + p1=apdu[2], p2=apdu[3], + data=apdu[5:]) as exchange: + yield exchange + response: bytes = exchange.receive() + else: + self.send_apdu(apdu) except ApduException as error: raise DeviceException(error_code=error.sw, ins=InsType.INS_SIGN_TX) # response = V (1) || R (32) || S (32) - assert len(response) == 65 v, r, s = parse_sign_response(response) result.append(v) diff --git a/tests/speculos/ethereum_client/ethereum_cmd_builder.py b/tests/speculos/ethereum_client/ethereum_cmd_builder.py index e6c0be3..d1e9434 100644 --- a/tests/speculos/ethereum_client/ethereum_cmd_builder.py +++ b/tests/speculos/ethereum_client/ethereum_cmd_builder.py @@ -1,5 +1,6 @@ import enum import logging +import re import struct from typing import List, Tuple, Union, Iterator, cast @@ -9,6 +10,9 @@ from ethereum_client.utils import bip32_path_from_string MAX_APDU_LEN: int = 255 +def chunked(size, source): + for i in range(0, len(source), size): + yield source[i:i+size] def chunkify(data: bytes, chunk_len: int) -> Iterator[Tuple[bool, bytes]]: size: int = len(data) @@ -355,8 +359,7 @@ class EthereumCommandBuilder: p2=0x00, cdata=cdata) - - def simple_personal_sign_tx(self, bip32_path: str, transaction: PersonalTransaction) -> bytes: + def personal_sign_tx(self, bip32_path: str, transaction: PersonalTransaction) -> Tuple[bool,bytes]: """Command builder for INS_SIGN_PERSONAL_TX. Parameters @@ -372,6 +375,7 @@ class EthereumCommandBuilder: APDU command chunk for INS_SIGN_PERSONAL_TX. """ + bip32_paths: List[bytes] = bip32_path_from_string(bip32_path) cdata: bytes = b"".join([ @@ -383,9 +387,30 @@ class EthereumCommandBuilder: tx: bytes = transaction.serialize() cdata = cdata + tx - - return self.serialize(cla=self.CLA, - ins=InsType.INS_SIGN_PERSONAL_TX, - p1=0x00, - p2=0x00, - cdata=cdata) + last_chunk = len(cdata) // MAX_APDU_LEN + + for i, (chunk) in enumerate(chunked(MAX_APDU_LEN, cdata)): + if i == 0 and i == last_chunk: + yield True, self.serialize(cla=self.CLA, + ins=InsType.INS_SIGN_PERSONAL_TX, + p1=0x00, + p2=0x00, + cdata=chunk) + elif i == 0: + yield False, self.serialize(cla=self.CLA, + ins=InsType.INS_SIGN_PERSONAL_TX, + p1=0x00, + p2=0x00, + cdata=chunk) + elif i == last_chunk: + yield True, self.serialize(cla=self.CLA, + ins=InsType.INS_SIGN_PERSONAL_TX, + p1=0x80, + p2=0x00, + cdata=chunk) + else: + yield False, self.serialize(cla=self.CLA, + ins=InsType.INS_SIGN_PERSONAL_TX, + p1=0x80, + p2=0x00, + cdata=chunk) \ No newline at end of file diff --git a/tests/speculos/test_eip191.py b/tests/speculos/test_eip191.py index 822d14d..45e0f2b 100644 --- a/tests/speculos/test_eip191.py +++ b/tests/speculos/test_eip191.py @@ -12,7 +12,7 @@ def test_personal_sign_metamask(cmd): msg="Example `personal_sign` message" ) - with cmd.simple_personal_sign_tx(bip32_path=bip32_path, transaction=transaction, result=result) as ex: + with cmd.personal_sign_tx(bip32_path=bip32_path, transaction=transaction, result=result) as ex: sleep(0.5) if cmd.model == "nanos": @@ -47,8 +47,7 @@ def test_personal_sign_metamask(cmd): assert v == 0x1c # 28 assert r.hex() == "916099cf0d9c21911c85f0770a47a9696a8189e78c259cf099749748c507baae" assert s.hex() == "0d72234bc0ac2e94c5f7a5f4f9cd8610a52be4ea55515a85b9703f1bb158415c" - - + def test_personal_sign_reject(cmd): result: list = [] @@ -58,7 +57,7 @@ def test_personal_sign_reject(cmd): ) try: - with cmd.simple_personal_sign_tx(bip32_path=bip32_path, transaction=transaction, result=result) as ex: + with cmd.personal_sign_tx(bip32_path=bip32_path, transaction=transaction, result=result) as ex: sleep(0.5) if cmd.model == "nanos": @@ -108,7 +107,7 @@ def test_personal_sign_non_ascii(cmd): msg="0x9c22ff5f21f0b81b113e63f7db6da94fedef11b2119b4088b89664fb9a3cb658" ) - with cmd.simple_personal_sign_tx(bip32_path=bip32_path, transaction=transaction, result=result) as ex: + with cmd.personal_sign_tx(bip32_path=bip32_path, transaction=transaction, result=result) as ex: sleep(0.5) if cmd.model == "nanos": @@ -150,27 +149,15 @@ def test_personal_sign_non_ascii(cmd): assert r.hex() == "64bdbdb6959425445d00ff2536a7018d2dce904e1f7475938fe4221c3c72500c" assert s.hex() == "7c9208e99b6b9266a73aae17b73472d06499746edec34fd47a9dab42f06f2e42" -# ============================ -# The encoded message is greater than the maximum length of an apdu, that's why we cut it into 3 apdu -# ============================ - -FIRST_PART = apdu_as_string("e008000096058000002c8000003c8000000000000000000000000000015357656c636f6d6520746f204f70656e536561210a0a436c69636b20746f207369676e20696e20616e642061636365707420746865204f70656e536561205465726d73206f6620536572766963653a2068747470733a2f2f6f70656e7365612e696f2f746f730a0a5468697320726571756573742077696c6c206e6f7420") -SECOND_PART = apdu_as_string("e00880009674726967676572206120626c6f636b636861696e207472616e73616374696f6e206f7220636f737420616e792067617320666565732e0a0a596f75722061757468656e7469636174696f6e207374617475732077696c6c20726573657420616674657220323420686f7572732e0a0a57616c6c657420616464726573733a0a3078393835386566666432333262343033336534376439") -THIRD_PART = apdu_as_string("e008800040303030336434316563333465636165646139340a0a4e6f6e63653a0a32623032633861302d663734662d343535342d393832312d613238303534646339313231") - def test_personal_sign_opensea(cmd): result: list = [] - # useless but allows to see which info are in the apdu bip32_path="44'/60'/0'/0/0" transaction = PersonalTransaction( msg="Welcome to OpenSea!\n\nClick to sign in and accept the OpenSea Terms of Service: https://opensea.io/tos\n\nThis request will not trigger a blockchain transaction or cost any gas fees.\n\nYour authentication status will reset after 24 hours.\n\nWallet address:\n0x9858effd232b4033e47d90003d41ec34ecaeda94\n\nNonce:\n2b02c8a0-f74f-4554-9821-a28054dc9121" ) - cmd.send_apdu(FIRST_PART) - cmd.send_apdu(SECOND_PART) - - with cmd.send_apdu_context(THIRD_PART, result=result) as ex: + with cmd.personal_sign_tx(bip32_path, transaction, result) as ex: sleep(0.5) if cmd.model == "nanos": @@ -215,9 +202,8 @@ def test_personal_sign_opensea(cmd): compare_screenshot(cmd, f"screenshots/eip191/{PATH_IMG[cmd.model]}/personal_sign_opensea/00006.png") cmd.client.press_and_release('both') - response: bytes = result[0] - v, r, s = parse_sign_response(response) + v, r, s = result assert v == 0x1c # 28 assert r.hex() == "61a68c986f087730d2f6ecf89d6d1e48ab963ac461102bb02664bc05c3db75bb" - assert s.hex() == "5714729ef441e097673a7b29a681e97f6963d875eeed2081f26b0b6686cd2bd2" \ No newline at end of file + assert s.hex() == "5714729ef441e097673a7b29a681e97f6963d875eeed2081f26b0b6686cd2bd2"