Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions aw-client-rust/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ impl AwClient {
proxy_method!(delete_event, (), bucketname: &str, event_id: i64);
proxy_method!(get_event_count, i64, bucketname: &str);
proxy_method!(get_info, aw_models::Info,);

pub fn wait_for_server(&self) -> Result<(), Box<dyn Error>> {
self.client.wait_for_server()
}
}
35 changes: 35 additions & 0 deletions aw-client-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::{collections::HashMap, error::Error};

use chrono::{DateTime, Utc};
use serde_json::{json, Map};
use std::net::TcpStream;
use std::time::Duration;

pub use aw_models::{Bucket, BucketMetadata, Event};

Expand Down Expand Up @@ -221,4 +223,37 @@ impl AwClient {
let url = format!("{}/api/0/info", self.baseurl);
self.client.get(url).send().await?.json().await
}

// TODO: make async
pub fn wait_for_server(&self) -> Result<(), Box<dyn Error>> {
let socket_addrs = self.baseurl.socket_addrs(|| None)?;
let socket_addr = socket_addrs.first()
.ok_or("Unable to resolve baseurl into socket address")?;

// Check if server is running with exponential backoff
let mut retry_delay = Duration::from_millis(100);
let max_wait = Duration::from_secs(10);
let mut total_wait = Duration::from_secs(0);

while total_wait < max_wait {
match TcpStream::connect_timeout(socket_addr, retry_delay) {
Ok(_) => break,
Err(_) => {
std::thread::sleep(retry_delay);
total_wait += retry_delay;
retry_delay *= 2;
}
}
}

if total_wait >= max_wait {
return Err(format!(
"Local server {} not running after 10 seconds of retrying",
socket_addr
)
.into());
}

Ok(())
}
}
21 changes: 15 additions & 6 deletions aw-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,23 @@ fn main() -> Result<(), Box<dyn Error>> {

fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
loop {
info!("Pulling from all hosts");
sync_wrapper::pull_all(client)?;

info!("Pushing local data");
sync_wrapper::push(client)?;
if let Err(e) = daemon_sync_cycle(client) {
error!("Error during sync cycle: {}", e);
// Re-throw the error
return Err(e);
}

info!("Sync pass done, sleeping for 5 minutes");

std::thread::sleep(std::time::Duration::from_secs(300));
}
}

fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box<dyn Error>> {
info!("Pulling from all hosts");
sync_wrapper::pull_all(client)?;

info!("Pushing local data");
sync_wrapper::push(client)?;

Ok(())
}
11 changes: 2 additions & 9 deletions aw-sync/src/sync_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::error::Error;
use std::fs;
use std::net::TcpStream;
use std::time::Duration;

use crate::sync::{sync_run, SyncMode, SyncSpec};
use aw_client_rust::blocking::AwClient;
Expand All @@ -14,15 +15,7 @@ pub fn pull_all(client: &AwClient) -> Result<(), Box<dyn Error>> {
}

pub fn pull(host: &str, client: &AwClient) -> Result<(), Box<dyn Error>> {
let socket_addrs = client.baseurl.socket_addrs(|| None)?;
let socket_addr = socket_addrs
.get(0)
.ok_or("Unable to resolve baseurl into socket address")?;

// Check if server is running
if TcpStream::connect(socket_addr).is_err() {
return Err(format!("Local server {} not running", &client.baseurl).into());
}
client.wait_for_server()?;

// Path to the sync folder
// Sync folder is structured ./{hostname}/{device_id}/test.db
Expand Down
Loading