Skip to content

Conversation

@nleigh
Copy link
Contributor

@nleigh nleigh commented Jun 12, 2025

https://jira.yelpcorp.com/browse/FLINK-5725

Refactor Flink Paasta Status

Move Flink Paasta Status functionality into own file

Paasta Status Had a lot of Flink specific functionality just to Flink
Will make future refactors to Flink status easier and more isolated

Addressed circular dependency errors

Easier to follow per commit msg

Future Refactors Plan
https://yelp.slack.com/archives/CA05GTDB9/p1749650708339909?thread_ts=1747320721.347129&cid=CA05GTDB9

nleigh and others added 6 commits June 12, 2025 07:27
https://jira.yelpcorp.com/browse/FLINK-5725

Refactor Flink Paasta Status

Move Flink Paasta Status functionality into own file

Paasta Status Had a lot of Flink specific functionality just to Flink
Will make future refactors to Flink easier and more isolated
https://jira.yelpcorp.com/browse/FLINK-5725

Errors running tests
```
 ImportError: cannot import name 'append_pod_status' from partially initialized module 'paasta_tools.cli.cmds.status', is due to a circular import dependency.
```

Moved function dependencies used by new flink paasta status file and status file to new file to avoid circular dependency issue
https://jira.yelpcorp.com/browse/FLINK-5725

Errors running tests
```
 ERROR tests/cli/test_cmds_help.py - AttributeError: module 'paasta_tools.cli.cmds.status_flink' has no attribute 'add_subparser'
```

Used LLM agent to fix issue

 ```
 The error AttributeError: module 'paasta_tools.cli.cmds.status_flink' has no attribute 'add_subparser' arises because the get_argparser function in paasta_tools/cli/cli.py attempts to load an add_subparser function from every module it finds in the paasta_tools/cli/cmds directory.

 However, status_flink.py is a helper module for the status command and is not intended to be a standalone subcommand with its own add_subparser function.

 To fix this, I will modify get_argparser to only iterate over the explicitly defined subcommands in the PAASTA_SUBCOMMANDS dictionary.

 This dictionary maps user-facing command names to the Python module names that implement them. This change will prevent the CLI from attempting to treat status_flink (and potentially other helper modules) as a main command.

 I've updated the get_argparser function in paasta_tools/cli/cli.py to iterate over the PAASTA_SUBCOMMANDS dictionary instead of dynamically walking the cmds package.

 This will ensure that add_subparser is only called for modules that are actual PaaSTA subcommands and are expected to have an add_subparser function.

 This change should resolve the AttributeError: module 'paasta_tools.cli.cmds.status_flink' has no attribute 'add_subparser' and allow the tests in tests/cli/test_cmds_help.py to be collected correctly.
https://jira.yelpcorp.com/browse/FLINK-5725

Errors running tests
```
TestPrintFlinkStatus.test_error_no_flink

self = <tests.cli.test_cmds_status.TestPrintFlinkStatus object at 0x7fa472d5e730>
mock_load_system_paasta_config = <function load_flink_instance_config at 0x7fa4702aa550>
mock_get_paasta_oapi_client = <function get_paasta_oapi_client at 0x7fa4703283a0>
mock_load_flink_instance_config = <function load_system_paasta_config at 0x7fa4703285e0>
mock_flink_status = defaultdict(None, {'metadata': {'annotations': {'flink.yelp.com/dashboard_url': 'http://flink.k8s.fake_cluster.paasta:31080/app-9bf849b89'}, 'labels': {'paasta.yelp.com/config_sha': 'config00000'}}, 'status': None})
system_paasta_config = SystemPaastaConfig({'cluster': 'fake_cluster', 'api_endpoints': {'fake_cluster': 'http://fake_cluster:5054'}, 'api_cli...envoy': {}}, 'kube_clusters': {'pnw-prod': {'aws_account': 'prod'}, 'pnw-devc': {'aws_account': 'dev'}}}, '/fake_dir/')
flink_instance_config = FlinkDeploymentConfig('fake_service', 'fake_instance', 'fake_cluster', {'spot': False, 'monitoring': {'team': 'fake_owner', 'runbook': 'fake_runbook_url'}}, None, '/nail/etc/services')

    @patch("paasta_tools.api.client.load_system_paasta_config", autospec=True)
    @patch("paasta_tools.cli.cmds.status_flink.get_paasta_oapi_client", autospec=True)
    @patch(
        "paasta_tools.cli.cmds.status_flink.load_flink_instance_config", autospec=True
    )
    def test_error_no_flink(
        self,
        mock_load_system_paasta_config,
        mock_get_paasta_oapi_client,
        mock_load_flink_instance_config,
        # Fixtures
        mock_flink_status,
        system_paasta_config,
        flink_instance_config,
    ):
        mock_load_system_paasta_config.return_value = system_paasta_config
        mock_flink_status["status"] = None
        mock_load_flink_instance_config.return_value = flink_instance_config
        mock_get_paasta_oapi_client.return_value = None

        output = []
>       return_value = print_flink_status(
            cluster="fake_cluster",
            service="fake_service",
            instance="fake_instance",
            output=output,
            flink=mock_flink_status,
            verbose=1,
        )

tests/cli/test_cmds_status.py:2473:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
paasta_tools/cli/cmds/status_flink.py:349: in print_flink_status
    system_paasta_config = load_system_paasta_config()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

path = '/etc/paasta/'

    def load_system_paasta_config(
        path: str = PATH_TO_SYSTEM_PAASTA_CONFIG_DIR,
    ) -> "SystemPaastaConfig":
        """
        Reads Paasta configs in specified directory in lexicographical order and deep merges
        the dictionaries (last file wins).
        """
        if not os.path.isdir(path):
>           raise PaastaNotConfiguredError(
                "Could not find system paasta configuration directory: %s" % path
            )
E           paasta_tools.utils.PaastaNotConfiguredError: Could not find system paasta configuration directory: /etc/paasta/

paasta_tools/utils.py:2079: PaastaNotConfiguredError
```

Fix
```
In your test TestPrintFlinkStatus.test_error_no_flink, the issue stems from two things:
Incorrect patch target: The load_system_paasta_config function is called from within paasta_tools.cli.cmds.status_flink.py. However, your first patch decorator is targeting paasta_tools.api.client.load_system_paasta_config.
Mismatched mock assignments: Due to the order of decorators, the mock parameters in your test method signature don't align with their names if read naively. The first parameter mocks the innermost (third) decorator, and the third parameter mocks the outermost (first) decorator. This leads to system_paasta_config and flink_instance_config being assigned to the wrong mocks.
```
@nleigh nleigh marked this pull request as ready for review June 13, 2025 17:42
@nleigh nleigh requested a review from a team as a code owner June 13, 2025 17:42
@nleigh nleigh requested a review from Copilot June 13, 2025 17:43
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR isolates Flink-specific status logic into a new status_flink module, updates tests to reference the new location, and adds pod uptime/status formatting utilities.

  • Extracted Flink helpers (get_flink_job_name, OUTPUT_HORIZONTAL_RULE) into cli/cmds/status_flink.py and updated imports/patches in tests.
  • Introduced get_pod_uptime and append_pod_status in cli/utils.py for Kubernetes pod status formatting.
  • Modified the CLI’s subcommand loader in cli/cli.py to use PAASTA_SUBCOMMANDS directly.

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

File Description
tests/cli/test_cmds_status.py Switched imports and @patch targets from status to status_flink, but test fixture assignments appear swapped.
paasta_tools/cli/utils.py Added new helper functions for pod uptime calculation and table rendering; relies on format_table.
paasta_tools/cli/cli.py Replaced dynamic discovery via cmds package with iteration over PAASTA_SUBCOMMANDS.
Comments suppressed due to low confidence (1)

paasta_tools/cli/utils.py:1127

  • New functions get_pod_uptime and append_pod_status lack dedicated unit tests—consider adding tests for various timestamp formats and pod states.
def get_pod_uptime(pod_deployed_timestamp: str) -> str:

https://jira.yelpcorp.com/browse/FLINK-5725

Errors running tests
```
TestPrintFlinkStatus.test_error_no_flink

self = <tests.cli.test_cmds_status.TestPrintFlinkStatus object at 0x7fa472d5e730>
mock_load_system_paasta_config = <function load_flink_instance_config at 0x7fa4702aa550>
mock_get_paasta_oapi_client = <function get_paasta_oapi_client at 0x7fa4703283a0>
mock_load_flink_instance_config = <function load_system_paasta_config at 0x7fa4703285e0>
mock_flink_status = defaultdict(None, {'metadata': {'annotations': {'flink.yelp.com/dashboard_url': 'http://flink.k8s.fake_cluster.paasta:31080/app-9bf849b89'}, 'labels': {'paasta.yelp.com/config_sha': 'config00000'}}, 'status': None})
system_paasta_config = SystemPaastaConfig({'cluster': 'fake_cluster', 'api_endpoints': {'fake_cluster': 'http://fake_cluster:5054'}, 'api_cli...envoy': {}}, 'kube_clusters': {'pnw-prod': {'aws_account': 'prod'}, 'pnw-devc': {'aws_account': 'dev'}}}, '/fake_dir/')
flink_instance_config = FlinkDeploymentConfig('fake_service', 'fake_instance', 'fake_cluster', {'spot': False, 'monitoring': {'team': 'fake_owner', 'runbook': 'fake_runbook_url'}}, None, '/nail/etc/services')

    @patch("paasta_tools.api.client.load_system_paasta_config", autospec=True)
    @patch("paasta_tools.cli.cmds.status_flink.get_paasta_oapi_client", autospec=True)
    @patch(
        "paasta_tools.cli.cmds.status_flink.load_flink_instance_config", autospec=True
    )
    def test_error_no_flink(
        self,
        mock_load_system_paasta_config,
        mock_get_paasta_oapi_client,
        mock_load_flink_instance_config,
        # Fixtures
        mock_flink_status,
        system_paasta_config,
        flink_instance_config,
    ):
        mock_load_system_paasta_config.return_value = system_paasta_config
        mock_flink_status["status"] = None
        mock_load_flink_instance_config.return_value = flink_instance_config
        mock_get_paasta_oapi_client.return_value = None

        output = []
>       return_value = print_flink_status(
            cluster="fake_cluster",
            service="fake_service",
            instance="fake_instance",
            output=output,
            flink=mock_flink_status,
            verbose=1,
        )

tests/cli/test_cmds_status.py:2473:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
paasta_tools/cli/cmds/status_flink.py:349: in print_flink_status
    system_paasta_config = load_system_paasta_config()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

path = '/etc/paasta/'

    def load_system_paasta_config(
        path: str = PATH_TO_SYSTEM_PAASTA_CONFIG_DIR,
    ) -> "SystemPaastaConfig":
        """
        Reads Paasta configs in specified directory in lexicographical order and deep merges
        the dictionaries (last file wins).
        """
        if not os.path.isdir(path):
>           raise PaastaNotConfiguredError(
                "Could not find system paasta configuration directory: %s" % path
            )
E           paasta_tools.utils.PaastaNotConfiguredError: Could not find system paasta configuration directory: /etc/paasta/

paasta_tools/utils.py:2079: PaastaNotConfiguredError
```

Fix
```
In your test TestPrintFlinkStatus.test_error_no_flink, the issue stems from two things:
Incorrect patch target: The load_system_paasta_config function is called from within paasta_tools.cli.cmds.status_flink.py. However, your first patch decorator is targeting paasta_tools.api.client.load_system_paasta_config.
Mismatched mock assignments: Due to the order of decorators, the mock parameters in your test method signature don't align with their names if read naively. The first parameter mocks the innermost (third) decorator, and the third parameter mocks the outermost (first) decorator. This leads to system_paasta_config and flink_instance_config being assigned to the wrong mocks.
```
@nleigh nleigh requested a review from nemacysts June 16, 2025 08:50
@nleigh nleigh changed the title FLINK-5725: Refactor Flink Paasta Status FLINK-5725: Move Flink Paasta Status functionality into own file Jun 16, 2025
Copy link
Contributor

@marcos-sb marcos-sb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

Comment on lines +179 to +182
for cli_command_name, module_name in sorted(PAASTA_SUBCOMMANDS.items()):
# cli_command_name is the user-facing command, module_name is the python module filename
command_choices.append(
(command, (add_subparser, [command, subparsers], {}))
(cli_command_name, (add_subparser, [module_name, subparsers], {}))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious: what is this change for?

Comment on lines +1127 to +1138
def get_pod_uptime(pod_deployed_timestamp: str) -> str:
# NOTE: the k8s API returns timestamps in UTC, so we make sure to always work in UTC
pod_creation_time = datetime.strptime(
pod_deployed_timestamp, "%Y-%m-%dT%H:%M:%SZ"
).replace(tzinfo=timezone.utc)
pod_uptime = datetime.now(timezone.utc) - pod_creation_time
pod_uptime_total_seconds = pod_uptime.total_seconds()
pod_uptime_days = divmod(pod_uptime_total_seconds, 86400)
pod_uptime_hours = divmod(pod_uptime_days[1], 3600)
pod_uptime_minutes = divmod(pod_uptime_hours[1], 60)
pod_uptime_seconds = divmod(pod_uptime_minutes[1], 1)
return f"{int(pod_uptime_days[0])}d{int(pod_uptime_hours[0])}h{int(pod_uptime_minutes[0])}m{int(pod_uptime_seconds[0])}s"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should probably be in a more flink-oriented file - i don't think we use this uptime format elsewhere (we usually do something like what arrow.humanize() and whatnot return and present something like 2 days ago or whatnot

return f"{int(pod_uptime_days[0])}d{int(pod_uptime_hours[0])}h{int(pod_uptime_minutes[0])}m{int(pod_uptime_seconds[0])}s"


def append_pod_status(pod_status: List[Dict[str, Any]], output: List[str]) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here - in general, it's probably best to make sure these aren't targeted for reuse by folks otherwise it'll make changing the flink status output harder since you'd have multiple consumers of your helpers :p

(also, this is a bit misplaced here i think - this is very closely tied to paasta status-like commands, i don't think any other cli commands would ever use this util)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heh, speaking of: it does look like the kafka operator relies on this :p

)


def should_job_info_be_shown(cluster_state):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def should_job_info_be_shown(cluster_state):
def should_job_info_be_shown(cluster_state: str) ->:

we're just moving this, but might as well type this simple function while we're at it :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like there's a bunch of formatting changes in this file?

Comment on lines +2469 to +2471
mock_load_system_paasta_config.return_value = flink_instance_config
mock_flink_status["status"] = None
mock_load_flink_instance_config.return_value = flink_instance_config
mock_load_flink_instance_config.return_value = system_paasta_config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't seem quite right - these look flipped

Comment on lines +2507 to +2509
mock_load_system_paasta_config.return_value = flink_instance_config
mock_get_paasta_oapi_client.return_value = None
mock_load_flink_instance_config.return_value = flink_instance_config
mock_load_flink_instance_config.return_value = system_paasta_config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

@nemacysts nemacysts requested a review from EvanKrall June 17, 2025 18:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants