Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions aw-client-rust/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ impl AwClient {
proxy_method!(delete_event, (), bucketname: &str, event_id: i64);
proxy_method!(get_event_count, i64, bucketname: &str);
proxy_method!(get_info, aw_models::Info,);

pub fn wait_for_start(&self) -> Result<(), Box<dyn Error>> {
self.client.wait_for_start()
}
}
36 changes: 36 additions & 0 deletions aw-client-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::{collections::HashMap, error::Error};

use chrono::{DateTime, Utc};
use serde_json::{json, Map};
use std::net::TcpStream;
use std::time::Duration;

pub use aw_models::{Bucket, BucketMetadata, Event};

Expand Down Expand Up @@ -221,4 +223,38 @@ impl AwClient {
let url = format!("{}/api/0/info", self.baseurl);
self.client.get(url).send().await?.json().await
}

// TODO: make async
pub fn wait_for_start(&self) -> Result<(), Box<dyn Error>> {
let socket_addrs = self.baseurl.socket_addrs(|| None)?;
let socket_addr = socket_addrs
.first()
.ok_or("Unable to resolve baseurl into socket address")?;

// Check if server is running with exponential backoff
let mut retry_delay = Duration::from_millis(100);
let max_wait = Duration::from_secs(10);
let mut total_wait = Duration::from_secs(0);

while total_wait < max_wait {
match TcpStream::connect_timeout(socket_addr, retry_delay) {
Ok(_) => break,
Err(_) => {
std::thread::sleep(retry_delay);
total_wait += retry_delay;
retry_delay *= 2;
}
}
}

if total_wait >= max_wait {
return Err(format!(
"Local server {} not running after 10 seconds of retrying",
socket_addr
)
.into());
}

Ok(())
}
}
21 changes: 15 additions & 6 deletions aw-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,23 @@ fn main() -> Result<(), Box<dyn Error>> {

fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
loop {
info!("Pulling from all hosts");
sync_wrapper::pull_all(client)?;

info!("Pushing local data");
sync_wrapper::push(client)?;
if let Err(e) = daemon_sync_cycle(client) {
error!("Error during sync cycle: {}", e);
// Re-throw the error
return Err(e);
}

info!("Sync pass done, sleeping for 5 minutes");

std::thread::sleep(std::time::Duration::from_secs(300));
}
}

fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box<dyn Error>> {
info!("Pulling from all hosts");
sync_wrapper::pull_all(client)?;

info!("Pushing local data");
sync_wrapper::push(client)?;

Ok(())
}
11 changes: 1 addition & 10 deletions aw-sync/src/sync_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::error::Error;
use std::fs;
use std::net::TcpStream;

use crate::sync::{sync_run, SyncMode, SyncSpec};
use aw_client_rust::blocking::AwClient;
Expand All @@ -14,15 +13,7 @@ pub fn pull_all(client: &AwClient) -> Result<(), Box<dyn Error>> {
}

pub fn pull(host: &str, client: &AwClient) -> Result<(), Box<dyn Error>> {
let socket_addrs = client.baseurl.socket_addrs(|| None)?;
let socket_addr = socket_addrs
.get(0)
.ok_or("Unable to resolve baseurl into socket address")?;

// Check if server is running
if TcpStream::connect(socket_addr).is_err() {
return Err(format!("Local server {} not running", &client.baseurl).into());
}
client.wait_for_start()?;

// Path to the sync folder
// Sync folder is structured ./{hostname}/{device_id}/test.db
Expand Down
Loading