diff --git a/docs/usage/mini_v.md b/docs/usage/mini_v.md index 72319c95c..939c09efe 100644 --- a/docs/usage/mini_v.md +++ b/docs/usage/mini_v.md @@ -49,6 +49,7 @@ Useful switches: - `f1` or `?`: Show keybinding help - `q` (or `ctrl+q`): Quit the agent +- `i` (or `ctrl+i`): Interrupt the agent immediately (works during long-running commands) - `c`: Switch to `confirm` mode - `y` (or `ctrl+y`): Switch to `yolo` mode - `h` or `LEFT`: Go to previous step of the agent diff --git a/src/minisweagent/agents/interactive_textual.py b/src/minisweagent/agents/interactive_textual.py index 9e8292e36..ff78d8427 100644 --- a/src/minisweagent/agents/interactive_textual.py +++ b/src/minisweagent/agents/interactive_textual.py @@ -43,6 +43,8 @@ def __init__(self, app: "TextualAgent", *args, **kwargs): self.app = app super().__init__(*args, config_class=TextualAgentConfig, **kwargs) self._current_action_from_human = False + self._agent_thread_id = None + self._current_process = None def add_message(self, role: str, content: str, **kwargs): super().add_message(role, content, **kwargs) @@ -57,9 +59,21 @@ def query(self) -> dict: self.add_message("assistant", msg["content"]) return msg self._current_action_from_human = False - return super().query() + try: + return super().query() + except KeyboardInterrupt: + # Handle interrupt during LLM query + interruption_message = self.app.input_container.request_input( + "[bold yellow]Interrupted.[/bold yellow] [green]Type a comment/command[/green]" + ).strip() + if not interruption_message: + interruption_message = "Temporary interruption caught." + raise NonTerminatingException(f"Interrupted by user: {interruption_message}") def run(self, task: str, **kwargs) -> tuple[str, str]: + # Store the thread ID so we can inject KeyboardInterrupt into it + self._agent_thread_id = threading.get_ident() + try: exit_status, result = super().run(task, **kwargs) except Exception as e: @@ -72,6 +86,19 @@ def run(self, task: str, **kwargs) -> tuple[str, str]: self.app.call_from_thread(self.app.action_quit) return exit_status, result + def step(self) -> dict: + """Override step to handle interrupts like interactive.py""" + try: + return super().step() + except KeyboardInterrupt: + # Same behavior as interactive.py + interruption_message = self.app.input_container.request_input( + "[bold yellow]Interrupted.[/bold yellow] [green]Type a comment/command[/green]" + ).strip() + if not interruption_message: + interruption_message = "Temporary interruption caught." + raise NonTerminatingException(f"Interrupted by user: {interruption_message}") + def execute_action(self, action: dict) -> dict: if self.config.mode == "human" and not self._current_action_from_human: # threading, grrrrr raise NonTerminatingException("Command not executed because user switched to manual mode.") @@ -83,7 +110,74 @@ def execute_action(self, action: dict) -> dict: result = self.app.input_container.request_input("Press ENTER to confirm or provide rejection reason") if result: # Non-empty string means rejection raise NonTerminatingException(f"Command not executed: {result}") - return super().execute_action(action) + + # Use Popen to allow interrupting long-running commands + import subprocess + import time + + try: + # Start the process + self._current_process = subprocess.Popen( + action["action"], + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + cwd=self.env.config.cwd + if hasattr(self.env, "config") and hasattr(self.env.config, "cwd") and self.env.config.cwd + else None, + env=dict(self.env.config.env) + if hasattr(self.env, "config") and hasattr(self.env.config, "env") + else None, + ) + + # Poll the process with timeout + timeout = ( + self.env.config.timeout if hasattr(self.env, "config") and hasattr(self.env.config, "timeout") else 30 + ) + start_time = time.time() + + while self._current_process.poll() is None: + if time.time() - start_time > timeout: + self._current_process.terminate() + try: + self._current_process.wait(timeout=1) + except subprocess.TimeoutExpired: + self._current_process.kill() + self._current_process.wait() + + output = self._current_process.stdout.read() if self._current_process.stdout else "" + raise subprocess.TimeoutExpired(action["action"], timeout, output=output.encode()) + + time.sleep(0.05) # Poll every 50ms + + # Get output + output = self._current_process.stdout.read() if self._current_process.stdout else "" + returncode = self._current_process.returncode + + result = {"output": output, "returncode": returncode} + self.has_finished(result) + return result + + except KeyboardInterrupt: + # Terminate the process if it's still running + if self._current_process and self._current_process.poll() is None: + self._current_process.terminate() + try: + self._current_process.wait(timeout=1) + except subprocess.TimeoutExpired: + self._current_process.kill() + self._current_process.wait() + + # Handle interrupt during command execution + interruption_message = self.app.input_container.request_input( + "[bold yellow]Interrupted.[/bold yellow] [green]Type a comment/command[/green]" + ).strip() + if not interruption_message: + interruption_message = "Temporary interruption caught." + raise NonTerminatingException(f"Interrupted by user: {interruption_message}") + finally: + self._current_process = None def has_finished(self, output: dict[str, str]): try: @@ -248,6 +342,7 @@ class TextualAgent(App): Binding("j,down", "scroll_down", "Scroll down", show=False), Binding("k,up", "scroll_up", "Scroll up", show=False), Binding("q,ctrl+q", "quit", "Quit", tooltip="Quit the agent"), + Binding("i,ctrl+i", "interrupt", "INTERRUPT", tooltip="Interrupt the agent and provide feedback"), Binding("y,ctrl+y", "yolo", "YOLO mode", tooltip="Switch to YOLO Mode (LM actions will execute immediately)"), Binding( "c", @@ -277,7 +372,12 @@ def __init__(self, model, env, **kwargs): self._vscroll = VerticalScroll() def run(self, task: str, **kwargs) -> tuple[str, str]: - threading.Thread(target=lambda: self.agent.run(task, **kwargs), daemon=True).start() + def agent_run_wrapper(): + # Store the thread ID so we can inject KeyboardInterrupt into it + self.agent._agent_thread_id = threading.get_ident() + return self.agent.run(task, **kwargs) + + threading.Thread(target=agent_run_wrapper, daemon=True).start() super().run() return self.exit_status, self.result @@ -425,6 +525,30 @@ def action_confirm(self): self.agent.config.mode = "confirm" self.notify("Confirm mode enabled - LM proposes commands and you confirm/reject them") + def action_interrupt(self): + """Interrupt the agent by raising KeyboardInterrupt in its thread.""" + import ctypes + + if self.agent_state == "RUNNING" and self.agent._agent_thread_id is not None: + # First, terminate any running subprocess + if self.agent._current_process and self.agent._current_process.poll() is None: + self.agent._current_process.terminate() + + # Then inject KeyboardInterrupt into the agent thread + ret = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(self.agent._agent_thread_id), ctypes.py_object(KeyboardInterrupt) + ) + if ret == 0: + self.notify("Failed to interrupt - thread not found") + elif ret > 1: + # If more than one thread affected, undo it + ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self.agent._agent_thread_id), None) + self.notify("Failed to interrupt - multiple threads affected") + elif self.agent_state == "AWAITING_INPUT": + self.notify("Already awaiting input - use the input field to provide feedback") + else: + self.notify("Cannot interrupt - agent is not running") + def action_next_step(self) -> None: self.i_step += 1 diff --git a/tests/agents/test_interactive_textual.py b/tests/agents/test_interactive_textual.py index 9b2120f2b..0a04d41ea 100644 --- a/tests/agents/test_interactive_textual.py +++ b/tests/agents/test_interactive_textual.py @@ -870,3 +870,135 @@ async def test_system_commands_are_callable(): assert callable(command.callback), ( f"Command '{command.title}' has non-callable callback: {command.callback}" ) + + +@pytest.mark.slow +async def test_interrupt_agent_run(): + """Test interrupting an agent run and providing feedback using KeyboardInterrupt.""" + # Use 3 sleep commands to ensure we have time to interrupt + app = TextualAgent( + model=DeterministicModel( + outputs=[ + "/sleep 0.5", + "/sleep 0.5", + "/sleep 0.5", + "Step 1\n```bash\necho 'step1'\n```", + "Step 2\n```bash\necho 'COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT'\n```", + ] + ), + env=LocalEnvironment(), + mode="yolo", + ) + + async with app.run_test() as pilot: + # Start the agent with the task + threading.Thread(target=lambda: app.agent.run("Interrupt test"), daemon=True).start() + + # Wait for agent to start running and have a thread ID + for _ in range(20): + await pilot.pause(0.05) + if app.agent._agent_thread_id is not None: + break + + # Ensure we have a thread ID + assert app.agent._agent_thread_id is not None + + # Wait a bit to ensure the agent is in the middle of sleeping + await pilot.pause(0.2) + + # The agent might be RUNNING or might have already moved to next state + # Either way, we should be able to interrupt + # Request interrupt using the key binding + # This will inject KeyboardInterrupt into the agent thread + await pilot.press("i") + + # Wait for the interrupt to be processed + # The KeyboardInterrupt should be raised and caught in step() + for _ in range(50): + await pilot.pause(0.1) + if app.agent_state == "AWAITING_INPUT" and "Interrupted" in get_screen_text(app): + break + else: + raise AssertionError("Agent did not show interrupt prompt within 5 seconds") + + # Should show interrupt prompt + assert "Interrupted" in get_screen_text(app) + assert "Type a comment/command" in get_screen_text(app) + + # Provide feedback + await type_text(pilot, "Please use a different approach") + await pilot.press("enter") + await pilot.pause(0.3) + + # Navigate through steps to find the interrupt message + # The interrupt message should be in one of the earlier steps + found_interrupt_message = False + for step in range(app.n_steps): + app.i_step = step + if "Interrupted by user: Please use a different approach" in get_screen_text(app): + found_interrupt_message = True + break + + assert found_interrupt_message, f"Could not find interrupt message in any of {app.n_steps} steps" + assert app.agent_state in ["RUNNING", "STOPPED", "AWAITING_INPUT"] + + +async def test_interrupt_when_not_running(): + """Test that interrupt shows appropriate message when agent is not running.""" + app = TextualAgent( + model=DeterministicModel(outputs=["Test\n```bash\necho 'COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT'\n```"]), + env=LocalEnvironment(), + mode="yolo", + confirm_exit=False, # Disable exit confirmation so agent stops immediately + ) + + async with app.run_test() as pilot: + # Start the agent + threading.Thread(target=lambda: app.agent.run("Test"), daemon=True).start() + + # Wait for agent to finish + for _ in range(50): + await pilot.pause(0.1) + if app.agent_state == "STOPPED": + break + + # Verify agent is stopped + assert app.agent_state == "STOPPED" + + # Try to interrupt when agent is stopped + # The action should just show a notification + await pilot.press("i") + await pilot.pause(0.1) + + # Agent should still be stopped (no interrupt was injected) + assert app.agent_state == "STOPPED" + + +async def test_interrupt_when_awaiting_input(): + """Test that interrupt shows appropriate message when already awaiting input.""" + app = TextualAgent( + model=DeterministicModel(outputs=["Test\n```bash\necho 'test'\n```"]), + env=LocalEnvironment(), + mode="confirm", + ) + + async with app.run_test() as pilot: + # Start the agent + threading.Thread(target=lambda: app.agent.run("Test"), daemon=True).start() + + # Wait for agent to request input + for _ in range(50): + await pilot.pause(0.1) + if app.agent_state == "AWAITING_INPUT": + break + + # Verify agent is awaiting input + assert app.agent_state == "AWAITING_INPUT" + + # Try to interrupt when already awaiting input + await pilot.press("escape") # Unfocus from input + await pilot.press("i") + await pilot.pause(0.1) + + # Agent should still be awaiting input (no interrupt was injected) + assert app.agent_state == "AWAITING_INPUT"