Skip to content

Commit d975d5b

Browse files
committed
cmd: add utility library for async valref scans
Problem: The code between flux-dump and flux-fsck for asynchronously scanning valref treeobjs is very similar. It'd be nice if there was a convenience API for it for the future. Solution: Add a new cmd utility library to place some common command code into. Add valref_blobrefs() as an initial utility function.
1 parent 6dffaeb commit d975d5b

File tree

3 files changed

+192
-1
lines changed

3 files changed

+192
-1
lines changed

src/cmd/Makefile.am

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ flux_SOURCES = \
8181
builtin/shutdown.c \
8282
builtin/lptest.c \
8383
builtin/fsck.c \
84-
builtin/cgroup.c
84+
builtin/cgroup.c \
85+
util/util.h \
86+
util/util.c
8587

8688
nodist_flux_SOURCES = \
8789
builtin-cmds.c

src/cmd/util/util.c

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/************************************************************\
2+
* Copyright 2025 Lawrence Livermore National Security, LLC
3+
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
*
5+
* This file is part of the Flux resource manager framework.
6+
* For details, see https://github.com/flux-framework.
7+
*
8+
* SPDX-License-Identifier: LGPL-3.0
9+
\************************************************************/
10+
11+
#if HAVE_CONFIG_H
12+
# include "config.h"
13+
#endif
14+
#include <stdio.h>
15+
16+
#include "src/common/libkvs/treeobj.h"
17+
#include "src/common/libutil/blobref.h"
18+
#include "src/common/libutil/log.h"
19+
20+
#include "util.h"
21+
22+
#define BLOBREF_ASYNC_MAX 1000
23+
24+
struct valref_blobref_scan
25+
{
26+
flux_t *h;
27+
char *topic;
28+
char *path;
29+
json_t *treeobj;
30+
int content_flags;
31+
int index;
32+
int count;
33+
int in_flight;
34+
bool error;
35+
get_blobrefs_f get_cb;
36+
err_blobrefs_f err_cb;
37+
void *arg;
38+
};
39+
40+
static int get_blobref (struct valref_blobref_scan *vbs);
41+
42+
static void get_blobref_continuation (flux_future_t *f, void *arg)
43+
{
44+
struct valref_blobref_scan *vbs = arg;
45+
int *index;
46+
47+
index = flux_future_aux_get (f, "index");
48+
if (vbs->get_cb) {
49+
const flux_msg_t *msg;
50+
if (flux_future_get (f, (const void **)&msg) < 0) {
51+
if (vbs->err_cb (errno, *index, vbs->arg) < 0)
52+
vbs->error = true;
53+
}
54+
else {
55+
if (vbs->get_cb (msg, *index, vbs->arg) < 0)
56+
vbs->error = true;
57+
}
58+
}
59+
else {
60+
if (flux_future_get (f, NULL) < 0) {
61+
if (vbs->err_cb (errno, *index, vbs->arg) < 0)
62+
vbs->error = true;
63+
}
64+
}
65+
vbs->in_flight--;
66+
67+
/* if an error has occurred, we won't get more blobrefs */
68+
if (vbs->index < vbs->count
69+
&& !vbs->error) {
70+
get_blobref (vbs);
71+
vbs->in_flight++;
72+
vbs->index++;
73+
}
74+
flux_future_destroy (f);
75+
}
76+
77+
static int get_blobref (struct valref_blobref_scan *vbs)
78+
{
79+
uint32_t hash[BLOBREF_MAX_DIGEST_SIZE];
80+
ssize_t hash_size;
81+
const char *blobref;
82+
flux_future_t *f;
83+
int *indexp;
84+
85+
blobref = treeobj_get_blobref (vbs->treeobj, vbs->index);
86+
87+
if ((hash_size = blobref_strtohash (blobref, hash, sizeof (hash))) < 0)
88+
log_err_exit ("cannot get hash from ref string");
89+
if (!(f = flux_rpc_raw (vbs->h, vbs->topic, hash, hash_size, 0, 0)))
90+
log_err_exit ("failed to get valref blob");
91+
if (flux_future_then (f, -1, get_blobref_continuation, vbs) < 0)
92+
log_err_exit ("cannot setup blobref continuation");
93+
if (!(indexp = (int *)malloc (sizeof (int))))
94+
log_err_exit ("cannot allocate index memory");
95+
(*indexp) = vbs->index;
96+
if (flux_future_aux_set (f, "index", indexp, free) < 0)
97+
log_err_exit ("could not save index value");
98+
return 0;
99+
}
100+
101+
int valref_blobrefs (flux_t *h,
102+
const char *topic,
103+
json_t *treeobj,
104+
get_blobrefs_f get_cb,
105+
err_blobrefs_f err_cb,
106+
void *arg)
107+
{
108+
struct valref_blobref_scan vbs = {0};
109+
int save_errno, rv = -1;
110+
111+
if (!h
112+
|| !treeobj
113+
|| !err_cb
114+
|| !treeobj_is_valref (treeobj)) {
115+
errno = EINVAL;
116+
return -1;
117+
}
118+
119+
vbs.h = flux_incref (h);
120+
if (!(vbs.topic = strdup (topic)))
121+
log_err_exit ("could not duplicate topic");
122+
vbs.treeobj = json_incref (treeobj);
123+
if ((vbs.count = treeobj_get_count (treeobj)) < 0)
124+
goto cleanup;
125+
vbs.get_cb = get_cb;
126+
vbs.err_cb = err_cb;
127+
vbs.arg = arg;
128+
129+
while (vbs.in_flight < BLOBREF_ASYNC_MAX
130+
&& vbs.index < vbs.count) {
131+
get_blobref (&vbs);
132+
vbs.in_flight++;
133+
vbs.index++;
134+
}
135+
136+
if (flux_reactor_run (flux_get_reactor (vbs.h), 0) < 0)
137+
goto cleanup;
138+
139+
rv = 0;
140+
cleanup:
141+
save_errno = errno;
142+
free (vbs.topic);
143+
json_decref (vbs.treeobj);
144+
flux_decref (h);
145+
errno = save_errno;
146+
return rv;
147+
}
148+
149+
/*
150+
* vi: ts=4 sw=4 expandtab
151+
*/

src/cmd/util/util.h

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/************************************************************\
2+
* Copyright 2025 Lawrence Livermore National Security, LLC
3+
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
*
5+
* This file is part of the Flux resource manager framework.
6+
* For details, see https://github.com/flux-framework.
7+
*
8+
* SPDX-License-Identifier: LGPL-3.0
9+
\************************************************************/
10+
11+
#ifndef HAVE_FLUX_UTIL_H
12+
#define HAVE_FLUX_UTIL_H
13+
14+
#include <flux/core.h>
15+
#include <jansson.h>
16+
17+
/* return -1 to discontinue getting more blobrefs */
18+
typedef int (*get_blobrefs_f)(const flux_msg_t *msg,
19+
int index,
20+
void *arg);
21+
22+
/* return -1 to discontinue getting more blobrefs */
23+
typedef int (*err_blobrefs_f)(int errnum,
24+
int index,
25+
void *arg);
26+
27+
/* asynchronously lookup blobrefs
28+
* - get_cb is optional, only if you want the data
29+
* - err_cb is required when a blobref lookup error occurs
30+
*/
31+
int valref_blobrefs (flux_t *h,
32+
const char *topic,
33+
json_t *treeobj,
34+
get_blobrefs_f get_cb,
35+
err_blobrefs_f err_cb,
36+
void *arg);
37+
38+
#endif /* !HAVE_FLUX_UTIL_H */

0 commit comments

Comments
 (0)