@@ -134,42 +134,47 @@ mongoc_async_cmd_run (mongoc_async_cmd_t *acmd)
134134 return false;
135135}
136136
137- void
138- _mongoc_async_cmd_init_send (const mongoc_opcode_t cmd_opcode ,
137+ static void
138+ _mongoc_async_cmd_init_send (const int32_t cmd_opcode ,
139139 mongoc_async_cmd_t * acmd ,
140140 const char * dbname )
141141{
142- acmd -> rpc .header .msg_len = 0 ;
143- acmd -> rpc .header .request_id = ++ acmd -> async -> request_id ;
144- acmd -> rpc .header .response_to = 0 ;
142+ BSON_ASSERT (cmd_opcode == MONGOC_OP_CODE_QUERY ||
143+ cmd_opcode == MONGOC_OP_CODE_MSG );
145144
146- if (MONGOC_OPCODE_QUERY == cmd_opcode ) {
147- acmd -> ns = bson_strdup_printf ("%s.$cmd" , dbname );
148- acmd -> rpc .header .opcode = MONGOC_OPCODE_QUERY ;
149- acmd -> rpc .query .flags = MONGOC_QUERY_SECONDARY_OK ;
150- acmd -> rpc .query .collection = acmd -> ns ;
151- acmd -> rpc .query .skip = 0 ;
152- acmd -> rpc .query .n_return = -1 ;
153- acmd -> rpc .query .query = bson_get_data (& acmd -> cmd );
154- acmd -> rpc .query .fields = NULL ;
155- }
145+ int32_t message_length = 0 ;
156146
157- if (MONGOC_OPCODE_MSG == cmd_opcode ) {
158- acmd -> rpc .header .opcode = MONGOC_OPCODE_MSG ;
147+ message_length += mcd_rpc_header_set_message_length (acmd -> rpc , 0 );
148+ message_length +=
149+ mcd_rpc_header_set_request_id (acmd -> rpc , ++ acmd -> async -> request_id );
150+ message_length += mcd_rpc_header_set_response_to (acmd -> rpc , 0 );
151+ message_length += mcd_rpc_header_set_op_code (acmd -> rpc , cmd_opcode );
159152
160- acmd -> rpc .msg .msg_len = 0 ;
161- acmd -> rpc .msg .flags = 0 ;
162- acmd -> rpc .msg .n_sections = 1 ;
163- acmd -> rpc .msg .sections [0 ].payload_type = 0 ;
164- acmd -> rpc .msg .sections [0 ].payload .bson_document =
165- bson_get_data (& acmd -> cmd );
153+ if (cmd_opcode == MONGOC_OP_CODE_QUERY ) {
154+ acmd -> ns = bson_strdup_printf ("%s.$cmd" , dbname );
155+ message_length += mcd_rpc_op_query_set_flags (
156+ acmd -> rpc , MONGOC_OP_QUERY_FLAG_SECONDARY_OK );
157+ message_length +=
158+ mcd_rpc_op_query_set_full_collection_name (acmd -> rpc , acmd -> ns );
159+ message_length += mcd_rpc_op_query_set_number_to_skip (acmd -> rpc , 0 );
160+ message_length += mcd_rpc_op_query_set_number_to_return (acmd -> rpc , -1 );
161+ message_length +=
162+ mcd_rpc_op_query_set_query (acmd -> rpc , bson_get_data (& acmd -> cmd ));
163+ } else {
164+ mcd_rpc_op_msg_set_sections_count (acmd -> rpc , 1u );
165+ message_length +=
166+ mcd_rpc_op_msg_set_flag_bits (acmd -> rpc , MONGOC_OP_MSG_FLAG_NONE );
167+ message_length += mcd_rpc_op_msg_section_set_kind (acmd -> rpc , 0u , 0 );
168+ message_length += mcd_rpc_op_msg_section_set_body (
169+ acmd -> rpc , 0u , bson_get_data (& acmd -> cmd ));
166170 }
167171
172+ mcd_rpc_message_set_length (acmd -> rpc , message_length );
173+
168174 /* This will always be hello, which are not allowed to be compressed */
169- _mongoc_rpc_gather (& acmd -> rpc , & acmd -> array );
170- acmd -> iovec = (mongoc_iovec_t * ) acmd -> array .data ;
171- acmd -> niovec = acmd -> array .len ;
172- _mongoc_rpc_swab_to_le (& acmd -> rpc );
175+ acmd -> iovec = mcd_rpc_message_to_iovecs (acmd -> rpc , & acmd -> niovec );
176+ BSON_ASSERT (acmd -> iovec );
177+
173178 acmd -> bytes_written = 0 ;
174179}
175180
@@ -198,17 +203,15 @@ mongoc_async_cmd_new (mongoc_async_t *async,
198203 void * setup_ctx ,
199204 const char * dbname ,
200205 const bson_t * cmd ,
201- const mongoc_opcode_t cmd_opcode , /* OP_QUERY or OP_MSG */
206+ const int32_t cmd_opcode , /* OP_QUERY or OP_MSG */
202207 mongoc_async_cmd_cb_t cb ,
203208 void * cb_data ,
204209 int64_t timeout_msec )
205210{
206- mongoc_async_cmd_t * acmd ;
211+ BSON_ASSERT_PARAM (cmd );
212+ BSON_ASSERT_PARAM (dbname );
207213
208- BSON_ASSERT (cmd );
209- BSON_ASSERT (dbname );
210-
211- acmd = BSON_ALIGNED_ALLOC0 (mongoc_async_cmd_t );
214+ mongoc_async_cmd_t * const acmd = BSON_ALIGNED_ALLOC0 (mongoc_async_cmd_t );
212215 acmd -> async = async ;
213216 acmd -> dns_result = dns_result ;
214217 acmd -> timeout_msec = timeout_msec ;
@@ -222,12 +225,13 @@ mongoc_async_cmd_new (mongoc_async_t *async,
222225 acmd -> connect_started = bson_get_monotonic_time ();
223226 bson_copy_to (cmd , & acmd -> cmd );
224227
225- if (MONGOC_OPCODE_MSG == cmd_opcode ) {
226- /* If we're sending an OPCODE_MSG , we need to add the "db" field: */
228+ if (MONGOC_OP_CODE_MSG == cmd_opcode ) {
229+ /* If we're sending an OP_MSG , we need to add the "db" field: */
227230 bson_append_utf8 (& acmd -> cmd , "$db" , 3 , "admin" , 5 );
228231 }
229232
230- _mongoc_array_init (& acmd -> array , sizeof (mongoc_iovec_t ));
233+ acmd -> rpc = mcd_rpc_message_new ();
234+ acmd -> iovec = NULL ;
231235 _mongoc_buffer_init (& acmd -> buffer , NULL , 0 , NULL , NULL );
232236
233237 _mongoc_async_cmd_init_send (cmd_opcode , acmd , dbname );
@@ -255,8 +259,9 @@ mongoc_async_cmd_destroy (mongoc_async_cmd_t *acmd)
255259 bson_destroy (& acmd -> reply );
256260 }
257261
258- _mongoc_array_destroy ( & acmd -> array );
262+ bson_free ( acmd -> iovec );
259263 _mongoc_buffer_destroy (& acmd -> buffer );
264+ mcd_rpc_message_destroy (acmd -> rpc );
260265
261266 bson_free (acmd -> ns );
262267 bson_free (acmd );
@@ -348,7 +353,7 @@ _mongoc_async_cmd_phase_send (mongoc_async_cmd_t *acmd)
348353 used_temp_iovec = true;
349354 }
350355
351- _mongoc_rpc_op_egress_inc ( & acmd -> rpc );
356+ mcd_rpc_message_egress ( acmd -> rpc );
352357 bytes = mongoc_stream_writev (acmd -> stream , iovec , niovec , 0 );
353358
354359 if (used_temp_iovec ) {
@@ -462,38 +467,39 @@ _mongoc_async_cmd_phase_recv_rpc (mongoc_async_cmd_t *acmd)
462467 acmd -> bytes_to_read = (size_t ) (acmd -> bytes_to_read - bytes );
463468
464469 if (!acmd -> bytes_to_read ) {
465- if (!_mongoc_rpc_scatter (
466- & acmd -> rpc , acmd -> buffer .data , acmd -> buffer .len )) {
470+ mcd_rpc_message_reset (acmd -> rpc );
471+ if (!mcd_rpc_message_from_data_in_place (
472+ acmd -> rpc , acmd -> buffer .data , acmd -> buffer .len , NULL )) {
467473 bson_set_error (& acmd -> error ,
468474 MONGOC_ERROR_PROTOCOL ,
469475 MONGOC_ERROR_PROTOCOL_INVALID_REPLY ,
470476 "Invalid reply from server." );
471477 return MONGOC_ASYNC_CMD_ERROR ;
472478 }
473- if (BSON_UINT32_FROM_LE (acmd -> rpc .header .opcode ) ==
474- MONGOC_OPCODE_COMPRESSED ) {
475- uint8_t * buf = NULL ;
476- size_t len =
477- BSON_UINT32_FROM_LE (acmd -> rpc .compressed .uncompressed_size ) +
478- sizeof (mongoc_rpc_header_t );
479-
480- buf = bson_malloc0 (len );
481- if (!_mongoc_rpc_decompress (& acmd -> rpc , buf , len )) {
482- bson_free (buf );
483- bson_set_error (& acmd -> error ,
484- MONGOC_ERROR_PROTOCOL ,
485- MONGOC_ERROR_PROTOCOL_INVALID_REPLY ,
486- "Could not decompress server reply" );
487- return MONGOC_ASYNC_CMD_ERROR ;
488- }
479+ mcd_rpc_message_ingress (acmd -> rpc );
489480
490- _mongoc_buffer_destroy (& acmd -> buffer );
491- _mongoc_buffer_init (& acmd -> buffer , buf , len , NULL , NULL );
481+ void * decompressed_data ;
482+ size_t decompressed_data_len ;
483+
484+ if (!mcd_rpc_message_decompress_if_necessary (
485+ acmd -> rpc , & decompressed_data , & decompressed_data_len )) {
486+ bson_set_error (& acmd -> error ,
487+ MONGOC_ERROR_PROTOCOL ,
488+ MONGOC_ERROR_PROTOCOL_INVALID_REPLY ,
489+ "Could not decompress server reply" );
490+ return MONGOC_ASYNC_CMD_ERROR ;
492491 }
493492
494- _mongoc_rpc_swab_from_le (& acmd -> rpc );
493+ if (decompressed_data ) {
494+ _mongoc_buffer_destroy (& acmd -> buffer );
495+ _mongoc_buffer_init (& acmd -> buffer ,
496+ decompressed_data ,
497+ decompressed_data_len ,
498+ NULL ,
499+ NULL );
500+ }
495501
496- if (!_mongoc_rpc_get_first_document ( & acmd -> rpc , & acmd -> reply )) {
502+ if (!mcd_rpc_message_get_body ( acmd -> rpc , & acmd -> reply )) {
497503 bson_set_error (& acmd -> error ,
498504 MONGOC_ERROR_PROTOCOL ,
499505 MONGOC_ERROR_PROTOCOL_INVALID_REPLY ,
0 commit comments