28 changes: 17 additions & 11 deletions integration/indexes.py
@@ -10,18 +10,19 @@
import logging, json
import struct
from enum import Enum
from util import waiters

class KeyDataType(Enum):
HASH = 1
JSON = 2



def float_to_bytes(flt: list[float]) -> bytes:
return struct.pack(f"<{len(flt)}f", *flt)


class Field:
name: str
alias: Union[str, None]
def __init__(self, name: str, alias: Union[str, None]):
self.name = name
self.alias = alias if alias else name
@@ -142,7 +143,7 @@ def __init__(
self.prefixes = prefixes
self.type = type

def create(self, client: valkey.client):
def create(self, client: valkey.client, wait_for_backfill = False):
cmd = (
[
"FT.CREATE",
@@ -159,22 +160,27 @@ def create(self, client: valkey.client):
cmd += f.create(self.type)
print(f"Creating Index: {cmd}")
client.execute_command(*cmd)
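        # Optionally block until the initial backfill completes; waiters.wait_for_true
        # is assumed to poll the predicate until it returns True.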
if wait_for_backfill:
waiters.wait_for_true(
lambda: not self.info(client).backfill_in_progress
)

def load_data(self, client: valkey.client, rows: int, start_index: int = 0):
print("Loading data to ", client)
for i in range(start_index, rows):
data = self.make_data(i)
if self.type == KeyDataType.HASH:
client.hset(self.keyname(i), mapping=data)
else:
client.execute_command("JSON.SET", self.keyname(i), "$", json.dumps(data))
self.write_data(client, i, data)

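    # Write a single row either as a HASH or as a JSON document, matching the index key type.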
    def write_data(self, client: valkey.client, i: int, data: dict[str, str | bytes]):
if self.type == KeyDataType.HASH:
client.hset(self.keyname(i), mapping=data)
else:
client.execute_command("JSON.SET", self.keyname(i), "$", json.dumps(data))

def load_data_with_ttl(self, client: valkey.client, rows: int, ttl_ms: int, start_index: int = 0):
for i in range(start_index, rows):
data = self.make_data(i)
key = self.keyname(i)
client.hset(key, mapping=data)
client.pexpire(key, ttl_ms)
self.write_data(client, i, data)
client.pexpire(self.keyname(i), ttl_ms)

def keyname(self, row: int) -> str:
prefix = self.prefixes[row % len(self.prefixes)] if self.prefixes else ""
2 changes: 1 addition & 1 deletion integration/run.sh
@@ -144,7 +144,7 @@ mkdir -p ${LOGS_DIR}
function run_pytest() {
zap valkey-server
LOG_INFO "Running: ${PYTHON_PATH} -m pytest ${FILTER_ARGS} --capture=sys --cache-clear -v ${ROOT_DIR}/integration/"
${PYTHON_PATH} -m pytest ${FILTER_ARGS} --full-trace --capture=sys --cache-clear -v ${ROOT_DIR}/integration/
${PYTHON_PATH} -m pytest ${FILTER_ARGS} --capture=sys --cache-clear -v ${ROOT_DIR}/integration/
RUN_SUCCESS=$?
}

255 changes: 255 additions & 0 deletions integration/test_saverestore.py
@@ -0,0 +1,255 @@
import base64
import os
import tempfile
import time
import subprocess
import shutil
import socket
from valkey import ResponseError, Valkey
from valkey_search_test_case import ValkeySearchTestCaseDebugMode
from valkeytestframework.conftest import resource_port_tracker
from indexes import *
import pytest
import logging
from util import waiters
import threading
from ft_info_parser import FTInfoParser

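# Single index shared by every test below: one HNSW vector field, one numeric field and one tag field.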
index = Index("index", [Vector("v", 3, type="HNSW", m=2, efc=1), Numeric("n"), Tag("t")])
NUM_VECTORS = 10

# Keys that are in all results
full_key_names = [index.keyname(i).encode() for i in range(NUM_VECTORS)]

def check_keys(received_keys, expected_keys):
received_set = set(received_keys)
expected_set = set(expected_keys)
print("Result.keys ", received_set)
print("expected.keys", expected_set)
assert received_set == expected_set

def do_search(client: Valkey, query: str, extra: list[str] = []) -> dict[str, dict[str, str]]:
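    '''Run FT.SEARCH against the shared index and return {key: {field: value}} parsed from the raw reply.'''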
cmd = ["ft.search index", query, "limit", "0", "100"] + extra
print("Cmd: ", cmd)
res = client.execute_command(*cmd)[1:]
result = dict()
for i in range(0, len(res), 2):
row = res[i+1]
row_dict = dict()
for j in range(0, len(row), 2):
row_dict[row[j]] = row[j+1]
result[res[i]] = row_dict
print("Result is ", result)
return result

def make_data():
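    '''
    Build NUM_VECTORS well-formed rows plus three extra rows that are each malformed
    in exactly one field (undersized vector, non-numeric numeric, empty tag), so each
    per-field query in verify_data drops a different key.
    '''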
records = []
for i in range(0, NUM_VECTORS):
records += [index.make_data(i)]

data = index.make_data(len(records))
data["v"] = "0"
records += [data]

data = index.make_data(len(records))
data["n"] = "fred"
records += [data]

data = index.make_data(len(records))
data["t"] = ""
records += [data]
return records

def load_data(client: Valkey):
records = make_data()
for i in range(0, len(records)):
index.write_data(client, i, records[i])
return len(records)

def verify_data(client: Valkey):
'''
    Query the numeric and tag fields to ensure that all expected keys are present in each index
'''

res = do_search(client, "@n:[0 100]")
check_keys(res.keys(), full_key_names + [index.keyname(NUM_VECTORS+0).encode(), index.keyname(NUM_VECTORS+2).encode()])
res = do_search(client, "@t:{Tag*}")
check_keys(res.keys(), full_key_names + [index.keyname(NUM_VECTORS+0).encode(), index.keyname(NUM_VECTORS+1).encode()])

def do_save_restore_test(test, write_v2: bool, read_v2: bool):
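    '''
    Create the index, load data, SAVE and check the RDB save counters, then restart
    the server and verify both the data and the RDB load counters for the given
    combination of v1/v2 write and read formats.
    '''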
index.create(test.client, True)
key_count = load_data(test.client)
verify_data(test.client)
test.client.config_set("search.rdb-validate-on-write", "yes")
test.client.execute_command("save")
os.environ["SKIPLOGCLEAN"] = "1"
test.client.execute_command("CONFIG SET search.info-developer-visible yes")
i = test.client.info("search")
print("Info after save: ", i)
writes = [
i["search_rdb_save_sections"],
i["search_rdb_save_keys"],
i["search_rdb_save_mutation_entries"],
]
if write_v2:
assert writes == [5, key_count, 0]
else:
assert writes == [4, 0, 0]
test.server.restart(remove_rdb=False)
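    # Give the restarted server a moment to finish loading the RDB before querying.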
time.sleep(5)
print(test.client.ping())
verify_data(test.client)
test.client.execute_command("CONFIG SET search.info-developer-visible yes")

i = test.client.info("search")
print("Info after load: ", i)
reads = [
i["search_rdb_load_sections"],
i["search_rdb_load_sections_skipped"],
i["search_rdb_load_keys"],
i["search_rdb_load_mutation_entries"],
]
if not write_v2:
assert reads == [4, 0, 0, 0]
elif read_v2:
assert reads == [5, 0, key_count, 0]
else:
assert reads == [5, 1, 0, 0]


class TestSaveRestore_v1_v1(ValkeySearchTestCaseDebugMode):
def append_startup_args(self, args):
args["search.rdb_write_v2"] = "no"
args["search.rdb_read_v2"] = "no"
return args

def test_saverestore_v1_v1(self):
do_save_restore_test(self, False, False)

class TestSaveRestore_v1_v2(ValkeySearchTestCaseDebugMode):
def append_startup_args(self, args):
args["search.rdb_write_v2"] = "no"
args["search.rdb_read_v2"] = "yes"
return args

def test_saverestore_v1_v2(self):
do_save_restore_test(self, False, True)

class TestSaveRestore_v2_v1(ValkeySearchTestCaseDebugMode):
def append_startup_args(self, args):
args["search.rdb_write_v2"] = "yes"
args["search.rdb_read_v2"] = "no"
return args

def test_saverestore_v2_v1(self):
do_save_restore_test(self, True, False)

class TestSaveRestore_v2_v2(ValkeySearchTestCaseDebugMode):
def append_startup_args(self, args):
args["search.rdb_write_v2"] = "yes"
args["search.rdb_read_v2"] = "yes"
return args

def test_saverestore_v2_v2(self):
do_save_restore_test(self, True, True)

class TestMutationQueue(ValkeySearchTestCaseDebugMode):
def append_startup_args(self, args):
args["search.rdb_write_v2"] = "yes"
args["search.rdb_read_v2"] = "yes"
return args

def mutation_queue_size(self):
info = FTInfoParser(self.client.execute_command("ft.info ", index.name))
return info.mutation_queue_size

def test_mutation_queue(self):
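        # Pause the mutation queue via a debug pausepoint so writes stay queued while SAVE runs.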
self.client.execute_command("ft._debug PAUSEPOINT SET block_mutation_queue")
index.create(self.client, True)
records = make_data()
        #
        # Load the data from separate client threads; the mutation queue is paused,
        # so each write blocks until the pausepoint is released.
        #
client_threads = []
for i in range(len(records)):
new_client = self.server.get_new_client()
            t = threading.Thread(target=index.write_data, args=(new_client, i, records[i]))
t.start()
client_threads += [t]

#
# Now, wait for the mutation queue to get fully loaded
#
waiters.wait_for_true(lambda: self.mutation_queue_size() == len(records))
print("MUTATION QUEUE LOADED")

self.client.execute_command("save")

self.client.execute_command("ft._debug pausepoint reset block_mutation_queue")

for t in client_threads:
t.join()

verify_data(self.client)
os.environ["SKIPLOGCLEAN"] = "1"
self.server.restart(remove_rdb=False)
verify_data(self.client)
self.client.execute_command("CONFIG SET search.info-developer-visible yes")
i = self.client.info("search")
print("Info: ", i)
reads = [
i["search_rdb_load_mutation_entries"],
]
assert reads == [len(records)]

def test_multi_exec_queue(self):
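        # Same pausepoint as above, but the writes are issued inside a single MULTI/EXEC transaction.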
self.client.execute_command("ft._debug PAUSEPOINT SET block_mutation_queue")
self.client.execute_command("CONFIG SET search.info-developer-visible yes")
index.create(self.client, True)
records = make_data()
        #
        # Load the data inside a MULTI/EXEC transaction; unlike the threaded writes
        # above, this does not block the issuing client.
        #
self.client.execute_command("MULTI")
for i in range(len(records)):
index.write_data(self.client, i, records[i])
self.client.execute_command("EXEC")

self.client.execute_command("save")

i = self.client.info("search")
assert i["search_rdb_save_multi_exec_entries"] == len(records)

self.client.execute_command("ft._debug pausepoint reset block_mutation_queue")

verify_data(self.client)
os.environ["SKIPLOGCLEAN"] = "1"
self.server.restart(remove_rdb=False)
verify_data(self.client)
self.client.execute_command("CONFIG SET search.info-developer-visible yes")
i = self.client.info("search")
print("Info: ", i)
reads = [
i["search_rdb_load_multi_exec_entries"],
]
assert reads == [len(records)]

def test_saverestore_backfill(self):
        #
        # Stop the backfill and ensure that, with the new RDB format, the backfill is
        # re-triggered after the restart.
        #
self.client.execute_command("FT._DEBUG CONTROLLED_VARIABLE SET StopBackfill yes")
load_data(self.client)
index.create(self.client, False)
self.client.execute_command("save")

os.environ["SKIPLOGCLEAN"] = "1"
self.server.restart(remove_rdb=False)
verify_data(self.client)
self.client.execute_command("CONFIG SET search.info-developer-visible yes")
i = self.client.info("search")
print("Info: ", i)
reads = [
i["search_backfill_hash_keys"],
]
assert reads == [len(make_data())]
5 changes: 4 additions & 1 deletion integration/valkey_search_test_case.py
@@ -147,6 +147,9 @@ def get_config_file_lines(self, testdir, port) -> List[str]:
See ValkeySearchTestCaseBase.get_config_file_lines & ValkeySearchClusterTestCase.get_config_file_lines
for example usage."""
raise NotImplementedError

def append_startup_args(self, args: dict[str, str]) -> dict[str, str]:
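        """Hook for subclasses to add or override server startup arguments."""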
return args

def start_server(
self,
@@ -177,7 +180,7 @@ def start_server(
server, client = self.create_server(
testdir=testdir,
server_path=server_path,
args={"logfile": logfile},
args=self.append_startup_args({"logfile": logfile}),
port=port,
conf_file=conf_file,
)
2 changes: 1 addition & 1 deletion scripts/common.rc
@@ -166,7 +166,7 @@ function setup_valkey_server() {
fi

# Clone and build it
VALKEY_VERSION="${VALKEY_VERSION:=8.1.1}"
VALKEY_VERSION="${VALKEY_VERSION:=9.0.0-rc3}"
export VALKEY_SERVER_HOME_DIR=$(get_third_party_build_dir)/valkey-server
export VALKEY_SERVER_BUILD_DIR=${VALKEY_SERVER_HOME_DIR}/.build-release
if [ ! -d ${VALKEY_SERVER_HOME_DIR} ]; then