Skip to content

Commit 5cecc3e

Browse files
committed
Consume stdin as nanonext messages
1 parent ff9a812 commit 5cecc3e

File tree

2 files changed

+9
-19
lines changed

2 files changed

+9
-19
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ Imports:
2828
ellmer,
2929
jsonlite,
3030
later,
31-
nanonext (>= 1.5.2.9012),
31+
nanonext (>= 1.5.2.9015),
3232
promises,
3333
rlang
3434
Depends: R (>= 4.1.0)

R/server.R

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,10 @@ mcp_server <- function() {
77
# TODO: should this actually be a check for being called within Rscript or not?
88
check_not_interactive()
99

10+
the$reader_socket <- nanonext::read_stdin()
1011
the$server_socket <- nanonext::socket("poly")
1112
nanonext::dial(the$server_socket, url = sprintf("%s%d", acquaint_socket, 1L))
1213

13-
# Note that we're using file("stdin") instead of stdin(), which are not the
14-
# same.
15-
the$f <- file("stdin", open = "r")
16-
1714
schedule_handle_message_from_client()
1815
schedule_handle_message_from_host()
1916

@@ -23,29 +20,23 @@ mcp_server <- function() {
2320
}
2421
}
2522

26-
handle_message_from_client <- function(fdstatus) {
27-
buf <- ""
23+
handle_message_from_client <- function(line) {
2824
schedule_handle_message_from_client()
2925
# TODO: Read multiple lines all at once (because the client can send
3026
# multiple requests quickly), and then handle each line separately.
3127
# Otherwise, the message throughput will be bound by the polling rate.
32-
line <- readLines(the$f, n = 1)
33-
# TODO: If stdin is closed, we should exit. Not sure there's a way to detect
34-
# that stdin has been closed without writing C code, though.
3528

3629
if (length(line) == 0) {
3730
return()
3831
}
3932

4033
logcat("FROM CLIENT: ", line)
4134

42-
buf <- paste0(c(buf, line), collapse = "\n")
43-
4435
data <- NULL
4536

4637
tryCatch(
4738
{
48-
data <- jsonlite::parse_json(buf)
39+
data <- jsonlite::parse_json(line)
4940
},
5041
error = function(e) {
5142
# Invalid JSON. Possibly unfinished multi-line JSON message?
@@ -79,7 +70,7 @@ handle_message_from_client <- function(fdstatus) {
7970

8071
cat_json(res)
8172
} else if (data$method == "tools/call") {
82-
result <- forward_request(buf)
73+
result <- forward_request(line)
8374

8475
# } else if (data$method == "prompts/list") {
8576
# } else if (data$method == "resources/list") {
@@ -95,12 +86,11 @@ handle_message_from_client <- function(fdstatus) {
9586
))
9687
}
9788

98-
buf <- ""
9989
}
10090

10191
schedule_handle_message_from_client <- function() {
102-
# Schedule the callback to run when stdin (fd 0) has input.
103-
later::later_fd(handle_message_from_client, readfds = 0L)
92+
r <- nanonext::recv_aio(the$reader_socket, mode = "string")
93+
promises::as.promise(r)$then(handle_message_from_client)
10494
}
10595

10696
handle_message_from_host <- function(data) {
@@ -112,7 +102,7 @@ handle_message_from_host <- function(data) {
112102

113103
logcat("FROM HOST: ", data)
114104

115-
# The response_text is already JSON, so we'll use cat() instead of cat_json()
105+
# The response_text is already JSON, so we don't need to use cat_json()
116106
nanonext::write_stdout(data)
117107
}
118108

@@ -209,7 +199,7 @@ check_not_interactive <- function(call = caller_env()) {
209199

210200
mcp_discover <- function() {
211201
sock <- nanonext::socket("poly")
212-
on.exit(nanonext:::reap(sock))
202+
on.exit(nanonext::reap(sock))
213203
cv <- nanonext::cv()
214204
monitor <- nanonext::monitor(sock, cv)
215205
suppressWarnings(

0 commit comments

Comments
 (0)