Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ RoxygenNote: 7.3.2
Imports:
btw (>= 0.0.1.9000),
cli,
curl,
ellmer,
httpuv,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The httpuv dependency can also be dropped.

jsonlite,
later,
nanonext,
promises,
rlang
Depends: R (>= 4.1.0)
URL: https://github.com/simonpcouch/acquaint, https://simonpcouch.github.io/acquaint/
Expand Down
191 changes: 103 additions & 88 deletions R/proxy.R
Original file line number Diff line number Diff line change
@@ -1,119 +1,134 @@
# This R script is a proxy. It takes input on stdin, and when the input forms
# valid JSON, it will send the JSON to the server. Then, when it receives the
# response, it will print the response to stdout.
#' @rdname mcp
#' @export
mcp_proxy <- function() {
# TODO: should this actually be a check for being called within Rscript or not?
check_not_interactive()

mcp_proxy_impl()
the$proxy_socket <- nanonext::socket("pair", dial = acquaint_socket)

# Note that we're using file("stdin") instead of stdin(), which are not the
# same.
the$f <- file("stdin")
open(the$f, blocking = FALSE)

schedule_handle_message_from_client()
schedule_handle_message_from_server()

# Pump the event loop
while (TRUE) {
later::run_now(Inf)
}
}

# This R script is a proxy. It takes input on stdin, and when the input forms
# valid JSON, it will POST the JSON to the server, then when it receives the
# response, it will print the response to stdout.
mcp_proxy_impl <- function() {
url <- paste0("http://localhost:", acquaint_port(), collapse = "")
logcat("START", append = FALSE)
handle_message_from_client <- function(fdstatus) {
buf <- ""
schedule_handle_message_from_client()
# TODO: Read multiple lines all at once (because the server can send
# multiple requests quickly), and then handle each line separately.
# Otherwise, the message throughput will be bound by the polling rate.
line <- readLines(the$f, n = 1)
# TODO: If stdin is closed, we should exit. Not sure there's a way to detect
# that stdin has been closed without writing C code, though.

if (length(line) == 0) {
return()
}

# Note that we're using file("stdin") instead of stdin() because the former
# blocks but the latter does not when used with readLines(). If it doesn't block,
# the loop will poll continuously and use 100% CPU.
f <- file("stdin")
open(f, blocking = TRUE)
logcat("FROM CLIENT: ", line)

while (TRUE) {
line <- readLines(f, n = 1)
buf <- paste0(c(buf, line), collapse = "\n")

if (length(line) == 0) {
next
data <- NULL

tryCatch(
{
data <- jsonlite::fromJSON(buf)
},
error = function(e) {
# Invalid JSON. Possibly unfinished multi-line JSON message?
}
)

logcat(line)
if (is.null(data)) {
# Can get here if there's an empty line
return()
}

buf <- paste0(c(buf, line), collapse = "\n")
if (!is.list(data) || is.null(data$method)) {
cat_json(jsonrpc_response(
data$id,
error = list(code = -32600, message = "Invalid Request")
))
}

data <- NULL
# If we made it here, it's valid JSON

tryCatch(
{
data <- jsonlite::fromJSON(buf)
},
error = function(e) {
# Invalid JSON
}
)
if (is.null(data)) {
next
}
# If we made it here, it's valid JSON

if (identical(data$method, "initialize")) {
res <- jsonrpc_response_proxy(data$id, capabilities())
cat_json(res)

} else if (identical(data$method, "notifications/initialized")) {
# This is confirmation from the client; do nothing

} else if (identical(data$method, "prompts/list")) {
# No prompts yet
cat_json(jsonrpc_response_proxy(data$id, list(prompts = list())))

} else if (identical(data$method, "resources/list")) {
# No resources yet
cat_json(jsonrpc_response_proxy(data$id, list(resources = list())))

} else if (identical(data$method, "tools/list")) {
res <- jsonrpc_response_proxy(
data$id,
list(
tools = get_all_btw_tools()
)
if (data$method == "initialize") {
res <- jsonrpc_response(data$id, capabilities())
cat_json(res)
} else if (data$method == "tools/list") {
res <- jsonrpc_response(
data$id,
list(
tools = get_all_btw_tools()
)
# cat(to_json(res), "\n", sep = "", file = stderr())
cat_json(res)
} else {
# For all other messages, forward them to the server
result <- post_request(buf, url)

response_text <- rawToChar(result$content)
logcat(response_text)

# The response_text is alredy JSON, so we'll use cat() instead of cat_json()
cat(response_text, "\n", sep = "")
# cat("Response status:", result$status_code, "\n", file = stderr())
# cat("Response body:", response_text, "\n", file = stderr())
}
)

buf <- ""
cat_json(res)
} else if (data$method == "tools/call") {
result <- forward_request(buf)

# } else if (data$method == "prompts/list") {
# } else if (data$method == "resources/list") {
} else if (is.null(data$id)) {
# If there is no `id` in the request, then this is a notification and the
# client does not expect a response.
if (data$method == "notifications/initialized") {
}
} else {
cat_json(jsonrpc_response(
data$id,
error = list(code = -32601, message = "Method not found")
))
}

buf <- ""
}

# This process will be launched by the MCP client, so stdout/stderr aren't
# visible. This function will log output to the `logfile` so that you can view
# it.
logcat <- function(x, ..., append = TRUE) {
log_file <- acquaint_log_file()
cat(x, "\n", sep = "", append = append, file = log_file)
schedule_handle_message_from_client <- function() {
# Schedule the callback to run when stdin (fd 0) has input.
later::later_fd(handle_message_from_client, readfds = 0L)
}

post_request <- function(json_data, url) {
h <- curl::new_handle()
h <- curl::handle_setheaders(h, "Content-Type" = "application/json")
h <- curl::handle_setopt(h, customrequest = "POST")
h <- curl::handle_setopt(h, postfields = json_data)
handle_message_from_server <- function(data) {
schedule_handle_message_from_server()

result <- curl::curl_fetch_memory(url, h)
logcat("FROM SERVER: ", data)

return(result)
# The response_text is alredy JSON, so we'll use cat() instead of cat_json()
cat(data, "\n", sep = "")
}

# Wrap `x` in a jsonrpc-formatted object. This also includes the id.
jsonrpc_response_proxy <- function(id, x) {
list(
jsonrpc = "2.0",
id = id,
result = x
)
schedule_handle_message_from_server <- function() {
r <- nanonext::recv_aio(the$proxy_socket)
promises::as.promise(r)$then(handle_message_from_server)
}

forward_request <- function(data) {
logcat("TO SERVER: ", data)

nanonext::send_aio(the$proxy_socket, data)
}

# This process will be launched by the MCP client, so stdout/stderr aren't
# visible. This function will log output to the `logfile` so that you can view
# it.
logcat <- function(x, ..., append = TRUE) {
log_file <- acquaint_log_file()
cat(x, "\n", sep = "", append = append, file = log_file)
}

cat_json <- function(x) {
Expand Down
62 changes: 16 additions & 46 deletions R/server.R
Original file line number Diff line number Diff line change
Expand Up @@ -53,35 +53,15 @@ mcp_serve <- function() {
return(invisible())
}

# TODO: This only works with one active R session. If there's some other R
# session running with a server, `startServer` will error out. Maybe this
# should be based on an envvar?
if (env_has(acquaint_env, "active_server")) {
httpuv::stopServer(env_get(acquaint_env, "active_server"))
}

s <- httpuv::startServer(
host = "127.0.0.1",
port = acquaint_port(),
app = list(
call = function(req) {
mcp_serve_impl(req)
}
)
)

env_bind(acquaint_env, active_server = s)

s
the$server_socket <- nanonext::socket("pair", listen = acquaint_socket)
schedule_handle_message_from_proxy()
}

mcp_serve_impl <- function(req) {
req_body <- rawToChar(req$rook.input$read())
handle_message_from_proxy <- function(msg) {
schedule_handle_message_from_proxy()

# In RStudio, content logged to stderr will show in red.
# cat(req_body, file=stderr())
data <- jsonlite::fromJSON(req_body)
# cat(paste(capture.output(str(data)), collapse="\n"), file=stderr())
# cat("RECV :", msg, "\n", sep = "", file = stderr())
data <- jsonlite::fromJSON(msg)

if (data$method == "tools/call") {
name <- data$params$name
Expand All @@ -99,7 +79,7 @@ mcp_serve_impl <- function(req) {
tool_call_result <- do.call(fn, args)
# cat(paste(capture.output(str(body)), collapse="\n"), file=stderr())

body <- jsonrpc_response_server(
body <- jsonrpc_response(
data$id,
list(
content = list(
Expand All @@ -112,35 +92,25 @@ mcp_serve_impl <- function(req) {
)
)
} else {
body <- jsonrpc_response_server(
body <- jsonrpc_response(
data$id,
error = list(code = -32601, message = "Method not found")
)
}
# cat(to_json(body), file = stderr())
# cat("SEND:", to_json(body), "\n", sep = "", file = stderr())

# cat("Request received at ", format(Sys.time(), "%H:%M:%S.%OS3\n"), file=stderr())
nanonext::send_aio(the$server_socket, to_json(body))
}

list(
status = 200L,
headers = list('Content-Type' = 'application/json'),
body = to_json(body)
)
schedule_handle_message_from_proxy <- function() {
r <- nanonext::recv_aio(the$server_socket)
promises::as.promise(r)$then(handle_message_from_proxy)$catch(function(e) {
print(e)
})
}

# Create a jsonrpc-structured response object.
jsonrpc_response_server <- function(id, result = NULL, error = NULL) {
if (!xor(is.null(result), is.null(error))) {
warning("Either `result` or `error` must be provided, but not both.")
}

drop_nulls(list(
jsonrpc = "2.0",
id = id,
result = result,
error = error
))
}

# Given a vector or list, drop all the NULL items in it
drop_nulls <- function(x) {
Expand Down
17 changes: 16 additions & 1 deletion R/utils.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
acquaint_env <- new_environment()
the <- new_environment()

acquaint_socket <- "ipc:///tmp/acquaint-socket"

jsonrpc_response <- function(id, result = NULL, error = NULL) {
if (!xor(is.null(result), is.null(error))) {
warning("Either `result` or `error` must be provided, but not both.")
}

drop_nulls(list(
jsonrpc = "2.0",
id = id,
result = result,
error = error
))
}

# Create a named list, ensuring that it's a named list, even if empty.
named_list <- function(...) {
Expand Down