Skip to content

Commit e3b084e

Browse files
committed
add functional tests for history_node cleanup
1 parent f9b2f6d commit e3b084e

File tree

1 file changed

+269
-0
lines changed

1 file changed

+269
-0
lines changed

tests/history_node_cleanup_test.go

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
package tests
2+
3+
// This file contains two functional tests to make sure that history_tree and
4+
// history_node rows are cleaned up correctly after a workflow deletion.
5+
6+
import (
7+
"context"
8+
"testing"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
commandpb "go.temporal.io/api/command/v1"
13+
commonpb "go.temporal.io/api/common/v1"
14+
enumspb "go.temporal.io/api/enums/v1"
15+
"go.temporal.io/api/workflowservice/v1"
16+
"go.temporal.io/server/api/adminservice/v1"
17+
"go.temporal.io/server/chasm"
18+
"go.temporal.io/server/common"
19+
"go.temporal.io/server/common/persistence"
20+
"go.temporal.io/server/common/persistence/versionhistory"
21+
"go.temporal.io/server/common/testing/taskpoller"
22+
"go.temporal.io/server/common/testing/testvars"
23+
"go.temporal.io/server/tests/testcore"
24+
"google.golang.org/protobuf/types/known/durationpb"
25+
)
26+
27+
// TestDeletionOfSingleWorkflow runs a single workflow, force-deletes it via the
28+
// admin API, then asserts that all history_tree and history_node rows are removed.
29+
func TestDeletionOfSingleWorkflow(t *testing.T) {
30+
t.Parallel()
31+
env := testcore.NewEnv(t)
32+
tv := testvars.New(t)
33+
ctx := env.Context()
34+
35+
shardID := common.WorkflowIDToHistoryShard(
36+
env.NamespaceID().String(),
37+
tv.WorkflowID(),
38+
env.GetTestClusterConfig().HistoryConfig.NumHistoryShards,
39+
)
40+
execMgr := env.GetTestCluster().TestBase().ExecutionManager
41+
poller := taskpoller.New(t, env.FrontendClient(), env.Namespace().String())
42+
43+
startResp, err := env.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{
44+
RequestId: uuid.NewString(),
45+
Namespace: env.Namespace().String(),
46+
WorkflowId: tv.WorkflowID(),
47+
WorkflowType: tv.WorkflowType(),
48+
TaskQueue: tv.TaskQueue(),
49+
})
50+
env.NoError(err)
51+
runID := startResp.RunId
52+
53+
completeWorkflowWithActivities(env, tv, poller)
54+
55+
branchToken := captureCurrentBranchToken(env, ctx, tv.WorkflowID(), runID)
56+
57+
// The admin force-delete and the DeleteHistoryEventTask retention timer both
58+
// reach the same persistence.ExecutionManager.DeleteHistoryBranch call, which
59+
// is the operation that removes history_tree and history_node rows.
60+
_, err = env.AdminClient().DeleteWorkflowExecution(ctx, &adminservice.DeleteWorkflowExecutionRequest{
61+
Namespace: env.Namespace().String(),
62+
Execution: &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runID},
63+
Archetype: chasm.WorkflowArchetype,
64+
})
65+
env.NoError(err)
66+
waitForMutableStateGone(env, ctx, shardID, execMgr, tv.WorkflowID(), runID)
67+
68+
resp, err := execMgr.ReadHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
69+
ShardID: shardID,
70+
BranchToken: branchToken,
71+
MinEventID: common.FirstEventID,
72+
MaxEventID: common.EndEventID,
73+
PageSize: 1000,
74+
})
75+
if err == nil {
76+
env.Empty(resp.HistoryEvents, "history_node rows should be gone after deletion")
77+
}
78+
// A NotFound/InvalidArgument error is also acceptable — it means the branch is gone.
79+
}
80+
81+
// TestDeletionOfWorkflowAfterReset runs a workflow, resets it to create a new
82+
// run, force-deletes both runs via the admin API, then asserts that no
83+
// history_node rows remain for either branch.
84+
func TestDeletionOfWorkflowAfterReset(t *testing.T) {
85+
t.Parallel()
86+
env := testcore.NewEnv(t)
87+
tv := testvars.New(t)
88+
ctx := env.Context()
89+
90+
shardID := common.WorkflowIDToHistoryShard(
91+
env.NamespaceID().String(),
92+
tv.WorkflowID(),
93+
env.GetTestClusterConfig().HistoryConfig.NumHistoryShards,
94+
)
95+
execMgr := env.GetTestCluster().TestBase().ExecutionManager
96+
poller := taskpoller.New(t, env.FrontendClient(), env.Namespace().String())
97+
98+
// ── Step 1: start and complete run A ─────────────────────────────────────
99+
startResp, err := env.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{
100+
RequestId: uuid.NewString(),
101+
Namespace: env.Namespace().String(),
102+
WorkflowId: tv.WorkflowID(),
103+
WorkflowType: tv.WorkflowType(),
104+
TaskQueue: tv.TaskQueue(),
105+
})
106+
env.NoError(err)
107+
runIDA := startResp.RunId
108+
109+
completeWorkflowWithActivities(env, tv, poller)
110+
111+
branchTokenA := captureCurrentBranchToken(env, ctx, tv.WorkflowID(), runIDA)
112+
113+
// Find the first WorkflowTaskCompleted event to use as the reset point.
114+
// B inherits A's opening events and forks from there.
115+
var resetEventID int64
116+
var histPageToken []byte
117+
resetSearch:
118+
for {
119+
histResp, err := env.FrontendClient().GetWorkflowExecutionHistory(ctx, &workflowservice.GetWorkflowExecutionHistoryRequest{
120+
Namespace: env.Namespace().String(),
121+
Execution: &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runIDA},
122+
NextPageToken: histPageToken,
123+
MaximumPageSize: 100,
124+
})
125+
env.NoError(err)
126+
for _, event := range histResp.GetHistory().GetEvents() {
127+
if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
128+
resetEventID = event.EventId
129+
break resetSearch
130+
}
131+
}
132+
histPageToken = histResp.GetNextPageToken()
133+
if len(histPageToken) == 0 {
134+
break
135+
}
136+
}
137+
env.NotZero(resetEventID)
138+
139+
// ── Step 2: reset A → run B ───────────────────────────────────────────────
140+
resetResp, err := env.FrontendClient().ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{
141+
Namespace: env.Namespace().String(),
142+
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runIDA},
143+
Reason: "test",
144+
RequestId: uuid.NewString(),
145+
WorkflowTaskFinishEventId: resetEventID,
146+
})
147+
env.NoError(err)
148+
runIDB := resetResp.RunId
149+
150+
tvB := tv.WithRunID(runIDB)
151+
completeWorkflowWithActivities(env, tvB, poller)
152+
153+
branchTokenB := captureCurrentBranchToken(env, ctx, tv.WorkflowID(), runIDB)
154+
155+
// ── Step 3: force-delete run A ────────────────────────────────────────────
156+
// Both the admin force-delete and the DeleteHistoryEventTask retention timer
157+
// ultimately call persistence.ExecutionManager.DeleteHistoryBranch, the same
158+
// operation that removes history_tree and history_node rows.
159+
_, err = env.AdminClient().DeleteWorkflowExecution(ctx, &adminservice.DeleteWorkflowExecutionRequest{
160+
Namespace: env.Namespace().String(),
161+
Execution: &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runIDA},
162+
Archetype: chasm.WorkflowArchetype,
163+
})
164+
env.NoError(err)
165+
waitForMutableStateGone(env, ctx, shardID, execMgr, tv.WorkflowID(), runIDA)
166+
167+
// ── Step 4: force-delete run B ────────────────────────────────────────────
168+
_, err = env.AdminClient().DeleteWorkflowExecution(ctx, &adminservice.DeleteWorkflowExecutionRequest{
169+
Namespace: env.Namespace().String(),
170+
Execution: &commonpb.WorkflowExecution{WorkflowId: tv.WorkflowID(), RunId: runIDB},
171+
Archetype: chasm.WorkflowArchetype,
172+
})
173+
env.NoError(err)
174+
waitForMutableStateGone(env, ctx, shardID, execMgr, tv.WorkflowID(), runIDB)
175+
176+
// ── Assertions ────────────────────────────────────────────────────────────
177+
for _, tc := range []struct {
178+
label string
179+
token []byte
180+
}{
181+
{"run A (original)", branchTokenA},
182+
{"run B (reset)", branchTokenB},
183+
} {
184+
resp, err := execMgr.ReadHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
185+
ShardID: shardID,
186+
BranchToken: tc.token,
187+
MinEventID: common.FirstEventID,
188+
MaxEventID: common.EndEventID,
189+
PageSize: 1000,
190+
})
191+
if err == nil {
192+
env.Empty(resp.HistoryEvents,
193+
"history_node rows for %s should be gone after deletion", tc.label)
194+
}
195+
// A NotFound/InvalidArgument error is acceptable — it means the branch is gone.
196+
}
197+
}
198+
199+
// completeWorkflowWithActivities drives a workflow through a single activity then completes it.
200+
func completeWorkflowWithActivities(
201+
env *testcore.TestEnv,
202+
tv *testvars.TestVars,
203+
poller *taskpoller.TaskPoller,
204+
) {
205+
activityScheduled := false
206+
wtHandler := func(_ *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
207+
if !activityScheduled {
208+
activityScheduled = true
209+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
210+
Commands: []*commandpb.Command{{
211+
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
212+
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{
213+
ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
214+
ActivityId: "act",
215+
ActivityType: tv.ActivityType(),
216+
TaskQueue: tv.TaskQueue(),
217+
ScheduleToCloseTimeout: durationpb.New(30 * time.Second),
218+
StartToCloseTimeout: durationpb.New(10 * time.Second),
219+
},
220+
},
221+
}},
222+
}, nil
223+
}
224+
return &workflowservice.RespondWorkflowTaskCompletedRequest{
225+
Commands: []*commandpb.Command{{
226+
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
227+
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}},
228+
}},
229+
}, nil
230+
}
231+
232+
_, err := poller.PollAndHandleWorkflowTask(tv, wtHandler)
233+
env.NoError(err)
234+
_, err = poller.PollAndHandleActivityTask(tv, taskpoller.CompleteActivityTask(tv))
235+
env.NoError(err)
236+
_, err = poller.PollAndHandleWorkflowTask(tv, wtHandler)
237+
env.NoError(err)
238+
}
239+
240+
// captureCurrentBranchToken extracts the current branch token from a workflow's mutable state.
241+
func captureCurrentBranchToken(env *testcore.TestEnv, ctx context.Context, workflowID, runID string) []byte {
242+
descResp, err := env.AdminClient().DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
243+
Namespace: env.Namespace().String(),
244+
Execution: &commonpb.WorkflowExecution{WorkflowId: workflowID, RunId: runID},
245+
Archetype: chasm.WorkflowArchetype,
246+
})
247+
env.NoError(err)
248+
vh := descResp.GetDatabaseMutableState().GetExecutionInfo().GetVersionHistories()
249+
currentVH, err := versionhistory.GetCurrentVersionHistory(vh)
250+
env.NoError(err)
251+
token := currentVH.GetBranchToken()
252+
env.NotEmpty(token)
253+
return token
254+
}
255+
256+
// waitForMutableStateGone polls until GetWorkflowExecution returns NotFound for the given runID.
257+
func waitForMutableStateGone(env *testcore.TestEnv, ctx context.Context, shardID int32, execMgr persistence.ExecutionManager, workflowID, runID string) {
258+
env.Eventually(func() bool {
259+
_, err := execMgr.GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{
260+
ShardID: shardID,
261+
NamespaceID: env.NamespaceID().String(),
262+
WorkflowID: workflowID,
263+
RunID: runID,
264+
ArchetypeID: chasm.WorkflowArchetypeID,
265+
})
266+
return common.IsNotFoundError(err)
267+
}, 10*time.Second, 100*time.Millisecond,
268+
"timed out waiting for mutable state of run %s to be deleted", runID)
269+
}

0 commit comments

Comments
 (0)