Skip to content
Open
22 changes: 21 additions & 1 deletion core/api/rpc/wsc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ namespace fc::api::rpc {
}));
}
socket.handshake(host, target, ec);
client_data = ClientData{host, port, target, token};
if (ec) {
return ec;
}
Expand Down Expand Up @@ -87,10 +88,11 @@ namespace fc::api::rpc {
}
}
chans.clear();
reconnect(3, std::chrono::seconds(5));
}

void Client::_flush() {
if (!writing && !write_queue.empty()) {
if (!writing && !write_queue.empty() && !reconnecting){
auto &[id, buffer] = write_queue.front();
writing = true;
socket.async_write(boost::asio::buffer(buffer.data(), buffer.size()),
Expand Down Expand Up @@ -185,4 +187,22 @@ namespace fc::api::rpc {
}
}
}

void Client::reconnect(int counter, std::chrono::milliseconds wait) {
if(reconnecting.exchange(true)) return;
logger_->info("Starting reconnect to {}:{}", client_data.host, client_data.port);
for(int i = 0; i < counter; i++){
std::this_thread::sleep_for(wait*(i+1));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::this_thread::sleep_for(wait*(i+1));
std::this_thread::sleep_for(wait * (i + 1));

auto res = connect(client_data.host,
client_data.port,
client_data.target,
client_data.token);
if(!res.has_error()) {
break;
}
}
reconnecting.store(false);
logger_->info("Reconnect to {}:{} was successful", client_data.host, client_data.port);
_flush();
}
} // namespace fc::api::rpc
13 changes: 13 additions & 0 deletions core/api/rpc/wsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ namespace fc::api::rpc {
void setup(A &api) {
visit(api, [&](auto &m) { _setup(*this, m); });
}
struct ClientData {
ClientData(std::string host,
std::string port,
std::string target,
std::string token)
: host(host), port(port), target(target), token(token){};
ClientData() = default;

std::string host, port, target, token;
}client_data;

void reconnect(int counter, std::chrono::milliseconds wait);

private:
std::thread thread;
Expand All @@ -72,6 +84,7 @@ namespace fc::api::rpc {
std::map<uint64_t, ChanCb> chans;
std::queue<std::pair<uint64_t, Bytes>> write_queue;
bool writing{false};
std::atomic<bool> reconnecting;

template <typename M>
void _setup(Client &c, M &m);
Expand Down
3 changes: 3 additions & 0 deletions core/sector_storage/impl/local_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -904,4 +904,7 @@ namespace fc::sector_storage {

return call_id;
}
void LocalWorker::ping(std::function<void(const bool &resp)> cb) {
cb(true);
}
} // namespace fc::sector_storage
2 changes: 2 additions & 0 deletions core/sector_storage/impl/local_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ namespace fc::sector_storage {
outcome::result<std::vector<primitives::StoragePath>> getAccessiblePaths()
override;

void ping(std::function<void(const bool &resp)> cb) override;

private:
template <typename W, typename R>
outcome::result<CallId> asyncCall(const SectorRef &sector,
Expand Down
6 changes: 6 additions & 0 deletions core/sector_storage/impl/remote_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,10 @@ namespace fc::sector_storage {
AcquireMode mode) {
return api_.Fetch(sector, file_type, path_type, mode);
}

void RemoteWorker::ping(std::function<void(const bool &resp)> cb) {
api_.Version([=](auto res){
cb(!res.has_error());
});
}
} // namespace fc::sector_storage
2 changes: 2 additions & 0 deletions core/sector_storage/impl/remote_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ namespace fc::sector_storage {
const SectorRef &sector,
const PreCommit1Output &pre_commit_1_output) override;

void ping(std::function<void(const bool &resp)> cb) override;

outcome::result<CallId> sealCommit1(const SectorRef &sector,
const SealRandomness &ticket,
const InteractiveRandomness &seed,
Expand Down
18 changes: 15 additions & 3 deletions core/sector_storage/impl/scheduler_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,22 @@ namespace fc::sector_storage {
return SchedulerErrors::kCannotSelectWorker;
}

WorkerID wid = acceptable[0];

std::promise<WorkerID> wid_promise;
std::future<WorkerID> wid_future = wid_promise.get_future();
auto done = std::make_shared<std::atomic_bool>();
for (const auto &cur : acceptable) {
workers_[cur]->worker->ping([&wid_promise, done, cur](const bool &resp) {
if (resp && !done->exchange(true)) {
wid_promise.set_value(cur);
}
});
}
auto status = wid_future.wait_for(std::chrono::seconds(5));
if(status == std::future_status::timeout){
return false;
}
WorkerID wid = wid_future.get();
assignWorker(wid, workers_[wid], request);

return true;
}

Expand Down
2 changes: 2 additions & 0 deletions core/sector_storage/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ namespace fc::sector_storage {

virtual outcome::result<std::vector<primitives::StoragePath>>
getAccessiblePaths() = 0;

virtual void ping(std::function<void(const bool &resp)> cb) = 0;
};

enum class CallErrorCode : uint64_t {
Expand Down
1 change: 1 addition & 0 deletions test/testutil/mocks/sector_storage/worker_mock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,6 @@ namespace fc::sector_storage {
gsl::span<const UnpaddedPieceSize>,
const UnpaddedPieceSize &,
int));
MOCK_METHOD1(ping, void(std::function<void(const bool &)>));
};
} // namespace fc::sector_storage