Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit db5be68

Browse files
committed
more error handling
1 parent 744e984 commit db5be68

File tree

2 files changed

+33
-23
lines changed

2 files changed

+33
-23
lines changed

sqld/src/connection/write_proxy.rs

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ impl WriteProxyConnection {
194194

195195
let (builder, new_status, new_frame_no) = match res {
196196
Ok(res) => res,
197-
Err(e @ Error::StreamDisconnect) => {
197+
Err(e @ (Error::PrimaryStreamDisconnect | Error::PrimaryStreamMisuse)) => {
198198
// drop the connection
199199
self.remote_conn.lock().await.take();
200200
return Err(e);
@@ -289,23 +289,28 @@ impl RemoteConnection {
289289
self.request_sender
290290
.send(req)
291291
.await
292-
.map_err(|_| Error::StreamDisconnect)?;
292+
.map_err(|_| Error::PrimaryStreamDisconnect)?;
293293

294-
'outer: while let Some(resp) = self.response_stream.next().await {
294+
while let Some(resp) = self.response_stream.next().await {
295295
match resp {
296296
Ok(resp) => {
297-
// todo: handle interuption
298-
if resp.request_id != request_id {
299-
todo!("stream misuse: connection should be serialized");
297+
// there was an interuption, and we moved to the next query
298+
if resp.request_id > request_id {
299+
return Err(Error::PrimaryStreamInterupted)
300300
}
301301

302-
if !response_cb(resp.response.unwrap())? {
303-
break 'outer;
302+
// we can ignore response for previously interupted requests
303+
if resp.request_id < request_id {
304+
continue;
305+
}
306+
307+
if !response_cb(resp.response.ok_or(Error::PrimaryStreamMisuse)?)? {
308+
break;
304309
}
305310
}
306311
Err(e) => {
307-
tracing::error!("received error from connection stream: {e}");
308-
return Err(Error::StreamDisconnect);
312+
tracing::error!("received an error from connection stream: {e}");
313+
return Err(Error::PrimaryStreamDisconnect);
309314
}
310315
}
311316
}
@@ -325,16 +330,16 @@ impl RemoteConnection {
325330
match response {
326331
exec_resp::Response::ProgramResp(resp) => {
327332
for step in resp.steps {
328-
let Some(step) = step.step else {panic!("invalid pgm")};
333+
let Some(step) = step.step else { return Err(Error::PrimaryStreamMisuse) };
329334
match step {
330335
Step::Init(_) => builder.init(&builder_config)?,
331336
Step::BeginStep(_) => builder.begin_step()?,
332337
Step::FinishStep(FinishStep {
333338
affected_row_count,
334339
last_insert_rowid,
335340
}) => builder.finish_step(affected_row_count, last_insert_rowid)?,
336-
Step::StepError(StepError { error }) => builder
337-
.step_error(crate::error::Error::RpcQueryError(error.unwrap()))?,
341+
Step::StepError(StepError { error: Some(err) }) => builder
342+
.step_error(crate::error::Error::RpcQueryError(err))?,
338343
Step::ColsDescription(ColsDescription { columns }) => {
339344
let cols = columns.iter().map(|c| Column {
340345
name: &c.name,
@@ -364,12 +369,12 @@ impl RemoteConnection {
364369
builder.finish(last_frame_no, txn_status)?;
365370
return Ok(false);
366371
}
367-
_ => todo!("invalid request"),
372+
_ => return Err(Error::PrimaryStreamMisuse),
368373
}
369374
}
370375
}
371-
exec_resp::Response::DescribeResp(_) => todo!("invalid resp"),
372-
exec_resp::Response::Error(_) => todo!(),
376+
exec_resp::Response::DescribeResp(_) => return Err(Error::PrimaryStreamMisuse),
377+
exec_resp::Response::Error(e) => return Err(Error::RpcQueryError(e)),
373378
}
374379

375380
Ok(true)
@@ -409,12 +414,12 @@ impl RemoteConnection {
409414
is_explain: resp.is_explain,
410415
is_readonly: resp.is_readonly,
411416
});
417+
418+
Ok(false)
412419
}
413-
exec_resp::Response::Error(_) => todo!(),
414-
exec_resp::Response::ProgramResp(_) => todo!(),
420+
exec_resp::Response::Error(e) => Err(Error::RpcQueryError(e)),
421+
exec_resp::Response::ProgramResp(_) => Err(Error::PrimaryStreamMisuse),
415422
}
416-
417-
Ok(false)
418423
};
419424

420425
self.make_request(
@@ -423,7 +428,7 @@ impl RemoteConnection {
423428
)
424429
.await?;
425430

426-
Ok(out.unwrap())
431+
out.ok_or(Error::PrimaryStreamMisuse)
427432
}
428433
}
429434

sqld/src/error.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,13 @@ pub enum Error {
7979
ConflictingRestoreParameters,
8080
#[error("failed to fork database: {0}")]
8181
Fork(#[from] ForkError),
82+
8283
#[error("Connection with primary broken")]
83-
StreamDisconnect,
84+
PrimaryStreamDisconnect,
85+
#[error("Proxy protocal misuse")]
86+
PrimaryStreamMisuse,
87+
#[error("Proxy request interupted")]
88+
PrimaryStreamInterupted,
8489
}
8590

8691
trait ResponseError: std::error::Error {
@@ -131,7 +136,7 @@ impl IntoResponse for Error {
131136
LoadDumpExistingDb => self.format_err(StatusCode::BAD_REQUEST),
132137
ConflictingRestoreParameters => self.format_err(StatusCode::BAD_REQUEST),
133138
Fork(e) => e.into_response(),
134-
StreamDisconnect => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
139+
PrimaryStreamDisconnect => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
135140
}
136141
}
137142
}

0 commit comments

Comments
 (0)