Commits (46)
c23b19e  Initial wiring (allenss-amazon, Oct 1, 2025)
ab61e93  formating (allenss-amazon, Oct 1, 2025)
0116204  Code complete (allenss-amazon, Oct 3, 2025)
230682e  Finished testing (allenss-amazon, Oct 4, 2025)
d52dcaa  add missing file (allenss-amazon, Oct 4, 2025)
6344430  Revert (allenss-amazon, Oct 4, 2025)
9c0d6f6  Cleanup (allenss-amazon, Oct 4, 2025)
667d77e  fix spelling (allenss-amazon, Oct 4, 2025)
c7bd274  fix bad merge (allenss-amazon, Oct 4, 2025)
233c675  bad merge (allenss-amazon, Oct 4, 2025)
047f043  experiment (allenss-amazon, Oct 4, 2025)
923c3d4  Cleanup (allenss-amazon, Oct 16, 2025)
e4ac641  Revised (allenss-amazon, Oct 16, 2025)
d535c91  hopefully fix valkey server version (allenss-amazon, Oct 16, 2025)
386686c  Merge branch 'main' into saverestore (allenss-amazon, Oct 17, 2025)
9b8ee70  Merge branch 'main' into saverestore (allenss-amazon, Oct 26, 2025)
df9cd25  Revise per review. Add save/restore of multi/exec (allenss-amazon, Oct 29, 2025)
c0f27dd  Fix validation of vector-only indexes. Add test cases for same. (allenss-amazon, Oct 30, 2025)
058e7d8  review comments update (allenss-amazon, Oct 30, 2025)
38189b6  Fix review issue (allenss-amazon, Nov 5, 2025)
3966629  Merge branch 'main' into saverestore (allenss-amazon, Nov 9, 2025)
8374c6e  Fixup merge errors (allenss-amazon, Nov 9, 2025)
2857cac  Stop writer threads during restore (allenss-amazon, Nov 10, 2025)
0113af0  prevent run-away test runtimes (allenss-amazon, Nov 11, 2025)
71fc6ac  Merge branch 'main' into saverestore (allenss-amazon, Nov 12, 2025)
dab6546  hack the timeouts (allenss-amazon, Nov 12, 2025)
4173b85  Revert "hack the timeouts" (allenss-amazon, Nov 12, 2025)
a7a659c  limit pytest to 30 minutes (allenss-amazon, Nov 12, 2025)
7d985ae  limit pytest to 30 minutes (allenss-amazon, Nov 12, 2025)
5d60325  formatting (allenss-amazon, Nov 12, 2025)
1e92d43  rejigger timeouts (allenss-amazon, Nov 12, 2025)
c887927  more debugging (allenss-amazon, Nov 12, 2025)
f0466e8  more debug (allenss-amazon, Nov 12, 2025)
13e6583  more test output (allenss-amazon, Nov 12, 2025)
9426112  more debugging output (allenss-amazon, Nov 13, 2025)
9fb131d  more debugging (allenss-amazon, Nov 13, 2025)
a0dc8fe  Merge branch 'main' into saverestore (allenss-amazon, Nov 14, 2025)
3c03a5c  fix test_CMD, disable stability test (allenss-amazon, Nov 14, 2025)
4468376  debug on multi/exec (allenss-amazon, Nov 14, 2025)
642e2dc  fix (allenss-amazon, Nov 15, 2025)
3912ad0  fix (allenss-amazon, Nov 15, 2025)
c35fa44  Implement drain on reload (allenss-amazon, Nov 15, 2025)
9fbee29  Merge branch 'main' into saverestore (allenss-amazon, Nov 16, 2025)
4f8edf2  Drain queue of non-backfill before reload (allenss-amazon, Nov 16, 2025)
d5c1d9a  Cleanup (allenss-amazon, Nov 16, 2025)
796e27b  Fixup timeout (allenss-amazon, Nov 16, 2025)
28 changes: 17 additions & 11 deletions integration/indexes.py
@@ -10,18 +10,19 @@
 import logging, json
 import struct
 from enum import Enum
+from util import waiters

 class KeyDataType(Enum):
     HASH = 1
     JSON = 2


 def float_to_bytes(flt: list[float]) -> bytes:
     return struct.pack(f"<{len(flt)}f", *flt)


 class Field:
     name: str
     alias: Union[str, None]
     def __init__(self, name: str, alias: Union[str, None]):
         self.name = name
         self.alias = alias if alias else name
@@ -142,7 +143,7 @@ def __init__(
         self.prefixes = prefixes
         self.type = type

-    def create(self, client: valkey.client):
+    def create(self, client: valkey.client, wait_for_backfill=False):
         cmd = (
             [
                 "FT.CREATE",
@@ -159,22 +159,27 @@ def create(self, client: valkey.client):
             cmd += f.create(self.type)
         print(f"Creating Index: {cmd}")
         client.execute_command(*cmd)
+        if wait_for_backfill:
+            waiters.wait_for_true(
+                lambda: not self.info(client).backfill_in_progress
+            )

     def load_data(self, client: valkey.client, rows: int, start_index: int = 0):
         print("Loading data to ", client)
         for i in range(start_index, rows):
             data = self.make_data(i)
-            if self.type == KeyDataType.HASH:
-                client.hset(self.keyname(i), mapping=data)
-            else:
-                client.execute_command("JSON.SET", self.keyname(i), "$", json.dumps(data))
+            self.write_data(client, i, data)

+    def write_data(self, client: valkey.client, i: int, data: dict[str, str | bytes]):
+        if self.type == KeyDataType.HASH:
+            client.hset(self.keyname(i), mapping=data)
+        else:
+            client.execute_command("JSON.SET", self.keyname(i), "$", json.dumps(data))

     def load_data_with_ttl(self, client: valkey.client, rows: int, ttl_ms: int, start_index: int = 0):
         for i in range(start_index, rows):
             data = self.make_data(i)
-            key = self.keyname(i)
-            client.hset(key, mapping=data)
-            client.pexpire(key, ttl_ms)
+            self.write_data(client, i, data)
+            client.pexpire(self.keyname(i), ttl_ms)

     def keyname(self, row: int) -> str:
         prefix = self.prefixes[row % len(self.prefixes)] if self.prefixes else ""
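A minimal usage sketch of the two additions above (the wait_for_backfill flag on create() and the type-dispatching write_data() helper), assuming a connected valkey.Valkey client against a local test server; the index shape mirrors the one used in test_saverestore.py below.

    from valkey import Valkey
    from indexes import Index, Vector

    client = Valkey(port=6379)  # hypothetical local test server

    idx = Index("demo", [Vector("v", 3, type="HNSW", m=2, efc=1)])
    # Polls FT.INFO (via waiters.wait_for_true) until the initial backfill finishes.
    idx.create(client, wait_for_backfill=True)
    # Dispatches to HSET or JSON.SET depending on the index's KeyDataType.
    idx.write_data(client, 0, idx.make_data(0))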
2 changes: 1 addition & 1 deletion integration/run.sh
@@ -144,7 +144,7 @@ mkdir -p ${LOGS_DIR}
 function run_pytest() {
     zap valkey-server
     LOG_INFO "Running: ${PYTHON_PATH} -m pytest ${FILTER_ARGS} --capture=sys --cache-clear -v ${ROOT_DIR}/integration/"
-    ${PYTHON_PATH} -m pytest ${FILTER_ARGS} --full-trace --capture=sys --cache-clear -v ${ROOT_DIR}/integration/
+    ${PYTHON_PATH} -m pytest ${FILTER_ARGS} --capture=sys --cache-clear -v ${ROOT_DIR}/integration/
     RUN_SUCCESS=$?
 }

223 changes: 223 additions & 0 deletions integration/test_saverestore.py
@@ -0,0 +1,223 @@
import base64
import os
import tempfile
import time
import subprocess
import shutil
import socket
from valkey import ResponseError, Valkey
from valkey_search_test_case import ValkeySearchTestCaseDebugMode
from valkeytestframework.conftest import resource_port_tracker
from indexes import *
import pytest
import logging
from util import waiters
import threading
from ft_info_parser import FTInfoParser

index = Index("index", [Vector("v", 3, type="HNSW", m=2, efc=1), Numeric("n"), Tag("t")])
NUM_VECTORS = 10

# Keys that are in all results
full_key_names = [index.keyname(i).encode() for i in range(NUM_VECTORS)]

def check_keys(received_keys, expected_keys):
    received_set = set(received_keys)
    expected_set = set(expected_keys)
    print("Result.keys  ", received_set)
    print("expected.keys", expected_set)
    assert received_set == expected_set

def do_search(client: Valkey, query: str, extra: list[str] = []) -> dict[str, dict[str, str]]:
    cmd = ["ft.search index", query, "limit", "0", "100"] + extra
    print("Cmd: ", cmd)
    res = client.execute_command(*cmd)[1:]
    result = dict()
    for i in range(0, len(res), 2):
        row = res[i + 1]
        row_dict = dict()
        for j in range(0, len(row), 2):
            row_dict[row[j]] = row[j + 1]
        result[res[i]] = row_dict
    print("Result is ", result)
    return result

def make_data():
    records = []
    for i in range(0, NUM_VECTORS):
        records += [index.make_data(i)]

    data = index.make_data(len(records))
    data["v"] = "0"
    records += [data]

    data = index.make_data(len(records))
    data["n"] = "fred"
    records += [data]

    data = index.make_data(len(records))
    data["t"] = ""
    records += [data]
    return records

def load_data(client: Valkey):
    records = make_data()
    for i in range(0, len(records)):
        index.write_data(client, i, records[i])
    return len(records)

def verify_data(client: Valkey):
    '''
    Run query operations against each index to ensure that all keys are present.
    '''
    res = do_search(client, "@n:[0 100]")
    check_keys(res.keys(), full_key_names + [index.keyname(NUM_VECTORS + 0).encode(), index.keyname(NUM_VECTORS + 2).encode()])
    res = do_search(client, "@t:{Tag*}")
    check_keys(res.keys(), full_key_names + [index.keyname(NUM_VECTORS + 0).encode(), index.keyname(NUM_VECTORS + 1).encode()])

def do_save_restore_test(test, write_v2: bool, read_v2: bool):
    index.create(test.client, True)
    key_count = load_data(test.client)
    verify_data(test.client)
    test.client.config_set("search.rdb-validate-on-write", "yes")
    test.client.execute_command("save")
    os.environ["SKIPLOGCLEAN"] = "1"
    test.client.execute_command("CONFIG SET search.info-developer-visible yes")
    i = test.client.info("search")
    print("Info after save: ", i)
    writes = [
        i["search_rdb_save_sections"],
        i["search_rdb_save_keys"],
        i["search_rdb_save_mutation_entries"],
    ]
    if write_v2:
        assert writes == [5, key_count, 0]
    else:
        assert writes == [4, 0, 0]
    test.server.restart(remove_rdb=False)
    time.sleep(5)
    print(test.client.ping())
    verify_data(test.client)
    test.client.execute_command("CONFIG SET search.info-developer-visible yes")

    i = test.client.info("search")
    print("Info after load: ", i)
    reads = [
        i["search_rdb_load_sections"],
        i["search_rdb_load_sections_skipped"],
        i["search_rdb_load_keys"],
        i["search_rdb_load_mutation_entries"],
    ]
    if not write_v2:
        assert reads == [4, 0, 0, 0]
    elif read_v2:
        assert reads == [5, 0, key_count, 0]
    else:
        assert reads == [5, 1, 0, 0]


class TestSaveRestore_v1_v1(ValkeySearchTestCaseDebugMode):
    def append_startup_args(self, args):
        args["search.rdb_write_v2"] = "no"
        args["search.rdb_read_v2"] = "no"
        return args

    def test_saverestore_v1_v1(self):
        do_save_restore_test(self, False, False)

class TestSaveRestore_v1_v2(ValkeySearchTestCaseDebugMode):
    def append_startup_args(self, args):
        args["search.rdb_write_v2"] = "no"
        args["search.rdb_read_v2"] = "yes"
        return args

    def test_saverestore_v1_v2(self):
        do_save_restore_test(self, False, True)

class TestSaveRestore_v2_v1(ValkeySearchTestCaseDebugMode):
    def append_startup_args(self, args):
        args["search.rdb_write_v2"] = "yes"
        args["search.rdb_read_v2"] = "no"
        return args

    def test_saverestore_v2_v1(self):
        do_save_restore_test(self, True, False)

class TestSaveRestore_v2_v2(ValkeySearchTestCaseDebugMode):
    def append_startup_args(self, args):
        args["search.rdb_write_v2"] = "yes"
        args["search.rdb_read_v2"] = "yes"
        return args

    def test_saverestore_v2_v2(self):
        do_save_restore_test(self, True, True)

class TestMutationQueue(ValkeySearchTestCaseDebugMode):
    def append_startup_args(self, args):
        args["search.rdb_write_v2"] = "yes"
        args["search.rdb_read_v2"] = "yes"
        return args

    def mutation_queue_size(self):
        info = FTInfoParser(self.client.execute_command("ft.info ", index.name))
        return info.mutation_queue_size

    def test_mutation_queue(self):
        self.client.execute_command("ft._debug PAUSEPOINT SET block_mutation_queue")
        index.create(self.client, True)
        records = make_data()
        #
        # Now load the data. Because the mutation queue is blocked, each write stalls
        # until the pausepoint is released, so every writer gets its own thread.
        #
        client_threads = []
        for i in range(len(records)):
            new_client = self.server.get_new_client()
            t = threading.Thread(target=index.write_data, args=(new_client, i, records[i]))
            t.start()
            client_threads += [t]

        #
        # Now wait for the mutation queue to be fully loaded.
        #
        waiters.wait_for_true(lambda: self.mutation_queue_size() == len(records))
        print("MUTATION QUEUE LOADED")

        self.client.execute_command("save")

        self.client.execute_command("ft._debug pausepoint reset block_mutation_queue")

        for t in client_threads:
            t.join()

        verify_data(self.client)
        os.environ["SKIPLOGCLEAN"] = "1"
        self.server.restart(remove_rdb=False)
        verify_data(self.client)
        self.client.execute_command("CONFIG SET search.info-developer-visible yes")
        i = self.client.info("search")
        print("Info: ", i)
        reads = [
            i["search_rdb_load_mutation_entries"],
        ]
        assert reads == [len(records)]

    def test_saverestore_backfill(self):
        #
        # Delay the backfill and ensure that the new format re-triggers it after restore.
        #
        self.client.execute_command("FT._DEBUG CONTROLLED_VARIABLE SET StopBackfill yes")
        load_data(self.client)
        index.create(self.client, False)
        self.client.execute_command("save")

        os.environ["SKIPLOGCLEAN"] = "1"
        self.server.restart(remove_rdb=False)
        verify_data(self.client)
        self.client.execute_command("CONFIG SET search.info-developer-visible yes")
        i = self.client.info("search")
        print("Info: ", i)
        reads = [
            i["search_backfill_hash_keys"],
        ]
        assert reads == [len(make_data())]
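The tests above synchronize on asynchronous server state (backfill completion, mutation-queue depth) through util.waiters.wait_for_true. Its real signature isn't part of this diff; a minimal polling helper of the assumed shape would look like:

    import time

    def wait_for_true(condition, timeout: float = 30.0, interval: float = 0.1):
        # Poll `condition` until it returns truthy or `timeout` seconds elapse.
        deadline = time.time() + timeout
        while time.time() < deadline:
            if condition():
                return True
            time.sleep(interval)
        raise TimeoutError("condition did not become true in time")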
5 changes: 4 additions & 1 deletion integration/valkey_search_test_case.py
@@ -147,6 +147,9 @@ def get_config_file_lines(self, testdir, port) -> List[str]:
         See ValkeySearchTestCaseBase.get_config_file_lines & ValkeySearchClusterTestCase.get_config_file_lines
         for example usage."""
         raise NotImplementedError

+    def append_startup_args(self, args: dict[str, str]) -> dict[str, str]:
+        return args

     def start_server(
         self,
@@ -177,7 +180,7 @@ def start_server(
         server, client = self.create_server(
             testdir=testdir,
             server_path=server_path,
-            args={"logfile": logfile},
+            args=self.append_startup_args({"logfile": logfile}),
             port=port,
             conf_file=conf_file,
         )
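The new append_startup_args hook gives each test class a place to inject module configuration before create_server() runs. A sketch of a subclass using it (hypothetical class name; the pattern mirrors the TestSaveRestore_* classes in test_saverestore.py above):

    class MyRdbV2Test(ValkeySearchTestCaseDebugMode):
        def append_startup_args(self, args: dict[str, str]) -> dict[str, str]:
            # Merged with the base {"logfile": ...} args at server start.
            args["search.rdb_write_v2"] = "yes"
            return args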
2 changes: 1 addition & 1 deletion scripts/common.rc
@@ -166,7 +166,7 @@ function setup_valkey_server() {
     fi

     # Clone and build it
-    VALKEY_VERSION="${VALKEY_VERSION:=8.1.1}"
+    VALKEY_VERSION="${VALKEY_VERSION:=9.0.0-rc3}"
     export VALKEY_SERVER_HOME_DIR=$(get_third_party_build_dir)/valkey-server
     export VALKEY_SERVER_BUILD_DIR=${VALKEY_SERVER_HOME_DIR}/.build-release
     if [ ! -d ${VALKEY_SERVER_HOME_DIR} ]; then