diff --git a/doc/apdu.md b/doc/apdu.md index 20954b4..9ee325d 100644 --- a/doc/apdu.md +++ b/doc/apdu.md @@ -215,6 +215,33 @@ The input data is the RLP encoded transaction, without v/r/s present, streamed t |r|32| |s|32| +Exemple: +With path `"44'/60'/1'/0/0"` + +CLA: E0 +INS: 04 +P1 : 00 (First transaction block) +P2 : 00 +Lc : ? +Le : + - 04 (number BIP 32 derivations) + - 80 00 00 2c + - 80 00 00 3c + - 00 00 00 00 + - 00 00 00 00 + - RLP chunk + +
+ +CLA: E0 +INS: 04 +P1 : 80 (subsequent transaction block) +P2 : 00 +Lc : ? +Le : + - RLP chunk + +
diff --git a/tests/speculos/.gitignore b/tests/speculos/.gitignore new file mode 100644 index 0000000..94026aa --- /dev/null +++ b/tests/speculos/.gitignore @@ -0,0 +1,25 @@ +__pycache__/ +*.py[cod] +*$py.class + + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# generated by pip +pip-wheel-metadata/ + +# pytest debug logs generated via --debug +pytestdebug.log +.cache +.pytest_cache +.mypy_cache +.coverage +.coverage.* +coverage.xml \ No newline at end of file diff --git a/tests/speculos/README.md b/tests/speculos/README.md new file mode 100644 index 0000000..0a33a8a --- /dev/null +++ b/tests/speculos/README.md @@ -0,0 +1,39 @@ +# Speculos functional tests + +These tests are implemented in Python with the `SpeculosClient` interface which allows easy execution on the [Speculos](https://github.com/LedgerHQ/speculos) emulator. + +## Requirements + +- [python >= 3.8](https://www.python.org/downloads/) +- [pip](https://pip.pypa.io/en/stable/installation/) + +### Dependencies +Python dependencies are listed in [requirements.txt](requirements.txt) + +```shell +python3 -m pip install --extra-index-url https://test.pypi.org/simple/ -r requirements.txt +``` +> The extra index allows to fetch the latest version of Speculos. + +## Usage + +Given the requirements are installed, just do: + +``` +pytest tests/speculos/ +``` + +## Tests by APDU + +you will find the list of apdu [here](../../doc/apdu.md) + +- Get + - GET APP CONFIGURATIOn + - [X] Simple test + - GET ETH PUBLIC ADDRESS + - [X] Test get key of coin (Ether, Dai) + - [ ] Test get key of coin (Ether, Dai) with display + - [ ] Test without chain code + - GET ETH2 PUBLIC KEY + - [ ] Test get key + - [ ] Test get key with display \ No newline at end of file diff --git a/tests/speculos/boilerplate_client/__init__.py b/tests/speculos/boilerplate_client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/speculos/boilerplate_client/boilerplate_cmd.py b/tests/speculos/boilerplate_client/boilerplate_cmd.py new file mode 100644 index 0000000..ec5f5d9 --- /dev/null +++ b/tests/speculos/boilerplate_client/boilerplate_cmd.py @@ -0,0 +1,107 @@ +import struct +from typing import Tuple + +from speculos.client import SpeculosClient, ApduException + +from boilerplate_client.boilerplate_cmd_builder import BoilerplateCommandBuilder, InsType +from boilerplate_client.exception import DeviceException +from boilerplate_client.transaction import Transaction + + +class BoilerplateCommand: + def __init__(self, + client: SpeculosClient, + debug: bool = False) -> None: + self.client = client + self.builder = BoilerplateCommandBuilder(debug=debug) + self.debug = debug + + def get_configuration(self) -> Tuple[int, int, int, int]: + try: + response = self.client._apdu_exchange( + self.builder.get_configuration() + ) # type: int, bytes + except ApduException as error: + raise DeviceException(error_code=error.sw, ins=InsType.INS_GET_VERSION) + + # response = MAJOR (1) || MINOR (1) || PATCH (1) + assert len(response) == 4 + + info, major, minor, patch = struct.unpack( + "BBBB", + response + ) # type: int, int, int + + return info, major, minor, patch + + def get_public_key(self, bip32_path: str, display: bool = False) -> Tuple[bytes, bytes, bytes]: + try: + response = self.client._apdu_exchange( + self.builder.get_public_key(bip32_path=bip32_path, + display=display) + ) # type: int, bytes + except ApduException as error: + raise DeviceException(error_code=error.sw, ins=InsType.INS_GET_PUBLIC_KEY) + + # response = pub_key_len (1) || + # pub_key (var) || + # chain_code_len (1) || + # chain_code (var) + offset: int = 0 + + pub_key_len: int = response[offset] + offset += 1 + + uncompressed_addr_len: bytes = response[offset:offset + pub_key_len] + offset += pub_key_len + + eth_addr_len: int = response[offset] + offset += 1 + + eth_addr: bytes = response[offset:offset + eth_addr_len] + offset += eth_addr_len + + chain_code: bytes = response[offset:] + + assert len(response) == 1 + pub_key_len + 1 + eth_addr_len + 32 # 32 -> chain_code_len + + return uncompressed_addr_len, eth_addr, chain_code + + def sign_tx(self, bip32_path: str, transaction: Transaction) -> Tuple[int, bytes]: + sw: int + response: bytes = b"" + + for is_last, chunk in self.builder.sign_tx(bip32_path=bip32_path, transaction=transaction): + if is_last: + with self.client.apdu_exchange_nowait(cla=chunk[0], ins=chunk[1], + p1=chunk[2], p2=chunk[3], + data=chunk[5:]) as exchange: + # Review Transaction + self.client.press_and_release('right') + # Address 1/3, 2/3, 3/3 + self.client.press_and_release('right') + self.client.press_and_release('right') + self.client.press_and_release('right') + # Amount + self.client.press_and_release('right') + # Approve + self.client.press_and_release('both') + response = exchange.receive() + else: + response = self.client._apdu_exchange(chunk) + print(response) + + # response = der_sig_len (1) || + # der_sig (var) || + # v (1) + offset: int = 0 + der_sig_len: int = response[offset] + offset += 1 + der_sig: bytes = response[offset:offset + der_sig_len] + offset += der_sig_len + v: int = response[offset] + offset += 1 + + assert len(response) == 1 + der_sig_len + 1 + + return v, der_sig diff --git a/tests/speculos/boilerplate_client/boilerplate_cmd_builder.py b/tests/speculos/boilerplate_client/boilerplate_cmd_builder.py new file mode 100644 index 0000000..619e09a --- /dev/null +++ b/tests/speculos/boilerplate_client/boilerplate_cmd_builder.py @@ -0,0 +1,187 @@ +import enum +import logging +import struct +from typing import List, Tuple, Union, Iterator, cast + +from boilerplate_client.transaction import Transaction +from boilerplate_client.utils import bip32_path_from_string + +MAX_APDU_LEN: int = 255 + + +def chunkify(data: bytes, chunk_len: int) -> Iterator[Tuple[bool, bytes]]: + size: int = len(data) + + if size <= chunk_len: + yield True, data + return + + chunk: int = size // chunk_len + remaining: int = size % chunk_len + offset: int = 0 + + for i in range(chunk): + yield False, data[offset:offset + chunk_len] + offset += chunk_len + + if remaining: + yield True, data[offset:] + + +class InsType(enum.IntEnum): + INS_GET_PUBLIC_KEY = 0x02 + INS_SIGN_TX = 0x04 + INS_GET_CONFIGURATION = 0x06 + + +class BoilerplateCommandBuilder: + """APDU command builder for the Boilerplate application. + + Parameters + ---------- + debug: bool + Whether you want to see logging or not. + + Attributes + ---------- + debug: bool + Whether you want to see logging or not. + + """ + CLA: int = 0xE0 + + def __init__(self, debug: bool = False): + """Init constructor.""" + self.debug = debug + + def serialize(self, + cla: int, + ins: Union[int, enum.IntEnum], + p1: int = 0, + p2: int = 0, + cdata: bytes = b"") -> bytes: + """Serialize the whole APDU command (header + data). + + Parameters + ---------- + cla : int + Instruction class: CLA (1 byte) + ins : Union[int, IntEnum] + Instruction code: INS (1 byte) + p1 : int + Instruction parameter 1: P1 (1 byte). + p2 : int + Instruction parameter 2: P2 (1 byte). + cdata : bytes + Bytes of command data. + + Returns + ------- + bytes + Bytes of a complete APDU command. + + """ + ins = cast(int, ins.value) if isinstance(ins, enum.IntEnum) else cast(int, ins) + + header: bytes = struct.pack("BBBBB", + cla, + ins, + p1, + p2, + len(cdata)) # add Lc to APDU header + + if self.debug: + logging.info("header: %s", header.hex()) + logging.info("cdata: %s", cdata.hex()) + + return header + cdata + + def get_configuration(self) -> bytes: + """Command builder for GET_CONFIGURATON + + Returns + ------- + bytes + APDU command for GET_CONFIGURATON + + """ + return self.serialize(cla=self.CLA, + ins=InsType.INS_GET_CONFIGURATION, + p1=0x00, + p2=0x00, + cdata=b"") + + def get_public_key(self, bip32_path: str, display: bool = False) -> bytes: + """Command builder for GET_PUBLIC_KEY. + + Parameters + ---------- + bip32_path: str + String representation of BIP32 path. + display : bool + Whether you want to display the address on the device. + + Returns + ------- + bytes + APDU command for GET_PUBLIC_KEY. + + """ + bip32_paths: List[bytes] = bip32_path_from_string(bip32_path) + + cdata: bytes = b"".join([ + len(bip32_paths).to_bytes(1, byteorder="big"), + *bip32_paths + ]) + + return self.serialize(cla=self.CLA, + ins=InsType.INS_GET_PUBLIC_KEY, + p1=0x01 if display else 0x00, + p2=0x01, + cdata=cdata) + + def sign_tx(self, bip32_path: str, transaction: Transaction) -> Iterator[Tuple[bool, bytes]]: + """Command builder for INS_SIGN_TX. + + Parameters + ---------- + bip32_path : str + String representation of BIP32 path. + transaction : Transaction + Representation of the transaction to be signed. + + Yields + ------- + bytes + APDU command chunk for INS_SIGN_TX. + + """ + bip32_paths: List[bytes] = bip32_path_from_string(bip32_path) + + cdata: bytes = b"".join([ + len(bip32_paths).to_bytes(1, byteorder="big"), + *bip32_paths + ]) + + yield False, self.serialize(cla=self.CLA, + ins=InsType.INS_SIGN_TX, + p1=0x00, + p2=0x00, + cdata=cdata) + + tx: bytes = transaction.serialize() + + for i, (is_last, chunk) in enumerate(chunkify(tx, MAX_APDU_LEN)): + if is_last: + yield True, self.serialize(cla=self.CLA, + ins=InsType.INS_SIGN_TX, + p1=0x00, + p2=0x00, + cdata=chunk) + return + else: + yield False, self.serialize(cla=self.CLA, + ins=InsType.INS_SIGN_TX, + p1=0x00, + p2=0x00, + cdata=chunk) diff --git a/tests/speculos/boilerplate_client/exception/__init__.py b/tests/speculos/boilerplate_client/exception/__init__.py new file mode 100644 index 0000000..acb2bb8 --- /dev/null +++ b/tests/speculos/boilerplate_client/exception/__init__.py @@ -0,0 +1,35 @@ +from .device_exception import DeviceException +from .errors import (UnknownDeviceError, + DenyError, + WrongP1P2Error, + WrongDataLengthError, + InsNotSupportedError, + ClaNotSupportedError, + WrongResponseLengthError, + DisplayBip32PathFailError, + DisplayAddressFailError, + DisplayAmountFailError, + WrongTxLengthError, + TxParsingFailError, + TxHashFail, + BadStateError, + SignatureFailError) + +__all__ = [ + "DeviceException", + "DenyError", + "UnknownDeviceError", + "WrongP1P2Error", + "WrongDataLengthError", + "InsNotSupportedError", + "ClaNotSupportedError", + "WrongResponseLengthError", + "DisplayBip32PathFailError", + "DisplayAddressFailError", + "DisplayAmountFailError", + "WrongTxLengthError", + "TxParsingFailError", + "TxHashFail", + "BadStateError", + "SignatureFailError" +] diff --git a/tests/speculos/boilerplate_client/exception/device_exception.py b/tests/speculos/boilerplate_client/exception/device_exception.py new file mode 100644 index 0000000..7cd26f2 --- /dev/null +++ b/tests/speculos/boilerplate_client/exception/device_exception.py @@ -0,0 +1,38 @@ +import enum +from typing import Dict, Any, Union + +from .errors import * + + +class DeviceException(Exception): # pylint: disable=too-few-public-methods + exc: Dict[int, Any] = { + 0x6985: DenyError, + 0x6A86: WrongP1P2Error, + 0x6A87: WrongDataLengthError, + 0x6D00: InsNotSupportedError, + 0x6E00: ClaNotSupportedError, + 0xB000: WrongResponseLengthError, + 0xB001: DisplayBip32PathFailError, + 0xB002: DisplayAddressFailError, + 0xB003: DisplayAmountFailError, + 0xB004: WrongTxLengthError, + 0xB005: TxParsingFailError, + 0xB006: TxHashFail, + 0xB007: BadStateError, + 0xB008: SignatureFailError + } + + def __new__(cls, + error_code: int, + ins: Union[int, enum.IntEnum, None] = None, + message: str = "" + ) -> Any: + error_message: str = (f"Error in {ins!r} command" + if ins else "Error in command") + + if error_code in DeviceException.exc: + return DeviceException.exc[error_code](hex(error_code), + error_message, + message) + + return UnknownDeviceError(hex(error_code), error_message, message) diff --git a/tests/speculos/boilerplate_client/exception/errors.py b/tests/speculos/boilerplate_client/exception/errors.py new file mode 100644 index 0000000..a9a853d --- /dev/null +++ b/tests/speculos/boilerplate_client/exception/errors.py @@ -0,0 +1,58 @@ +class UnknownDeviceError(Exception): + pass + + +class DenyError(Exception): + pass + + +class WrongP1P2Error(Exception): + pass + + +class WrongDataLengthError(Exception): + pass + + +class InsNotSupportedError(Exception): + pass + + +class ClaNotSupportedError(Exception): + pass + + +class WrongResponseLengthError(Exception): + pass + + +class DisplayBip32PathFailError(Exception): + pass + + +class DisplayAddressFailError(Exception): + pass + + +class DisplayAmountFailError(Exception): + pass + + +class WrongTxLengthError(Exception): + pass + + +class TxParsingFailError(Exception): + pass + + +class TxHashFail(Exception): + pass + + +class BadStateError(Exception): + pass + + +class SignatureFailError(Exception): + pass diff --git a/tests/speculos/boilerplate_client/transaction.py b/tests/speculos/boilerplate_client/transaction.py new file mode 100644 index 0000000..ce43f47 --- /dev/null +++ b/tests/speculos/boilerplate_client/transaction.py @@ -0,0 +1,47 @@ +from io import BytesIO +from typing import Union + +from boilerplate_client.utils import (read, read_uint, read_varint, + write_varint, UINT64_MAX) + + +class TransactionError(Exception): + pass + + +class Transaction: + def __init__(self, nonce: int, to: Union[str, bytes], value: int, memo: str) -> None: + self.nonce: int = nonce + self.to: bytes = bytes.fromhex(to[2:]) if isinstance(to, str) else to + self.value: int = value + self.memo: bytes = memo.encode("ascii") + + if not (0 <= self.nonce <= UINT64_MAX): + raise TransactionError(f"Bad nonce: '{self.nonce}'!") + + if not (0 <= self.value <= UINT64_MAX): + raise TransactionError(f"Bad value: '{self.value}'!") + + if len(self.to) != 20: + raise TransactionError(f"Bad address: '{self.to}'!") + + def serialize(self) -> bytes: + return b"".join([ + self.nonce.to_bytes(8, byteorder="big"), + self.to, + self.value.to_bytes(8, byteorder="big"), + write_varint(len(self.memo)), + self.memo + ]) + + @classmethod + def from_bytes(cls, hexa: Union[bytes, BytesIO]): + buf: BytesIO = BytesIO(hexa) if isinstance(hexa, bytes) else hexa + + nonce: int = read_uint(buf, 64, byteorder="big") + to: bytes = read(buf, 20) + value: int = read_uint(buf, 64, byteorder="big") + memo_len: int = read_varint(buf) + memo: str = read(buf, memo_len).decode("ascii") + + return cls(nonce=nonce, to=to, value=value, memo=memo) diff --git a/tests/speculos/boilerplate_client/utils.py b/tests/speculos/boilerplate_client/utils.py new file mode 100644 index 0000000..c2fe9d3 --- /dev/null +++ b/tests/speculos/boilerplate_client/utils.py @@ -0,0 +1,75 @@ +from io import BytesIO +from typing import List, Optional, Literal + + +UINT64_MAX: int = 18446744073709551615 +UINT32_MAX: int = 4294967295 +UINT16_MAX: int = 65535 + + +def bip32_path_from_string(path: str) -> List[bytes]: + splitted_path: List[str] = path.split("/") + + if not splitted_path: + raise Exception(f"BIP32 path format error: '{path}'") + + if "m" in splitted_path and splitted_path[0] == "m": + splitted_path = splitted_path[1:] + + return [int(p).to_bytes(4, byteorder="big") if "'" not in p + else (0x80000000 | int(p[:-1])).to_bytes(4, byteorder="big") + for p in splitted_path] + + +def write_varint(n: int) -> bytes: + if n < 0xFC: + return n.to_bytes(1, byteorder="little") + + if n <= UINT16_MAX: + return b"\xFD" + n.to_bytes(2, byteorder="little") + + if n <= UINT32_MAX: + return b"\xFE" + n.to_bytes(4, byteorder="little") + + if n <= UINT64_MAX: + return b"\xFF" + n.to_bytes(8, byteorder="little") + + raise ValueError(f"Can't write to varint: '{n}'!") + + +def read_varint(buf: BytesIO, + prefix: Optional[bytes] = None) -> int: + b: bytes = prefix if prefix else buf.read(1) + + if not b: + raise ValueError(f"Can't read prefix: '{b}'!") + + n: int = {b"\xfd": 2, b"\xfe": 4, b"\xff": 8}.get(b, 1) # default to 1 + + b = buf.read(n) if n > 1 else b + + if len(b) != n: + raise ValueError("Can't read varint!") + + return int.from_bytes(b, byteorder="little") + + +def read(buf: BytesIO, size: int) -> bytes: + b: bytes = buf.read(size) + + if len(b) < size: + raise ValueError(f"Cant read {size} bytes in buffer!") + + return b + + +def read_uint(buf: BytesIO, + bit_len: int, + byteorder: Literal['big', 'little'] = 'little') -> int: + size: int = bit_len // 8 + b: bytes = buf.read(size) + + if len(b) < size: + raise ValueError(f"Can't read u{bit_len} in buffer!") + + return int.from_bytes(b, byteorder) diff --git a/tests/speculos/conftest.py b/tests/speculos/conftest.py new file mode 100644 index 0000000..8c3999e --- /dev/null +++ b/tests/speculos/conftest.py @@ -0,0 +1,27 @@ +from collections import namedtuple +from pathlib import Path + +import pytest + +from speculos.client import SpeculosClient + +from boilerplate_client.boilerplate_cmd import BoilerplateCommand + + +SCRIPT_DIR = Path(__file__).absolute().parent +API_URL = "http://127.0.0.1:5000" + +@pytest.fixture(scope="session") +def client(): + file_path = SCRIPT_DIR.parent.parent / "bin" / "app.elf" + args = ['--model', 'nanos', '--display', 'qt', '--sdk', '2.1'] + with SpeculosClient(app=str(file_path), args=args) as client: + yield client + + +@pytest.fixture(scope="session") +def cmd(client): + yield BoilerplateCommand( + client=client, + debug=True + ) diff --git a/tests/speculos/requirements.txt b/tests/speculos/requirements.txt new file mode 100644 index 0000000..2744c35 --- /dev/null +++ b/tests/speculos/requirements.txt @@ -0,0 +1,5 @@ +speculos +pytest>=6.1.1,<7.0.0 +ledgercomm>=1.1.0,<1.2.0 +ecdsa>=0.16.1,<0.17.0 +pysha3>=1.0.0,<2.0.0 diff --git a/tests/speculos/setup.cfg b/tests/speculos/setup.cfg new file mode 100644 index 0000000..c79fd88 --- /dev/null +++ b/tests/speculos/setup.cfg @@ -0,0 +1,20 @@ +[tool:pytest] +addopts = --strict-markers + +[pylint] +disable = C0114, # missing-module-docstring + C0115, # missing-class-docstring + C0116, # missing-function-docstring + C0103, # invalid-name + R0801, # duplicate-code + R0913 # too-many-arguments +extension-pkg-whitelist=hid + +[pycodestyle] +max-line-length = 90 + +[mypy-hid.*] +ignore_missing_imports = True + +[mypy-pytest.*] +ignore_missing_imports = True diff --git a/tests/speculos/test_configuration_cmd.py b/tests/speculos/test_configuration_cmd.py new file mode 100644 index 0000000..9694737 --- /dev/null +++ b/tests/speculos/test_configuration_cmd.py @@ -0,0 +1,3 @@ + +def test_configuration(cmd): + assert cmd.get_configuration() == (14, 1, 9, 17) \ No newline at end of file diff --git a/tests/speculos/test_pubkey_cmd.py b/tests/speculos/test_pubkey_cmd.py new file mode 100644 index 0000000..aee64cf --- /dev/null +++ b/tests/speculos/test_pubkey_cmd.py @@ -0,0 +1,37 @@ + +from pickle import TRUE +from typing import Tuple + + +def test_get_public_key(cmd): + # ETHER COIN + uncompressed_addr_len, eth_addr, chain_code = cmd.get_public_key( + bip32_path="44'/60'/1'/0/0", + display=False + ) # type: bytes, bytes, bytes + + print("HERE", uncompressed_addr_len) + + assert len(uncompressed_addr_len) == 65 + assert len(eth_addr) == 40 + assert len(chain_code) == 32 + + assert uncompressed_addr_len == b'\x04\xea\x02&\x91\xc7\x87\x00\xd2\xc3\xa0\xc7E\xbe\xa4\xf2\xb8\xe5\xe3\x13\x97j\x10B\xf6\xa1Vc\\\xb2\x05\xda\x1a\xcb\xfe\x04*\nZ\x89eyn6"E\x89\x0eT\xbd-\xbex\xec\x1e\x18df\xf2\xe9\xd0\xf5\xd5\xd8\xdf' + assert eth_addr == b'463e4e114AA57F54f2Fd2C3ec03572C6f75d84C2' + assert chain_code == b'\xaf\x89\xcd)\xea${8I\xec\xc80\xc2\xc8\x94\\e1\xd6P\x87\x07?\x9f\xd09\x00\xa0\xea\xa7\x96\xc8' + + # DAI COIN + uncompressed_addr_len, eth_addr, chain_code = cmd.get_public_key( + bip32_path="44'/700'/1'/0/0", + display=False + ) # type: bytes, bytes, bytes + + print("HERE2", uncompressed_addr_len) + + assert len(uncompressed_addr_len) == 65 + assert len(eth_addr) == 40 + assert len(chain_code) == 32 + + assert uncompressed_addr_len == b'\x04V\x8a\x15\xdc\xed\xc8[\x16\x17\x8d\xaf\xcax\x91v~{\x9c\x06\xba\xaa\xde\xf4\xe7\x9f\x86\x1d~\xed)\xdc\n8\x9c\x84\xf01@E\x13]\xd7~6\x8e\x8e\xabb-\xad\xcdo\xc3Fw\xb7\xc8y\xdbQ/\xc3\xe5\x18' + assert eth_addr == b'Ba9A9aED0a1AbBE1da1155F64e73e57Af7995880' + assert chain_code == b'4\xaa\x95\xf4\x02\x12\x12-T\x155\x86\xed\xc5\x0b\x1d8\x81\xae\xce\xbd\x1a\xbbv\x9a\xc7\xd5\x1a\xd0KT\xe4' diff --git a/tests/speculos/test_version_cmd.py b/tests/speculos/test_version_cmd.py new file mode 100644 index 0000000..1ad0133 --- /dev/null +++ b/tests/speculos/test_version_cmd.py @@ -0,0 +1,10 @@ +#from warnings import catch_warnings +# +#import boilerplate_client +# +# +#def test_version(cmd): +# try: +# cmd.get_version() +# except boilerplate_client.exception.errors.InsNotSupportedError as error: +# assert error.args[0] == '0x6d00' \ No newline at end of file