-
Notifications
You must be signed in to change notification settings - Fork 244
FLINK-5725: Move Flink Paasta Status functionality into own file #4082
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
FLINK-5725: Move Flink Paasta Status functionality into own file #4082
Conversation
https://jira.yelpcorp.com/browse/FLINK-5725 Refactor Flink Paasta Status Move Flink Paasta Status functionality into own file Paasta Status Had a lot of Flink specific functionality just to Flink Will make future refactors to Flink easier and more isolated
https://jira.yelpcorp.com/browse/FLINK-5725 To use new flink paasta status file function locations
https://jira.yelpcorp.com/browse/FLINK-5725 Errors running tests ``` ImportError: cannot import name 'append_pod_status' from partially initialized module 'paasta_tools.cli.cmds.status', is due to a circular import dependency. ``` Moved function dependencies used by new flink paasta status file and status file to new file to avoid circular dependency issue
https://jira.yelpcorp.com/browse/FLINK-5725 Errors running tests ``` ERROR tests/cli/test_cmds_help.py - AttributeError: module 'paasta_tools.cli.cmds.status_flink' has no attribute 'add_subparser' ``` Used LLM agent to fix issue ``` The error AttributeError: module 'paasta_tools.cli.cmds.status_flink' has no attribute 'add_subparser' arises because the get_argparser function in paasta_tools/cli/cli.py attempts to load an add_subparser function from every module it finds in the paasta_tools/cli/cmds directory. However, status_flink.py is a helper module for the status command and is not intended to be a standalone subcommand with its own add_subparser function. To fix this, I will modify get_argparser to only iterate over the explicitly defined subcommands in the PAASTA_SUBCOMMANDS dictionary. This dictionary maps user-facing command names to the Python module names that implement them. This change will prevent the CLI from attempting to treat status_flink (and potentially other helper modules) as a main command. I've updated the get_argparser function in paasta_tools/cli/cli.py to iterate over the PAASTA_SUBCOMMANDS dictionary instead of dynamically walking the cmds package. This will ensure that add_subparser is only called for modules that are actual PaaSTA subcommands and are expected to have an add_subparser function. This change should resolve the AttributeError: module 'paasta_tools.cli.cmds.status_flink' has no attribute 'add_subparser' and allow the tests in tests/cli/test_cmds_help.py to be collected correctly.
https://jira.yelpcorp.com/browse/FLINK-5725 Errors running tests ``` TestPrintFlinkStatus.test_error_no_flink self = <tests.cli.test_cmds_status.TestPrintFlinkStatus object at 0x7fa472d5e730> mock_load_system_paasta_config = <function load_flink_instance_config at 0x7fa4702aa550> mock_get_paasta_oapi_client = <function get_paasta_oapi_client at 0x7fa4703283a0> mock_load_flink_instance_config = <function load_system_paasta_config at 0x7fa4703285e0> mock_flink_status = defaultdict(None, {'metadata': {'annotations': {'flink.yelp.com/dashboard_url': 'http://flink.k8s.fake_cluster.paasta:31080/app-9bf849b89'}, 'labels': {'paasta.yelp.com/config_sha': 'config00000'}}, 'status': None}) system_paasta_config = SystemPaastaConfig({'cluster': 'fake_cluster', 'api_endpoints': {'fake_cluster': 'http://fake_cluster:5054'}, 'api_cli...envoy': {}}, 'kube_clusters': {'pnw-prod': {'aws_account': 'prod'}, 'pnw-devc': {'aws_account': 'dev'}}}, '/fake_dir/') flink_instance_config = FlinkDeploymentConfig('fake_service', 'fake_instance', 'fake_cluster', {'spot': False, 'monitoring': {'team': 'fake_owner', 'runbook': 'fake_runbook_url'}}, None, '/nail/etc/services') @patch("paasta_tools.api.client.load_system_paasta_config", autospec=True) @patch("paasta_tools.cli.cmds.status_flink.get_paasta_oapi_client", autospec=True) @patch( "paasta_tools.cli.cmds.status_flink.load_flink_instance_config", autospec=True ) def test_error_no_flink( self, mock_load_system_paasta_config, mock_get_paasta_oapi_client, mock_load_flink_instance_config, # Fixtures mock_flink_status, system_paasta_config, flink_instance_config, ): mock_load_system_paasta_config.return_value = system_paasta_config mock_flink_status["status"] = None mock_load_flink_instance_config.return_value = flink_instance_config mock_get_paasta_oapi_client.return_value = None output = [] > return_value = print_flink_status( cluster="fake_cluster", service="fake_service", instance="fake_instance", output=output, flink=mock_flink_status, verbose=1, ) tests/cli/test_cmds_status.py:2473: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ paasta_tools/cli/cmds/status_flink.py:349: in print_flink_status system_paasta_config = load_system_paasta_config() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ path = '/etc/paasta/' def load_system_paasta_config( path: str = PATH_TO_SYSTEM_PAASTA_CONFIG_DIR, ) -> "SystemPaastaConfig": """ Reads Paasta configs in specified directory in lexicographical order and deep merges the dictionaries (last file wins). """ if not os.path.isdir(path): > raise PaastaNotConfiguredError( "Could not find system paasta configuration directory: %s" % path ) E paasta_tools.utils.PaastaNotConfiguredError: Could not find system paasta configuration directory: /etc/paasta/ paasta_tools/utils.py:2079: PaastaNotConfiguredError ``` Fix ``` In your test TestPrintFlinkStatus.test_error_no_flink, the issue stems from two things: Incorrect patch target: The load_system_paasta_config function is called from within paasta_tools.cli.cmds.status_flink.py. However, your first patch decorator is targeting paasta_tools.api.client.load_system_paasta_config. Mismatched mock assignments: Due to the order of decorators, the mock parameters in your test method signature don't align with their names if read naively. The first parameter mocks the innermost (third) decorator, and the third parameter mocks the outermost (first) decorator. This leads to system_paasta_config and flink_instance_config being assigned to the wrong mocks. ```
…linkPaastaStatus2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR isolates Flink-specific status logic into a new status_flink module, updates tests to reference the new location, and adds pod uptime/status formatting utilities.
- Extracted Flink helpers (
get_flink_job_name,OUTPUT_HORIZONTAL_RULE) intocli/cmds/status_flink.pyand updated imports/patches in tests. - Introduced
get_pod_uptimeandappend_pod_statusincli/utils.pyfor Kubernetes pod status formatting. - Modified the CLI’s subcommand loader in
cli/cli.pyto usePAASTA_SUBCOMMANDSdirectly.
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/cli/test_cmds_status.py | Switched imports and @patch targets from status to status_flink, but test fixture assignments appear swapped. |
| paasta_tools/cli/utils.py | Added new helper functions for pod uptime calculation and table rendering; relies on format_table. |
| paasta_tools/cli/cli.py | Replaced dynamic discovery via cmds package with iteration over PAASTA_SUBCOMMANDS. |
Comments suppressed due to low confidence (1)
paasta_tools/cli/utils.py:1127
- New functions
get_pod_uptimeandappend_pod_statuslack dedicated unit tests—consider adding tests for various timestamp formats and pod states.
def get_pod_uptime(pod_deployed_timestamp: str) -> str:
https://jira.yelpcorp.com/browse/FLINK-5725 Errors running tests ``` TestPrintFlinkStatus.test_error_no_flink self = <tests.cli.test_cmds_status.TestPrintFlinkStatus object at 0x7fa472d5e730> mock_load_system_paasta_config = <function load_flink_instance_config at 0x7fa4702aa550> mock_get_paasta_oapi_client = <function get_paasta_oapi_client at 0x7fa4703283a0> mock_load_flink_instance_config = <function load_system_paasta_config at 0x7fa4703285e0> mock_flink_status = defaultdict(None, {'metadata': {'annotations': {'flink.yelp.com/dashboard_url': 'http://flink.k8s.fake_cluster.paasta:31080/app-9bf849b89'}, 'labels': {'paasta.yelp.com/config_sha': 'config00000'}}, 'status': None}) system_paasta_config = SystemPaastaConfig({'cluster': 'fake_cluster', 'api_endpoints': {'fake_cluster': 'http://fake_cluster:5054'}, 'api_cli...envoy': {}}, 'kube_clusters': {'pnw-prod': {'aws_account': 'prod'}, 'pnw-devc': {'aws_account': 'dev'}}}, '/fake_dir/') flink_instance_config = FlinkDeploymentConfig('fake_service', 'fake_instance', 'fake_cluster', {'spot': False, 'monitoring': {'team': 'fake_owner', 'runbook': 'fake_runbook_url'}}, None, '/nail/etc/services') @patch("paasta_tools.api.client.load_system_paasta_config", autospec=True) @patch("paasta_tools.cli.cmds.status_flink.get_paasta_oapi_client", autospec=True) @patch( "paasta_tools.cli.cmds.status_flink.load_flink_instance_config", autospec=True ) def test_error_no_flink( self, mock_load_system_paasta_config, mock_get_paasta_oapi_client, mock_load_flink_instance_config, # Fixtures mock_flink_status, system_paasta_config, flink_instance_config, ): mock_load_system_paasta_config.return_value = system_paasta_config mock_flink_status["status"] = None mock_load_flink_instance_config.return_value = flink_instance_config mock_get_paasta_oapi_client.return_value = None output = [] > return_value = print_flink_status( cluster="fake_cluster", service="fake_service", instance="fake_instance", output=output, flink=mock_flink_status, verbose=1, ) tests/cli/test_cmds_status.py:2473: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ paasta_tools/cli/cmds/status_flink.py:349: in print_flink_status system_paasta_config = load_system_paasta_config() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ path = '/etc/paasta/' def load_system_paasta_config( path: str = PATH_TO_SYSTEM_PAASTA_CONFIG_DIR, ) -> "SystemPaastaConfig": """ Reads Paasta configs in specified directory in lexicographical order and deep merges the dictionaries (last file wins). """ if not os.path.isdir(path): > raise PaastaNotConfiguredError( "Could not find system paasta configuration directory: %s" % path ) E paasta_tools.utils.PaastaNotConfiguredError: Could not find system paasta configuration directory: /etc/paasta/ paasta_tools/utils.py:2079: PaastaNotConfiguredError ``` Fix ``` In your test TestPrintFlinkStatus.test_error_no_flink, the issue stems from two things: Incorrect patch target: The load_system_paasta_config function is called from within paasta_tools.cli.cmds.status_flink.py. However, your first patch decorator is targeting paasta_tools.api.client.load_system_paasta_config. Mismatched mock assignments: Due to the order of decorators, the mock parameters in your test method signature don't align with their names if read naively. The first parameter mocks the innermost (third) decorator, and the third parameter mocks the outermost (first) decorator. This leads to system_paasta_config and flink_instance_config being assigned to the wrong mocks. ```
marcos-sb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
…linkPaastaStatus2
| for cli_command_name, module_name in sorted(PAASTA_SUBCOMMANDS.items()): | ||
| # cli_command_name is the user-facing command, module_name is the python module filename | ||
| command_choices.append( | ||
| (command, (add_subparser, [command, subparsers], {})) | ||
| (cli_command_name, (add_subparser, [module_name, subparsers], {})) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just curious: what is this change for?
| def get_pod_uptime(pod_deployed_timestamp: str) -> str: | ||
| # NOTE: the k8s API returns timestamps in UTC, so we make sure to always work in UTC | ||
| pod_creation_time = datetime.strptime( | ||
| pod_deployed_timestamp, "%Y-%m-%dT%H:%M:%SZ" | ||
| ).replace(tzinfo=timezone.utc) | ||
| pod_uptime = datetime.now(timezone.utc) - pod_creation_time | ||
| pod_uptime_total_seconds = pod_uptime.total_seconds() | ||
| pod_uptime_days = divmod(pod_uptime_total_seconds, 86400) | ||
| pod_uptime_hours = divmod(pod_uptime_days[1], 3600) | ||
| pod_uptime_minutes = divmod(pod_uptime_hours[1], 60) | ||
| pod_uptime_seconds = divmod(pod_uptime_minutes[1], 1) | ||
| return f"{int(pod_uptime_days[0])}d{int(pod_uptime_hours[0])}h{int(pod_uptime_minutes[0])}m{int(pod_uptime_seconds[0])}s" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should probably be in a more flink-oriented file - i don't think we use this uptime format elsewhere (we usually do something like what arrow.humanize() and whatnot return and present something like 2 days ago or whatnot
| return f"{int(pod_uptime_days[0])}d{int(pod_uptime_hours[0])}h{int(pod_uptime_minutes[0])}m{int(pod_uptime_seconds[0])}s" | ||
|
|
||
|
|
||
| def append_pod_status(pod_status: List[Dict[str, Any]], output: List[str]) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here - in general, it's probably best to make sure these aren't targeted for reuse by folks otherwise it'll make changing the flink status output harder since you'd have multiple consumers of your helpers :p
(also, this is a bit misplaced here i think - this is very closely tied to paasta status-like commands, i don't think any other cli commands would ever use this util)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
heh, speaking of: it does look like the kafka operator relies on this :p
| ) | ||
|
|
||
|
|
||
| def should_job_info_be_shown(cluster_state): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| def should_job_info_be_shown(cluster_state): | |
| def should_job_info_be_shown(cluster_state: str) ->: |
we're just moving this, but might as well type this simple function while we're at it :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like there's a bunch of formatting changes in this file?
| mock_load_system_paasta_config.return_value = flink_instance_config | ||
| mock_flink_status["status"] = None | ||
| mock_load_flink_instance_config.return_value = flink_instance_config | ||
| mock_load_flink_instance_config.return_value = system_paasta_config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this doesn't seem quite right - these look flipped
| mock_load_system_paasta_config.return_value = flink_instance_config | ||
| mock_get_paasta_oapi_client.return_value = None | ||
| mock_load_flink_instance_config.return_value = flink_instance_config | ||
| mock_load_flink_instance_config.return_value = system_paasta_config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
https://jira.yelpcorp.com/browse/FLINK-5725
Refactor Flink Paasta Status
Move Flink Paasta Status functionality into own file
Paasta Status Had a lot of Flink specific functionality just to Flink
Will make future refactors to Flink status easier and more isolated
Addressed circular dependency errors
Easier to follow per commit msg
Future Refactors Plan
https://yelp.slack.com/archives/CA05GTDB9/p1749650708339909?thread_ts=1747320721.347129&cid=CA05GTDB9