Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions doc/man1/flux-fsck.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ OPTIONS
This option should be considered :ref:`EXPERIMENTAL <fsck_experimental>`
at this time.

.. option:: -j, --job-aware

When specified with :option:`--repair`, if a single key within a
job's directory is corrupted, all data from the job will be moved
to the lost+found directory.

This option should be considered :ref:`EXPERIMENTAL <fsck_experimental>`
at this time.

EXIT STATUS
===========
Expand Down
246 changes: 234 additions & 12 deletions src/cmd/builtin/fsck.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <jansson.h>
#include <time.h>
#include <stdarg.h>
#include <regex.h>

#include "src/common/libeventlog/eventlog.h"
#include "src/common/libkvs/treeobj.h"
Expand All @@ -31,6 +32,9 @@
#include "builtin.h"

#define BLOBREF_ASYNC_MAX 1000
/* regex for "job.xxxx.xxxx.xxxx.xxxx", where 'x' is a hex character */
#define KVS_JOB_PATH_REGEX \
"^job.[0-9a-fA-F]{4}\\.[0-9a-fA-F]{4}\\.[0-9a-fA-F]{4}\\.[0-9a-fA-F]{4}"

struct fsck_ctx {
flux_t *h;
Expand All @@ -44,8 +48,11 @@ struct fsck_ctx {
bool verbose;
bool quiet;
bool repair;
bool job_aware;
bool isatty;
int errorcount;
regex_t jobpath;
zlist_t *repaired_jobdirs;
};

struct fsck_valref_data
Expand Down Expand Up @@ -266,9 +273,9 @@ static json_t *get_dir (struct fsck_ctx *ctx,
return o;
}

static void put_valref_lost_and_found (struct fsck_ctx *ctx,
const char *path,
json_t *repaired)
static void put_lost_and_found (struct fsck_ctx *ctx,
const char *path,
json_t *obj)
{
json_t *dir = ctx->root;
char *cpy;
Expand All @@ -291,16 +298,13 @@ static void put_valref_lost_and_found (struct fsck_ctx *ctx,
dir = subdir;
}

if (treeobj_insert_entry (dir, name, repaired) < 0)
log_err_exit ("cannot insert repaired valref");
if (treeobj_insert_entry (dir, name, obj) < 0)
log_err_exit ("cannot insert treeobj");

free (cpy);

ctx->repair_count++;
}

static void unlink_path (struct fsck_ctx *ctx,
const char *path)
static void unlink_path (struct fsck_ctx *ctx, const char *path)
{
json_t *dir = ctx->root;
char *cpy;
Expand All @@ -324,11 +328,37 @@ static void unlink_path (struct fsck_ctx *ctx,
}

if (treeobj_delete_entry (dir, name) < 0)
log_err_exit ("cannot unlink repaired entry");
log_err_exit ("cannot unlink entry");

free (cpy);
}

static void save_kvs_job_path (struct fsck_ctx *ctx, const char *path)
{
if (!ctx->repaired_jobdirs) {
if (!(ctx->repaired_jobdirs = zlist_new ()))
log_err_exit ("cannot create list for job dirs");
zlist_comparefn (ctx->repaired_jobdirs, (zlist_compare_fn *) strcmp);
}

if (regexec (&ctx->jobpath, path, 0, NULL, 0) == 0) {
/* N.B.
* job.xxxx.xxxx.xxxx.xxxx
* is exactly 23 chars.
*/
char *s = strndup (path, 23);
if (!s)
log_err_exit ("cannot dup job path");
if (!zlist_exists (ctx->repaired_jobdirs, (void *)s)) {
if (zlist_append (ctx->repaired_jobdirs, s) < 0)
log_err_exit ("failed to save job dir path");
zlist_freefn (ctx->repaired_jobdirs, s, free, true);
}
else
free (s);
}
}

static void fsck_valref (struct fsck_ctx *ctx,
const char *path,
json_t *treeobj)
Expand Down Expand Up @@ -369,13 +399,18 @@ static void fsck_valref (struct fsck_ctx *ctx,
&& fvd.errorcount == zlist_size (fvd.missing_indexes)) {
json_t *repaired = repair_valref (ctx, treeobj, &fvd);

put_valref_lost_and_found (ctx, path, repaired);
put_lost_and_found (ctx, path, repaired);

unlink_path (ctx, path);

ctx->repair_count++;

warn (ctx, "%s repaired and moved to lost+found", path);

json_decref (repaired);

if (ctx->job_aware)
save_kvs_job_path (ctx, path);
}
}

Expand Down Expand Up @@ -524,6 +559,171 @@ static void fsck_blobref (struct fsck_ctx *ctx, const char *blobref)
flux_future_destroy (f);
}

static json_t *lookup_job_subdir (struct fsck_ctx *ctx,
json_t *treeobj_dir,
const char *job_path,
const char *subdir)
{
json_t *o;

if (!(o = treeobj_get_entry (treeobj_dir, subdir))) {
log_msg ("path %s missing directory %s", job_path, subdir);
return NULL;
}

/* N.B. all dirref treeobjs in job dir path should have been
* converted to dir treeobjs during repair
*/
if (!treeobj_is_dir (o))
log_err_exit ("path %s in %s is not a directory", subdir, job_path);

return o;
}

static json_t *lookup_job_dir (struct fsck_ctx *ctx, const char *job_path)
{
json_t *dir = ctx->root;
json_t *subdir;
char *path;
char *next;

if (!treeobj_is_dir (dir))
log_err_exit ("treeobj dir is type %s", treeobj_type_name (dir));

if (!(path = strdup (job_path)))
log_err_exit ("could not duplicate job path");

while ((next = strchr (path, '.'))) {

*next++ = '\0';

if (!(subdir = lookup_job_subdir (ctx, dir, job_path, path)))
return NULL;

path = next;
dir = subdir;
}

if (!(subdir = lookup_job_subdir (ctx, dir, job_path, path))) {
log_msg ("path %s missing in %s", path, job_path);
return NULL;
}

return subdir;
}

static json_t *lookup_dir_from_dirref (struct fsck_ctx *ctx,
const char *path,
json_t *treeobj)
{
flux_future_t *f = NULL;
const void *buf;
size_t buflen;
json_t *treeobj_deref = NULL;
int count;

count = treeobj_get_count (treeobj);
if (count != 1) {
errmsg (ctx,
"%s: invalid dirref treeobj count=%d",
path,
count);
return NULL;
}
if (!(f = content_load_byblobref (ctx->h,
treeobj_get_blobref (treeobj, 0),
CONTENT_FLAG_CACHE_BYPASS))
|| content_load_get (f, &buf, &buflen) < 0) {
errmsg (ctx,
"%s: error retrieving dirref blobref: %s",
path,
future_strerror (f, errno));
goto cleanup;
}
if (!(treeobj_deref = treeobj_decodeb (buf, buflen))) {
errmsg (ctx, "%s: could not decode directory", path);
goto cleanup;
}
if (!treeobj_is_dir (treeobj_deref)) {
errmsg (ctx, "%s: dirref references non-directory", path);
goto cleanup;
}
flux_future_destroy (f);
return treeobj_deref;

cleanup:
json_decref (treeobj_deref);
flux_future_destroy (f);
return NULL;
}

static void move_dir_lost_and_found (struct fsck_ctx *ctx,
const char *path,
json_t *treeobj)
{
json_t *dict = treeobj_get_data (treeobj);
const char *name;
json_t *entry;

json_object_foreach (dict, name, entry) {
char *newpath;
if (asprintf (&newpath, "%s.%s", path, name) < 0)
log_msg_exit ("out of memory");
if (treeobj_is_symlink (entry)
|| treeobj_is_val (entry)
|| treeobj_is_valref (entry))
put_lost_and_found (ctx, newpath, entry);
else if (treeobj_is_dir (entry))
move_dir_lost_and_found (ctx, newpath, entry);
else if (treeobj_is_dirref (entry)) {
json_t *dir = lookup_dir_from_dirref (ctx, newpath, entry);
if (dir) {
move_dir_lost_and_found (ctx, newpath, dir);
json_decref (dir);
}
}
else
errmsg (ctx, "Cannot move %s, unknown treeobj type", newpath);
free (newpath);
}
}

static void move_job_dirs (struct fsck_ctx *ctx)
{
flux_jobid_t id;
char *job_path;

if (!ctx->repaired_jobdirs)
return;

while ((job_path = zlist_pop (ctx->repaired_jobdirs))) {
json_t *job_dir;
char buf[64];

if (!(job_dir = lookup_job_dir (ctx, job_path)))
goto next;

move_dir_lost_and_found (ctx, job_path, job_dir);

/* N.B. empty job directories may result from the following
* unlink, but we ignore this, as eventual calls to
* flux-dump(1) will clean this up. This is no different than
* when jobs are purged via flux-job(1).
*/
unlink_path (ctx, job_path);

if (flux_job_id_parse (job_path, &id) < 0
|| flux_job_id_encode (id, "f58plain", buf, sizeof (buf)) < 0)
log_err_exit ("cannot parse kvs job id path");

if (ctx->verbose)
warn (ctx, "job %s moved to lost+found", buf);

next:
free (job_path);
}
}

static bool kvs_is_running (struct fsck_ctx *ctx)
{
flux_future_t *f;
Expand Down Expand Up @@ -689,8 +889,20 @@ static int cmd_fsck (optparse_t *p, int ac, char *av[])
ctx.verbose = true;
if (optparse_hasopt (p, "quiet"))
ctx.quiet = true;
if (optparse_hasopt (p, "repair"))
if (optparse_hasopt (p, "repair")) {
ctx.repair = true;
if (optparse_hasopt (p, "job-aware")) {
int ret;
ctx.job_aware = true;
if ((ret = regcomp (&ctx.jobpath,
KVS_JOB_PATH_REGEX,
REG_EXTENDED | REG_NOSUB)) != 0) {
char buf[256];
regerror (ret, &ctx.jobpath, buf, sizeof (buf));
log_msg_exit ("error compiling regex %s", buf);
}
}
}

ctx.h = builtin_get_flux_handle (p);
ctx.isatty = isatty (STDERR_FILENO);
Expand Down Expand Up @@ -731,6 +943,13 @@ static int cmd_fsck (optparse_t *p, int ac, char *av[])

fsck_blobref (&ctx, blobref);

/* N.B. the movement of job dirs is done after fsck completes, b/c
* during fsck scan, we have no knowledge of what has or has-not
* been fsck-ed yet due to recursion.
*/
if (ctx.job_aware)
move_job_dirs (&ctx);

flux_future_destroy (f);

if (ctx.verbose || ctx.errorcount)
Expand Down Expand Up @@ -785,6 +1004,9 @@ static struct optparse_option fsck_opts[] = {
{ .name = "repair", .key = 'R', .has_arg = 0,
.usage = "Repair recoverable keys and place in lost+found",
},
{ .name = "job-aware", .key = 'j', .has_arg = 0,
.usage = "Move all job data to lost+found if errors found",
},
OPTPARSE_TABLE_END
};

Expand Down
Loading
Loading