class ShardsCommand(Command):
    """shows shards state, optionally per table, e.g. \\shards info"""

    # NOTE: the docstring above is kept on a single line on purpose; the
    # integration test for the help listing expects exactly this text next
    # to ``\shards``.
    #
    # Usage:
    #   \shards        aggregate ``sys.shards`` by state and primary flag
    #   \shards info   per table/partition totals and relocation progress

    # Default view: one row per (state, primary) combination.
    DEFAULT_STMT = """
        SELECT
            state,
            primary,
            COUNT(*)
                AS shard_count,
            SUM(num_docs)
                AS num_docs,
            SUM(size) / 1073741824.0
                AS size_gb
        FROM sys.shards
        GROUP BY state, primary
        ORDER BY state, primary;
    """

    # ``info`` view: one row per table partition.
    #
    # An aggregate with a FILTER clause yields NULL when no row matches the
    # filter.  Without COALESCE a partition with no relocating shard showed
    # ``relocating_size`` as NULL (next to ``relocating_shards`` = 0), and a
    # partition whose shards are *all* relocating got a NULL
    # ``relocated_percent`` instead of 0, which also distorts the
    # ``ORDER BY relocated_percent`` progress ordering.
    INFO_STMT = """
        SELECT
            schema_name,
            table_name,
            partition_ident,
            COUNT(*)
                AS total_shards,
            SUM(size)
                As total_size,
            COUNT(*) FILTER (WHERE routing_state = 'RELOCATING')
                AS relocating_shards,
            COALESCE(SUM(size) FILTER (WHERE routing_state = 'RELOCATING'), 0)
                AS relocating_size,
            100.0 * COALESCE(SUM(size) FILTER (WHERE routing_state != 'RELOCATING'), 0) / SUM(size)
                AS relocated_percent
        FROM sys.shards
        WHERE routing_state != 'UNASSIGNED'
        GROUP BY schema_name, table_name, partition_ident
        ORDER BY relocated_percent, schema_name, table_name, partition_ident;
    """

    # Maps the optional command argument to the statement it runs.
    OPTIONS = {
        "info": INFO_STMT,
    }

    def complete(self, cmd, text):
        # Offer every option when only whitespace was typed so far, otherwise
        # only the options that start with the typed prefix.
        offer_all = text.isspace()
        return (
            option for option in self.OPTIONS
            if offer_all or option.startswith(text)
        )

    def execute(self, cmd, stmt):
        # Runs ``stmt`` through the shell and pretty-prints the result set.
        # Returns True on success, False when the statement failed.
        success = cmd._exec(stmt)
        # Keep an already recorded non-zero exit code; otherwise record the
        # outcome of this statement.
        cmd.exit_code = cmd.exit_code or int(not success)
        if not success:
            cmd.logger.warn("FAILED")
            return False

        cursor = cmd.cursor
        rows = cursor.fetchall()
        column_names = [column[0] for column in cursor.description]
        cmd.pprint(rows, column_names)
        return True

    def __call__(self, cmd, *args, **kwargs):
        # Only the first positional argument selects the view.  It is
        # normalised (surrounding whitespace, letter case) so that
        # ``\shards `` with a trailing blank still shows the default view
        # and ``\shards INFO`` is accepted.
        view = args[0].strip().lower() if args else ""
        if not view:
            self.execute(cmd, self.DEFAULT_STMT)
            return

        stmt = self.OPTIONS.get(view)
        if stmt is None:
            supported = ", ".join(f"`{option}`" for option in self.OPTIONS)
            cmd.logger.critical(
                f"Command argument not supported (available options: {supported})."
            )
            return

        self.execute(cmd, stmt)
class ShardsCommandTest(TestCase):
    """Unit tests for ``ShardsCommand`` run against a mocked ``CrateShell``."""

    # Result columns of the default (state aggregation) view.
    DEFAULT_COLS = [
        ('state',),
        ('primary',),
        ('shard_count',),
        ('num_docs',),
        ('size_gb',),
    ]

    # Result columns of the ``info`` (per table) view.
    INFO_COLS = [
        ('schema_name',),
        ('table_name',),
        ('partition_ident',),
        ('total_shards',),
        ('total_size',),
        ('relocating_shards',),
        ('relocating_size',),
        ('relocated_percent',),
    ]

    @staticmethod
    def _prime(cmd, rows, cols, success=True):
        """Configure the mocked shell to answer the next statement."""
        cmd._exec.return_value = success
        cmd.cursor.fetchall.return_value = rows
        cmd.cursor.description = cols

    @staticmethod
    def _names(cols):
        """Return the column names of a cursor description."""
        return [col[0] for col in cols]

    @patch('crate.crash.command.CrateShell')
    def test_shards_command_default(self, cmd):
        rows = [
            ['RELOCATING', 'FALSE', '2', '33334465', '9.307963063940406'],
            ['STARTED', 'TRUE', '1010', '166665535', '26.309150873683393'],
        ]
        self._prime(cmd, rows, self.DEFAULT_COLS)

        ShardsCommand()(cmd)
        cmd.pprint.assert_called_with(rows, self._names(self.DEFAULT_COLS))

    @patch('crate.crash.command.CrateShell')
    def test_shards_command_info(self, cmd):
        rows = [
            ['doc', 'table1', '', '1', '10', '1024', '0', '100.0'],
            ['doc', 'table2', '', '2', '20', '2048', '1', '50.0'],
            ['doc', 'table3', '', '3', '30', '3072', '2', '33.3'],
        ]
        self._prime(cmd, rows, self.INFO_COLS)

        ShardsCommand()(cmd, "info")
        cmd.pprint.assert_called_with(rows, self._names(self.INFO_COLS))

    @patch('crate.crash.command.CrateShell')
    def test_shards_command_default_none(self, cmd):
        self._prime(cmd, [], self.DEFAULT_COLS)

        ShardsCommand()(cmd)
        # An empty result is still printed (header only) and nothing is
        # reported through the logger.
        cmd.pprint.assert_called_once_with([], self._names(self.DEFAULT_COLS))
        cmd.logger.info.assert_not_called()

    @patch('crate.crash.command.CrateShell')
    def test_shards_command_info_none(self, cmd):
        self._prime(cmd, [], self.INFO_COLS)

        ShardsCommand()(cmd, "info")
        cmd.pprint.assert_called_once_with([], self._names(self.INFO_COLS))
        cmd.logger.info.assert_not_called()

    @patch('crate.crash.command.CrateShell')
    def test_shards_command_failed_statement(self, cmd):
        self._prime(cmd, [], self.DEFAULT_COLS, success=False)

        ShardsCommand()(cmd)
        # A failing statement is reported and nothing is printed.
        cmd.logger.warn.assert_called_once_with("FAILED")
        cmd.pprint.assert_not_called()

    @patch('crate.crash.command.CrateShell')
    def test_shards_command_unsupported_argument(self, cmd):
        ShardsCommand()(cmd, "bogus")
        # Unknown views must not reach the database at all.
        cmd._exec.assert_not_called()
        cmd.pprint.assert_not_called()
        cmd.logger.critical.assert_called_once_with(
            'Command argument not supported (available options: `info`).'
        )
class ShardsCommandWithContentTest(TestCase):
    """Integration tests for the ``shards`` command with one table present.

    Shard sizes and document counts are not compared, so the data rows in
    the ``expected`` tables below are illustrative only: each test checks
    the number of printed lines and the header cells.
    """

    def setUp(self):
        node.reset()
        with CrateShell(crate_hosts=[node.http_url], is_tty=False) as cmd:
            cmd.process(
                'CREATE TABLE test_table (id INTEGER PRIMARY KEY, data STRING ) '
                'CLUSTERED INTO 10 SHARDS WITH (number_of_replicas = 0);\n'
            )

    def tearDown(self):
        with CrateShell(crate_hosts=[node.http_url], is_tty=False) as cmd:
            cmd.process('DROP TABLE IF EXISTS test_table;')

    @staticmethod
    def _header_cells(table_lines):
        """Return the stripped cells of the header row (second table line)."""
        return [cell.strip() for cell in table_lines[1].split('|')]

    def _run_shards(self, *args):
        """Invoke the ``shards`` command and return its stdout as lines."""
        with CrateShell(crate_hosts=[node.http_url], is_tty=False) as cmd:
            shards_ = cmd.commands['shards']
            with patch('sys.stdout', new_callable=StringIO) as output:
                text = shards_(cmd, *args)
                self.assertEqual(None, text)
                return output.getvalue().splitlines()

    def _assert_same_shape(self, expected, output_lines):
        """Compare line count and header cells of two rendered tables."""
        self.assertEqual(len(expected), len(output_lines))
        self.assertEqual(
            self._header_cells(expected),
            self._header_cells(output_lines),
        )

    def test_shards_command_output_default(self):
        expected = [
            '+---------+---------+-------------+-----------+---------------------+',
            '| state   | primary | shard_count | num_docs  | size_gb             |',
            '+---------+---------+-------------+-----------+---------------------+',
            '| STARTED | FALSEy  | ?6          | 1x0000000 | 7. DUMMY VALUE 614  |',
            '+---------+---------+-------------+-----------+---------------------+\n',
        ]
        self._assert_same_shape(expected, self._run_shards())

    def test_shards_command_output_info(self):
        expected = [
            '+-------------+------------+-----------------+--------------+------------+-------------------+-----------------+-------------------+',
            '| schema_name | table_name | partition_ident | total_shards | total_size | relocating_shards | relocating_size | relocated_percent |',
            '+-------------+------------+-----------------+--------------+------------+-------------------+-----------------+-------------------+',
            '| doc         | test_table |                 |           10 |        624 |                 0 | NULL            |             100.0 |',
            '+-------------+------------+-----------------+--------------+------------+-------------------+-----------------+-------------------+\n',
        ]
        self._assert_same_shape(expected, self._run_shards('info'))