Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type (
HealthCheck *HealthCheckConfig `yaml:"healthCheck"`
NamespaceNameTranslation NamespaceNameTranslationConfig `yaml:"namespaceNameTranslation"`
Metrics *MetricsConfig `yaml:"metrics"`
FilterNsData bool `yaml:"filterNsData"`
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EnableUnsafeNsData -- My thought here is to flip the boolean around so that the default (if not configured) is that we do the filtering.

}

NamespaceNameTranslationConfig struct {
Expand Down
104 changes: 98 additions & 6 deletions proxy/adminservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,34 @@ import (
"context"
"fmt"
"io"
"strings"
"sync"

"github.com/gogo/status"
"github.com/temporalio/s2s-proxy/client"
adminclient "github.com/temporalio/s2s-proxy/client/admin"
"github.com/temporalio/s2s-proxy/common"
"github.com/temporalio/s2s-proxy/config"
"github.com/temporalio/s2s-proxy/transport"

"go.temporal.io/api/serviceerror"
"go.temporal.io/server/api/adminservice/v1"
repication "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/client/history"
"go.temporal.io/server/common/channel"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
)

type (
adminServiceProxyServer struct {
adminservice.UnimplementedAdminServiceServer
adminClient adminservice.AdminServiceClient
logger log.Logger
adminClient adminservice.AdminServiceClient
remoteAdminClient adminservice.AdminServiceClient
logger log.Logger
proxyOptions
}
)
Expand All @@ -39,10 +45,27 @@ func NewAdminServiceProxyServer(
) adminservice.AdminServiceServer {
logger = log.With(logger, common.ServiceTag(serviceName))
clientProvider := client.NewClientProvider(clientConfig, clientFactory, logger)

var remoteClientProvider client.ClientProvider
Copy link
Collaborator Author

@pglass pglass May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is actually a "local" client provider (not remote)

if opts.IsInbound {
// not used
remoteClientProvider = client.NewClientProvider(opts.Config.Outbound.Client, clientFactory, logger)
} else {
// HACK: This doesn't work for mux transport.
mgr := &transport.TransportManager{}
tp, err := mgr.OpenClient(opts.Config.Inbound.Client)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For an outbound server, we need the client for the inbound direction. Need to pass the transport manager in here (or maybe pass in just the client itself).

if err != nil {
panic(err)
}
remoteClientFactory := client.NewClientFactory(tp, logger)
remoteClientProvider = client.NewClientProvider(opts.Config.Inbound.Client, remoteClientFactory, logger)
}

return &adminServiceProxyServer{
adminClient: adminclient.NewLazyClient(clientProvider),
logger: logger,
proxyOptions: opts,
adminClient: adminclient.NewLazyClient(clientProvider),
remoteAdminClient: adminclient.NewLazyClient(remoteClientProvider),
logger: logger,
proxyOptions: opts,
}
}

Expand Down Expand Up @@ -134,7 +157,76 @@ func (s *adminServiceProxyServer) GetNamespace(ctx context.Context, in0 *adminse
}

func (s *adminServiceProxyServer) GetNamespaceReplicationMessages(ctx context.Context, in0 *adminservice.GetNamespaceReplicationMessagesRequest) (*adminservice.GetNamespaceReplicationMessagesResponse, error) {
return s.adminClient.GetNamespaceReplicationMessages(ctx, in0)
resp, err := s.adminClient.GetNamespaceReplicationMessages(ctx, in0)
if err != nil {
return resp, err
}

if !s.Config.FilterNsData {
return resp, err
}

for _, r := range resp.Messages.ReplicationTasks {
if r == nil {
continue
}

nsTask, ok := r.Attributes.(*repication.ReplicationTask_NamespaceTaskAttributes)
if !ok || nsTask == nil {
continue
}

attrs := nsTask.NamespaceTaskAttributes
if attrs == nil || attrs.Info == nil {
continue
}

data := attrs.Info.Data
if s.IsInbound {
// Inbound request means Outbound response. Drop the __temporal fields.
for k := range data {
if strings.HasPrefix(k, "__temporal") {
delete(data, k)
}
}
} else {
// Outbound request means Inbound response. Add the __temporal back.
// First, find them on the target cluster.
nsResp, err := s.remoteAdminClient.GetNamespace(ctx, &adminservice.GetNamespaceRequest{
Attributes: &adminservice.GetNamespaceRequest_Id{
Id: attrs.Id,
},
})
if err != nil {
s.logger.Error("GetNamespace from local cluster failed",
tag.NewStringTag("namespace-id", attrs.Id),
tag.NewErrorTag("error", err),
)
// hopefully, this is retried.
return nil, status.Errorf(codes.Unavailable, "s2s-proxy error")
}

// TODO: Set ConfigVersion? So that the server ignores this task if the ns
// changes before this replication task arrives.
//
// Example:
// nsResp - Version = 1
// something ns update - Version = 2
// attrs - Version = 3
//
// In this case, we copy fields outdated fields from nsResp (Version 1) to the nsTask (Version 3).
// We could protect against this by setting attrs.ConfigVersion = attrs.ConfigVersion + 1.
//
// Copy the __temporal fields from the target cluster.
s.logger.Info("Adding __temporal to namespace replication task")
for k, v := range nsResp.Info.Data {
if strings.HasPrefix(k, "__temporal") {
data[k] = v
}
}
}
}
return resp, err
}

func (s *adminServiceProxyServer) GetReplicationMessages(ctx context.Context, in0 *adminservice.GetReplicationMessagesRequest) (*adminservice.GetReplicationMessagesResponse, error) {
Expand Down
Loading