Skip to content

Commit 52df300

Browse files
feat: custom workers initial support (#1795)
* create a simple thread framework Signed-off-by: Robert Landers <[email protected]> * add tests Signed-off-by: Robert Landers <[email protected]> * fix comment Signed-off-by: Robert Landers <[email protected]> * remove mention of an old function that no longer exists Signed-off-by: Robert Landers <[email protected]> * simplify providing a request Signed-off-by: Robert Landers <[email protected]> * satisfy linter Signed-off-by: Robert Landers <[email protected]> * add error handling and handle shutdowns Signed-off-by: Robert Landers <[email protected]> * add tests Signed-off-by: Robert Landers <[email protected]> * pipes are tied to workers, not threads Signed-off-by: Robert Landers <[email protected]> * fix test Signed-off-by: Robert Landers <[email protected]> * add a way to detect when a request is completed Signed-off-by: Robert Landers <[email protected]> * we never shutdown workers or remove them, so we do not need this Signed-off-by: Robert Landers <[email protected]> * add more comments Signed-off-by: Robert Landers <[email protected]> * Simplify modular threads (#1874) * Simplify * remove unused variable * log thread index * feat: allow passing parameters to the PHP callback and accessing its return value (#1881) * fix formatting Signed-off-by: Robert Landers <[email protected]> * fix test compilation Signed-off-by: Robert Landers <[email protected]> * fix segfaults Signed-off-by: Robert Landers <[email protected]> * Update frankenphp.c Co-authored-by: Kévin Dunglas <[email protected]> --------- Signed-off-by: Robert Landers <[email protected]> Co-authored-by: Kévin Dunglas <[email protected]>
1 parent fe7aa2c commit 52df300

File tree

7 files changed

+307
-16
lines changed

7 files changed

+307
-16
lines changed

context.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ type frankenPHPContext struct {
2828
// Whether the request is already closed by us
2929
isDone bool
3030

31-
responseWriter http.ResponseWriter
31+
responseWriter http.ResponseWriter
32+
handlerParameters any
33+
handlerReturn any
3234

3335
done chan any
3436
startedAt time.Time

frankenphp.c

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -432,10 +432,11 @@ PHP_FUNCTION(frankenphp_handle_request) {
432432
zend_unset_timeout();
433433
#endif
434434

435-
bool has_request = go_frankenphp_worker_handle_request_start(thread_index);
435+
struct go_frankenphp_worker_handle_request_start_return result =
436+
go_frankenphp_worker_handle_request_start(thread_index);
436437
if (frankenphp_worker_request_startup() == FAILURE
437438
/* Shutting down */
438-
|| !has_request) {
439+
|| !result.r0) {
439440
RETURN_FALSE;
440441
}
441442

@@ -450,10 +451,15 @@ PHP_FUNCTION(frankenphp_handle_request) {
450451

451452
/* Call the PHP func passed to frankenphp_handle_request() */
452453
zval retval = {0};
454+
zval *callback_ret = NULL;
455+
453456
fci.size = sizeof fci;
454457
fci.retval = &retval;
455-
if (zend_call_function(&fci, &fcc) == SUCCESS) {
456-
zval_ptr_dtor(&retval);
458+
fci.params = result.r1;
459+
fci.param_count = result.r1 == NULL ? 0 : 1;
460+
461+
if (zend_call_function(&fci, &fcc) == SUCCESS && Z_TYPE(retval) != IS_UNDEF) {
462+
callback_ret = &retval;
457463
}
458464

459465
/*
@@ -467,7 +473,13 @@ PHP_FUNCTION(frankenphp_handle_request) {
467473
}
468474

469475
frankenphp_worker_request_shutdown();
470-
go_frankenphp_finish_worker_request(thread_index);
476+
go_frankenphp_finish_worker_request(thread_index, callback_ret);
477+
if (result.r1 != NULL) {
478+
zval_ptr_dtor(result.r1);
479+
}
480+
if (callback_ret != NULL) {
481+
zval_ptr_dtor(&retval);
482+
}
471483

472484
RETURN_TRUE;
473485
}

frankenphp.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,11 @@ func Init(options ...Option) error {
214214

215215
registerExtensions()
216216

217+
// add registered external workers
218+
for _, ew := range externalWorkers {
219+
options = append(options, WithWorkers(ew.Name(), ew.FileName(), ew.GetMinThreads(), WithWorkerEnv(ew.Env())))
220+
}
221+
217222
opt := &opt{}
218223
for _, o := range options {
219224
if err := o(opt); err != nil {

threadFramework.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package frankenphp
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"net/http"
7+
"sync"
8+
)
9+
10+
// EXPERIMENTAL: WorkerExtension allows you to register an external worker where instead of calling frankenphp handlers on
11+
// frankenphp_handle_request(), the ProvideRequest method is called. You are responsible for providing a standard
12+
// http.Request that will be conferred to the underlying worker script.
13+
//
14+
// A worker script with the provided Name and FileName will be registered, along with the provided
15+
// configuration. You can also provide any environment variables that you want through Env. GetMinThreads allows you to
16+
// reserve a minimum number of threads from the frankenphp thread pool. This number must be positive.
17+
// These methods are only called once at startup, so register them in an init() function.
18+
//
19+
// When a thread is activated and nearly ready, ThreadActivatedNotification will be called with an opaque threadId;
20+
// this is a time for setting up any per-thread resources. When a thread is about to be returned to the thread pool,
21+
// you will receive a call to ThreadDrainNotification that will inform you of the threadId.
22+
// After the thread is returned to the thread pool, ThreadDeactivatedNotification will be called.
23+
//
24+
// Once you have at least one thread activated, you will receive calls to ProvideRequest where you should respond with
25+
// a request. FrankenPHP will automatically pipe these requests to the worker script and handle the response.
26+
// The piping process is designed to run indefinitely and will be gracefully shut down when FrankenPHP shuts down.
27+
//
28+
// Note: External workers receive the lowest priority when determining thread allocations. If GetMinThreads cannot be
29+
// allocated, then frankenphp will panic and provide this information to the user (who will need to allocate more
30+
// total threads). Don't be greedy.
31+
type WorkerExtension interface {
32+
Name() string
33+
FileName() string
34+
Env() PreparedEnv
35+
GetMinThreads() int
36+
ThreadActivatedNotification(threadId int)
37+
ThreadDrainNotification(threadId int)
38+
ThreadDeactivatedNotification(threadId int)
39+
ProvideRequest() *WorkerRequest[any, any]
40+
}
41+
42+
// EXPERIMENTAL
43+
type WorkerRequest[P any, R any] struct {
44+
// The request for your worker script to handle
45+
Request *http.Request
46+
// Response is a response writer that provides the output of the provided request, it must not be nil to access the request body
47+
Response http.ResponseWriter
48+
// CallbackParameters is an optional field that will be converted in PHP types and passed as parameter to the PHP callback
49+
CallbackParameters P
50+
// AfterFunc is an optional function that will be called after the request is processed with the original value, the return of the PHP callback, converted in Go types, is passed as parameter
51+
AfterFunc func(callbackReturn R)
52+
}
53+
54+
var externalWorkers = make(map[string]WorkerExtension)
55+
var externalWorkerMutex sync.Mutex
56+
57+
// EXPERIMENTAL
58+
func RegisterExternalWorker(worker WorkerExtension) {
59+
externalWorkerMutex.Lock()
60+
defer externalWorkerMutex.Unlock()
61+
62+
externalWorkers[worker.Name()] = worker
63+
}
64+
65+
// startExternalWorkerPipe creates a pipe from an external worker to the main worker.
66+
func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *phpThread) {
67+
for {
68+
rq := externalWorker.ProvideRequest()
69+
70+
if rq == nil || rq.Request == nil {
71+
logger.LogAttrs(context.Background(), slog.LevelWarn, "external worker provided nil request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))
72+
continue
73+
}
74+
75+
r := rq.Request
76+
fr, err := NewRequestWithContext(r, WithOriginalRequest(r), WithWorkerName(w.name))
77+
if err != nil {
78+
logger.LogAttrs(context.Background(), slog.LevelError, "error creating request for external worker", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex), slog.Any("error", err))
79+
continue
80+
}
81+
82+
if fc, ok := fromContext(fr.Context()); ok {
83+
fc.responseWriter = rq.Response
84+
fc.handlerParameters = rq.CallbackParameters
85+
86+
// Queue the request and wait for completion if Done channel was provided
87+
logger.LogAttrs(context.Background(), slog.LevelInfo, "queue the external worker request", slog.String("worker", w.name), slog.Int("thread", thread.threadIndex))
88+
89+
w.requestChan <- fc
90+
if rq.AfterFunc != nil {
91+
go func() {
92+
<-fc.done
93+
94+
if rq.AfterFunc != nil {
95+
rq.AfterFunc(fc.handlerReturn)
96+
}
97+
}()
98+
}
99+
}
100+
}
101+
}

threadFramework_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package frankenphp
2+
3+
import (
4+
"io"
5+
"net/http/httptest"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
// mockWorkerExtension implements the WorkerExtension interface
15+
type mockWorkerExtension struct {
16+
name string
17+
fileName string
18+
env PreparedEnv
19+
minThreads int
20+
requestChan chan *WorkerRequest[any, any]
21+
activatedCount int
22+
drainCount int
23+
deactivatedCount int
24+
mu sync.Mutex
25+
}
26+
27+
func newMockWorkerExtension(name, fileName string, minThreads int) *mockWorkerExtension {
28+
return &mockWorkerExtension{
29+
name: name,
30+
fileName: fileName,
31+
env: make(PreparedEnv),
32+
minThreads: minThreads,
33+
requestChan: make(chan *WorkerRequest[any, any], 10), // Buffer to avoid blocking
34+
}
35+
}
36+
37+
func (m *mockWorkerExtension) Name() string {
38+
return m.name
39+
}
40+
41+
func (m *mockWorkerExtension) FileName() string {
42+
return m.fileName
43+
}
44+
45+
func (m *mockWorkerExtension) Env() PreparedEnv {
46+
return m.env
47+
}
48+
49+
func (m *mockWorkerExtension) GetMinThreads() int {
50+
return m.minThreads
51+
}
52+
53+
func (m *mockWorkerExtension) ThreadActivatedNotification(threadId int) {
54+
m.mu.Lock()
55+
defer m.mu.Unlock()
56+
m.activatedCount++
57+
}
58+
59+
func (m *mockWorkerExtension) ThreadDrainNotification(threadId int) {
60+
m.mu.Lock()
61+
defer m.mu.Unlock()
62+
m.drainCount++
63+
}
64+
65+
func (m *mockWorkerExtension) ThreadDeactivatedNotification(threadId int) {
66+
m.mu.Lock()
67+
defer m.mu.Unlock()
68+
m.deactivatedCount++
69+
}
70+
71+
func (m *mockWorkerExtension) ProvideRequest() *WorkerRequest[any, any] {
72+
return <-m.requestChan
73+
}
74+
75+
func (m *mockWorkerExtension) InjectRequest(r *WorkerRequest[any, any]) {
76+
m.requestChan <- r
77+
}
78+
79+
func (m *mockWorkerExtension) GetActivatedCount() int {
80+
m.mu.Lock()
81+
defer m.mu.Unlock()
82+
return m.activatedCount
83+
}
84+
85+
func TestWorkerExtension(t *testing.T) {
86+
// Create a mock extension
87+
mockExt := newMockWorkerExtension("mockWorker", "testdata/worker.php", 1)
88+
89+
// Register the mock extension
90+
RegisterExternalWorker(mockExt)
91+
92+
// Clean up external workers after test to avoid interfering with other tests
93+
defer func() {
94+
delete(externalWorkers, mockExt.Name())
95+
}()
96+
97+
// Initialize FrankenPHP with a worker that has a different name than our extension
98+
err := Init()
99+
require.NoError(t, err)
100+
defer Shutdown()
101+
102+
// Wait a bit for the worker to be ready
103+
time.Sleep(100 * time.Millisecond)
104+
105+
// Verify that the extension's thread was activated
106+
assert.GreaterOrEqual(t, mockExt.GetActivatedCount(), 1, "Thread should have been activated")
107+
108+
// Create a test request
109+
req := httptest.NewRequest("GET", "http://example.com/test/?foo=bar", nil)
110+
req.Header.Set("X-Test-Header", "test-value")
111+
112+
w := httptest.NewRecorder()
113+
114+
// Create a channel to signal when the request is done
115+
done := make(chan struct{})
116+
117+
// Inject the request into the worker through the extension
118+
mockExt.InjectRequest(&WorkerRequest[any, any]{
119+
Request: req,
120+
Response: w,
121+
AfterFunc: func(callbackReturn any) {
122+
close(done)
123+
},
124+
})
125+
126+
// Wait for the request to be fully processed
127+
<-done
128+
129+
// Check the response - now safe from race conditions
130+
resp := w.Result()
131+
body, _ := io.ReadAll(resp.Body)
132+
133+
// The worker.php script should output information about the request
134+
// We're just checking that we got a response, not the specific content
135+
assert.NotEmpty(t, body, "Response body should not be empty")
136+
}

0 commit comments

Comments
 (0)