Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions src/bindings/python/flux/constraint/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,16 @@ class default operator may optionally be substituted, e.g. "operand"
to split values of that operator. For instance ``{"op": ","}``
would autosplit operator ``op`` values on comma.

convert_values (dict): A mapping of operator name to callable which
should take a single list argument containing the values from
the obtained from the current term for the operator after the
operator_map and split_values operations have been applied. The
callable should return a new list of values. This can be used
to convert values to a new type or to combine multiple values
into a single element, e.g. ::

convert_values = {"ints": lambda args: [int(x) for x in args]}

combined_terms (set): A set of operator terms whose values can be
combined when joined with the AND logical operator. E.g. if
"test" is in ``combined_terms``, then
Expand Down Expand Up @@ -284,6 +294,10 @@ class MyConstraintParser(ConstraintParser):
# Combined terms
combined_terms = set()

# Mapping of operator name to value conversion function.
# E.g. { "integer": lambda args: [ int(x) for x in args ] }
convert_values = {}

def __init__(
self, lexer=None, optimize=True, debug=False, write_tables=False, **kw_args
):
Expand Down Expand Up @@ -408,10 +422,12 @@ def p_expression_token(self, p):
f"invalid character '{invalid}' in operator '{op}:'"
)

values = [value]
if op in self.split_values:
p[0] = {op: value.split(self.split_values[op])}
else:
p[0] = {op: [value]}
values = value.split(self.split_values[op])
if op in self.convert_values:
values = self.convert_values[op](values)
p[0] = {op: values}

def p_quoted_token(self, p):
"""
Expand Down
14 changes: 13 additions & 1 deletion src/bindings/python/flux/job/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from itertools import chain

import flux.constants
from flux.core.inner import raw
from flux.core.inner import ffi, raw
from flux.job.JobID import JobID
from flux.job.stats import JobStats
from flux.memoized_property import memoized_property
Expand All @@ -42,6 +42,12 @@ def statetostr(stateid, fmt="L"):
return raw.flux_job_statetostr(stateid, fmt).decode("utf-8")


def strtostate(state):
result = ffi.new("flux_job_state_t [1]")
raw.flux_job_strtostate(state, result)
return int(result[0])


def statetoemoji(stateid):
statestr = raw.flux_job_statetostr(stateid, "S").decode("utf-8")
if statestr == "N":
Expand Down Expand Up @@ -81,6 +87,12 @@ def resulttostr(resultid, fmt="L"):
return raw.flux_job_resulttostr(resultid, fmt).decode("utf-8")


def strtoresult(arg):
result = ffi.new("flux_job_result_t [1]")
raw.flux_job_strtoresult(arg, result)
return int(result[0])


def resulttoemoji(resultid):
if resultid != "":
resultstr = raw.flux_job_resulttostr(resultid, "S").decode("utf-8")
Expand Down
180 changes: 179 additions & 1 deletion src/bindings/python/flux/job/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@
import errno
import os
import pwd
import sys
from collections.abc import Iterable
from datetime import datetime
from functools import reduce

import flux.constants
from flux.constraint.parser import ConstraintLexer, ConstraintParser
from flux.future import WaitAllFuture
from flux.job import JobID
from flux.job.info import JobInfo
from flux.job.info import JobInfo, strtoresult, strtostate
from flux.rpc import RPC
from flux.util import parse_datetime


class JobListRPC(RPC):
Expand Down Expand Up @@ -341,3 +346,176 @@ def jobs(self):
if hasattr(rpc, "errors"):
self.errors = rpc.errors
return [JobInfo(job) for job in jobs]


def job_list_filter_to_mask(args, conv):
"""
Convert all job state or result strings with conv() and combine into
a single state or result mask as accepted by the job-list constraints.

This is a convenience function for the JobListConstraintParser class.

Args:
args (list): list of values to convert
conv (callable): function to call on each arg to convert to a state
or result mask.
"""
return reduce(lambda x, y: x | y, map(conv, args))


class JobListConstraintParser(ConstraintParser):
operator_map = {
None: "filter",
"id": "jobid",
"host": "hostlist",
"hosts": "hostlist",
"rank": "ranks",
}
split_values = {"states": ",", "results": ",", "userid": ","}
convert_values = {
"userid": lambda args: [int(x) for x in args],
"states": lambda args: [job_list_filter_to_mask(args, strtostate)],
"results": lambda args: [job_list_filter_to_mask(args, strtoresult)],
}
valid_states = (
"depend",
"priority",
"sched",
"run",
"cleanup",
"inactive",
"pending",
"running",
"active",
)
valid_results = ("completed", "failed", "canceled", "timeout")

def convert_filter(self, arg):
#
# This is a generic state/result filter for backwards compat with
# --filter=. Split into separate states and results operators and
# return the new term(s) (joined by 'or' since that preserves the
# behavior of `--filter`).
#
states = []
results = []
for name in arg.split(","):
name = name.lower()
if name in self.valid_states:
states.append(name)
elif name in self.valid_results:
results.append(name)
else:
raise ValueError(f"Invalid filter specified: {name}")
arg = ""
if states:
arg += "states:" + ",".join(states) + " "
if results:
arg += "or "
if results:
arg += "results:" + ",".join(results)
return arg.rstrip()

@staticmethod
def convert_user(arg):
op, _, arg = arg.partition(":")
users = []
for user in arg.split(","):
try:
users.append(str(int(user)))
except ValueError:
users.append(str(pwd.getpwnam(user).pw_uid))
return "userid:" + ",".join(users)

@staticmethod
def convert_datetime(dt):
if isinstance(dt, (float, int)):
if dt == 0:
# A datetime of zero indicates unset, or an arbitrary time
# in the future. Return 12 months from now.
return parse_datetime("+12m")
dt = datetime.fromtimestamp(dt).astimezone()
else:
dt = parse_datetime(dt, assumeFuture=False)
return dt.timestamp()

def convert_range(self, arg):
arg = arg[1:]
if ".." in arg:
start, end = arg.split("..")
arg = "(not ("
if start:
dt = self.convert_datetime(start)
arg += f"'t_cleanup:<{dt}'"
if start and end:
arg += " or "
if end:
dt = self.convert_datetime(end)
arg += f"'t_run:>{dt}'"
arg += "))"
else:
dt = self.convert_datetime(arg)
arg = f"(t_run:'<={dt}' and t_cleanup:'>={dt}')"
return arg

def convert_timeop(self, arg):
op, _, arg = arg.partition(":")
prefix = ""
if arg[0] in (">", "<"):
if arg[1] == "=":
prefix = arg[:2]
arg = arg[2:]
else:
prefix = arg[0]
arg = arg[1:]
arg = self.convert_datetime(arg)
return f"'{op}:{prefix}{arg}'"

def convert_token(self, arg):
if arg.startswith("@"):
return self.convert_range(arg)
if arg.startswith("t_"):
return self.convert_timeop(arg)
if arg.startswith("user:"):
return self.convert_user(arg)
if ":" not in arg:
return self.convert_filter(arg)
return f"'{arg}'"

def parse(self, string, debug=False):
# First pass: traverse all tokens and apply convenience conversions
expression = ""
lexer = ConstraintLexer()
lexer.input(str(string))
if debug:
print(f"input: {string}", file=sys.stderr)

# Get all tokens first so we can do lookahead in the next step for
# proper use of whitespace:
tokens = []
while True:
tok = lexer.token()
if tok is None:
break
tokens.append(tok)

# Reconstruct expression while converting tokens:
for i, tok in enumerate(tokens):
next_tok = None
if i < len(tokens) - 1:
next_tok = tokens[i + 1]
if debug:
print(tok, file=sys.stderr)
if tok.type != "TOKEN":
expression += tok.value
else:
expression += self.convert_token(tok.value)
if tok.type not in ("LPAREN", "NEGATE") and (
next_tok and next_tok.type not in ("RPAREN")
):
expression += " "

if debug:
print(f"expression: '{expression}'", file=sys.stderr)

return super().parse(expression)
16 changes: 15 additions & 1 deletion src/bindings/python/flux/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,17 @@ def __call__(self, parser, namespace, values, option_string=None):
getattr(namespace, self.dest).update(values)


class FilterActionConcatenate(argparse.Action):
"""Concatenate filter arguments separated with space"""

def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, "filtered", True)
current = getattr(namespace, self.dest)
if current is not None:
values = current + " " + values
setattr(namespace, self.dest, values)


# pylint: disable=redefined-builtin
class FilterTrueAction(argparse.Action):
def __init__(
Expand Down Expand Up @@ -331,7 +342,7 @@ def parse_fsd(fsd_string):
return seconds


def parse_datetime(string, now=None):
def parse_datetime(string, now=None, assumeFuture=True):
"""Parse a possibly human readable datetime string or offset

If string starts with `+` or `-`, then the remainder of the string
Expand Down Expand Up @@ -369,6 +380,9 @@ def parse_datetime(string, now=None):

cal = Calendar()
cal.ptc.StartHour = 0
if not assumeFuture:
cal.ptc.DOWParseStyle = 0
cal.ptc.YearParseStyle = 0
time_struct, status = cal.parse(string, sourceTime=now.timetuple())
if status == 0:
raise ValueError(f'Invalid datetime: "{string}"')
Expand Down
21 changes: 10 additions & 11 deletions src/cmd/flux-jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
from flux.hostlist import Hostlist
from flux.idset import IDset
from flux.job import JobID, JobInfo, JobInfoFormat, JobList, job_fields_to_attrs
from flux.job.list import JobListConstraintParser
from flux.job.stats import JobStats
from flux.util import (
FilterAction,
FilterActionConcatenate,
FilterActionSetUpdate,
FilterTrueAction,
UtilConfig,
Expand Down Expand Up @@ -153,32 +155,30 @@ def fetch_jobs_flux(args, fields, flux_handle=None):
if args.filter:
LOGGER.warning("Both -a and --filter specified, ignoring -a")
else:
args.filter.update(["pending", "running", "inactive"])
args.filter = "pending,running,inactive"

if not args.filter:
args.filter = {"pending", "running"}
args.filter = "pending,running"

constraint = None
if args.include:
try:
constraint = {"ranks": [IDset(args.include).encode()]}
args.filter += " ranks:" + IDset(args.include).encode()
except ValueError:
try:
constraint = {"hostlist": [Hostlist(args.include).encode()]}
args.filter += " host:" + Hostlist(args.include).encode()
except ValueError:
raise ValueError(f"-i/--include: invalid targets: {args.include}")

jobs_rpc = JobList(
flux_handle,
ids=args.jobids,
attrs=attrs,
filters=args.filter,
user=args.user,
max_entries=args.count,
since=since,
name=args.name,
queue=args.queue,
constraint=constraint,
constraint=JobListConstraintParser().parse(args.filter),
)

jobs = jobs_rpc.jobs()
Expand Down Expand Up @@ -231,10 +231,9 @@ def parse_args():
parser.add_argument(
"-f",
"--filter",
action=FilterActionSetUpdate,
metavar="STATE|RESULT",
default=set(),
help="List jobs with specific job state or result",
action=FilterActionConcatenate,
metavar="QUERY",
help="Restrict jobs using a constraint query string",
)
parser.add_argument(
"--since",
Expand Down