Skip to content

Commit b3e1889

Browse files
craig[bot]mw5harulajmani
committed
155192: backfill: split merge memory accounting mutex acquisitions r=mw5h a=mw5h Previously, constructMergeBatch() would get the mutex on memory accounting once per batch to minimize the number of atomics needed to perform memory accounting. With the addition of vector indexing, we now need to perform KV operations in order to construct batches, which can lead to deadlocks between KV and the memory accounting mutex. This patch splits the mutex acquisition into two acquisitions, releasing the mutex in between. Memory used is accumulated as merged entries are built and then accounting is notified for the entire batch. This has the effect of making the accounting lag behind allocation a bit, but that seems preferable than paying the cost of a mutex acquisition for every row merged. Fixes: #155190 Release note (bug fix): A potential deadlock during vector index creation has been corrected. 155284: backfill: add a separate control for number of vectors merged per batch r=mw5h a=mw5h Add the 'bulkio.index_backfill.vector_merge_batch_size', which controls the number of rows to merge into vector indexes per transaction while the index is being created. This is analogous to the 'bulkio.index_backfill.merge_batch_size' setting, but it only applies when the target index is a vector index. By default, 3 vectors will be merged per transaction to reduce contention with fixup tasks. Fixes: #155283 Release note (sql change): Added the bulkio.index_backfill.vector_merge_batch_size cluster setting to control how many vectors to merge into a vector index per transaction during create operations. By default, this defaults to 3. 155412: kvserver: introduce test harness for replica lifecycle events r=pav-kv a=arulajmani First two commits from #155408. This patch introduces the scaffolding for a harness to test various replica lifecycle events in a data-driven manner. The harness works from th context of a single node/store -- n1,s1. The intention is to print all engine interactions of the replica on this store on various replica lifecycle events. To kick things off, we add the ability to create a replica -- both initialized and uninitialized. Epic: none Release note: None Co-authored-by: Matt White <[email protected]> Co-authored-by: Arul Ajmani <[email protected]>
4 parents f625fe5 + 606b4b8 + 61efdca + 99f96bc commit b3e1889

File tree

4 files changed

+377
-21
lines changed

4 files changed

+377
-21
lines changed

pkg/kv/kvserver/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ go_test(
351351
"replica_init_test.go",
352352
"replica_learner_test.go",
353353
"replica_lease_renewal_test.go",
354+
"replica_lifecycle_datadriven_test.go",
354355
"replica_metrics_test.go",
355356
"replica_probe_test.go",
356357
"replica_proposal_bench_test.go",
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
// TODO(arul): As this test suite evolves, see if it can be moved into the
7+
// kvstorage package instead.
8+
9+
package kvserver
10+
11+
import (
12+
"context"
13+
"fmt"
14+
"strings"
15+
"testing"
16+
17+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
18+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
19+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/print"
20+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
21+
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
22+
"github.com/cockroachdb/cockroach/pkg/roachpb"
23+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
24+
"github.com/cockroachdb/cockroach/pkg/storage"
25+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
26+
"github.com/cockroachdb/cockroach/pkg/util/log"
27+
"github.com/cockroachdb/datadriven"
28+
"github.com/cockroachdb/errors"
29+
"github.com/stretchr/testify/require"
30+
"golang.org/x/exp/maps"
31+
"golang.org/x/exp/slices"
32+
)
33+
34+
// TestReplicaLifecycleDataDriven is intended to test the behaviour of various
35+
// replica lifecycle events, such as splits, merges, replica destruction, etc.
36+
// The test has a single storage engine that corresponds to n1/s1, and all batch
37+
// operations to storage are printed out. It uses the following format:
38+
//
39+
// create-descriptor start=<key> end=<key> replicas=[<int>,<int>,...]
40+
// ----
41+
//
42+
// Creates a range descriptor with the specified start and end keys and
43+
// optional replica list. The range ID is auto-assigned. If provided,
44+
// replicas specify NodeIDs for replicas of the range. Note that ReplicaIDs
45+
// are assigned incrementally starting from 1.
46+
//
47+
// create-replica range-id=<int> [initialized]
48+
// ----
49+
//
50+
// Creates a replica on n1/s1 for the specified range ID. The created replica
51+
// may be initialized or uninitialized.
52+
//
53+
// print-range-state
54+
// ----
55+
//
56+
// Prints the current range state in the test context.
57+
func TestReplicaLifecycleDataDriven(t *testing.T) {
58+
defer leaktest.AfterTest(t)()
59+
defer log.Scope(t).Close(t)
60+
61+
datadriven.Walk(t, "testdata/replica_lifecycle", func(t *testing.T, path string) {
62+
tc := newTestCtx()
63+
defer tc.close()
64+
ctx := context.Background()
65+
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
66+
switch d.Cmd {
67+
case "create-descriptor":
68+
var startKey, endKey string
69+
d.ScanArgs(t, "start", &startKey)
70+
d.ScanArgs(t, "end", &endKey)
71+
var replicasStr string
72+
d.ScanArgs(t, "replicas", &replicasStr)
73+
replicaNodeIDs := parseReplicas(t, replicasStr)
74+
75+
rangeID := tc.nextRangeID
76+
tc.nextRangeID++
77+
var internalReplicas []roachpb.ReplicaDescriptor
78+
for i, id := range replicaNodeIDs {
79+
internalReplicas = append(internalReplicas, roachpb.ReplicaDescriptor{
80+
ReplicaID: roachpb.ReplicaID(i + 1),
81+
NodeID: id,
82+
StoreID: roachpb.StoreID(id),
83+
Type: roachpb.VOTER_FULL,
84+
})
85+
}
86+
desc := roachpb.RangeDescriptor{
87+
RangeID: rangeID,
88+
StartKey: roachpb.RKey(startKey),
89+
EndKey: roachpb.RKey(endKey),
90+
InternalReplicas: internalReplicas,
91+
NextReplicaID: roachpb.ReplicaID(len(internalReplicas) + 1),
92+
}
93+
require.True(t, desc.StartKey.Compare(desc.EndKey) < 0)
94+
95+
// Ranges are expected to be non-overlapping. Before creating a
96+
// new one, sanity check that we're not violating this property
97+
// in the test context.
98+
for existingRangeID, existingRS := range tc.ranges {
99+
existingDesc := existingRS.desc
100+
require.False(t, desc.StartKey.Compare(existingDesc.EndKey) < 0 &&
101+
existingDesc.StartKey.Compare(desc.EndKey) < 0,
102+
"descriptor overlaps with existing range %d [%s,%s)",
103+
existingRangeID, existingDesc.StartKey, existingDesc.EndKey)
104+
}
105+
106+
rs := newRangeState(desc)
107+
tc.ranges[rangeID] = rs
108+
return fmt.Sprintf("created descriptor: %v", desc)
109+
110+
case "create-replica":
111+
var rangeID int
112+
d.ScanArgs(t, "range-id", &rangeID)
113+
rs := tc.mustGetRangeState(t, roachpb.RangeID(rangeID))
114+
if rs.replica != nil {
115+
return errors.New("initialized replica already exists on n1/s1").Error()
116+
}
117+
repl := rs.getReplicaDescriptor(t)
118+
119+
initialized := d.HasArg("initialized")
120+
121+
batch := tc.storage.NewBatch()
122+
defer batch.Close()
123+
124+
if initialized {
125+
err := stateloader.WriteInitialRangeState(ctx, batch, rs.desc, repl.ReplicaID, rs.version)
126+
require.NoError(t, err)
127+
} else {
128+
err := kvstorage.CreateUninitializedReplica(
129+
ctx, batch, batch, 1, /* StoreID */
130+
roachpb.FullReplicaID{RangeID: rs.desc.RangeID, ReplicaID: repl.ReplicaID},
131+
)
132+
require.NoError(t, err)
133+
}
134+
tc.updatePostReplicaCreateState(t, ctx, rs, batch)
135+
136+
// Print the descriptor and batch output.
137+
var sb strings.Builder
138+
output, err := print.DecodeWriteBatch(batch.Repr())
139+
require.NoError(t, err, "error decoding batch")
140+
sb.WriteString(fmt.Sprintf("created replica: %v", repl))
141+
if output != "" {
142+
sb.WriteString("\n")
143+
sb.WriteString(output)
144+
}
145+
// Commit the batch.
146+
err = batch.Commit(true)
147+
require.NoError(t, err, "error committing batch")
148+
return sb.String()
149+
150+
case "print-range-state":
151+
var sb strings.Builder
152+
if len(tc.ranges) == 0 {
153+
return "no ranges in test context"
154+
}
155+
// Sort by range IDs for consistent output.
156+
rangeIDs := maps.Keys(tc.ranges)
157+
slices.Sort(rangeIDs)
158+
159+
for _, rangeID := range rangeIDs {
160+
rs := tc.ranges[rangeID]
161+
sb.WriteString(fmt.Sprintf("%s\n", rs))
162+
}
163+
return sb.String()
164+
165+
default:
166+
return fmt.Sprintf("unknown command: %s", d.Cmd)
167+
}
168+
})
169+
})
170+
}
171+
172+
// rangeState represents the state of a single range in the test context.
173+
type rangeState struct {
174+
desc roachpb.RangeDescriptor
175+
version roachpb.Version
176+
replica *replicaInfo // replica on n1/s1.
177+
}
178+
179+
// replicaInfo contains the basic info about a replica, used for managing its
180+
// engine (both raft log and state machine) state.
181+
type replicaInfo struct {
182+
roachpb.FullReplicaID
183+
hs raftpb.HardState
184+
ts kvserverpb.RaftTruncatedState
185+
}
186+
187+
// testCtx is a single test's context. It tracks the state of all ranges and any
188+
// intermediate steps when performing replica lifecycle events.
189+
type testCtx struct {
190+
ranges map[roachpb.RangeID]*rangeState
191+
nextRangeID roachpb.RangeID // monotonically-increasing rangeID
192+
st *cluster.Settings
193+
// The storage engine corresponds to a single store, (n1, s1).
194+
storage storage.Engine
195+
}
196+
197+
// newTestCtx constructs and returns a new testCtx.
198+
func newTestCtx() *testCtx {
199+
st := cluster.MakeTestingClusterSettings()
200+
return &testCtx{
201+
ranges: make(map[roachpb.RangeID]*rangeState),
202+
nextRangeID: 1,
203+
st: st,
204+
storage: storage.NewDefaultInMemForTesting(),
205+
}
206+
}
207+
208+
// close closes the test context's storage engine.
209+
func (tc *testCtx) close() {
210+
tc.storage.Close()
211+
}
212+
213+
// newRangeState constructs a new rangeState for the supplied descriptor.
214+
func newRangeState(desc roachpb.RangeDescriptor) *rangeState {
215+
return &rangeState{
216+
desc: desc,
217+
version: roachpb.Version{Major: 10, Minor: 8, Internal: 7}, // dummy version to avoid churn
218+
}
219+
}
220+
221+
// mustGetRangeState returns the range state for the given range ID.
222+
func (tc *testCtx) mustGetRangeState(t *testing.T, rangeID roachpb.RangeID) *rangeState {
223+
rs, ok := tc.ranges[rangeID]
224+
require.True(t, ok, "range-id %d not found", rangeID)
225+
return rs
226+
}
227+
228+
func (tc *testCtx) updatePostReplicaCreateState(
229+
t *testing.T, ctx context.Context, rs *rangeState, batch storage.Batch,
230+
) {
231+
// Sanity check that we're not overwriting an existing replica.
232+
require.Nil(t, rs.replica)
233+
sl := stateloader.Make(rs.desc.RangeID)
234+
hs, err := sl.LoadHardState(ctx, batch)
235+
require.NoError(t, err)
236+
ts, err := sl.LoadRaftTruncatedState(ctx, batch)
237+
require.NoError(t, err)
238+
replID, err := sl.LoadRaftReplicaID(ctx, batch)
239+
require.NoError(t, err)
240+
rs.replica = &replicaInfo{
241+
FullReplicaID: roachpb.FullReplicaID{
242+
RangeID: rs.desc.RangeID,
243+
ReplicaID: replID.ReplicaID,
244+
},
245+
hs: hs,
246+
ts: ts,
247+
}
248+
}
249+
250+
func (rs *rangeState) getReplicaDescriptor(t *testing.T) *roachpb.ReplicaDescriptor {
251+
for i, repl := range rs.desc.InternalReplicas {
252+
if repl.NodeID == roachpb.NodeID(1) {
253+
return &rs.desc.InternalReplicas[i]
254+
}
255+
}
256+
t.Fatal("replica not found")
257+
return nil // unreachable
258+
}
259+
260+
func (rs *rangeState) String() string {
261+
var sb strings.Builder
262+
sb.WriteString(fmt.Sprintf("range desc: %s", rs.desc))
263+
if rs.replica != nil {
264+
sb.WriteString(fmt.Sprintf("\n replica (n1/s1): %s", rs.replica))
265+
}
266+
return sb.String()
267+
}
268+
269+
func (r *replicaInfo) String() string {
270+
var sb strings.Builder
271+
sb.WriteString(fmt.Sprintf("id=%s ", r.FullReplicaID.ReplicaID))
272+
if r.hs == (raftpb.HardState{}) {
273+
sb.WriteString("uninitialized")
274+
} else {
275+
sb.WriteString(fmt.Sprintf("HardState={Term:%d,Vote:%d,Commit:%d} ", r.hs.Term, r.hs.Vote, r.hs.Commit))
276+
sb.WriteString(fmt.Sprintf("TruncatedState={Index:%d,Term:%d}", r.ts.Index, r.ts.Term))
277+
}
278+
return sb.String()
279+
}
280+
281+
func parseReplicas(t *testing.T, val string) []roachpb.NodeID {
282+
var replicaNodeIDs []roachpb.NodeID
283+
require.True(t, len(val) >= 2 && val[0] == '[' && val[len(val)-1] == ']', "incorrect format")
284+
val = val[1 : len(val)-1]
285+
for _, s := range strings.Split(val, ",") {
286+
var id int
287+
_, err := fmt.Sscanf(strings.TrimSpace(s), "%d", &id)
288+
require.NoError(t, err)
289+
replicaNodeIDs = append(replicaNodeIDs, roachpb.NodeID(id))
290+
}
291+
// The test is written from the perspective of n1/s1, so not having n1 in
292+
// this list should return an error.
293+
require.True(t, slices.Contains(replicaNodeIDs, 1), "replica list must contain n1")
294+
return replicaNodeIDs
295+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
create-descriptor start=a end=d replicas=[1,2,3]
2+
----
3+
created descriptor: r1:{a-d} [(n1,s1):1, (n2,s2):2, (n3,s3):3, next=4, gen=0]
4+
5+
create-replica range-id=1
6+
----
7+
created replica: (n1,s1):1
8+
Put: 0,0 /Local/RangeID/1/u/RaftReplicaID (0x016989757266747200): replica_id:1
9+
10+
# Create an initialized replica this time around.
11+
create-descriptor start=d end=k replicas=[1,2,3]
12+
----
13+
created descriptor: r2:{d-k} [(n1,s1):1, (n2,s2):2, (n3,s3):3, next=4, gen=0]
14+
15+
create-replica range-id=2 initialized
16+
----
17+
created replica: (n1,s1):1
18+
Put: 0,0 /Local/RangeID/2/r/RangeLease (0x01698a72726c6c2d00): <empty>
19+
Put: 0,0 /Local/RangeID/2/r/RangeGCThreshold (0x01698a726c67632d00): 0,0
20+
Put: 0,0 /Local/RangeID/2/r/RangeGCHint (0x01698a727267636800): latest_range_delete_timestamp:<> gc_timestamp:<> gc_timestamp_next:<>
21+
Put: 0,0 /Local/RangeID/2/r/RangeVersion (0x01698a727276657200): 10.8-upgrading-step-007
22+
Put: 0,0 /Local/RangeID/2/r/RangeAppliedState (0x01698a727261736b00): raft_applied_index:10 lease_applied_index:10 range_stats:<sys_bytes:142 sys_count:4 > raft_closed_timestamp:<> raft_applied_index_term:5
23+
Put: 0,0 /Local/RangeID/2/u/RaftReplicaID (0x01698a757266747200): replica_id:1
24+
Put: 0,0 /Local/RangeID/2/u/RaftHardState (0x01698a757266746800): term:5 vote:0 commit:10 lead:0 lead_epoch:0
25+
Put: 0,0 /Local/RangeID/2/u/RaftTruncatedState (0x01698a757266747400): index:10 term:5
26+
27+
print-range-state
28+
----
29+
range desc: r1:{a-d} [(n1,s1):1, (n2,s2):2, (n3,s3):3, next=4, gen=0]
30+
replica (n1/s1): id=1 uninitialized
31+
range desc: r2:{d-k} [(n1,s1):1, (n2,s2):2, (n3,s3):3, next=4, gen=0]
32+
replica (n1/s1): id=1 HardState={Term:5,Vote:0,Commit:10} TruncatedState={Index:10,Term:5}

0 commit comments

Comments
 (0)