diff --git a/go.mod b/go.mod index 374def22..e2dbc2ce 100644 --- a/go.mod +++ b/go.mod @@ -1,17 +1,18 @@ module github.com/dapr/go-sdk -go 1.24.4 +go 1.24.6 require ( + connectrpc.com/connect v1.19.1 github.com/dapr/dapr v1.16.0-rc.3 - github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5 - github.com/dapr/kit v0.15.4 + github.com/dapr/durabletask-go v0.10.1 + github.com/dapr/kit v0.16.1 github.com/go-chi/chi/v5 v5.2.2 github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.10.0 google.golang.org/grpc v1.73.0 - google.golang.org/protobuf v1.36.6 + google.golang.org/protobuf v1.36.10 gopkg.in/yaml.v3 v3.0.1 ) @@ -21,7 +22,6 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/sirupsen/logrus v1.9.3 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel v1.36.0 // indirect go.opentelemetry.io/otel/metric v1.36.0 // indirect @@ -30,5 +30,6 @@ require ( golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.26.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect - k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979 // indirect ) + +replace github.com/dapr/dapr => github.com/johansja/dapr v1.16.2-johansja diff --git a/go.sum b/go.sum index 84395fe1..06263259 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,11 @@ +connectrpc.com/connect v1.19.1 h1:R5M57z05+90EfEvCY1b7hBxDVOUl45PrtXtAV2fOC14= +connectrpc.com/connect v1.19.1/go.mod h1:tN20fjdGlewnSFeZxLKb0xwIZ6ozc3OQs2hTXy4du9w= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/dapr/dapr v1.16.0-rc.3 h1:D99V20GOhb+bZXH1PngME+wgzIZCcBFOvmaP7DOZxGo= -github.com/dapr/dapr v1.16.0-rc.3/go.mod h1:uyKnxMohSg87LSFzZ/oyuiGSo0+qkzeR0eXncPyIV9c= -github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5 h1:l8oBGwcfCwqvSYDZwla0A2fhENmXFc1Wk4lR0VEq+is= -github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q= -github.com/dapr/kit v0.15.4 h1:29DezCR22OuZhXX4yPEc+lqcOf/PNaeAuIEx9nGv394= -github.com/dapr/kit v0.15.4/go.mod h1:HwFsBKEbcyLanWlDZE7u/jnaDCD/tU+n3pkFNUctQNw= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dapr/durabletask-go v0.10.1 h1:gE88Qh4+/6zKdegHjOAOx+UQaPxmwWKWoIDivee23XY= +github.com/dapr/durabletask-go v0.10.1/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q= +github.com/dapr/kit v0.16.1 h1:MqLAhHVg8trPy2WJChMZFU7ToeondvxcNHYVvMDiVf4= +github.com/dapr/kit v0.16.1/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618= @@ -25,19 +23,16 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/johansja/dapr v1.16.2-johansja h1:jKSP3nde2Fl93eZe6eilTkzZd95miez723aZGiaj3ts= +github.com/johansja/dapr v1.16.2-johansja/go.mod h1:m8F+PC2N6/1tUOeKS16BNX4tGDCnTtLZ3uhV4DjByhY= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= -github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= -github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= @@ -55,8 +50,6 @@ go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKr go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 h1:R84qjqJb5nVJMxqWYb3np9L5ZsaDtB+a39EqjV0JSUM= -golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0/go.mod h1:S9Xr4PYopiDyqSyp5NjCrhFrqg6A5zA2E/iPHPhqnS8= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -70,7 +63,6 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -88,13 +80,10 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok= google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc= -google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= -google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979 h1:jgJW5IePPXLGB8e/1wvd0Ich9QE97RvvF3a8J3fP/Lg= -k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= diff --git a/service/connectrpc/Readme.md b/service/connectrpc/Readme.md new file mode 100644 index 00000000..8f385d19 --- /dev/null +++ b/service/connectrpc/Readme.md @@ -0,0 +1,135 @@ +# Dapr ConnectRPC Service SDK for Go + +Start by importing Dapr Go `service/connectrpc` package: + +```go +daprd "github.com/dapr/go-sdk/service/connectrpc" +``` + +## Creating and Starting Service + +To create a ConnectRPC Dapr service, first, create a Dapr callback instance with a specific address: + +```go +s, err := daprd.NewService(":50001") +if err != nil { + log.Fatalf("failed to start the server: %v", err) +} +``` + +Or with address and an existing `net.Listener` in case you want to combine existing server listener: + +```go +list, err := net.Listen("tcp", "localhost:0") +if err != nil { + log.Fatalf("gRPC listener creation failed: %s", err) +} +s := daprd.NewServiceWithListener(list) +``` + +Dapr gRPC service supports using existed gRPC server with the help of `NewServiceWithGrpcServer`. You can use `RegisterGreeterServer` to add existed gRPC service either: + +```go +lis, err := net.Listen("tcp", port) +if err != nil { + log.Fatalf("failed to listen: %v", err) +} + +grpcServer := grpc.NewServer() + +// register existed service +// pb.RegisterGreeterServer(grpcServer, &existedGrpcServer{}) + +// new dapr grpc service +s := daprd.NewServiceWithGrpcServer(lis, grpcServer) +``` + +Once you create a service instance, you can "attach" to that service any number of event, binding, and service invocation logic handlers as shown below. Onces the logic is defined, you are ready to start the service: + +```go +if err := s.Start(); err != nil { + log.Fatalf("server error: %v", err) +} +``` + + +## Event Handling + +To handle events from specific topic you need to add at least one topic event handler before starting the service: + +```go +sub := &common.Subscription{ + PubsubName: "messages", + Topic: "topic1", + } +if err := s.AddTopicEventHandler(sub, eventHandler); err != nil { + log.Fatalf("error adding topic subscription: %v", err) +} +``` + +The handler method itself can be any method with the expected signature: + +```go +func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + log.Printf("event - PubsubName:%s, Topic:%s, ID:%s, Data: %v", e.PubsubName, e.Topic, e.ID, e.Data) + // do something with the event + return true, nil +} +``` + +## Service Invocation Handler + +To handle service invocations you will need to add at least one service invocation handler before starting the service: + +```go +if err := s.AddServiceInvocationHandler("echo", echoHandler); err != nil { + log.Fatalf("error adding invocation handler: %v", err) +} +``` + +The handler method itself can be any method with the expected signature: + +```go +func echoHandler(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) { + log.Printf("echo - ContentType:%s, Verb:%s, QueryString:%s, %+v", in.ContentType, in.Verb, in.QueryString, string(in.Data)) + // do something with the invocation here + out = &common.Content{ + Data: in.Data, + ContentType: in.ContentType, + DataTypeURL: in.DataTypeURL, + } + return +} +``` + +## Binding Invocation Handler + +To handle binding invocations you will need to add at least one binding invocation handler before starting the service: + +```go +if err := s.AddBindingInvocationHandler("run", runHandler); err != nil { + log.Fatalf("error adding binding handler: %v", err) +} +``` + +The handler method itself can be any method with the expected signature: + +```go +func runHandler(ctx context.Context, in *common.BindingEvent) (out []byte, err error) { + log.Printf("binding - Data:%v, Meta:%v", in.Data, in.Metadata) + // do something with the invocation here + return nil, nil +} +``` + +## Templates + +To accelerate your gRPC Dapr app development in Go even further you can use one of the GitHub templates integrating the gRPC Dapr callback package: + +* [Dapr gRPC Service in Go](https://github.com/mchmarny/dapr-grpc-service-template) - Template project to jump start your Dapr event subscriber service with gRPC development +* [Dapr gRPC Event Subscriber in Go](https://github.com/mchmarny/dapr-grpc-event-subscriber-template) - Template project to jump start your Dapr event subscriber service with gRPC development + + +## Contributing to Dapr Go client + +See the [Contribution Guide](../../CONTRIBUTING.md) to get started with building and developing. diff --git a/service/connectrpc/binding.go b/service/connectrpc/binding.go new file mode 100644 index 00000000..6f0c00ff --- /dev/null +++ b/service/connectrpc/binding.go @@ -0,0 +1,73 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectrpc + +import ( + "context" + "errors" + "fmt" + + "connectrpc.com/connect" + runtimev1 "github.com/dapr/dapr/pkg/proto/runtime/v1" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/dapr/go-sdk/service/common" +) + +// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service. +func (s *Server) AddBindingInvocationHandler(name string, fn common.BindingInvocationHandler) error { + if name == "" { + return errors.New("binding name required") + } + if fn == nil { + return errors.New("binding handler required") + } + s.bindingHandlers[name] = fn + return nil +} + +// ListInputBindings is called by Dapr to get the list of bindings the app will get invoked by. In this example, we are telling Dapr +// To invoke our app with a binding named storage. +func (s *Server) ListInputBindings(ctx context.Context, in *connect.Request[emptypb.Empty]) (*connect.Response[runtimev1.ListInputBindingsResponse], error) { + list := make([]string, 0) + for k := range s.bindingHandlers { + list = append(list, k) + } + + return connect.NewResponse(&runtimev1.ListInputBindingsResponse{ + Bindings: list, + }), nil +} + +// OnBindingEvent gets invoked every time a new event is fired from a registered binding. The message carries the binding name, a payload and optional metadata. +func (s *Server) OnBindingEvent(ctx context.Context, in *connect.Request[runtimev1.BindingEventRequest]) (*connect.Response[runtimev1.BindingEventResponse], error) { + if in == nil { + return nil, errors.New("nil binding event request") + } + if fn, ok := s.bindingHandlers[in.Msg.GetName()]; ok { + e := &common.BindingEvent{ + Data: in.Msg.GetData(), + Metadata: in.Msg.GetMetadata(), + } + data, err := fn(ctx, e) + if err != nil { + return nil, fmt.Errorf("error executing %s binding: %w", in.Msg.GetName(), err) + } + return connect.NewResponse(&runtimev1.BindingEventResponse{ + Data: data, + }), nil + } + + return nil, fmt.Errorf("binding not implemented: %s", in.Msg.GetName()) +} diff --git a/service/connectrpc/binding_test.go b/service/connectrpc/binding_test.go new file mode 100644 index 00000000..2afa5169 --- /dev/null +++ b/service/connectrpc/binding_test.go @@ -0,0 +1,106 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectrpc + +import ( + "context" + "errors" + "testing" + + "connectrpc.com/connect" + runtimev1 "github.com/dapr/dapr/pkg/proto/runtime/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/dapr/go-sdk/service/common" +) + +func testBindingHandler(ctx context.Context, in *common.BindingEvent) (out []byte, err error) { + if in == nil { + return nil, errors.New("nil event") + } + return in.Data, nil +} + +func TestListInputBindings(t *testing.T) { + server := newService("", nil) + err := server.AddBindingInvocationHandler("test1", testBindingHandler) + require.NoError(t, err) + err = server.AddBindingInvocationHandler("test2", testBindingHandler) + require.NoError(t, err) + resp, err := server.ListInputBindings(t.Context(), connect.NewRequest(&emptypb.Empty{})) + require.NoError(t, err) + assert.NotNil(t, resp) + assert.Lenf(t, resp.Msg.GetBindings(), 2, "expected 2 handlers") +} + +func TestBindingForErrors(t *testing.T) { + server := newService("", nil) + err := server.AddBindingInvocationHandler("", nil) + require.Errorf(t, err, "expected error on nil method name") + + err = server.AddBindingInvocationHandler("test", nil) + require.Errorf(t, err, "expected error on nil method handler") +} + +// go test -timeout 30s ./service/grpc -count 1 -run ^TestBinding$ +func TestBinding(t *testing.T) { + ctx := t.Context() + methodName := "test" + + server := newService("", nil) + err := server.AddBindingInvocationHandler(methodName, testBindingHandler) + require.NoError(t, err) + + t.Run("binding without event", func(t *testing.T) { + _, err := server.OnBindingEvent(ctx, nil) + require.Error(t, err) + }) + + t.Run("binding event for wrong method", func(t *testing.T) { + in := connect.NewRequest(&runtimev1.BindingEventRequest{Name: "invalid"}) + _, err := server.OnBindingEvent(ctx, in) + require.Error(t, err) + }) + + t.Run("binding event without data", func(t *testing.T) { + in := connect.NewRequest(&runtimev1.BindingEventRequest{Name: methodName}) + out, err := server.OnBindingEvent(ctx, in) + require.NoError(t, err) + assert.NotNil(t, out) + }) + + t.Run("binding event with data", func(t *testing.T) { + data := "hello there" + in := connect.NewRequest(&runtimev1.BindingEventRequest{ + Name: methodName, + Data: []byte(data), + }) + out, err := server.OnBindingEvent(ctx, in) + require.NoError(t, err) + assert.NotNil(t, out) + assert.Equal(t, data, string(out.Msg.GetData())) + }) + + t.Run("binding event with metadata", func(t *testing.T) { + in := connect.NewRequest(&runtimev1.BindingEventRequest{ + Name: methodName, + Metadata: map[string]string{"k1": "v1", "k2": "v2"}, + }) + out, err := server.OnBindingEvent(ctx, in) + require.NoError(t, err) + assert.NotNil(t, out) + }) +} diff --git a/service/connectrpc/health_check.go b/service/connectrpc/health_check.go new file mode 100644 index 00000000..d559933d --- /dev/null +++ b/service/connectrpc/health_check.go @@ -0,0 +1,49 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectrpc + +import ( + "context" + "errors" + + "connectrpc.com/connect" + runtimev1 "github.com/dapr/dapr/pkg/proto/runtime/v1" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/dapr/go-sdk/service/common" +) + +// AddHealthCheckHandler appends provided app health check handler. +func (s *Server) AddHealthCheckHandler(_ string, fn common.HealthCheckHandler) error { + if fn == nil { + return errors.New("health check handler required") + } + + s.healthCheckHandler = fn + + return nil +} + +// HealthCheck check app health status. +func (s *Server) HealthCheck(ctx context.Context, _ *connect.Request[emptypb.Empty]) (*connect.Response[runtimev1.HealthCheckResponse], error) { + if s.healthCheckHandler != nil { + if err := s.healthCheckHandler(ctx); err != nil { + return connect.NewResponse(&runtimev1.HealthCheckResponse{}), err + } + + return connect.NewResponse(&runtimev1.HealthCheckResponse{}), nil + } + + return nil, errors.New("health check handler not implemented") +} diff --git a/service/connectrpc/health_check_test.go b/service/connectrpc/health_check_test.go new file mode 100644 index 00000000..767b9b92 --- /dev/null +++ b/service/connectrpc/health_check_test.go @@ -0,0 +1,64 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectrpc + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" +) + +func testHealthCheckHandler(ctx context.Context) (err error) { + return nil +} + +func testHealthCheckHandlerWithError(ctx context.Context) (err error) { + return errors.New("app is unhealthy") +} + +func TestHealthCheckHandlerForErrors(t *testing.T) { + server := newService("", nil) + err := server.AddHealthCheckHandler("", nil) + require.Errorf(t, err, "expected error on nil health check handler") +} + +// go test -timeout 30s ./service/grpc -count 1 -run ^TestHealthCheck$ +func TestHealthCheck(t *testing.T) { + ctx := t.Context() + + server := newService("", nil) + + t.Run("health check without handler", func(t *testing.T) { + _, err := server.HealthCheck(ctx, nil) + require.Error(t, err) + }) + + err := server.AddHealthCheckHandler("", testHealthCheckHandler) + require.NoError(t, err) + + t.Run("health check with handler", func(t *testing.T) { + _, err = server.HealthCheck(ctx, nil) + require.NoError(t, err) + }) + + err = server.AddHealthCheckHandler("", testHealthCheckHandlerWithError) + require.NoError(t, err) + + t.Run("health check with error handler", func(t *testing.T) { + _, err = server.HealthCheck(ctx, nil) + require.Error(t, err) + }) +} diff --git a/service/connectrpc/invoke.go b/service/connectrpc/invoke.go new file mode 100644 index 00000000..5aa79210 --- /dev/null +++ b/service/connectrpc/invoke.go @@ -0,0 +1,94 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectrpc + +import ( + "context" + "errors" + "fmt" + + "connectrpc.com/connect" + commonv1 "github.com/dapr/dapr/pkg/proto/common/v1" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/anypb" + + cc "github.com/dapr/go-sdk/service/common" +) + +// AddServiceInvocationHandler appends provided service invocation handler with its method to the service. +func (s *Server) AddServiceInvocationHandler(method string, fn cc.ServiceInvocationHandler) error { + if method == "" || method == "/" { + return errors.New("service name required") + } + + if method[0] == '/' { + method = method[1:] + } + + if fn == nil { + return errors.New("invocation handler required") + } + s.invokeHandlers[method] = fn + return nil +} + +// OnInvoke gets invoked when a remote service has called the app through Dapr. +func (s *Server) OnInvoke(ctx context.Context, in *connect.Request[commonv1.InvokeRequest]) (*connect.Response[commonv1.InvokeResponse], error) { + if in == nil { + return nil, errors.New("nil invoke request") + } + if s.authToken != "" { + if md, ok := metadata.FromIncomingContext(ctx); !ok { + return nil, errors.New("authentication failed") + } else if vals := md.Get(cc.APITokenKey); len(vals) > 0 { + if vals[0] != s.authToken { + return nil, errors.New("authentication failed: app token mismatch") + } + } else { + return nil, errors.New("authentication failed. app token key not exist") + } + } + if fn, ok := s.invokeHandlers[in.Msg.GetMethod()]; ok { + e := &cc.InvocationEvent{} + e.ContentType = in.Msg.GetContentType() + + if in.Msg.GetData() != nil { + e.Data = in.Msg.GetData().GetValue() + e.DataTypeURL = in.Msg.GetData().GetTypeUrl() + } + + if in.Msg.GetHttpExtension() != nil { + e.Verb = in.Msg.GetHttpExtension().GetVerb().String() + e.QueryString = in.Msg.GetHttpExtension().GetQuerystring() + } + + ct, er := fn(ctx, e) + if er != nil { + return nil, er + } + + if ct == nil { + return connect.NewResponse(&commonv1.InvokeResponse{}), nil + } + + return connect.NewResponse(&commonv1.InvokeResponse{ + ContentType: ct.ContentType, + Data: &anypb.Any{ + Value: ct.Data, + TypeUrl: ct.DataTypeURL, + }, + }), nil + } + return nil, fmt.Errorf("method not implemented: %s", in.Msg.GetMethod()) +} diff --git a/service/connectrpc/invoke_test.go b/service/connectrpc/invoke_test.go new file mode 100644 index 00000000..f8de2bad --- /dev/null +++ b/service/connectrpc/invoke_test.go @@ -0,0 +1,143 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectrpc + +import ( + "context" + "errors" + "os" + "testing" + + "connectrpc.com/connect" + commonv1 "github.com/dapr/dapr/pkg/proto/common/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/anypb" + + cc "github.com/dapr/go-sdk/service/common" +) + +func testInvokeHandler(ctx context.Context, in *cc.InvocationEvent) (out *cc.Content, err error) { + if in == nil { + return + } + out = &cc.Content{ + ContentType: in.ContentType, + Data: in.Data, + } + return +} + +func testInvokeHandlerWithError(ctx context.Context, in *cc.InvocationEvent) (out *cc.Content, err error) { + return nil, errors.New("test error") +} + +func TestInvokeErrors(t *testing.T) { + server := newService("", nil) + err := server.AddServiceInvocationHandler("", nil) + require.Error(t, err) + + err = server.AddServiceInvocationHandler("/", nil) + require.Error(t, err) + + err = server.AddServiceInvocationHandler("test", nil) + require.Error(t, err) +} + +func TestInvokeWithToken(t *testing.T) { + t.Setenv(cc.AppAPITokenEnvVar, "app-dapr-token") + server := newService("", nil) + methodName := "test" + err := server.AddServiceInvocationHandler(methodName, testInvokeHandler) + require.NoError(t, err) + t.Run("invoke with token, return success", func(t *testing.T) { + grpcMetadata := metadata.New(map[string]string{ + cc.APITokenKey: os.Getenv(cc.AppAPITokenEnvVar), + }) + ctx := metadata.NewIncomingContext(t.Context(), grpcMetadata) + in := connect.NewRequest(&commonv1.InvokeRequest{Method: methodName}) + _, err := server.OnInvoke(ctx, in) + require.NoError(t, err) + }) + t.Run("invoke with empty token, return failed", func(t *testing.T) { + in := connect.NewRequest(&commonv1.InvokeRequest{Method: methodName}) + _, err := server.OnInvoke(t.Context(), in) + require.Error(t, err) + }) + t.Run("invoke with mismatch token, return failed", func(t *testing.T) { + grpcMetadata := metadata.New(map[string]string{ + cc.APITokenKey: "mismatch-token", + }) + ctx := metadata.NewOutgoingContext(t.Context(), grpcMetadata) + in := connect.NewRequest(&commonv1.InvokeRequest{Method: methodName}) + _, err := server.OnInvoke(ctx, in) + require.Error(t, err) + }) + _ = os.Unsetenv(cc.AppAPITokenEnvVar) +} + +// go test -timeout 30s ./service/grpc -count 1 -run ^TestInvoke$ +func TestInvoke(t *testing.T) { + methodName := "test" + methodNameWithError := "error" + ctx := t.Context() + + server := newService("", nil) + err := server.AddServiceInvocationHandler("/"+methodName, testInvokeHandler) + require.NoError(t, err) + + err = server.AddServiceInvocationHandler(methodNameWithError, testInvokeHandlerWithError) + require.NoError(t, err) + + t.Run("invoke without request", func(t *testing.T) { + _, err := server.OnInvoke(ctx, nil) + require.Error(t, err) + }) + + t.Run("invoke request with invalid method name", func(t *testing.T) { + in := connect.NewRequest(&commonv1.InvokeRequest{Method: "invalid"}) + _, err := server.OnInvoke(ctx, in) + require.Error(t, err) + }) + + t.Run("invoke request without data", func(t *testing.T) { + in := connect.NewRequest(&commonv1.InvokeRequest{Method: methodName}) + _, err := server.OnInvoke(ctx, in) + require.NoError(t, err) + }) + + t.Run("invoke request with data", func(t *testing.T) { + data := "hello there" + dataContentType := "text/plain" + in := connect.NewRequest(&commonv1.InvokeRequest{Method: methodName}) + in.Msg.Data = &anypb.Any{Value: []byte(data)} + in.Msg.ContentType = dataContentType + out, err := server.OnInvoke(ctx, in) + require.NoError(t, err) + assert.NotNil(t, out) + assert.Equal(t, dataContentType, out.Msg.GetContentType()) + assert.Equal(t, data, string(out.Msg.GetData().GetValue())) + }) + + t.Run("invoke request with error", func(t *testing.T) { + data := "hello there" + dataContentType := "text/plain" + in := connect.NewRequest(&commonv1.InvokeRequest{Method: methodNameWithError}) + in.Msg.Data = &anypb.Any{Value: []byte(data)} + in.Msg.ContentType = dataContentType + _, err := server.OnInvoke(ctx, in) + require.Error(t, err) + }) +} diff --git a/service/connectrpc/scheduling.go b/service/connectrpc/scheduling.go new file mode 100644 index 00000000..68f737ae --- /dev/null +++ b/service/connectrpc/scheduling.go @@ -0,0 +1,52 @@ +package connectrpc + +import ( + "context" + "errors" + "fmt" + "strings" + + "connectrpc.com/connect" + runtimev1 "github.com/dapr/dapr/pkg/proto/runtime/v1" + + "github.com/dapr/go-sdk/service/common" +) + +// AddJobEventHandler registers a job handler +func (s *Server) AddJobEventHandler(name string, fn common.JobEventHandler) error { + if name == "" { + return errors.New("job event name cannot be empty") + } + + if fn == nil { + return errors.New("job event handler not supplied") + } + + s.jobEventHandlers[name] = fn + return nil +} + +// OnJobEventAlpha1 is invoked by the sidecar following a scheduled job registered in +// the scheduler +func (s *Server) OnJobEventAlpha1(ctx context.Context, in *connect.Request[runtimev1.JobEventRequest]) (*connect.Response[runtimev1.JobEventResponse], error) { + // parse the job type from the method or name + jobType, found := strings.CutPrefix(in.Msg.GetMethod(), "job/") + if !found { + if in.Msg.GetName() == "" { + return connect.NewResponse(&runtimev1.JobEventResponse{}), errors.New("unsupported invocation") + } + jobType = in.Msg.GetName() + } + + if fn, ok := s.jobEventHandlers[jobType]; ok { + e := &common.JobEvent{ + JobType: jobType, + Data: in.Msg.GetData().GetValue(), + } + if err := fn(ctx, e); err != nil { + return nil, fmt.Errorf("error executing %s binding: %w", in.Msg.GetName(), err) + } + return connect.NewResponse(&runtimev1.JobEventResponse{}), nil + } + return connect.NewResponse(&runtimev1.JobEventResponse{}), errors.New("job event handler not found") +} diff --git a/service/connectrpc/service.go b/service/connectrpc/service.go new file mode 100644 index 00000000..1b2771dd --- /dev/null +++ b/service/connectrpc/service.go @@ -0,0 +1,118 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectrpc + +import ( + "context" + "errors" + "net/http" + "os" + "sync/atomic" + "time" + + "github.com/dapr/dapr/pkg/proto/runtime/v1/runtimeconnect" + "github.com/go-chi/chi/v5" + + "github.com/dapr/go-sdk/actor" + "github.com/dapr/go-sdk/actor/config" + "github.com/dapr/go-sdk/service/common" + "github.com/dapr/go-sdk/service/internal" +) + +// NewService creates new Service. +func NewService(address string) common.Service { + return newService(address, nil) +} + +// NewServiceWithMux creates new Service with existing http mux. +func NewServiceWithMux(address string, mux *chi.Mux) common.Service { + return newService(address, mux) +} + +func newService(address string, router *chi.Mux) *Server { + if router == nil { + router = chi.NewRouter() + } + + s := &Server{ + invokeHandlers: make(map[string]common.ServiceInvocationHandler), + topicRegistrar: make(internal.TopicRegistrar), + bindingHandlers: make(map[string]common.BindingInvocationHandler), + jobEventHandlers: make(map[string]common.JobEventHandler), + authToken: os.Getenv(common.AppAPITokenEnvVar), + httpServer: &http.Server{ //nolint:gosec + Addr: address, + Handler: router, + }, + mux: router, + } + + path, handler := runtimeconnect.NewAppCallbackHandler(s) + router.Handle(path+"*", handler) + path, handler = runtimeconnect.NewAppCallbackAlphaHandler(s) + router.Handle(path+"*", handler) + path, handler = runtimeconnect.NewAppCallbackHealthCheckHandler(s) + router.Handle(path+"*", handler) + + return s +} + +// Server is the gRPC service implementation for Dapr. +type Server struct { + runtimeconnect.UnimplementedAppCallbackHandler + runtimeconnect.UnimplementedAppCallbackHealthCheckHandler + invokeHandlers map[string]common.ServiceInvocationHandler + topicRegistrar internal.TopicRegistrar + bindingHandlers map[string]common.BindingInvocationHandler + jobEventHandlers map[string]common.JobEventHandler + healthCheckHandler common.HealthCheckHandler + authToken string + started uint32 + httpServer *http.Server + mux *chi.Mux +} + +// Deprecated: Use RegisterActorImplFactoryContext instead. +func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option) { + panic("Actor is not supported by gRPC API") +} + +func (s *Server) RegisterActorImplFactoryContext(f actor.FactoryContext, opts ...config.Option) { + panic("Actor is not supported by gRPC API") +} + +// Start registers the server and starts it. +func (s *Server) Start() error { + if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { + return errors.New("a gRPC server can only be started once") + } + return s.httpServer.ListenAndServe() +} + +// Stop stops the previously-started service. +func (s *Server) Stop() error { + if atomic.LoadUint32(&s.started) == 0 { + return nil + } + + ctxShutDown, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + return s.httpServer.Shutdown(ctxShutDown) +} + +// GracefulStop stops the previously-started service gracefully. +func (s *Server) GracefulStop() error { + return s.Stop() +} diff --git a/service/connectrpc/service_test.go b/service/connectrpc/service_test.go new file mode 100644 index 00000000..f115967e --- /dev/null +++ b/service/connectrpc/service_test.go @@ -0,0 +1,56 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectrpc + +import ( + "errors" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestStoppingUnstartedService(t *testing.T) { + s := newService("", nil) + assert.NotNil(t, s) + err := s.Stop() + require.NoError(t, err) +} + +func TestStoppingStartedService(t *testing.T) { + s := newService(":3333", nil) + assert.NotNil(t, s) + + go func() { + if err := s.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) { + panic(err) + } + }() + // Wait for the server to start + time.Sleep(200 * time.Millisecond) + require.NoError(t, s.Stop()) +} + +func TestStartingStoppedService(t *testing.T) { + s := newService(":3333", nil) + assert.NotNil(t, s) + stopErr := s.Stop() + require.NoError(t, stopErr) + + startErr := s.Start() + require.Error(t, startErr, "expected starting a stopped server to raise an error") + assert.Equal(t, startErr.Error(), http.ErrServerClosed.Error()) +} diff --git a/service/connectrpc/topic.go b/service/connectrpc/topic.go new file mode 100644 index 00000000..664399cd --- /dev/null +++ b/service/connectrpc/topic.go @@ -0,0 +1,186 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectrpc + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "mime" + "strings" + + "connectrpc.com/connect" + runtimev1 "github.com/dapr/dapr/pkg/proto/runtime/v1" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/dapr/go-sdk/service/common" + "github.com/dapr/go-sdk/service/internal" +) + +// AddTopicEventHandler appends provided event handler with topic name to the service. +func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error { + if fn == nil { + return errors.New("topic handler required") + } + + return s.AddTopicEventSubscriber(sub, fn) +} + +// AddTopicEventSubscriber appends the provided subscriber to the service. +func (s *Server) AddTopicEventSubscriber(sub *common.Subscription, subscriber common.TopicEventSubscriber) error { + if sub == nil { + return errors.New("subscription required") + } + + return s.topicRegistrar.AddSubscription(sub, subscriber) +} + +// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to. +func (s *Server) ListTopicSubscriptions(ctx context.Context, in *connect.Request[emptypb.Empty]) (*connect.Response[runtimev1.ListTopicSubscriptionsResponse], error) { + subs := make([]*runtimev1.TopicSubscription, 0) + for _, v := range s.topicRegistrar { + s := v.Subscription + sub := &runtimev1.TopicSubscription{ + PubsubName: s.PubsubName, + Topic: s.Topic, + Metadata: s.Metadata, + Routes: convertRoutes(s.Routes), + DeadLetterTopic: s.DeadLetterTopic, + } + subs = append(subs, sub) + } + + return connect.NewResponse(&runtimev1.ListTopicSubscriptionsResponse{ + Subscriptions: subs, + }), nil +} + +func convertRoutes(routes *internal.TopicRoutes) *runtimev1.TopicRoutes { + if routes == nil { + return nil + } + rules := make([]*runtimev1.TopicRule, len(routes.Rules)) + for i, rule := range routes.Rules { + rules[i] = &runtimev1.TopicRule{ + Match: rule.Match, + Path: rule.Path, + } + } + return &runtimev1.TopicRoutes{ + Rules: rules, + Default: routes.Default, + } +} + +// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed. +// Dapr sends published messages in a CloudEvents v1.0 envelope. +func (s *Server) OnTopicEvent(ctx context.Context, in *connect.Request[runtimev1.TopicEventRequest]) (*connect.Response[runtimev1.TopicEventResponse], error) { + if in == nil || in.Msg.GetTopic() == "" || in.Msg.GetPubsubName() == "" { + // this is really Dapr issue more than the event request format. + // since Dapr will not get updated until long after this event expires, just drop it + return connect.NewResponse(&runtimev1.TopicEventResponse{Status: runtimev1.TopicEventResponse_DROP}), errors.New("pub/sub and topic names required") + } + key := in.Msg.GetPubsubName() + "-" + in.Msg.GetTopic() + noValidationKey := in.Msg.GetPubsubName() + + var sub *internal.TopicRegistration + var ok bool + + sub, ok = s.topicRegistrar[key] + if !ok { + sub, ok = s.topicRegistrar[noValidationKey] + } + + if ok { + data := interface{}(in.Msg.GetData()) + if len(in.Msg.GetData()) > 0 { + mediaType, _, err := mime.ParseMediaType(in.Msg.GetDataContentType()) + if err == nil { + var v interface{} + switch mediaType { + case "application/json": + if err := json.Unmarshal(in.Msg.GetData(), &v); err == nil { + data = v + } + case "text/plain": + // Assume UTF-8 encoded string. + data = string(in.Msg.GetData()) + default: + if strings.HasPrefix(mediaType, "application/") && + strings.HasSuffix(mediaType, "+json") { + if err := json.Unmarshal(in.Msg.GetData(), &v); err == nil { + data = v + } + } + } + } + } + + e := &common.TopicEvent{ + ID: in.Msg.GetId(), + Source: in.Msg.GetSource(), + Type: in.Msg.GetType(), + SpecVersion: in.Msg.GetSpecVersion(), + DataContentType: in.Msg.GetDataContentType(), + Data: data, + RawData: in.Msg.GetData(), + Topic: in.Msg.GetTopic(), + PubsubName: in.Msg.GetPubsubName(), + Metadata: getCustomMetadataFromContext(ctx), + } + h := sub.DefaultHandler + if in.Msg.GetPath() != "" { + if pathHandler, ok := sub.RouteHandlers[in.Msg.GetPath()]; ok { + h = pathHandler + } + } + if h == nil { + return connect.NewResponse(&runtimev1.TopicEventResponse{Status: runtimev1.TopicEventResponse_RETRY}), fmt.Errorf( + "route %s for pub/sub and topic combination not configured: %s/%s", + in.Msg.GetPath(), in.Msg.GetPubsubName(), in.Msg.GetTopic(), + ) + } + retry, err := h.Handle(ctx, e) + if err == nil { + return connect.NewResponse(&runtimev1.TopicEventResponse{Status: runtimev1.TopicEventResponse_SUCCESS}), nil + } + if retry { + return connect.NewResponse(&runtimev1.TopicEventResponse{Status: runtimev1.TopicEventResponse_RETRY}), err + } + return connect.NewResponse(&runtimev1.TopicEventResponse{Status: runtimev1.TopicEventResponse_DROP}), nil + } + return connect.NewResponse(&runtimev1.TopicEventResponse{Status: runtimev1.TopicEventResponse_RETRY}), fmt.Errorf( + "pub/sub and topic combination not configured: %s/%s", + in.Msg.GetPubsubName(), in.Msg.GetTopic(), + ) +} + +func getCustomMetadataFromContext(ctx context.Context) map[string]string { + md := make(map[string]string) + meta, ok := metadata.FromIncomingContext(ctx) + if ok { + for k, v := range meta { + if strings.HasPrefix(strings.ToLower(k), "metadata.") { + md[k[9:]] = v[0] + } + } + } + return md +} + +func (s *Server) OnBulkTopicEventAlpha1(ctx context.Context, in *connect.Request[runtimev1.TopicEventBulkRequest]) (*connect.Response[runtimev1.TopicEventBulkResponse], error) { + panic("This API callback is not supported.") +} diff --git a/service/connectrpc/topic_test.go b/service/connectrpc/topic_test.go new file mode 100644 index 00000000..d71cc7f1 --- /dev/null +++ b/service/connectrpc/topic_test.go @@ -0,0 +1,333 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectrpc + +import ( + "context" + "errors" + "testing" + + "connectrpc.com/connect" + "github.com/dapr/dapr/pkg/proto/runtime/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/dapr/go-sdk/service/common" +) + +func TestTopicErrors(t *testing.T) { + server := newService("", nil) + err := server.AddTopicEventHandler(nil, nil) + require.Errorf(t, err, "expected error on nil sub") + + sub := &common.Subscription{} + err = server.AddTopicEventHandler(sub, nil) + require.Errorf(t, err, "expected error on invalid sub") + + sub.PubsubName = "messages" + err = server.AddTopicEventHandler(sub, nil) + require.Errorf(t, err, "expected error on sub without topic") + + sub.Topic = "test" + err = server.AddTopicEventHandler(sub, nil) + require.Errorf(t, err, "expected error on sub without handler") +} + +func TestTopicSubscriptionList(t *testing.T) { + server := newService("", nil) + + // Add default route. + sub1 := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/test", + } + err := server.AddTopicEventHandler(sub1, eventHandler) + require.NoError(t, err) + resp, err := server.ListTopicSubscriptions(t.Context(), connect.NewRequest(&emptypb.Empty{})) + require.NoError(t, err) + assert.NotNil(t, resp) + if assert.Lenf(t, resp.Msg.GetSubscriptions(), 1, "expected 1 handlers") { + sub := resp.Msg.GetSubscriptions()[0] + assert.Equal(t, "messages", sub.GetPubsubName()) + assert.Equal(t, "test", sub.GetTopic()) + assert.Nil(t, sub.GetRoutes()) + } + + // Add routing rule. + sub2 := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/other", + Match: `event.type == "other"`, + } + err = server.AddTopicEventHandler(sub2, eventHandler) + require.NoError(t, err) + resp, err = server.ListTopicSubscriptions(t.Context(), connect.NewRequest(&emptypb.Empty{})) + require.NoError(t, err) + assert.NotNil(t, resp) + if assert.Lenf(t, resp.Msg.GetSubscriptions(), 1, "expected 1 handlers") { + sub := resp.Msg.GetSubscriptions()[0] + assert.Equal(t, "messages", sub.GetPubsubName()) + assert.Equal(t, "test", sub.GetTopic()) + if assert.NotNil(t, sub.GetRoutes()) { + assert.Equal(t, "/test", sub.GetRoutes().GetDefault()) + if assert.Len(t, sub.GetRoutes().GetRules(), 1) { + rule := sub.GetRoutes().GetRules()[0] + assert.Equal(t, "/other", rule.GetPath()) + assert.Equal(t, `event.type == "other"`, rule.GetMatch()) + } + } + } +} + +// go test -timeout 30s ./service/grpc -count 1 -run ^TestTopic$ +func TestTopic(t *testing.T) { + ctx := t.Context() + + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + } + server := newService("", nil) + + err := server.AddTopicEventHandler(sub, eventHandler) + require.NoError(t, err) + + t.Run("topic event without request", func(t *testing.T) { + _, err := server.OnTopicEvent(ctx, nil) + require.Error(t, err) + }) + + t.Run("topic event for wrong topic", func(t *testing.T) { + in := connect.NewRequest(&runtime.TopicEventRequest{ + Topic: "invalid", + }) + _, err := server.OnTopicEvent(ctx, in) + require.Error(t, err) + }) + + t.Run("topic event for valid topic", func(t *testing.T) { + in := connect.NewRequest(&runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub.Topic, + PubsubName: sub.PubsubName, + }) + _, err := server.OnTopicEvent(ctx, in) + require.NoError(t, err) + }) + + t.Run("topic event for valid topic with metadata", func(t *testing.T) { + sub2 := &common.Subscription{ + PubsubName: "messages", + Topic: "test2", + } + err := server.AddTopicEventHandler(sub2, func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + assert.Equal(t, "value1", e.Metadata["key1"]) + return false, nil + }) + require.NoError(t, err) + + in := connect.NewRequest(&runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub2.Topic, + PubsubName: sub2.PubsubName, + }) + ctx := metadata.NewIncomingContext(t.Context(), metadata.New(map[string]string{"Metadata.key1": "value1"})) + _, err = server.OnTopicEvent(ctx, in) + require.NoError(t, err) + }) +} + +func TestTopicWithValidationDisabled(t *testing.T) { + ctx := t.Context() + + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "*", + DisableTopicValidation: true, + } + server := newService("", nil) + + err := server.AddTopicEventHandler(sub, eventHandler) + require.NoError(t, err) + + in := connect.NewRequest(&runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: "test", + PubsubName: sub.PubsubName, + }) + + _, err = server.OnTopicEvent(ctx, in) + require.NoError(t, err) +} + +func TestTopicWithErrors(t *testing.T) { + ctx := t.Context() + + sub1 := &common.Subscription{ + PubsubName: "messages", + Topic: "test1", + } + + sub2 := &common.Subscription{ + PubsubName: "messages", + Topic: "test2", + } + server := newService("", nil) + + err := server.AddTopicEventHandler(sub1, eventHandlerWithRetryError) + require.NoError(t, err) + + err = server.AddTopicEventHandler(sub2, eventHandlerWithError) + require.NoError(t, err) + + t.Run("topic event for retry error", func(t *testing.T) { + in := connect.NewRequest(&runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub1.Topic, + PubsubName: sub1.PubsubName, + }) + resp, err := server.OnTopicEvent(ctx, in) + require.Error(t, err) + assert.Equal(t, runtime.TopicEventResponse_RETRY, resp.Msg.GetStatus()) + }) + + t.Run("topic event for error", func(t *testing.T) { + in := connect.NewRequest(&runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub2.Topic, + PubsubName: sub2.PubsubName, + }) + resp, err := server.OnTopicEvent(ctx, in) + require.NoError(t, err) + assert.Equal(t, runtime.TopicEventResponse_DROP, resp.Msg.GetStatus()) + }) +} + +func eventHandler(ctx context.Context, event *common.TopicEvent) (retry bool, err error) { + if event == nil { + return true, errors.New("nil event") + } + return false, nil +} + +func eventHandlerWithRetryError(ctx context.Context, event *common.TopicEvent) (retry bool, err error) { + return true, errors.New("nil event") +} + +func eventHandlerWithError(ctx context.Context, event *common.TopicEvent) (retry bool, err error) { + return false, errors.New("nil event") +} + +func TestEventDataHandling(t *testing.T) { + ctx := t.Context() + + tests := map[string]struct { + contentType string + data string + value interface{} + }{ + "JSON bytes": { + contentType: "application/json", + data: `{"message":"hello"}`, + value: map[string]interface{}{ + "message": "hello", + }, + }, + "JSON entension media type bytes": { + contentType: "application/extension+json", + data: `{"message":"hello"}`, + value: map[string]interface{}{ + "message": "hello", + }, + }, + "Test": { + contentType: "text/plain", + data: `message = hello`, + value: `message = hello`, + }, + "Other": { + contentType: "application/octet-stream", + data: `message = hello`, + value: []byte(`message = hello`), + }, + } + + s := newService("", nil) + + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/test", + Metadata: map[string]string{}, + } + + recv := make(chan struct{}, 1) + var topicEvent *common.TopicEvent + handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + topicEvent = e + recv <- struct{}{} + + return false, nil + } + err := s.AddTopicEventHandler(sub, handler) + require.NoErrorf(t, err, "error adding event handler") + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + in := connect.NewRequest(&runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: tt.contentType, + Data: []byte(tt.data), + Topic: sub.Topic, + PubsubName: sub.PubsubName, + }) + + s.OnTopicEvent(ctx, in) + <-recv + assert.Equal(t, tt.value, topicEvent.Data) + }) + } +}