diff --git a/examples/user_onboarding__create_and_login.py b/examples/user_onboarding__create_and_login.py index 9b3802038..c1e9c4fe7 100644 --- a/examples/user_onboarding__create_and_login.py +++ b/examples/user_onboarding__create_and_login.py @@ -23,8 +23,10 @@ - For production, leverage the get_user_vault function to create and log into vaults, along with any KeeperCommander method to add content. ''' -USER_A = 'infra@disposable-domain.work.gd' -USER_B = 'devops@disposable-domain.work.gd' +USER_A = 'infra@email.com' +USER_B = 'devops@email.com' +CSV_FILE = 'csv_file.csv' +JSON_FILE = 'json_file.json' from keepercommander.params import KeeperParams from keepercommander import api @@ -162,10 +164,10 @@ def get_user_vault(admin_params, user, folder=None, password_length=20, replace_ from keepercommander.importer.imp_exp import _import as run_import # Run CSV import for User A -run_import(user_a_params, 'csv', 'csv_file.csv') +run_import(user_a_params, 'csv', CSV_FILE) # Run JSON import for User B -run_import(user_b_params, 'json', 'json_file.json') +run_import(user_b_params, 'json', JSON_FILE) # Re-expire Master Passwords eu.execute(admin_params, email=[USER_A,USER_B], expire=True, force=True) diff --git a/keepercommander/auth/console_ui.py b/keepercommander/auth/console_ui.py index e930cc829..0145d10b1 100644 --- a/keepercommander/auth/console_ui.py +++ b/keepercommander/auth/console_ui.py @@ -23,38 +23,38 @@ def __init__(self): def on_device_approval(self, step): if self._show_device_approval_help: - print(f"\n{Fore.YELLOW}Device Approval Required{Fore.RESET}\n") - print(f"{Fore.CYAN}Select an approval method:{Fore.RESET}") - print(f" {Fore.GREEN}1{Fore.RESET}. Email - Send approval link to your email") - print(f" {Fore.GREEN}2{Fore.RESET}. Keeper Push - Send notification to an approved device") - print(f" {Fore.GREEN}3{Fore.RESET}. 2FA Push - Send code via your 2FA method") - print() - print(f" {Fore.GREEN}c{Fore.RESET}. Enter code - Enter a verification code") - print(f" {Fore.GREEN}q{Fore.RESET}. Cancel login") - print() + logging.info(f"\n{Fore.YELLOW}Device Approval Required{Fore.RESET}\n") + logging.info(f"{Fore.CYAN}Select an approval method:{Fore.RESET}") + logging.info(f" {Fore.GREEN}1{Fore.RESET}. Email - Send approval link to your email") + logging.info(f" {Fore.GREEN}2{Fore.RESET}. Keeper Push - Send notification to an approved device") + logging.info(f" {Fore.GREEN}3{Fore.RESET}. 2FA Push - Send code via your 2FA method") + logging.info("") + logging.info(f" {Fore.GREEN}c{Fore.RESET}. Enter code - Enter a verification code") + logging.info(f" {Fore.GREEN}q{Fore.RESET}. Cancel login") + logging.info("") self._show_device_approval_help = False else: - print(f"\n{Fore.YELLOW}Waiting for device approval.{Fore.RESET}") - print(f"{Fore.CYAN}Check email, SMS, or push notification on the approved device.{Fore.RESET}") - print(f"Enter {Fore.GREEN}c {Fore.RESET} to submit a verification code.\n") + logging.info(f"\n{Fore.YELLOW}Waiting for device approval.{Fore.RESET}") + logging.info(f"{Fore.CYAN}Check email, SMS, or push notification on the approved device.{Fore.RESET}") + logging.info(f"Enter {Fore.GREEN}c {Fore.RESET} to submit a verification code.\n") try: selection = input(f'{Fore.GREEN}Selection{Fore.RESET} (or Enter to check status): ').strip().lower() if selection == '1' or selection == 'email_send' or selection == 'es': step.send_push(login_steps.DeviceApprovalChannel.Email) - print(f"\n{Fore.GREEN}Email sent to {step.username}{Fore.RESET}") - print("Click the approval link in the email, then press Enter.\n") + logging.info(f"\n{Fore.GREEN}Email sent to {step.username}{Fore.RESET}") + logging.info("Click the approval link in the email, then press Enter.\n") elif selection == '2' or selection == 'keeper_push' or selection == 'kp': step.send_push(login_steps.DeviceApprovalChannel.KeeperPush) - print(f"\n{Fore.GREEN}Push notification sent.{Fore.RESET}") - print("Approve on your device, then press Enter.\n") + logging.info(f"\n{Fore.GREEN}Push notification sent.{Fore.RESET}") + logging.info("Approve on your device, then press Enter.\n") elif selection == '3' or selection == '2fa_send' or selection == '2fs': step.send_push(login_steps.DeviceApprovalChannel.TwoFactor) - print(f"\n{Fore.GREEN}2FA code sent.{Fore.RESET}") - print("Enter the code using option 'c'.\n") + logging.info(f"\n{Fore.GREEN}2FA code sent.{Fore.RESET}") + logging.info("Enter the code using option 'c'.\n") elif selection == 'c' or selection.startswith('c '): # Support both "c" (prompts for code) and "c " (code inline) @@ -67,23 +67,23 @@ def on_device_approval(self, step): # Try email code first, then 2FA try: step.send_code(login_steps.DeviceApprovalChannel.Email, code_input) - print(f"{Fore.GREEN}Successfully verified email code.{Fore.RESET}") + logging.info(f"{Fore.GREEN}Successfully verified email code.{Fore.RESET}") except KeeperApiError: try: step.send_code(login_steps.DeviceApprovalChannel.TwoFactor, code_input) - print(f"{Fore.GREEN}Successfully verified 2FA code.{Fore.RESET}") + logging.info(f"{Fore.GREEN}Successfully verified 2FA code.{Fore.RESET}") except KeeperApiError as e: - print(f"{Fore.YELLOW}Invalid code. Please try again.{Fore.RESET}") + logging.warning(f"{Fore.YELLOW}Invalid code. Please try again.{Fore.RESET}") elif selection.startswith("email_code="): code = selection.replace("email_code=", "") step.send_code(login_steps.DeviceApprovalChannel.Email, code) - print(f"{Fore.GREEN}Successfully verified email code.{Fore.RESET}") + logging.info(f"{Fore.GREEN}Successfully verified email code.{Fore.RESET}") elif selection.startswith("2fa_code="): code = selection.replace("2fa_code=", "") step.send_code(login_steps.DeviceApprovalChannel.TwoFactor, code) - print(f"{Fore.GREEN}Successfully verified 2FA code.{Fore.RESET}") + logging.info(f"{Fore.GREEN}Successfully verified 2FA code.{Fore.RESET}") elif selection == 'q': step.cancel() @@ -92,14 +92,12 @@ def on_device_approval(self, step): step.resume() else: - print(f"{Fore.YELLOW}Invalid selection. Enter 1, 2, 3, c, q, or press Enter.{Fore.RESET}") + logging.warning(f"{Fore.YELLOW}Invalid selection. Enter 1, 2, 3, c, q, or press Enter.{Fore.RESET}") except KeyboardInterrupt: step.cancel() except KeeperApiError as kae: - print() - print(f'{Fore.YELLOW}{kae.message}{Fore.RESET}') - pass + logging.warning(f'{Fore.YELLOW}{kae.message}{Fore.RESET}') @staticmethod def two_factor_channel_to_desc(channel): # type: (login_steps.TwoFactorChannel) -> str @@ -122,13 +120,13 @@ def on_two_factor(self, step): channels = step.get_channels() if self._show_two_factor_help: - print(f"\n{Fore.YELLOW}Two-Factor Authentication Required{Fore.RESET}\n") - print(f"{Fore.CYAN}Select your 2FA method:{Fore.RESET}") + logging.info(f"\n{Fore.YELLOW}Two-Factor Authentication Required{Fore.RESET}\n") + logging.info(f"{Fore.CYAN}Select your 2FA method:{Fore.RESET}") for i in range(len(channels)): channel = channels[i] - print(f" {Fore.GREEN}{i+1}{Fore.RESET}. {ConsoleLoginUi.two_factor_channel_to_desc(channel.channel_type)} {channel.channel_name} {channel.phone}") - print(f" {Fore.GREEN}q{Fore.RESET}. Cancel login") - print() + logging.info(f" {Fore.GREEN}{i+1}{Fore.RESET}. {ConsoleLoginUi.two_factor_channel_to_desc(channel.channel_type)} {channel.channel_name} {channel.phone}") + logging.info(f" {Fore.GREEN}q{Fore.RESET}. Cancel login") + logging.info("") self._show_device_approval_help = False channel = None # type: Optional[login_steps.TwoFactorChannelInfo] @@ -143,9 +141,9 @@ def on_two_factor(self, step): channel = channels[idx-1] logging.debug(f"Selected {idx}. {ConsoleLoginUi.two_factor_channel_to_desc(channel.channel_type)}") else: - print("Invalid entry, additional factors of authentication shown may be configured if not currently enabled.") + logging.warning("Invalid entry, additional factors of authentication shown may be configured if not currently enabled.") else: - print("Invalid entry, additional factors of authentication shown may be configured if not currently enabled.") + logging.warning("Invalid entry, additional factors of authentication shown may be configured if not currently enabled.") mfa_prompt = False @@ -155,9 +153,9 @@ def on_two_factor(self, step): mfa_prompt = True try: step.send_push(channel.channel_uid, login_steps.TwoFactorPushAction.TextMessage) - print(f'\n{Fore.GREEN}SMS sent successfully.{Fore.RESET}\n') + logging.info(f'\n{Fore.GREEN}SMS sent successfully.{Fore.RESET}\n') except KeeperApiError: - print("Was unable to send SMS.") + logging.warning("Was unable to send SMS.") elif channel.channel_type == login_steps.TwoFactorChannel.SecurityKey: try: from ..yubikey.yubikey import yubikey_authenticate @@ -178,14 +176,14 @@ def on_two_factor(self, step): } step.duration = login_steps.TwoFactorDuration.EveryLogin step.send_code(channel.channel_uid, json.dumps(signature)) - print(f'{Fore.GREEN}Security key verified.{Fore.RESET}') + logging.info(f'{Fore.GREEN}Security key verified.{Fore.RESET}') except ImportError as e: from ..yubikey import display_fido2_warning display_fido2_warning() logging.warning(e) except KeeperApiError: - print(f'{Fore.RED}Unable to verify security key.{Fore.RESET}') + logging.error(f'{Fore.RED}Unable to verify security key.{Fore.RESET}') except Exception as e: logging.error(e) @@ -228,7 +226,7 @@ def on_two_factor(self, step): 'Ask Every 24 hours' if mfa_expiration == login_steps.TwoFactorDuration.Every24Hours else 'Ask Every 30 days', "|".join(allowed_expirations)) - print(prompt_exp) + logging.info(prompt_exp) try: answer = input(f'\n{Fore.GREEN}Enter 2FA Code: {Fore.RESET}') @@ -240,7 +238,7 @@ def on_two_factor(self, step): if m_duration: answer = m_duration.group(1).strip().lower() if answer not in allowed_expirations: - print(f'Invalid 2FA Duration: {answer}') + logging.warning(f'Invalid 2FA Duration: {answer}') answer = '' if answer == 'login': @@ -264,16 +262,16 @@ def on_two_factor(self, step): step.duration = mfa_expiration try: step.send_code(channel.channel_uid, otp_code) - print(f'{Fore.GREEN}2FA code verified.{Fore.RESET}') + logging.info(f'{Fore.GREEN}2FA code verified.{Fore.RESET}') except KeeperApiError: - print(f'{Fore.YELLOW}Invalid 2FA code. Please try again.{Fore.RESET}') + logging.warning(f'{Fore.YELLOW}Invalid 2FA code. Please try again.{Fore.RESET}') def on_password(self, step): if self._show_password_help: logging.info(f'{Fore.CYAN}Enter master password for {Fore.WHITE}{step.username}{Fore.RESET}') if self._failed_password_attempt > 0: - print(f'{Fore.YELLOW}Forgot password? Type "recover"{Fore.RESET}') + logging.info(f'{Fore.YELLOW}Forgot password? Type "recover"{Fore.RESET}') password = getpass.getpass(prompt=f'{Fore.GREEN}Password: {Fore.RESET}', stream=None) if not password: @@ -284,7 +282,7 @@ def on_password(self, step): try: step.verify_password(password) except KeeperApiError as kae: - print(kae.message) + logging.warning(kae.message) except KeyboardInterrupt: step.cancel() @@ -300,19 +298,19 @@ def on_sso_redirect(self, step): wb = None sp_url = step.sso_login_url - print(f'\n{Fore.CYAN}SSO Login URL:{Fore.RESET}\n{sp_url}\n') + logging.info(f'\n{Fore.CYAN}SSO Login URL:{Fore.RESET}\n{sp_url}\n') if self._show_sso_redirect_help: - print(f'{Fore.CYAN}Navigate to SSO Login URL with your browser and complete login.{Fore.RESET}') - print(f'{Fore.CYAN}Copy the returned SSO Token and paste it here.{Fore.RESET}') - print(f'{Fore.YELLOW}TIP: Click "Copy login token" button on the SSO Connect page.{Fore.RESET}') - print('') - print(f' {Fore.GREEN}a{Fore.RESET}. SSO User with a Master Password') - print(f' {Fore.GREEN}c{Fore.RESET}. Copy SSO Login URL to clipboard') + logging.info(f'{Fore.CYAN}Navigate to SSO Login URL with your browser and complete login.{Fore.RESET}') + logging.info(f'{Fore.CYAN}Copy the returned SSO Token and paste it here.{Fore.RESET}') + logging.info(f'{Fore.YELLOW}TIP: Click "Copy login token" button on the SSO Connect page.{Fore.RESET}') + logging.info('') + logging.info(f' {Fore.GREEN}a{Fore.RESET}. SSO User with a Master Password') + logging.info(f' {Fore.GREEN}c{Fore.RESET}. Copy SSO Login URL to clipboard') if wb: - print(f' {Fore.GREEN}o{Fore.RESET}. Open SSO Login URL in web browser') - print(f' {Fore.GREEN}p{Fore.RESET}. Paste SSO Token from clipboard') - print(f' {Fore.GREEN}q{Fore.RESET}. Cancel SSO login') - print() + logging.info(f' {Fore.GREEN}o{Fore.RESET}. Open SSO Login URL in web browser') + logging.info(f' {Fore.GREEN}p{Fore.RESET}. Paste SSO Token from clipboard') + logging.info(f' {Fore.GREEN}q{Fore.RESET}. Cancel SSO login') + logging.info('') self._show_sso_redirect_help = False while True: @@ -331,25 +329,25 @@ def on_sso_redirect(self, step): token = None try: pyperclip.copy(sp_url) - print('SSO Login URL is copied to clipboard.') + logging.info('SSO Login URL is copied to clipboard.') except: - print('Failed to copy SSO Login URL to clipboard.') + logging.warning('Failed to copy SSO Login URL to clipboard.') elif token == 'o': token = None if wb: try: wb.open_new_tab(sp_url) except: - print('Failed to open web browser.') + logging.warning('Failed to open web browser.') elif token == 'p': try: token = pyperclip.paste() except: token = '' - logging.info('Failed to paste from clipboard') + logging.warning('Failed to paste from clipboard') else: if len(token) < 10: - print(f'Unsupported menu option: {token}') + logging.warning(f'Unsupported menu option: {token}') token = None if token: step.set_sso_token(token) @@ -357,14 +355,14 @@ def on_sso_redirect(self, step): def on_sso_data_key(self, step): if self._show_sso_data_key_help: - print(f'\n{Fore.YELLOW}Device Approval Required for SSO{Fore.RESET}\n') - print(f'{Fore.CYAN}Select an approval method:{Fore.RESET}') - print(f' {Fore.GREEN}1{Fore.RESET}. Keeper Push - Send a push notification to your device') - print(f' {Fore.GREEN}2{Fore.RESET}. Admin Approval - Request your admin to approve this device') - print('') - print(f' {Fore.GREEN}r{Fore.RESET}. Resume SSO login after device is approved') - print(f' {Fore.GREEN}q{Fore.RESET}. Cancel SSO login') - print() + logging.info(f'\n{Fore.YELLOW}Device Approval Required for SSO{Fore.RESET}\n') + logging.info(f'{Fore.CYAN}Select an approval method:{Fore.RESET}') + logging.info(f' {Fore.GREEN}1{Fore.RESET}. Keeper Push - Send a push notification to your device') + logging.info(f' {Fore.GREEN}2{Fore.RESET}. Admin Approval - Request your admin to approve this device') + logging.info('') + logging.info(f' {Fore.GREEN}r{Fore.RESET}. Resume SSO login after device is approved') + logging.info(f' {Fore.GREEN}q{Fore.RESET}. Cancel SSO login') + logging.info('') self._show_sso_data_key_help = False while True: @@ -384,4 +382,4 @@ def on_sso_data_key(self, step): elif answer == '2': step.request_data_key(login_steps.DataKeyShareChannel.AdminApproval) else: - print(f'Action \"{answer}\" is not supported.') + logging.warning(f'Action \"{answer}\" is not supported.') diff --git a/keepercommander/commands/_supershell_impl.py b/keepercommander/commands/_supershell_impl.py index 8c0fc8e92..53057e2e0 100644 --- a/keepercommander/commands/_supershell_impl.py +++ b/keepercommander/commands/_supershell_impl.py @@ -17,8 +17,25 @@ from pathlib import Path from typing import Optional, List, Dict, Any import pyperclip +from pyperclip import PyperclipException from rich.markup import escape as rich_escape + +def safe_copy_to_clipboard(text: str) -> tuple[bool, str]: + """Safely copy text to clipboard, handling missing clipboard on remote/headless systems. + + Returns: + (True, "") on success + (False, error_message) on failure + """ + try: + pyperclip.copy(text) + return True, "" + except PyperclipException: + return False, "Clipboard not available (no X11/Wayland)" + except Exception as e: + return False, str(e) + # Import from refactored modules from .supershell.themes import COLOR_THEMES from .supershell.utils import load_preferences, save_preferences, strip_ansi_codes @@ -39,7 +56,7 @@ from textual.app import App, ComposeResult from textual.containers import Container, Horizontal, Vertical, VerticalScroll, Center, Middle -from textual.widgets import Tree, DataTable, Footer, Header, Static, Input, Label, Button +from textual.widgets import Tree, DataTable, Footer, Header, Static, Input, Label, Button, TextArea from textual.binding import Binding from textual.screen import Screen, ModalScreen from textual.reactive import reactive @@ -47,7 +64,329 @@ from textual.message import Message from textual.timer import Timer from rich.text import Text -from textual.events import Click, MouseDown, Paste +from textual.events import Click, MouseDown, MouseUp, MouseMove, Paste + +# === DEBUG EVENT LOGGING === +# Set to True to log all mouse/keyboard events to /tmp/supershell_debug.log +# tail -f /tmp/supershell_debug.log to watch events in real-time +DEBUG_EVENTS = False +_debug_log_file = None + +def _debug_log(msg: str): + """Log debug message to /tmp/supershell_debug.log if DEBUG_EVENTS is True.""" + if not DEBUG_EVENTS: + return + global _debug_log_file + try: + if _debug_log_file is None: + _debug_log_file = open('/tmp/supershell_debug.log', 'a') + import datetime + timestamp = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3] + _debug_log_file.write(f"[{timestamp}] {msg}\n") + _debug_log_file.flush() + except Exception as e: + pass # Silently fail if logging fails +# === END DEBUG EVENT LOGGING === + + +class AutoCopyTextArea(TextArea): + """TextArea that auto-copies selected text to clipboard on mouse release. + + Behavior matches standard Linux terminal: + - Click and drag to select text + - Double-click to select word, drag to extend from word boundaries + - On mouse up, automatically copy selection to clipboard + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + import time + self._last_click_time = 0.0 + self._last_click_pos = (0, 0) + self._word_select_mode = False + self._word_anchor_start = None # (row, col) + self._word_anchor_end = None # (row, col) + + async def _on_mouse_down(self, event: MouseDown) -> None: + """Handle mouse down - detect double-click for word selection.""" + import time + current_time = time.time() + current_pos = (event.x, event.y) + + # Check for double-click (within 500ms and reasonably close position) + time_ok = (current_time - self._last_click_time) < 0.5 + pos_ok = (abs(current_pos[0] - self._last_click_pos[0]) <= 10 and + abs(current_pos[1] - self._last_click_pos[1]) <= 5) + is_double_click = time_ok and pos_ok + + # Update click tracking + self._last_click_time = current_time + self._last_click_pos = current_pos + + if is_double_click: + # Double-click: select word and prepare for drag + self._select_word_at_position(event) + else: + # Single click: reset word mode and do normal selection + self._word_select_mode = False + self._word_anchor_start = None + self._word_anchor_end = None + await super()._on_mouse_down(event) + + def _select_word_at_position(self, event: MouseDown) -> None: + """Select the word at the mouse position.""" + try: + location = self.get_target_document_location(event) + row, col = location + + lines = self.text.split('\n') + if row >= len(lines): + return + line = lines[row] + if col > len(line): + col = len(line) + + # Find word boundaries (whitespace-delimited) + start = col + while start > 0 and not line[start - 1].isspace(): + start -= 1 + + end = col + while end < len(line) and not line[end].isspace(): + end += 1 + + if start == end: + # No word at this position + return + + # Store anchors for potential drag extension + self._word_anchor_start = (row, start) + self._word_anchor_end = (row, end) + self._word_select_mode = True + + # Select the word + from textual.widgets.text_area import Selection + self.selection = Selection((row, start), (row, end)) + + # Set up for potential drag (like parent's _on_mouse_down) + self._selecting = True + self.capture_mouse() + self._pause_blink(visible=False) + self.history.checkpoint() + + except Exception as e: + _debug_log(f"AutoCopyTextArea._select_word_at_position error: {e}") + # On error, fall back to normal behavior + self._word_select_mode = False + + async def _on_mouse_move(self, event: MouseMove) -> None: + """Handle mouse move - extend selection if dragging.""" + if not self._selecting: + return + + try: + target = self.get_target_document_location(event) + from textual.widgets.text_area import Selection + + if self._word_select_mode and self._word_anchor_start: + # Word-select mode: anchor to original word boundaries + anchor_start = self._word_anchor_start + anchor_end = self._word_anchor_end + + if target < anchor_start: + self.selection = Selection(target, anchor_end) + elif target > anchor_end: + self.selection = Selection(anchor_start, target) + else: + self.selection = Selection(anchor_start, anchor_end) + else: + # Normal drag: extend from original click position + selection_start, _ = self.selection + self.selection = Selection(selection_start, target) + except Exception: + pass + + async def _on_mouse_up(self, event: MouseUp) -> None: + """Handle mouse up - finalize selection and copy.""" + # Clean up word select state + self._word_select_mode = False + + # Let parent finalize selection mode + self._end_mouse_selection() + + # Always try to copy - _auto_copy_if_selected checks if there's actual selection + self._auto_copy_if_selected() + + def _on_click(self, event: Click) -> None: + """Handle click events - double-click selects and copies word.""" + # Double-click: select word and copy (backup for mouse_down detection) + if event.chain >= 2: + try: + location = self.get_target_document_location(event) + row, col = location + + lines = self.text.split('\n') + if row < len(lines): + line = lines[row] + if col > len(line): + col = len(line) + + # Find word boundaries + start = col + while start > 0 and not line[start - 1].isspace(): + start -= 1 + end = col + while end < len(line) and not line[end].isspace(): + end += 1 + + if start < end: + word = line[start:end] + # Select and copy the word + from textual.widgets.text_area import Selection + self.selection = Selection((row, start), (row, end)) + # Copy immediately + success, err = safe_copy_to_clipboard(word) + if success: + preview = word[:40] + ('...' if len(word) > 40 else '') + self.app.notify(f"Copied: {preview}", severity="information") + else: + self.app.notify(f"⚠️ {err}", severity="warning") + except Exception: + pass + event.stop() + return + # Let parent handle single clicks + super()._on_click(event) + + def _auto_copy_if_selected(self) -> None: + """Copy selected text to clipboard if any.""" + try: + selected = self.selected_text + _debug_log(f"AutoCopyTextArea: selected_text={selected!r}") + if selected and selected.strip(): + success, err = safe_copy_to_clipboard(selected) + if success: + preview = selected[:40] + ('...' if len(selected) > 40 else '') + preview = preview.replace('\n', ' ') + # Use app.notify() instead of widget's notify() + self.app.notify(f"Copied: {preview}", severity="information") + _debug_log(f"AutoCopyTextArea: Copied to clipboard") + else: + self.app.notify(f"⚠️ {err}", severity="warning") + except Exception as e: + _debug_log(f"AutoCopyTextArea: Error: {e}") + + +class ShellInputTextArea(TextArea): + """TextArea for shell command input with Enter-to-execute behavior. + + Features: + - Enter executes command instead of inserting newline + - Soft wrapping for long commands + - Multi-line display + - Integrates with shell history navigation + """ + + def __init__(self, app_ref: 'SuperShellApp', *args, **kwargs): + # Set defaults for shell input behavior + kwargs.setdefault('soft_wrap', True) + kwargs.setdefault('show_line_numbers', False) + kwargs.setdefault('tab_behavior', 'focus') # Tab cycles focus, not inserts tab + super().__init__(*args, **kwargs) + self._app_ref = app_ref + + async def _on_key(self, event) -> None: + """Intercept keys for shell-specific behavior.""" + # Enter executes command instead of inserting newline + if event.key == "enter": + command = self.text.strip() + self.clear() # Clear immediately for responsiveness + if command: + # Execute asynchronously with loading indicator + self._app_ref._execute_shell_command_async(command) + event.prevent_default() + event.stop() + return + + # Up arrow navigates history + if event.key == "up": + if self._app_ref.shell_command_history: + if self._app_ref.shell_history_index < len(self._app_ref.shell_command_history) - 1: + self._app_ref.shell_history_index += 1 + history_cmd = self._app_ref.shell_command_history[-(self._app_ref.shell_history_index + 1)] + self.clear() + self.insert(history_cmd) + event.prevent_default() + event.stop() + return + + # Down arrow navigates history + if event.key == "down": + if self._app_ref.shell_history_index > 0: + self._app_ref.shell_history_index -= 1 + history_cmd = self._app_ref.shell_command_history[-(self._app_ref.shell_history_index + 1)] + self.clear() + self.insert(history_cmd) + elif self._app_ref.shell_history_index == 0: + self._app_ref.shell_history_index = -1 + self.clear() + event.prevent_default() + event.stop() + return + + # Ctrl+U clears the input (bash-like) + if event.key == "ctrl+u": + self.clear() + self._app_ref.shell_history_index = -1 + event.prevent_default() + event.stop() + return + + # Ctrl+D closes shell pane + if event.key == "ctrl+d": + self._app_ref._close_shell_pane() + event.prevent_default() + event.stop() + return + + # Escape unfocuses the input + if event.key == "escape": + self._app_ref.shell_input_active = False + tree = self._app_ref.query_one("#folder_tree") + tree.focus() + self._app_ref._update_status("Shell open | Tab to cycle | press Enter in shell to run commands") + event.prevent_default() + event.stop() + return + + # Tab cycles to search mode + if event.key == "tab": + from textual.widgets import Tree + self._app_ref.shell_input_active = False + self._app_ref.search_input_active = True + tree = self._app_ref.query_one("#folder_tree", Tree) + tree.add_class("search-input-active") + tree.focus() # Search mode works with tree focused + self._app_ref._update_search_display(perform_search=False) + self._app_ref._update_status("Type to search | Tab to tree | Ctrl+U to clear") + event.prevent_default() + event.stop() + return + + # Shift+Tab cycles to detail pane + if event.key == "shift+tab": + from textual.containers import VerticalScroll + self._app_ref.shell_input_active = False + detail_scroll = self._app_ref.query_one("#record_detail", VerticalScroll) + detail_scroll.focus() + self._app_ref._update_status("Detail pane | Tab to shell | Shift+Tab to tree") + event.prevent_default() + event.stop() + return + + # Let parent TextArea handle all other keys (typing, backspace, cursor movement, etc.) + await super()._on_key(event) + from ..commands.base import Command @@ -309,27 +648,73 @@ class SuperShellApp(App): border-bottom: solid #333333; } - #shell_output { + #shell_output_content { height: 1fr; - overflow-y: auto; + background: #000000; + color: #ffffff; + border: none; padding: 0 1; + } + + /* Theme-specific selection colors for shell output */ + #shell_output_content.theme-green .text-area--selection { + background: #004400; + } + #shell_output_content.theme-blue .text-area--selection { + background: #002244; + } + #shell_output_content.theme-magenta .text-area--selection { + background: #330033; + } + #shell_output_content.theme-yellow .text-area--selection { + background: #333300; + } + #shell_output_content.theme-white .text-area--selection { + background: #444444; + } + /* Default fallback */ + #shell_output_content .text-area--selection { + background: #004400; + } + + /* Shell input container with prompt and TextArea */ + #shell_input_container { + height: auto; + min-height: 3; + max-height: 6; background: #000000; + border-top: solid #333333; + border-bottom: solid #333333; + padding: 0 1; } - #shell_output_content { + #shell_prompt { + width: 2; + height: 100%; background: #000000; - color: #ffffff; + color: #00ff00; + padding: 0; } - #shell_input_line { - height: 1; - background: #111111; + /* Shell input area - multi-line TextArea for command entry */ + #shell_input_area { + width: 1fr; + height: auto; + min-height: 1; + max-height: 5; + background: #000000; color: #00ff00; - padding: 0 1; + border: none; + padding: 0; + } + + #shell_input_area:focus { + background: #000000; } - #shell_pane:focus-within #shell_input_line { - background: #1a1a2e; + #shell_input_area .text-area--cursor { + color: #00ff00; + background: #00ff00; } """ @@ -400,6 +785,7 @@ def __init__(self, params): self.shell_input_active = False self.shell_command_history = [] # For up/down arrow navigation self.shell_history_index = -1 + # Shell output uses TextArea widget with built-in selection support # Load color theme from preferences prefs = load_preferences() self.color_theme = prefs.get('color_theme', 'green') @@ -459,10 +845,20 @@ def restore_expanded(node): # Update header info (email/username colors) self._update_header_info_display() - + # Update CSS dynamically for tree selection/hover self._apply_theme_css() + # Update shell output selection color theme + try: + shell_output = self.query_one("#shell_output_content") + # Remove old theme classes and add new one + for old_theme in COLOR_THEMES.keys(): + shell_output.remove_class(f"theme-{old_theme}") + shell_output.add_class(f"theme-{theme_name}") + except Exception: + pass # Shell pane might not exist yet + def notify(self, message, *, title="", severity="information", timeout=1.5): """Override notify to use faster timeout (default 1.5s instead of 5s)""" super().notify(message, title=title, severity=severity, timeout=timeout) @@ -547,9 +943,12 @@ def compose(self) -> ComposeResult: # Shell pane - hidden by default, shown when :command or Ctrl+\ pressed with Vertical(id="shell_pane"): yield Static("", id="shell_header") - with VerticalScroll(id="shell_output"): - yield Static("", id="shell_output_content") - yield Static("", id="shell_input_line") + # AutoCopyTextArea auto-copies selected text on mouse release + yield AutoCopyTextArea("", id="shell_output_content", read_only=True, classes=f"theme-{self.color_theme}") + # Shell input line with prompt and TextArea + with Horizontal(id="shell_input_container"): + yield Static("❯ ", id="shell_prompt") + yield ShellInputTextArea(self, "", id="shell_input_area") # Status bar at very bottom yield Static("", id="status_bar") @@ -3100,42 +3499,63 @@ def _update_shortcuts_bar(self, record_selected: bool = False, folder_selected: @on(Click, "#search_bar, #search_display") def on_search_bar_click(self, event: Click) -> None: """Activate search mode when search bar is clicked""" + _debug_log(f"CLICK: search_bar x={event.x} y={event.y} button={event.button} " + f"shift={event.shift} ctrl={event.ctrl} meta={event.meta}") tree = self.query_one("#folder_tree", Tree) + + # Deactivate shell input if it was active + if self.shell_input_active: + self.shell_input_active = False + try: + shell_input = self.query_one("#shell_input_area", ShellInputTextArea) + shell_input.blur() # Remove focus from shell input + except Exception: + pass + self.search_input_active = True tree.add_class("search-input-active") + tree.focus() # Focus tree so keyboard events go to search handler self._update_search_display(perform_search=False) # Don't change tree when entering search self._update_status("Type to search | Tab to navigate | Ctrl+U to clear") event.stop() + _debug_log(f"CLICK: search_bar -> stopped") @on(Click, "#user_info") def on_user_info_click(self, event: Click) -> None: """Show whoami info when user info is clicked""" + _debug_log(f"CLICK: user_info x={event.x} y={event.y}") self._display_whoami_info() event.stop() + _debug_log(f"CLICK: user_info -> stopped") @on(Click, "#device_status_info") def on_device_status_click(self, event: Click) -> None: """Show device info when Stay Logged In / Logout section is clicked""" + _debug_log(f"CLICK: device_status_info x={event.x} y={event.y}") self._display_device_info() event.stop() + _debug_log(f"CLICK: device_status_info -> stopped") - @on(Click, "#shell_pane, #shell_output, #shell_output_content, #shell_input_line, #shell_header") + @on(Click, "#shell_pane, #shell_input_area, #shell_header") def on_shell_pane_click(self, event: Click) -> None: - """Handle clicks in shell pane - activate shell input and stop propagation. + """Handle clicks in shell pane (not output area) - activate shell input.""" + _debug_log(f"CLICK: shell_pane x={event.x} y={event.y} button={event.button}") - This prevents clicks in the shell pane from bubbling up and triggering - expensive operations in other parts of the UI. - Note: Native terminal text selection requires Shift+click (terminal-dependent). - """ + # Normal click - activate and focus shell input if self.shell_pane_visible: - # Activate shell input when clicking anywhere in shell pane - if not self.shell_input_active: - self.shell_input_active = True - self._update_shell_input_display() + self.shell_input_active = True + try: + shell_input = self.query_one("#shell_input_area", ShellInputTextArea) + shell_input.focus() + except Exception: + pass event.stop() + _debug_log(f"CLICK: shell_pane -> stopped") def on_paste(self, event: Paste) -> None: """Handle paste events (Cmd+V on Mac, Ctrl+V on Windows/Linux)""" + _debug_log(f"PASTE: text={event.text!r} shell_input_active={self.shell_input_active}") + # Shell input TextArea handles its own paste - don't interfere if self.search_input_active and event.text: # Append pasted text to search input (strip newlines) pasted_text = event.text.replace('\n', ' ').replace('\r', '') @@ -3247,7 +3667,11 @@ def _update_search_display(self, perform_search=True): # Update display with blinking cursor at end if self.search_input_text: # Show text with blinking cursor (escape special chars for Rich markup) - display_text = f"> {rich_escape(self.search_input_text)}[blink]▎[/blink]" + escaped_text = rich_escape(self.search_input_text) + # Double trailing backslash so it doesn't escape the [blink] tag + if escaped_text.endswith('\\'): + escaped_text += '\\' + display_text = f"> {escaped_text}[blink]▎[/blink]" else: # Show prompt with blinking cursor (ready to type) display_text = "> [blink]▎[/blink]" @@ -3286,8 +3710,14 @@ def on_key(self, event): Keyboard handling is delegated to specialized handlers in supershell/handlers/keyboard.py for better organization and testing. """ + _debug_log(f"KEY: key={event.key!r} char={event.character!r} " + f"shell_visible={self.shell_pane_visible} shell_input_active={self.shell_input_active} " + f"search_active={self.search_input_active}") + # Dispatch to the keyboard handler chain - if keyboard_dispatcher.dispatch(event, self): + handled = keyboard_dispatcher.dispatch(event, self) + _debug_log(f"KEY: handled={handled}") + if handled: return # Event was not handled by any handler - let it propagate @@ -3379,19 +3809,32 @@ def _open_shell_pane(self, command: str = None): self.shell_pane_visible = True self.shell_input_active = True - self.shell_input_text = "" + self.shell_input_text = "" # Keep for compatibility + self._shell_executing = False # Track if command is executing - # Update shell header with theme colors + # Update shell header with theme colors and prompt self._update_shell_header() - # Initialize the input display - self._update_shell_input_display() + # Initialize the prompt with green ❯ + try: + prompt = self.query_one("#shell_prompt", Static) + prompt.update("[green]❯[/green] ") + except Exception: + pass + + # Focus the input TextArea + try: + shell_input = self.query_one("#shell_input_area", ShellInputTextArea) + shell_input.focus() + shell_input.clear() + except Exception: + pass # If a command was provided, execute it immediately if command: - self._execute_shell_command(command) + self._execute_shell_command_async(command) - self._update_status("Shell open | Enter to run | quit/q/Ctrl+D to close") + self._update_status("Shell open | Enter to run | Up/Down for history | Ctrl+D to close") def _close_shell_pane(self): """Close the shell pane and return to normal view""" @@ -3403,12 +3846,176 @@ def _close_shell_pane(self): self.shell_input_text = "" self.shell_history_index = -1 + # Clear the input TextArea + try: + shell_input = self.query_one("#shell_input_area", ShellInputTextArea) + shell_input.clear() + except Exception: + pass + # Focus tree tree = self.query_one("#folder_tree", Tree) tree.focus() self._update_status("Navigate with j/k | / to search | ? for help") + def _execute_shell_command_async(self, command: str): + """Execute a command asynchronously with loading indicator.""" + # Show spinner in prompt + self._start_shell_spinner() + + # Run the command in a worker thread + self.run_worker( + lambda: self._execute_shell_command_worker(command), + name="shell_command", + exclusive=True, + thread=True + ) + + def _start_shell_spinner(self): + """Start the spinner animation in the shell prompt.""" + self._shell_spinner_frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] + self._shell_spinner_index = 0 + self._shell_executing = True + + # Update prompt to show first spinner frame + try: + prompt = self.query_one("#shell_prompt", Static) + prompt.update(f"[yellow]{self._shell_spinner_frames[0]}[/yellow] ") + except Exception: + pass + + # Start timer for animation + self._shell_spinner_timer = self.set_interval(0.1, self._animate_shell_spinner) + + def _animate_shell_spinner(self): + """Animate the shell spinner.""" + if not self._shell_executing: + return + + self._shell_spinner_index = (self._shell_spinner_index + 1) % len(self._shell_spinner_frames) + try: + prompt = self.query_one("#shell_prompt", Static) + prompt.update(f"[yellow]{self._shell_spinner_frames[self._shell_spinner_index]}[/yellow] ") + except Exception: + pass + + def _stop_shell_spinner(self): + """Stop the spinner and restore the prompt.""" + self._shell_executing = False + if hasattr(self, '_shell_spinner_timer') and self._shell_spinner_timer: + self._shell_spinner_timer.stop() + self._shell_spinner_timer = None + + # Restore normal prompt + try: + prompt = self.query_one("#shell_prompt", Static) + prompt.update("[green]❯[/green] ") + except Exception: + pass + + def _execute_shell_command_worker(self, command: str): + """Worker function that executes the command and returns the result.""" + command = command.strip() + + # Handle quit commands + if command.lower() in ('quit', 'q', 'exit'): + self.call_from_thread(self._stop_shell_spinner) + self.call_from_thread(self._close_shell_pane) + return + + # Block supershell inside supershell + cmd_name = command.split()[0].lower() if command.split() else '' + if cmd_name in ('supershell', 'ss'): + self.shell_history.append((command, "[yellow]Cannot run supershell inside supershell[/yellow]")) + self.call_from_thread(self._stop_shell_spinner) + self.call_from_thread(self._update_shell_output_display) + self.call_from_thread(self._scroll_shell_to_bottom) + return + + # Handle clear command + if command.lower() == 'clear': + self.shell_history = [] + self.call_from_thread(self._stop_shell_spinner) + self.call_from_thread(self._update_shell_output_display) + return + + # Add to command history for up/down navigation + if command and (not self.shell_command_history or + self.shell_command_history[-1] != command): + self.shell_command_history.append(command) + self.shell_history_index = -1 + + # Capture stdout/stderr for command execution + stdout_buffer = io.StringIO() + stderr_buffer = io.StringIO() + log_buffer = io.StringIO() + old_stdout = sys.stdout + old_stderr = sys.stderr + + # Create a temporary logging handler to capture log output + log_handler = logging.StreamHandler(log_buffer) + log_handler.setLevel(logging.INFO) + log_handler.setFormatter(logging.Formatter('%(message)s')) + root_logger = logging.getLogger() + root_logger.addHandler(log_handler) + + try: + sys.stdout = stdout_buffer + sys.stderr = stderr_buffer + + # Execute via cli.do_command + from ..cli import do_command + result = do_command(self.params, command) + # Some commands return output (e.g., JSON format) instead of printing + if result is not None: + print(result) + + except Exception as e: + stderr_buffer.write(f"Error: {str(e)}\n") + finally: + sys.stdout = old_stdout + sys.stderr = old_stderr + root_logger.removeHandler(log_handler) + + # Get output + output = stdout_buffer.getvalue() + errors = stderr_buffer.getvalue() + log_output = log_buffer.getvalue() + + # Strip ANSI codes + output = strip_ansi_codes(output) + errors = strip_ansi_codes(errors) + log_output = strip_ansi_codes(log_output) + + # Combine output (stdout first, then log output, then errors) + full_output = output.rstrip() + if log_output.strip(): + if full_output: + full_output += "\n" + full_output += log_output.rstrip() + if errors: + if full_output: + full_output += "\n" + full_output += f"[red]{rich_escape(errors.rstrip())}[/red]" + + # Add to history + self.shell_history.append((command, full_output)) + + # Stop spinner and update display on the main thread + self.call_from_thread(self._stop_shell_spinner) + self.call_from_thread(self._update_shell_output_display) + self.call_from_thread(self._scroll_shell_to_bottom) + + def _scroll_shell_to_bottom(self): + """Scroll shell output to bottom.""" + try: + shell_output = self.query_one("#shell_output_content", TextArea) + shell_output.action_cursor_line_end() + shell_output.scroll_end(animate=False) + except Exception: + pass + def _execute_shell_command(self, command: str): """Execute a Keeper command in the shell pane and display output""" command = command.strip() @@ -3436,35 +4043,55 @@ def _execute_shell_command(self, command: str): # Capture stdout/stderr for command execution stdout_buffer = io.StringIO() stderr_buffer = io.StringIO() + log_buffer = io.StringIO() old_stdout = sys.stdout old_stderr = sys.stderr + # Create a temporary logging handler to capture log output + log_handler = logging.StreamHandler(log_buffer) + log_handler.setLevel(logging.INFO) + log_handler.setFormatter(logging.Formatter('%(message)s')) + root_logger = logging.getLogger() + root_logger.addHandler(log_handler) + try: sys.stdout = stdout_buffer sys.stderr = stderr_buffer # Execute via cli.do_command from ..cli import do_command - do_command(self.params, command) + result = do_command(self.params, command) + # Some commands return output (e.g., JSON format) instead of printing + if result is not None: + print(result) except Exception as e: stderr_buffer.write(f"Error: {str(e)}\n") finally: sys.stdout = old_stdout sys.stderr = old_stderr + root_logger.removeHandler(log_handler) # Get output output = stdout_buffer.getvalue() errors = stderr_buffer.getvalue() + log_output = log_buffer.getvalue() # Strip ANSI codes output = strip_ansi_codes(output) errors = strip_ansi_codes(errors) + log_output = strip_ansi_codes(log_output) - # Combine output + # Combine output (stdout first, then log output, then errors) full_output = output.rstrip() + if log_output.strip(): + if full_output: + full_output += "\n" + full_output += log_output.rstrip() if errors: - full_output += f"\n[red]{rich_escape(errors.rstrip())}[/red]" + if full_output: + full_output += "\n" + full_output += f"[red]{rich_escape(errors.rstrip())}[/red]" # Add to history self.shell_history.append((command, full_output)) @@ -3475,52 +4102,47 @@ def _execute_shell_command(self, command: str): # Scroll to bottom (defer to ensure content is rendered) def scroll_to_bottom(): try: - shell_output = self.query_one("#shell_output", VerticalScroll) + shell_output = self.query_one("#shell_output_content", TextArea) + # Move cursor to end to scroll to bottom + shell_output.action_cursor_line_end() shell_output.scroll_end(animate=False) except Exception: pass self.call_after_refresh(scroll_to_bottom) def _update_shell_output_display(self): - """Update the shell output area with command history""" + """Update the shell output area with command history (using TextArea for selection support)""" try: - shell_output_content = self.query_one("#shell_output_content", Static) + shell_output_content = self.query_one("#shell_output_content", TextArea) except Exception: return - t = self.theme_colors lines = [] for cmd, output in self.shell_history: - # Show prompt and command + # Show prompt and command (plain text - TextArea doesn't support Rich markup) prompt = self._get_shell_prompt() - lines.append(f"[{t['primary']}]{prompt}[/{t['primary']}]{rich_escape(cmd)}") + lines.append(f"{prompt}{cmd}") # Show output (with blank line separator only if there's output) if output.strip(): - lines.append(output) + # Strip Rich markup tags from output, but preserve JSON arrays + # Rich tags look like [red], [/bold], [#ffffff], [bold red] - start with letter, /, or # + # JSON arrays contain quotes, braces, colons, etc. + plain_output = re.sub(r'\[/?[a-zA-Z#][a-zA-Z0-9_ #]*\]', '', output) + lines.append(plain_output) lines.append("") # Blank line after output - shell_output_content.update("\n".join(lines)) + # TextArea uses .text property, not .update() + shell_output_content.text = "\n".join(lines) + _debug_log(f"_update_shell_output_display: updated TextArea with {len(lines)} lines") def _update_shell_input_display(self): - """Update the shell input line with prompt and current input text""" - try: - shell_input = self.query_one("#shell_input_line", Static) - except Exception: - return - - t = self.theme_colors - prompt = self._get_shell_prompt() - - if self.shell_input_active: - cursor = "[reverse] [/reverse]" - else: - cursor = "" + """Legacy method - now a no-op since ShellInputTextArea handles its own display. - shell_input.update( - f"[{t['primary']}]{prompt}[/{t['primary']}]" - f"{rich_escape(self.shell_input_text)}{cursor}" - ) + Kept for compatibility with any code that might still call it. + The prompt is now shown in the shell header via _update_shell_header(). + """ + pass def _get_shell_prompt(self) -> str: """Get the shell prompt based on current folder context""" @@ -3537,16 +4159,17 @@ def _get_shell_prompt(self) -> str: return "My Vault> " def _update_shell_header(self): - """Update shell header bar with theme colors""" + """Update shell header bar with theme colors and current prompt context""" try: shell_header = self.query_one("#shell_header", Static) except Exception: return t = self.theme_colors + prompt = self._get_shell_prompt() shell_header.update( - f"[bold {t['primary']}]Keeper Shell[/bold {t['primary']}] " - f"[{t['text_dim']}](quit/q/Ctrl+D to close)[/{t['text_dim']}]" + f"[bold {t['primary']}]{prompt}[/bold {t['primary']}]" + f"[{t['text_dim']}] (Enter to run | Up/Down for history | Ctrl+D to close)[/{t['text_dim']}]" ) def check_action(self, action: str, parameters: tuple) -> bool | None: @@ -3646,8 +4269,11 @@ def action_copy_username(self): if self.selected_record and self.selected_record in self.records: record = self.records[self.selected_record] if 'login' in record: - pyperclip.copy(record['login']) - self.notify("👤 Username copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(record['login']) + if success: + self.notify("👤 Username copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") else: self.notify("⚠️ No username found for this record", severity="warning") else: @@ -3658,8 +4284,11 @@ def action_copy_url(self): if self.selected_record and self.selected_record in self.records: record = self.records[self.selected_record] if 'login_url' in record: - pyperclip.copy(record['login_url']) - self.notify("🔗 URL copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(record['login_url']) + if success: + self.notify("🔗 URL copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") else: self.notify("⚠️ No URL found for this record", severity="warning") else: @@ -3668,11 +4297,17 @@ def action_copy_url(self): def action_copy_uid(self): """Copy UID of selected record or folder to clipboard""" if self.selected_record: - pyperclip.copy(self.selected_record) - self.notify("📋 Record UID copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(self.selected_record) + if success: + self.notify("📋 Record UID copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") elif self.selected_folder: - pyperclip.copy(self.selected_folder) - self.notify("📋 Folder UID copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(self.selected_folder) + if success: + self.notify("📋 Folder UID copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") else: self.notify("⚠️ No record or folder selected", severity="warning") @@ -3747,8 +4382,11 @@ def action_copy_record(self): if self.view_mode == 'json': # Copy as JSON formatted = json.dumps(app_data, indent=2) - pyperclip.copy(formatted) - self.notify("📋 Secrets Manager app JSON copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(formatted) + if success: + self.notify("📋 Secrets Manager app JSON copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") else: # Copy as formatted text (detail view) lines = [] @@ -3783,8 +4421,11 @@ def action_copy_record(self): lines.append("") formatted = "\n".join(lines) - pyperclip.copy(formatted) - self.notify("📋 Secrets Manager app details copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(formatted) + if success: + self.notify("📋 Secrets Manager app details copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") else: # Regular record handling record_data = self.records.get(self.selected_record, {}) @@ -3796,22 +4437,28 @@ def action_copy_record(self): output = strip_ansi_codes(output) json_obj = json.loads(output) formatted = json.dumps(json_obj, indent=2) - pyperclip.copy(formatted) - # Generate audit event since JSON contains the password - if has_password: - self.params.queue_audit_event('copy_password', record_uid=self.selected_record) - self.notify("📋 JSON copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(formatted) + if success: + # Generate audit event since JSON contains the password + if has_password: + self.params.queue_audit_event('copy_password', record_uid=self.selected_record) + self.notify("📋 JSON copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") else: # Copy formatted text (without Rich markup) content = self._format_record_for_tui(self.selected_record) # Strip Rich markup for plain text clipboard import re plain = re.sub(r'\[/?[^\]]+\]', '', content) - pyperclip.copy(plain) - # Generate audit event if record has password (detail view includes password) - if has_password: - self.params.queue_audit_event('copy_password', record_uid=self.selected_record) - self.notify("📋 Record contents copied to clipboard!", severity="information") + success, err = safe_copy_to_clipboard(plain) + if success: + # Generate audit event if record has password (detail view includes password) + if has_password: + self.params.queue_audit_event('copy_password', record_uid=self.selected_record) + self.notify("📋 Record contents copied to clipboard!", severity="information") + else: + self.notify(f"⚠️ {err}", severity="warning") except Exception as e: logging.error(f"Error copying record: {e}", exc_info=True) self.notify("⚠️ Failed to copy record contents", severity="error") diff --git a/keepercommander/commands/discover/__init__.py b/keepercommander/commands/discover/__init__.py index 247a2e07d..24d163549 100644 --- a/keepercommander/commands/discover/__init__.py +++ b/keepercommander/commands/discover/__init__.py @@ -1,5 +1,7 @@ from __future__ import annotations import logging +import os + from ..base import Command from ..pam.config_facades import PamConfigurationRecordFacade from ..pam.router_helper import get_response_payload @@ -11,8 +13,11 @@ from ...crypto import encrypt_aes_v2, decrypt_aes_v2 from ...display import bcolors from ...discovery_common.constants import PAM_USER, PAM_MACHINE, PAM_DATABASE, PAM_DIRECTORY +from ...utils import value_to_boolean import json import base64 +import re + from typing import List, Optional, Union, Callable, Tuple, Any, Dict, TYPE_CHECKING if TYPE_CHECKING: @@ -64,7 +69,27 @@ def all_gateways(params: KeeperParams): return get_all_gateways(params) @staticmethod - def find_gateway(params: KeeperParams, find_func: Callable, gateways: Optional[List] = None) \ + def get_configuration_records(params) -> List[KeeperRecord]: + + """ + Get PAM configuration records. + + The default it to find all the record version 6 records. + If the environment variable `PAM_RECORD_TYPE_MATCH` is set to a true value, the search will use both record + versions 3 and 6, and then check the record type. + """ + + configuration_list = [] + if value_to_boolean(os.environ.get("PAM_RECORD_TYPE_MATCH")): + for record in list(vault_extensions.find_records(params, record_version=iter([3, 6]))): + if re.search(r"pam.+Configuration", record.record_type): + configuration_list.append(record) + else: + configuration_list = list(vault_extensions.find_records(params, record_version=6)) + return configuration_list + + @classmethod + def find_gateway(cls, params: KeeperParams, find_func: Callable, gateways: Optional[List] = None) \ -> Tuple[Optional[GatewayContext], Any]: """ @@ -76,8 +101,9 @@ def find_gateway(params: KeeperParams, find_func: Callable, gateways: Optional[L if gateways is None: gateways = GatewayContext.all_gateways(params) - configuration_records = list(vault_extensions.find_records(params, "pam.*Configuration")) + configuration_records = cls.get_configuration_records(params) for configuration_record in configuration_records: + payload = find_func( configuration_record=configuration_record ) @@ -142,8 +168,8 @@ def from_gateway(params: KeeperParams, gateway: str, configuration_uid: Optional If there is only one gateway, then that gateway is used. """ - # Get all the PAM configuration records in the Vault; not Application - configuration_records = list(vault_extensions.find_records(params, "pam.*Configuration")) + # Get all the PAM configuration records in the Vault; configurations are version 6 + configuration_records = GatewayContext.get_configuration_records(params=params) if configuration_uid: logging.debug(f"find the gateway with configuration record {configuration_uid}") diff --git a/keepercommander/commands/discover/job_status.py b/keepercommander/commands/discover/job_status.py index f9e3fd0f3..8ad2bddd3 100644 --- a/keepercommander/commands/discover/job_status.py +++ b/keepercommander/commands/discover/job_status.py @@ -306,7 +306,7 @@ def execute(self, params, **kwargs): # For each configuration/ gateway, we are going to get all jobs. # We are going to query the gateway for any updated status. - configuration_records = list(vault_extensions.find_records(params, "pam.*Configuration")) + configuration_records = GatewayContext.get_configuration_records(params=params) for configuration_record in configuration_records: gateway_context = GatewayContext.from_configuration_uid( diff --git a/keepercommander/commands/discover/result_process.py b/keepercommander/commands/discover/result_process.py index c9ae11118..0ba7eb57a 100644 --- a/keepercommander/commands/discover/result_process.py +++ b/keepercommander/commands/discover/result_process.py @@ -4,7 +4,7 @@ import json import sys import os.path - +import re from keeper_secrets_manager_core.utils import url_safe_str_to_bytes from . import PAMGatewayActionDiscoverCommandBase, GatewayContext from ..pam.router_helper import (router_get_connected_gateways, router_set_record_rotation_information, @@ -23,7 +23,6 @@ from ...discovery_common.constants import VERTICES_SORT_MAP from pydantic import BaseModel from typing import Optional, List, Any, Tuple, Dict, TYPE_CHECKING - from ...api import get_records_add_request if TYPE_CHECKING: @@ -1471,7 +1470,7 @@ def execute(self, params: KeeperParams, **kwargs): all_gateways = GatewayContext.all_gateways(params) - configuration_records = list(vault_extensions.find_records(params, "pam.*Configuration")) + configuration_records = GatewayContext.get_configuration_records(params=params) for configuration_record in configuration_records: gateway_context = GatewayContext.from_configuration_uid(params=params, diff --git a/keepercommander/commands/pam_debug/info.py b/keepercommander/commands/pam_debug/info.py index 49be65f8d..9f3208ddd 100644 --- a/keepercommander/commands/pam_debug/info.py +++ b/keepercommander/commands/pam_debug/info.py @@ -44,7 +44,7 @@ def execute(self, params: KeeperParams, **kwargs): return if record.record_type not in ["pamUser", "pamMachine", "pamDatabase", "pamDirectory"]: - if re.search(r'^pam.*Configuration$', record.record_type) is None: + if re.search(r'^pam.+Configuration$', record.record_type) is None: print(f"{bcolors.FAIL}The record is a {record.record_type}. This is not a PAM record.{bcolors.ENDC}") return @@ -58,11 +58,13 @@ def execute(self, params: KeeperParams, **kwargs): print(f"{bcolors.WARNING}PAM record does not have protobuf rotation settings, " f"checking all configurations.{bcolors.ENDC}") - configuration_records = list(vault_extensions.find_records(params, "pam.*Configuration")) + # Get all the PAM configuration records in the Vault; configurations are version 6 + configuration_records = GatewayContext.get_configuration_records(params=params) if len(configuration_records) == 0: print(f"{bcolors.FAIL}Cannot find any PAM configuration records in the Vault{bcolors.ENDC}") for configuration_record in configuration_records: + record_link = RecordLink(record=configuration_record, params=params) record_vertex = record_link.dag.get_vertex(record.record_uid) if record_vertex is not None and record_vertex.active is True: diff --git a/keepercommander/commands/pam_debug/rotation_setting.py b/keepercommander/commands/pam_debug/rotation_setting.py index 41e4a034f..8e475b487 100644 --- a/keepercommander/commands/pam_debug/rotation_setting.py +++ b/keepercommander/commands/pam_debug/rotation_setting.py @@ -74,7 +74,7 @@ def execute(self, params: KeeperParams, **kwargs): print(f"{bcolors.FAIL}Configuration record does not exists.{bcolors.ENDC}") return - if re.search(r'^pam.*Configuration$', configuration_record.record_type) is None: + if re.search(r'^pam.+Configuration$', configuration_record.record_type) is None: print( f"{bcolors.FAIL}The configuration record is not a configuration record. " f"It's {configuration_record.record_type} record.{bcolors.ENDC}") diff --git a/keepercommander/commands/supershell/handlers/keyboard.py b/keepercommander/commands/supershell/handlers/keyboard.py index 6ef3d03e1..8008bf59f 100644 --- a/keepercommander/commands/supershell/handlers/keyboard.py +++ b/keepercommander/commands/supershell/handlers/keyboard.py @@ -10,6 +10,25 @@ from rich.markup import escape as rich_escape +# Debug logging - writes to /tmp/supershell_debug.log when enabled +DEBUG_EVENTS = False +_debug_log_file = None + +def _debug_log(msg: str): + """Log debug message to /tmp/supershell_debug.log if DEBUG_EVENTS is True.""" + if not DEBUG_EVENTS: + return + global _debug_log_file + try: + if _debug_log_file is None: + _debug_log_file = open('/tmp/supershell_debug.log', 'a') + import datetime + timestamp = datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3] + _debug_log_file.write(f"[{timestamp}] {msg}\n") + _debug_log_file.flush() + except Exception: + pass + if TYPE_CHECKING: from textual.events import Key from textual.widgets import Tree @@ -141,119 +160,107 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: class ShellInputHandler(KeyHandler): - """Handles input when shell pane is visible and active.""" + """Placeholder for shell input key handling. + + Note: All key handling is now done by ShellInputTextArea itself, + including Tab/Shift+Tab focus cycling. This handler is kept for + potential future use but currently does not handle any keys. + """ def can_handle(self, event: 'Key', app: 'SuperShellApp') -> bool: - # Handle Enter key whenever shell pane is visible (even if not actively focused on input) - if app.shell_pane_visible and event.key == "enter": - return True - return app.shell_pane_visible and app.shell_input_active + # ShellInputTextArea handles all keys directly + return False def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: - tree = app.query_one("#folder_tree") + return False - if event.key == "ctrl+d": - app._close_shell_pane() - self._stop_event(event) - return True - if event.key == "enter": - # Ensure shell input is active when Enter is pressed - if not app.shell_input_active: - app.shell_input_active = True - app._update_shell_input_display() - # Execute command (even if empty, to show new prompt) - app._execute_shell_command(app.shell_input_text) - app.shell_input_text = "" - app._update_shell_input_display() - self._stop_event(event) - return True +class ShellPaneCloseHandler(KeyHandler): + """Handles Ctrl+D to close shell even when not focused on input.""" - if event.key == "backspace": - if app.shell_input_text: - app.shell_input_text = app.shell_input_text[:-1] - app._update_shell_input_display() - self._stop_event(event) - return True + def can_handle(self, event: 'Key', app: 'SuperShellApp') -> bool: + return app.shell_pane_visible and event.key == "ctrl+d" - if event.key == "ctrl+u": - # Ctrl+U clears the input line (like bash) - app.shell_input_text = "" - app._update_shell_input_display() - self._stop_event(event) - return True + def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: + app._close_shell_pane() + self._stop_event(event) + return True - if event.key == "escape": - app.shell_input_active = False - tree.focus() - app._update_shell_input_display() - app._update_status("Shell open | Tab to cycle | press Enter in shell to run commands") - self._stop_event(event) - return True - if event.key == "tab": - # Shell → Search input - app.shell_input_active = False - app._update_shell_input_display() - app.search_input_active = True - tree.add_class("search-input-active") - app._update_search_display(perform_search=False) - app._update_status("Type to search | Tab to tree | Ctrl+U to clear") - self._stop_event(event) - return True +class ShellCopyHandler(KeyHandler): + """Handles Ctrl+C/Cmd+C to copy selected text, Ctrl+Shift+C/Cmd+Shift+C to copy all shell output.""" - if event.key == "shift+tab": - # Shell → Detail pane - from textual.containers import VerticalScroll - app.shell_input_active = False - app._update_shell_input_display() - detail_scroll = app.query_one("#record_detail", VerticalScroll) - detail_scroll.focus() - app._update_status("Detail pane | Tab to shell | Shift+Tab to tree") - self._stop_event(event) - return True + # Keys that trigger copy selected text + COPY_KEYS = ("ctrl+c", "cmd+c") + # Keys that trigger copy all output + COPY_ALL_KEYS = ("ctrl+shift+c", "cmd+shift+c") - if event.key == "up": - if app.shell_command_history: - if app.shell_history_index < len(app.shell_command_history) - 1: - app.shell_history_index += 1 - app.shell_input_text = app.shell_command_history[-(app.shell_history_index + 1)] - app._update_shell_input_display() - self._stop_event(event) - return True + def can_handle(self, event: 'Key', app: 'SuperShellApp') -> bool: + all_copy_keys = self.COPY_KEYS + self.COPY_ALL_KEYS + result = app.shell_pane_visible and event.key in all_copy_keys + _debug_log(f"ShellCopyHandler.can_handle: shell_visible={app.shell_pane_visible} " + f"key={event.key!r} result={result}") + return result - if event.key == "down": - if app.shell_history_index > 0: - app.shell_history_index -= 1 - app.shell_input_text = app.shell_command_history[-(app.shell_history_index + 1)] - elif app.shell_history_index == 0: - app.shell_history_index = -1 - app.shell_input_text = "" - app._update_shell_input_display() - self._stop_event(event) - return True + def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: + _debug_log(f"ShellCopyHandler.handle: key={event.key!r}") + import pyperclip + + # Ctrl+C or Cmd+C: Copy selected text from TextArea + if event.key in self.COPY_KEYS: + try: + from textual.widgets import TextArea + shell_output = app.query_one("#shell_output_content", TextArea) + selected = shell_output.selected_text + _debug_log(f"ShellCopyHandler.handle: selected_text={selected!r}") + + if selected and selected.strip(): + pyperclip.copy(selected) + preview = selected[:40] + ('...' if len(selected) > 40 else '') + preview = preview.replace('\n', ' ') + app.notify(f"Copied: {preview}", severity="information") + _debug_log(f"ShellCopyHandler.handle: Copied selected text") + self._stop_event(event) + return True + else: + _debug_log(f"ShellCopyHandler.handle: No text selected, not handling") + return False # Let event propagate if nothing selected + except Exception as e: + _debug_log(f"ShellCopyHandler.handle: Error getting selection: {e}") + return False + + # Ctrl+Shift+C or Cmd+Shift+C: Copy all shell output + if event.key in self.COPY_ALL_KEYS: + import re + lines = [] + _debug_log(f"ShellCopyHandler.handle: shell_history has {len(app.shell_history)} entries") + for cmd, output in app.shell_history: + lines.append(f"> {cmd}") + if output.strip(): + lines.append(output) + lines.append("") + + raw_text = '\n'.join(lines) + clean_text = re.sub(r'\x1b\[[0-9;]*m', '', raw_text) + clean_text = re.sub(r'\[[^\]]*\]', '', clean_text) + + if clean_text.strip(): + try: + pyperclip.copy(clean_text.strip()) + app.notify("All shell output copied", severity="information") + _debug_log(f"ShellCopyHandler.handle: Copied all output") + except Exception as e: + _debug_log(f"ShellCopyHandler.handle: Copy failed: {e}") + app.notify("Copy failed", severity="warning") + else: + app.notify("No output to copy", severity="information") - if event.character and event.character.isprintable(): - app.shell_input_text += event.character - app._update_shell_input_display() self._stop_event(event) return True return False -class ShellPaneCloseHandler(KeyHandler): - """Handles Ctrl+D to close shell even when not focused on input.""" - - def can_handle(self, event: 'Key', app: 'SuperShellApp') -> bool: - return app.shell_pane_visible and event.key == "ctrl+d" - - def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: - app._close_shell_pane() - self._stop_event(event) - return True - - class SearchInputTabHandler(KeyHandler): """Handles Tab/Shift+Tab when in search input mode.""" @@ -284,7 +291,11 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: # Shift+Tab: Search input → Shell (if visible) or Detail pane if app.shell_pane_visible: app.shell_input_active = True - app._update_shell_input_display() + try: + shell_input = app.query_one("#shell_input_area") + shell_input.focus() + except Exception: + pass app._update_status("Shell | Shift+Tab to detail | Tab to search") else: detail_scroll.focus() @@ -313,7 +324,11 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: # Detail pane → Shell (if visible) or Search input if app.shell_pane_visible: app.shell_input_active = True - app._update_shell_input_display() + try: + shell_input = app.query_one("#shell_input_area") + shell_input.focus() + except Exception: + pass app._update_status("Shell | Tab to search | Shift+Tab to detail") else: app.search_input_active = True @@ -410,7 +425,10 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: # Tab switches to detail pane if event.key == "tab": detail_scroll.focus() - app._update_status("Detail pane | Tab to search | Shift+Tab to tree") + if app.shell_pane_visible: + app._update_status("Detail pane | Tab to shell | Shift+Tab to tree") + else: + app._update_status("Detail pane | Tab to search | Shift+Tab to tree") self._stop_event(event) return True @@ -503,7 +521,7 @@ def handle(self, event: 'Key', app: 'SuperShellApp') -> bool: self._stop_event(event) return True - if event.key == "backspace": + if event.key == "backspace" and app.search_input_active: if app.search_input_text: app.search_input_text = app.search_input_text[:-1] app._update_search_display() @@ -585,6 +603,7 @@ def __init__(self): CommandModeHandler(), ShellInputHandler(), ShellPaneCloseHandler(), + ShellCopyHandler(), # Tab cycling handlers SearchInputTabHandler(), @@ -610,9 +629,14 @@ def dispatch(self, event: 'Key', app: 'SuperShellApp') -> bool: True if the event was handled """ for handler in self.handlers: - if handler.can_handle(event, app): - if handler.handle(event, app): + can_handle = handler.can_handle(event, app) + if can_handle: + _debug_log(f"DISPATCH: {handler.__class__.__name__} can_handle=True for key={event.key!r}") + result = handler.handle(event, app) + _debug_log(f"DISPATCH: {handler.__class__.__name__} handle returned {result}") + if result: return True + _debug_log(f"DISPATCH: No handler for key={event.key!r}") return False diff --git a/keepercommander/commands/supershell/screens/help.py b/keepercommander/commands/supershell/screens/help.py index 0e13b95b0..2261c3a62 100644 --- a/keepercommander/commands/supershell/screens/help.py +++ b/keepercommander/commands/supershell/screens/help.py @@ -17,13 +17,26 @@ class HelpScreen(ModalScreen): DEFAULT_CSS = """ HelpScreen { align: center middle; + background: rgba(0, 0, 0, 0.8); + } + + HelpScreen > Vertical { + background: #000000; + } + + HelpScreen > Vertical > Horizontal { + background: #000000; + } + + HelpScreen Static { + background: #000000; } #help_container { width: 90; height: auto; max-height: 90%; - background: #111111; + background: #000000; border: solid #444444; padding: 1 2; } @@ -81,7 +94,7 @@ def compose(self) -> ComposeResult: Up/Down Command history quit/q Close shell pane Ctrl+D Close shell pane - Shift+drag Select text (native) + Select text Auto-copies to clipboard [green]General:[/green] ? Help diff --git a/keepercommander/service/util/tunneling.py b/keepercommander/service/util/tunneling.py index 5c650de9f..afc8c95d7 100644 --- a/keepercommander/service/util/tunneling.py +++ b/keepercommander/service/util/tunneling.py @@ -185,6 +185,13 @@ def generate_ngrok_url(port, auth_token, ngrok_custom_domain, run_mode): if not port or not auth_token: raise ValueError("Both 'port' and 'ngrok_auth_token' must be provided.") + # Strip .ngrok.io suffix if user provided full domain (e.g., "mycompany.ngrok.io" -> "mycompany") + if ngrok_custom_domain: + for suffix in ['.ngrok.io', '.ngrok.app', '.ngrok-free.app']: + if ngrok_custom_domain.lower().endswith(suffix): + ngrok_custom_domain = ngrok_custom_domain[:-len(suffix)] + break + logging.getLogger("ngrok").setLevel(logging.CRITICAL) logging.getLogger("pyngrok").setLevel(logging.CRITICAL)