8 changes: 6 additions & 2 deletions nativelink-config/examples/redis.json5
@@ -4,13 +4,17 @@
"name": "CAS_FAST_SLOW_STORE",
"redis_store": {
"addresses": ["redis://127.0.0.1:6379/"],
"mode": "cluster"
"mode": "cluster",
key_prefix: "my_cas_prefix:",
db: 1,
}
}, {
"name": "AC_FAST_SLOW_STORE",
"redis_store": {
"addresses": ["redis://127.0.0.1:6379/"],
"mode": "cluster"
"mode": "cluster",
key_prefix: "my_cas_prefix:",
db: 1,
}
}, {
"name": "AC_MAIN_STORE",
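A rough illustration of what these two new settings do: every CAS/AC entry is stored under the configured `key_prefix`, and the client selects the configured logical `db` before issuing commands. The helper and key layout below are hypothetical, for illustration only; the real key format is defined by `RedisStore` in redis_store.rs further down.

```rust
// Hypothetical sketch of a prefixed CAS key; not NativeLink's actual key-building code.
fn make_key(key_prefix: &str, digest_hash: &str, size_bytes: u64) -> String {
    format!("{key_prefix}{digest_hash}-{size_bytes}")
}

fn main() {
    assert_eq!(
        make_key("my_cas_prefix:", "0123456789abcdef", 2),
        "my_cas_prefix:0123456789abcdef-2"
    );
}
```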
6 changes: 6 additions & 0 deletions nativelink-config/src/stores.rs
@@ -903,6 +903,12 @@ pub struct RedisSpec {
#[serde(deserialize_with = "convert_vec_string_with_shellexpand")]
pub addresses: Vec<String>,

/// The Redis database number to select.
///
/// Defaults to 0.
#[serde(default)]
pub db: u32,

/// The response timeout for the Redis connection in seconds.
///
/// Default: 10
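For context, a minimal sketch (assumed behavior, not part of this change) of how `#[serde(default)]` makes the new field optional: omitting `db` from the config yields 0, matching the documented default. The struct below is a simplified stand-in for `RedisSpec`, which additionally uses shell-expansion deserializers for `addresses`.

```rust
use serde::Deserialize;

// Simplified stand-in for the relevant part of RedisSpec.
#[derive(Deserialize, Debug)]
#[allow(dead_code)]
struct RedisSpecSketch {
    addresses: Vec<String>,
    // Absent from the config -> u32::default() == 0, i.e. the default Redis database.
    #[serde(default)]
    db: u32,
}

fn main() {
    let without_db: RedisSpecSketch =
        serde_json::from_str(r#"{"addresses": ["redis://127.0.0.1:6379/"]}"#).unwrap();
    assert_eq!(without_db.db, 0);

    let with_db: RedisSpecSketch =
        serde_json::from_str(r#"{"addresses": ["redis://127.0.0.1:6379/"], "db": 1}"#).unwrap();
    assert_eq!(with_db.db, 1);
}
```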
52 changes: 43 additions & 9 deletions nativelink-store/src/redis_store.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use core::cmp;
use core::convert::TryInto;
use core::ops::{Bound, RangeBounds};
use core::pin::Pin;
use core::time::Duration;
@@ -53,7 +54,7 @@ use parking_lot::{Mutex, RwLock};
use patricia_tree::StringPatriciaMap;
use tokio::select;
use tokio::time::sleep;
use tracing::{error, warn};
use tracing::{error, info, warn};
use uuid::Uuid;

use crate::cas_utils::is_zero_digest;
@@ -172,7 +173,7 @@ impl RedisStore {
"Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes."
));
};
let redis_config = match spec.mode {
let mut redis_config = match spec.mode {
RedisMode::Cluster => RedisConfig::from_url_clustered(addr),
RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr),
RedisMode::Standard => RedisConfig::from_url_centralized(addr),
@@ -183,7 +184,25 @@
format!("while parsing redis node address: {e}"),
)
})?;

if spec.db != 0 {
match spec.db.try_into() {
Ok(db_u16) => {
redis_config.database = Some(db_u16);
info!(
db = spec.db,
"Configuring Redis client to use database {}", spec.db,
);
}
Err(_) => {
return Err(make_err!(
Code::InvalidArgument,
"Invalid Redis database number '{}'. Redis databases range from 0 to 15, and the value must fit within a u16 (0-{}).",
spec.db,
u16::MAX
));
}
}
}
let reconnect_policy = {
if spec.retry.delay == 0.0 {
spec.retry.delay = DEFAULT_RETRY_DELAY;
@@ -893,16 +912,23 @@ pub struct RedisSubscriptionManager {
subscribed_keys: Arc<RwLock<StringPatriciaMap<RedisSubscriptionPublisher>>>,
tx_for_test: tokio::sync::mpsc::UnboundedSender<String>,
_subscription_spawn: JoinHandleDropGuard<()>,
key_prefix: String,
}

impl RedisSubscriptionManager {
pub fn new(subscribe_client: SubscriberClient, pub_sub_channel: String) -> Self {
pub fn new(
subscribe_client: SubscriberClient,
pub_sub_channel: String,
key_prefix: &str,
) -> Self {
let subscribed_keys = Arc::new(RwLock::new(StringPatriciaMap::new()));
let subscribed_keys_weak = Arc::downgrade(&subscribed_keys);
let (tx_for_test, mut rx_for_test) = tokio::sync::mpsc::unbounded_channel();
let key_prefix = key_prefix.to_string();
Self {
subscribed_keys,
tx_for_test,
key_prefix,
_subscription_spawn: spawn!("redis_subscribe_spawn", async move {
let mut rx = subscribe_client.message_rx();
loop {
@@ -997,12 +1023,13 @@ impl SchedulerSubscriptionManager for RedisStore {
let mut subscribed_keys = self.subscribed_keys.write();
let key = key.get_key();
let key_str = key.as_str();
let mut subscription = if let Some(publisher) = subscribed_keys.get(&key_str) {
let prefixed_key = format!("{}{}", self.key_prefix, key_str);
let mut subscription = if let Some(publisher) = subscribed_keys.get(&prefixed_key) {
publisher.subscribe(weak_subscribed_keys)
} else {
let (publisher, subscription) =
RedisSubscriptionPublisher::new(key_str.to_string(), weak_subscribed_keys);
subscribed_keys.insert(key_str, publisher);
subscribed_keys.insert(prefixed_key, publisher);
subscription
};
subscription
@@ -1037,6 +1064,7 @@ impl SchedulerStore for RedisStore {
let sub = Arc::new(RedisSubscriptionManager::new(
self.subscriber_client.clone(),
pub_sub_channel.clone(),
&self.key_prefix,
));
*subscription_manager = Some(sub.clone());
Ok(sub)
@@ -1114,6 +1142,7 @@ impl SchedulerStore for RedisStore {
K: SchedulerIndexProvider + SchedulerStoreDecodeTo + Send,
{
let index_value = index.index_value();
let global_key_prefix = &self.key_prefix;
let run_ft_aggregate = || {
let client = self.client_pool.next().clone();
let sanitized_field = try_sanitize(index_value.as_ref()).err_tip(|| {
@@ -1123,7 +1152,8 @@
ft_aggregate(
client,
format!(
"{}",
"{}{}",
global_key_prefix.clone(),
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
),
format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field),
@@ -1156,6 +1186,7 @@
};
let stream = run_ft_aggregate()?
.or_else(|_| async move {
let global_key_prefix = global_key_prefix.clone();
let mut schema = vec![SearchSchema {
field_name: K::INDEX_NAME.into(),
alias: None,
@@ -1187,12 +1218,15 @@
.next()
.ft_create::<(), _>(
format!(
"{}",
"{}{}",
global_key_prefix,
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
),
FtCreateOptions {
on: Some(IndexKind::Hash),
prefixes: vec![K::KEY_PREFIX.into()],
prefixes: vec![
format!("{}{}", global_key_prefix, K::KEY_PREFIX).into(),
],
nohl: true,
nofields: true,
nofreqs: true,
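To summarize the prefixing scheme above: both the `FT.CREATE` index name and the hash-key prefixes it indexes are now namespaced by the store-wide `key_prefix`, so two NativeLink deployments sharing one Redis instance no longer collide on scheduler indexes. The sketch below uses hypothetical helper and index names; the real code composes `self.key_prefix` with `get_index_name!(...)` and `K::KEY_PREFIX` as shown in the diff.

```rust
// Hypothetical helpers illustrating the composition; not the actual NativeLink code.
fn prefixed_index_name(key_prefix: &str, index_name: &str) -> String {
    format!("{key_prefix}{index_name}")
}

fn prefixed_hash_prefix(key_prefix: &str, kind_prefix: &str) -> String {
    format!("{key_prefix}{kind_prefix}")
}

fn main() {
    let key_prefix = "my_cas_prefix:"; // from the example config
    // The index itself is namespaced...
    assert_eq!(
        prefixed_index_name(key_prefix, "example_index"),
        "my_cas_prefix:example_index"
    );
    // ...and it only indexes hashes whose keys carry the same prefix.
    assert_eq!(
        prefixed_hash_prefix(key_prefix, "example_kind_"),
        "my_cas_prefix:example_kind_"
    );
}
```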
116 changes: 116 additions & 0 deletions nativelink-store/tests/redis_store_test.rs
@@ -408,6 +408,122 @@ async fn upload_and_get_data_with_prefix() -> Result<(), Error> {
Ok(())
}

#[nativelink_test]
async fn test_has_with_prefix() -> Result<(), Error> {
let prefix = "has_pfx:";

let digest = DigestInfo::try_new(VALID_HASH1, 2)?;
let packed_hash_hex = format!("{prefix}{digest}");

let real_key = RedisValue::Bytes(packed_hash_hex.into());

let mocks = Arc::new(MockRedisBackend::new());

mocks
.expect(
MockCommand {
cmd: Str::from_static("STRLEN"),
subcommand: None,
args: vec![real_key.clone()],
},
Ok(RedisValue::Integer(2)),
)
.expect(
MockCommand {
cmd: Str::from_static("EXISTS"),
subcommand: None,
args: vec![real_key],
},
Ok(RedisValue::Integer(1)),
);

let store = {
let mut builder = Builder::default_centralized();
builder.set_config(RedisConfig {
mocks: Some(mocks),
..Default::default()
});
let (client_pool, subscriber_client) = make_clients(builder);

RedisStore::new_from_builder_and_parts(
client_pool,
subscriber_client,
None,
mock_uuid_generator,
prefix.to_string(),
DEFAULT_READ_CHUNK_SIZE,
DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE,
DEFAULT_SCAN_COUNT,
)
.unwrap()
};

let result = store.has(digest).await.unwrap();
assert!(
result.is_some(),
"Expected has() to return Some when mock indicates existence"
);

Ok(())
}

#[nativelink_test]
async fn test_has_without_prefix() -> Result<(), Error> {
let digest = DigestInfo::try_new(VALID_HASH1, 2)?;

let packed_hash_hex = format!("{digest}");
let real_key = RedisValue::Bytes(packed_hash_hex.into());

let mocks = Arc::new(MockRedisBackend::new());

mocks
.expect(
MockCommand {
cmd: Str::from_static("STRLEN"),
subcommand: None,
args: vec![real_key.clone()],
},
Ok(RedisValue::Integer(2)),
)
.expect(
MockCommand {
cmd: Str::from_static("EXISTS"),
subcommand: None,
args: vec![real_key],
},
Ok(RedisValue::Integer(1)),
);

let store = {
let mut builder = Builder::default_centralized();
builder.set_config(RedisConfig {
mocks: Some(mocks),
..Default::default()
});
let (client_pool, subscriber_client) = make_clients(builder);

RedisStore::new_from_builder_and_parts(
client_pool,
subscriber_client,
None,
mock_uuid_generator,
String::new(),
DEFAULT_READ_CHUNK_SIZE,
DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE,
DEFAULT_SCAN_COUNT,
)
.unwrap()
};

let result = store.has(digest).await.unwrap();
assert!(
result.is_some(),
"Expected has() to return Some when mock indicates existence (no prefix)"
);

Ok(())
}

#[nativelink_test]
async fn upload_empty_data() -> Result<(), Error> {
let data = Bytes::from_static(b"");