Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ int shrink_resources (std::shared_ptr<resource_ctx_t> &ctx, const char *ids)
goto done;
}
// Update total counts:
ctx->traverser->initialize ();
rc = ctx->traverser->initialize ();
flux_log (ctx->h, LOG_DEBUG, "successfully removed ranks %s from resource set", ids);

done:
Expand Down
2 changes: 2 additions & 0 deletions resource/planner/c++/planner_multi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ const char *planner_multi::get_resource_type_at (size_t i) const
/*! Look up the position of a resource type among the tracked planners.
 *
 *  \param type  resource type name to search for.
 *  \return      offset of the matching entry from the beginning of
 *               m_types_totals_planners, or the container's size () when
 *               no entry matches \a type (i.e. an out-of-range index).
 */
size_t planner_multi::get_resource_type_idx (const char *type) const
{
    const auto &type_index = m_types_totals_planners.get<res_type> ();
    const auto match = type_index.find (type);

    // Unknown type: report one-past-the-last index instead of
    // dereferencing the end iterator below.
    if (match == type_index.end ())
        return m_types_totals_planners.size ();

    // Translate the res_type-index iterator into an idx-index iterator so
    // that its distance from the beginning can be computed.
    const auto position = m_types_totals_planners.get<idx> ().iterator_to (*match);
    return position - m_types_totals_planners.begin ();
}
Expand Down
13 changes: 10 additions & 3 deletions resource/planner/c/planner_multi_c_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ extern "C" int64_t planner_multi_span_planned_at (planner_multi_t *ctx,
int64_t span_id,
unsigned int i)
{
if (!ctx || span_id < 0) {
if (!ctx || span_id < 0 || i >= ctx->plan_multi->get_planners_size ()) {
errno = EINVAL;
return -1;
}
Expand All @@ -560,8 +560,15 @@ extern "C" int64_t planner_multi_span_planned_at (planner_multi_t *ctx,
errno = ENOENT;
return -1;
}
return planner_span_resource_count (ctx->plan_multi->get_planner_at (i),
span_it->second.at (i));
int64_t p_span_id = span_it->second.at (i);
// Span may have been removed during a partial cancel
if (p_span_id == -1) {
return 0;
} else if (p_span_id < 0) {
errno = EINVAL;
return -1;
}
return planner_span_resource_count (ctx->plan_multi->get_planner_at (i), p_span_id);
}

extern "C" int64_t planner_multi_span_first (planner_multi_t *ctx)
Expand Down
8 changes: 6 additions & 2 deletions resource/planner/test/planner_test02.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ static int test_partial_cancel ()
size_t len = 5;
int rc = -1;
int64_t span1 = -1, span2 = -1, span3 = -1, span4 = -1, avail1 = -1, avail2 = -1, avail3 = -1,
avail4 = -1, avail5 = -1, avail6 = -1, avail7 = -1;
avail4 = -1, avail5 = -1, avail6 = -1, avail7 = -1, used = -1;
const uint64_t resource_totals[] = {10, 20, 30, 40, 50};
const char *resource_types[] = {"A", "B", "C", "D", "E"};
const char *resource_types1[] = {"B", "A", "E"};
Expand Down Expand Up @@ -768,6 +768,10 @@ static int test_partial_cancel ()
span4 = planner_multi_add_span (ctx, 3000, 1000, request3, len);
rc = planner_multi_reduce_span (ctx, span4, reduce5, resource_types5, 1, removed1);
rc = planner_multi_reduce_span (ctx, span4, reduce5, resource_types5, 1, removed2);
used = planner_multi_span_planned_at (ctx, span4, 0);
ok ((used == 0),
"partial reductions correctly report used resources of completely removed resource");

rc = planner_multi_reduce_span (ctx, span4, reduce5, resource_types6, 1, removed3);
rc = planner_multi_reduce_span (ctx, span4, reduce5, resource_types6, 1, removed4);
avail6 = planner_multi_avail_resources_at (ctx, 3500, 0);
Expand All @@ -783,7 +787,7 @@ static int test_partial_cancel ()

int main (int argc, char *argv[])
{
plan (106);
plan (107);

test_multi_basics ();

Expand Down
6 changes: 4 additions & 2 deletions resource/readers/resource_reader_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ enum class job_modify_t { CANCEL, PARTIAL_CANCEL, VTX_CANCEL };

/*! Data handed back by a reader after modifying (cancelling) a job's
 *  resources. Readers fill in the rank set and, where available, the
 *  per-rank and aggregate resource counts.
 */
struct modify_data_t {
    // kind of modification requested; defaults to a partial cancel
    job_modify_t mod_type = job_modify_t::PARTIAL_CANCEL;
    // NOTE(review): appears to be the previous name of `ranks`; kept so
    // code that still writes to it keeps compiling -- confirm before removing.
    std::unordered_set<int64_t> ranks_removed;
    // ranks touched by the modification
    std::unordered_set<int64_t> ranks;
    // rank -> root vertex recorded for that rank
    std::unordered_map<int64_t, vtx_t> rank_to_root;
    // rank -> (resource type -> count) recorded for that rank
    std::unordered_map<int64_t, std::unordered_map<resource_type_t, int64_t>> rank_to_counts;
    // resource type -> count across the modification. This is keyed by
    // resource_type_t only: a second `const char *`-keyed declaration of the
    // same member was a redeclaration error, and would also have hashed
    // pointer addresses rather than the type names themselves.
    std::unordered_map<resource_type_t, int64_t> type_to_count;
};

/*! Base resource reader class.
Expand Down
68 changes: 45 additions & 23 deletions resource/readers/resource_reader_jgf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,11 +795,10 @@ int resource_reader_jgf_t::cancel_vtx (vtx_t vtx,
const fetch_helper_t &fetcher,
jgf_updater_data &update_data)
{
int rc = -1;
int64_t span = -1;
int64_t xspan = -1;
int64_t sched_span = -1;
int64_t prev_avail = -1;
int64_t prev_occu = -1;
planner_multi_t *subtree_plan = NULL;
planner_t *x_checker = NULL;
planner_t *plans = NULL;
Expand All @@ -808,57 +807,78 @@ int resource_reader_jgf_t::cancel_vtx (vtx_t vtx,
auto &tags = g[vtx].idata.tags;
std::map<int64_t, int64_t>::iterator span_it;
std::map<int64_t, int64_t>::iterator xspan_it;
uint32_t path_len = 0;
rank_data *rdata = nullptr;

static subsystem_t containment_sub{"containment"};
static const subsystem_t containment_sub{"containment"};
// remove from aggregate filter if present
auto agg_span = job2span.find (update_data.jobid);
if (agg_span != job2span.end ()) {
if ((subtree_plan = g[vtx].idata.subplans[containment_sub]) == NULL)
goto ret;
goto error;
if (planner_multi_rem_span (subtree_plan, agg_span->second) != 0)
goto ret;
goto error;
// Delete from job2span tracker
job2span.erase (update_data.jobid);
}

// remove from exclusive filter;
xspan_it = x_spans.find (update_data.jobid);
if (xspan_it == x_spans.end ()) {
errno = EINVAL;
goto ret;
m_err_msg += __FUNCTION__;
m_err_msg += ": can't find x_checker span for job ";
m_err_msg += std::to_string (update_data.jobid) + " in " + g[vtx].name + ".\n";
goto error;
}
xspan = xspan_it->second;
x_checker = g[vtx].idata.x_checker;
g[vtx].idata.tags.erase (update_data.jobid);
g[vtx].idata.x_spans.erase (update_data.jobid);
if (planner_rem_span (x_checker, xspan) == -1) {
errno = EINVAL;
goto ret;
m_err_msg += __FUNCTION__;
m_err_msg += ": can't remove x_checker span for job ";
m_err_msg += std::to_string (update_data.jobid) + " in " + g[vtx].name + ".\n";
goto error;
}
// rem plan
span_it = g[vtx].schedule.allocations.find (update_data.jobid);
sched_span = span_it->second;
if (span_it != g[vtx].schedule.allocations.end ()) {
g[vtx].schedule.allocations.erase (update_data.jobid);
} else {
errno = EINVAL;
goto ret;
m_err_msg += __FUNCTION__;
m_err_msg += ": can't find allocation span for job ";
m_err_msg += std::to_string (update_data.jobid) + " in " + g[vtx].name + ".\n";
goto error;
}
plans = g[vtx].schedule.plans;
prev_avail = planner_avail_resources_at (plans, 0);
if ((prev_occu = planner_span_resource_count (plans, sched_span)) < 0) {
m_err_msg += __FUNCTION__;
m_err_msg += ": planner_span_resource_count failed for job ";
m_err_msg += std::to_string (update_data.jobid) + " in " + g[vtx].name + ".\n";
goto error;
}
if (planner_rem_span (plans, sched_span) == -1) {
errno = EINVAL;
goto ret;
m_err_msg += __FUNCTION__;
m_err_msg += ": can't remove allocation span for job ";
m_err_msg += std::to_string (update_data.jobid) + " in " + g[vtx].name + ".\n";
goto error;
}
// Don't need to check if rank is invalid; check done in find_vtx ().
// Add the newly freed counts, Can't assume it freed everything.
update_data.type_to_count[g[vtx].type.c_str ()] +=
(planner_avail_resources_at (plans, 0) - prev_avail);
rdata = &(update_data.rank_to_data[g[vtx].rank]);
rdata->type_to_count[g[vtx].type] += prev_occu;
update_data.ranks.insert (g[vtx].rank);
path_len = g[vtx].paths.at (containment_sub).length ();
if (rdata->length > path_len) {
rdata->length = path_len;
rdata->root = vtx;
}

rc = 0;

ret:
return rc;
return 0;
error:
errno = EINVAL;
return -1;
}

int resource_reader_jgf_t::update_vtx (resource_graph_t &g,
Expand Down Expand Up @@ -1328,14 +1348,16 @@ int resource_reader_jgf_t::partial_cancel (resource_graph_t &g,
// Fill in updater data
p_cancel_data.jobid = jobid;
p_cancel_data.update = false;

if ((rc = fetch_jgf (R, &jgf, &nodes, &edges, p_cancel_data)) != 0)
goto done;
if ((rc = update_vertices (g, m, vmap, nodes, p_cancel_data)) != 0)
goto done;

mod_data.type_to_count = p_cancel_data.type_to_count;
mod_data.ranks_removed = p_cancel_data.ranks;
for (const auto &[rank, data] : p_cancel_data.rank_to_data) {
mod_data.rank_to_counts[rank] = data.type_to_count;
mod_data.ranks.insert (rank);
mod_data.rank_to_root[rank] = data.root;
}

done:
json_decref (jgf);
Expand Down
13 changes: 11 additions & 2 deletions resource/readers/resource_reader_jgf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,23 @@ struct vmap_val_t;
namespace Flux {
namespace resource_model {

/*! Per-rank bookkeeping collected while resources are being cancelled. */
struct rank_data {
    // resource type -> number of resources of that type to be cancelled
    std::unordered_map<resource_type_t, int64_t> type_to_count{};
    // path length used to pick the subgraph root; starts at the largest
    // representable value
    uint32_t length{std::numeric_limits<uint32_t>::max ()};
    // root vertex of this rank's subgraph; null vertex until one is recorded
    vtx_t root{boost::graph_traits<resource_graph_t>::null_vertex ()};
};

// Struct to track data for updates
struct jgf_updater_data {
int64_t jobid = 0;
int64_t at = 0;
uint64_t duration = 0;
bool reserved = false;
// track counts of resources to be cancelled
std::unordered_map<const char *, int64_t> type_to_count;
// track data of resources to be cancelled
std::unordered_map<int64_t, rank_data> rank_to_data;
// track count of rank vertices to determine if rank
// should be removed from by_rank map
std::unordered_set<int64_t> ranks;
Expand Down
2 changes: 1 addition & 1 deletion resource/readers/resource_reader_rv1exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ int resource_reader_rv1exec_t::partial_cancel_internal (resource_graph_t &g,
}
rank = idset_first (r_ids);
while (rank != IDSET_INVALID_ID) {
mod_data.ranks_removed.insert (rank);
mod_data.ranks.insert (rank);
rank = idset_next (r_ids, rank);
}
rc = 0;
Expand Down
11 changes: 9 additions & 2 deletions resource/traversers/dfu_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -900,10 +900,17 @@ int dfu_impl_t::dom_find_dfv (std::shared_ptr<match_writers_t> &w,
m_err_msg += " for vertex: " + (*m_graph)[u].name + "\n";
goto done;
}
int64_t planned_resources = 0;
for (size_t i = 0; i < planner_multi_resources_len (filter_plan); ++i) {
int64_t total_resources = planner_multi_resource_total_at (filter_plan, i);
int64_t planned_resources =
planner_multi_span_planned_at (filter_plan, span_it->second, i);
if ((planned_resources =
planner_multi_span_planned_at (filter_plan, span_it->second, i))
< 0) {
m_err_msg += __FUNCTION__;
m_err_msg += ": error from planner_multi_span_planned_at: ";
m_err_msg += " for vertex: " + (*m_graph)[u].name + "\n";
goto done;
}
std::string rtype = std::string (planner_multi_resource_type_at (filter_plan, i));
std::string fcounts = "used:" + std::to_string (planned_resources)
+ ", total:" + std::to_string (total_resources);
Expand Down
Loading