Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions chat/src/components/chat-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,9 @@ export function ChatProvider({ children }: PropsWithChildren) {
description: message,
});
} finally {
// Remove optimistic draft message if still present (may have been replaced by server response via SSE).
setMessages((prev) => prev.filter((m) => !isDraftMessage(m)));
if (type === "user") {
setMessages((prevMessages) =>
prevMessages.filter((m) => !isDraftMessage(m))
);
setLoading(false);
}
}
Expand Down
35 changes: 35 additions & 0 deletions cmd/attach/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,42 @@ func WriteRawInputOverHTTP(ctx context.Context, url string, msg string) error {
return nil
}

// statusResponse mirrors the JSON body returned by the server's /status
// endpoint. checkACPMode decodes the response into it and inspects only
// Backend; the remaining fields are decoded but not read there.
type statusResponse struct {
// Status is the agent status string reported by the server.
Status string `json:"status"`
// AgentType is the type of agent the server reports it is running.
AgentType string `json:"agent_type"`
// Backend is the transport reported by the server; the value "acp" marks
// a server for which attach is refused.
Backend string `json:"backend"`
}

// checkACPMode asks the server at remoteUrl for its /status document and
// reports whether attaching to it is possible.
//
// It returns a non-nil error when the request fails, when the server answers
// with anything other than 200 OK, when the body cannot be decoded as JSON,
// or when the reported backend is "acp". It returns nil otherwise.
func checkACPMode(remoteUrl string) error {
	statusURL := remoteUrl + "/status"

	resp, err := http.Get(statusURL)
	if err != nil {
		return xerrors.Errorf("failed to check server status: %w", err)
	}
	// The body is only read here; a failure to close it is deliberately ignored.
	defer func() { _ = resp.Body.Close() }()

	if code := resp.StatusCode; code != http.StatusOK {
		return xerrors.Errorf("unexpected %d response from server: %s", code, resp.Status)
	}

	var payload statusResponse
	decodeErr := json.NewDecoder(resp.Body).Decode(&payload)

	switch {
	case decodeErr != nil:
		return xerrors.Errorf("failed to decode server status: %w", decodeErr)
	case payload.Backend == "acp":
		// An ACP-backed server is rejected with an explanatory error before
		// the caller goes on to attach.
		return xerrors.New("attach is not supported in ACP mode. The server is running with --experimental-acp which uses JSON-RPC instead of terminal emulation.")
	default:
		return nil
	}
}

func runAttach(remoteUrl string) error {
// Check if server is running in ACP mode (attach not supported)
if err := checkACPMode(remoteUrl); err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stdin := int(os.Stdin.Fd())
Expand Down
94 changes: 69 additions & 25 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/coder/agentapi/lib/httpapi"
"github.com/coder/agentapi/lib/logctx"
"github.com/coder/agentapi/lib/msgfmt"
st "github.com/coder/agentapi/lib/screentracker"
"github.com/coder/agentapi/lib/termexec"
)

Expand Down Expand Up @@ -104,11 +105,33 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
}

printOpenAPI := viper.GetBool(FlagPrintOpenAPI)
experimentalACP := viper.GetBool(FlagExperimentalACP)

if printOpenAPI && experimentalACP {
return xerrors.Errorf("flags --%s and --%s are mutually exclusive", FlagPrintOpenAPI, FlagExperimentalACP)
}

var agentIO st.AgentIO
var transport = "pty"
var process *termexec.Process
var acpResult *httpapi.SetupACPResult

if printOpenAPI {
process = nil
agentIO = nil
} else if experimentalACP {
var err error
acpResult, err = httpapi.SetupACP(ctx, httpapi.SetupACPConfig{
Program: agent,
ProgramArgs: argsToPass[1:],
})
if err != nil {
return xerrors.Errorf("failed to setup ACP: %w", err)
}
acpIO := acpResult.AgentIO
agentIO = acpIO
transport = "acp"
} else {
process, err = httpapi.SetupProcess(ctx, httpapi.SetupProcessConfig{
proc, err := httpapi.SetupProcess(ctx, httpapi.SetupProcessConfig{
Program: agent,
ProgramArgs: argsToPass[1:],
TerminalWidth: termWidth,
Expand All @@ -118,11 +141,14 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
if err != nil {
return xerrors.Errorf("failed to setup process: %w", err)
}
process = proc
agentIO = proc
}
port := viper.GetInt(FlagPort)
srv, err := httpapi.NewServer(ctx, httpapi.ServerConfig{
AgentType: agentType,
Process: process,
AgentIO: agentIO,
Transport: transport,
Port: port,
ChatBasePath: viper.GetString(FlagChatBasePath),
AllowedHosts: viper.GetStringSlice(FlagAllowedHosts),
Expand All @@ -138,19 +164,35 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
}
logger.Info("Starting server on port", "port", port)
processExitCh := make(chan error, 1)
go func() {
defer close(processExitCh)
if err := process.Wait(); err != nil {
if errors.Is(err, termexec.ErrNonZeroExitCode) {
processExitCh <- xerrors.Errorf("========\n%s\n========\n: %w", strings.TrimSpace(process.ReadScreen()), err)
} else {
processExitCh <- xerrors.Errorf("failed to wait for process: %w", err)
// Wait for process exit in PTY mode
if process != nil {
go func() {
defer close(processExitCh)
if err := process.Wait(); err != nil {
if errors.Is(err, termexec.ErrNonZeroExitCode) {
processExitCh <- xerrors.Errorf("========\n%s\n========\n: %w", strings.TrimSpace(process.ReadScreen()), err)
} else {
processExitCh <- xerrors.Errorf("failed to wait for process: %w", err)
}
}
}
if err := srv.Stop(ctx); err != nil {
logger.Error("Failed to stop server", "error", err)
}
}()
if err := srv.Stop(ctx); err != nil {
logger.Error("Failed to stop server", "error", err)
}
}()
}
// Wait for process exit in ACP mode
if acpResult != nil {
go func() {
defer close(processExitCh)
defer close(acpResult.Done) // Signal cleanup goroutine to exit
if err := acpResult.Wait(); err != nil {
processExitCh <- xerrors.Errorf("ACP process exited: %w", err)
}
if err := srv.Stop(ctx); err != nil {
logger.Error("Failed to stop server", "error", err)
}
}()
}
if err := srv.Start(); err != nil && err != context.Canceled && err != http.ErrServerClosed {
return xerrors.Errorf("failed to start server: %w", err)
}
Expand Down Expand Up @@ -180,16 +222,17 @@ type flagSpec struct {
}

const (
FlagType = "type"
FlagPort = "port"
FlagPrintOpenAPI = "print-openapi"
FlagChatBasePath = "chat-base-path"
FlagTermWidth = "term-width"
FlagTermHeight = "term-height"
FlagAllowedHosts = "allowed-hosts"
FlagAllowedOrigins = "allowed-origins"
FlagExit = "exit"
FlagInitialPrompt = "initial-prompt"
FlagType = "type"
FlagPort = "port"
FlagPrintOpenAPI = "print-openapi"
FlagChatBasePath = "chat-base-path"
FlagTermWidth = "term-width"
FlagTermHeight = "term-height"
FlagAllowedHosts = "allowed-hosts"
FlagAllowedOrigins = "allowed-origins"
FlagExit = "exit"
FlagInitialPrompt = "initial-prompt"
FlagExperimentalACP = "experimental-acp"
)

func CreateServerCmd() *cobra.Command {
Expand Down Expand Up @@ -228,6 +271,7 @@ func CreateServerCmd() *cobra.Command {
// localhost:3284 is the default origin when you open the chat interface in your browser. localhost:3000 and 3001 are used during development.
{FlagAllowedOrigins, "o", []string{"http://localhost:3284", "http://localhost:3000", "http://localhost:3001"}, "HTTP allowed origins. Use '*' for all, comma-separated list via flag, space-separated list via AGENTAPI_ALLOWED_ORIGINS env var", "stringSlice"},
{FlagInitialPrompt, "I", "", "Initial prompt for the agent. Recommended only if the agent doesn't support initial prompt in interaction mode. Will be read from stdin if piped (e.g., echo 'prompt' | agentapi server -- my-agent)", "string"},
{FlagExperimentalACP, "", false, "Use experimental ACP transport instead of PTY", "bool"},
}

for _, spec := range flagSpecs {
Expand Down
1 change: 1 addition & 0 deletions lib/httpapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type StatusResponse struct {
Body struct {
Status AgentStatus `json:"status" doc:"Current agent status. 'running' means that the agent is processing a message, 'stable' means that the agent is idle and waiting for input."`
AgentType mf.AgentType `json:"agent_type" doc:"Type of the agent being used by the server."`
Backend string `json:"backend" doc:"Backend transport being used ('acp' or 'pty')."`
}
}

Expand Down
65 changes: 43 additions & 22 deletions lib/httpapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
mf "github.com/coder/agentapi/lib/msgfmt"
st "github.com/coder/agentapi/lib/screentracker"
"github.com/coder/agentapi/lib/termexec"
"github.com/coder/agentapi/x/acpio"
"github.com/coder/quartz"
"github.com/danielgtaylor/huma/v2"
"github.com/danielgtaylor/huma/v2/adapters/humachi"
Expand All @@ -42,12 +43,13 @@ type Server struct {
mu sync.RWMutex
logger *slog.Logger
conversation st.Conversation
agentio *termexec.Process
agentio st.AgentIO
agentType mf.AgentType
emitter *EventEmitter
chatBasePath string
tempDir string
clock quartz.Clock
transport string
}

func (s *Server) NormalizeSchema(schema any) any {
Expand Down Expand Up @@ -98,7 +100,8 @@ const snapshotInterval = 25 * time.Millisecond

type ServerConfig struct {
AgentType mf.AgentType
Process *termexec.Process
AgentIO st.AgentIO
Transport string
Port int
ChatBasePath string
AllowedHosts []string
Expand Down Expand Up @@ -252,18 +255,34 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
initialPrompt = FormatMessage(config.AgentType, config.InitialPrompt)
}

conversation := st.NewPTY(ctx, st.PTYConversationConfig{
AgentType: config.AgentType,
AgentIO: config.Process,
Clock: config.Clock,
SnapshotInterval: snapshotInterval,
ScreenStabilityLength: 2 * time.Second,
FormatMessage: formatMessage,
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
FormatToolCall: formatToolCall,
InitialPrompt: initialPrompt,
Logger: logger,
}, emitter)
// Create appropriate conversation based on transport type
var conversation st.Conversation
if config.Transport == "acp" {
// For ACP, cast AgentIO to *acpio.ACPAgentIO
acpIO, ok := config.AgentIO.(*acpio.ACPAgentIO)
if !ok {
return nil, fmt.Errorf("ACP transport requires ACPAgentIO")
}
conversation = acpio.NewACPConversation(ctx, acpIO, logger, initialPrompt, emitter, config.Clock)
} else {
// Default to PTY transport
proc, ok := config.AgentIO.(*termexec.Process)
if !ok && config.AgentIO != nil {
return nil, fmt.Errorf("PTY transport requires termexec.Process")
}
conversation = st.NewPTY(ctx, st.PTYConversationConfig{
AgentType: config.AgentType,
AgentIO: proc,
Clock: config.Clock,
SnapshotInterval: snapshotInterval,
ScreenStabilityLength: 2 * time.Second,
FormatMessage: formatMessage,
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
FormatToolCall: formatToolCall,
InitialPrompt: initialPrompt,
Logger: logger,
}, emitter)
}

// Create temporary directory for uploads
tempDir, err := os.MkdirTemp("", "agentapi-uploads-")
Expand All @@ -278,24 +297,25 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
port: config.Port,
conversation: conversation,
logger: logger,
agentio: config.Process,
agentio: config.AgentIO,
agentType: config.AgentType,
emitter: emitter,
chatBasePath: strings.TrimSuffix(config.ChatBasePath, "/"),
tempDir: tempDir,
clock: config.Clock,
transport: config.Transport,
}

// Register API routes
s.registerRoutes()

// Start the conversation polling loop if we have a process.
// Process is nil only when --print-openapi is used (no agent runs).
// The process is already running at this point - termexec.StartProcess()
// blocks until the PTY is created and the process is active. Agent
// readiness (waiting for the prompt) is handled asynchronously inside
// conversation.Start() via ReadyForInitialPrompt.
if config.Process != nil {
// Start the conversation polling loop if we have an agent IO.
// AgentIO is nil only when --print-openapi is used (no agent runs).
// For PTY transport, the process is already running at this point -
// termexec.StartProcess() blocks until the PTY is created and the process
// is active. Agent readiness (waiting for the prompt) is handled
// asynchronously inside conversation.Start() via ReadyForInitialPrompt.
if config.AgentIO != nil {
s.conversation.Start(ctx)
}

Expand Down Expand Up @@ -417,6 +437,7 @@ func (s *Server) getStatus(ctx context.Context, input *struct{}) (*StatusRespons
resp := &StatusResponse{}
resp.Body.Status = agentStatus
resp.Body.AgentType = s.agentType
resp.Body.Backend = s.transport

return resp, nil
}
Expand Down
Loading