diff --git a/src/solo_gui/device_manager.py b/src/solo_gui/device_manager.py index d0a4a3a..379e3b5 100644 --- a/src/solo_gui/device_manager.py +++ b/src/solo_gui/device_manager.py @@ -76,11 +76,6 @@ def __init__(self): super().__init__() self._device: Optional[SoloDevice] = None self._ctap2: Optional[Ctap2] = None - self._client_pin: Optional[ClientPin] = None - self._pin_token: Optional[bytes] = None - self._pin_protocol = None - self._credman: Optional[CredentialManagement] = None - self._cached_pin: Optional[str] = None self._last_auth_error: Optional[str] = None self._request_queue: Queue = Queue() self._worker_thread: Optional[threading.Thread] = None @@ -103,7 +98,6 @@ def start(self, device: SoloDevice) -> bool: current_path == new_path, ) self._running = False - self._cached_pin = None self._request_queue.put(None) if self._worker_thread: @@ -114,8 +108,6 @@ def start(self, device: SoloDevice) -> bool: self._request_queue = Queue() self._device = device - if not self._open_device(): - return False self._running = True self._request_queue = Queue() @@ -132,7 +124,6 @@ def stop(self): return self._running = False - self._cached_pin = None self._request_queue.put(None) if self._worker_thread: @@ -143,32 +134,29 @@ def stop(self): self._request_queue = Queue() self.device_disconnected.emit() - def _open_device(self) -> bool: - """Open the device connection.""" + def _open_ctap2(self) -> Optional[Ctap2]: + """Open a fresh CTAP2 connection for one high-level operation.""" try: if self._device is None: - return False + return None if hasattr(self._device, "prefers_ccid") and self._device.prefers_ccid(): _log.debug("_open_device: CCID-backed device=%r", getattr(self._device, "path", None)) - self._ctap2 = None - return True + return None _log.debug("_open_device: looking for device=%r", getattr(self._device, "path", None)) - self._ctap2 = self._device.open_ctap2() - if self._ctap2 is not None: - self._reset_hid_channel() + ctap2 = self._device.open_ctap2() + if ctap2 is not None: + self._ctap2 = ctap2 + self._reset_hid_channel(ctap2) _log.debug("_open_device: SUCCESS") - return True + return ctap2 _log.debug("_open_device: FAILED — device.open_ctap2() returned None") - return False + return None except Exception as e: - # CTAP error 0x00 is actually success, device is already open - if "0x00" in str(e) or "SUCCESS" in str(e): - return True _log.debug("_open_device: exception: %s", e) - return False + raise def _close_device(self): - """Close the device connection.""" + """Close any currently active CTAP2 connection and clear operation state.""" try: if self._ctap2 is not None and getattr(self._ctap2, "device", None) is not None: close = getattr(self._ctap2.device, "close", None) @@ -177,15 +165,13 @@ def _close_device(self): except Exception as exc: _log.debug("_close_device: close failed: %s", exc) self._ctap2 = None - self._client_pin = None - self._pin_token = None - self._credman = None - def _reset_hid_channel(self) -> None: + def _reset_hid_channel(self, ctap2: Optional[Ctap2] = None) -> None: """Re-initialize the CTAP HID channel for the current device, if any.""" try: - if self._ctap2: - hid_dev = self._ctap2.device + ctap2 = ctap2 or self._ctap2 + if ctap2: + hid_dev = ctap2.device hid_dev._channel_id = 0xFFFFFFFF # Broadcast channel nonce = os.urandom(8) response = hid_dev.call(CTAPHID.INIT, nonce) @@ -194,13 +180,33 @@ def _reset_hid_channel(self) -> None: except Exception as exc: _log.debug("_reset_hid_channel failed: %s", exc) - def _reopen_device(self) -> bool: - """Reopen device connection (after error).""" - self._close_device() - time.sleep(0.1) # Brief delay to let USB settle - if not self._open_device(): - return False - return True + def _run_with_fresh_ctap2(self, operation: Callable[[Ctap2], Any], *, retry: bool = True) -> Any: + """Run one high-level CTAP2 operation with a short-lived HID handle.""" + if self._device is None: + raise RuntimeError("Device not connected") + + attempts = 2 if retry else 1 + last_error: Optional[BaseException] = None + + for attempt in range(attempts): + self._close_device() + try: + ctap2 = self._open_ctap2() + if ctap2 is None: + raise RuntimeError("CTAP HID connection is not available") + return operation(ctap2) + except Exception as exc: + last_error = exc + if attempt + 1 >= attempts or not self._is_retryable_channel_error(exc): + raise + _log.debug("_run_with_fresh_ctap2: retrying after stale channel: %s", exc) + time.sleep(0.1) + finally: + self._close_device() + + if last_error is not None: + raise last_error + raise RuntimeError("CTAP HID connection is not available") @staticmethod def _is_retryable_channel_error(error: BaseException | str) -> bool: @@ -211,9 +217,12 @@ def _is_retryable_channel_error(error: BaseException | str) -> bool: or "wrong_channel" in error_msg or "invalid channel" in error_msg or "invalid_channel" in error_msg + or "invalid command" in error_msg + or "invalid_command" in error_msg or "invalid_seq" in error_msg or "key_agreement" in error_msg or "result.key_agreement" in error_msg + or "ctap error: 0x01" in error_msg or "ctap error: 0x04" in error_msg or "6f00" in error_msg or "0x6f00" in error_msg @@ -239,20 +248,6 @@ def _is_key_agreement_result_error(error: BaseException | str) -> bool: error_msg = str(error).lower() return "key_agreement" in error_msg or "result.key_agreement" in error_msg - def _wait_for_device(self, timeout: float = 10.0) -> bool: - """Wait for device to reappear after disconnect.""" - deadline = time.monotonic() + timeout - while time.monotonic() < deadline: - try: - if self._device is not None: - self._ctap2 = self._device.open_ctap2() - if self._ctap2 is not None: - return True - except Exception: - pass - time.sleep(0.25) - return False - def _process_loop(self): """Main processing loop - runs in worker thread.""" while self._running: @@ -298,21 +293,16 @@ def _handle_request(self, request: DeviceRequest): request.callback(None, str(e)) self.error_occurred.emit(request.operation_id, str(e)) - def _ensure_device(self) -> bool: - """Ensure device is connected, reopen if needed.""" - if self._device is not None and hasattr(self._device, "prefers_ccid") and self._device.prefers_ccid(): - return True - if self._ctap2 is None: - return self._reopen_device() - return True - - def _ensure_authenticated(self, pin: str) -> bool: - """Ensure we have valid PIN token for CredMan operations.""" + def _prefers_ccid(self) -> bool: + return bool( + self._device is not None + and hasattr(self._device, "prefers_ccid") + and self._device.prefers_ccid() + ) + + def _credential_management_for_pin(self, ctap2: Ctap2, pin: str) -> CredentialManagement: + """Create an authenticated CredentialManagement object for this operation.""" self._last_auth_error = None - if self._ctap2 is None: - if not self._reopen_device(): - self._last_auth_error = "Device not connected" - return False permissions = [ClientPin.PERMISSION.CREDENTIAL_MGMT] persistent_cred_mgmt = getattr( @@ -325,53 +315,34 @@ def _ensure_authenticated(self, pin: str) -> bool: permissions.append(None) last_error: Exception | None = None - for attempt in range(2): - self._client_pin = None - self._pin_token = None - self._credman = None - - for permission in permissions: - try: - self._client_pin = ClientPin(self._ctap2) - self._pin_protocol = self._client_pin.protocol - self._pin_token = self._client_pin.get_pin_token(pin, permission) - self._cached_pin = pin - self._credman = CredentialManagement( - self._ctap2, self._pin_protocol, self._pin_token - ) - self._last_auth_error = None - return True - except Exception as exc: - last_error = exc - _log.debug( - "_ensure_authenticated failed permission=%s err=%s", - permission, - exc, - ) - if self._is_key_agreement_result_error(exc): - break - - if last_error is None or not self._is_retryable_channel_error(last_error): - break - if not self._reopen_device(): - break + for permission in permissions: + try: + client_pin = ClientPin(ctap2) + pin_protocol = client_pin.protocol + pin_token = client_pin.get_pin_token(pin, permission) + self._last_auth_error = None + return CredentialManagement(ctap2, pin_protocol, pin_token) + except Exception as exc: + last_error = exc + _log.debug( + "_credential_management_for_pin failed permission=%s err=%s", + permission, + exc, + ) + if self._is_key_agreement_result_error(exc): + break if last_error is None: self._last_auth_error = "Authentication failed" else: message = str(last_error).strip() or last_error.__class__.__name__ self._last_auth_error = message - return False + raise RuntimeError(self._last_auth_error) def _do_get_info(self, request: DeviceRequest): """Execute GET_INFO request.""" - if not self._ensure_device(): - _log.debug("_do_get_info: _ensure_device() failed") - request.callback(None, "Device not connected") - return - try: - if self._ctap2 is None and self._device is not None and hasattr(self._device, "prefers_ccid") and self._device.prefers_ccid(): + if self._prefers_ccid(): info = self._device.get_info() result = { 'aaguid': None, @@ -382,8 +353,9 @@ def _do_get_info(self, request: DeviceRequest): } request.callback(result, None) return + _log.debug("_do_get_info: calling ctap2.get_info()") - info = self._ctap2.get_info() + info = self._run_with_fresh_ctap2(lambda ctap2: ctap2.get_info()) opts = dict(info.options) if info.options else {} _log.debug("_do_get_info: OK versions=%s options=%s", info.versions, opts) result = { @@ -394,114 +366,52 @@ def _do_get_info(self, request: DeviceRequest): } request.callback(result, None) except Exception as e: - error_msg = str(e) - if self._is_retryable_channel_error(e): - if self._reopen_device(): - try: - info = self._ctap2.get_info() - result = { - 'aaguid': info.aaguid, - 'versions': info.versions, - 'options': dict(info.options) if info.options else {}, - 'ctap2_available': True, - } - request.callback(result, None) - return - except Exception as e2: - request.callback(None, str(e2)) - return - request.callback(None, error_msg) + request.callback(None, str(e)) def _do_get_pin_retries(self, request: DeviceRequest): """Execute GET_PIN_RETRIES request.""" - if not self._ensure_device(): - request.callback(None, "Device not connected") - return - try: - if self._client_pin is None: - self._client_pin = ClientPin(self._ctap2) - retries = self._normalize_pin_retries(self._client_pin.get_pin_retries()) + def operation(ctap2: Ctap2) -> Optional[int]: + client_pin = ClientPin(ctap2) + return self._normalize_pin_retries(client_pin.get_pin_retries()) + + retries = self._run_with_fresh_ctap2(operation) request.callback(retries, None) except Exception as e: - error_msg = str(e) - if self._is_retryable_channel_error(e): - if self._reopen_device(): - try: - if self._client_pin is None: - self._client_pin = ClientPin(self._ctap2) - retries = self._normalize_pin_retries(self._client_pin.get_pin_retries()) - request.callback(retries, None) - return - except Exception as e2: - request.callback(None, str(e2)) - return - request.callback(None, error_msg) + request.callback(None, str(e)) def _do_wink(self, request: DeviceRequest): """Execute WINK request.""" - if not self._ensure_device(): - request.callback(None, "Device not connected") - return - try: - if self._ctap2 is None and self._device is not None: + if self._prefers_ccid(): self._device.admin().wink() request.callback(True, None) return - self._ctap2.device.wink() + + self._run_with_fresh_ctap2(lambda ctap2: ctap2.device.wink()) request.callback(True, None) except Exception as e: - error_msg = str(e) - if self._is_retryable_channel_error(e): - if self._reopen_device(): - try: - self._ctap2.device.wink() - request.callback(True, None) - return - except Exception as e2: - request.callback(None, str(e2)) - return - request.callback(None, error_msg) + request.callback(None, str(e)) def _do_vendor_command(self, request: DeviceRequest): """Execute vendor command.""" - if not self._ensure_device(): - request.callback(None, "Device not connected") - return - try: command = request.args['command'] data = request.args['data'] - if self._ctap2 is None and self._device is not None: + if self._prefers_ccid(): if command == 0x70: response = self._device.secrets().send_apdu(data) else: response = self._device.admin().call(command, data) request.callback(response, None) return - response = self._ctap2.device.call(command, data) + + response = self._run_with_fresh_ctap2( + lambda ctap2: ctap2.device.call(command, data) + ) request.callback(response, None) except Exception as e: - error_msg = str(e) - if self._is_retryable_channel_error(e) and self._reopen_device(): - try: - command = request.args['command'] - data = request.args['data'] - if self._ctap2 is None and self._device is not None: - if command == 0x70: - response = self._device.secrets().send_apdu(data) - else: - response = self._device.admin().call(command, data) - request.callback(response, None) - return - response = self._ctap2.device.call(command, data) - request.callback(response, None) - return - except Exception as e2: - request.callback(None, str(e2)) - return - request.callback(None, error_msg) + request.callback(None, str(e)) def _do_reset(self, request: DeviceRequest): """Execute RESET request.""" @@ -510,57 +420,15 @@ def _do_reset(self, request: DeviceRequest): return try: - # Factory reset is especially sensitive to stale HID handles on - # Windows after rapid replug/reboot cycles. Always open a fresh - # CTAP2 connection for the reset attempt instead of reusing an - # existing shared session. - self._close_device() - try: - self._ctap2 = self._device.open_ctap2() - if self._ctap2 is not None: - self._reset_hid_channel() - except Exception as exc: - _log.debug("_do_reset: open_ctap2 failed: %s", exc) - - if self._ctap2 is None: - request.callback( - None, - "Factory reset requires the FIDO2 HID interface, but no CTAP HID connection is available", - ) - return - _log.debug("_do_reset: issuing CTAP authenticatorReset") - self._ctap2.reset() - self._cached_pin = None - self._pin_token = None - self._credman = None + self._run_with_fresh_ctap2(lambda ctap2: ctap2.reset(), retry=False) verification_error = self._verify_factory_reset_effect() - self._close_device() if verification_error: request.callback(None, verification_error) return request.callback(True, None) except Exception as e: - error_msg = str(e) - if self._is_retryable_channel_error(e): - if self._reopen_device() and self._ctap2 is not None: - try: - _log.debug("_do_reset: retrying CTAP authenticatorReset after wrong channel") - self._ctap2.reset() - self._cached_pin = None - self._pin_token = None - self._credman = None - verification_error = self._verify_factory_reset_effect() - self._close_device() - if verification_error: - request.callback(None, verification_error) - return - request.callback(True, None) - return - except Exception as e2: - request.callback(None, str(e2)) - return - request.callback(None, error_msg) + request.callback(None, str(e)) def _open_ctap2_for_reset_verification(self, timeout: float = 3.0) -> Optional[Ctap2]: """Open a fresh CTAP2 handle with a short retry window after reset.""" @@ -568,10 +436,11 @@ def _open_ctap2_for_reset_verification(self, timeout: float = 3.0) -> Optional[C while time.monotonic() < deadline: self._close_device() try: - self._ctap2 = self._device.open_ctap2() if self._device is not None else None - if self._ctap2 is not None: - self._reset_hid_channel() - return self._ctap2 + ctap2 = self._device.open_ctap2() if self._device is not None else None + if ctap2 is not None: + self._ctap2 = ctap2 + self._reset_hid_channel(ctap2) + return ctap2 except Exception as exc: _log.debug("_open_ctap2_for_reset_verification failed: %s", exc) time.sleep(0.2) @@ -633,15 +502,15 @@ def _verify_factory_reset_effect(self) -> Optional[str]: return None - def _collect_resident_credentials(self, operation_id: str) -> list[dict]: + def _collect_resident_credentials(self, credman: CredentialManagement, operation_id: str) -> list[dict]: """Read resident credentials through an authenticated CredentialManagement session.""" credentials = [] - metadata = self._credman.get_metadata() + metadata = credman.get_metadata() existing_count = metadata.get(CredentialManagement.RESULT.EXISTING_CRED_COUNT, 0) if existing_count > 0: self.operation_progress.emit(operation_id, 10, f"Found {existing_count} credentials") - rps = self._credman.enumerate_rps() + rps = credman.enumerate_rps() for idx, rp_data in enumerate(rps): rp = rp_data.get(CredentialManagement.RESULT.RP) @@ -650,7 +519,7 @@ def _collect_resident_credentials(self, operation_id: str) -> list[dict]: if not rp: continue - creds = self._credman.enumerate_creds(rp_id_hash) + creds = credman.enumerate_creds(rp_id_hash) for cred_data in creds: cred_id = cred_data.get(CredentialManagement.RESULT.CREDENTIAL_ID) user = cred_data.get(CredentialManagement.RESULT.USER) @@ -672,58 +541,47 @@ def _collect_resident_credentials(self, operation_id: str) -> list[dict]: def _do_get_credentials(self, request: DeviceRequest): """Execute GET_CREDENTIALS request.""" - pin = request.args.get('pin') or self._cached_pin + pin = request.args.get('pin') if not pin: self.pin_required.emit(request.operation_id) request.callback(None, "PIN required") return - - if not self._ensure_authenticated(pin): - request.callback(None, self._last_auth_error or "Authentication failed") - return - + try: - credentials = self._collect_resident_credentials(request.operation_id) + def operation(ctap2: Ctap2) -> list[dict]: + credman = self._credential_management_for_pin(ctap2, pin) + return self._collect_resident_credentials(credman, request.operation_id) + + credentials = self._run_with_fresh_ctap2(operation) self.credentials_loaded.emit(request.operation_id, credentials) request.callback(credentials, None) except Exception as e: - if self._is_retryable_channel_error(e) and self._reopen_device(): - self._pin_token = None - self._credman = None - if self._ensure_authenticated(pin): - try: - credentials = self._collect_resident_credentials(request.operation_id) - self.credentials_loaded.emit(request.operation_id, credentials) - request.callback(credentials, None) - return - except Exception as e2: - request.callback(None, str(e2)) - return request.callback(None, str(e)) def _do_delete_credential(self, request: DeviceRequest): """Execute DELETE_CREDENTIAL request.""" - pin = request.args.get('pin') or self._cached_pin + pin = request.args.get('pin') cred_id = request.args.get('cred_id') if not pin: self.pin_required.emit(request.operation_id) request.callback(None, "PIN required") return - - if not self._ensure_authenticated(pin): - request.callback(None, self._last_auth_error or "Authentication failed") - return - + try: - self._credman.delete_cred(cred_id) + def operation(ctap2: Ctap2) -> bool: + credman = self._credential_management_for_pin(ctap2, pin) + credman.delete_cred(cred_id) + return True + + self._run_with_fresh_ctap2(operation, retry=False) request.callback(True, None) except Exception as e: request.callback(False, str(e)) def _do_rename_credential(self, request: DeviceRequest): """Execute RENAME_CREDENTIAL request.""" - pin = request.args.get('pin') or self._cached_pin + pin = request.args.get('pin') cred_id = request.args.get('cred_id') new_name = request.args.get('new_name') @@ -731,19 +589,20 @@ def _do_rename_credential(self, request: DeviceRequest): self.pin_required.emit(request.operation_id) request.callback(None, "PIN required") return - - if not self._ensure_authenticated(pin): - request.callback(None, self._last_auth_error or "Authentication failed") - return - + try: - cred_id_descriptor = {"id": cred_id, "type": "public-key"} - user_info = { - "id": request.args.get('user_id', b''), - "name": new_name, - "displayName": new_name, - } - self._credman.update_user_info(cred_id_descriptor, user_info) + def operation(ctap2: Ctap2) -> bool: + credman = self._credential_management_for_pin(ctap2, pin) + cred_id_descriptor = {"id": cred_id, "type": "public-key"} + user_info = { + "id": request.args.get('user_id', b''), + "name": new_name, + "displayName": new_name, + } + credman.update_user_info(cred_id_descriptor, user_info) + return True + + self._run_with_fresh_ctap2(operation, retry=False) request.callback(True, None) except Exception as e: request.callback(False, str(e)) @@ -752,145 +611,50 @@ def _do_set_pin(self, request: DeviceRequest): """Execute SET_PIN request.""" new_pin = request.args.get('new_pin') - if not self._ensure_device(): - _log.debug("_do_set_pin: _ensure_device() failed") - request.callback(None, "Device not connected") - return - try: _log.debug("_do_set_pin: calling ClientPin.set_pin()") - if self._client_pin is None: - self._client_pin = ClientPin(self._ctap2) - self._client_pin.set_pin(new_pin) - # Setting a fresh PIN does not unlock credential management. - # Force the next operation to request and establish a new auth token. - self._cached_pin = None - self._client_pin = None - self._pin_token = None - self._credman = None + def operation(ctap2: Ctap2) -> bool: + client_pin = ClientPin(ctap2) + client_pin.set_pin(new_pin) + return True + + self._run_with_fresh_ctap2(operation, retry=False) _log.debug("_do_set_pin: OK") request.callback(True, None) - except OSError as e: - # Stale HID handle (e.g. WinError 1167) — reopen and retry once. - _log.debug("_do_set_pin: OSError, retrying: %s", e) - self._client_pin = None - if self._reopen_device(): - try: - self._client_pin = ClientPin(self._ctap2) - self._client_pin.set_pin(new_pin) - self._cached_pin = None - self._client_pin = None - self._pin_token = None - self._credman = None - request.callback(True, None) - return - except Exception as e2: - _log.debug("_do_set_pin: retry failed: %s", e2) - request.callback(False, str(e2)) - return - request.callback(False, str(e)) except Exception as e: _log.debug("_do_set_pin: exception: %s", e) - if self._is_retryable_channel_error(e): - self._client_pin = None - if self._reopen_device(): - try: - self._client_pin = ClientPin(self._ctap2) - self._client_pin.set_pin(new_pin) - self._cached_pin = None - self._client_pin = None - self._pin_token = None - self._credman = None - request.callback(True, None) - return - except Exception as e2: - _log.debug("_do_set_pin: channel retry failed: %s", e2) - request.callback(False, str(e2)) - return request.callback(False, str(e)) def _do_browser_apdu(self, request: DeviceRequest): """Execute a browser APDU request (CTAPHID vendor command 0x70).""" - if not self._ensure_device(): - request.callback(None, "Device not connected") - return - try: apdu_bytes = request.args['apdu_bytes'] - if self._ctap2 is None and self._device is not None: + if self._prefers_ccid(): response = self._device.secrets().send_apdu(bytes(apdu_bytes)) request.callback(response, None) return - response = self._ctap2.device.call(0x70, bytes(apdu_bytes)) + + response = self._run_with_fresh_ctap2( + lambda ctap2: ctap2.device.call(0x70, bytes(apdu_bytes)) + ) request.callback(response, None) except Exception as e: - error_msg = str(e) - if self._is_retryable_channel_error(e) and self._reopen_device(): - try: - apdu_bytes = request.args['apdu_bytes'] - if self._ctap2 is None and self._device is not None: - response = self._device.secrets().send_apdu(bytes(apdu_bytes)) - request.callback(response, None) - return - response = self._ctap2.device.call(0x70, bytes(apdu_bytes)) - request.callback(response, None) - return - except Exception as e2: - request.callback(None, str(e2)) - return - request.callback(None, error_msg) + request.callback(None, str(e)) def _do_change_pin(self, request: DeviceRequest): """Execute CHANGE_PIN request.""" current_pin = request.args.get('current_pin') new_pin = request.args.get('new_pin') - if not self._ensure_device(): - request.callback(None, "Device not connected") - return - try: - if self._client_pin is None: - self._client_pin = ClientPin(self._ctap2) - self._client_pin.change_pin(current_pin, new_pin) - self._cached_pin = new_pin - self._client_pin = None - self._pin_token = None - self._credman = None + def operation(ctap2: Ctap2) -> bool: + client_pin = ClientPin(ctap2) + client_pin.change_pin(current_pin, new_pin) + return True + + self._run_with_fresh_ctap2(operation, retry=False) request.callback(True, None) - except OSError as e: - _log.debug("_do_change_pin: OSError, retrying: %s", e) - self._client_pin = None - if self._reopen_device(): - try: - self._client_pin = ClientPin(self._ctap2) - self._client_pin.change_pin(current_pin, new_pin) - self._cached_pin = new_pin - self._client_pin = None - self._pin_token = None - self._credman = None - request.callback(True, None) - return - except Exception as e2: - request.callback(False, str(e2)) - return - request.callback(False, str(e)) except Exception as e: - if self._is_retryable_channel_error(e): - self._client_pin = None - if self._reopen_device(): - try: - self._client_pin = ClientPin(self._ctap2) - self._client_pin.change_pin(current_pin, new_pin) - self._cached_pin = new_pin - self._client_pin = None - self._pin_token = None - self._credman = None - request.callback(True, None) - return - except Exception as e2: - request.callback(False, str(e2)) - return request.callback(False, str(e)) # Public API Methods @@ -1013,9 +777,9 @@ def send_browser_apdu(self, apdu_bytes: bytes, )) def set_cached_pin(self, pin: str): - """Cache PIN for the session (memory only, cleared on disconnect).""" - self._cached_pin = pin + """Deprecated compatibility hook; FIDO2 PINs are no longer cached.""" + del pin def clear_cached_pin(self): - """Clear cached PIN.""" - self._cached_pin = None + """Deprecated compatibility hook; FIDO2 PINs are no longer cached.""" + pass diff --git a/src/solo_gui/views/tabs/fido2_tab.py b/src/solo_gui/views/tabs/fido2_tab.py index d931c4e..c6b6a3c 100644 --- a/src/solo_gui/views/tabs/fido2_tab.py +++ b/src/solo_gui/views/tabs/fido2_tab.py @@ -164,6 +164,7 @@ def __init__(self): self._pin_set: bool = False self._refresh_after_pin_status: bool = False self._pending_pin_action: Optional[str] = None + self._suppress_next_pin_status_error: bool = False self._setup_ui() def _setup_ui(self) -> None: @@ -343,6 +344,7 @@ def clear_device(self) -> None: self._status_label.setText("No device connected") self._refresh_after_pin_status = False self._pending_pin_action = None + self._suppress_next_pin_status_error = False self._transport_hint_label.setVisible(False) self._restart_admin_button.setVisible(False) self._set_buttons_enabled(False) @@ -373,7 +375,13 @@ def _setup_worker(self) -> None: self._worker_thread.start() # Request PIN status after a short delay to ensure worker is ready - QTimer.singleShot(100, lambda: self._worker.get_pin_status() if self._worker else None) + QTimer.singleShot(100, lambda: self._request_pin_status(suppress_errors=True)) + + def _request_pin_status(self, *, suppress_errors: bool = False) -> None: + if not self._worker: + return + self._suppress_next_pin_status_error = suppress_errors + self._worker.get_pin_status() def _cleanup_worker(self) -> None: """Cleanup the worker thread.""" @@ -428,7 +436,7 @@ def _refresh_credentials(self) -> None: self._refresh_after_pin_status = True self._set_busy(True, "Checking current device state...") - self._worker.get_pin_status() + self._request_pin_status() def _load_credentials_after_status_refresh(self) -> None: """Load credentials after Refresh has synchronized live device state.""" @@ -479,8 +487,12 @@ def _rename_credential(self) -> None: ) if ok and new_name and self._worker: + pin = self._prompt_for_pin("Enter your device PIN to rename this credential:") + if not pin: + self._status_label.setText("PIN entry cancelled") + return self._set_busy(True, "Renaming credential...") - self._worker.rename_credential(credential, new_name) + self._worker.rename_credential(credential, new_name, pin) def _delete_credential(self) -> None: """Delete the selected credential.""" @@ -501,8 +513,18 @@ def _delete_credential(self) -> None: ) if reply == QMessageBox.Yes and self._worker: + pin = self._prompt_for_pin("Enter your device PIN to delete this credential:") + if not pin: + self._status_label.setText("PIN entry cancelled") + return self._set_busy(True, "Deleting credential...") - self._worker.delete_credential(credential) + self._worker.delete_credential(credential, pin) + + def _prompt_for_pin(self, message: str) -> Optional[str]: + dialog = PinDialog(self, title="PIN Required", message=message) + if dialog.exec() != QDialog.Accepted: + return None + return dialog.get_pin() or None def _change_pin(self) -> None: """Change the FIDO2 PIN.""" @@ -511,7 +533,7 @@ def _change_pin(self) -> None: self._pending_pin_action = "change" self._set_busy(True, "Checking current PIN status...") - self._worker.get_pin_status() + self._request_pin_status() def _show_change_pin_dialog(self) -> None: dialog = ChangePinDialog(self, is_new_pin=False) @@ -529,7 +551,7 @@ def _set_pin(self) -> None: self._pending_pin_action = "set" self._set_busy(True, "Checking current PIN status...") - self._worker.get_pin_status() + self._request_pin_status() def _show_set_pin_dialog(self) -> None: dialog = ChangePinDialog(self, is_new_pin=True) @@ -551,8 +573,7 @@ def _on_pin_required(self) -> None: if dialog.exec() == QDialog.Accepted: pin = dialog.get_pin() if pin and self._worker: - # Pass PIN directly to load_credentials rather than pre-caching it. - # The worker caches it only on successful verification. + # Pass the PIN only for this credential-management operation. self._set_busy(True, "Loading credentials...") self._worker.load_credentials(pin) else: @@ -563,8 +584,7 @@ def _update_pin_status(self) -> None: if not self._worker: return - # Call get_pin_status on worker - self._worker.get_pin_status() + self._request_pin_status(suppress_errors=True) def _on_credential_deleted(self, success: bool, error: str) -> None: """Handle credential deletion result.""" @@ -596,6 +616,7 @@ def _on_pin_changed(self, success: bool, error: str) -> None: def _on_pin_status_updated(self, status: dict) -> None: """Handle PIN status update.""" + self._suppress_next_pin_status_error = False ctap2_available = status.get("ctap2_available", True) pin_set = status.get("pin_set", False) retries = status.get("pin_retries") @@ -703,6 +724,13 @@ def _on_error_occurred(self, error: str) -> None: self._refresh_after_pin_status = False self._pending_pin_action = None self._set_busy(False) + self._change_pin_button.setEnabled(False) + self._set_pin_button.setEnabled(False) + if self._suppress_next_pin_status_error: + self._suppress_next_pin_status_error = False + self._status_label.setText(f"Could not refresh FIDO2 PIN status: {error}") + return + self._suppress_next_pin_status_error = False # PIN not set is informational, not an error if "PIN not set" in error: QMessageBox.information(self, "PIN Required", error) diff --git a/src/solo_gui/workers/fido2_worker.py b/src/solo_gui/workers/fido2_worker.py index ac80727..6c19821 100644 --- a/src/solo_gui/workers/fido2_worker.py +++ b/src/solo_gui/workers/fido2_worker.py @@ -1,7 +1,5 @@ """FIDO2 worker thread for SoloKeys GUI using DeviceManager.""" -from typing import Optional - from PySide6.QtCore import QObject, Signal from solo2.fido2 import Fido2Credential @@ -22,13 +20,11 @@ class Fido2Worker(QObject): def __init__(self): super().__init__() self._device_manager = DeviceManager.get_instance() - self._pin: Optional[str] = None self._current_op_id: str = "" def set_pin(self, pin: str) -> None: - """Set the PIN for authenticated operations.""" - self._pin = pin - self._device_manager.set_cached_pin(pin) + """Deprecated compatibility hook; FIDO2 PINs are not cached.""" + del pin def get_pin_status(self) -> None: """Get current PIN status and retry counters.""" @@ -59,6 +55,9 @@ def on_info(result, error): # Get PIN retries separately if PIN is set if status['pin_set']: def on_retries(retries, err): + # Retry counters are optional. Some platform/firmware + # combinations reject GET_PIN_RETRIES even though + # getInfo correctly reports clientPin=true. if not err: status['pin_retries'] = retries self.pin_status_updated.emit(status) @@ -69,9 +68,8 @@ def on_retries(retries, err): self._device_manager.get_info(on_info, operation_id=self._current_op_id) - def load_credentials(self, pin: Optional[str] = None) -> None: + def load_credentials(self, pin: str | None = None) -> None: """Load all FIDO2 credentials from the device.""" - pin_to_use = pin or self._pin self._current_op_id = "fido2_load_creds" def on_loaded(credentials, error): @@ -99,12 +97,10 @@ def on_loaded(credentials, error): )) self.credentials_loaded.emit(cred_objects) - self._device_manager.get_credentials(pin_to_use, on_loaded, operation_id=self._current_op_id) + self._device_manager.get_credentials(pin, on_loaded, operation_id=self._current_op_id) - def delete_credential(self, credential: Fido2Credential, pin: Optional[str] = None) -> None: + def delete_credential(self, credential: Fido2Credential, pin: str | None = None) -> None: """Delete a FIDO2 credential from the device.""" - pin_to_use = pin or self._pin - def on_deleted(result, error): if error: self.credential_deleted.emit(False, str(error)) @@ -112,14 +108,13 @@ def on_deleted(result, error): self.credential_deleted.emit(True, "") self._device_manager.delete_credential( - credential.cred_id, pin_to_use, on_deleted, operation_id="fido2_delete" + credential.cred_id, pin, on_deleted, operation_id="fido2_delete" ) - def rename_credential(self, credential: Fido2Credential, new_name: str, - pin: Optional[str] = None) -> None: + def rename_credential( + self, credential: Fido2Credential, new_name: str, pin: str | None = None + ) -> None: """Rename a FIDO2 credential.""" - pin_to_use = pin or self._pin - def on_renamed(result, error): if error: self.credential_renamed.emit(False, str(error)) @@ -134,7 +129,7 @@ def on_renamed(result, error): user_id = user_id.encode() self._device_manager.rename_credential( - credential.cred_id, new_name, user_id, pin_to_use, + credential.cred_id, new_name, user_id, pin, on_renamed, operation_id="fido2_rename" ) @@ -151,10 +146,6 @@ def on_set(result, error): else: self.pin_changed.emit(False, f"Failed to set PIN: {error}") else: - # A freshly set PIN should be re-entered on the next privileged - # operation so the GUI establishes a clean auth session. - self._pin = None - self._device_manager.clear_cached_pin() self.pin_changed.emit(True, "") self._device_manager.set_pin(new_pin, on_set, operation_id="fido2_set_pin") @@ -174,8 +165,6 @@ def on_changed(result, error): else: self.pin_changed.emit(False, f"Failed to change PIN: {error}") else: - self._pin = new_pin - self._device_manager.set_cached_pin(new_pin) self.pin_changed.emit(True, "") self._device_manager.change_pin(current_pin, new_pin, on_changed, operation_id="fido2_change_pin")