-
Notifications
You must be signed in to change notification settings - Fork 5
POC: Filter namespace data #77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,28 +4,34 @@ import ( | |
"context" | ||
"fmt" | ||
"io" | ||
"strings" | ||
"sync" | ||
|
||
"github.com/gogo/status" | ||
"github.com/temporalio/s2s-proxy/client" | ||
adminclient "github.com/temporalio/s2s-proxy/client/admin" | ||
"github.com/temporalio/s2s-proxy/common" | ||
"github.com/temporalio/s2s-proxy/config" | ||
"github.com/temporalio/s2s-proxy/transport" | ||
|
||
"go.temporal.io/api/serviceerror" | ||
"go.temporal.io/server/api/adminservice/v1" | ||
repication "go.temporal.io/server/api/replication/v1" | ||
"go.temporal.io/server/client/history" | ||
"go.temporal.io/server/common/channel" | ||
"go.temporal.io/server/common/headers" | ||
"go.temporal.io/server/common/log" | ||
"go.temporal.io/server/common/log/tag" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/metadata" | ||
) | ||
|
||
type ( | ||
adminServiceProxyServer struct { | ||
adminservice.UnimplementedAdminServiceServer | ||
adminClient adminservice.AdminServiceClient | ||
logger log.Logger | ||
adminClient adminservice.AdminServiceClient | ||
remoteAdminClient adminservice.AdminServiceClient | ||
logger log.Logger | ||
proxyOptions | ||
} | ||
) | ||
|
@@ -39,10 +45,27 @@ func NewAdminServiceProxyServer( | |
) adminservice.AdminServiceServer { | ||
logger = log.With(logger, common.ServiceTag(serviceName)) | ||
clientProvider := client.NewClientProvider(clientConfig, clientFactory, logger) | ||
|
||
var remoteClientProvider client.ClientProvider | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is actually a "local" client provider (not remote) |
||
if opts.IsInbound { | ||
// not used | ||
remoteClientProvider = client.NewClientProvider(opts.Config.Outbound.Client, clientFactory, logger) | ||
} else { | ||
// HACK: This doesn't work for mux transport. | ||
mgr := &transport.TransportManager{} | ||
tp, err := mgr.OpenClient(opts.Config.Inbound.Client) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For an outbound server, we need the client for the inbound direction. Need to pass the transport manager in here (or maybe pass in just the client itself). |
||
if err != nil { | ||
panic(err) | ||
} | ||
remoteClientFactory := client.NewClientFactory(tp, logger) | ||
remoteClientProvider = client.NewClientProvider(opts.Config.Inbound.Client, remoteClientFactory, logger) | ||
} | ||
|
||
return &adminServiceProxyServer{ | ||
adminClient: adminclient.NewLazyClient(clientProvider), | ||
logger: logger, | ||
proxyOptions: opts, | ||
adminClient: adminclient.NewLazyClient(clientProvider), | ||
remoteAdminClient: adminclient.NewLazyClient(remoteClientProvider), | ||
logger: logger, | ||
proxyOptions: opts, | ||
} | ||
} | ||
|
||
|
@@ -134,7 +157,76 @@ func (s *adminServiceProxyServer) GetNamespace(ctx context.Context, in0 *adminse | |
} | ||
|
||
func (s *adminServiceProxyServer) GetNamespaceReplicationMessages(ctx context.Context, in0 *adminservice.GetNamespaceReplicationMessagesRequest) (*adminservice.GetNamespaceReplicationMessagesResponse, error) { | ||
return s.adminClient.GetNamespaceReplicationMessages(ctx, in0) | ||
resp, err := s.adminClient.GetNamespaceReplicationMessages(ctx, in0) | ||
if err != nil { | ||
return resp, err | ||
} | ||
|
||
if !s.Config.FilterNsData { | ||
return resp, err | ||
} | ||
|
||
for _, r := range resp.Messages.ReplicationTasks { | ||
if r == nil { | ||
continue | ||
} | ||
|
||
nsTask, ok := r.Attributes.(*repication.ReplicationTask_NamespaceTaskAttributes) | ||
if !ok || nsTask == nil { | ||
continue | ||
} | ||
|
||
attrs := nsTask.NamespaceTaskAttributes | ||
if attrs == nil || attrs.Info == nil { | ||
continue | ||
} | ||
|
||
data := attrs.Info.Data | ||
if s.IsInbound { | ||
// Inbound request means Outbound response. Drop the __temporal fields. | ||
for k := range data { | ||
if strings.HasPrefix(k, "__temporal") { | ||
delete(data, k) | ||
} | ||
} | ||
} else { | ||
// Outbound request means Inbound response. Add the __temporal back. | ||
// First, find them on the target cluster. | ||
nsResp, err := s.remoteAdminClient.GetNamespace(ctx, &adminservice.GetNamespaceRequest{ | ||
Attributes: &adminservice.GetNamespaceRequest_Id{ | ||
Id: attrs.Id, | ||
}, | ||
}) | ||
if err != nil { | ||
s.logger.Error("GetNamespace from local cluster failed", | ||
tag.NewStringTag("namespace-id", attrs.Id), | ||
tag.NewErrorTag("error", err), | ||
) | ||
// hopefully, this is retried. | ||
return nil, status.Errorf(codes.Unavailable, "s2s-proxy error") | ||
} | ||
|
||
// TODO: Set ConfigVersion? So that the server ignores this task if the ns | ||
// changes before this replication task arrives. | ||
// | ||
// Example: | ||
// nsResp - Version = 1 | ||
// something ns update - Version = 2 | ||
// attrs - Version = 3 | ||
// | ||
// In this case, we copy fields outdated fields from nsResp (Version 1) to the nsTask (Version 3). | ||
// We could protect against this by setting attrs.ConfigVersion = attrs.ConfigVersion + 1. | ||
// | ||
// Copy the __temporal fields from the target cluster. | ||
s.logger.Info("Adding __temporal to namespace replication task") | ||
for k, v := range nsResp.Info.Data { | ||
if strings.HasPrefix(k, "__temporal") { | ||
data[k] = v | ||
} | ||
} | ||
} | ||
} | ||
return resp, err | ||
} | ||
|
||
func (s *adminServiceProxyServer) GetReplicationMessages(ctx context.Context, in0 *adminservice.GetReplicationMessagesRequest) (*adminservice.GetReplicationMessagesResponse, error) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EnableUnsafeNsData
-- My thought here is to flip the boolean around so that the default (if not configured) is that we do the filtering.