Skip to content

Commit 8147cc1

Browse files
committed
cmd: use library for scanning treeobj valrefs
Problem: Both flux-dump and flux-fsck have similar code for scanning a valref treeobj in parallel. Use the common library function valref_blobrefs() instead.
1 parent d975d5b commit 8147cc1

File tree

2 files changed

+63
-126
lines changed

2 files changed

+63
-126
lines changed

src/cmd/builtin/dump.c

Lines changed: 35 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,20 @@
2828
#include "ccan/str/str.h"
2929

3030
#include "builtin.h"
31-
32-
#define BLOBREF_ASYNC_MAX 1000
31+
#include "src/cmd/util/util.h"
3332

3433
struct dump_valref_data
3534
{
3635
flux_t *h;
3736
json_t *treeobj;
3837
const flux_msg_t **msgs;
3938
const char *path;
40-
int total_size;
41-
int index;
4239
int count;
43-
int in_flight;
40+
int total_size;
4441
int errorcount;
4542
int errnum;
4643
};
4744

48-
static void get_blobref (struct dump_valref_data *dvd);
49-
5045
static void dump_treeobj (struct archive *ar,
5146
flux_t *h,
5247
const char *path,
@@ -175,70 +170,56 @@ static void dump_write_data (struct archive *ar, const void *data, int size)
175170
"assuming non-fatal libarchive write size reporting error");
176171
}
177172

178-
static void get_blobref_continuation (flux_future_t *f, void *arg)
173+
static int get_blobref_cb (const flux_msg_t *msg, int index, void *arg)
179174
{
180175
struct dump_valref_data *dvd = arg;
181-
const flux_msg_t *msg;
182176
size_t len;
183-
int *index;
184177

185-
index = flux_future_aux_get (f, "index");
186-
if (flux_future_get (f, (const void **)&msg) < 0
187-
|| flux_response_decode_raw (msg, NULL, NULL, &len) < 0) {
188-
read_error ("%s: missing blobref %d: %s",
178+
if (flux_response_decode_raw (msg, NULL, NULL, &len) < 0) {
179+
read_error ("%s: cannot decode blobref %d: %s",
189180
dvd->path,
190-
(*index),
191-
future_strerror (f, errno));
192-
flux_future_destroy (f);
181+
index,
182+
strerror (errno));
193183
dvd->errorcount++;
194184
dvd->errnum = errno; /* we'll report the last errno */
195-
return;
185+
return -1; /* -1 informs no more blobref retrieval */
196186
}
197-
dvd->in_flight--;
198187
dvd->total_size += len;
199-
dvd->msgs[(*index)] = flux_msg_incref (msg);
200-
201-
/* if an error has occurred, we won't get more blobrefs */
202-
if (dvd->index < dvd->count
203-
&& !dvd->errorcount) {
204-
get_blobref (dvd);
205-
dvd->in_flight++;
206-
dvd->index++;
207-
}
208-
flux_future_destroy (f);
188+
dvd->msgs[index] = flux_msg_incref (msg);
189+
return 0;
209190
}
210191

211-
static void get_blobref (struct dump_valref_data *dvd)
192+
static int err_blobref_cb (int errnum, int index, void *arg)
212193
{
213-
const char *blobref;
214-
flux_future_t *f;
215-
int *indexp;
216-
217-
blobref = treeobj_get_blobref (dvd->treeobj, dvd->index);
194+
struct dump_valref_data *dvd = arg;
218195

219-
if (!(f = content_load_byblobref (dvd->h, blobref, content_flags))
220-
|| flux_future_then (f, -1, get_blobref_continuation, dvd) < 0) {
221-
read_error ("%s: cannot load blobref %d", dvd->path, dvd->index);
222-
flux_future_destroy (f);
223-
}
224-
if (!(indexp = (int *)malloc (sizeof (int))))
225-
log_err_exit ("cannot allocate index memory");
226-
(*indexp) = dvd->index;
227-
if (flux_future_aux_set (f, "index", indexp, free) < 0)
228-
log_err_exit ("could not save index value");
196+
if (errnum == ENOENT)
197+
read_error ("%s: missing blobref %d: %s",
198+
dvd->path,
199+
index,
200+
strerror (errnum));
201+
else
202+
read_error ("%s: error getting blobref %d: %s",
203+
dvd->path,
204+
index,
205+
strerror (errnum));
206+
dvd->errorcount++;
207+
dvd->errnum = errnum; /* we'll report the last errno */
208+
return -1; /* -1 informs no more blobref retrieval*/
229209
}
230210

231211
static int dump_valref_async (struct dump_valref_data *dvd)
232212
{
233-
while (dvd->in_flight < BLOBREF_ASYNC_MAX
234-
&& dvd->index < dvd->count) {
235-
get_blobref (dvd);
236-
dvd->in_flight++;
237-
dvd->index++;
238-
}
239-
240-
if (flux_reactor_run (flux_get_reactor (dvd->h), 0) < 0)
241-
log_err_exit ("flux_reactor_run");
213+
const char *topic = "content.load";
214+
if (content_flags & CONTENT_FLAG_CACHE_BYPASS)
215+
topic = "content-backing.load";
216+
if (valref_blobrefs (dvd->h,
217+
topic,
218+
dvd->treeobj,
219+
get_blobref_cb,
220+
err_blobref_cb,
221+
dvd) < 0)
222+
return -1;
242223

243224
if (dvd->errorcount) {
244225
errno = dvd->errnum;

src/cmd/builtin/fsck.c

Lines changed: 28 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "ccan/str/str.h"
2929

3030
#include "builtin.h"
31+
#include "src/cmd/util/util.h"
3132

3233
#define BLOBREF_ASYNC_MAX 1000
3334

@@ -50,10 +51,6 @@ struct fsck_ctx {
5051
struct fsck_valref_data
5152
{
5253
struct fsck_ctx *ctx;
53-
json_t *treeobj;
54-
int index;
55-
int count;
56-
int in_flight;
5754
const char *path;
5855
int errorcount;
5956
int errnum;
@@ -64,8 +61,6 @@ static void fsck_treeobj (struct fsck_ctx *ctx,
6461
const char *path,
6562
json_t *treeobj);
6663

67-
static void valref_validate (struct fsck_valref_data *fvd);
68-
6964
static void vmsg (struct fsck_ctx *ctx, const char *fmt, va_list ap)
7065
{
7166
char buf[128];
@@ -116,65 +111,28 @@ static void save_missing_ref_index (struct fsck_valref_data *fvd, int index)
116111
zlist_freefn (fvd->missing_indexes, cpy, (zlist_free_fn *) free, true);
117112
}
118113

119-
static void valref_validate_continuation (flux_future_t *f, void *arg)
114+
static int err_blobref_cb (int errnum, int index, void *arg)
120115
{
121116
struct fsck_valref_data *fvd = arg;
122117

123-
if (flux_rpc_get (f, NULL) < 0) {
124-
int *index = flux_future_aux_get (f, "index");
125-
if (fvd->ctx->verbose) {
126-
if (errno == ENOENT)
127-
errmsg (fvd->ctx,
128-
"%s: missing blobref index=%d",
129-
fvd->path,
130-
(*index));
131-
else
132-
errmsg (fvd->ctx,
133-
"%s: error retrieving blobref index=%d: %s",
134-
fvd->path,
135-
(*index),
136-
future_strerror (f, errno));
137-
}
138-
fvd->errorcount++;
139-
fvd->errnum = errno; /* we'll report the last errno */
140-
if (fvd->ctx->repair && errno == ENOENT)
141-
save_missing_ref_index (fvd, *index);
142-
}
143-
fvd->in_flight--;
144-
145-
if (fvd->index < fvd->count) {
146-
valref_validate (fvd);
147-
fvd->in_flight++;
148-
fvd->index++;
118+
if (fvd->ctx->verbose) {
119+
if (errnum == ENOENT)
120+
errmsg (fvd->ctx,
121+
"%s: missing blobref index=%d",
122+
fvd->path,
123+
index);
124+
else
125+
errmsg (fvd->ctx,
126+
"%s: error retrieving blobref index=%d: %s",
127+
fvd->path,
128+
index,
129+
strerror (errnum));
149130
}
150-
151-
flux_future_destroy (f);
152-
}
153-
154-
static void valref_validate (struct fsck_valref_data *fvd)
155-
{
156-
const char *topic = fvd->ctx->validate_available ?
157-
"content-backing.validate" : "content-backing.load";
158-
uint32_t hash[BLOBREF_MAX_DIGEST_SIZE];
159-
ssize_t hash_size;
160-
const char *blobref;
161-
flux_future_t *f;
162-
int *indexp;
163-
164-
blobref = treeobj_get_blobref (fvd->treeobj, fvd->index);
165-
166-
if ((hash_size = blobref_strtohash (blobref, hash, sizeof (hash))) < 0)
167-
log_err_exit ("cannot get hash from ref string");
168-
169-
if (!(f = flux_rpc_raw (fvd->ctx->h, topic, hash, hash_size, 0, 0)))
170-
log_err_exit ("failed to validate valref blob");
171-
if (flux_future_then (f, -1, valref_validate_continuation, fvd) < 0)
172-
log_err_exit ("cannot validate valref blob");
173-
if (!(indexp = (int *)malloc (sizeof (int))))
174-
log_err_exit ("cannot allocate index memory");
175-
(*indexp) = fvd->index;
176-
if (flux_future_aux_set (f, "index", indexp, free) < 0)
177-
log_err_exit ("could not save index value");
131+
fvd->errorcount++;
132+
fvd->errnum = errnum; /* we'll report the last errno */
133+
if (fvd->ctx->repair && errnum == ENOENT)
134+
save_missing_ref_index (fvd, index);
135+
return 0; /* 0, continue to scan remaining blobrefs */
178136
}
179137

180138
static json_t *repair_valref (struct fsck_ctx *ctx,
@@ -336,22 +294,20 @@ static void fsck_valref (struct fsck_ctx *ctx,
336294
const char *path,
337295
json_t *treeobj)
338296
{
297+
const char *topic = ctx->validate_available ?
298+
"content-backing.validate" : "content-backing.load";
339299
struct fsck_valref_data fvd = {0};
340300

341301
fvd.ctx = ctx;
342-
fvd.treeobj = treeobj;
343-
fvd.count = treeobj_get_count (treeobj);
344302
fvd.path = path;
345303

346-
while (fvd.in_flight < BLOBREF_ASYNC_MAX
347-
&& fvd.index < fvd.count) {
348-
valref_validate (&fvd);
349-
fvd.in_flight++;
350-
fvd.index++;
351-
}
352-
353-
if (flux_reactor_run (flux_get_reactor (ctx->h), 0) < 0)
354-
log_err_exit ("flux_reactor_run");
304+
if (valref_blobrefs (ctx->h,
305+
topic,
306+
treeobj,
307+
NULL,
308+
err_blobref_cb,
309+
&fvd) < 0)
310+
log_err_exit ("unable to scan valref %s", path);
355311

356312
if (fvd.errorcount) {
357313
/* each invalid blobref will be output in verbose mode */

0 commit comments

Comments
 (0)