diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f8efd0c18..3f6eb2bb7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,12 +15,12 @@ jobs: fail-fast: false matrix: toxenv: - - py38,docs + - py310,docs steps: - uses: actions/checkout@v2 - uses: actions/setup-python@v2 with: - python-version: 3.8 + python-version: '3.10' # GHA won't setup tox for us - run: pip install tox==3.2 # there are no pre-built wheels for bsddb3, so we need to install @@ -44,7 +44,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-python@v2 with: - python-version: 3.8 + python-version: '3.10' # Update package lists to ensure we have the latest information - run: sudo apt-get update # the container provided by GitHub doesn't include utilities diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1c1712ca7..4ce706453 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ --- default_language_version: - python: python3.8 + python: python3.10 repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v2.5.0 @@ -27,10 +27,10 @@ repos: - id: reorder-python-imports args: [--py3-plus] - repo: https://github.com/asottile/pyupgrade - rev: v3.3.1 + rev: v3.20.0 hooks: - id: pyupgrade - args: [--py38-plus] + args: [--py39-plus] - repo: local hooks: - id: patch-enforce-autospec @@ -44,4 +44,4 @@ repos: rev: 22.3.0 hooks: - id: black - args: [--target-version, py38] + args: [--target-version, py310] diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 113bdb3db..5c13ecbb2 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -8,7 +8,7 @@ version: 2 build: os: ubuntu-22.04 tools: - python: "3.8" + python: "3.10" # You can also specify other tool versions: # nodejs: "20" # rust: "1.70" diff --git a/Makefile b/Makefile index 04b592b99..a498e57fe 100644 --- a/Makefile +++ b/Makefile @@ -60,10 +60,10 @@ coffee_%: docker_% ' test: - tox -e py38 + tox -e py310 test_in_docker_%: docker_% - $(DOCKER_RUN) tron-builder-$* python3.8 -m tox -vv -e py38 + $(DOCKER_RUN) tron-builder-$* python3.10 -m tox -vv -e py310 tox_%: tox -e $* @@ -78,13 +78,13 @@ itest_%: debitest_% @echo "itest $* OK" dev: - SSH_AUTH_SOCK=$(SSH_AUTH_SOCK) .tox/py38/bin/trond --debug --working-dir=dev -l logging.conf --host=0.0.0.0 + SSH_AUTH_SOCK=$(SSH_AUTH_SOCK) .tox/py310/bin/trond --debug --working-dir=dev -l logging.conf --host=0.0.0.0 example_cluster: tox -e example-cluster yelpy: - .tox/py38/bin/pip install -r yelp_package/extra_requirements_yelp.txt + .tox/py310/bin/pip install -r yelp_package/extra_requirements_yelp.txt # 1. 
Bump version at the top of this file diff --git a/bin/tronctl b/bin/tronctl index 709af3128..59d4d38e9 100755 --- a/bin/tronctl +++ b/bin/tronctl @@ -11,12 +11,9 @@ import logging import pprint import sys from collections import defaultdict +from collections.abc import Callable +from collections.abc import Generator from typing import Any -from typing import Callable -from typing import Dict -from typing import Generator -from typing import Optional -from typing import Tuple from urllib.parse import urljoin import argcomplete # type: ignore @@ -238,7 +235,7 @@ def parse_cli(): return args -def request(url: str, data: Dict[str, Any], headers=None, method=None) -> bool: +def request(url: str, data: dict[str, Any], headers=None, method=None) -> bool: # We want every tronctl request to be attributable response = client.request(url, data=data, headers=headers, method=method, user_attribution=True) if response.error: @@ -264,7 +261,7 @@ def event_discard(args): ) -def _get_triggers_for_action(server: str, action_identifier: str) -> Optional[Tuple[str, ...]]: +def _get_triggers_for_action(server: str, action_identifier: str) -> tuple[str, ...] | None: try: namespace, job_name, run_number, action_name = action_identifier.split(".") except ValueError: @@ -452,7 +449,7 @@ def tron_version(args): yield True -COMMANDS: Dict[str, Callable[[argparse.Namespace], Generator[bool, None, None]]] = defaultdict( +COMMANDS: dict[str, Callable[[argparse.Namespace], Generator[bool, None, None]]] = defaultdict( lambda: control_objects, publish=event_publish, discard=event_discard, diff --git a/contrib/mock_patch_checker.py b/contrib/mock_patch_checker.py index 0a95bbf84..15288bb43 100755 --- a/contrib/mock_patch_checker.py +++ b/contrib/mock_patch_checker.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.8 +#!/usr/bin/env python3.10 import ast import sys diff --git a/contrib/patch-config-loggers.diff b/contrib/patch-config-loggers.diff index 4ccf7d506..62d27444e 100644 --- a/contrib/patch-config-loggers.diff +++ b/contrib/patch-config-loggers.diff @@ -1,5 +1,5 @@ ---- a/debian/tron/opt/venvs/tron/lib/python3.8/site-packages/kubernetes/client/configuration.py -+++ b/debian/tron/opt/venvs/tron/lib/python3.8/site-packages/kubernetes/client/configuration.py +--- a/debian/tron/opt/venvs/tron/lib/python3.10/site-packages/kubernetes/client/configuration.py ++++ b/debian/tron/opt/venvs/tron/lib/python3.10/site-packages/kubernetes/client/configuration.py @@ -71,11 +71,11 @@ """ diff --git a/debian/control b/debian/control index d40824e96..b9828bb59 100644 --- a/debian/control +++ b/debian/control @@ -2,13 +2,13 @@ Source: tron Section: admin Priority: optional Maintainer: Daniel Nephin -Build-Depends: debhelper (>= 7), python3.8-dev, libdb5.3-dev, libyaml-dev, libssl-dev, libffi-dev, dh-virtualenv +Build-Depends: debhelper (>= 7), python3.10-dev, libdb5.3-dev, libyaml-dev, libssl-dev, libffi-dev, dh-virtualenv Standards-Version: 3.8.3 Package: tron Architecture: all Homepage: http://github.com/yelp/Tron -Depends: bsdutils, python3.8, libdb5.3, libyaml-0-2, ${shlibs:Depends}, ${misc:Depends} +Depends: bsdutils, python3.10, libdb5.3, libyaml-0-2, ${shlibs:Depends}, ${misc:Depends} Description: Tron is a job scheduling, running and monitoring package. Designed to replace Cron for complex scheduling and dependencies. 
Provides: diff --git a/debian/rules b/debian/rules index e14032789..02b906eb9 100755 --- a/debian/rules +++ b/debian/rules @@ -22,11 +22,11 @@ override_dh_virtualenv: dh_virtualenv --index-url $(PIP_INDEX_URL) \ --extra-pip-arg --trusted-host=169.254.255.254 \ --extra-pip-arg --only-binary=cryptography \ - --python=/usr/bin/python3.8 \ + --python=/usr/bin/python3.10 \ --preinstall cython==0.29.36 \ --preinstall pip==24.3.1 \ --preinstall setuptools==65.5.1 @echo patching k8s client lib for configuration class - patch debian/tron/opt/venvs/tron/lib/python3.8/site-packages/kubernetes/client/configuration.py contrib/patch-config-loggers.diff + patch debian/tron/opt/venvs/tron/lib/python3.10/site-packages/kubernetes/client/configuration.py contrib/patch-config-loggers.diff override_dh_installinit: dh_installinit --noscripts diff --git a/dev/config/MASTER.yaml b/dev/config/MASTER.yaml index a973a4dd1..615dd63c4 100755 --- a/dev/config/MASTER.yaml +++ b/dev/config/MASTER.yaml @@ -17,7 +17,7 @@ nodes: # action_runner: # runner_type: "subprocess" # remote_status_path: "pg/tron/status" -# remote_exec_path: "pg/tron/.tox/py38/bin" +# remote_exec_path: "pg/tron/.tox/py310/bin" jobs: testjob0: diff --git a/mypy.ini b/mypy.ini index 3b3d2e66d..a702b0cdb 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,14 +1,14 @@ [mypy] -python_version = 3.8 +python_version = 3.10 # TODO: we'd like to be as strict as we are internally, but we need to fully type Tron first # disallow_any_generics = true -disallow_incomplete_defs = true +disallow_incomplete_defs = True # disallow_untyped_calls = true -disallow_untyped_decorators = true +disallow_untyped_decorators = True # disallow_untyped_defs = true show_column_numbers = True -show_error_codes = true +show_error_codes = True show_error_context = True warn_incomplete_stub = True diff --git a/pyproject.toml b/pyproject.toml index ba07a8f9a..880aab301 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,3 @@ [tool.black] line-length = 120 -target_version = ['py38'] +target_version = ['py310'] diff --git a/requirements-dev-minimal.txt b/requirements-dev-minimal.txt index 97a1a6197..2fae9980a 100644 --- a/requirements-dev-minimal.txt +++ b/requirements-dev-minimal.txt @@ -2,10 +2,9 @@ asynctest botocore-stubs debugpy flake8 -mock +moto mypy pre-commit -pylint pytest pytest-asyncio requirements-tools diff --git a/requirements-dev.txt b/requirements-dev.txt index 7623b2004..be9e6e8a3 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,18 +1,15 @@ -astroid==2.13.3 asynctest==0.12.0 boto3-stubs==1.35.63 botocore-stubs==1.38.19 cfgv==2.0.1 debugpy==1.8.1 -dill==0.3.6 distlib==0.3.6 filelock==3.4.1 flake8==5.0.4 identify==2.5.5 iniconfig==1.1.1 -isort==4.3.18 -lazy-object-proxy==1.9.0 mccabe==0.7.0 +moto==4.1.0 mypy==1.9.0 mypy-extensions==1.0.0 nodeenv==1.3.3 @@ -23,14 +20,13 @@ pre-commit==2.20.0 py==1.10.0 pycodestyle==2.9.0 pyflakes==2.5.0 -pylint==2.15.10 pyparsing==2.4.2 -pytest==6.2.2 +pytest==7.0.1 pytest-asyncio==0.14.0 -requirements-tools==1.2.1 +requirements-tools==2.1.0 +responses==0.13.0 toml==0.10.2 tomli==2.0.1 -tomlkit==0.11.6 types-awscrt==0.27.2 types-boto3==1.0.2 types-cachetools==5.5.0.20240820 @@ -42,3 +38,4 @@ types-s3transfer==0.13.0 types-setuptools==75.8.0.20250110 types-urllib3==1.26.25.14 virtualenv==20.17.1 +xmltodict==0.12.0 diff --git a/requirements-minimal.txt b/requirements-minimal.txt index 47400a9ed..d34ab8a12 100644 --- a/requirements-minimal.txt +++ b/requirements-minimal.txt @@ -11,7 +11,6 @@ ipdb ipython Jinja2>=3.1.2 lockfile 
-moto prometheus-client psutil py-bcrypt diff --git a/requirements.txt b/requirements.txt index 75a2886e6..a9779b1f2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,27 +3,20 @@ argcomplete==1.9.5 asttokens==2.2.1 attrs==19.3.0 Automat==20.2.0 -aws-sam-translator==1.15.1 -aws-xray-sdk==2.4.2 backcall==0.1.0 -boto==2.49.0 boto3==1.34.80 botocore==1.34.80 bsddb3==6.2.7 cachetools==4.2.1 certifi==2022.12.7 -cffi==1.12.3 -cfn-lint==0.24.4 +cffi==1.15.0 charset-normalizer==2.0.12 constantly==15.1.0 cryptography==41.0.5 dataclasses==0.6 -DateTime==4.3 decorator==4.4.0 -docker==4.1.0 ecdsa==0.13.3 executing==1.2.0 -future==0.18.3 google-auth==1.23.0 http-parser==0.9.0 humanize==4.10.0 @@ -36,17 +29,10 @@ ipython-genutils==0.2.0 jedi==0.16.0 Jinja2==3.1.2 jmespath==0.9.4 -jsondiff==1.1.2 -jsonpatch==1.24 -jsonpickle==1.2 -jsonpointer==2.0 -jsonschema==3.2.0 kubernetes==26.1.0 lockfile==0.12.2 MarkupSafe==2.1.1 matplotlib-inline==0.1.3 -mock==3.0.5 -moto==1.3.13 oauthlib==3.1.0 parso==0.7.0 pexpect==4.7.0 @@ -68,18 +54,15 @@ pyrsistent==0.15.4 pysensu-yelp==1.0.3 PyStaticConfiguration==0.11.1 python-dateutil==2.8.1 -python-jose==3.0.1 pytimeparse==1.1.8 pytz==2019.3 PyYAML==6.0.1 requests==2.27.1 requests-oauthlib==1.2.0 -responses==0.10.6 rsa==4.9 s3transfer==0.10.1 setuptools==65.5.1 six==1.15.0 -sshpubkeys==3.1.0 stack-data==0.6.2 task-processing==1.3.5 traitlets==5.0.0 @@ -89,6 +72,4 @@ urllib3==1.25.10 wcwidth==0.1.7 websocket-client==0.56.0 Werkzeug==2.2.3 -wrapt==1.11.2 -xmltodict==0.12.0 -zope.interface==5.1.0 +zope.interface==7.2 diff --git a/tests/kubernetes_test.py b/tests/kubernetes_test.py index 6e13c31bb..047285793 100644 --- a/tests/kubernetes_test.py +++ b/tests/kubernetes_test.py @@ -1,5 +1,4 @@ from typing import Any -from typing import Dict from unittest import mock import pytest @@ -66,7 +65,7 @@ def mock_event_factory( task_id: str, platform_type: str, message: str = None, - raw: Dict[str, Any] = None, + raw: dict[str, Any] = None, success: bool = False, terminal: bool = False, ) -> Event: diff --git a/tests/serialize/runstate/dynamodb_state_store_test.py b/tests/serialize/runstate/dynamodb_state_store_test.py index 8b9821eab..389f88c05 100644 --- a/tests/serialize/runstate/dynamodb_state_store_test.py +++ b/tests/serialize/runstate/dynamodb_state_store_test.py @@ -4,8 +4,8 @@ import boto3 import pytest import staticconf.testing -from moto import mock_dynamodb2 -from moto.dynamodb2.responses import dynamo_json_dump +from moto import mock_dynamodb +from moto.dynamodb.responses import dynamo_json_dump from testifycompat import assert_equal from tron.serialize.runstate.dynamodb_state_store import DynamoDBStateStore @@ -61,10 +61,10 @@ def update_item(item): @pytest.fixture(autouse=True) def store(): with mock.patch( - "moto.dynamodb2.responses.DynamoHandler.transact_write_items", + "moto.dynamodb.responses.DynamoHandler.transact_write_items", new=mock_transact_write_items, create=True, - ), mock_dynamodb2(): + ), mock_dynamodb(): dynamodb = boto3.resource("dynamodb", region_name="us-west-2") table_name = "tmp" store = DynamoDBStateStore(table_name, "us-west-2", stopping=True) @@ -125,7 +125,7 @@ def large_object(): "runs": [], "cleanup_run": None, "manual": False, - "large_data": [i for i in range(1_000_000)], + "large_data": [i for i in range(10000)], } diff --git a/tests/serialize/runstate/statemanager_test.py b/tests/serialize/runstate/statemanager_test.py index 7eaa65f8e..3d57d7262 100644 --- a/tests/serialize/runstate/statemanager_test.py +++ 
b/tests/serialize/runstate/statemanager_test.py @@ -87,7 +87,7 @@ def test_restore(self): with mock.patch.object(self.manager, "_restore_dicts", autospec=True,) as mock_restore_dicts, mock.patch.object( self.manager, "_restore_runs_for_job", - autospect=True, + autospec=True, ) as mock_restore_runs: mock_restore_dicts.side_effect = [ # _restore_dicts for JOB_STATE diff --git a/tests/tools/sync_tron_state_from_k8s_test.py b/tests/tools/sync_tron_state_from_k8s_test.py index 284ada312..bb3d757e5 100644 --- a/tests/tools/sync_tron_state_from_k8s_test.py +++ b/tests/tools/sync_tron_state_from_k8s_test.py @@ -1,4 +1,3 @@ -from typing import Dict from unittest import mock import pytest @@ -11,7 +10,7 @@ from tools.sync_tron_state_from_k8s import update_tron_from_pods -def create_mock_pod(name: str, phase: str, labels: Dict[str, str], creation_timestamp: str): +def create_mock_pod(name: str, phase: str, labels: dict[str, str], creation_timestamp: str): metadata = V1ObjectMeta(name=name, creation_timestamp=creation_timestamp, labels=labels) status = V1PodStatus(phase=phase) return V1Pod(metadata=metadata, status=status) diff --git a/tools/pickles_to_json.py b/tools/pickles_to_json.py index 37eb25e08..a7a9bfad6 100644 --- a/tools/pickles_to_json.py +++ b/tools/pickles_to_json.py @@ -2,8 +2,6 @@ import math import os import pickle -from typing import List -from typing import Optional import boto3 import requests @@ -20,7 +18,7 @@ def get_dynamodb_table( - aws_profile: Optional[str] = None, table: str = "infrastage-tron-state", region: str = "us-west-1" + aws_profile: str | None = None, table: str = "infrastage-tron-state", region: str = "us-west-1" ) -> ServiceResource: """ Get the DynamoDB table resource. @@ -33,7 +31,7 @@ def get_dynamodb_table( return session.resource("dynamodb", region_name=region).Table(table) -def get_all_jobs(source_table: ServiceResource) -> List[str]: +def get_all_jobs(source_table: ServiceResource) -> list[str]: """ Scan the DynamoDB table and return a list of unique job keys. :param source_table: The DynamoDB table resource to scan. @@ -44,7 +42,7 @@ def get_all_jobs(source_table: ServiceResource) -> List[str]: return list(unique_keys) -def get_job_names(base_url: str) -> List[str]: +def get_job_names(base_url: str) -> list[str]: """ Get the list of job names from the Tron API. :param base_url: The base URL of the Tron API. @@ -101,7 +99,7 @@ def dump_pickle_key(source_table: ServiceResource, key: str) -> None: raise -def dump_pickle_keys(source_table: ServiceResource, keys: List[str]) -> None: +def dump_pickle_keys(source_table: ServiceResource, keys: list[str]) -> None: """ Load and print pickles for the given list of keys. :param source_table: The DynamoDB table resource. @@ -128,7 +126,7 @@ def dump_json_key(source_table: ServiceResource, key: str) -> None: print(f"Key: {key} - Failed to load JSON: {e}") -def dump_json_keys(source_table: ServiceResource, keys: List[str]) -> None: +def dump_json_keys(source_table: ServiceResource, keys: list[str]) -> None: """ Load and print JSON data for the given list of keys. :param source_table: The DynamoDB table resource. @@ -139,7 +137,7 @@ def dump_json_keys(source_table: ServiceResource, keys: List[str]) -> None: # TODO: clean up old run history for valid jobs? 
something something look at job_state, then whitelist those runs instead of whitelisting entire jobs -def delete_keys(source_table: ServiceResource, keys: List[str]) -> None: +def delete_keys(source_table: ServiceResource, keys: list[str]) -> None: """ Delete items with the given list of keys from the DynamoDB table. :param source_table: The DynamoDB table resource. @@ -179,7 +177,7 @@ def get_num_partitions(source_table: ServiceResource, key: str) -> int: return max(num_partitions, num_json_val_partitions) -def combine_json_partitions(source_table: ServiceResource, key: str) -> Optional[str]: +def combine_json_partitions(source_table: ServiceResource, key: str) -> str | None: """ Combine all partitions of a JSON item from DynamoDB. :param source_table: The DynamoDB table resource. @@ -254,11 +252,11 @@ def convert_pickle_to_json_and_update_table(source_table: ServiceResource, key: def convert_pickles_to_json_and_update_table( source_table: ServiceResource, - keys: List[str], + keys: list[str], dry_run: bool = True, - deprecated_keys_output: Optional[str] = None, - failed_keys_output: Optional[str] = None, - job_names: List[str] = [], + deprecated_keys_output: str | None = None, + failed_keys_output: str | None = None, + job_names: list[str] = [], ) -> None: """ Convert pickled items in the DynamoDB table to JSON and update the entries. @@ -324,7 +322,7 @@ def convert_pickles_to_json_and_update_table( print("Dry run complete. No changes were made to the DynamoDB table.") -def scan_table(source_table: ServiceResource) -> List[dict]: +def scan_table(source_table: ServiceResource) -> list[dict]: """ Scan the DynamoDB table and return all items, handling pagination. :param source_table: The DynamoDB table resource to scan. diff --git a/tools/sync_tron_state_from_k8s.py b/tools/sync_tron_state_from_k8s.py index 60f4624d0..fa2d6856a 100644 --- a/tools/sync_tron_state_from_k8s.py +++ b/tools/sync_tron_state_from_k8s.py @@ -14,9 +14,6 @@ import subprocess import sys from typing import Any -from typing import Dict -from typing import List -from typing import Optional from kubernetes.client import V1Pod from task_processing.plugins.kubernetes.kube_client import KubeClient @@ -113,7 +110,7 @@ def parse_args(): return args -def fetch_pods(kubeconfig_path: str, kubecontext: Optional[str]) -> Dict[str, V1Pod]: +def fetch_pods(kubeconfig_path: str, kubecontext: str | None) -> dict[str, V1Pod]: if kubecontext: # KubeClient only uses the environment variable os.environ["KUBECONTEXT"] = kubecontext @@ -127,7 +124,7 @@ def fetch_pods(kubeconfig_path: str, kubecontext: Optional[str]) -> Dict[str, V1 return {pod.metadata.name: pod for pod in completed_pod_list.items} -def get_tron_state_from_api(tron_server: str, num_runs: int = 100) -> List[Dict[str, Dict[Any, Any]]]: +def get_tron_state_from_api(tron_server: str, num_runs: int = 100) -> list[dict[str, dict[Any, Any]]]: if not tron_server: client_config = get_client_config() tron_server = client_config.get("server", "http://localhost:8089") @@ -153,7 +150,7 @@ def get_tron_state_from_api(tron_server: str, num_runs: int = 100) -> List[Dict[ return jobs -def get_matching_pod(action_run: Dict[str, Any], pods: Dict[str, V1Pod]) -> Optional[V1Pod]: +def get_matching_pod(action_run: dict[str, Any], pods: dict[str, V1Pod]) -> V1Pod | None: """Given a tron action_run, try to find the right pod that matches.""" action_name = action_run["action_name"] job_name = action_run["job_name"] @@ -184,7 +181,7 @@ def get_desired_state_from_pod(pod: V1Pod) -> str: def 
update_tron_from_pods( - jobs: List[Dict[str, Any]], pods: Dict[str, V1Pod], tronctl_wrapper: str = "tronctl", do_work: bool = False + jobs: list[dict[str, Any]], pods: dict[str, V1Pod], tronctl_wrapper: str = "tronctl", do_work: bool = False ): updated = [] error = [] diff --git a/tox.ini b/tox.ini index 36e5c77c4..b4b2b127d 100644 --- a/tox.ini +++ b/tox.ini @@ -1,8 +1,8 @@ [tox] -envlist = py38 +envlist = py310 [testenv] -basepython = python3.8 +basepython = python3.10 deps = --requirement={toxinidir}/requirements.txt --requirement={toxinidir}/requirements-dev.txt diff --git a/tron/actioncommand.py b/tron/actioncommand.py index 4d943cf1b..5d84ccb22 100644 --- a/tron/actioncommand.py +++ b/tron/actioncommand.py @@ -4,8 +4,6 @@ from io import StringIO from shlex import quote from typing import Any -from typing import Dict -from typing import Optional from tron.config import schema from tron.serialize import filehandler @@ -210,7 +208,7 @@ def __ne__(self, other): return not self == other @staticmethod - def from_json(state_data: str) -> Dict[str, Any]: + def from_json(state_data: str) -> dict[str, Any]: try: json_data = json.loads(state_data) deserialized_data = { @@ -223,7 +221,7 @@ def from_json(state_data: str) -> Dict[str, Any]: raise @staticmethod - def to_json(state_data: dict) -> Optional[str]: + def to_json(state_data: dict) -> str | None: try: return json.dumps( { diff --git a/tron/api/adapter.py b/tron/api/adapter.py index b9d0715bc..6b40d1d21 100644 --- a/tron/api/adapter.py +++ b/tron/api/adapter.py @@ -7,10 +7,8 @@ import functools import os.path import time +from collections.abc import Callable from typing import Any -from typing import Callable -from typing import List -from typing import Optional from typing import TypeVar from urllib.parse import quote @@ -28,8 +26,8 @@ class ReprAdapter: """Creates a dictionary from the given object for a set of rules.""" - field_names: List[str] = [] - translated_field_names: List[str] = [] + field_names: list[str] = [] + translated_field_names: list[str] = [] def __init__(self, internal_obj): self._obj = internal_obj @@ -61,15 +59,15 @@ def toggle_flag( [Callable[[Any], R]], # and that decorator returns another callable (since this is a decorator factory) that takes self (again, the Any) # and returns None (if the flag is False) or R (if True) - Callable[[Any], Optional[R]], + Callable[[Any], R | None], ]: """Create a decorator which checks if flag_name is true before running the wrapped function. If False returns None. 
""" - def wrap(f: Callable[[Any], R]) -> Callable[[Any], Optional[R]]: + def wrap(f: Callable[[Any], R]) -> Callable[[Any], R | None]: @functools.wraps(f) - def wrapper(self: Any) -> Optional[R]: + def wrapper(self: Any) -> R | None: if getattr(self, flag_name): return f(self) return None @@ -155,7 +153,7 @@ def get_requirements(self): required = self.job_run.action_graph.get_dependencies(action_name) return [act.name for act in required] - def _get_serializer(self, path: Optional[str] = None) -> filehandler.OutputStreamSerializer: + def _get_serializer(self, path: str | None = None) -> filehandler.OutputStreamSerializer: base_path = filehandler.OutputPath(path) if path else self._obj.output_path return filehandler.OutputStreamSerializer(base_path) @@ -181,14 +179,14 @@ def _get_alternate_output_paths(self): yield formatted_alt_path @toggle_flag("include_meta") - def get_meta(self) -> List[str]: + def get_meta(self) -> list[str]: if not isinstance(self._obj, KubernetesActionRun): return ["When this action is migrated to Kubernetes, this will contain Tron/task_processing output."] # We're reusing the "old" (i.e., SSH/Mesos) logging files for task_processing output since # that won't make it into anything but Splunk filename = actioncommand.ActionCommand.STDERR - output: List[str] = self._get_serializer().tail(filename, self.max_lines) + output: list[str] = self._get_serializer().tail(filename, self.max_lines) if not output: for alt_path in self._get_alternate_output_paths(): output = self._get_serializer(alt_path).tail(filename, self.max_lines) @@ -197,7 +195,7 @@ def get_meta(self) -> List[str]: return output @toggle_flag("include_stdout") - def get_stdout(self) -> List[str]: + def get_stdout(self) -> list[str]: if isinstance(self._obj, KubernetesActionRun): # it's possible that we have a job that logs to the samestream as another job on a # different master (e.g., 1 job in pnw-devc and another in norcal-devc), so we @@ -239,7 +237,7 @@ def get_stdout(self) -> List[str]: return output @toggle_flag("include_stderr") - def get_stderr(self) -> List[str]: + def get_stderr(self) -> list[str]: if isinstance(self._obj, KubernetesActionRun): # it's possible that we have a job that logs to the samestream as another job on a # different master (e.g., 1 job in pnw-devc and another in norcal-devc), so we diff --git a/tron/api/auth.py b/tron/api/auth.py index 2e1342b3f..a8cb02c59 100644 --- a/tron/api/auth.py +++ b/tron/api/auth.py @@ -3,7 +3,6 @@ import re from functools import lru_cache from typing import NamedTuple -from typing import Optional import cachetools.func import requests @@ -68,7 +67,7 @@ def _is_request_authorized_impl( path: str, token: str, method: str, - service: Optional[str], + service: str | None, ) -> AuthorizationOutcome: """Check if API request is authorized @@ -107,7 +106,7 @@ def _is_request_authorized_impl( return AuthorizationOutcome(True, reason) @staticmethod - def _extract_service_from_path(path: str) -> Optional[str]: + def _extract_service_from_path(path: str) -> str | None: """If a request path contains a service name, extract it. Example: diff --git a/tron/api/controller.py b/tron/api/controller.py index 3e55a8f83..4c7b23068 100644 --- a/tron/api/controller.py +++ b/tron/api/controller.py @@ -2,7 +2,6 @@ Web Controllers for the API. 
""" import logging -from typing import Dict from typing import TYPE_CHECKING from typing import TypedDict @@ -210,7 +209,7 @@ def read_config(self, name: str) -> ConfigResponse: config_hash = self.config_manager.get_hash(name) return {"config": config_content, "hash": config_hash} - def read_all_configs(self) -> Dict[str, ConfigResponse]: + def read_all_configs(self) -> dict[str, ConfigResponse]: configs = {} for service in self.config_manager.get_namespaces(): diff --git a/tron/bin/action_runner.py b/tron/bin/action_runner.py index 939c5a76c..78439a271 100755 --- a/tron/bin/action_runner.py +++ b/tron/bin/action_runner.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.8 +#!/usr/bin/env python3.10 """ Write pid and stdout/stderr to a standard location before execing a command. """ @@ -10,8 +10,6 @@ import sys import threading import time -from typing import Dict -from typing import Optional from tron import yaml @@ -95,7 +93,7 @@ def build_environment(run_id, original_env=None): return new_env -def build_labels(run_id: str, original_labels: Optional[Dict[str, str]] = None) -> Dict[str, str]: +def build_labels(run_id: str, original_labels: dict[str, str] | None = None) -> dict[str, str]: if original_labels is None: original_labels = dict() diff --git a/tron/bin/action_status.py b/tron/bin/action_status.py index 6b5b86749..153b6942f 100755 --- a/tron/bin/action_status.py +++ b/tron/bin/action_status.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.8 +#!/usr/bin/env python3.10 import argparse import logging import os diff --git a/tron/bin/check_tron_datastore_staleness.py b/tron/bin/check_tron_datastore_staleness.py index d3cd86ddb..e3d628bce 100755 --- a/tron/bin/check_tron_datastore_staleness.py +++ b/tron/bin/check_tron_datastore_staleness.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.8 +#!/usr/bin/env python3.10 import argparse import logging import os diff --git a/tron/bin/check_tron_jobs.py b/tron/bin/check_tron_jobs.py index cffb8b564..bb422f5e1 100755 --- a/tron/bin/check_tron_jobs.py +++ b/tron/bin/check_tron_jobs.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.8 +#!/usr/bin/env python3.10 import datetime import logging import pprint diff --git a/tron/bin/get_tron_metrics.py b/tron/bin/get_tron_metrics.py index 706ec7db3..11c5f20d5 100755 --- a/tron/bin/get_tron_metrics.py +++ b/tron/bin/get_tron_metrics.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.8 +#!/usr/bin/env python3.10 # # get_tron_metrics.py # This script is designed to retrieve metrics from Tron via its API and send diff --git a/tron/bin/recover_batch.py b/tron/bin/recover_batch.py index c43934b5a..8abcd4a23 100755 --- a/tron/bin/recover_batch.py +++ b/tron/bin/recover_batch.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.8 +#!/usr/bin/env python3.10 import argparse import logging import signal diff --git a/tron/commands/authentication.py b/tron/commands/authentication.py index 894be1f8a..bb9f54e9a 100644 --- a/tron/commands/authentication.py +++ b/tron/commands/authentication.py @@ -1,6 +1,5 @@ import os from typing import cast -from typing import Optional from tron.commands.cmd_utils import get_client_config @@ -9,7 +8,7 @@ from okta_auth import get_and_cache_jwt_default # type: ignore # library lacks py.typed marker except ImportError: - def get_instance_oidc_identity_token(role: str, ecosystem: Optional[str] = None) -> str: + def get_instance_oidc_identity_token(role: str, ecosystem: str | None = None) -> str: return "" def get_and_cache_jwt_default(client_id: str, refreshable: bool = False, force: bool = False) -> str: diff --git 
a/tron/commands/backfill.py b/tron/commands/backfill.py index 2c21dafd7..1e565c2dd 100644 --- a/tron/commands/backfill.py +++ b/tron/commands/backfill.py @@ -6,9 +6,6 @@ import re import signal import sys -from typing import List -from typing import Optional -from typing import Set from urllib.parse import urljoin from tron.commands import client @@ -25,7 +22,7 @@ def get_date_range( start_date: datetime.datetime, end_date: datetime.datetime, descending: bool = False, -) -> List[datetime.datetime]: +) -> list[datetime.datetime]: dates = [] delta = end_date - start_date for days_to_add in range(delta.days + 1): @@ -35,7 +32,7 @@ def get_date_range( return dates -def print_backfill_cmds(job: str, date_strs: List[str]) -> None: +def print_backfill_cmds(job: str, date_strs: list[str]) -> None: print(f"Please run the following {len(date_strs)} commands:") print("") for date in date_strs: @@ -44,7 +41,7 @@ def print_backfill_cmds(job: str, date_strs: List[str]) -> None: print("Note that many jobs operate on the previous day's data.") -def confirm_backfill(job: str, date_strs: List[str]) -> bool: +def confirm_backfill(job: str, date_strs: list[str]) -> bool: print( f"To backfill for the job '{job}', a job run will be created for each " f"of the following {len(date_strs)} dates:" @@ -70,8 +67,8 @@ def __init__(self, tron_client: client.Client, job_id: client.TronObjectIdentifi self.tron_client = tron_client self.job_id = job_id self.run_time = run_time - self.run_name: Optional[str] = None - self.run_id: Optional[client.TronObjectIdentifier] = None + self.run_name: str | None = None + self.run_id: client.TronObjectIdentifier | None = None self.run_state = BackfillRun.NOT_STARTED_STATE @property @@ -88,7 +85,7 @@ async def run_until_completion(self) -> str: await self.cancel() return self.run_state - async def create(self) -> Optional[str]: + async def create(self) -> str | None: """Creates job run for a specific date. Returns the name of the run, if it was created with no issues. @@ -128,7 +125,7 @@ async def create(self) -> Optional[str]: return self.run_name - async def get_run_id(self) -> Optional[client.TronObjectIdentifier]: + async def get_run_id(self) -> client.TronObjectIdentifier | None: if not self.run_id: loop = asyncio.get_event_loop() try: @@ -232,10 +229,10 @@ async def cancel(self) -> bool: async def run_backfill_for_date_range( server: str, job_name: str, - dates: List[datetime.datetime], + dates: list[datetime.datetime], max_parallel: int = DEFAULT_MAX_PARALLEL_RUNS, ignore_errors: bool = True, -) -> List[BackfillRun]: +) -> list[BackfillRun]: """Creates and watches job runs over a range of dates for a given job. At most, max_parallel runs can run in parallel to prevent resource exhaustion. 
""" @@ -257,13 +254,13 @@ async def run_backfill_for_date_range( raise ValueError(f"'{job_name}' is a {job_id.type.lower()}, not a job") backfill_runs = [BackfillRun(tron_client, job_id, run_time) for run_time in dates] - running: Set[asyncio.Future] = set() + running: set[asyncio.Future] = set() finished_cnt = 0 all_successful = True # `current_task()` will always return a task here, but we need to account # for the None case for mypy - current_task = asyncio.Task.current_task() + current_task = asyncio.current_task() if current_task: loop.add_signal_handler(signal.SIGINT, current_task.cancel) try: @@ -303,7 +300,7 @@ class DisplayBackfillRuns(display.TableDisplay): header_color = "hgray" -def print_backfill_runs_table(runs: List[BackfillRun]) -> None: +def print_backfill_runs_table(runs: list[BackfillRun]) -> None: """Prints backfill runs in a table""" with display.Color.enable(): table = DisplayBackfillRuns().format( diff --git a/tron/commands/client.py b/tron/commands/client.py index 73dac8c51..edc6b22fc 100644 --- a/tron/commands/client.py +++ b/tron/commands/client.py @@ -8,7 +8,6 @@ import urllib.parse import urllib.request from collections import namedtuple -from typing import Dict import tron from tron.commands.authentication import get_auth_token @@ -99,7 +98,7 @@ def build_get_url(url, data=None): return url -def ensure_user_attribution(headers: Dict[str, str]) -> Dict[str, str]: +def ensure_user_attribution(headers: dict[str, str]) -> dict[str, str]: headers = headers.copy() if "User-Agent" not in headers: headers["User-Agent"] = USER_AGENT diff --git a/tron/commands/display.py b/tron/commands/display.py index 2ab076327..399f5e878 100644 --- a/tron/commands/display.py +++ b/tron/commands/display.py @@ -2,13 +2,10 @@ Format and color output for tron commands. 
""" import contextlib +from collections.abc import Callable +from collections.abc import Collection from functools import partial from operator import itemgetter -from typing import Callable -from typing import Collection -from typing import Dict -from typing import List -from typing import Optional from tron.core import actionrun from tron.core import job @@ -87,11 +84,11 @@ class TableDisplay: Footer """ - columns: List[str] = [] - fields: List[str] = [] - widths: List[int] = [] - colors: Dict[str, Callable[[str], str]] = {} - title: Optional[str] = None + columns: list[str] = [] + fields: list[str] = [] + widths: list[int] = [] + colors: dict[str, Callable[[str], str]] = {} + title: str | None = None resize_fields: Collection[str] = set() reversed = False diff --git a/tron/commands/retry.py b/tron/commands/retry.py index d48bf4157..acfc7e920 100644 --- a/tron/commands/retry.py +++ b/tron/commands/retry.py @@ -3,9 +3,6 @@ import datetime import functools import random -from typing import Dict -from typing import List -from typing import Optional from urllib.parse import urljoin import pytimeparse # type:ignore @@ -59,7 +56,7 @@ def __init__( self._elapsed = datetime.timedelta(seconds=0) self._triggers_done = False self._required_actions_done = False - self._retry_request_result: Optional[bool] = RetryAction.RETRY_NOT_ISSUED + self._retry_request_result: bool | None = RetryAction.RETRY_NOT_ISSUED @property def job_run_name(self) -> str: @@ -95,7 +92,7 @@ def _validate_action_name(self, full_action_name: str) -> client.TronObjectIdent self.tron_client.action_runs(action_run_id.url, num_lines=0) # verify action exists return action_run_id - def _get_required_action_indices(self) -> Dict[str, int]: + def _get_required_action_indices(self) -> dict[str, int]: job_run = self.tron_client.job_runs(self.job_run_id.url) required_actions = set() action_indices = {} @@ -131,7 +128,7 @@ async def can_retry(self) -> bool: self._log(f"Required actions not yet succeeded: {remaining_required_actions}") return self._triggers_done and self._required_actions_done - async def check_trigger_statuses(self) -> Dict[str, bool]: + async def check_trigger_statuses(self) -> dict[str, bool]: action_run = await asyncio.get_event_loop().run_in_executor( None, functools.partial( @@ -153,7 +150,7 @@ async def check_trigger_statuses(self) -> Dict[str, bool]: trigger_states[trigger] = len(maybe_state) == 1 return trigger_states - async def check_required_actions_statuses(self) -> Dict[str, bool]: + async def check_required_actions_statuses(self) -> dict[str, bool]: action_runs = ( await asyncio.get_event_loop().run_in_executor( None, @@ -232,10 +229,10 @@ async def issue_retry(self) -> bool: def retry_actions( tron_server: str, - full_action_names: List[str], + full_action_names: list[str], use_latest_command: bool = False, deps_timeout_s: int = RetryAction.NO_TIMEOUT, -) -> List[RetryAction]: +) -> list[RetryAction]: tron_client = client.Client(tron_server, user_attribution=True) r_actions = [RetryAction(tron_client, name, use_latest_command=use_latest_command) for name in full_action_names] @@ -263,7 +260,7 @@ class DisplayRetries(display.TableDisplay): header_color = "hgray" -def print_retries_table(retries: List[RetryAction]) -> None: +def print_retries_table(retries: list[RetryAction]) -> None: """Prints retry runs in a table""" with display.Color.enable(): table = DisplayRetries().format([dict(full_action_name=r.full_action_name, status=r.status) for r in retries]) diff --git a/tron/config/config_parse.py 
b/tron/config/config_parse.py index 2e0cb0677..6c89d0fd4 100644 --- a/tron/config/config_parse.py +++ b/tron/config/config_parse.py @@ -13,12 +13,6 @@ import os from copy import deepcopy from typing import Any -from typing import Dict -from typing import List -from typing import Optional -from typing import Set -from typing import Tuple -from typing import Union from urllib.parse import urlparse import pytz @@ -287,7 +281,7 @@ class ValidateSecretSource(Validator): valid_secret_source = ValidateSecretSource() -def valid_permission_mode(value: Union[str, int], config_context: ConfigContext) -> str: +def valid_permission_mode(value: str | int, config_context: ConfigContext) -> str: try: decimal_value = int( str(value), base=8 @@ -423,7 +417,7 @@ def _valid_when_unsatisfiable(value: str, config_context: ConfigContext) -> str: return value -def _valid_topology_spread_label_selector(value: Dict[str, str], config_context: ConfigContext) -> Dict[str, str]: +def _valid_topology_spread_label_selector(value: dict[str, str], config_context: ConfigContext) -> dict[str, str]: if not value: raise ConfigError("TopologySpreadConstraints must have a label_selector") @@ -773,7 +767,7 @@ class ValidateJob(Validator): """Validate jobs.""" config_class = ConfigJob - defaults: Dict[str, Any] = { + defaults: dict[str, Any] = { "run_limit": 50, "all_nodes": False, "cleanup_action": None, @@ -812,14 +806,14 @@ def cast(self, in_dict, config_context): # TODO: extract common code to a util function def _validate_dependencies( self, - job: Dict[str, Any], # TODO: create TypedDict for this + job: dict[str, Any], # TODO: create TypedDict for this # TODO: setup UniqueNameDict for use with mypy so that the following line # is not a lie - actions: Dict[str, ConfigAction], + actions: dict[str, ConfigAction], base_action: ConfigAction, - current_action: Optional[ConfigAction] = None, - stack: Optional[List[str]] = None, - already_validated: Optional[Set[Tuple[str, str]]] = None, + current_action: ConfigAction | None = None, + stack: list[str] | None = None, + already_validated: set[tuple[str, str]] | None = None, ) -> None: """Check for circular or misspelled dependencies.""" # for large graphs, we can end up validating the same jobs/actions repeatedly diff --git a/tron/config/config_utils.py b/tron/config/config_utils.py index 5d2731ab0..d8d27a768 100644 --- a/tron/config/config_utils.py +++ b/tron/config/config_utils.py @@ -4,7 +4,6 @@ import itertools import re from string import Formatter -from typing import Optional from tron.config import ConfigError from tron.config.schema import MASTER_NAMESPACE @@ -293,7 +292,7 @@ class Validator: collection from the source. """ - config_class: Optional[type] = None + config_class: type | None = None defaults = {} # type: ignore validators = {} # type: ignore optional = False diff --git a/tron/config/manager.py b/tron/config/manager.py index 30e7a986e..194c80efd 100644 --- a/tron/config/manager.py +++ b/tron/config/manager.py @@ -2,8 +2,6 @@ import logging import os from copy import deepcopy -from typing import List -from typing import Union from tron import yaml from tron.config import config_parse @@ -45,7 +43,7 @@ def read_raw(path: str) -> str: return fh.read() -def hash_digest(content: Union[str, bytes]) -> str: +def hash_digest(content: str | bytes) -> str: return hashlib.sha1( maybe_encode(content) ).hexdigest() # TODO: TRON-2293 maybe_encode is a relic of Python2->Python3 migration. Remove it. 
@@ -209,7 +207,7 @@ def get_hash(self, name: str) -> str: def __contains__(self, name): return name in self.manifest - def get_namespaces(self) -> List[str]: + def get_namespaces(self) -> list[str]: return list(self.manifest.get_file_mapping().keys()) diff --git a/tron/config/schema.py b/tron/config/schema.py index 792925cf1..e0fbdefbb 100644 --- a/tron/config/schema.py +++ b/tron/config/schema.py @@ -6,8 +6,6 @@ from collections import namedtuple from enum import Enum from typing import Any -from typing import Dict -from typing import Type from typing import TypeVar MASTER_NAMESPACE = "MASTER" @@ -38,7 +36,7 @@ def config_object_factory(name, required=None, optional=None): T = TypeVar("T", bound="config_class") # i'm sorry. @classmethod - def from_dict(cls: Type[T], data: Dict[str, Any]) -> T: + def from_dict(cls: type[T], data: dict[str, Any]) -> T: supported_keys = set(required + optional) filtered_data = {k: v for k, v in data.items() if k in supported_keys} return cls(**filtered_data) diff --git a/tron/core/action.py b/tron/core/action.py index 0f36b7399..a64d91932 100644 --- a/tron/core/action.py +++ b/tron/core/action.py @@ -5,10 +5,6 @@ from dataclasses import field from dataclasses import fields from typing import Any -from typing import Dict -from typing import List -from typing import Optional -from typing import Union from tron import node from tron.config.schema import CLEANUP_ACTION_NAME @@ -32,28 +28,28 @@ class ActionCommandConfig(Persistable): """A configurable data object for one try of an Action.""" command: str - cpus: Optional[float] = None - mem: Optional[float] = None - disk: Optional[float] = None - cap_add: List[str] = field(default_factory=list) - cap_drop: List[str] = field(default_factory=list) + cpus: float | None = None + mem: float | None = None + disk: float | None = None + cap_add: list[str] = field(default_factory=list) + cap_drop: list[str] = field(default_factory=list) constraints: set = field(default_factory=set) - docker_image: Optional[str] = None + docker_image: str | None = None docker_parameters: set = field(default_factory=set) env: dict = field(default_factory=dict) secret_env: dict = field(default_factory=dict) - secret_volumes: List[ConfigSecretVolume] = field(default_factory=list) - projected_sa_volumes: List[ConfigProjectedSAVolume] = field(default_factory=list) + secret_volumes: list[ConfigSecretVolume] = field(default_factory=list) + projected_sa_volumes: list[ConfigProjectedSAVolume] = field(default_factory=list) field_selector_env: dict = field(default_factory=dict) extra_volumes: set = field(default_factory=set) node_selectors: dict = field(default_factory=dict) - node_affinities: List[ConfigNodeAffinity] = field(default_factory=list) - topology_spread_constraints: List[ConfigTopologySpreadConstraints] = field(default_factory=list) + node_affinities: list[ConfigNodeAffinity] = field(default_factory=list) + topology_spread_constraints: list[ConfigTopologySpreadConstraints] = field(default_factory=list) labels: dict = field(default_factory=dict) idempotent: bool = False annotations: dict = field(default_factory=dict) - service_account_name: Optional[str] = None - ports: List[int] = field(default_factory=list) + service_account_name: str | None = None + ports: list[int] = field(default_factory=list) @property def state_data(self): @@ -65,7 +61,7 @@ def copy(self): @staticmethod def from_json( state_data: str, - ) -> Dict[str, Any]: # TODO: use a TypedDict (or return an ActionCommandConfig instance) + ) -> dict[str, Any]: # TODO: use a 
TypedDict (or return an ActionCommandConfig instance) """Deserialize a JSON string to an ActionCommandConfig dict.""" try: json_data = json.loads(state_data) @@ -185,14 +181,14 @@ class Action: name: str command_config: ActionCommandConfig node_pool: str - retries: Optional[int] = None - retries_delay: Optional[datetime.timedelta] = None - expected_runtime: Optional[datetime.timedelta] = None - executor: Optional[str] = None - trigger_downstreams: Optional[Union[bool, dict]] = None - triggered_by: Optional[set] = None - on_upstream_rerun: Optional[str] = None - trigger_timeout: Optional[datetime.timedelta] = None + retries: int | None = None + retries_delay: datetime.timedelta | None = None + expected_runtime: datetime.timedelta | None = None + executor: str | None = None + trigger_downstreams: bool | dict | None = None + triggered_by: set | None = None + on_upstream_rerun: str | None = None + trigger_timeout: datetime.timedelta | None = None idempotent: bool = False @property diff --git a/tron/core/actiongraph.py b/tron/core/actiongraph.py index fd3477811..c4dcda504 100644 --- a/tron/core/actiongraph.py +++ b/tron/core/actiongraph.py @@ -1,9 +1,7 @@ import logging from collections import namedtuple -from typing import Mapping -from typing import Sequence -from typing import Set -from typing import Union +from collections.abc import Mapping +from collections.abc import Sequence from tron.core.action import Action from tron.utils.timeutils import delta_total_seconds @@ -18,8 +16,8 @@ class ActionGraph: def __init__( self, action_map: Mapping[str, Action], - required_actions: Mapping[str, Set[str]], - required_triggers: Mapping[str, Set[str]], + required_actions: Mapping[str, set[str]], + required_triggers: Mapping[str, set[str]], ) -> None: self.action_map = action_map self.required_actions = required_actions @@ -29,7 +27,7 @@ def __init__( self.all_triggers |= action_triggers self.all_triggers -= set(self.action_map) - def get_dependencies(self, action_name: str, include_triggers: bool = False) -> Sequence[Union[Action, Trigger]]: + def get_dependencies(self, action_name: str, include_triggers: bool = False) -> Sequence[Action | Trigger]: """Given an Action's name return the Actions (and optionally, Triggers) required to run before that Action. 
""" diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index dba887a1c..1f731e93a 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -5,18 +5,14 @@ import json import logging import os +from collections.abc import Callable from dataclasses import dataclass from dataclasses import fields from typing import Any -from typing import Callable from typing import cast -from typing import Dict -from typing import List from typing import Literal from typing import Optional -from typing import Set from typing import TYPE_CHECKING -from typing import Union from twisted.internet import reactor from twisted.internet.base import DelayedCall @@ -59,7 +55,7 @@ log = logging.getLogger(__name__) MAX_RECOVER_TRIES = 5 INITIAL_RECOVER_DELAY = 3 -KUBERNETES_ACTIONRUN_EXECUTORS: Set[str] = {ExecutorTypes.kubernetes.value, ExecutorTypes.spark.value} # type: ignore # mypy can't seem to inspect this enum +KUBERNETES_ACTIONRUN_EXECUTORS: set[str] = {ExecutorTypes.kubernetes.value, ExecutorTypes.spark.value} # type: ignore # mypy can't seem to inspect this enum class ActionRunFactory: @@ -160,12 +156,12 @@ class ActionRunAttempt(Persistable): """Stores state about one try of an action run.""" command_config: action.ActionCommandConfig - start_time: Optional[datetime.datetime] = None - end_time: Optional[datetime.datetime] = None - rendered_command: Optional[str] = None - exit_status: Optional[int] = None - mesos_task_id: Optional[str] = None - kubernetes_task_id: Optional[str] = None + start_time: datetime.datetime | None = None + end_time: datetime.datetime | None = None + rendered_command: str | None = None + exit_status: int | None = None + mesos_task_id: str | None = None + kubernetes_task_id: str | None = None def exit(self, exit_status, end_time=None): if self.end_time is None: @@ -187,7 +183,7 @@ def state_data(self): return state_data @staticmethod - def to_json(state_data: dict) -> Optional[str]: + def to_json(state_data: dict) -> str | None: """Serialize the ActionRunAttempt instance to a JSON string.""" try: return json.dumps( @@ -214,7 +210,7 @@ def to_json(state_data: dict) -> Optional[str]: raise @staticmethod - def from_json(state_data: str) -> Dict[str, Any]: # TODO: use a TypedDict + def from_json(state_data: str) -> dict[str, Any]: # TODO: use a TypedDict """Deserialize the ActionRunAttempt instance from a JSON string.""" try: json_data = json.loads(state_data) @@ -369,24 +365,24 @@ def __init__( name: str, node: node.Node, command_config: action.ActionCommandConfig, - parent_context: Optional[CommandContext] = None, - output_path: Optional[filehandler.OutputPath] = None, + parent_context: CommandContext | None = None, + output_path: filehandler.OutputPath | None = None, cleanup: bool = False, - start_time: Optional[datetime.datetime] = None, - end_time: Optional[datetime.datetime] = None, + start_time: datetime.datetime | None = None, + end_time: datetime.datetime | None = None, run_state: str = SCHEDULED, - exit_status: Optional[int] = None, - attempts: Optional[List[ActionRunAttempt]] = None, - action_runner: Optional[Union[NoActionRunnerFactory, SubprocessActionRunnerFactory]] = None, - retries_remaining: Optional[int] = None, - retries_delay: Optional[datetime.timedelta] = None, - machine: Optional[Machine] = None, - executor: Optional[str] = None, - trigger_downstreams: Optional[Union[bool, dict]] = None, - triggered_by: Optional[List[str]] = None, - on_upstream_rerun: Optional[schema.ActionOnRerun] = None, - trigger_timeout_timestamp: Optional[float] = None, - 
original_command: Optional[str] = None, + exit_status: int | None = None, + attempts: list[ActionRunAttempt] | None = None, + action_runner: NoActionRunnerFactory | SubprocessActionRunnerFactory | None = None, + retries_remaining: int | None = None, + retries_delay: datetime.timedelta | None = None, + machine: Machine | None = None, + executor: str | None = None, + trigger_downstreams: bool | dict | None = None, + triggered_by: list[str] | None = None, + on_upstream_rerun: schema.ActionOnRerun | None = None, + trigger_timeout_timestamp: float | None = None, + original_command: str | None = None, ): super().__init__() self.job_run_id = maybe_decode( @@ -561,7 +557,7 @@ def from_state( run.transition_and_notify("fail_unknown") return run - def start(self, original_command: bool = True) -> Optional[Union[bool, ActionCommand]]: + def start(self, original_command: bool = True) -> bool | ActionCommand | None: """Start this ActionRun.""" if self.in_delay is not None: log.warning(f"{self} cancelling suspend timer") @@ -613,7 +609,7 @@ def create_attempt(self, original_command=True): self.attempts.append(new_attempt) return new_attempt - def submit_command(self, attempt: ActionRunAttempt) -> Optional[Union[bool, ActionCommand]]: + def submit_command(self, attempt: ActionRunAttempt) -> bool | ActionCommand | None: raise NotImplementedError() def stop(self): @@ -622,10 +618,10 @@ def stop(self): def kill(self, final=True): raise NotImplementedError() - def recover(self) -> Optional[ActionCommand]: + def recover(self) -> ActionCommand | None: raise NotImplementedError() - def _done(self, target: str, exit_status: Optional[int] = 0) -> Optional[bool]: + def _done(self, target: str, exit_status: int | None = 0) -> bool | None: if self.machine.check(target): if self.triggered_by: EventBus.clear_subscriptions(self.__hash__()) @@ -676,7 +672,7 @@ def start_after_delay(self): self.in_delay = None self.start() - def restart(self, original_command: bool = True) -> Optional[Union[bool, ActionCommand]]: + def restart(self, original_command: bool = True) -> bool | ActionCommand | None: """Used by `fail` when action run has to be re-tried""" if self.retries_delay is not None: self.in_delay = reactor.callLater( # type: ignore # no twisted stubs @@ -697,11 +693,11 @@ def fail(self, exit_status=None): def _exit_unsuccessful( self, - exit_status: Optional[int] = None, + exit_status: int | None = None, retry_original_command: bool = True, # TODO: delete this feature or refactor to not have a mutable default value - non_retryable_exit_codes: Optional[List[int]] = [], - ) -> Optional[Union[bool, ActionCommand]]: + non_retryable_exit_codes: list[int] | None = [], + ) -> bool | ActionCommand | None: if non_retryable_exit_codes is None: non_retryable_exit_codes = [] @@ -729,7 +725,7 @@ def _exit_unsuccessful( else: return self._done("fail", exit_status) - def triggers_to_emit(self) -> List[str]: + def triggers_to_emit(self) -> list[str]: if not self.trigger_downstreams: return [] @@ -752,7 +748,7 @@ def emit_triggers(self): # TODO: cache if safe @property - def rendered_triggers(self) -> List[str]: + def rendered_triggers(self) -> list[str]: return [self.render_template(trig) for trig in self.triggered_by or []] # TODO: subscribe for events and maintain a list of remaining triggers @@ -760,7 +756,7 @@ def rendered_triggers(self) -> List[str]: def remaining_triggers(self): return [trig for trig in self.rendered_triggers if not EventBus.has_event(trig)] - def success(self) -> Optional[bool]: + def success(self) -> bool | 
None: transition_valid = self._done("success") if transition_valid: if self.trigger_downstreams: @@ -813,7 +809,7 @@ def state_data(self): } @staticmethod - def from_json(state_data: str) -> Dict[str, Any]: # TODO: would be nice to have a TypedDict here + def from_json(state_data: str) -> dict[str, Any]: # TODO: would be nice to have a TypedDict here """Deserialize the ActionRun instance from a JSON Dictionary.""" try: json_data = json.loads(state_data) @@ -850,7 +846,7 @@ def from_json(state_data: str) -> Dict[str, Any]: # TODO: would be nice to have return deserialized_data @staticmethod - def to_json(state_data: dict) -> Optional[str]: + def to_json(state_data: dict) -> str | None: """Serialize the ActionRun instance to a JSON string.""" action_runner = state_data.get("action_runner") @@ -979,7 +975,7 @@ def clear_end_state(self): last_attempt.exit_status = None last_attempt.end_time = None - def __getattr__(self, name: str) -> Union[Callable[[], Optional[bool]], bool]: + def __getattr__(self, name: str) -> Callable[[], bool | None] | bool: """Support convenience properties for checking if this ActionRun is in a specific state (Ex: self.is_running would check if self.state is STATE_RUNNING) or for transitioning to a new state (ex: ready). @@ -998,7 +994,7 @@ def __getattr__(self, name: str) -> Union[Callable[[], Optional[bool]], bool]: def __str__(self): return f"ActionRun: {self.id}" - def transition_and_notify(self, target: str) -> Optional[bool]: + def transition_and_notify(self, target: str) -> bool | None: if self.machine.transition(target): self.notify(self.state) return True @@ -1075,7 +1071,7 @@ def handle_unknown(self): log.info(f"Starting try #{self.recover_tries} to recover {self.id}, waiting {desired_delay}") return self.do_recover(delay=desired_delay) - def recover(self) -> Optional[Union[DelayedCall, Literal[True]]]: # type: ignore[override] # this ActionRun subclass is not long for this world (hopefully) + def recover(self) -> DelayedCall | Literal[True] | None: # type: ignore[override] # this ActionRun subclass is not long for this world (hopefully) log.info(f"Creating recovery run for actionrun {self.id}") if isinstance(self.action_runner, NoActionRunnerFactory): log.info( @@ -1093,7 +1089,7 @@ def recover(self) -> Optional[Union[DelayedCall, Literal[True]]]: # type: ignor return self.do_recover(delay=0) - def do_recover(self, delay: float) -> Optional[Union[DelayedCall, Literal[True]]]: + def do_recover(self, delay: float) -> DelayedCall | Literal[True] | None: recovery_command = f"{self.action_runner.exec_path}/recover_batch.py {self.action_runner.status_path}/{self.id}/status" # type: ignore[union-attr] # hopefully we can remove the Union with TRON-2304 command_config = action.ActionCommandConfig(command=recovery_command) rendered_command = self.render_command(recovery_command) @@ -1144,7 +1140,7 @@ def do_recover(self, delay: float) -> Optional[Union[DelayedCall, Literal[True]] def submit_recovery_command( self, recovery_run: "SSHActionRun", recovery_action_command: ActionCommand - ) -> Optional[Literal[True]]: + ) -> Literal[True] | None: log.info( f"Submitting recovery job with command {recovery_action_command.command} " f"to node {recovery_run.node}", ) @@ -1191,8 +1187,8 @@ def _create_mesos_task( mesos_cluster: MesosCluster, serializer: filehandler.OutputStreamSerializer, attempt: ActionRunAttempt, - task_id: Optional[str] = None, - ) -> Optional[MesosTask]: + task_id: str | None = None, + ) -> MesosTask | None: command_config = attempt.command_config return 
@@ -1191,8 +1187,8 @@ def _create_mesos_task(
         mesos_cluster: MesosCluster,
         serializer: filehandler.OutputStreamSerializer,
         attempt: ActionRunAttempt,
-        task_id: Optional[str] = None,
-    ) -> Optional[MesosTask]:
+        task_id: str | None = None,
+    ) -> MesosTask | None:
         command_config = attempt.command_config
         return mesos_cluster.create_task(
             action_run_id=self.id,
@@ -1224,7 +1220,7 @@ def submit_command(self, attempt):
         mesos_cluster.submit(task)
         return task

-    def recover(self) -> Optional[MesosTask]:
+    def recover(self) -> MesosTask | None:
         if not self.machine.check("running"):
             log.error(
                 f"{self} unable to transition from {self.machine.state}" "to running for recovery",
             )
@@ -1336,7 +1332,7 @@ def handle_action_command_state_change(self, action_command, event, event_data=N
 class KubernetesActionRun(ActionRun, Observer):
     """An ActionRun that executes the command on a Kubernetes cluster."""

-    def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]:
+    def submit_command(self, attempt: ActionRunAttempt) -> KubernetesTask | None:
         """
         Attempt to run a given ActionRunAttempt on the configured Kubernetes cluster.
@@ -1404,7 +1400,7 @@ def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]:

         return task

-    def recover(self) -> Optional[KubernetesTask]:
+    def recover(self) -> KubernetesTask | None:
         """
         Called on Tron restart per previously running ActionRun to attempt to restart
         Tron's tracking of this run. See tron.core.recovery
@@ -1488,7 +1484,7 @@ def recover(self) -> Optional[KubernetesTask]:

         return task

-    def stop(self) -> Optional[str]:
+    def stop(self) -> str | None:
         """
         Compatibility alias for KubernetesActionRun::kill().
@@ -1497,7 +1493,7 @@ def stop(self) -> Optional[str]:
         """
         return self.kill()

-    def kill(self, final: bool = True) -> Optional[str]:
+    def kill(self, final: bool = True) -> str | None:
         """
         Kills the Kubernetes Pod for this ActionRun and consumes a retry.
@@ -1535,11 +1531,11 @@ def kill(self, final: bool = True) -> Optional[str]:

     def _exit_unsuccessful(
         self,
-        exit_status: Optional[int] = None,
+        exit_status: int | None = None,
         retry_original_command: bool = True,
         # TODO: remove this feature or refactor so that we don't have this useless parameter on the subclass
-        non_retryable_exit_codes: Optional[List[int]] = None,
-    ) -> Optional[Union[bool, ActionCommand]]:
+        non_retryable_exit_codes: list[int] | None = None,
+    ) -> bool | ActionCommand | None:
         k8s_cluster = KubernetesClusterRepository.get_cluster()
         real_non_retryable_exit_codes = [] if not k8s_cluster else k8s_cluster.non_retryable_exit_codes
@@ -1551,8 +1547,8 @@ def _exit_unsuccessful(
         )

     def handle_action_command_state_change(
-        self, action_command: ActionCommand, event: str, event_data: Optional[Any] = None
-    ) -> Optional[Union[bool, ActionCommand]]:
+        self, action_command: ActionCommand, event: str, event_data: Any | None = None
+    ) -> bool | ActionCommand | None:
         """
         Observe ActionCommand state changes and transition the ActionCommand state machine to a new state.
""" @@ -1590,9 +1586,9 @@ def eager_all(seq): class ActionRunCollection: """A collection of ActionRuns used by a JobRun.""" - def __init__(self, action_graph: ActionGraph, run_map: Dict[str, ActionRun]): + def __init__(self, action_graph: ActionGraph, run_map: dict[str, ActionRun]): self.action_graph = action_graph - self.run_map: Dict[str, ActionRun] = run_map + self.run_map: dict[str, ActionRun] = run_map # Setup proxies self.proxy_action_runs_with_cleanup = proxy.CollectionProxy( self.get_action_runs_with_cleanup, @@ -1642,7 +1638,7 @@ def update_action_config(self, action_graph): return updated @property - def cleanup_action_run(self) -> Optional[ActionRun]: + def cleanup_action_run(self) -> ActionRun | None: return self.run_map.get(action.CLEANUP_ACTION_NAME) @property diff --git a/tron/core/job.py b/tron/core/job.py index 5e78be83a..92d6718eb 100644 --- a/tron/core/job.py +++ b/tron/core/job.py @@ -2,8 +2,6 @@ import json import logging from typing import Any -from typing import Dict -from typing import Optional from typing import TypeVar from tron import command_context @@ -81,19 +79,19 @@ def __init__( scheduler: GeneralScheduler, queueing: bool = True, all_nodes: bool = False, - monitoring: Optional[Dict[str, Any]] = None, - node_pool: Optional[NodePool] = None, + monitoring: dict[str, Any] | None = None, + node_pool: NodePool | None = None, enabled: bool = True, - action_graph: Optional[ActionGraph] = None, - run_collection: Optional[JobRunCollection] = None, - parent_context: Optional[command_context.CommandContext] = None, - output_path: Optional[filehandler.OutputPath] = None, - allow_overlap: Optional[bool] = None, - action_runner: Optional[SubprocessActionRunnerFactory] = None, - max_runtime: Optional[datetime.timedelta] = None, - time_zone: Optional[datetime.tzinfo] = None, - expected_runtime: Optional[datetime.timedelta] = None, - run_limit: Optional[int] = None, + action_graph: ActionGraph | None = None, + run_collection: JobRunCollection | None = None, + parent_context: command_context.CommandContext | None = None, + output_path: filehandler.OutputPath | None = None, + allow_overlap: bool | None = None, + action_runner: SubprocessActionRunnerFactory | None = None, + max_runtime: datetime.timedelta | None = None, + time_zone: datetime.tzinfo | None = None, + expected_runtime: datetime.timedelta | None = None, + run_limit: int | None = None, ): super().__init__() self.name = maybe_decode( @@ -123,7 +121,7 @@ def __init__( log.info(f"{self} created") @staticmethod - def from_json(state_data: str) -> Dict[str, Any]: # TODO: make a TypedDict for this + def from_json(state_data: str) -> dict[str, Any]: # TODO: make a TypedDict for this """deserialize the JSON string to python objects.""" # We store the following fields for jobs in DynamoDB: enabled and list of run numbers try: diff --git a/tron/core/jobgraph.py b/tron/core/jobgraph.py index 18de1bf14..7d9c76d83 100644 --- a/tron/core/jobgraph.py +++ b/tron/core/jobgraph.py @@ -1,9 +1,6 @@ from collections import defaultdict from collections import namedtuple from typing import DefaultDict -from typing import Dict -from typing import List -from typing import Optional from tron.config.config_parse import ConfigContainer from tron.core.action import Action @@ -18,14 +15,14 @@ class JobGraph: cross-job dependencies (aka triggers) """ - def __init__(self, config_container: ConfigContainer, should_validate_missing_dependency: Optional[bool] = False): + def __init__(self, config_container: ConfigContainer, 
diff --git a/tron/core/jobgraph.py b/tron/core/jobgraph.py
index 18de1bf14..7d9c76d83 100644
--- a/tron/core/jobgraph.py
+++ b/tron/core/jobgraph.py
@@ -1,9 +1,6 @@
 from collections import defaultdict
 from collections import namedtuple
 from typing import DefaultDict
-from typing import Dict
-from typing import List
-from typing import Optional

 from tron.config.config_parse import ConfigContainer
 from tron.core.action import Action
@@ -18,14 +15,14 @@ class JobGraph:
     cross-job dependencies (aka triggers)
     """

-    def __init__(self, config_container: ConfigContainer, should_validate_missing_dependency: Optional[bool] = False):
+    def __init__(self, config_container: ConfigContainer, should_validate_missing_dependency: bool | None = False):
         """Build an adjacency list and a reverse adjacency list for the graph,
         and store all the actions as well as which actions belong to which job
         """
-        self.action_map: Dict[str, Action] = {}
-        self._actions_for_job: DefaultDict[str, List[str]] = defaultdict(list)
-        self._adj_list: DefaultDict[str, List[AdjListEntry]] = defaultdict(list)
-        self._rev_adj_list: DefaultDict[str, List[AdjListEntry]] = defaultdict(list)
+        self.action_map: dict[str, Action] = {}
+        self._actions_for_job: DefaultDict[str, list[str]] = defaultdict(list)
+        self._adj_list: DefaultDict[str, list[AdjListEntry]] = defaultdict(list)
+        self._rev_adj_list: DefaultDict[str, list[AdjListEntry]] = defaultdict(list)

         all_actions = set()
         for job_name, job_config in config_container.get_jobs().items():
diff --git a/tron/core/jobrun.py b/tron/core/jobrun.py
index 2167516ac..76ab94f74 100644
--- a/tron/core/jobrun.py
+++ b/tron/core/jobrun.py
@@ -7,8 +7,6 @@
 import time
 from collections import deque
 from typing import Any
-from typing import Dict
-from typing import Optional

 import pytz

@@ -60,11 +58,11 @@ def __init__(
         run_num: int,
         run_time: datetime.datetime,
         node: node.Node,
-        output_path: Optional[filehandler.OutputPath] = None,
-        base_context: Optional[command_context.CommandContext] = None,
-        action_runs: Optional[ActionRunCollection] = None,
-        action_graph: Optional[ActionGraph] = None,
-        manual: Optional[bool] = None,
+        output_path: filehandler.OutputPath | None = None,
+        base_context: command_context.CommandContext | None = None,
+        action_runs: ActionRunCollection | None = None,
+        action_graph: ActionGraph | None = None,
+        manual: bool | None = None,
     ):
         super().__init__()
         self.job_name = maybe_decode(
@@ -86,7 +84,7 @@ def __init__(
         self.context = command_context.build_context(self, base_context)

     @staticmethod
-    def to_json(state_data: dict) -> Optional[str]:
+    def to_json(state_data: dict) -> str | None:
         """Serialize the JobRun instance to a JSON string."""
         try:
             return json.dumps(
@@ -113,7 +111,7 @@ def to_json(state_data: dict) -> Optional[str]:
         raise

     @staticmethod
-    def from_json(state_data: str) -> Dict[str, Any]:  # TODO: make a TypedDict for this
+    def from_json(state_data: str) -> dict[str, Any]:  # TODO: make a TypedDict for this
         """Deserialize the JobRun instance from a JSON string."""
         try:
             json_data = json.loads(state_data)
@@ -301,9 +299,7 @@ def _start_action_runs(self):

         return started_runs

-    def handle_action_run_state_change(
-        self, action_run: ActionRun, event: str, event_data: Optional[Any] = None
-    ) -> None:
+    def handle_action_run_state_change(self, action_run: ActionRun, event: str, event_data: Any | None = None) -> None:
         """Handle events triggered by JobRuns."""
         log.info(f"{self} got an event: {event}")
         metrics.meter(f"tron.actionrun.{event}")
@@ -386,7 +382,7 @@ def cleanup(self):
     def get_action_run(self, action_name):
         return self.action_runs.get(action_name)

-    def log_state_update(self, state: str, action_name: Optional[str] = None) -> None:
+    def log_state_update(self, state: str, action_name: str | None = None) -> None:
         if action_name is None:
             state = f"job_{state}"
         else:
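
The `JobGraph` hunks above keep `typing.DefaultDict` while switching the value type to `list[...]`; since Python 3.9 `collections.defaultdict` is itself subscriptable, so the alias could presumably be dropped too. A minimal sketch of the same adjacency-list bookkeeping (with `AdjListEntry` simplified to a bare string, purely for illustration):

```python
from collections import defaultdict

adj_list: defaultdict[str, list[str]] = defaultdict(list)
rev_adj_list: defaultdict[str, list[str]] = defaultdict(list)

# forward edges answer "what depends on me?"; reverse edges answer "what do I depend on?"
for upstream, downstream in [("extract", "transform"), ("transform", "load")]:
    adj_list[upstream].append(downstream)
    rev_adj_list[downstream].append(upstream)

print(adj_list["extract"])   # ['transform']
print(rev_adj_list["load"])  # ['transform']
```
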
diff --git a/tron/eventbus.py b/tron/eventbus.py
index 60be7a430..5fac964d2 100644
--- a/tron/eventbus.py
+++ b/tron/eventbus.py
@@ -5,7 +5,6 @@
 import time
 from collections import defaultdict
 from collections import deque
-from typing import Optional

 from twisted.internet import reactor

@@ -148,7 +147,7 @@ def sync_load_log(self):
     def sync_save_log(self, reason: str) -> bool:
         started = time.time()
         new_file = os.path.join(self.log_dir, f"{int(started)}.pickle")
-        previous_file: Optional[str] = os.path.realpath(os.path.join(self.log_dir, "current"))
+        previous_file: str | None = os.path.realpath(os.path.join(self.log_dir, "current"))
         # if we're starting a fresh Tron server, there won't be a current symlink
         # and the above line will give us the path to what will eventually be the current
         # symlink...which is undesirable since we clean up whatever this points to :p
diff --git a/tron/kubernetes.py b/tron/kubernetes.py
index 7bdcecef1..9096099bc 100644
--- a/tron/kubernetes.py
+++ b/tron/kubernetes.py
@@ -1,9 +1,7 @@
 import logging
+from collections.abc import Collection
 from logging import Logger
 from typing import cast
-from typing import Collection
-from typing import Dict
-from typing import List
 from typing import Optional
 from typing import TYPE_CHECKING

@@ -48,7 +46,7 @@ def combine_volumes(
     defaults: Collection[ConfigVolume],
     overrides: Collection[ConfigVolume],
-) -> List[ConfigVolume]:
+) -> list[ConfigVolume]:
     """Helper to reconcile lists of volume mounts.

     If any volumes have the same container path, the one in overrides wins.
@@ -61,7 +59,7 @@ def combine_volumes(
 class KubernetesTask(ActionCommand):
     def __init__(
-        self, action_run_id: str, task_config: KubernetesTaskConfig, serializer: Optional[OutputStreamSerializer] = None
+        self, action_run_id: str, task_config: KubernetesTaskConfig, serializer: OutputStreamSerializer | None = None
     ) -> None:
         super().__init__(id=action_run_id, command=task_config.command, serializer=serializer)
@@ -288,16 +286,16 @@ def __init__(
         self,
         kubeconfig_path: str,
         enabled: bool = True,
-        default_volumes: Optional[List[ConfigVolume]] = None,
-        pod_launch_timeout: Optional[int] = None,
-        watcher_kubeconfig_paths: Optional[List[str]] = None,
-        non_retryable_exit_codes: Optional[List[int]] = [],
+        default_volumes: list[ConfigVolume] | None = None,
+        pod_launch_timeout: int | None = None,
+        watcher_kubeconfig_paths: list[str] | None = None,
+        non_retryable_exit_codes: list[int] | None = [],
     ):
         # general k8s config
         self.kubeconfig_path = kubeconfig_path
         self.enabled = enabled
         self.non_retryable_exit_codes = non_retryable_exit_codes
-        self.default_volumes: Optional[List[ConfigVolume]] = default_volumes or []
+        self.default_volumes: list[ConfigVolume] | None = default_volumes or []
         self.pod_launch_timeout = pod_launch_timeout or DEFAULT_POD_LAUNCH_TIMEOUT_S
         self.watcher_kubeconfig_paths = watcher_kubeconfig_paths or []
         # creating a task_proc executor has a couple steps:
@@ -308,7 +306,7 @@ def __init__(
         # in this constructor
         self.processor = TaskProcessor()
         self.processor.load_plugin(provider_module="task_processing.plugins.kubernetes")
-        self.runner: Optional[Subscription] = None
+        self.runner: Subscription | None = None

         # queue to to use for tron<->task_proc communication - will hold k8s events seen
         # by task_processing and held for tron to process.
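
The import move above (`typing.Collection` to `collections.abc.Collection`) follows the 3.9+ deprecation of the `typing` container aliases. A minimal sketch in the spirit of `combine_volumes`, where later entries win (string items stand in for `ConfigVolume`; this is not the project's implementation):

```python
from collections.abc import Collection


def combine(defaults: Collection[str], overrides: Collection[str]) -> list[str]:
    # dict insertion order makes "overrides win" a one-liner for same-keyed items
    merged = {item: item for item in [*defaults, *overrides]}
    return list(merged.values())


print(combine(["a", "b"], ["b", "c"]))  # ['a', 'b', 'c']
```
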
@@ -316,10 +314,10 @@ def __init__(
         # this will hold the current event to process (retrieved from the PyDeferredQueue above)
         # which we will eventually wrap with some callbacks to actually process using the Twisted
         # reactor started as part of tron's startup process
-        self.deferred: Optional[Deferred] = None
+        self.deferred: Deferred | None = None

         # map from k8s pod names to the task that said pod corresponds to
-        self.tasks: Dict[str, KubernetesTask] = {}
+        self.tasks: dict[str, KubernetesTask] = {}

         # actually create the executor/runner, as mentioned above.
         self.connect()
@@ -332,7 +330,7 @@ def connect(self) -> None:
         self.runner = self.get_runner(kubeconfig_path=self.kubeconfig_path, queue=self.queue)
         self.handle_next_event()

-    def get_runner(self, kubeconfig_path: str, queue: PyDeferredQueue) -> Optional[Subscription]:
+    def get_runner(self, kubeconfig_path: str, queue: PyDeferredQueue) -> Subscription | None:
         """
         Gets or creates an instance of our Kubernetes task_processing plugin.
         """
@@ -478,7 +476,7 @@ def set_enabled(self, is_enabled: bool) -> None:
         else:
             self.stop(fail_tasks=True)

-    def configure_tasks(self, default_volumes: Optional[List[ConfigVolume]]) -> None:
+    def configure_tasks(self, default_volumes: list[ConfigVolume] | None) -> None:
         self.default_volumes = default_volumes

     def create_task(
@@ -486,27 +484,27 @@ def create_task(
         action_run_id: str,
         serializer: OutputStreamSerializer,
         command: str,
-        cpus: Optional[float],
-        mem: Optional[float],
-        disk: Optional[float],
+        cpus: float | None,
+        mem: float | None,
+        disk: float | None,
         docker_image: str,
-        env: Dict[str, str],
-        secret_env: Dict[str, ConfigSecretSource],
+        env: dict[str, str],
+        secret_env: dict[str, ConfigSecretSource],
         secret_volumes: Collection[ConfigSecretVolume],
         projected_sa_volumes: Collection[ConfigProjectedSAVolume],
-        field_selector_env: Dict[str, ConfigFieldSelectorSource],
+        field_selector_env: dict[str, ConfigFieldSelectorSource],
         volumes: Collection[ConfigVolume],
         cap_add: Collection[str],
         cap_drop: Collection[str],
-        node_selectors: Dict[str, str],
-        node_affinities: List[ConfigNodeAffinity],
-        topology_spread_constraints: List[ConfigTopologySpreadConstraints],
-        pod_labels: Dict[str, str],
-        pod_annotations: Dict[str, str],
-        service_account_name: Optional[str],
-        ports: List[int],
-        task_id: Optional[str] = None,
-    ) -> Optional[KubernetesTask]:
+        node_selectors: dict[str, str],
+        node_affinities: list[ConfigNodeAffinity],
+        topology_spread_constraints: list[ConfigTopologySpreadConstraints],
+        pod_labels: dict[str, str],
+        pod_annotations: dict[str, str],
+        service_account_name: str | None,
+        ports: list[int],
+        task_id: str | None = None,
+    ) -> KubernetesTask | None:
         """
         Given the execution parameters for a task, create a KubernetesTask that
         encapsulate those parameters.
@@ -635,15 +633,15 @@ def recover(self, task: KubernetesTask) -> None:
 class KubernetesClusterRepository:
     # Kubernetes config
     kubernetes_enabled: bool = False
-    kubernetes_non_retryable_exit_codes: Optional[List[int]] = []
-    kubeconfig_path: Optional[str] = None
-    pod_launch_timeout: Optional[int] = None
-    default_volumes: Optional[List[ConfigVolume]] = None
-    watcher_kubeconfig_paths: Optional[List[str]] = None
-    non_retryable_exit_codes: Optional[List[int]] = None
+    kubernetes_non_retryable_exit_codes: list[int] | None = []
+    kubeconfig_path: str | None = None
+    pod_launch_timeout: int | None = None
+    default_volumes: list[ConfigVolume] | None = None
+    watcher_kubeconfig_paths: list[str] | None = None
+    non_retryable_exit_codes: list[int] | None = None

     # metadata config
-    clusters: Dict[str, KubernetesCluster] = {}
+    clusters: dict[str, KubernetesCluster] = {}

     # state management config
     state_data = {}  # type: ignore  # not used yet
@@ -654,7 +652,7 @@ def attach(cls, _, observer):
         cls.state_watcher = observer

     @classmethod
-    def get_cluster(cls, kubeconfig_path: Optional[str] = None) -> Optional[KubernetesCluster]:
+    def get_cluster(cls, kubeconfig_path: str | None = None) -> KubernetesCluster | None:
         if kubeconfig_path is None:
             if cls.kubeconfig_path is None:
                 return None
diff --git a/tron/mesos.py b/tron/mesos.py
index 7942ae93d..f0bb8b238 100644
--- a/tron/mesos.py
+++ b/tron/mesos.py
@@ -4,8 +4,6 @@
 import socket
 import time
 from typing import Any
-from typing import Dict
-from typing import Optional
 from urllib.parse import urlparse

 import requests

@@ -87,8 +85,8 @@ class MesosClusterRepository:
     secret = None
     name = "frameworks"

-    clusters: Dict[str, "MesosCluster"] = {}
-    state_data: Dict[str, Any] = {}
+    clusters: dict[str, "MesosCluster"] = {}
+    state_data: dict[str, Any] = {}
     state_watcher = None

     @classmethod
@@ -458,7 +456,7 @@ def create_task(  # type: ignore[no-untyped-def]  # this file is not long for th
         extra_volumes,
         serializer,
         task_id=None,
-    ) -> Optional[MesosTask]:
+    ) -> MesosTask | None:
         if not self.runner:
             return None
diff --git a/tron/prom_metrics.py b/tron/prom_metrics.py
index 2df80de58..70538cbfe 100644
--- a/tron/prom_metrics.py
+++ b/tron/prom_metrics.py
@@ -1,8 +1,7 @@
 import logging
 import time
+from collections.abc import Generator
 from contextlib import contextmanager
-from typing import Generator
-from typing import Optional

 from prometheus_client import Counter
 from prometheus_client import Gauge
@@ -133,8 +132,8 @@ def timer(
     operation_name: str,
     log: logging.Logger,
-    histogram_metric: Optional[Histogram] = None,
-    gauge_metric: Optional[Gauge] = None,
+    histogram_metric: Histogram | None = None,
+    gauge_metric: Gauge | None = None,
 ) -> Generator[None, None, None]:
     """Context manager for timing operations with optional Prometheus metrics."""
     start_time = time.time()
diff --git a/tron/serialize/filehandler.py b/tron/serialize/filehandler.py
index 3ebb1e507..44d9a5cf6 100644
--- a/tron/serialize/filehandler.py
+++ b/tron/serialize/filehandler.py
@@ -10,8 +10,6 @@
 from subprocess import PIPE
 from subprocess import Popen
 from threading import RLock
-from typing import List
-from typing import Optional

 from tron.utils import maybe_encode

@@ -178,7 +176,7 @@ def full_path(self, filename):
         return os.path.join(self.base_path, filename)

     # TODO: do not use subprocess
-    def tail(self, filename: str, num_lines: Optional[int] = None) -> List[str]:
+    def tail(self, filename: str, num_lines: int | None = None) -> list[str]:
         """Tail a file using `tail`."""
         path = self.full_path(filename)
         if not path or not os.path.exists(path):
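
The TODO above wants to drop the subprocess-based `tail`. A standard-library alternative (a sketch under that assumption, not the project's implementation) keeps only the last N lines in memory via a bounded deque:

```python
from collections import deque


def tail(filename: str, num_lines: int | None = None) -> list[str]:
    with open(filename, errors="replace") as f:
        if num_lines is None:
            return [line.rstrip("\n") for line in f]
        # deque with maxlen discards older lines as it consumes the file iterator
        return [line.rstrip("\n") for line in deque(f, maxlen=num_lines)]
```
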
diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py
index 9b035b5fd..2d64e95a3 100644
--- a/tron/serialize/runstate/dynamodb_state_store.py
+++ b/tron/serialize/runstate/dynamodb_state_store.py
@@ -8,14 +8,10 @@
 import time
 from collections import defaultdict
 from collections import OrderedDict
+from collections.abc import Sequence
 from typing import Any
 from typing import DefaultDict
-from typing import Dict
-from typing import List
 from typing import Literal
-from typing import Optional
-from typing import Sequence
-from typing import Tuple
 from typing import TypeVar

 import boto3

@@ -82,7 +78,7 @@ def build_key(self, type: str, iden: str) -> str:
         """
         return f"{type} {iden}"

-    def restore(self, keys: List[str], read_json: bool = False) -> Dict[str, Any]:
+    def restore(self, keys: list[str], read_json: bool = False) -> dict[str, Any]:
         """
         Fetch all under the same partition key(s).
         ret:
@@ -95,7 +91,7 @@ def restore(self, keys: List[str], read_json: bool = False) -> Dict[str, Any]:
         vals = self._merge_items(first_items, remaining_items, read_json)
         return vals

-    def chunk_keys(self, keys: Sequence[T]) -> List[Sequence[T]]:
+    def chunk_keys(self, keys: Sequence[T]) -> list[Sequence[T]]:
         """Generates a list of chunks of keys to be used to read from DynamoDB"""
         # have a for loop here for all the key chunks we want to go over
         cand_keys_chunks = []
@@ -112,7 +108,7 @@ def _calculate_backoff_delay(self, attempt: int) -> int:
         delay: int = min(base_delay_seconds * (2 ** (safe_attempt - 1)), max_delay_seconds)
         return delay

-    def _get_items(self, table_keys: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+    def _get_items(self, table_keys: list[dict[str, Any]]) -> list[dict[str, Any]]:
         items = []
         # let's avoid potentially mutating our input :)
         cand_keys_list = copy.copy(table_keys)
@@ -168,11 +164,11 @@ def _get_items(self, table_keys: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
             raise KeyError(msg)
         return items

-    def _get_first_partitions(self, keys: List[str]) -> List[Dict[str, Any]]:
+    def _get_first_partitions(self, keys: list[str]) -> list[dict[str, Any]]:
         new_keys = [{"key": {"S": key}, "index": {"N": "0"}} for key in keys]
         return self._get_items(new_keys)

-    def _get_remaining_partitions(self, items: List, read_json: bool) -> List[Dict[str, Any]]:
+    def _get_remaining_partitions(self, items: list, read_json: bool) -> list[dict[str, Any]]:
         """Get items in the remaining partitions: N = 1 and beyond"""
         keys_for_remaining_items = []
         for item in items:
@@ -193,8 +189,8 @@ def _get_remaining_partitions(self, items: List, read_json: bool) -> List[Dict[s
         return self._get_items(keys_for_remaining_items)

     def _merge_items(
-        self, first_items: List[Dict[str, Any]], remaining_items: List[Dict[str, Any]], read_json: bool = False
-    ) -> Dict[str, Any]:
+        self, first_items: list[dict[str, Any]], remaining_items: list[dict[str, Any]], read_json: bool = False
+    ) -> dict[str, Any]:
         """
         Helper to merge multi-partition data into a single entry. If read_json is False, we merge the pickle partitions - otherwise, we merge the json ones.
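
The `_calculate_backoff_delay` hunk above shows the capped exponential backoff formula. A worked example (the base and cap values here are placeholders, not Tron's configured constants):

```python
def backoff(attempt: int, base: int = 1, cap: int = 30) -> int:
    safe_attempt = max(attempt, 1)  # guard against attempt <= 0
    # doubles each retry: base, 2*base, 4*base, ... but never exceeds the cap
    return min(base * (2 ** (safe_attempt - 1)), cap)


print([backoff(n) for n in range(1, 8)])  # [1, 2, 4, 8, 16, 30, 30]
```
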
@@ -241,7 +237,7 @@ def _merge_items(
         deserialized_items = {k: pickle.loads(v) for k, v in raw_items.items()}
         return deserialized_items

-    def save(self, key_value_pairs: List[Tuple[str, Optional[Dict[str, Any]]]]) -> None:
+    def save(self, key_value_pairs: list[tuple[str, dict[str, Any] | None]]) -> None:
         """Add items to the save_queue to be later consumed by _consume_save_queue"""
         for key, val in key_value_pairs:
             while True:
@@ -292,7 +288,7 @@ def get_type_from_key(self, key: str) -> str:
         return key.split()[0]

     # TODO: TRON-2305 - In an ideal world, we wouldn't be passing around state/state_data dicts. It would be a lot nicer to have regular objects here
-    def _serialize_item(self, key: Literal[runstate.JOB_STATE, runstate.JOB_RUN_STATE], state: Dict[str, Any]) -> Optional[str]:  # type: ignore
+    def _serialize_item(self, key: Literal[runstate.JOB_STATE, runstate.JOB_RUN_STATE], state: dict[str, Any]) -> str | None:  # type: ignore
         try:
             if key == runstate.JOB_STATE:
                 log.info(f"Serializing Job: {state.get('job_name')}")
@@ -307,7 +303,7 @@ def _serialize_item(self, key: Literal[runstate.JOB_STATE, runstate.JOB_RUN_STAT
             prom_metrics.json_serialization_errors_counter.inc()
             return None

-    def _deserialize_item(self, key: str, state: str) -> Dict[str, Any]:
+    def _deserialize_item(self, key: str, state: str) -> dict[str, Any]:
         try:
             json_key = key.split(" ")[0]
             if json_key == runstate.JOB_STATE:
@@ -339,7 +335,7 @@ def _save_loop(self):
                 log.error("too many dynamodb errors in a row, crashing")
                 os.exit(1)

-    def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None:
+    def __setitem__(self, key: str, value: tuple[bytes, str]) -> None:
         """
         Partition the item and write up to self.max_transact_write_items
         partitions atomically using TransactWriteItems.
@@ -365,7 +361,7 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None:
         prom_metrics.tron_dynamodb_partitions_histogram.observe(max_partitions)

         for index in range(max_partitions):
-            item: Dict[str, Any] = {  # TODO: replace this with a TypedDict
+            item: dict[str, Any] = {  # TODO: replace this with a TypedDict
                 "Put": {
                     "Item": {
                         "key": {
@@ -439,7 +435,7 @@ def _delete_item(self, key: str) -> None:
             delta=time.time() - start,
         )

-    def _get_num_of_partitions(self, key: str) -> Tuple[int, int]:
+    def _get_num_of_partitions(self, key: str) -> tuple[int, int]:
         """
         Return the number of partitions an item is divided into for both pickled and JSON data.
         """
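
A sketch of the partitioning arithmetic implied by `__setitem__` above: a serialized blob larger than one DynamoDB item is split into fixed-size chunks, each written as a separate partition with the same key and an increasing index. The 200 KB chunk size here is an illustrative assumption, not the store's actual constant:

```python
import math

OBJECT_SIZE = 200 * 1024  # assumed per-item payload budget


def num_partitions(blob: bytes) -> int:
    return max(1, math.ceil(len(blob) / OBJECT_SIZE))


def partitions(blob: bytes) -> list[bytes]:
    return [blob[i : i + OBJECT_SIZE] for i in range(0, len(blob), OBJECT_SIZE)]


blob = b"x" * (450 * 1024)
print(num_partitions(blob), [len(p) for p in partitions(blob)])
# 3 [204800, 204800, 51200]
```
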
""" diff --git a/tron/serialize/runstate/shelvestore.py b/tron/serialize/runstate/shelvestore.py index 67a039c07..ce0a4e555 100644 --- a/tron/serialize/runstate/shelvestore.py +++ b/tron/serialize/runstate/shelvestore.py @@ -5,8 +5,6 @@ import sys from io import BytesIO from typing import Any -from typing import Dict -from typing import List import bsddb3 # type: ignore @@ -96,7 +94,7 @@ def save(self, key_value_pairs): self.shelve[shelve_key] = state_data self.shelve.sync() - def restore(self, keys: List[ShelveKey], read_json: bool = False) -> Dict[ShelveKey, Any]: + def restore(self, keys: list[ShelveKey], read_json: bool = False) -> dict[ShelveKey, Any]: items = zip( keys, (self.shelve.get(str(key.key)) for key in keys), diff --git a/tron/serialize/runstate/statemanager.py b/tron/serialize/runstate/statemanager.py index 8dfca5b0b..f8ebb9206 100644 --- a/tron/serialize/runstate/statemanager.py +++ b/tron/serialize/runstate/statemanager.py @@ -7,8 +7,6 @@ from contextlib import contextmanager from typing import Any from typing import cast -from typing import Dict -from typing import List from tron.config import schema from tron.core import job @@ -108,7 +106,7 @@ def __init__(self, persistence_impl, buffer): self._impl = persistence_impl # TODO: get rid of the Any here - hopefully with a TypedDict - def restore(self, job_names: List[str], read_json: bool = False) -> Dict[str, Any]: + def restore(self, job_names: list[str], read_json: bool = False) -> dict[str, Any]: """Return the most recent serialized state.""" log.debug("Restoring state.") @@ -139,8 +137,8 @@ def restore(self, job_names: List[str], read_json: bool = False) -> Dict[str, An # TODO: get rid of the Any here - hopefully with a TypedDict def _restore_runs_for_job( - self, job_name: str, job_state: Dict[str, Any], read_json: bool = False - ) -> List[Dict[str, Any]]: + self, job_name: str, job_state: dict[str, Any], read_json: bool = False + ) -> list[dict[str, Any]]: """Restore the state for the runs of each job""" run_nums = job_state["run_nums"] keys = [jobrun.get_job_run_id(job_name, run_num) for run_num in run_nums] @@ -162,7 +160,7 @@ def _keys_for_items(self, item_type, names): return dict(zip(keys, names)) # TODO: get rid of the Any here - hopefully with a TypedDict - def _restore_dicts(self, item_type: str, items: List[str], read_json: bool = False) -> Dict[str, Any]: + def _restore_dicts(self, item_type: str, items: list[str], read_json: bool = False) -> dict[str, Any]: """Return a dict mapping of the items name to its state data.""" key_to_item_map = self._keys_for_items(item_type, items) key_to_state_map = self._impl.restore(key_to_item_map.keys(), read_json) @@ -300,7 +298,7 @@ def shutdown(self): def disabled(self): return self.state_manager.disabled() - def restore(self, jobs: List[str], read_json: bool = False) -> Dict[str, Any]: + def restore(self, jobs: list[str], read_json: bool = False) -> dict[str, Any]: # HACK: this cast is nasty, but we should probably refactor things so that the default self.state_manager # in not a NullStateManager return cast(PersistentStateManager, self.state_manager).restore(jobs, read_json) diff --git a/tron/utils/crontab.py b/tron/utils/crontab.py index 1ee5bd07e..11ad1cbc4 100644 --- a/tron/utils/crontab.py +++ b/tron/utils/crontab.py @@ -2,11 +2,6 @@ import calendar import itertools import re -from typing import List -from typing import Optional -from typing import Set -from typing import Tuple -from typing import Union PREDEFINED_SCHEDULE = { "@yearly": "0 0 1 1 *", @@ -33,7 
@@ -33,7 +28,7 @@ class FieldParser:
     """Parse and validate a field in a crontab entry."""

     name: str = ""
-    bounds: Tuple[int, int] = (0, 0)
+    bounds: tuple[int, int] = (0, 0)
     range_pattern = re.compile(
         r"""
         (?P<min>\d+|\*)     # Initial value
@@ -46,20 +41,20 @@ class FieldParser:
     def normalize(self, source: str) -> str:
         return source.strip()

-    def get_groups(self, source: str) -> List[str]:
+    def get_groups(self, source: str) -> list[str]:
         return source.split(",")

-    def parse(self, source: str) -> Optional[Union[List[int], List[Union[int, str]]]]:
+    def parse(self, source: str) -> list[int] | list[int | str] | None:
         if source == "*":
             return None

-        groups: Set[Union[int, str]] = set(
+        groups: set[int | str] = set(
             itertools.chain.from_iterable(self.get_values(group) for group in self.get_groups(source))
         )
         has_last = "LAST" in groups
         if has_last:
             groups.remove("LAST")

-        sorted_groups: List[Union[int, str]] = sorted(groups, key=lambda x: (isinstance(x, str), x))
+        sorted_groups: list[int | str] = sorted(groups, key=lambda x: (isinstance(x, str), x))
         if has_last:
             sorted_groups.append("LAST")

@@ -71,7 +66,7 @@ def get_match_groups(self, source: str) -> dict:
             raise ValueError("Unknown expression: %s" % source)
         return match.groupdict()

-    def get_values(self, source: str) -> List[Union[int, str]]:
+    def get_values(self, source: str) -> list[int | str]:
         source = self.normalize(source)
         match_groups = self.get_match_groups(source)
         step = 1
@@ -81,7 +76,7 @@ def get_values(self, source: str) -> List[Union[int, str]]:
             step = self.validate_bounds(match_groups["step"])
         return self.get_range(min_value, max_value, step)

-    def get_value_range(self, match_groups: dict) -> Tuple[int, int]:
+    def get_value_range(self, match_groups: dict) -> tuple[int, int]:
         if match_groups["min"] == "*":
             return self.bounds

@@ -93,7 +88,7 @@ def get_value_range(self, match_groups: dict) -> Tuple[int, int]:

         return min_value, min_value + 1

-    def get_range(self, min_value: int, max_value: int, step: int) -> List[Union[int, str]]:
+    def get_range(self, min_value: int, max_value: int, step: int) -> list[int | str]:
         if min_value < max_value:
             return list(range(min_value, max_value, step))

@@ -123,7 +118,7 @@ class MonthdayFieldParser(FieldParser):
     name = "monthdays"
     bounds = (1, 32)

-    def get_values(self, source: str) -> List[Union[int, str]]:
+    def get_values(self, source: str) -> list[int | str]:
         # Handle special case for last day of month
         source = self.normalize(source)
         if source == "L":
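
A worked example of the range expansion `FieldParser.get_range` above performs: a cron expression like `*/15` in the minutes field expands `*` to the field's bounds and then steps through them (minute bounds of `(0, 60)` are an assumption here; only the monthdays bounds `(1, 32)` are visible in this diff):

```python
def expand(min_value: int, max_value: int, step: int = 1) -> list[int]:
    # mirrors the min_value < max_value branch of get_range
    return list(range(min_value, max_value, step))


print(expand(0, 60, 15))  # [0, 15, 30, 45]
```
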
{}).get("actions", {}).get(action, {}).get("service", None) if service: return service, job_name, run_num, action except FileNotFoundError: @@ -87,10 +82,10 @@ def __init__(self, component: str, paasta_cluster: str, action_run_id: str) -> N self.run_num = int(run_num) self.num_lines = 0 self.malformed_lines = 0 - self.output: List[Tuple[str, str]] = [] + self.output: list[tuple[str, str]] = [] self.truncated_output = False - def fetch(self, stream: Iterator[str], max_lines: Optional[int]) -> None: + def fetch(self, stream: Iterator[str], max_lines: int | None) -> None: for line in stream: if max_lines is not None and self.num_lines == max_lines: self.truncated_output = True @@ -118,7 +113,7 @@ def fetch(self, stream: Iterator[str], max_lines: Optional[int]) -> None: ): self.output.append((payload["timestamp"], payload["message"])) - def sorted_lines(self) -> List[str]: + def sorted_lines(self) -> list[str]: self.output.sort(key=operator.itemgetter(0)) return [line for _, line in self.output] @@ -126,11 +121,11 @@ def sorted_lines(self) -> List[str]: def read_log_stream_for_action_run( action_run_id: str, component: str, - min_date: Optional[datetime.datetime], - max_date: Optional[datetime.datetime], - paasta_cluster: Optional[str], - max_lines: Optional[int] = USE_SRV_CONFIGS, -) -> List[str]: + min_date: datetime.datetime | None, + max_date: datetime.datetime | None, + paasta_cluster: str | None, + max_lines: int | None = USE_SRV_CONFIGS, +) -> list[str]: if min_date is None: return [f"{action_run_id} has not started yet."] @@ -155,7 +150,7 @@ def read_log_stream_for_action_run( paasta_logs = PaaSTALogs(component, paasta_cluster, action_run_id) stream_name = paasta_logs.stream_name - end_date: Optional[datetime.date] + end_date: datetime.date | None # S3 reader accepts datetime objects and respects timezone information # if min_date and max_date timezone is missing, astimezone() will assume local timezone and convert it to UTC diff --git a/tron/utils/persistable.py b/tron/utils/persistable.py index 9ba1e7036..7acc54f39 100644 --- a/tron/utils/persistable.py +++ b/tron/utils/persistable.py @@ -1,19 +1,17 @@ from abc import ABC from abc import abstractmethod from typing import Any -from typing import Dict -from typing import Optional class Persistable(ABC): @staticmethod @abstractmethod - def to_json(state_data: Dict[Any, Any]) -> Optional[str]: + def to_json(state_data: dict[Any, Any]) -> str | None: pass @staticmethod @abstractmethod - def from_json(state_data: str) -> Dict[str, Any]: + def from_json(state_data: str) -> dict[str, Any]: # This method is called on because it is intended to handle the deserialization of JSON data into a # dictionary representation of the state. 
diff --git a/tron/utils/persistable.py b/tron/utils/persistable.py
index 9ba1e7036..7acc54f39 100644
--- a/tron/utils/persistable.py
+++ b/tron/utils/persistable.py
@@ -1,19 +1,17 @@
 from abc import ABC
 from abc import abstractmethod
 from typing import Any
-from typing import Dict
-from typing import Optional


 class Persistable(ABC):
     @staticmethod
     @abstractmethod
-    def to_json(state_data: Dict[Any, Any]) -> Optional[str]:
+    def to_json(state_data: dict[Any, Any]) -> str | None:
         pass

     @staticmethod
     @abstractmethod
-    def from_json(state_data: str) -> Dict[str, Any]:
+    def from_json(state_data: str) -> dict[str, Any]:
         # This method is called on because it is intended to handle the deserialization of JSON data into a
         # dictionary representation of the state. This allows the method to be used in a more flexible and generic way,
         # enabling different classes to implement their own specific logic for converting the dictionary into an instance of the
diff --git a/tron/utils/state.py b/tron/utils/state.py
index 2f1d65d3a..eb90b3b50 100644
--- a/tron/utils/state.py
+++ b/tron/utils/state.py
@@ -1,6 +1,6 @@
 import logging
 from collections import defaultdict
-from typing import Mapping
+from collections.abc import Mapping

 log = logging.getLogger(__name__)
diff --git a/yelp_package/jammy/Dockerfile b/yelp_package/jammy/Dockerfile
index 841920d9e..44bcde1b8 100644
--- a/yelp_package/jammy/Dockerfile
+++ b/yelp_package/jammy/Dockerfile
@@ -24,8 +24,8 @@ RUN apt-get -q update && \
         libyaml-dev \
         libssl-dev \
         libffi-dev \
-        python3.8-dev \
-        python3.8-distutils \
+        python3.10-dev \
+        python3.10-distutils \
         python3-pip \
         rust-all \
         tox \