diff --git a/engine/cld/changeset/common.go b/engine/cld/changeset/common.go index 0a336b34..befe3144 100644 --- a/engine/cld/changeset/common.go +++ b/engine/cld/changeset/common.go @@ -32,6 +32,7 @@ type Configurations struct { type internalChangeSet interface { noop() // unexported function to prevent arbitrary structs from implementing ChangeSet. Apply(env fdeployment.Environment) (fdeployment.ChangesetOutput, error) + applyWithInput(env fdeployment.Environment, inputStr string) (fdeployment.ChangesetOutput, error) Configurations() (Configurations, error) } @@ -77,6 +78,35 @@ type TypedJSON struct { ChainOverrides []uint64 `json:"chainOverrides"` // Optional field for chain overrides } +func parseTypedInput(inputStr string) (TypedJSON, error) { + if inputStr == "" { + return TypedJSON{}, errors.New("input is empty") + } + + var inputObject TypedJSON + if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil { + return TypedJSON{}, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err) + } + if len(inputObject.Payload) == 0 { + return TypedJSON{}, errors.New("'payload' field is required") + } + + return inputObject, nil +} + +func decodePayload[C any](payload json.RawMessage) (C, error) { + var config C + + payloadDecoder := json.NewDecoder(strings.NewReader(string(payload))) + payloadDecoder.UseNumber() + payloadDecoder.DisallowUnknownFields() + if err := payloadDecoder.Decode(&config); err != nil { + return config, fmt.Errorf("failed to unmarshal payload: %w", err) + } + + return config, nil +} + // WithJSON returns a fully configured changeset, which pairs a [fdeployment.ChangeSet] with its configuration based // a JSON input. It also allows extensions, such as a PostProcessing function. // InputStr must be a JSON object with a "payload" field that contains the actual input data for a Durable Pipeline. @@ -92,31 +122,13 @@ type TypedJSON struct { // Note: Prefer WithEnvInput for durable_pipelines.go func (f WrappedChangeSet[C]) WithJSON(_ C, inputStr string) ConfiguredChangeSet { return ChangeSetImpl[C]{changeset: f, configProvider: func() (C, error) { - var config C - - if inputStr == "" { - return config, errors.New("input is empty") - } - - var inputObject TypedJSON - if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil { - return config, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err) - } - - // If payload is null, decode it as null (which will give zero value) - // If payload is missing, return an error - if len(inputObject.Payload) == 0 { - return config, errors.New("'payload' field is required") - } - - payloadDecoder := json.NewDecoder(strings.NewReader(string(inputObject.Payload))) - payloadDecoder.UseNumber() - payloadDecoder.DisallowUnknownFields() - if err := payloadDecoder.Decode(&config); err != nil { - return config, fmt.Errorf("failed to unmarshal payload: %w", err) + inputObject, err := parseTypedInput(inputStr) + if err != nil { + var zero C + return zero, err } - return config, nil + return decodePayload[C](inputObject.Payload) }, inputChainOverrides: func() ([]uint64, error) { return loadInputChainOverrides(inputStr) @@ -152,42 +164,35 @@ func (f WrappedChangeSet[C]) WithEnvInput(opts ...EnvInputOption[C]) ConfiguredC inputStr := os.Getenv("DURABLE_PIPELINE_INPUT") - return ChangeSetImpl[C]{changeset: f, configProvider: func() (C, error) { - var config C - - if inputStr == "" { - return config, errors.New("input is empty") - } - - var inputObject TypedJSON - if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil { - return config, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err) - } + providerFromInput := func(rawInput string) (C, error) { + var zero C - // If payload is null, decode it as null (which will give zero value) - // If payload is missing, return an error - if len(inputObject.Payload) == 0 { - return config, errors.New("'payload' field is required") + inputObject, err := parseTypedInput(rawInput) + if err != nil { + return zero, err } - - payloadDecoder := json.NewDecoder(strings.NewReader(string(inputObject.Payload))) - payloadDecoder.UseNumber() - payloadDecoder.DisallowUnknownFields() - if err := payloadDecoder.Decode(&config); err != nil { - return config, fmt.Errorf("failed to unmarshal payload: %w", err) + config, err := decodePayload[C](inputObject.Payload) + if err != nil { + return zero, err } if options.inputModifier != nil { - conf, err := options.inputModifier(config) - if err != nil { - return conf, fmt.Errorf("failed to apply input modifier: %w", err) + conf, modifierErr := options.inputModifier(config) + if modifierErr != nil { + return conf, fmt.Errorf("failed to apply input modifier: %w", modifierErr) } return conf, nil } return config, nil - }, + } + + return ChangeSetImpl[C]{changeset: f, + configProvider: func() (C, error) { + return providerFromInput(inputStr) + }, + configProviderWithInput: providerFromInput, inputChainOverrides: func() ([]uint64, error) { return loadInputChainOverrides(inputStr) }, @@ -223,25 +228,14 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv // Read input from environment variable inputStr := os.Getenv("DURABLE_PIPELINE_INPUT") - configProvider := func() (C, error) { + configProviderFromInput := func(rawInput string) (C, error) { var zero C - if inputStr == "" { - return zero, errors.New("input is empty") - } - - // Parse JSON input - var inputObject TypedJSON - if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil { + inputObject, err := parseTypedInput(rawInput) + if err != nil { return zero, fmt.Errorf("failed to parse resolver input as JSON: %w", err) } - // If payload is null, pass it to the resolver (which will receive null) - // If payload field is missing, return an error - if len(inputObject.Payload) == 0 { - return zero, errors.New("'payload' field is required") - } - // Call resolver – automatically unmarshal into its expected input type. typedConfig, err := fresolvers.CallResolver[C](resolver, inputObject.Payload) if err != nil { @@ -251,8 +245,12 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv return typedConfig, nil } - return ChangeSetImpl[C]{changeset: f, configProvider: configProvider, - ConfigResolver: resolver, + return ChangeSetImpl[C]{changeset: f, + configProvider: func() (C, error) { + return configProviderFromInput(inputStr) + }, + configProviderWithInput: configProviderFromInput, + ConfigResolver: resolver, inputChainOverrides: func() ([]uint64, error) { return loadInputChainOverrides(inputStr) }, @@ -262,9 +260,10 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv var _ ConfiguredChangeSet = ChangeSetImpl[any]{} type ChangeSetImpl[C any] struct { - changeset WrappedChangeSet[C] - configProvider func() (C, error) - inputChainOverrides func() ([]uint64, error) + changeset WrappedChangeSet[C] + configProvider func() (C, error) + configProviderWithInput func(inputStr string) (C, error) + inputChainOverrides func() ([]uint64, error) // Present only when the changeset was wired with // Configure(...).WithConfigResolver(...) @@ -289,6 +288,25 @@ func (ccs ChangeSetImpl[C]) Apply(env fdeployment.Environment) (fdeployment.Chan return ccs.changeset.operation.Apply(env, c) } +func (ccs ChangeSetImpl[C]) applyWithInput(env fdeployment.Environment, inputStr string) (fdeployment.ChangesetOutput, error) { + if inputStr == "" { + return ccs.Apply(env) + } + if ccs.configProviderWithInput == nil { + return ccs.Apply(env) + } + + c, err := ccs.configProviderWithInput(inputStr) + if err != nil { + return fdeployment.ChangesetOutput{}, err + } + if err := ccs.changeset.operation.VerifyPreconditions(env, c); err != nil { + return fdeployment.ChangesetOutput{}, err + } + + return ccs.changeset.operation.Apply(env, c) +} + func (ccs ChangeSetImpl[C]) Configurations() (Configurations, error) { var chainOverrides []uint64 var err error diff --git a/engine/cld/changeset/postprocess.go b/engine/cld/changeset/postprocess.go index 5119ea4d..08a21440 100644 --- a/engine/cld/changeset/postprocess.go +++ b/engine/cld/changeset/postprocess.go @@ -35,6 +35,18 @@ func (ccs PostProcessingChangeSetImpl[C]) Apply(env fdeployment.Environment) (fd return ccs.postProcessor(env, output) } +func (ccs PostProcessingChangeSetImpl[C]) applyWithInput( + env fdeployment.Environment, inputStr string, +) (fdeployment.ChangesetOutput, error) { + env.Logger.Debugf("Post-processing ChangesetOutput from %T", ccs.changeset.changeset.operation) + output, err := ccs.changeset.applyWithInput(env, inputStr) + if err != nil { + return output, err + } + + return ccs.postProcessor(env, output) +} + func (ccs PostProcessingChangeSetImpl[C]) Configurations() (Configurations, error) { return ccs.changeset.Configurations() } diff --git a/engine/cld/changeset/registry.go b/engine/cld/changeset/registry.go index 62ffd131..d387cd6c 100644 --- a/engine/cld/changeset/registry.go +++ b/engine/cld/changeset/registry.go @@ -172,6 +172,19 @@ func (r *ChangesetsRegistry) AddGlobalPostHooks(hooks ...PostHook) { // a failed Apply are logged but never mask the Apply error. func (r *ChangesetsRegistry) Apply( key string, e fdeployment.Environment, +) (fdeployment.ChangesetOutput, error) { + return r.applyWithInput(key, e, "") +} + +// ApplyWithInput applies a changeset with explicit input string for this apply invocation. +func (r *ChangesetsRegistry) ApplyWithInput( + key string, e fdeployment.Environment, inputStr string, +) (fdeployment.ChangesetOutput, error) { + return r.applyWithInput(key, e, inputStr) +} + +func (r *ChangesetsRegistry) applyWithInput( + key string, e fdeployment.Environment, inputStr string, ) (fdeployment.ChangesetOutput, error) { entry, globalPre, globalPost, err := r.getApplySnapshot(key) if err != nil { @@ -204,7 +217,9 @@ func (r *ChangesetsRegistry) Apply( } } - output, applyErr := entry.changeset.Apply(e) + var output fdeployment.ChangesetOutput + var applyErr error + output, applyErr = entry.changeset.applyWithInput(e, inputStr) postParams := PostHookParams{ Env: hookEnv, diff --git a/engine/cld/changeset/registry_test.go b/engine/cld/changeset/registry_test.go index 95c7e871..d3279db9 100644 --- a/engine/cld/changeset/registry_test.go +++ b/engine/cld/changeset/registry_test.go @@ -25,6 +25,10 @@ func (noopChangeset) Apply(e fdeployment.Environment) (fdeployment.ChangesetOutp return fdeployment.ChangesetOutput{}, nil } +func (n noopChangeset) applyWithInput(e fdeployment.Environment, _ string) (fdeployment.ChangesetOutput, error) { + return n.Apply(e) +} + func (n noopChangeset) Configurations() (Configurations, error) { return Configurations{ InputChainOverrides: n.chainOverrides, @@ -45,6 +49,10 @@ func (r *recordingChangeset) Apply(_ fdeployment.Environment) (fdeployment.Chang return r.output, r.err } +func (r *recordingChangeset) applyWithInput(e fdeployment.Environment, _ string) (fdeployment.ChangesetOutput, error) { + return r.Apply(e) +} + func (*recordingChangeset) Configurations() (Configurations, error) { return Configurations{}, nil } @@ -61,6 +69,10 @@ func (o *orderRecordingChangeset) Apply(_ fdeployment.Environment) (fdeployment. return fdeployment.ChangesetOutput{}, nil } +func (o *orderRecordingChangeset) applyWithInput(e fdeployment.Environment, _ string) (fdeployment.ChangesetOutput, error) { + return o.Apply(e) +} + func (*orderRecordingChangeset) Configurations() (Configurations, error) { return Configurations{}, nil } @@ -148,6 +160,65 @@ func Test_Changesets_Apply(t *testing.T) { } } +//nolint:paralleltest +func Test_Changesets_ApplyWithInput_WithEnvConfiguredChangeset(t *testing.T) { + type inputConfig struct { + Value int `json:"value"` + } + + t.Setenv("DURABLE_PIPELINE_INPUT", `{"payload":{"value":999}}`) + + var received int + cs := fdeployment.CreateChangeSet( + func(_ fdeployment.Environment, cfg inputConfig) (fdeployment.ChangesetOutput, error) { + received = cfg.Value + return fdeployment.ChangesetOutput{}, nil + }, + func(_ fdeployment.Environment, _ inputConfig) error { return nil }, + ) + + r := NewChangesetsRegistry() + r.Add("0001_test", Configure(cs).WithEnvInput()) + + // overrides the input set by the env var + _, err := r.ApplyWithInput("0001_test", fdeployment.Environment{}, `{"payload":{"value":1}}`) + require.NoError(t, err) + require.Equal(t, 1, received) +} + +//nolint:paralleltest // Uses process environment for fallback behavior assertions. +func Test_Changesets_ApplyWithInput_WithResolverConfiguredChangeset(t *testing.T) { + type resolverInput struct { + Base int `json:"base"` + } + type resolverOutput struct { + Value int `json:"value"` + } + + t.Setenv("DURABLE_PIPELINE_INPUT", `{"payload":{"base":100}}`) + + resolver := func(input resolverInput) (resolverOutput, error) { + return resolverOutput{Value: input.Base + 10}, nil + } + + var received int + cs := fdeployment.CreateChangeSet( + func(_ fdeployment.Environment, cfg resolverOutput) (fdeployment.ChangesetOutput, error) { + received = cfg.Value + return fdeployment.ChangesetOutput{}, nil + }, + func(_ fdeployment.Environment, _ resolverOutput) error { return nil }, + ) + + r := NewChangesetsRegistry() + r.Add("0001_test", Configure(cs).WithConfigResolver(resolver)) + + // overrides the input set by the env var + _, err := r.ApplyWithInput("0001_test", fdeployment.Environment{}, `{"payload":{"base":7}}`) + require.NoError(t, err) + require.Equal(t, 17, received) +} + func Test_Changesets_Add(t *testing.T) { t.Parallel()