Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions integration/test_info_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@
from valkeytestframework.conftest import resource_port_tracker
from valkeytestframework.util import waiters
from ft_info_parser import FTInfoParser
from test_fanout_base import MAX_RETRIES
from typing import Any, Union
from valkey.exceptions import ResponseError, ConnectionError
import pytest

class TestFTInfoCluster(ValkeySearchClusterTestCaseDebugMode):

def execute_primaries(self, command: Union[str, list[str]]) -> list[Any]:
return [
self.client_for_primary(i).execute_command(*command)
for i in range(len(self.replication_groups))
]

def is_backfill_complete(self, node, index_name):
raw = node.execute_command("FT.INFO", index_name, "CLUSTER")
parser = FTInfoParser([])
Expand Down Expand Up @@ -87,3 +97,159 @@ def test_ft_info_cluster_retry(self):
assert float(info["backfill_complete_percent_max"]) == 1.000000
assert float(info["backfill_complete_percent_min"]) == 1.000000
assert str(info["state"]) == "ready"

# force one remote node to fail and test SOMESHARDS arg
# expect partial results
def test_ft_info_cluster_someshards_one_shard_fail(self):
cluster: ValkeyCluster = self.new_cluster_client()
node0: Valkey = self.new_client_for_primary(0)
node1: Valkey = self.new_client_for_primary(1)
index_name = "index1"

assert node0.execute_command(
"FT.CREATE", index_name,
"ON", "HASH",
"PREFIX", "1", "doc:",
"SCHEMA", "price", "NUMERIC"
) == b"OK"

# force node1 to process backfill very slowly
assert node1.execute_command(
"CONFIG SET search.backfill-batch-size 1"
) == b"OK"

N = 10000
for i in range(N):
cluster.execute_command("HSET", f"doc:{i}", "price", str(10 + i))

# run ft.info cluster with all nodes healthy
# node1 should not complete backfill
raw = node0.execute_command("FT.INFO", index_name, "CLUSTER", "SOMESHARDS")
parser = FTInfoParser([])
info = parser._parse_key_value_list(raw)
assert float(info["backfill_complete_percent_min"]) < 1.000000
assert str(info["state"]) != "ready"

# force node1 to fail continuously
assert node1.execute_command(
"FT._DEBUG CONTROLLED_VARIABLE SET ForceRemoteFailCount ",
MAX_RETRIES
) == b"OK"

raw = node0.execute_command("FT.INFO", index_name, "CLUSTER", "SOMESHARDS")
info = parser._parse_key_value_list(raw)

# check partial cluster info results
# node1 is down, expected to return backfill completed results from other nodes
assert info is not None
assert str(info.get("index_name")) == index_name
assert str(info.get("mode")) == "cluster"
assert int(info["backfill_in_progress"]) == 0
assert float(info["backfill_complete_percent_max"]) == 1.000000
assert float(info["backfill_complete_percent_min"]) == 1.000000
assert str(info["state"]) == "ready"

# reset remote fail to 0
assert node1.execute_command(
"FT._DEBUG CONTROLLED_VARIABLE SET ForceRemoteFailCount ",
0
) == b"OK"

# force all nodes to fail
# expect empty results
def test_ft_info_cluster_someshards_all_shards_fail(self):
cluster: ValkeyCluster = self.new_cluster_client()
node0: Valkey = self.new_client_for_primary(0)
index_name = "index1"

N = 60
for i in range(N):
cluster.execute_command("HSET", f"doc:{i}", "price", str(10 + i))

assert node0.execute_command(
"FT.CREATE", index_name,
"ON", "HASH",
"PREFIX", "1", "doc:",
"SCHEMA", "price", "NUMERIC"
) == b"OK"

# force all remote nodes to fail continuously
results = self.execute_primaries(["FT._DEBUG CONTROLLED_VARIABLE SET ForceRemoteFailCount", MAX_RETRIES])
assert all(result == b"OK" for result in results)

# force local node to fail continuously
assert node0.execute_command(
"FT._DEBUG CONTROLLED_VARIABLE SET ForceLocalFailCount ",
MAX_RETRIES
) == b"OK"

raw = node0.execute_command("FT.INFO", index_name, "CLUSTER", "SOMESHARDS")
parser = FTInfoParser([])
info = parser._parse_key_value_list(raw)

# check partial results should be 0, state should be empty
assert info is not None
assert str(info.get("index_name")) == index_name
assert str(info.get("mode")) == "cluster"
assert int(info["backfill_in_progress"]) == 0
assert float(info["backfill_complete_percent_max"]) == 0.0
assert float(info["backfill_complete_percent_min"]) == 0.0
assert str(info["state"]) == ""

# reset all remote nodes
results = self.execute_primaries(["FT._DEBUG CONTROLLED_VARIABLE SET ForceRemoteFailCount", 0])
assert all(result == b"OK" for result in results)

# reset local node
assert node0.execute_command(
"FT._DEBUG CONTROLLED_VARIABLE SET ForceLocalFailCount ",
0
) == b"OK"

# force inconsistent errors
# expect fail in CONSISTENT but success in INCONSISTENT arg
def test_ft_info_cluster_consistent_arg(self):
cluster: ValkeyCluster = self.new_cluster_client()
node0: Valkey = self.new_client_for_primary(0)
index_name = "index1"

N = 60
for i in range(N):
cluster.execute_command("HSET", f"doc:{i}", "price", str(10 + i))

assert node0.execute_command(
"FT.CREATE", index_name,
"ON", "HASH",
"PREFIX", "1", "doc:",
"SCHEMA", "price", "NUMERIC"
) == b"OK"

waiters.wait_for_true(lambda: self.is_backfill_complete(node0, index_name))

# force inconsistent error
assert node0.execute_command(
"FT._DEBUG CONTROLLED_VARIABLE SET ForceInfoClusterInconsistentError yes"
) == b"OK"

# expect error in consistent mode
with pytest.raises(ResponseError) as excinfo:
node0.execute_command("FT.INFO", index_name, "CLUSTER", "ALLSHARDS", "CONSISTENT")
assert "Unable to contact all cluster members" in str(excinfo.value)

raw = node0.execute_command("FT.INFO", index_name, "CLUSTER", "ALLSHARDS", "INCONSISTENT")
parser = FTInfoParser([])
info = parser._parse_key_value_list(raw)

# expect results in inconsistent mode
assert info is not None
assert str(info.get("index_name")) == index_name
assert str(info.get("mode")) == "cluster"
assert int(info["backfill_in_progress"]) == 0
assert float(info["backfill_complete_percent_max"]) == 1.000000
assert float(info["backfill_complete_percent_min"]) == 1.000000
assert str(info["state"]) == "ready"

# reset inconsistent error
assert node0.execute_command(
"FT._DEBUG CONTROLLED_VARIABLE SET ForceInfoClusterInconsistentError no"
) == b"OK"
154 changes: 154 additions & 0 deletions integration/test_info_primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
from valkeytestframework.conftest import resource_port_tracker
from valkeytestframework.util import waiters
from ft_info_parser import FTInfoParser
from test_fanout_base import MAX_RETRIES
from typing import Any, Union
from valkey.exceptions import ResponseError, ConnectionError
import pytest

def verify_error_response(client, cmd, expected_err_reply):
try:
Expand All @@ -16,6 +20,12 @@ def verify_error_response(client, cmd, expected_err_reply):

class TestFTInfoPrimary(ValkeySearchClusterTestCaseDebugMode):

def execute_primaries(self, command: Union[str, list[str]]) -> list[Any]:
return [
self.client_for_primary(i).execute_command(*command)
for i in range(len(self.replication_groups))
]

def is_indexing_complete(self, node, index_name, N):
raw = node.execute_command("FT.INFO", index_name, "PRIMARY")
parser = FTInfoParser([])
Expand Down Expand Up @@ -95,3 +105,147 @@ def test_ft_info_primary_retry(self):
assert int(info["num_docs"]) == N
assert int(info["num_records"]) == N
assert int(info["hash_indexing_failures"]) == 0

# force one remote node to fail and test SOMESHARDS arg
# expect partial results
def test_ft_info_primary_someshards_one_shard_fail(self):
cluster: ValkeyCluster = self.new_cluster_client()
node0: Valkey = self.new_client_for_primary(0)
node1: Valkey = self.new_client_for_primary(1)
index_name = "index1"

assert node0.execute_command(
"FT.CREATE", index_name,
"ON", "HASH",
"PREFIX", "1", "doc:",
"SCHEMA", "price", "NUMERIC"
) == b"OK"

N = 60
for i in range(N):
cluster.execute_command("HSET", f"doc:{i}", "price", str(10 + i))

waiters.wait_for_true(lambda: self.is_indexing_complete(node0, index_name, N))

# force one node to fail continuously
assert node1.execute_command(
"FT._DEBUG CONTROLLED_VARIABLE SET ForceRemoteFailCount ",
MAX_RETRIES
) == b"OK"

raw = node0.execute_command("FT.INFO", index_name, "PRIMARY", "SOMESHARDS")
parser = FTInfoParser([])
info = parser._parse_key_value_list(raw)

# check partial primary info results
assert info is not None
assert str(info.get("index_name")) == index_name
assert str(info.get("mode")) == "primary"
assert int(info["num_docs"]) < N
assert int(info["num_records"]) < N

# reset remote fail to 0
assert node1.execute_command(
"FT._DEBUG CONTROLLED_VARIABLE SET ForceRemoteFailCount ",
0
) == b"OK"

# force all nodes to fail
# expect empty results
def test_ft_info_primary_someshards_all_shards_fail(self):
cluster: ValkeyCluster = self.new_cluster_client()
node0: Valkey = self.new_client_for_primary(0)
index_name = "index1"

assert node0.execute_command(
"FT.CREATE", index_name,
"ON", "HASH",
"PREFIX", "1", "doc:",
"SCHEMA", "price", "NUMERIC"
) == b"OK"

N = 60
for i in range(N):
cluster.execute_command("HSET", f"doc:{i}", "price", str(10 + i))

waiters.wait_for_true(lambda: self.is_indexing_complete(node0, index_name, N))

# force all remote nodes to fail continuously
results = self.execute_primaries(["FT._DEBUG CONTROLLED_VARIABLE SET ForceRemoteFailCount", MAX_RETRIES])
assert all(result == b"OK" for result in results)

# force local node to fail continuously
assert node0.execute_command(
"FT._DEBUG CONTROLLED_VARIABLE SET ForceLocalFailCount ",
MAX_RETRIES
) == b"OK"

raw = node0.execute_command("FT.INFO", index_name, "PRIMARY", "SOMESHARDS")
parser = FTInfoParser([])
info = parser._parse_key_value_list(raw)

# check partial results should be 0
assert info is not None
assert str(info.get("index_name")) == index_name
assert str(info.get("mode")) == "primary"
assert int(info["num_docs"]) == 0
assert int(info["num_records"]) == 0
assert int(info["hash_indexing_failures"]) == 0

# reset all remote nodes
results = self.execute_primaries(["FT._DEBUG CONTROLLED_VARIABLE SET ForceRemoteFailCount", 0])
assert all(result == b"OK" for result in results)

# reset local node
assert node0.execute_command(
"FT._DEBUG CONTROLLED_VARIABLE SET ForceLocalFailCount ",
0
) == b"OK"

# force inconsistent errors
# expect fail in CONSISTENT but success in INCONSISTENT arg
def test_ft_info_primary_consistent_arg(self):
cluster: ValkeyCluster = self.new_cluster_client()
node0: Valkey = self.new_client_for_primary(0)
index_name = "index1"

assert node0.execute_command(
"FT.CREATE", index_name,
"ON", "HASH",
"PREFIX", "1", "doc:",
"SCHEMA", "price", "NUMERIC"
) == b"OK"

N = 60
for i in range(N):
cluster.execute_command("HSET", f"doc:{i}", "price", str(10 + i))

waiters.wait_for_true(lambda: self.is_indexing_complete(node0, index_name, N))

# force inconsistent error
assert node0.execute_command(
"FT._DEBUG CONTROLLED_VARIABLE SET ForceInfoPrimaryInconsistentError yes"
) == b"OK"

# expect error in consistent mode
with pytest.raises(ResponseError) as excinfo:
node0.execute_command("FT.INFO", index_name, "PRIMARY", "ALLSHARDS", "CONSISTENT")
assert "Unable to contact all cluster members" in str(excinfo.value)

raw = node0.execute_command("FT.INFO", index_name, "PRIMARY", "ALLSHARDS", "INCONSISTENT")
parser = FTInfoParser([])
info = parser._parse_key_value_list(raw)

# expect results in inconsistent mode
assert info is not None
assert str(info.get("index_name")) == index_name
assert str(info.get("mode")) == "primary"
assert int(info["num_docs"]) == N
assert int(info["num_records"]) == N
assert int(info["hash_indexing_failures"]) == 0

# reset inconsistent error
assert node0.execute_command(
"FT._DEBUG CONTROLLED_VARIABLE SET ForceInfoPrimaryInconsistentError no"
) == b"OK"

8 changes: 5 additions & 3 deletions src/commands/ft_create.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ class CreateConsistencyCheckFanoutOperation
public:
CreateConsistencyCheckFanoutOperation(
uint32_t db_num, const std::string &index_name, unsigned timeout_ms,
coordinator::IndexFingerprintVersion new_entry_fingerprint_version)
: ClusterInfoFanoutOperation(db_num, index_name, timeout_ms),
coordinator::IndexFingerprintVersion new_entry_fingerprint_version,
bool allshards_required, bool consistency_required)
: ClusterInfoFanoutOperation(db_num, index_name, timeout_ms,
allshards_required, consistency_required),
new_entry_fingerprint_version_(new_entry_fingerprint_version) {}

int GenerateReply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
Expand Down Expand Up @@ -66,7 +68,7 @@ absl::Status FTCreateCmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
unsigned timeout_ms = options::GetFTInfoTimeoutMs().GetValue();
auto op = new CreateConsistencyCheckFanoutOperation(
ValkeyModule_GetSelectedDb(ctx), index_schema_proto.name(), timeout_ms,
new_entry_fingerprint_version);
new_entry_fingerprint_version, true, true);
op->StartOperation(ctx);
} else {
ValkeyModule_ReplyWithSimpleString(ctx, "OK");
Expand Down
Loading
Loading