Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 51 additions & 18 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "util/debug_points.h"

namespace doris {

Expand Down Expand Up @@ -223,9 +225,27 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
expiration_time = 0;
}

if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpState::TRIGGERED_BY_JOB)) {
LOG(INFO) << "found duplicate warmup task for rowset " << rowset_id.to_string()
<< ", skip it";
continue;
}

for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
auto segment_size = rs_meta.segment_file_size(segment_id);
auto download_done = [=](Status st) {
auto download_done = [=, version = rs_meta.version()](Status st) {
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
rowset_id.to_string(), version.to_string(), sleep_time);
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
});
DBUG_EXECUTE_IF(
"CloudInternalServiceImpl::warm_up_rowset.download_segment.inject_error", {
st = Status::InternalError("injected error");
LOG_INFO("[verbose] inject error, tablet={}, rowset={}, st={}",
tablet_id, rowset_id.to_string(), st.to_string());
});
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_segment_num << 1;
g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
Expand Down Expand Up @@ -256,6 +276,11 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
}
if (tablet->complete_rowset_segment_warmup(rowset_id, st, 1, 0) ==
WarmUpState::DONE) {
VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
<< rowset_id.to_string() << ") completed";
}
if (wait) {
wait->signal();
}
Expand All @@ -267,13 +292,10 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
.offset = 0,
.download_size = segment_size,
.file_system = storage_resource.value()->fs,
.ctx =
{
.is_index_data = false,
.expiration_time = expiration_time,
.is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
},
.ctx = {.is_index_data = false,
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true},
.download_done = std::move(download_done),
};
g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
Expand All @@ -283,9 +305,18 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
}
_engine.file_cache_block_downloader().submit_download_task(download_meta);

auto download_inverted_index = [&](std::string index_path, uint64_t idx_size) {
auto download_inverted_index = [&, tablet](std::string index_path, uint64_t idx_size) {
auto storage_resource = rs_meta.remote_storage_resource();
auto download_done = [=](Status st) {
auto download_done = [=, version = rs_meta.version()](Status st) {
DBUG_EXECUTE_IF(
"CloudInternalServiceImpl::warm_up_rowset.download_inverted_idx", {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO(
"[verbose] block download for rowset={}, inverted index "
"file={}, sleep={}",
rowset_id.to_string(), index_path, sleep_time);
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
});
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_index_num << 1;
g_file_cache_event_driven_warm_up_finished_index_size << idx_size;
Expand Down Expand Up @@ -319,6 +350,11 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
<< "download inverted index failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
}
if (tablet->complete_rowset_segment_warmup(rowset_id, st, 0, 1) ==
WarmUpState::DONE) {
VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
<< rowset_id.to_string() << ") completed";
}
if (wait) {
wait->signal();
}
Expand All @@ -327,18 +363,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
.path = io::Path(index_path),
.file_size = static_cast<int64_t>(idx_size),
.file_system = storage_resource.value()->fs,
.ctx =
{
.is_index_data = false, // DORIS-20877
.expiration_time = expiration_time,
.is_dryrun = config::
enable_reader_dryrun_when_download_file_cache,
},
.ctx = {.is_index_data = false, // DORIS-20877
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true},
.download_done = std::move(download_done),
};
g_file_cache_event_driven_warm_up_submitted_index_num << 1;
g_file_cache_event_driven_warm_up_submitted_index_size << idx_size;

tablet->update_rowset_warmup_state_inverted_idx_num(rowset_id, 1);
if (wait) {
wait->add_count();
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,9 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
// after doing EMPTY_CUMULATIVE compaction, MS cp is 13, get_rowset will return [2-11][12-12].
bool version_overlap =
tablet->max_version_unlocked() >= rowsets.front()->start_version();
tablet->add_rowsets(std::move(rowsets), version_overlap, wlock, warmup_delta_data);
tablet->add_rowsets(
std::move(rowsets), version_overlap, wlock,
warmup_delta_data || config::enable_warmup_immediately_on_new_rowset);
RETURN_IF_ERROR(tablet->merge_rowsets_schema());
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
if (request.alter_version > 1) {
// [0-1] is a placeholder rowset, no need to convert
RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()},
&rs_splits, false));
&rs_splits, CaptureRowsetOps {}));
}
Defer defer2 {[&]() {
_new_tablet->set_alter_version(-1);
Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ CloudStorageEngine::CloudStorageEngine(const EngineOptions& options)
std::make_shared<CloudSizeBasedCumulativeCompactionPolicy>();
_cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] =
std::make_shared<CloudTimeSeriesCumulativeCompactionPolicy>();
_startup_timepoint = std::chrono::system_clock::now();
}

CloudStorageEngine::~CloudStorageEngine() {
Expand Down
13 changes: 13 additions & 0 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <chrono>
#include <memory>
#include <mutex>

Expand Down Expand Up @@ -156,6 +157,16 @@ class CloudStorageEngine final : public BaseStorageEngine {

Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms);

std::chrono::time_point<std::chrono::system_clock> startup_timepoint() const {
return _startup_timepoint;
}

#ifdef BE_TEST
void set_startup_timepoint(const std::chrono::time_point<std::chrono::system_clock>& tp) {
_startup_timepoint = tp;
}
#endif

private:
void _refresh_storage_vault_info_thread_callback();
void _vacuum_stale_rowsets_thread_callback();
Expand Down Expand Up @@ -227,6 +238,8 @@ class CloudStorageEngine final : public BaseStorageEngine {

EngineOptions _options;
std::mutex _store_lock;

std::chrono::time_point<std::chrono::system_clock> _startup_timepoint;
};

} // namespace doris
Loading
Loading