Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 70a906f

Browse files
committed
error handling
1 parent a94fd54 commit 70a906f

File tree

5 files changed

+131
-67
lines changed

5 files changed

+131
-67
lines changed

sqld/src/connection/libsql.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -858,7 +858,7 @@ mod test {
858858
TestBuilder::default(),
859859
)
860860
.unwrap();
861-
assert_eq!(state, State::Txn);
861+
assert_eq!(state, TxnStatus::Txn);
862862

863863
tokio::time::advance(TXN_TIMEOUT * 2).await;
864864

@@ -868,7 +868,7 @@ mod test {
868868
TestBuilder::default(),
869869
)
870870
.unwrap();
871-
assert_eq!(state, State::Init);
871+
assert_eq!(state, TxnStatus::Init);
872872
assert!(matches!(builder.into_ret()[0], Err(Error::LibSqlTxTimeout)));
873873
}
874874

@@ -902,7 +902,7 @@ mod test {
902902
TestBuilder::default(),
903903
)
904904
.unwrap();
905-
assert_eq!(state, State::Txn);
905+
assert_eq!(state, TxnStatus::Txn);
906906
assert!(builder.into_ret()[0].is_ok());
907907
});
908908
}
@@ -945,7 +945,7 @@ mod test {
945945
TestBuilder::default(),
946946
)
947947
.unwrap();
948-
assert_eq!(state, State::Txn);
948+
assert_eq!(state, TxnStatus::Txn);
949949
assert!(builder.into_ret()[0].is_ok());
950950
}
951951
})
@@ -963,7 +963,7 @@ mod test {
963963
TestBuilder::default(),
964964
)
965965
.unwrap();
966-
assert_eq!(state, State::Txn);
966+
assert_eq!(state, TxnStatus::Txn);
967967
assert!(builder.into_ret()[0].is_ok());
968968
before.elapsed()
969969
}
@@ -978,7 +978,7 @@ mod test {
978978
let (builder, state) =
979979
Connection::run(conn, Program::seq(&["COMMIT"]), TestBuilder::default())
980980
.unwrap();
981-
assert_eq!(state, State::Init);
981+
assert_eq!(state, TxnStatus::Init);
982982
assert!(builder.into_ret()[0].is_ok());
983983
}
984984
})

sqld/src/connection/write_proxy.rs

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,22 @@ impl WriteProxyConnection {
187187
) -> Result<(B, TxnStatus)> {
188188
self.stats.inc_write_requests_delegated();
189189
*status = TxnStatus::Invalid;
190-
let (builder, new_status, new_frame_no) = self
190+
let res = self
191191
.with_remote_conn(auth, self.builder_config, |conn| {
192192
Box::pin(conn.execute(pgm, builder))
193193
})
194-
.await?;
194+
.await;
195+
196+
let (builder, new_status, new_frame_no) = match res {
197+
Ok(res) => res,
198+
Err(e @ Error::StreamDisconnect) => {
199+
// drop the connection
200+
self.remote_conn.lock().await.take();
201+
return Err(e);
202+
}
203+
Err(e) => return Err(e),
204+
};
205+
195206
*status = new_status;
196207
if let Some(current_frame_no) = new_frame_no {
197208
self.update_last_write_frame_no(current_frame_no);
@@ -251,9 +262,7 @@ impl RemoteConnection {
251262
req.metadata_mut()
252263
.insert_bin(NAMESPACE_METADATA_KEY, namespace);
253264
auth.upgrade_grpc_request(&mut req);
254-
dbg!();
255265
let response_stream = client.stream_exec(req).await.unwrap().into_inner();
256-
dbg!();
257266

258267
Ok(Self {
259268
response_stream,
@@ -276,14 +285,11 @@ impl RemoteConnection {
276285
request: Some(rpc::exec_req::Request::Execute(program.into())),
277286
};
278287

279-
dbg!();
280288
self.request_sender.send(req).await.unwrap(); // TODO: the stream was close!
281-
dbg!();
282289
let mut txn_status = TxnStatus::Invalid;
283290
let mut new_frame_no = None;
284291

285292
'outer: while let Some(resp) = self.response_stream.next().await {
286-
dbg!(&resp);
287293
match resp {
288294
Ok(resp) => {
289295
if resp.request_id != request_id {
@@ -314,7 +320,6 @@ impl RemoteConnection {
314320
Payload::BeginRow(_) => builder.begin_row()?,
315321
Payload::AddRowValue(AddRowValue { val }) => {
316322
let value: Value = bincode::deserialize(&val.unwrap().data)
317-
// something is wrong, better stop right here
318323
.map_err(QueryResultBuilderError::from_any)?;
319324
builder.add_row_value(ValueRef::from(&value))?;
320325
}
@@ -324,7 +329,6 @@ impl RemoteConnection {
324329
txn_status = TxnStatus::from(f.state());
325330
new_frame_no = last_frame_no;
326331
builder.finish(last_frame_no, txn_status)?;
327-
dbg!();
328332
break 'outer;
329333
}
330334
Payload::Error(error) => {
@@ -333,7 +337,10 @@ impl RemoteConnection {
333337
}
334338
}
335339
}
336-
Err(_e) => todo!("handle stream error"),
340+
Err(e) => {
341+
tracing::error!("received error from connection stream: {e}");
342+
return Err(Error::StreamDisconnect)
343+
},
337344
}
338345
}
339346

@@ -426,7 +433,7 @@ pub mod test {
426433
use rand::Fill;
427434

428435
use super::*;
429-
use crate::query_result_builder::test::test_driver;
436+
use crate::{query_result_builder::test::test_driver, rpc::proxy::rpc::{ExecuteResults, query_result::RowResult}};
430437

431438
/// generate an arbitraty rpc value. see build.rs for usage.
432439
pub fn arbitrary_rpc_value(u: &mut Unstructured) -> arbitrary::Result<Vec<u8>> {
@@ -442,10 +449,55 @@ pub mod test {
442449
Ok(v.into())
443450
}
444451

452+
fn execute_results_to_builder<B: QueryResultBuilder>(
453+
execute_result: ExecuteResults,
454+
mut builder: B,
455+
config: &QueryBuilderConfig,
456+
) -> Result<B> {
457+
builder.init(config)?;
458+
for result in execute_result.results {
459+
match result.row_result {
460+
Some(RowResult::Row(rows)) => {
461+
builder.begin_step()?;
462+
builder.cols_description(rows.column_descriptions.iter().map(|c| Column {
463+
name: &c.name,
464+
decl_ty: c.decltype.as_deref(),
465+
}))?;
466+
467+
builder.begin_rows()?;
468+
for row in rows.rows {
469+
builder.begin_row()?;
470+
for value in row.values {
471+
let value: Value = bincode::deserialize(&value.data)
472+
// something is wrong, better stop right here
473+
.map_err(QueryResultBuilderError::from_any)?;
474+
builder.add_row_value(ValueRef::from(&value))?;
475+
}
476+
builder.finish_row()?;
477+
}
478+
479+
builder.finish_rows()?;
480+
481+
builder.finish_step(rows.affected_row_count, rows.last_insert_rowid)?;
482+
}
483+
Some(RowResult::Error(err)) => {
484+
builder.begin_step()?;
485+
builder.step_error(Error::RpcQueryError(err))?;
486+
builder.finish_step(0, None)?;
487+
}
488+
None => (),
489+
}
490+
}
491+
492+
builder.finish(execute_result.current_frame_no, TxnStatus::Init)?;
493+
494+
Ok(builder)
495+
}
496+
445497
/// In this test, we generate random ExecuteResults, and ensures that the `execute_results_to_builder` drives the builder FSM correctly.
446498
#[test]
447499
fn test_execute_results_to_builder() {
448-
test_driver(1000, |b| {
500+
test_driver(1000, |b| -> std::result::Result<crate::query_result_builder::test::FsmQueryBuilder, Error> {
449501
let mut data = [0; 10_000];
450502
data.try_fill(&mut rand::thread_rng()).unwrap();
451503
let mut un = Unstructured::new(&data);

sqld/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ pub enum Error {
7979
ConflictingRestoreParameters,
8080
#[error("failed to fork database: {0}")]
8181
Fork(#[from] ForkError),
82+
#[error("Connection with primary broken")]
83+
StreamDisconnect,
8284
}
8385

8486
trait ResponseError: std::error::Error {

sqld/src/query_result_builder.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub static TOTAL_RESPONSE_SIZE: AtomicUsize = AtomicUsize::new(0);
1515

1616
#[derive(Debug)]
1717
pub enum QueryResultBuilderError {
18+
/// The response payload is too large
1819
ResponseTooLarge(u64),
1920
Internal(anyhow::Error),
2021
}
@@ -616,6 +617,7 @@ pub mod test {
616617
fn finish(
617618
&mut self,
618619
_last_frame_no: Option<FrameNo>,
620+
_txn_status: TxnStatus,
619621
) -> Result<(), QueryResultBuilderError> {
620622
Ok(())
621623
}
@@ -751,7 +753,7 @@ pub mod test {
751753
FinishRow => b.finish_row().unwrap(),
752754
FinishRows => b.finish_rows().unwrap(),
753755
Finish => {
754-
b.finish(Some(0)).unwrap();
756+
b.finish(Some(0), TxnStatus::Init).unwrap();
755757
break;
756758
}
757759
BuilderError => return b,
@@ -899,6 +901,7 @@ pub mod test {
899901
fn finish(
900902
&mut self,
901903
_last_frame_no: Option<FrameNo>,
904+
_txn_status: TxnStatus,
902905
) -> Result<(), QueryResultBuilderError> {
903906
self.maybe_inject_error()?;
904907
self.transition(Finish)
@@ -947,7 +950,7 @@ pub mod test {
947950
builder.finish_rows().unwrap();
948951
builder.finish_step(0, None).unwrap();
949952

950-
builder.finish(Some(0)).unwrap();
953+
builder.finish(Some(0), TxnStatus::Init).unwrap();
951954
}
952955

953956
#[test]

0 commit comments

Comments
 (0)