From 54b979186d220a7f9e4e82248ddf32f02670b0b4 Mon Sep 17 00:00:00 2001 From: Lucas PASCAL Date: Fri, 28 Jul 2023 15:41:17 +0200 Subject: [PATCH] [clean] Linter / typing fixes --- .github/workflows/python-client.yml | 4 +- client/CHANGELOG.md | 12 +++ client/README.md | 6 +- client/pyproject.toml | 5 +- .../src/ledger_app_clients/ethereum/client.py | 34 +++++---- .../ethereum/command_builder.py | 18 ++--- .../ethereum/eip712/InputData.py | 73 ++++++++++++------- .../ledger_app_clients/ethereum/keychain.py | 8 +- .../ethereum/response_parser.py | 1 + .../ledger_app_clients/ethereum/settings.py | 19 +++-- client/src/ledger_app_clients/ethereum/tlv.py | 10 +-- 11 files changed, 117 insertions(+), 73 deletions(-) create mode 100644 client/CHANGELOG.md diff --git a/.github/workflows/python-client.yml b/.github/workflows/python-client.yml index 658edb1..ca8f164 100644 --- a/.github/workflows/python-client.yml +++ b/.github/workflows/python-client.yml @@ -17,7 +17,7 @@ jobs: uses: actions/checkout@v3 - run: pip install flake8 - name: Flake8 lint Python code - run: find client/src/ -type f -name '*.py' -exec flake8 --max-line-length=120 '{}' '+' + run: (cd client && find src/ -type f -name '*.py' -exec flake8 --max-line-length=120 '{}' '+') mypy: name: Type checking @@ -27,7 +27,7 @@ jobs: uses: actions/checkout@v3 - run: pip install mypy - name: Mypy type checking - run: mypy client/src + run: (cd client && mypy src/) build: name: Building the package diff --git a/client/CHANGELOG.md b/client/CHANGELOG.md new file mode 100644 index 0000000..9cdc9ea --- /dev/null +++ b/client/CHANGELOG.md @@ -0,0 +1,12 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.0.1] - 2023-08-07 + +### Added + +- Initial version diff --git a/client/README.md b/client/README.md index 7d8ca69..37da79a 100644 --- a/client/README.md +++ b/client/README.md @@ -10,12 +10,12 @@ This package is deployed: - on `pypi.org` for the stable version. This version will work with the application available on the `master` branch. ```bash - pip install ledger_app_clients.ethereum` + pip install ledger_app_clients.ethereum ``` -- on `test.pypi.org` for the rolling release. This verison will work with the +- on `test.pypi.org` for the rolling release. This version will work with the application code on the `develop` branch. ```bash - pip install --extra-index-url https://test.pypi.org/simple/ ledger_app_clients.ethereum` + pip install --extra-index-url https://test.pypi.org/simple/ ledger_app_clients.ethereum ``` ### Installation from sources diff --git a/client/pyproject.toml b/client/pyproject.toml index 8993e88..bea18ff 100644 --- a/client/pyproject.toml +++ b/client/pyproject.toml @@ -40,6 +40,5 @@ version = {attr = "ledger_app_clients.ethereum.__version__"} [project.urls] Home = "https://github.com/LedgerHQ/app-ethereum" -# [tool.setuptools_scm] -# write_to = "ledgerwallet/__version__.py" -# local_scheme = "no-local-version" +[tool.mypy] +ignore_missing_imports = true diff --git a/client/src/ledger_app_clients/ethereum/client.py b/client/src/ledger_app_clients/ethereum/client.py index f556d2d..18d774f 100644 --- a/client/src/ledger_app_clients/ethereum/client.py +++ b/client/src/ledger_app_clients/ethereum/client.py @@ -2,6 +2,7 @@ import rlp from enum import IntEnum from ragger.backend import BackendInterface from ragger.utils import RAPDU +from typing import List, Optional, Union from .command_builder import CommandBuilder from .eip712 import EIP712FieldType @@ -13,14 +14,15 @@ WEI_IN_ETH = 1e+18 class StatusWord(IntEnum): - OK = 0x9000 - ERROR_NO_INFO = 0x6a00 - INVALID_DATA = 0x6a80 - INSUFFICIENT_MEMORY = 0x6a84 - INVALID_INS = 0x6d00 - INVALID_P1_P2 = 0x6b00 + OK = 0x9000 + ERROR_NO_INFO = 0x6a00 + INVALID_DATA = 0x6a80 + INSUFFICIENT_MEMORY = 0x6a84 + INVALID_INS = 0x6d00 + INVALID_P1_P2 = 0x6b00 CONDITION_NOT_SATISFIED = 0x6985 - REF_DATA_NOT_FOUND = 0x6a88 + REF_DATA_NOT_FOUND = 0x6a88 + class DOMAIN_NAME_TAG(IntEnum): STRUCTURE_TYPE = 0x01 @@ -39,10 +41,10 @@ class EthAppClient: self._client = client self._cmd_builder = CommandBuilder() - def _send(self, payload: bytearray): + def _send(self, payload: bytes): return self._client.exchange_async_raw(payload) - def response(self) -> RAPDU: + def response(self) -> Optional[RAPDU]: return self._client._last_async_response def eip712_send_struct_def_struct_name(self, name: str): @@ -52,7 +54,7 @@ class EthAppClient: field_type: EIP712FieldType, type_name: str, type_size: int, - array_levels: [], + array_levels: List, key_name: str): return self._send(self._cmd_builder.eip712_send_struct_def_struct_field( field_type, @@ -68,7 +70,7 @@ class EthAppClient: return self._send(self._cmd_builder.eip712_send_struct_impl_array(size)) def eip712_send_struct_impl_struct_field(self, raw_value: bytes): - chunks = self._cmd_builder.eip712_send_struct_impl_struct_field(raw_value) + chunks = self._cmd_builder.eip712_send_struct_impl_struct_field(bytearray(raw_value)) for chunk in chunks[:-1]: with self._send(chunk): pass @@ -102,7 +104,7 @@ class EthAppClient: to: bytes, amount: float, chain_id: int): - data = list() + data: List[Union[int, bytes]] = list() data.append(nonce) data.append(gas_price) data.append(gas_limit) @@ -123,12 +125,12 @@ class EthAppClient: return self._send(self._cmd_builder.get_challenge()) def provide_domain_name(self, challenge: int, name: str, addr: bytes): - payload = format_tlv(DOMAIN_NAME_TAG.STRUCTURE_TYPE, 3) # TrustedDomainName + payload = format_tlv(DOMAIN_NAME_TAG.STRUCTURE_TYPE, 3) # TrustedDomainName payload += format_tlv(DOMAIN_NAME_TAG.STRUCTURE_VERSION, 1) - payload += format_tlv(DOMAIN_NAME_TAG.SIGNER_KEY_ID, 0) # test key - payload += format_tlv(DOMAIN_NAME_TAG.SIGNER_ALGO, 1) # secp256k1 + payload += format_tlv(DOMAIN_NAME_TAG.SIGNER_KEY_ID, 0) # test key + payload += format_tlv(DOMAIN_NAME_TAG.SIGNER_ALGO, 1) # secp256k1 payload += format_tlv(DOMAIN_NAME_TAG.CHALLENGE, challenge) - payload += format_tlv(DOMAIN_NAME_TAG.COIN_TYPE, 0x3c) # ETH in slip-44 + payload += format_tlv(DOMAIN_NAME_TAG.COIN_TYPE, 0x3c) # ETH in slip-44 payload += format_tlv(DOMAIN_NAME_TAG.DOMAIN_NAME, name) payload += format_tlv(DOMAIN_NAME_TAG.ADDRESS, addr) payload += format_tlv(DOMAIN_NAME_TAG.SIGNATURE, diff --git a/client/src/ledger_app_clients/ethereum/command_builder.py b/client/src/ledger_app_clients/ethereum/command_builder.py index 92d895d..8f2dbfd 100644 --- a/client/src/ledger_app_clients/ethereum/command_builder.py +++ b/client/src/ledger_app_clients/ethereum/command_builder.py @@ -1,7 +1,7 @@ import struct from enum import IntEnum from ragger.bip import pack_derivation_path -from typing import Iterator +from typing import List from .eip712 import EIP712FieldType @@ -41,7 +41,7 @@ class CommandBuilder: ins: InsType, p1: int, p2: int, - cdata: bytearray = bytes()) -> bytes: + cdata: bytes = bytes()) -> bytes: header = bytearray() header.append(self._CLA) @@ -67,24 +67,24 @@ class CommandBuilder: field_type: EIP712FieldType, type_name: str, type_size: int, - array_levels: [], + array_levels: List, key_name: str) -> bytes: data = bytearray() typedesc = 0 typedesc |= (len(array_levels) > 0) << 7 - typedesc |= (type_size != None) << 6 + typedesc |= (type_size is not None) << 6 typedesc |= field_type data.append(typedesc) if field_type == EIP712FieldType.CUSTOM: data.append(len(type_name)) data += self._string_to_bytes(type_name) - if type_size != None: + if type_size is not None: data.append(type_size) if len(array_levels) > 0: data.append(len(array_levels)) for level in array_levels: - data.append(0 if level == None else 1) - if level != None: + data.append(0 if level is None else 1) + if level is not None: data.append(level) data.append(len(key_name)) data += self._string_to_bytes(key_name) @@ -107,7 +107,7 @@ class CommandBuilder: P2Type.ARRAY, data) - def eip712_send_struct_impl_struct_field(self, data: bytearray) -> Iterator[bytes]: + def eip712_send_struct_impl_struct_field(self, data: bytearray) -> List[bytes]: chunks = list() # Add a 16-bit integer with the data's byte length (network byte order) data_w_length = bytearray() @@ -193,7 +193,7 @@ class CommandBuilder: def provide_domain_name(self, tlv_payload: bytes) -> list[bytes]: chunks = list() - payload = struct.pack(">H", len(tlv_payload)) + payload = struct.pack(">H", len(tlv_payload)) payload += tlv_payload p1 = 1 while len(payload) > 0: diff --git a/client/src/ledger_app_clients/ethereum/eip712/InputData.py b/client/src/ledger_app_clients/ethereum/eip712/InputData.py index 68f8556..ac0877c 100644 --- a/client/src/ledger_app_clients/ethereum/eip712/InputData.py +++ b/client/src/ledger_app_clients/ethereum/eip712/InputData.py @@ -3,7 +3,7 @@ import json import re import signal import sys -from typing import Callable +from typing import Any, Callable, Dict, List, Optional from ledger_app_clients.ethereum import keychain from ledger_app_clients.ethereum.client import EthAppClient, EIP712FieldType @@ -11,11 +11,16 @@ from ledger_app_clients.ethereum.client import EthAppClient, EIP712FieldType # global variables app_client: EthAppClient = None -filtering_paths = None -current_path = list() -sig_ctx = {} +filtering_paths: Dict = {} +current_path: List[str] = list() +sig_ctx: Dict[str, Any] = {} -autonext_handler: Callable = None + +def default_handler(): + raise RuntimeError("Uninitialized handler") + + +autonext_handler: Callable = default_handler # From a string typename, extract the type and all the array depth @@ -55,29 +60,34 @@ def get_typesize(typename): return (typename, typesize) - def parse_int(typesize): return (EIP712FieldType.INT, int(typesize / 8)) + def parse_uint(typesize): return (EIP712FieldType.UINT, int(typesize / 8)) + def parse_address(typesize): return (EIP712FieldType.ADDRESS, None) + def parse_bool(typesize): return (EIP712FieldType.BOOL, None) + def parse_string(typesize): return (EIP712FieldType.STRING, None) + def parse_bytes(typesize): - if typesize != None: + if typesize is not None: return (EIP712FieldType.FIX_BYTES, typesize) return (EIP712FieldType.DYN_BYTES, None) + # set functions for each type -parsing_type_functions = {}; +parsing_type_functions = {} parsing_type_functions["int"] = parse_int parsing_type_functions["uint"] = parse_uint parsing_type_functions["address"] = parse_address @@ -86,7 +96,6 @@ parsing_type_functions["string"] = parse_string parsing_type_functions["bytes"] = parse_bytes - def send_struct_def_field(typename, keyname): type_enum = None @@ -108,7 +117,6 @@ def send_struct_def_field(typename, keyname): return (typename, type_enum, typesize, array_lvls) - def encode_integer(value, typesize): data = bytearray() @@ -122,9 +130,9 @@ def encode_integer(value, typesize): if value == 0: data.append(0) else: - if value < 0: # negative number, send it as unsigned + if value < 0: # negative number, send it as unsigned mask = 0 - for i in range(typesize): # make a mask as big as the typesize + for i in range(typesize): # make a mask as big as the typesize mask = (mask << 8) | 0xff value &= mask while value > 0: @@ -133,42 +141,51 @@ def encode_integer(value, typesize): data.reverse() return data + def encode_int(value, typesize): return encode_integer(value, typesize) + def encode_uint(value, typesize): return encode_integer(value, typesize) + def encode_hex_string(value, size): data = bytearray() - value = value[2:] # skip 0x + value = value[2:] # skip 0x byte_idx = 0 while byte_idx < size: data.append(int(value[(byte_idx * 2):(byte_idx * 2 + 2)], 16)) byte_idx += 1 return data + def encode_address(value, typesize): return encode_hex_string(value, 20) + def encode_bool(value, typesize): return encode_integer(value, typesize) + def encode_string(value, typesize): data = bytearray() for char in value: data.append(ord(char)) return data + def encode_bytes_fix(value, typesize): return encode_hex_string(value, typesize) + def encode_bytes_dyn(value, typesize): # length of the value string # - the length of 0x (2) # / by the length of one byte in a hex string (2) return encode_hex_string(value, int((len(value) - 2) / 2)) + # set functions for each type encoding_functions = {} encoding_functions[EIP712FieldType.INT] = encode_int @@ -180,7 +197,6 @@ encoding_functions[EIP712FieldType.FIX_BYTES] = encode_bytes_fix encoding_functions[EIP712FieldType.DYN_BYTES] = encode_bytes_dyn - def send_struct_impl_field(value, field): # Something wrong happened if this triggers if isinstance(value, list) or (field["enum"] == EIP712FieldType.CUSTOM): @@ -188,7 +204,6 @@ def send_struct_impl_field(value, field): data = encoding_functions[field["enum"]](value, field["typesize"]) - if filtering_paths: path = ".".join(current_path) if path in filtering_paths.keys(): @@ -199,8 +214,7 @@ def send_struct_impl_field(value, field): disable_autonext() - -def evaluate_field(structs, data, field, lvls_left, new_level = True): +def evaluate_field(structs, data, field, lvls_left, new_level=True): array_lvls = field["array_lvls"] if new_level: @@ -215,7 +229,7 @@ def evaluate_field(structs, data, field, lvls_left, new_level = True): return False current_path.pop() idx += 1 - if array_lvls[lvls_left - 1] != None: + if array_lvls[lvls_left - 1] is not None: if array_lvls[lvls_left - 1] != idx: print("Mismatch in array size! Got %d, expected %d\n" % (idx, array_lvls[lvls_left - 1]), @@ -232,7 +246,6 @@ def evaluate_field(structs, data, field, lvls_left, new_level = True): return True - def send_struct_impl(structs, data, structname): # Check if it is a struct we don't known if structname not in structs.keys(): @@ -244,6 +257,7 @@ def send_struct_impl(structs, data, structname): return False return True + # ledgerjs doesn't actually sign anything, and instead uses already pre-computed signatures def send_filtering_message_info(display_name: str, filters_count: int): global sig_ctx @@ -262,6 +276,7 @@ def send_filtering_message_info(display_name: str, filters_count: int): enable_autonext() disable_autonext() + # ledgerjs doesn't actually sign anything, and instead uses already pre-computed signatures def send_filtering_show_field(display_name): global sig_ctx @@ -281,12 +296,14 @@ def send_filtering_show_field(display_name): with app_client.eip712_filtering_show_field(display_name, sig): pass -def read_filtering_file(domain, message, filtering_file_path): + +def read_filtering_file(filtering_file_path: str): data_json = None with open(filtering_file_path) as data: data_json = json.load(data) return data_json + def prepare_filtering(filtr_data, message): global filtering_paths @@ -295,12 +312,14 @@ def prepare_filtering(filtr_data, message): else: filtering_paths = {} + def handle_optional_domain_values(domain): if "chainId" not in domain.keys(): domain["chainId"] = 0 if "verifyingContract" not in domain.keys(): domain["verifyingContract"] = "0x0000000000000000000000000000000000000000" + def init_signature_context(types, domain): global sig_ctx @@ -314,7 +333,7 @@ def init_signature_context(types, domain): for i in range(8): sig_ctx["chainid"].append(chainid & (0xff << (i * 8))) sig_ctx["chainid"].reverse() - schema_str = json.dumps(types).replace(" ","") + schema_str = json.dumps(types).replace(" ", "") schema_hash = hashlib.sha224(schema_str.encode()) sig_ctx["schema_hash"] = bytearray.fromhex(schema_hash.hexdigest()) @@ -322,22 +341,24 @@ def init_signature_context(types, domain): def next_timeout(_signum: int, _frame): autonext_handler() + def enable_autonext(): seconds = 1/4 - if app_client._client.firmware.device == 'stax': # Stax Speculos is slow + if app_client._client.firmware.device == 'stax': # Stax Speculos is slow interval = seconds * 3 else: interval = seconds signal.setitimer(signal.ITIMER_REAL, seconds, interval) + def disable_autonext(): signal.setitimer(signal.ITIMER_REAL, 0, 0) def process_file(aclient: EthAppClient, input_file_path: str, - filtering_file_path = None, - autonext: Callable = None) -> bool: + filtering_file_path: Optional[str] = None, + autonext: Optional[Callable] = None) -> bool: global sig_ctx global app_client global autonext_handler @@ -357,7 +378,7 @@ def process_file(aclient: EthAppClient, if filtering_file_path: init_signature_context(types, domain) - filtr = read_filtering_file(domain, message, filtering_file_path) + filtr = read_filtering_file(filtering_file_path) # send types definition for key in types.keys(): @@ -365,7 +386,7 @@ def process_file(aclient: EthAppClient, pass for f in types[key]: (f["type"], f["enum"], f["typesize"], f["array_lvls"]) = \ - send_struct_def_field(f["type"], f["name"]) + send_struct_def_field(f["type"], f["name"]) if filtering_file_path: with app_client.eip712_filtering_activate(): diff --git a/client/src/ledger_app_clients/ethereum/keychain.py b/client/src/ledger_app_clients/ethereum/keychain.py index 523d1d1..4e66b6a 100644 --- a/client/src/ledger_app_clients/ethereum/keychain.py +++ b/client/src/ledger_app_clients/ethereum/keychain.py @@ -3,6 +3,7 @@ import hashlib from ecdsa import SigningKey from ecdsa.util import sigencode_der from enum import Enum, auto +from typing import Dict # Private key PEM files have to be named the same (lowercase) as their corresponding enum entries @@ -11,14 +12,17 @@ class Key(Enum): CAL = auto() DOMAIN_NAME = auto() -_keys: dict[Key, SigningKey] = dict() + +_keys: Dict[Key, SigningKey] = dict() + # Open the corresponding PEM file and load its key in the global dict def _init_key(key: Key): global _keys with open("%s/keychain/%s.pem" % (os.path.dirname(__file__), key.name.lower())) as pem_file: _keys[key] = SigningKey.from_pem(pem_file.read(), hashlib.sha256) - assert (key in _keys) and (_keys[key] != None) + assert (key in _keys) and (_keys[key] is not None) + # Generate a SECP256K1 signature of the given data with the given key def sign_data(key: Key, data: bytes) -> bytes: diff --git a/client/src/ledger_app_clients/ethereum/response_parser.py b/client/src/ledger_app_clients/ethereum/response_parser.py index 5e73df4..26a2638 100644 --- a/client/src/ledger_app_clients/ethereum/response_parser.py +++ b/client/src/ledger_app_clients/ethereum/response_parser.py @@ -9,6 +9,7 @@ def signature(data: bytes) -> tuple[bytes, bytes, bytes]: return v, r, s + def challenge(data: bytes) -> int: assert len(data) == 4 return int.from_bytes(data, "big") diff --git a/client/src/ledger_app_clients/ethereum/settings.py b/client/src/ledger_app_clients/ethereum/settings.py index 2b44d45..d9d3ed5 100644 --- a/client/src/ledger_app_clients/ethereum/settings.py +++ b/client/src/ledger_app_clients/ethereum/settings.py @@ -1,6 +1,7 @@ from enum import Enum, auto from ragger.firmware import Firmware from ragger.navigator import Navigator, NavInsID, NavIns +from typing import List, Union class SettingID(Enum): @@ -10,6 +11,7 @@ class SettingID(Enum): VERBOSE_EIP712 = auto() VERBOSE_ENS = auto() + def get_device_settings(device: str) -> list[SettingID]: if device == "nanos": return [ @@ -27,19 +29,22 @@ def get_device_settings(device: str) -> list[SettingID]: ] return [] + settings_per_page = 3 -def get_setting_position(device: str, setting: NavInsID) -> tuple[int, int]: - screen_height = 672 # px - header_height = 85 # px - footer_height = 124 # px + +def get_setting_position(device: str, setting: Union[NavInsID, SettingID]) -> tuple[int, int]: + screen_height = 672 # px + header_height = 85 # px + footer_height = 124 # px usable_height = screen_height - (header_height + footer_height) setting_height = usable_height // settings_per_page - index_in_page = get_device_settings(device).index(setting) % settings_per_page + index_in_page = get_device_settings(device).index(SettingID(setting)) % settings_per_page return 350, header_height + (setting_height * index_in_page) + (setting_height // 2) + def settings_toggle(fw: Firmware, nav: Navigator, to_toggle: list[SettingID]): - moves = list() + moves: List[Union[NavIns, NavInsID]] = list() settings = get_device_settings(fw.device) # Assume the app is on the home page if fw.device.startswith("nano"): @@ -49,7 +54,7 @@ def settings_toggle(fw: Firmware, nav: Navigator, to_toggle: list[SettingID]): if setting in to_toggle: moves += [NavInsID.BOTH_CLICK] moves += [NavInsID.RIGHT_CLICK] - moves += [NavInsID.BOTH_CLICK] # Back + moves += [NavInsID.BOTH_CLICK] # Back else: moves += [NavInsID.USE_CASE_HOME_SETTINGS] moves += [NavInsID.USE_CASE_SETTINGS_NEXT] diff --git a/client/src/ledger_app_clients/ethereum/tlv.py b/client/src/ledger_app_clients/ethereum/tlv.py index 2ff4cef..fd5dc10 100644 --- a/client/src/ledger_app_clients/ethereum/tlv.py +++ b/client/src/ledger_app_clients/ethereum/tlv.py @@ -1,4 +1,5 @@ -from typing import Any +from typing import Union + def der_encode(value: int) -> bytes: # max() to have minimum length of 1 @@ -7,16 +8,15 @@ def der_encode(value: int) -> bytes: value_bytes = (0x80 | len(value_bytes)).to_bytes(1, 'big') + value_bytes return value_bytes -def format_tlv(tag: int, value: Any) -> bytes: + +def format_tlv(tag: int, value: Union[int, str, bytes]) -> bytes: if isinstance(value, int): # max() to have minimum length of 1 value = value.to_bytes(max(1, (value.bit_length() + 7) // 8), 'big') elif isinstance(value, str): value = value.encode() - if not isinstance(value, bytes): - print("Unhandled TLV formatting for type : %s" % (type(value))) - return None + assert isinstance(value, bytes), f"Unhandled TLV formatting for type : {type(value)}" tlv = bytearray() tlv += der_encode(tag)