Skip to content

Commit 308025a

Browse files
committed
ruff based code/import cleanup #741/#815
1 parent c124c54 commit 308025a

File tree

7 files changed

+53
-56
lines changed

7 files changed

+53
-56
lines changed

openeo/extra/job_management/__init__.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
from openeo.extra.job_management._interface import JobDatabaseInterface
2-
from openeo.extra.job_management._job_db import FullDataFrameJobDatabase, CsvJobDatabase, ParquetJobDatabase, create_job_db, get_job_db
3-
from openeo.extra.job_management.process_based import ProcessBasedJobCreator
2+
from openeo.extra.job_management._job_db import (
3+
CsvJobDatabase,
4+
FullDataFrameJobDatabase,
5+
ParquetJobDatabase,
6+
create_job_db,
7+
get_job_db,
8+
)
49
from openeo.extra.job_management._manager import MultiBackendJobManager
10+
from openeo.extra.job_management.process_based import ProcessBasedJobCreator
511

612
__all__ = [
713
"JobDatabaseInterface",
@@ -11,7 +17,5 @@
1117
"ProcessBasedJobCreator",
1218
"create_job_db",
1319
"get_job_db",
14-
"MultiBackendJobManager"
20+
"MultiBackendJobManager",
1521
]
16-
17-

openeo/extra/job_management/_df_schema.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
import dataclasses
22
from typing import Any, Mapping
3+
34
import pandas as pd
45

6+
57
@dataclasses.dataclass(frozen=True)
68
class _ColumnProperties:
79
"""Expected/required properties of a column in the job manager related dataframes"""
810

911
dtype: str = "object"
1012
default: Any = None
1113

14+
1215
# Expected columns in the job DB dataframes.
1316
# TODO: make this part of public API when settled?
1417
# TODO: move non official statuses to seperate column (not_started, queued_for_start)
@@ -28,15 +31,16 @@ class _ColumnProperties:
2831
"costs": _ColumnProperties(dtype="float64"),
2932
}
3033

34+
3135
def _normalize(df: pd.DataFrame) -> pd.DataFrame:
32-
"""
33-
Normalize given pandas dataframe (creating a new one):
34-
ensure we have the required columns.
36+
"""
37+
Normalize given pandas dataframe (creating a new one):
38+
ensure we have the required columns.
3539
36-
:param df: The dataframe to normalize.
37-
:return: a new dataframe that is normalized.
38-
"""
39-
new_columns = {col: req.default for (col, req) in _COLUMN_REQUIREMENTS.items() if col not in df.columns}
40-
df = df.assign(**new_columns)
40+
:param df: The dataframe to normalize.
41+
:return: a new dataframe that is normalized.
42+
"""
43+
new_columns = {col: req.default for (col, req) in _COLUMN_REQUIREMENTS.items() if col not in df.columns}
44+
df = df.assign(**new_columns)
4145

42-
return df
46+
return df

openeo/extra/job_management/_interface.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
import abc
2-
from typing import (
3-
Callable,
4-
Iterable,
5-
List,
6-
NamedTuple,
7-
Union,
8-
)
2+
from typing import Iterable, List, Union
93

104
import pandas as pd
115

6+
127
class JobDatabaseInterface(metaclass=abc.ABCMeta):
138
"""
149
Interface for a database of job metadata to use with the :py:class:`MultiBackendJobManager`,
@@ -68,8 +63,3 @@ def get_by_indices(self, indices: Iterable[Union[int, str]]) -> pd.DataFrame:
6863
:return: DataFrame with jobs filtered by indices.
6964
"""
7065
...
71-
72-
73-
74-
75-

openeo/extra/job_management/_job_db.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
import abc
22
import logging
3-
import shapely.errors
4-
import shapely.wkt
53
from pathlib import Path
64
from typing import Iterable, Union
5+
76
import pandas as pd
7+
import shapely.errors
8+
import shapely.wkt
89

10+
from openeo.extra.job_management._df_schema import _COLUMN_REQUIREMENTS, _normalize
911
from openeo.extra.job_management._interface import JobDatabaseInterface
10-
from openeo.extra.job_management._df_schema import _normalize, _COLUMN_REQUIREMENTS
1112

1213
_log = logging.getLogger(__name__)
1314

15+
1416
class FullDataFrameJobDatabase(JobDatabaseInterface):
1517
def __init__(self):
1618
super().__init__()
@@ -241,4 +243,4 @@ def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str =
241243
job_db.initialize_from_df(df=df, on_exists=on_exists)
242244
else:
243245
raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
244-
return job_db
246+
return job_db

openeo/extra/job_management/_manager.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import collections
22
import contextlib
3-
import dataclasses
43
import datetime
54
import json
65
import logging
@@ -25,28 +24,31 @@
2524
from urllib3.util import Retry
2625

2726
from openeo import BatchJob, Connection
27+
from openeo.extra.job_management._df_schema import _normalize
28+
from openeo.extra.job_management._interface import JobDatabaseInterface
29+
from openeo.extra.job_management._job_db import get_job_db
2830
from openeo.extra.job_management._thread_worker import (
2931
_JobManagerWorkerThreadPool,
3032
_JobStartTask,
3133
)
32-
from openeo.extra.job_management._interface import JobDatabaseInterface
33-
from openeo.extra.job_management._job_db import get_job_db
34-
from openeo.extra.job_management._df_schema import _normalize
35-
3634
from openeo.rest import OpenEoApiError
3735
from openeo.rest.auth.auth import BearerAuth
3836
from openeo.util import deep_get, rfc3339
3937

40-
4138
_log = logging.getLogger(__name__)
4239

40+
4341
MAX_RETRIES = 50
42+
43+
4444
# Sentinel value to indicate that a parameter was not set
4545
_UNSET = object()
4646

47+
4748
def _start_job_default(row: pd.Series, connection: Connection, *args, **kwargs):
4849
raise NotImplementedError("No 'start_job' callable provided")
4950

51+
5052
class _Backend(NamedTuple):
5153
"""Container for backend info/settings"""
5254

@@ -56,8 +58,6 @@ class _Backend(NamedTuple):
5658
parallel_jobs: int
5759

5860

59-
60-
6161
class MultiBackendJobManager:
6262
"""
6363
Tracker for multiple jobs on multiple backends.
@@ -125,7 +125,6 @@ def start_job(
125125
Added ``cancel_running_job_after`` parameter.
126126
"""
127127

128-
129128
def __init__(
130129
self,
131130
poll_sleep: int = 60,
@@ -409,7 +408,7 @@ def run_jobs(
409408
assert not kwargs, f"Unexpected keyword arguments: {kwargs!r}"
410409

411410
if isinstance(job_db, (str, Path)):
412-
job_db = get_job_db(path=job_db) #TODO circular import
411+
job_db = get_job_db(path=job_db) # TODO circular import
413412

414413
if not isinstance(job_db, JobDatabaseInterface):
415414
raise ValueError(f"Unsupported job_db {job_db!r}")
@@ -806,7 +805,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
806805
job_db.persist(active)
807806

808807
return jobs_done, jobs_error, jobs_cancel
809-
808+
810809

811810
def _format_usage_stat(job_metadata: dict, field: str) -> str:
812811
value = deep_get(job_metadata, "usage", field, "value", default=0)

tests/extra/job_management/test_job_db.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
21
import re
2+
33
import geopandas
44

55
# TODO: can we avoid using httpretty?
@@ -18,7 +18,6 @@
1818
create_job_db,
1919
get_job_db,
2020
)
21-
2221
from openeo.utils.version import ComparableVersion
2322

2423
JOB_DB_DF_BASICS = pd.DataFrame(
@@ -378,4 +377,4 @@ def test_create_job_db(tmp_path, filename, expected):
378377
path = tmp_path / filename
379378
db = create_job_db(path=path, df=df)
380379
assert isinstance(db, expected)
381-
assert path.exists()
380+
assert path.exists()

tests/extra/job_management/test_manager.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,15 @@
2626

2727
import openeo
2828
from openeo import BatchJob
29-
from openeo.extra.job_management._manager import (
30-
MAX_RETRIES,
31-
MultiBackendJobManager,
32-
)
33-
3429
from openeo.extra.job_management._job_db import (
3530
CsvJobDatabase,
3631
ParquetJobDatabase,
3732
create_job_db,
3833
)
34+
from openeo.extra.job_management._manager import (
35+
MAX_RETRIES,
36+
MultiBackendJobManager,
37+
)
3938
from openeo.extra.job_management._thread_worker import (
4039
Task,
4140
_JobManagerWorkerThreadPool,
@@ -604,7 +603,9 @@ def get_status(job_id, current_status):
604603
job_db_path = tmp_path / "jobs.csv"
605604

606605
# Mock sleep() to not actually sleep, but skip one hour at a time
607-
with mock.patch.object(openeo.extra.job_management._manager.time, "sleep", new=lambda s: time_machine.shift(60 * 60)):
606+
with mock.patch.object(
607+
openeo.extra.job_management._manager.time, "sleep", new=lambda s: time_machine.shift(60 * 60)
608+
):
608609
job_manager.run_jobs(df=df, start_job=self._create_year_job, job_db=job_db_path)
609610

610611
final_df = CsvJobDatabase(job_db_path).read()
@@ -723,7 +724,9 @@ def get_status(job_id, current_status):
723724
job_db_path = tmp_path / "jobs.csv"
724725

725726
# Mock sleep() to skip one hour at a time instead of actually sleeping
726-
with mock.patch.object(openeo.extra.job_management._manager.time, "sleep", new=lambda s: time_machine.shift(60 * 60)):
727+
with mock.patch.object(
728+
openeo.extra.job_management._manager.time, "sleep", new=lambda s: time_machine.shift(60 * 60)
729+
):
727730
job_manager.run_jobs(df=df, start_job=self._create_year_job, job_db=job_db_path)
728731

729732
final_df = CsvJobDatabase(job_db_path).read()
@@ -865,9 +868,8 @@ def test_refresh_bearer_token_before_start(
865868
dummy_backend_bar,
866869
job_manager_root_dir,
867870
sleep_mock,
868-
requests_mock
871+
requests_mock,
869872
):
870-
871873
client_id = "client123"
872874
client_secret = "$3cr3t"
873875
oidc_issuer = "https://oidc.test/"
@@ -902,6 +904,3 @@ def test_refresh_bearer_token_before_start(
902904
# Because of proactive+throttled token refreshing,
903905
# we should have 2 additional token requests now
904906
assert len(oidc_mock.grant_request_history) == 4
905-
906-
907-

0 commit comments

Comments
 (0)