Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1790,6 +1790,8 @@ static int run (std::shared_ptr<resource_ctx_t> &ctx,
rc = tr.run (j, ctx->writers, match_op_t::MATCH_ALLOCATE_W_SATISFIABILITY, jobid, at);
else if (std::string ("allocate_orelse_reserve") == cmd)
rc = tr.run (j, ctx->writers, match_op_t::MATCH_ALLOCATE_ORELSE_RESERVE, jobid, at);
else if (std::string ("grow_allocation") == cmd)
rc = tr.run (j, ctx->writers, match_op_t::MATCH_GROW_ALLOCATION, jobid, at);
else if (std::string ("satisfiability") == cmd)
rc = tr.run (j, ctx->writers, match_op_t::MATCH_SATISFIABILITY, jobid, at);
else
Expand Down Expand Up @@ -1877,8 +1879,8 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx,

start = std::chrono::system_clock::now ();
if (strcmp ("allocate", cmd) != 0 && strcmp ("allocate_orelse_reserve", cmd) != 0
&& strcmp ("allocate_with_satisfiability", cmd) != 0
&& strcmp ("satisfiability", cmd) != 0) {
&& strcmp ("allocate_with_satisfiability", cmd) != 0 && strcmp ("satisfiability", cmd) != 0
&& strcmp ("grow_allocation", cmd) != 0) {
rc = -1;
errno = EINVAL;
flux_log (ctx->h, LOG_ERR, "%s: unknown cmd: %s", __FUNCTION__, cmd);
Expand Down Expand Up @@ -2114,9 +2116,17 @@ static void match_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t
< 0)
goto error;
if (is_existent_jobid (ctx, jobid)) {
errno = EINVAL;
flux_log_error (h, "%s: existent job (%jd).", __FUNCTION__, (intmax_t)jobid);
goto error;
if (strcmp ("grow_allocation", cmd) != 0) {
errno = EINVAL;
flux_log_error (h, "%s: existent job (%jd).", __FUNCTION__, (intmax_t)jobid);
goto error;
}
} else {
if (strcmp ("grow_allocation", cmd) == 0) {
errno = EINVAL;
flux_log_error (h, "%s: non-existent job (%jd).", __FUNCTION__, (intmax_t)jobid);
goto error;
}
}
if (run_match (ctx, jobid, cmd, js_str, &now, &at, &overhead, R, NULL) < 0) {
if (errno != EBUSY && errno != ENODEV)
Expand Down Expand Up @@ -2194,12 +2204,23 @@ static void match_multi_request_cb (flux_t *h,
if (json_unpack (value, "{s:I s:s}", "jobid", &jobid, "jobspec", &js_str) < 0)
goto error;
if (is_existent_jobid (ctx, jobid)) {
errno = EINVAL;
flux_log_error (h,
"%s: existent job (%jd).",
__FUNCTION__,
static_cast<intmax_t> (jobid));
goto error;
if (strcmp ("grow_allocation", cmd) != 0) {
errno = EINVAL;
flux_log_error (h,
"%s: existent job (%jd).",
__FUNCTION__,
static_cast<intmax_t> (jobid));
goto error;
}
} else {
if (strcmp ("grow_allocation", cmd) == 0) {
errno = EINVAL;
flux_log_error (h,
"%s: nonexistent job (%jd).",
__FUNCTION__,
static_cast<intmax_t> (jobid));
goto error;
}
}
if (run_match (ctx, jobid, cmd, js_str, &now, &at, &overhead, R, NULL) < 0) {
if (errno != EBUSY && errno != ENODEV)
Expand Down
6 changes: 5 additions & 1 deletion resource/policies/base/match_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ typedef enum match_op_t {
MATCH_ALLOCATE,
MATCH_ALLOCATE_W_SATISFIABILITY,
MATCH_ALLOCATE_ORELSE_RESERVE,
MATCH_GROW_ALLOCATION,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion for naming consistency:

Suggested change
MATCH_GROW_ALLOCATION,
MATCH_GROW_ALLOCATE,

I shouldn't be reviewing this early, I've read that word less than 10 times and am getting semantic satiation. "ALLOCATE" looks like some kind of fruit name to me right now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I waffled on this and am back to preferring MATCH_GROW_ALLOCATION. My current thinking is that the pattern is verb_verb_{noun/verb/conjunction}, and GROW_ALLOCATION is clearer.

MATCH_SATISFIABILITY
} match_op_t;

Expand All @@ -18,6 +19,8 @@ static const char *match_op_to_string (match_op_t match_op)
return "allocate_orelse_reserve";
case MATCH_ALLOCATE_W_SATISFIABILITY:
return "allocate_with_satisfiability";
case MATCH_GROW_ALLOCATION:
return "grow_allocation";
case MATCH_SATISFIABILITY:
return "satisfiability";
default:
Expand All @@ -28,7 +31,8 @@ static const char *match_op_to_string (match_op_t match_op)
static bool match_op_valid (match_op_t match_op)
{
if ((match_op != MATCH_ALLOCATE) && (match_op != MATCH_ALLOCATE_W_SATISFIABILITY)
&& (match_op != MATCH_ALLOCATE_ORELSE_RESERVE) && (match_op != MATCH_SATISFIABILITY)) {
&& (match_op != MATCH_ALLOCATE_ORELSE_RESERVE) && (match_op != MATCH_SATISFIABILITY)
&& (match_op != MATCH_GROW_ALLOCATION)) {
return false;
}

Expand Down
4 changes: 2 additions & 2 deletions resource/reapi/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

add_subdirectory( bindings )
add_subdirectory( bindings )
add_subdirectory( test )
4 changes: 4 additions & 0 deletions resource/reapi/bindings/c++/reapi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class reapi_t {
* allocate.
* MATCH_ALLOCATE_W_SATISFIABILITY: try to allocate and run
* satisfiability check if resources are not available.
* MATCH_GROW_ALLOCATION: try to grow an existing allocation
* now and fail if resources aren't available.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does a grow_allocate work with asking for a reservation for it? E.g., right now in Fluxqueue I would do a grow by asking for another job via standard march_allocate_orelse_reserve and in that case I could ask for a reservation. Would that work here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would that work here?

No, that won't work with these changes. There's a lot more machinery needed to detect cases where the reservation wouldn't start before the current allocation ends, or the reservation time changes and becomes invalid.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which makes me realize that even with MATCH_GROW_ALLOCATION there needs to be some way to ensure that the start time (at) of the request isn't invalid or after the current allocation completes.

* \param jobspec jobspec string.
* \param jobid jobid of the uint64_t type.
* \param reserved Boolean into which to return true if this job has been
Expand Down Expand Up @@ -150,6 +152,8 @@ class reapi_t {
* aren't available.
* MATCH_ALLOCATE_ORELSE_RESERVE : Try to allocate and reseve
* if resources aren't available now.
* MATCH_GROW_ALLOCATION: try to grow an existing allocation
* now and fail if resources aren't available.
* MATCH_SATISFIABILITY: Do a satisfiablity check and do not
* allocate.
* MATCH_ALLOCATE_W_SATISFIABILITY: try to allocate and run
Expand Down
8 changes: 8 additions & 0 deletions resource/reapi/bindings/c++/reapi_cli_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ int reapi_cli_t::match_allocate (void *h,
rc = -1;
goto out;
}
if (match_op == match_op_t::MATCH_GROW_ALLOCATION) {
if (!rq->job_exists (jobid)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this function be called separately from the other place that does this same check (and that is why we do the check for the job id existing twice)? If we can just do one, maybe the check could be done closest to when the request to grow is made. If we require both, I'm wondering if there could be some race condition where it is allowed to pass through a first check, but by the second check the job id doesn't exist. I guess that would warrant the second check, and it would be OK as long as there wasn't some state partially changed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this function be called separately from the other place that does this same check

Which place are you thinking of?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm honest, I can't remember, and you can disregard this comment. I wish I had linked to the location of "other place!" It could be in match_request_cb here that does a check that the job id exists. I only remember seeing an exists check twice in an execution pathway, and wondering if there could be cases of where the state changes between checks.

Copy link
Member Author

@milroy milroy Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if there could be cases of where the state changes between checks.

That's not likely given Fluxion is serial, but I suppose it is possible with RPCs.

It could be in match_request_cb here that does a check that the job id exists.

That code is responsible for handling RPCs from a REAPI module client rather than function calls to a context via a CLI client.

m_err_msg += __FUNCTION__;
m_err_msg += ": ERROR: Nonexistent jobid: " + std::to_string (jobid) + "\n";
rc = -1;
goto out;
}
}

try {
Flux::Jobspec::Jobspec job{jobspec};
Expand Down
4 changes: 3 additions & 1 deletion resource/reapi/bindings/c/reapi_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ extern "C" int reapi_cli_match (reapi_cli_ctx_t *ctx,
goto out;
}

*jobid = ctx->rqt->get_job_counter ();
if (match_op != match_op_t::MATCH_GROW_ALLOCATION) {
*jobid = ctx->rqt->get_job_counter ();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To write out my understanding - normally in match allocate we make a new id, and it is just derived from the counter. But in grow_allocate we are using an existing id, so we don't call this.

}
if ((rc = reapi_cli_t::
match_allocate (ctx->rqt, match_op, jobspec, *jobid, *reserved, R_buf, *at, *ov))
< 0) {
Expand Down
2 changes: 2 additions & 0 deletions resource/reapi/bindings/c/reapi_cli.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ int reapi_cli_initialize (reapi_cli_ctx_t *ctx, const char *rgraph, const char *
* allocate.
* MATCH_ALLOCATE_W_SATISFIABILITY: try to allocate and run
* satisfiability check if resources are not available.
* MATCH_GROW_ALLOCATION: try to grow an existing allocation
* now and fail if resources aren't available.
* \param jobspec jobspec string.
* \param jobid jobid of the uint64_t type.
* \param reserved Boolean into which to return true if this job has been
Expand Down
2 changes: 2 additions & 0 deletions resource/reapi/bindings/c/reapi_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ void reapi_module_destroy (reapi_module_ctx_t *ctx);
* allocate.
* MATCH_ALLOCATE_W_SATISFIABILITY: try to allocate and run
* satisfiability check if resources are not available.
* MATCH_GROW_ALLOCATION: try to grow an existing allocation
* now and fail if resources aren't available.
* \param jobspec jobspec string.
* \param jobid jobid of the uint64_t type.
* \param reserved Boolean into which to return true if this job has been
Expand Down
11 changes: 11 additions & 0 deletions resource/reapi/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
add_executable(reapi_cli_cxx_test reapi_cli_test_cxx.cpp)
add_sanitizers(reapi_cli_cxx_test)
target_link_libraries(reapi_cli_cxx_test PRIVATE reapi_cli intern Catch2::Catch2WithMain)
flux_add_test(NAME reapi_cli_cxx_test COMMAND reapi_cli_cxx_test)
set_property(TEST reapi_cli_cxx_test APPEND PROPERTY ENVIRONMENT "TESTRESRC_INPUT_FILE=$(CMAKE_SOURCE_DIR)/conf/hype.lua")

add_executable(reapi_cli_c_test reapi_cli_test_c.cpp)
add_sanitizers(reapi_cli_c_test)
target_link_libraries(reapi_cli_c_test PRIVATE reapi_cli intern Catch2::Catch2WithMain)
flux_add_test(NAME reapi_cli_c_test COMMAND reapi_cli_c_test)
set_property(TEST reapi_cli_c_test APPEND PROPERTY ENVIRONMENT "TESTRESRC_INPUT_FILE=$(CMAKE_SOURCE_DIR)/conf/hype.lua")
124 changes: 124 additions & 0 deletions resource/reapi/test/reapi_cli_test_c.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#define CATCH_CONFIG_MAIN

#include <catch2/catch_test_macros.hpp>
#include <resource/reapi/bindings/c/reapi_cli.h>
#include <fstream>
#include <iostream>

namespace Flux::resource_model::detail {

TEST_CASE ("Initialize REAPI CLI", "[initialize C]")
{
const std::string options = "{}";
std::stringstream buffer;
std::ifstream inputFile ("../../../t/data/resource/jgfs/tiny.json");

if (!inputFile.is_open ()) {
std::cerr << "Error opening file!" << std::endl;
}

buffer << inputFile.rdbuf ();
std::string rgraph = buffer.str ();

reapi_cli_ctx_t *ctx = nullptr;
ctx = reapi_cli_new ();
REQUIRE (ctx);
}

TEST_CASE ("Match basic jobspec", "[match C]")
{
int rc = -1;
const std::string options = "{}";
std::stringstream gbuffer, jbuffer;
std::ifstream graphfile ("../../../t/data/resource/jgfs/tiny.json");
std::ifstream jobspecfile ("../../../t/data/resource/jobspecs/basics/test006.yaml");

if (!graphfile.is_open ()) {
std::cerr << "Error opening file!" << std::endl;
}

jbuffer << jobspecfile.rdbuf ();
std::string jobspec = jbuffer.str ();

if (!jobspecfile.is_open ()) {
std::cerr << "Error opening file!" << std::endl;
}

gbuffer << graphfile.rdbuf ();
std::string rgraph = gbuffer.str ();

reapi_cli_ctx_t *ctx = nullptr;
ctx = reapi_cli_new ();
REQUIRE (ctx);

rc = reapi_cli_initialize (ctx, rgraph.c_str (), options.c_str ());
REQUIRE (rc == 0);

match_op_t match_op = match_op_t::MATCH_ALLOCATE;
bool reserved = false;
char *R;
uint64_t jobid = 1;
double ov = 0.0;
int64_t at = 0;

rc = reapi_cli_match (ctx, match_op, jobspec.c_str (), &jobid, &reserved, &R, &at, &ov);
REQUIRE (rc == 0);
REQUIRE (reserved == false);
REQUIRE (at == 0);
}

TEST_CASE ("Match shrink basic jobspec", "[match shrink C]")
{
int rc = -1;
const std::string options = "{\"load_format\": \"rv1exec\"}";
std::stringstream gbuffer, jbuffer, cbuffer;
std::ifstream graphfile ("../../../t/data/resource/rv1exec/tiny_rv1exec.json");
std::ifstream jobspecfile ("../../../t/data/resource/jobspecs/cancel/test018.yaml");
std::ifstream cancelfile ("../../../t/data/resource/rv1exec/cancel/rank1_cancel.json");

if (!graphfile.is_open ()) {
std::cerr << "Error opening file!" << std::endl;
}

if (!jobspecfile.is_open ()) {
std::cerr << "Error opening file!" << std::endl;
}

if (!cancelfile.is_open ()) {
std::cerr << "Error opening file!" << std::endl;
}

gbuffer << graphfile.rdbuf ();
std::string rgraph = gbuffer.str ();
jbuffer << jobspecfile.rdbuf ();
std::string jobspec = jbuffer.str ();
cbuffer << cancelfile.rdbuf ();
std::string cancel_string = cbuffer.str ();

reapi_cli_ctx_t *ctx = nullptr;
ctx = reapi_cli_new ();
REQUIRE (ctx);

rc = reapi_cli_initialize (ctx, rgraph.c_str (), options.c_str ());
REQUIRE (rc == 0);

match_op_t match_op = match_op_t::MATCH_ALLOCATE;
bool reserved = false;
char *R;
uint64_t jobid = 1;
double ov = 0.0;
int64_t at = 0;

rc = reapi_cli_match (ctx, match_op, jobspec.c_str (), &jobid, &reserved, &R, &at, &ov);
REQUIRE (rc == 0);
REQUIRE (reserved == false);
REQUIRE (at == 0);

bool noent_ok = false;
bool full_removal = true;
rc = reapi_cli_partial_cancel (ctx, jobid, cancel_string.c_str (), noent_ok, &full_removal);
REQUIRE (rc == 0);
REQUIRE (full_removal == false);
}

} // namespace Flux::resource_model::detail
Loading
Loading