-
Notifications
You must be signed in to change notification settings - Fork 232
Add server-side filtering for KeyValue list operations #769
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add server-side filtering for KeyValue list operations #769
Conversation
|
+1 for this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR refactors the NATS KeyValue watch functionality to support server-side filtering using multiple NATS subject patterns. It introduces a new watch_filtered method and list_keys method while deprecating the old keys method.
Key changes:
- Introduced
watch_filteredmethod to support multiple filter patterns with server-side filtering - Added
list_keysmethod as the recommended approach for retrieving filtered keys - Refactored
watchmethod to delegate towatch_filteredfor consistency - Deprecated
keysmethod in favor of server-side filtering vialist_keys - Updated
subscribemethod to handlefilter_subjects(multi-filter API) alongside legacyfilter_subject - Applied code formatting changes throughout
client.pyfor consistency
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| nats/src/nats/js/kv.py | Refactored watch functionality to support server-side filtering with multiple patterns; added watch_filtered and list_keys methods; deprecated keys method |
| nats/src/nats/js/client.py | Added support for filter_subjects in consumer config; applied code formatting improvements |
| nats/tests/test_js.py | Added comprehensive tests for list_keys and watch_filtered methods with various filtering scenarios |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| config.headers_only = meta_only | ||
| config.deliver_policy = deliver_policy | ||
| config.inactive_threshold = inactive_threshold | ||
|
|
||
| # Use wildcard subject since filtering is done via filter_subjects in config | ||
| subject = f"{self._pre}>" | ||
|
|
||
| watcher._sub = await self._js.subscribe( | ||
| subject, | ||
| cb=watch_updates, | ||
| ordered_consumer=True, | ||
| deliver_policy=deliver_policy, | ||
| headers_only=meta_only, | ||
| inactive_threshold=inactive_threshold, | ||
| config=config, | ||
| ) |
Copilot
AI
Oct 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When ordered_consumer=True is set, the subscribe method will override the config.deliver_policy set on line 465. In the subscribe method (client.py:427-434), ordered consumer mode sets specific config values including flow_control, ack_policy, max_deliver, ack_wait, idle_heartbeat, num_replicas, and mem_storage. This means the deliver_policy configured on line 465 will be lost. Consider either removing the deliver_policy assignment or handling ordered consumer configuration before calling subscribe.
| # Create consumer config with filter_subjects | ||
| # Always use filter_subjects (modern API) which supports multiple filters | ||
| # Single filter is just a list with one element | ||
| config = api.ConsumerConfig() |
Copilot
AI
Oct 30, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting filter_subjects directly on the config object doesn't account for the case where a user might have already set filter_subject (singular) on a passed-in config. If a user passes a custom config object through kwargs with filter_subject already set, it would create inconsistency. Consider checking if the config already has filter_subject set and either raise an error or merge the values appropriately.
| config = api.ConsumerConfig() | |
| config = api.ConsumerConfig() | |
| if getattr(config, "filter_subject", None) is not None: | |
| raise ValueError("Cannot set filter_subjects when filter_subject is already set on config") |
Summary
Implements server-side filtering for KeyValue store list operations to address performance issues when working with large numbers of keys.
Fixes: #768
Changes
watch_filtered()method for server-side multi-pattern filtering usingfilter_subjectslist_keys()method with NATS pattern support (*,>wildcards)watch()to delegate towatch_filtered()for consistencyfilter_subjectvsfilter_subjectsconflict insubscribe()methodkeys()methodtest_kv_list_keys()andtest_kv_watch_filtered()Implementation Details
The new
list_keys()method usesfilter_subjectsin consumer configuration to perform server-side filtering, significantly reducing network traffic and server load compared to the client-side filtering in the existingkeys()method.Follows the Go client architecture pattern (nats.go
WatchFiltered()) withwatch_filtered()as the foundational method for filtered watch operations.Why a new method instead of modifying
keys()?The existing
keys()method uses substring matching: a filter"greet"matches both"greet"and"greeting". This behavior is tested and relied upon in the existing test suite (seetest_kv_keys):The new list_keys() method uses NATS subject patterns: "foo" matches only "foo" exactly, while "foo.*" uses wildcard matching for structured keys:
Changing the behavior of keys() would break existing code that relies on substring matching, so a new method provides the performance benefits of server-side filtering while maintaining backwards compatibility.
Compatibility
keys()method behavior unchanged