Skip to content
251 changes: 133 additions & 118 deletions benchmarks/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions benchmarks/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod insert_one;
pub mod json_multi_export;
pub mod json_multi_import;
pub mod run_command;
pub mod find_raw_batches;

use std::{
convert::TryInto,
Expand Down
54 changes: 54 additions & 0 deletions benchmarks/src/bench/find_raw_batches.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use anyhow::Result;
use futures::stream::StreamExt;
use mongodb::{
bson::{doc, Document},
Client, Database,
};

use crate::bench::{drop_database, Benchmark, COLL_NAME, DATABASE_NAME};

pub struct FindRawBatchesBenchmark {
    // Handle to the benchmark database; queried in `do_task` and dropped in `teardown`.
    db: Database,
    // Connection string, retained so `teardown` can call `drop_database` with it.
    uri: String,
}

/// Options used to configure a `FindRawBatchesBenchmark::setup` call.
pub struct Options {
    /// Number of copies of `doc` to insert during setup (and therefore to read back per task).
    pub num_iter: usize,
    /// Template document replicated into the benchmark collection.
    pub doc: Document,
    /// MongoDB connection string used for both setup and teardown.
    pub uri: String,
}

#[async_trait::async_trait]
impl Benchmark for FindRawBatchesBenchmark {
    type Options = Options;
    type TaskState = ();

    /// Connects to the server, drops any leftover benchmark database, and
    /// seeds the collection with `num_iter` copies of the template document.
    async fn setup(options: Self::Options) -> Result<Self> {
        let client = Client::with_uri_str(&options.uri).await?;
        let db = client.database(DATABASE_NAME.as_str());
        drop_database(options.uri.as_str(), DATABASE_NAME.as_str()).await?;

        let coll = db.collection::<Document>(COLL_NAME.as_str());
        // `vec![value; n]` clones `value` itself, so move `options.doc` in
        // rather than cloning it first (avoids one redundant deep copy).
        let docs = vec![options.doc; options.num_iter];
        coll.insert_many(docs).await?;

        Ok(FindRawBatchesBenchmark {
            db,
            uri: options.uri,
        })
    }

    /// One timed iteration: issue the raw-batches find and drain the cursor.
    async fn do_task(&self, _state: Self::TaskState) -> Result<()> {
        let mut batches = self.db.find_raw_batches(COLL_NAME.as_str(), doc! {}).await?;
        while let Some(batch_res) = batches.next().await {
            // Propagate any server/driver error; batch contents are discarded.
            batch_res?;
        }
        Ok(())
    }

    /// Drops the benchmark database so repeated runs start from a clean slate.
    async fn teardown(&self) -> Result<()> {
        drop_database(self.uri.as_str(), self.db.name()).await?;
        Ok(())
    }
}
21 changes: 20 additions & 1 deletion benchmarks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
bson_decode::BsonDecodeBenchmark,
bson_encode::BsonEncodeBenchmark,
bulk_write::{InsertBulkWriteBenchmark, MixedBulkWriteBenchmark},
find_raw_batches::FindRawBatchesBenchmark,
find_many::FindManyBenchmark,
find_one::FindOneBenchmark,
gridfs_download::GridFsDownloadBenchmark,
Expand Down Expand Up @@ -62,6 +63,7 @@ const FIND_ONE_BENCH: &str = "Find one";
const FIND_MANY_BENCH: &str = "Find many and empty cursor";
const FIND_MANY_BENCH_RAW: &str = "Find many and empty cursor (raw BSON)";
const FIND_MANY_BENCH_SERDE: &str = "Find many and empty cursor (serde structs)";
const FIND_MANY_BENCH_RAW_BATCHES: &str = "Find many and empty cursor (raw batches)";
const GRIDFS_DOWNLOAD_BENCH: &str = "GridFS download";
const LDJSON_MULTI_EXPORT_BENCH: &str = "LDJSON multi-file export";
const GRIDFS_MULTI_DOWNLOAD_BENCH: &str = "GridFS multi-file download";
Expand Down Expand Up @@ -104,6 +106,7 @@ enum BenchmarkId {
SmallDocInsertBulkWrite, // 23
LargeDocInsertBulkWrite, // 24
MixedBulkWrite, // 25
FindManyRawBatches, // 26
}

impl BenchmarkId {
Expand All @@ -127,6 +130,7 @@ impl BenchmarkId {
BenchmarkId::BsonFullDocumentEncode => FULL_BSON_ENCODING,
BenchmarkId::FindManyRawBson => FIND_MANY_BENCH_RAW,
BenchmarkId::FindManySerde => FIND_MANY_BENCH_SERDE,
BenchmarkId::FindManyRawBatches => FIND_MANY_BENCH_RAW_BATCHES,
BenchmarkId::GridFsDownload => GRIDFS_DOWNLOAD_BENCH,
BenchmarkId::GridFsUpload => GRIDFS_UPLOAD_BENCH,
BenchmarkId::GridFsMultiDownload => GRIDFS_MULTI_DOWNLOAD_BENCH,
Expand Down Expand Up @@ -159,6 +163,7 @@ const SINGLE_BENCHES: &[&str] = &[
/// Benchmarks included in the "MultiBench" composite.
const MULTI_BENCHES: &[&str] = &[
FIND_MANY_BENCH_RAW,
FIND_MANY_BENCH_RAW_BATCHES,
SMALL_DOC_INSERT_MANY_BENCH,
LARGE_DOC_INSERT_MANY_BENCH,
GRIDFS_UPLOAD_BENCH,
Expand All @@ -180,6 +185,7 @@ const PARALLEL_BENCHES: &[&str] = &[
const READ_BENCHES: &[&str] = &[
FIND_ONE_BENCH,
FIND_MANY_BENCH_RAW,
FIND_MANY_BENCH_RAW_BATCHES,
GRIDFS_DOWNLOAD_BENCH,
LDJSON_MULTI_EXPORT_BENCH,
GRIDFS_MULTI_DOWNLOAD_BENCH,
Expand All @@ -199,7 +205,7 @@ const WRITE_BENCHES: &[&str] = &[
MIXED_BULK_WRITE_BENCH,
];

const MAX_ID: u8 = BenchmarkId::MixedBulkWrite as u8;
const MAX_ID: u8 = BenchmarkId::FindManyRawBatches as u8;

async fn run_benchmarks(
uri: &str,
Expand Down Expand Up @@ -465,6 +471,17 @@ async fn run_benchmarks(

comp_score += score_test(find_many, id.name(), 16.22, more_info);
}
// Find many using raw batches and empty the cursor
BenchmarkId::FindManyRawBatches => {
let options = bench::find_raw_batches::Options {
num_iter: 10000,
doc: get_tweet().await,
uri: uri.to_string(),
};
let result =
bench::run_benchmark::<FindRawBatchesBenchmark>(options).await?;
comp_score += score_test(result, FIND_MANY_BENCH_RAW_BATCHES, 16.22, more_info);
}

// GridFS download
BenchmarkId::GridFsDownload => {
Expand Down Expand Up @@ -589,6 +606,7 @@ fn parse_ids(matches: ArgMatches) -> HashSet<BenchmarkId> {
}
if matches.is_present("multi") {
ids.insert(BenchmarkId::FindManyRawBson);
ids.insert(BenchmarkId::FindManyRawBatches);
ids.insert(BenchmarkId::SmallDocInsertMany);
ids.insert(BenchmarkId::LargeDocInsertMany);
ids.insert(BenchmarkId::GridFsDownload);
Expand Down Expand Up @@ -621,6 +639,7 @@ fn parse_ids(matches: ArgMatches) -> HashSet<BenchmarkId> {
ids.insert(BenchmarkId::FindMany);
ids.insert(BenchmarkId::FindManyRawBson);
ids.insert(BenchmarkId::FindManySerde);
ids.insert(BenchmarkId::FindManyRawBatches);
ids.insert(BenchmarkId::SmallDocInsertMany);
ids.insert(BenchmarkId::LargeDocInsertMany);
ids.insert(BenchmarkId::LdJsonMultiFileImport);
Expand Down
4 changes: 2 additions & 2 deletions src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub use delete::Delete;
pub use distinct::Distinct;
pub use drop::{DropCollection, DropDatabase};
pub use drop_index::DropIndex;
pub use find::{Find, FindOne};
pub use find::{Find, FindOne, FindRawBatches};
pub use find_and_modify::{FindOneAndDelete, FindOneAndReplace, FindOneAndUpdate};
pub use insert_many::InsertMany;
pub use insert_one::InsertOne;
Expand All @@ -69,7 +69,7 @@ pub struct ListNames;
#[allow(missing_docs)]
pub struct ImplicitSession;
#[allow(missing_docs)]
pub struct ExplicitSession<'a>(&'a mut crate::ClientSession);
pub struct ExplicitSession<'a>(pub(crate) &'a mut crate::ClientSession);

#[allow(missing_docs)]
pub struct Single;
Expand Down
29 changes: 29 additions & 0 deletions src/action/find.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,35 @@ impl<'a, T: Send + Sync> Action for Find<'a, T, ExplicitSession<'a>> {
}
}

/// Finds documents in a collection and returns raw server batches. Construct with
/// [`Database::find_raw_batches`](crate::Database::find_raw_batches).
#[must_use]
pub struct FindRawBatches<'a, Session = ImplicitSession> {
    /// Database against which the find command is issued.
    pub(crate) db: &'a crate::Database,
    /// Name of the collection to query.
    pub(crate) collection: String,
    /// Query filter; an empty document matches all documents.
    pub(crate) filter: Document,
    /// Options for the find, populated by the generated setter methods.
    pub(crate) options: Option<FindOptions>,
    /// Session marker type: `ImplicitSession` or `ExplicitSession`.
    pub(crate) session: Session,
}

// `option_setters` generates one builder-style setter per `FindOptions` field;
// `export_doc` attaches the setter docs to the `find_raw_batches` entry point.
#[option_setters(crate::coll::options::FindOptions)]
#[export_doc(find_raw_batches)]
impl<'a, Session> FindRawBatches<'a, Session> {
    /// Use the provided session when running the operation.
    ///
    /// Consumes the builder and re-emits it in the explicit-session state,
    /// preserving every other field unchanged.
    pub fn session<'s>(
        self,
        value: impl Into<&'s mut ClientSession>,
    ) -> FindRawBatches<'a, ExplicitSession<'s>> {
        FindRawBatches {
            db: self.db,
            collection: self.collection,
            filter: self.filter,
            options: self.options,
            session: ExplicitSession(value.into()),
        }
    }
}

/// Finds a single document in a collection matching a filter. Construct with
/// [`Collection::find_one`].
#[must_use]
Expand Down
2 changes: 2 additions & 0 deletions src/bson_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub(crate) trait RawDocumentBufExt: Sized {
value: impl Into<crate::bson::raw::RawBsonRef<'a>> + 'a,
);

#[cfg(any(feature = "tracing-unstable", feature = "opentelemetry"))]
fn to_document(&self) -> crate::error::Result<crate::bson::Document>;
}

Expand All @@ -50,6 +51,7 @@ impl RawDocumentBufExt for crate::bson::RawDocumentBuf {
self.append(key, value);
}

#[cfg(any(feature = "tracing-unstable", feature = "opentelemetry"))]
fn to_document(&self) -> crate::error::Result<crate::bson::Document> {
self.try_into().map_err(Into::into)
}
Expand Down
114 changes: 84 additions & 30 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ use crate::otel::OtelFutureStub as _;
use crate::{
bson::Document,
change_stream::{
event::ChangeStreamEvent,
session::SessionChangeStream,
ChangeStream,
ChangeStreamData,
event::ChangeStreamEvent, session::SessionChangeStream, ChangeStream, ChangeStreamData,
WatchArgs,
},
cmap::{
Expand All @@ -33,33 +30,20 @@ use crate::{
wire::{next_request_id, Message},
PinnedConnectionHandle,
},
ConnectionPool,
RawCommandResponse,
StreamDescription,
ConnectionPool, RawCommandResponse, StreamDescription,
},
cursor::{session::SessionCursor, Cursor, CursorSpecification},
error::{
Error,
ErrorKind,
Result,
RETRYABLE_WRITE_ERROR,
TRANSIENT_TRANSACTION_ERROR,
Error, ErrorKind, Result, RETRYABLE_WRITE_ERROR, TRANSIENT_TRANSACTION_ERROR,
UNKNOWN_TRANSACTION_COMMIT_RESULT,
},
event::command::{
CommandEvent,
CommandFailedEvent,
CommandStartedEvent,
CommandSucceededEvent,
CommandEvent, CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent,
},
hello::LEGACY_HELLO_COMMAND_NAME_LOWERCASE,
operation::{
aggregate::{change_stream::ChangeStreamAggregate, AggregateTarget},
AbortTransaction,
CommandErrorBody,
CommitTransaction,
ExecutionContext,
Operation,
AbortTransaction, CommandErrorBody, CommitTransaction, ExecutionContext, Operation,
Retryability,
},
options::{ChangeStreamOptions, SelectionCriteria},
Expand Down Expand Up @@ -211,6 +195,61 @@ impl Client {
.await
}

/// Executes an operation whose output is a raw-batch cursor specification and
/// wraps the result in a `RawBatchCursor` owned by this client.
pub(crate) async fn execute_raw_batch_cursor_operation<Op>(
    &self,
    mut op: impl BorrowMut<Op>,
) -> Result<crate::cursor::raw_batch::RawBatchCursor>
where
    Op: Operation<O = crate::cursor::raw_batch::RawBatchCursorSpecification>,
{
    // NOTE(review): boxing the future here presumably caps this function's
    // stack footprint, mirroring the regular cursor path — confirm against
    // `execute_cursor_operation`.
    Box::pin(async {
        let mut details = self
            .execute_operation_with_details(op.borrow_mut(), None)
            .await?;
        // Mirror pinning logic without a CursorSpecification: in load-balanced
        // mode, a live server-side cursor (nonzero id) must keep using the
        // connection that created it, so pin that connection to the cursor.
        let pinned = if self.is_load_balanced() && details.output.info.id != 0 {
            Some(details.connection.pin()?)
        } else {
            None
        };
        Ok(crate::cursor::raw_batch::RawBatchCursor::new(
            self.clone(),
            details.output,
            details.implicit_session,
            pinned,
        ))
    })
    .await
}

/// Executes an operation whose output is a raw-batch cursor specification
/// using the caller-provided explicit session, returning a
/// `SessionRawBatchCursor` that must be iterated with that same session.
pub(crate) async fn execute_session_raw_batch_cursor_operation<Op>(
    &self,
    mut op: impl BorrowMut<Op>,
    session: &mut ClientSession,
) -> Result<crate::cursor::raw_batch::SessionRawBatchCursor>
where
    Op: Operation<O = crate::cursor::raw_batch::RawBatchCursorSpecification>,
{
    let mut details = self
        .execute_operation_with_details(op.borrow_mut(), &mut *session)
        .await?;

    // Prefer the transaction's pinned connection if present; otherwise mirror
    // the load-balanced pinning used in the non-session raw-batch path (a live
    // server-side cursor, nonzero id, must stay on its originating connection).
    let pinned = if let Some(handle) = session.transaction.pinned_connection() {
        Some(handle.replicate())
    } else if self.is_load_balanced() && details.output.info.id != 0 {
        Some(details.connection.pin()?)
    } else {
        None
    };
    Ok(crate::cursor::raw_batch::SessionRawBatchCursor::new(
        self.clone(),
        details.output,
        pinned,
    ))
}

pub(crate) async fn execute_session_cursor_operation<Op, T>(
&self,
mut op: impl BorrowMut<Op>,
Expand Down Expand Up @@ -657,15 +696,30 @@ impl Client {
effective_criteria: effective_criteria.clone(),
};

match op.handle_response(&response, context).await {
Ok(response) => Ok(response),
Err(mut err) => {
err.add_labels_and_update_pin(
Some(connection.stream_description()?),
session,
Some(retryability),
);
Err(err.with_server_response(&response))
if op.wants_owned_response() {
match op.handle_response_owned(response, context).await {
Ok(output) => Ok(output),
Err(mut err) => {
err.add_labels_and_update_pin(
Some(connection.stream_description()?),
session,
Some(retryability),
);
// Cannot attach server response; it was moved.
Err(err)
}
}
} else {
match op.handle_response(&response, context).await {
Ok(output) => Ok(output),
Err(mut err) => {
err.add_labels_and_update_pin(
Some(connection.stream_description()?),
session,
Some(retryability),
);
Err(err.with_server_response(&response))
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod common;
pub mod raw_batch;
pub(crate) mod session;

#[cfg(test)]
Expand Down
Loading