diff --git a/nats/src/nats/js/client.py b/nats/src/nats/js/client.py index 0cfa433b..0ca93640 100644 --- a/nats/src/nats/js/client.py +++ b/nats/src/nats/js/client.py @@ -19,28 +19,14 @@ import time from email.parser import BytesParser from secrets import token_hex -from typing import ( - TYPE_CHECKING, - Any, - Awaitable, - Callable, - Dict, - List, - Optional, -) +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional import nats.errors import nats.js.errors from nats.aio.msg import Msg from nats.aio.subscription import Subscription from nats.js import api -from nats.js.errors import ( - BadBucketError, - BucketNotFoundError, - FetchTimeoutError, - InvalidBucketNameError, - NotFoundError, -) +from nats.js.errors import BadBucketError, BucketNotFoundError, FetchTimeoutError, InvalidBucketNameError, NotFoundError from nats.js.kv import KeyValue from nats.js.manager import JetStreamManager from nats.js.object_store import ( diff --git a/nats/src/nats/js/kv.py b/nats/src/nats/js/kv.py index a765f8c1..618932b6 100644 --- a/nats/src/nats/js/kv.py +++ b/nats/src/nats/js/kv.py @@ -352,71 +352,9 @@ async def watchall(self, **kwargs) -> KeyWatcher: """ return await self.watch(">", **kwargs) - async def keys(self, filters: List[str] = None, **kwargs) -> List[str]: - """ - Returns a list of the keys from a KeyValue store. - Optionally filters the keys based on the provided filter list. - """ - watcher = await self.watchall( - ignore_deletes=True, - meta_only=True, - ) - keys = [] - - # Check consumer info and make sure filters are applied correctly - try: - consumer_info = await watcher._sub.consumer_info() - if consumer_info and filters: - # If NATS server < 2.10, filters might be ignored. - if consumer_info.config.filter_subject != ">": - logger.warning("Server may ignore filters if version is < 2.10.") - except Exception as e: - raise e - - async for key in watcher: - # None entry is used to signal that there is no more info. - if not key: - break - - # Apply filters if any were provided - if filters: - if any(f in key.key for f in filters): - keys.append(key.key) - else: - # No filters provided, append all keys - keys.append(key.key) - - await watcher.stop() - - if not keys: - raise nats.js.errors.NoKeysError - - return keys - - async def history(self, key: str) -> List[Entry]: - """ - history retrieves a list of the entries so far. - """ - watcher = await self.watch(key, include_history=True) - - entries = [] - - async for entry in watcher: - # None entry is used to signal that there is no more info. - if not entry: - break - entries.append(entry) - - await watcher.stop() - - if not entries: - raise nats.js.errors.NoKeysError - - return entries - - async def watch( + async def watch_filtered( self, - keys, + keys: List[str], headers_only=False, include_history=False, ignore_deletes=False, @@ -424,12 +362,29 @@ async def watch( inactive_threshold=None, ) -> KeyWatcher: """ - watch will fire a callback when a key that matches the keys - pattern is updated. - The first update after starting the watch is None in case - there are no pending updates. + watch_filtered will fire updates when keys matching any of the provided + key patterns are updated. This is the foundational watch method that + supports multiple filter subjects for server-side filtering. + + :param keys: List of key patterns to watch (e.g., ["user.*", "admin.*"]) + :param headers_only: Only retrieve headers + :param include_history: Include historical values + :param ignore_deletes: Ignore delete markers + :param meta_only: Only retrieve metadata + :param inactive_threshold: Inactivity threshold in seconds """ - subject = f"{self._pre}{keys}" + if not keys: + raise ValueError("at least one key pattern is required") + + # Convert key patterns to full subjects with KV prefix + filter_subjects = [f"{self._pre}{key}" for key in keys] + + # Create consumer config with filter_subjects + # Always use filter_subjects (modern API) which supports multiple filters + # Single filter is just a list with one element + config = api.ConsumerConfig() + config.filter_subjects = filter_subjects + watcher = KeyValue.KeyWatcher(self) init_setup: asyncio.Future[bool] = asyncio.Future() @@ -475,13 +430,18 @@ async def watch_updates(msg): if not inactive_threshold: inactive_threshold = 5 * 60 + config.headers_only = meta_only + config.deliver_policy = deliver_policy + config.inactive_threshold = inactive_threshold + + # Use wildcard subject since filtering is done via filter_subjects in config + subject = f"{self._pre}>" + watcher._sub = await self._js.subscribe( subject, cb=watch_updates, ordered_consumer=True, - deliver_policy=deliver_policy, - headers_only=meta_only, - inactive_threshold=inactive_threshold, + config=config, ) await asyncio.sleep(0) @@ -506,3 +466,136 @@ async def watch_updates(msg): raise err return watcher + + async def list_keys(self, filters: Optional[List[str]] = None) -> List[str]: + """ + Returns a list of keys from a KeyValue store using server-side filtering + with NATS subject patterns for optimal performance. + + This is the recommended method for retrieving filtered keys as it uses + server-side filtering to reduce network traffic and load on the server. + + :param filters: Optional list of NATS subject patterns to filter keys + (e.g., ["user.*", "admin.*", "config.>"]) + If not provided, returns all keys. + :return: List of matching keys + + Example: + # Get all keys + all_keys = await kv.list_keys() + + # Get keys matching patterns with wildcards + user_keys = await kv.list_keys(filters=["user.*", "admin.*"]) + """ + if filters: + watcher = await self.watch_filtered( + keys=filters, + ignore_deletes=True, + meta_only=True, + ) + else: + watcher = await self.watchall( + ignore_deletes=True, + meta_only=True, + ) + + keys = [] + + async for key in watcher: + # None entry is used to signal that there is no more info. + if not key: + break + keys.append(key.key) + + await watcher.stop() + + if not keys: + raise nats.js.errors.NoKeysError + + return keys + + async def keys(self, filters: Optional[List[str]] = None, **kwargs) -> List[str]: + """ + Returns a list of the keys from a KeyValue store. + Optionally filters the keys based on the provided filter list using + client-side substring matching. + + .. deprecated:: 2.12.0 + Use :meth:`list_keys` instead for better performance with server-side + filtering using NATS subject patterns. + + :param filters: Optional list of substrings to filter keys (client-side) + :return: List of matching keys + """ + watcher = await self.watchall( + ignore_deletes=True, + meta_only=True, + ) + keys = [] + + async for key in watcher: + # None entry is used to signal that there is no more info. + if not key: + break + + # Apply filters if any were provided (client-side substring matching) + if filters: + if any(f in key.key for f in filters): + keys.append(key.key) + else: + # No filters provided, append all keys + keys.append(key.key) + + await watcher.stop() + + if not keys: + raise nats.js.errors.NoKeysError + + return keys + + async def history(self, key: str) -> List[Entry]: + """ + history retrieves a list of the entries so far. + """ + watcher = await self.watch(key, include_history=True) + + entries = [] + + async for entry in watcher: + # None entry is used to signal that there is no more info. + if not entry: + break + entries.append(entry) + + await watcher.stop() + + if not entries: + raise nats.js.errors.NoKeysError + + return entries + + async def watch( + self, + keys, + headers_only=False, + include_history=False, + ignore_deletes=False, + meta_only=False, + inactive_threshold=None, + ) -> KeyWatcher: + """ + watch will fire a callback when a key that matches the keys + pattern is updated. + The first update after starting the watch is None in case + there are no pending updates. + + This method delegates to watch_filtered for consistency with the Go client. + """ + return await self.watch_filtered( + keys=[keys], + headers_only=headers_only, + include_history=include_history, + ignore_deletes=ignore_deletes, + meta_only=meta_only, + inactive_threshold=inactive_threshold, + ) diff --git a/nats/tests/test_js.py b/nats/tests/test_js.py index 1acfd4b9..7b24555a 100644 --- a/nats/tests/test_js.py +++ b/nats/tests/test_js.py @@ -3830,6 +3830,143 @@ async def error_handler(e): # Clean up await nc.close() + @async_test + async def test_kv_list_keys(self): + """Test list_keys method with server-side NATS pattern filtering""" + errors = [] + + async def error_handler(e): + print("Error:", e, type(e)) + errors.append(e) + + nc = await nats.connect(error_cb=error_handler) + js = nc.jetstream() + + # Create a KV bucket for testing + kv = await js.create_key_value(bucket="TEST_LIST_KEYS", history=5, ttl=3600) + + # Add various keys with different prefixes + await kv.put("user.alice", b"alice_data") + await kv.put("user.bob", b"bob_data") + await kv.put("admin.charlie", b"charlie_data") + await kv.put("guest.dave", b"dave_data") + await kv.put("config.setting1", b"value1") + + # Test 1: Get all keys + all_keys = await kv.list_keys() + assert len(all_keys) == 5 + assert "user.alice" in all_keys + assert "admin.charlie" in all_keys + + # Test 2: Filter with multiple NATS patterns + filtered_keys = await kv.list_keys(filters=["user.*", "admin.*"]) + assert len(filtered_keys) == 3 + assert "user.alice" in filtered_keys + assert "user.bob" in filtered_keys + assert "admin.charlie" in filtered_keys + assert "guest.dave" not in filtered_keys + assert "config.setting1" not in filtered_keys + + # Test 3: Filter with single pattern + config_keys = await kv.list_keys(filters=["config.*"]) + assert len(config_keys) == 1 + assert "config.setting1" in config_keys + + # Test 4: Filter with exact match + exact_keys = await kv.list_keys(filters=["user.alice"]) + assert len(exact_keys) == 1 + assert "user.alice" in exact_keys + + # Clean up + await nc.close() + + @async_test + async def test_kv_watch_filtered(self): + """Test watch_filtered method with server-side filtering using NATS patterns""" + errors = [] + + async def error_handler(e): + print("Error:", e, type(e)) + errors.append(e) + + nc = await nats.connect(error_cb=error_handler) + js = nc.jetstream() + + # Create a KV bucket for testing + kv = await js.create_key_value(bucket="TEST_WATCH_FILTERED", history=5, ttl=3600) + + # Add various keys with different prefixes + await kv.put("user.alice", b"alice_data") + await kv.put("user.bob", b"bob_data") + await kv.put("admin.charlie", b"charlie_data") + await kv.put("guest.dave", b"dave_data") + await kv.put("config.setting1", b"value1") + + # Test 1: Watch multiple patterns with wildcard + watcher = await kv.watch_filtered( + keys=["user.*", "admin.*"], + ignore_deletes=True, + meta_only=True, + ) + + received_keys = [] + async for entry in watcher: + if not entry: + break + received_keys.append(entry.key) + + await watcher.stop() + + # Should only receive user.* and admin.* keys, not guest.* or config.* + assert "user.alice" in received_keys + assert "user.bob" in received_keys + assert "admin.charlie" in received_keys + assert "guest.dave" not in received_keys + assert "config.setting1" not in received_keys + + # Test 2: Watch single pattern + watcher2 = await kv.watch_filtered( + keys=["config.*"], + ignore_deletes=True, + meta_only=True, + ) + + received_keys2 = [] + async for entry in watcher2: + if not entry: + break + received_keys2.append(entry.key) + + await watcher2.stop() + + # Should only receive config.* keys + assert "config.setting1" in received_keys2 + assert "user.alice" not in received_keys2 + assert len(received_keys2) == 1 + + # Test 3: Watch with exact key match + watcher3 = await kv.watch_filtered( + keys=["user.alice"], + ignore_deletes=True, + meta_only=True, + ) + + received_keys3 = [] + async for entry in watcher3: + if not entry: + break + received_keys3.append(entry.key) + + await watcher3.stop() + + # Should only receive exact match + assert "user.alice" in received_keys3 + assert "user.bob" not in received_keys3 + assert len(received_keys3) == 1 + + # Clean up + await nc.close() + class ObjectStoreTest(SingleJetStreamServerTestCase): @async_test