diff --git a/providers/base/bin/cpu_idle_state.py b/providers/base/bin/cpu_idle_state.py new file mode 100755 index 0000000000..f21afd6eb8 --- /dev/null +++ b/providers/base/bin/cpu_idle_state.py @@ -0,0 +1,577 @@ +#!/usr/bin/env python3 +# This file is part of Checkbox. +# +# Copyright 2025 Canonical Ltd. +# Written by: +# Isaac Yang +# +# Checkbox is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# Checkbox is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Checkbox. If not, see . + +""" +CPU Idle State Trigger Test - Python Implementation + +This script replicates the FWTS cpuidle test functionality to trigger +all available CPU idle states on each processor by creating alternating +load patterns that allow the idle governor to discover and use all states. + +Author: Based on FWTS cstates.c implementation +License: GPL-2.0+ + +Usage: + sudo python3 cpu_idle_state.py [--verbose] + +Examples: + sudo python3 cpu_idle_state_fwts.py + sudo python3 cpu_idle_state_fwts.py --verbose +""" + +import os +import sys +import time +import math +import glob +import logging +from typing import Dict, List +import psutil +import argparse + + +class IdleState: + """Represents a CPU idle state""" + + def __init__( + self, + name: str, + number: int, + usage_count: int, + used: bool = False, + logged: bool = False, + ) -> None: + self.name = name + self.number = number + self.usage_count = usage_count + self.used = used + self.logged = logged + + +class CpuIdleInfo: + """CPU idle information for a specific CPU""" + + def __init__( + self, + cpu_id: int, + states: Dict[int, IdleState], + path: str, + ) -> None: + self.cpu_id = cpu_id + self.states = states + self.path = path + + +class CpuAffinityError(Exception): + """Exception raised when CPU affinity operations fail""" + + pass + + +class Logger: + """Custom logger class for CPU idle state testing""" + + def __init__(self, verbose: bool = False): + """Initialize logger with specified verbosity level""" + self.verbose = verbose + self._setup_logging() + + def _setup_logging(self): + """Setup logging configuration""" + log_level = logging.DEBUG if self.verbose else logging.INFO + logging.basicConfig( + level=log_level, + format="[%(levelname)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)], + ) + self.logger = logging.getLogger(__name__) + + def info(self, message: str): + """Log info message""" + self.logger.info(message) + + def debug(self, message: str): + """Log debug message""" + self.logger.debug(message) + + def warning(self, message: str): + """Log warning message""" + self.logger.warning(message) + + def error(self, message: str): + """Log error message""" + self.logger.error(message) + + +class CpuBenchmark: + """CPU benchmarking functionality""" + + def __init__(self, logger: Logger): + """Initialize CPU benchmark with logger""" + self.logger = logger + + def get_cpu_affinity(self) -> List[int]: + """Get current CPU affinity""" + try: + return psutil.Process().cpu_affinity() + except psutil.AccessDenied: + msg = "Cannot get CPU affinity - need root privileges" + raise CpuAffinityError(msg) + + def set_cpu_affinity(self, cpu_id: List[int]) -> None: + """Set CPU affinity to a list of CPUs""" + try: + psutil.Process().cpu_affinity(cpu_id) + except psutil.AccessDenied: + msg = "Cannot set CPU affinity - need root privileges" + raise CpuAffinityError(msg) + + def restore_cpu_affinity(self, affinity: List[int]) -> None: + """Restore CPU affinity to previous setting""" + self.set_cpu_affinity(affinity) + + def burn_cpu_cycles(self): + """ + Burn CPU cycles similar to FWTS fwts_cpu_burn_cycles(). + It use the floating point operations to burn CPU cycles. + """ + a = 1.234567 + b = 3.121213 + + for _ in range(100): + a = a * b + b = a * a + # Ensure a is positive before taking square root + a = a - b + math.sqrt(abs(a)) + a = a * b + b = a * a + a = a - b + math.sqrt(abs(a)) + a = a * b + b = a * a + a = a - b + math.sqrt(abs(a)) + a = a * b + b = a * a + a = a - b + math.sqrt(abs(a)) + + def cpu_benchmark(self, cpu_id: int) -> float: + """Run CPU benchmark on specific CPU, similar to FWTS + fwts_cpu_benchmark() + It will burn CPU cycles for ~250ms. + """ + # Set CPU affinity + old_affinity = self.get_cpu_affinity() + self.set_cpu_affinity([cpu_id]) + + try: + start_time = time.time() + loops = 0 + + # Run benchmark for ~250ms (same as FWTS) + while (time.time() - start_time) < 0.25: + self.burn_cpu_cycles() + loops += 1 + + duration = time.time() - start_time + return loops / duration if duration > 0 else 0 + + finally: + # Restore CPU affinity + self.restore_cpu_affinity(old_affinity) + + +class CpuIdleTest: + """Main class for CPU idle state testing""" + + PROCESSOR_PATH = "/sys/devices/system/cpu" + TOTAL_WAIT_TIME = 20 # Same as FWTS + + def __init__(self, verbose: bool = False): + self.verbose = verbose + self.cpus = [] + self.state_count = -1 + self.first_cpu = -1 + self.logger = Logger(verbose) + self.benchmark = CpuBenchmark(self.logger) + + def get_cpu_count(self) -> int: + """Get the number of CPUs in the system""" + return psutil.cpu_count(logical=True) + + def get_cpuidle_states(self, cpu_path: str) -> Dict[int, IdleState]: + """Get current CPU idle states, similar to FWTS get_cpuidle_states()""" + states = {} + + try: + # Find all state directories + state_dirs = glob.glob("{}/*".format(cpu_path)) + + for state_dir in state_dirs: + state_name = os.path.basename(state_dir) + + # Parse state number (state0, state1, etc.) + if state_name.startswith("state"): + try: + # Extract number after "state" + state_num = int(state_name[5:]) + except ValueError: + continue + + # Read state name + name_file = os.path.join(state_dir, "name") + try: + with open(name_file, "r") as f: + state_name_str = f.read().strip() + except (IOError, OSError): + state_name_str = "state{}".format(state_num) + + # Read usage count + usage_file = os.path.join(state_dir, "usage") + try: + with open(usage_file, "r") as f: + usage_count = int(f.read().strip()) + except (IOError, OSError, ValueError): + usage_count = 0 + + states[state_num] = IdleState( + name=state_name_str, + number=state_num, + usage_count=usage_count, + ) + + except (IOError, OSError) as e: + self.logger.error( + "Error reading CPU idle states from {}: {}".format(cpu_path, e) + ) + + return states + + def discover_cpus(self) -> List[CpuIdleInfo]: + """Discover all CPUs with cpuidle support""" + cpus = [] + + # Get number of CPUs + cpu_count = self.get_cpu_count() + self.logger.info("Found {} CPUs in system".format(cpu_count)) + + # Check each CPU for cpuidle support + for cpu_id in range(cpu_count): + cpu_path = "{}/cpu{}/cpuidle".format(self.PROCESSOR_PATH, cpu_id) + + if os.path.exists(cpu_path): + states = self.get_cpuidle_states(cpu_path) + if states: + cpu_info = CpuIdleInfo( + cpu_id=cpu_id, + states=states, + path=cpu_path, + ) + cpus.append(cpu_info) + self.logger.info( + "CPU {}: Found {} idle states".format( + cpu_id, + len(states), + ) + ) + else: + msg = "CPU {}: No idle states found".format(cpu_id) + self.logger.warning(msg) + else: + self.logger.debug("CPU {}: No cpuidle support".format(cpu_id)) + + return cpus + + def test_cpu_idle_states( + self, cpu_info: CpuIdleInfo, cpu_index: int, total_cpus: int + ): + """Test idle states for a specific CPU, similar to FWTS do_cpu()""" + self.logger.info( + "Testing CPU {} ({}/{})".format( + cpu_info.cpu_id, + cpu_index + 1, + total_cpus, + ) + ) + + # Get initial state + initial_states = self.get_cpuidle_states(cpu_info.path) + current_states = initial_states.copy() + + # Track which states have been used and store original usage counts + original_usage_counts = {} + for state_num in initial_states: + initial_states[state_num].used = False + original_usage_counts[state_num] = initial_states[ + state_num + ].usage_count + + keep_going = True + + for iteration in range(self.TOTAL_WAIT_TIME): + if not keep_going: + break + + # Report progress + if iteration % 3 == 0: + progress = ( + 100 + * (iteration + (self.TOTAL_WAIT_TIME * cpu_index)) + / (total_cpus * self.TOTAL_WAIT_TIME) + ) + self.logger.debug( + "CPU {} progress: {:.1f}% (iteration {}/{})".format( + cpu_info.cpu_id, + progress, + iteration + 1, + self.TOTAL_WAIT_TIME, + ) + ) + + # Alternate between sleep and benchmark (same pattern as FWTS) + if (iteration & 7) < 4: + # Sleep phase - allow CPU to go idle + time.sleep(1) + else: + # Benchmark phase - create CPU load + try: + result = self.benchmark.cpu_benchmark(cpu_info.cpu_id) + if self.verbose: + self.logger.debug( + "CPU {} benchmark: {:.0f} loops/sec".format( + cpu_info.cpu_id, + result, + ) + ) + except CpuAffinityError as e: + self.logger.error( + "CPU {} benchmark failed: {}".format( + cpu_info.cpu_id, + e, + ) + ) + + # Check for state changes + current_states = self.get_cpuidle_states(cpu_info.path) + + keep_going = False + for state_num in initial_states: + if state_num in current_states: + if ( + initial_states[state_num].usage_count + != current_states[state_num].usage_count + ): + # State was used + initial_states[state_num].usage_count = current_states[ + state_num + ].usage_count + initial_states[state_num].used = True + + # Log based on whether this state has been + # logged before + if not initial_states[state_num].logged: + # First time reaching this state - use info level + self.logger.info( + "CPU {}: State {} ({}) was used".format( + cpu_info.cpu_id, + state_num, + initial_states[state_num].name, + ) + ) + initial_states[state_num].logged = True + else: + # Subsequent times reaching this state - use + # debug level + self.logger.debug( + "CPU {}: State {} ({}) was used again".format( + cpu_info.cpu_id, + state_num, + initial_states[state_num].name, + ) + ) + + if not initial_states[state_num].used: + keep_going = True + + # Early termination if all states reached + if not keep_going: + self.logger.info( + "CPU {}: All idle states reached, stopping early".format( + cpu_info.cpu_id + ) + ) + break + + # Report results + used_states = [s for s in initial_states.values() if s.used] + + # Check for states that didn't increase but had non-zero original usage + states_with_notes = [] + failed_states = [] + + for state_num, state in initial_states.items(): + if not state.used: + original_usage = original_usage_counts[state_num] + if original_usage > 0: + # State didn't increase but had non-zero original usage + # - pass with note + states_with_notes.append(state.name) + else: + # State didn't increase and had zero original usage - fail + failed_states.append(state.name) + + if failed_states: + # These states failed the test + self.logger.warning( + "CPU {}: WARNING - States not reached: {}".format( + cpu_info.cpu_id, ", ".join(failed_states) + ) + ) + + if states_with_notes: + # These states pass but with a note + self.logger.info( + "CPU {}: NOTE - ".format(cpu_info.cpu_id) + + "States with existing usage (not tested): {}".format( + ", ".join(states_with_notes), + ) + ) + + if not failed_states and not states_with_notes: + # All states were used + used_names = ["{}".format(state.name) for state in used_states] + self.logger.info( + "CPU {}: SUCCESS - All states reached: {}".format( + cpu_info.cpu_id, ", ".join(used_names) + ) + ) + + # Check state count consistency + state_count = len(initial_states) + if self.state_count == -1: + self.state_count = state_count + elif self.state_count != state_count: + self.logger.error( + "CPU {}: ERROR - Expected {} states, found {}".format( + cpu_info.cpu_id, self.state_count, state_count + ) + ) + else: + if self.first_cpu == -1: + self.first_cpu = cpu_info.cpu_id + else: + self.logger.debug( + "CPU {}: State count matches CPU {}".format( + cpu_info.cpu_id, self.first_cpu + ) + ) + + return len(failed_states) == 0 + + def run_test(self) -> bool: + """Run the complete CPU idle state test""" + self.logger.info("Starting CPU idle state test") + self.logger.info( + "This test checks if all processors have the same number of " + "idle states and if idle state transitions happen during " + "alternating load patterns" + ) + + # Discover CPUs + self.cpus = self.discover_cpus() + + if not self.cpus: + self.logger.error("ERROR: No CPUs with cpuidle support found") + return False + msg = "Testing {} CPUs with cpuidle support".format(len(self.cpus)) + self.logger.info(msg) + + # Test each CPU + success_count = 0 + for i, cpu_info in enumerate(self.cpus): + try: + if self.test_cpu_idle_states(cpu_info, i, len(self.cpus)): + success_count += 1 + except Exception as e: + msg = "ERROR testing CPU {}: {}".format(cpu_info.cpu_id, e) + self.logger.error(msg) + + # Final report + self.logger.info( + "\nTest completed: {}/{} CPUs passed".format( + success_count, + len(self.cpus), + ) + ) + + if success_count == len(self.cpus): + self.logger.info("SUCCESS: All CPUs reached all their idle states") + return True + else: + self.logger.error("ERROR: Some CPUs did not reach all idle states") + return False + + +def main(): + """Main function""" + parser = argparse.ArgumentParser( + description="CPU Idle State Trigger Test", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Run test with verbose output + sudo python3 cpu_idle_test.py --verbose + + # Run test silently (only errors/warnings) + sudo python3 cpu_idle_test.py + + # Run test and save output to file + sudo python3 cpu_idle_test.py --verbose 2>&1 | tee cpu_idle_test.log + """, + ) + + parser.add_argument( + "--verbose", "-v", action="store_true", help="Enable verbose output" + ) + + args = parser.parse_args() + + # Check if running as root + if os.geteuid() != 0: + print( + "ERROR: This script must be run as root (sudo) to set CPU " + "affinity\n" + "Usage: sudo python3 cpu_idle_state_fwts.py [--verbose]" + ) + sys.exit(1) + + # Run the test + test = CpuIdleTest(verbose=args.verbose) + + try: + success = test.run_test() + sys.exit(0 if success else 1) + except KeyboardInterrupt: + print("\nTest interrupted by user") + sys.exit(1) + except Exception as e: + print("ERROR: {}".format(e)) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/providers/base/tests/test_cpu_idle_state.py b/providers/base/tests/test_cpu_idle_state.py new file mode 100644 index 0000000000..fb84c5211c --- /dev/null +++ b/providers/base/tests/test_cpu_idle_state.py @@ -0,0 +1,1900 @@ +#!/usr/bin/env python3 +""" +Unit tests for cpu_idle_state.py + +This module provides comprehensive test coverage for the CPU idle state testing +functionality, including all classes and methods. +""" + +import unittest +from unittest.mock import patch, mock_open +import psutil +import time +import argparse + +from cpu_idle_state import ( + IdleState, + CpuIdleInfo, + CpuAffinityError, + Logger, + CpuBenchmark, + CpuIdleTest, +) + + +class TestIdleState(unittest.TestCase): + """Test cases for IdleState class""" + + def test_idle_state_initialization(self): + """Test IdleState initialization with all parameters""" + state = IdleState("C1", 1, 100, True) + self.assertEqual(state.name, "C1") + self.assertEqual(state.number, 1) + self.assertEqual(state.usage_count, 100) + self.assertTrue(state.used) + + def test_idle_state_initialization_defaults(self): + """Test IdleState initialization with default used parameter""" + state = IdleState("C2", 2, 50, False) + self.assertEqual(state.name, "C2") + self.assertEqual(state.number, 2) + self.assertEqual(state.usage_count, 50) + self.assertFalse(state.used) + + def test_idle_state_initialization_with_used_true(self): + """Test IdleState initialization with used=True""" + state = IdleState("C3", 3, 75, True) + self.assertTrue(state.used) + + +class TestCpuIdleInfo(unittest.TestCase): + """Test cases for CpuIdleInfo class""" + + def test_cpu_idle_info_initialization(self): + """Test CpuIdleInfo initialization""" + states = {0: IdleState("C1", 0, 100, False)} + cpu_info = CpuIdleInfo( + 0, + states, + "/sys/devices/system/cpu/cpu0/cpuidle", + ) + self.assertEqual(cpu_info.cpu_id, 0) + self.assertEqual(cpu_info.states, states) + self.assertEqual(cpu_info.path, "/sys/devices/system/cpu/cpu0/cpuidle") + + def test_cpu_idle_info_with_empty_states(self): + """Test CpuIdleInfo initialization with empty states""" + cpu_info = CpuIdleInfo( + 1, + {}, + "/sys/devices/system/cpu/cpu1/cpuidle", + ) + self.assertEqual(cpu_info.cpu_id, 1) + self.assertEqual(cpu_info.states, {}) + self.assertEqual(cpu_info.path, "/sys/devices/system/cpu/cpu1/cpuidle") + + +class TestCpuAffinityError(unittest.TestCase): + """Test cases for CpuAffinityError exception""" + + def test_cpu_affinity_error_creation(self): + """Test CpuAffinityError exception creation""" + error = CpuAffinityError("Test error message") + self.assertEqual(str(error), "Test error message") + self.assertIsInstance(error, Exception) + + +class TestLogger(unittest.TestCase): + """Test cases for Logger class""" + + def setUp(self): + """Set up test fixtures""" + # Suppress logging output during tests + self.logger = Logger(verbose=False) + + def test_logger_initialization(self): + """Test Logger initialization""" + logger = Logger(verbose=False) + self.assertFalse(logger.verbose) + + def test_logger_verbose_initialization(self): + """Test Logger initialization with verbose=True""" + logger = Logger(verbose=True) + self.assertTrue(logger.verbose) + + def test_logger_info(self): + """Test Logger info method""" + with patch.object(self.logger.logger, "info") as mock_info: + self.logger.info("Test info message") + mock_info.assert_called_once_with("Test info message") + + def test_logger_debug(self): + """Test Logger debug method""" + with patch.object(self.logger.logger, "debug") as mock_debug: + self.logger.debug("Test debug message") + mock_debug.assert_called_once_with("Test debug message") + + def test_logger_warning(self): + """Test Logger warning method""" + with patch.object(self.logger.logger, "warning") as mock_warning: + self.logger.warning("Test warning message") + mock_warning.assert_called_once_with("Test warning message") + + def test_logger_error(self): + """Test Logger error method""" + with patch.object(self.logger.logger, "error") as mock_error: + self.logger.error("Test error message") + mock_error.assert_called_once_with("Test error message") + + +class TestCpuBenchmark(unittest.TestCase): + """Test cases for CpuBenchmark class""" + + def setUp(self): + """Set up test fixtures""" + self.logger = Logger(verbose=False) + self.benchmark = CpuBenchmark(self.logger) + + @patch("psutil.Process") + def test_get_cpu_affinity_success(self, mock_process): + """Test successful CPU affinity retrieval""" + mock_process.return_value.cpu_affinity.return_value = [0, 1, 2] + result = self.benchmark.get_cpu_affinity() + self.assertEqual(result, [0, 1, 2]) + + @patch("psutil.Process") + def test_get_cpu_affinity_access_denied(self, mock_process): + """Test CPU affinity retrieval with access denied""" + # Mock psutil.AccessDenied exception + mock_process.return_value.cpu_affinity.side_effect = ( + psutil.AccessDenied() + ) + with self.assertRaises(CpuAffinityError): + self.benchmark.get_cpu_affinity() + + @patch("psutil.Process") + def test_set_cpu_affinity_success(self, mock_process): + """Test successful CPU affinity setting""" + mock_process.return_value.cpu_affinity.return_value = None + self.benchmark.set_cpu_affinity([1]) + mock_process.return_value.cpu_affinity.assert_called_with([1]) + + @patch("psutil.Process") + def test_set_cpu_affinity_access_denied(self, mock_process): + """Test CPU affinity setting with access denied""" + # Mock psutil.AccessDenied exception + mock_process.return_value.cpu_affinity.side_effect = ( + psutil.AccessDenied() + ) + with self.assertRaises(CpuAffinityError): + self.benchmark.set_cpu_affinity([1]) + + @patch("psutil.Process") + def test_restore_cpu_affinity_success(self, mock_process): + """Test successful CPU affinity restoration""" + mock_process.return_value.cpu_affinity.return_value = None + self.benchmark.restore_cpu_affinity([0, 1, 2]) + mock_process.return_value.cpu_affinity.assert_called_with([0, 1, 2]) + + @patch("psutil.Process") + def test_restore_cpu_affinity_access_denied(self, mock_process): + """Test CPU affinity restoration with access denied""" + # Mock psutil.AccessDenied exception + mock_process.return_value.cpu_affinity.side_effect = ( + psutil.AccessDenied() + ) + with self.assertRaises(CpuAffinityError): + self.benchmark.restore_cpu_affinity([0, 1, 2]) + + @patch("psutil.Process") + def test_restore_cpu_affinity_psutil_error(self, mock_process): + """ + Test restore_cpu_affinity when psutil.Process().cpu_affinity() fails + """ + mock_process.return_value.cpu_affinity.side_effect = ( + psutil.AccessDenied() + ) + with self.assertRaises(CpuAffinityError): + self.benchmark.restore_cpu_affinity([0, 1, 2]) + + @patch("psutil.Process") + def test_restore_cpu_affinity_psutil_error_direct(self, mock_process): + """ + Test restore_cpu_affinity when psutil.Process().cpu_affinity() + fails directly + """ + mock_process.return_value.cpu_affinity.side_effect = ( + psutil.AccessDenied() + ) + with self.assertRaises(CpuAffinityError): + self.benchmark.restore_cpu_affinity([0, 1, 2]) + + @patch.object(CpuBenchmark, "burn_cpu_cycles") + def test_burn_cpu_cycles(self, mock_burn): + """Test CPU cycle burning functionality""" + # Mock the burn_cpu_cycles method to avoid actually burning CPU + mock_burn.return_value = None + self.benchmark.burn_cpu_cycles() + assert mock_burn.called is True + + def test_burn_cpu_cycles_actual_execution(self): + """Test burn_cpu_cycles method with actual execution""" + # Test that burn_cpu_cycles executes without errors + # This test verifies the method runs to completion + try: + self.benchmark.burn_cpu_cycles() + # If we reach here, the method executed successfully + self.assertTrue(True) + except Exception as e: + self.fail("burn_cpu_cycles raised an exception: {}".format(e)) + + def test_burn_cpu_cycles_math_operations(self): + """Test burn_cpu_cycles method performs expected math operations""" + # Create a mock to track math.sqrt calls + with patch("math.sqrt") as mock_sqrt: + mock_sqrt.return_value = ( + 2.0 # Mock sqrt to return a constant value + ) + + # Execute burn_cpu_cycles + self.benchmark.burn_cpu_cycles() + + # Verify that math.sqrt was called (the method uses sqrt in + # calculations) + # The method has 5 sqrt calls per iteration, 100 iterations = 500 + # calls + self.assertGreater(mock_sqrt.call_count, 0) + # Should be called multiple times due to the loop + self.assertGreaterEqual(mock_sqrt.call_count, 100) + + def test_burn_cpu_cycles_floating_point_operations(self): + """Test burn_cpu_cycles method performs floating point operations""" + # Test that the method performs floating point calculations + # We'll verify by checking that the method doesn't raise any exceptions + # and completes its execution + + # Create a simple test to ensure the method runs + benchmark = CpuBenchmark(Logger(verbose=False)) + + # Execute the method and ensure it completes + start_time = time.time() + benchmark.burn_cpu_cycles() + end_time = time.time() + + # Verify the method took some time to execute (indicating it ran the + # loop) + execution_time = end_time - start_time + self.assertGreater(execution_time, 0.0) + + # The method should complete without raising exceptions + self.assertTrue(True) + + def test_burn_cpu_cycles_loop_execution(self): + """ + Test burn_cpu_cycles method executes the expected number of + iterations + """ + # We can't easily count the exact iterations without modifying the + # method, but we can verify it takes a reasonable amount of time to + # execute indicating the loop ran + + benchmark = CpuBenchmark(Logger(verbose=False)) + + # Execute multiple times to ensure consistency + for i in range(3): + start_time = time.time() + benchmark.burn_cpu_cycles() + end_time = time.time() + + execution_time = end_time - start_time + # Should take some measurable time (not instant) + self.assertGreater(execution_time, 0.0) # Should take some time + # Should not take too long (indicating it's not stuck) + self.assertLess(execution_time, 1.0) # Less than 1 second + + @patch.object(CpuBenchmark, "get_cpu_affinity") + @patch.object(CpuBenchmark, "set_cpu_affinity") + @patch.object(CpuBenchmark, "restore_cpu_affinity") + @patch.object(CpuBenchmark, "burn_cpu_cycles") + @patch("time.time") + def test_cpu_benchmark_success( + self, + mock_time, + mock_burn, + mock_restore, + mock_set, + mock_get_affinity, + ): + """Test successful CPU benchmark""" + # Mock time to return increasing values + mock_time.side_effect = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5] + mock_get_affinity.return_value = [0, 1, 2] + + result = self.benchmark.cpu_benchmark(1) + + self.assertEqual(mock_get_affinity.call_count, 1) + mock_set.assert_called_with([1]) + mock_restore.assert_called_with([0, 1, 2]) + self.assertIsInstance(result, float) + + @patch.object(CpuBenchmark, "set_cpu_affinity") + def test_cpu_benchmark_affinity_error(self, mock_set): + """Test CPU benchmark with affinity error""" + mock_set.side_effect = CpuAffinityError("Test error") + with self.assertRaises(CpuAffinityError): + self.benchmark.cpu_benchmark(1) + + @patch("psutil.Process") + def test_set_cpu_affinity_psutil_error(self, mock_process): + """Test set_cpu_affinity when psutil.Process().cpu_affinity() fails""" + mock_process.return_value.cpu_affinity.side_effect = ( + psutil.AccessDenied() + ) + with self.assertRaises(CpuAffinityError): + self.benchmark.set_cpu_affinity([1]) + + +class TestCpuIdleTest(unittest.TestCase): + """Test cases for CpuIdleTest class""" + + def setUp(self): + self.test = CpuIdleTest(verbose=False) + self.benchmark = CpuBenchmark(Logger(verbose=False)) + + def test_initialization(self): + """Test CpuIdleTest initialization""" + self.assertFalse(self.test.verbose) + self.assertEqual(self.test.cpus, []) + self.assertEqual(self.test.state_count, -1) + self.assertEqual(self.test.first_cpu, -1) + + @patch("psutil.cpu_count") + def test_get_cpu_count(self, mock_cpu_count): + """Test CPU count retrieval""" + mock_cpu_count.return_value = 4 + result = self.test.get_cpu_count() + self.assertEqual(result, 4) + mock_cpu_count.assert_called_with(logical=True) + + @patch("glob.glob") + @patch("builtins.open", new_callable=mock_open, read_data="C1\n") + def test_get_cpuidle_states_success(self, mock_file, mock_glob): + """Test successful CPU idle states reading""" + # Mock glob to return state directories + mock_glob.return_value = [ + "/sys/devices/system/cpu/cpu0/cpuidle/state0", + "/sys/devices/system/cpu/cpu0/cpuidle/state1", + ] + + # Mock file reading for name and usage count + def mock_file_side_effect(*args, **kwargs): + if "name" in args[0]: + return mock_open(read_data="C1\n")(*args, **kwargs) + elif "usage" in args[0]: + return mock_open(read_data="100\n")(*args, **kwargs) + return mock_open(read_data="")(*args, **kwargs) + + with patch("builtins.open", side_effect=mock_file_side_effect): + states = self.test.get_cpuidle_states("/fake/path") + + self.assertEqual(len(states), 2) + self.assertIn(0, states) + self.assertIn(1, states) + self.assertEqual(states[0].name, "C1") + self.assertEqual(states[0].number, 0) + self.assertEqual(states[0].usage_count, 100) + + @patch("glob.glob") + def test_get_cpuidle_states_no_states(self, mock_glob): + """Test CPU idle states reading with no states""" + mock_glob.return_value = [] + states = self.test.get_cpuidle_states("/fake/path") + self.assertEqual(len(states), 0) + + @patch("glob.glob") + def test_get_cpuidle_states_io_error(self, mock_glob): + """Test CPU idle states reading with IO error""" + mock_glob.return_value = [ + "/sys/devices/system/cpu/cpu0/cpuidle/state0", + ] + + with patch("builtins.open", side_effect=IOError("Test error")): + states = self.test.get_cpuidle_states("/fake/path") + # Even with IO error, a state is created with default values + self.assertEqual(len(states), 1) + self.assertIn(0, states) + self.assertEqual(states[0].name, "state0") # Default name + self.assertEqual(states[0].usage_count, 0) # Default usage count + + @patch("glob.glob") + def test_get_cpuidle_states_glob_error(self, mock_glob): + """Test CPU idle states reading with glob error""" + mock_glob.side_effect = OSError("Test error") + + with patch.object(self.test.logger, "error") as mock_error: + states = self.test.get_cpuidle_states("/fake/path") + + self.assertEqual(len(states), 0) + assert mock_error.called is True + + @patch("glob.glob") + def test_get_cpuidle_states_glob_error_direct(self, mock_glob): + """Test CPU idle states reading with glob error - direct test""" + mock_glob.side_effect = OSError("Test error") + + with patch.object(self.test.logger, "error") as mock_error: + states = self.test.get_cpuidle_states("/fake/path") + + self.assertEqual(len(states), 0) + assert mock_error.called is True + + @patch("psutil.cpu_count") + @patch("os.path.exists") + @patch.object(CpuIdleTest, "get_cpuidle_states") + def test_discover_cpus_success( + self, + mock_get_states, + mock_exists, + mock_cpu_count, + ): + """Test successful CPU discovery""" + mock_cpu_count.return_value = 2 + mock_exists.return_value = True + mock_get_states.return_value = { + 0: IdleState("C1", 0, 100, False), + } + + with patch.object(self.test.logger, "info"): + cpus = self.test.discover_cpus() + + self.assertEqual(len(cpus), 2) + + @patch("psutil.cpu_count") + @patch("os.path.exists") + def test_discover_cpus_no_cpuidle_support( + self, + mock_exists, + mock_cpu_count, + ): + """Test CPU discovery with no cpuidle support""" + mock_cpu_count.return_value = 2 + mock_exists.return_value = False + + with patch.object(self.test.logger, "debug"): + with patch.object(self.test.logger, "info") as mock_info: + cpus = self.test.discover_cpus() + + self.assertEqual(len(cpus), 0) + mock_info.assert_called_once_with("Found 2 CPUs in system") + + @patch("psutil.cpu_count") + @patch("os.path.exists") + @patch.object(CpuIdleTest, "get_cpuidle_states") + def test_discover_cpus_no_states( + self, + mock_get_states, + mock_exists, + mock_cpu_count, + ): + """Test CPU discovery with no idle states""" + mock_cpu_count.return_value = 2 + mock_exists.return_value = True + mock_get_states.return_value = {} + + with patch.object(self.test.logger, "warning"): + with patch.object(self.test.logger, "info") as mock_info: + cpus = self.test.discover_cpus() + + self.assertEqual(len(cpus), 0) + mock_info.assert_called_once_with("Found 2 CPUs in system") + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_success( + self, + mock_time, + mock_benchmark, + mock_sleep, + mock_get_states, + ): + """Test successful CPU idle state testing""" + # Mock initial states + initial_states = { + 0: IdleState("C1", 0, 100, False), + 1: IdleState("C2", 1, 50, False), + } + mock_get_states.return_value = initial_states + + # Mock time to control the loop + mock_time.side_effect = [i * 0.1 for i in range(50)] + + # Mock benchmark to succeed + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + with patch.object(self.test.logger, "warning"): + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_benchmark_error( + self, + mock_time, + mock_benchmark, + mock_sleep, + mock_get_states, + ): + """Test CPU idle state testing with benchmark error""" + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.side_effect = CpuAffinityError("Test error") + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "error") as mock_error: + with patch.object(self.test.logger, "info") as mock_info: + with patch.object(self.test.logger, "warning"): + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # The error should be logged when benchmark fails + assert mock_error.called is True + + @patch.object(CpuIdleTest, "discover_cpus") + def test_run_test_no_cpus(self, mock_discover): + """Test run_test with no CPUs found""" + mock_discover.return_value = [] + + with patch.object(self.test.logger, "error") as mock_error: + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.run_test() + + self.assertFalse(result) + assert mock_info.called is True + assert mock_error.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_progress_reporting( + self, + mock_time, + mock_benchmark, + mock_sleep, + mock_get_states, + ): + """Test CPU idle state testing with progress reporting""" + # Create test with verbose=True to test progress reporting + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "info") as mock_info: + with patch.object(test.logger, "debug") as mock_debug: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Progress reporting should be called when verbose=True + assert mock_debug.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_benchmark_debug( + self, + mock_time, + mock_benchmark, + mock_sleep, + mock_get_states, + ): + """Test CPU idle state testing with benchmark debug output""" + # Create test with verbose=True to test benchmark debug output + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "info") as mock_info: + with patch.object(test.logger, "debug") as mock_debug: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Benchmark debug output should be called when verbose=True + assert mock_debug.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_state_count_consistency( + self, + mock_time, + mock_benchmark, + mock_sleep, + mock_get_states, + ): + """Test CPU idle state testing with state count consistency check""" + # Set up test with different state counts to test consistency + test = CpuIdleTest(verbose=False) + test.state_count = 2 # Set expected state count + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "error") as mock_error: + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Should log error for state count mismatch + assert mock_error.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_state_count_mismatch( + self, + mock_time, + mock_benchmark, + mock_sleep, + mock_get_states, + ): + """Test CPU idle state testing with state count mismatch""" + # Set up test with different state counts to test consistency + test = CpuIdleTest(verbose=False) + test.state_count = 2 # Set expected state count + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "error") as mock_error: + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Should log error for state count mismatch + assert mock_error.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_state_count_consistency_direct( + self, + mock_time, + mock_benchmark, + mock_sleep, + mock_get_states, + ): + """ + Test CPU idle state testing with state count consistency - direct test + """ + test = CpuIdleTest(verbose=False) + test.state_count = 2 # Set expected state count + test.first_cpu = 1 # Set first CPU to trigger consistency check + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "error") as mock_error: + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Should log error for state count mismatch + assert mock_error.called is True + + @patch("psutil.Process") + def test_restore_cpu_affinity_oserror(self, mock_process): + """Test restore_cpu_affinity with OSError (not AccessDenied)""" + self.benchmark = CpuBenchmark(Logger(verbose=False)) + mock_process.return_value.cpu_affinity.side_effect = OSError( + "Test error" + ) + with self.assertRaises(OSError): + self.benchmark.restore_cpu_affinity([0, 1, 2]) + + @patch("glob.glob") + def test_get_cpuidle_states_oserror(self, mock_glob): + """Test get_cpuidle_states with OSError (outer try/except)""" + mock_glob.side_effect = OSError("Test error") + with patch.object(self.test.logger, "error") as mock_error: + states = self.test.get_cpuidle_states("/fake/path") + self.assertEqual(states, {}) + assert mock_error.called is True + + @patch.object(CpuIdleTest, "discover_cpus") + @patch.object(CpuIdleTest, "test_cpu_idle_states") + def test_run_test_final_report_success( + self, mock_test_states, mock_discover + ): + """Test run_test final report for all CPUs passing""" + cpu_info = CpuIdleInfo(0, {}, "/fake/path") + mock_discover.return_value = [cpu_info, cpu_info] + mock_test_states.return_value = True + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.run_test() + self.assertTrue(result) + self.assertTrue( + any( + "SUCCESS: All CPUs reached all their idle states" in str(call) + for call in mock_info.call_args_list + ) + ) + + @patch.object(CpuIdleTest, "discover_cpus") + @patch.object(CpuIdleTest, "test_cpu_idle_states") + def test_run_test_final_report_failure( + self, mock_test_states, mock_discover + ): + """Test run_test final report for some CPUs failing""" + cpu_info = CpuIdleInfo(0, {}, "/fake/path") + mock_discover.return_value = [cpu_info, cpu_info] + # First CPU passes, second fails + mock_test_states.side_effect = [True, False] + with patch.object(self.test.logger, "error") as mock_error: + with patch.object(self.test.logger, "info"): + result = self.test.run_test() + self.assertFalse(result) + self.assertTrue( + any( + "ERROR: Some CPUs did not reach all idle states" in str(call) + for call in mock_error.call_args_list + ) + ) + + @patch.object(CpuIdleTest, "discover_cpus") + @patch.object(CpuIdleTest, "test_cpu_idle_states") + def test_run_test_success(self, mock_test_states, mock_discover): + """Test successful test run""" + # Mock CPU discovery + cpu_info = CpuIdleInfo(0, {}, "/fake/path") + mock_discover.return_value = [cpu_info] + + # Mock test to succeed + mock_test_states.return_value = True + + with patch.object(self.test.logger, "info"): + result = self.test.run_test() + + self.assertTrue(result) + + @patch.object(CpuIdleTest, "discover_cpus") + @patch.object(CpuIdleTest, "test_cpu_idle_states") + def test_run_test_partial_failure(self, mock_test_states, mock_discover): + """Test test run with partial failure""" + # Mock CPU discovery + cpu_info = CpuIdleInfo(0, {}, "/fake/path") + mock_discover.return_value = [cpu_info] + + # Mock test to fail + mock_test_states.return_value = False + + with patch.object(self.test.logger, "error") as mock_error: + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.run_test() + + self.assertFalse(result) + assert mock_info.called is True + assert mock_error.called is True + + @patch.object(CpuIdleTest, "discover_cpus") + @patch.object(CpuIdleTest, "test_cpu_idle_states") + def test_run_test_exception(self, mock_test_states, mock_discover): + """Test test run with exception""" + # Mock CPU discovery + cpu_info = CpuIdleInfo(0, {}, "/fake/path") + mock_discover.return_value = [cpu_info] + + # Mock test to raise exception + mock_test_states.side_effect = Exception("Test error") + + with patch.object(self.test.logger, "error") as mock_error: + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.run_test() + + self.assertFalse(result) + assert mock_info.called is True + assert mock_error.called is True + + @patch.object(CpuIdleTest, "discover_cpus") + @patch.object(CpuIdleTest, "test_cpu_idle_states") + def test_run_test_exception_during_test( + self, mock_test_states, mock_discover + ): + """Test run_test when test_cpu_idle_states raises an exception""" + # Mock CPU discovery + cpu_info = CpuIdleInfo(0, {}, "/fake/path") + mock_discover.return_value = [cpu_info] + + # Mock test to raise exception + mock_test_states.side_effect = Exception("Test error") + + with patch.object(self.test.logger, "error") as mock_error: + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.run_test() + + self.assertFalse(result) + assert mock_info.called is True + assert mock_error.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_timeout( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with timeout""" + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + # Mock time to simulate timeout (very long time) + mock_time.side_effect = [i * 1000.0 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Should log info for states with existing usage (not tested) + self.assertTrue( + any( + "NOTE - States with existing usage" in str(call) + for call in mock_info.call_args_list + ) + ) + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_all_states_reached( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing when all states are reached early""" + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(10)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_no_states_reached( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing when no states are reached""" + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Should log info for states with existing usage (not tested) + self.assertTrue( + any( + "NOTE - States with existing usage" in str(call) + for call in mock_info.call_args_list + ) + ) + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_verbose_debug( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with verbose debug output""" + # Create test with verbose=True to test debug output + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "info") as mock_info: + with patch.object(test.logger, "debug") as mock_debug: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Debug output should be called when verbose=True + assert mock_debug.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_verbose_info( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with verbose info output""" + # Create test with verbose=True to test info output + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "info") as mock_info: + with patch.object(test.logger, "debug"): + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Info output should be called when verbose=True + assert mock_info.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_verbose_warning( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with verbose warning output""" + # Create test with verbose=True to test warning output + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [ + i * 1000.0 for i in range(50) + ] # Long time to trigger timeout + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Should log info for states with existing usage (not tested) + self.assertTrue( + any( + "NOTE - States with existing usage" in str(call) + for call in mock_info.call_args_list + ) + ) + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_verbose_error( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with verbose error output""" + # Create test with verbose=True to test error output + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.side_effect = CpuAffinityError("Test error") + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "error") as mock_error: + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Error output should be called when verbose=True and benchmark fails + assert mock_error.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_early_exit( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """ + Test CPU idle state testing with early exit when all states reached + """ + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [ + i * 0.1 for i in range(5) + ] # Short time to trigger early exit + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_long_loop( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with long loop execution""" + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(100)] # Long loop + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_state_transitions( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with state transitions""" + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(20)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_benchmark_failure( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with benchmark failure""" + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.side_effect = CpuAffinityError("Benchmark failed") + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "error") as mock_error: + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + assert mock_error.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_verbose_benchmark_debug( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with verbose benchmark debug output""" + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "debug") as mock_debug: + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + assert mock_debug.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_verbose_progress( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with verbose progress reporting""" + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "debug") as mock_debug: + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + assert mock_debug.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_verbose_state_used( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with verbose state used reporting""" + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_verbose_success( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with verbose success reporting""" + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [ + i * 0.1 for i in range(5) + ] # Short time to trigger success + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_verbose_warning_states( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """ + Test CPU idle state testing with verbose warning for states not reached + """ + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Should log info for states with existing usage (not tested) + self.assertTrue( + any( + "NOTE - States with existing usage" in str(call) + for call in mock_info.call_args_list + ) + ) + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_verbose_warning_timeout( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with verbose warning for timeout""" + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [ + i * 1000.0 for i in range(50) + ] # Long time to trigger timeout + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Should log info for states with existing usage (not tested) + self.assertTrue( + any( + "NOTE - States with existing usage" in str(call) + for call in mock_info.call_args_list + ) + ) + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_value_error_handling( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with ValueError in state parsing""" + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_progress_reporting_verbose( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """ + Test CPU idle state testing with progress reporting in verbose mode + """ + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + # Mock time to trigger progress reporting (every 3 iterations) + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "debug") as mock_debug: + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Progress reporting should be called when verbose=True + assert mock_debug.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_state_usage_tracking( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with state usage tracking""" + initial_states = {0: IdleState("C1", 0, 100, False)} + + # Mock get_cpuidle_states to return different usage counts + def mock_get_states_side_effect(path): + if mock_get_states.call_count == 1: + return {0: IdleState("C1", 0, 100, False)} + else: + return { + 0: IdleState("C1", 0, 150, False) + } # Different usage count + + mock_get_states.side_effect = mock_get_states_side_effect + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_unused_states_warning( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with unused states warning""" + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Should log info for states with existing usage (not tested) + self.assertTrue( + any( + "NOTE - States with existing usage" in str(call) + for call in mock_info.call_args_list + ) + ) + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_success_reporting( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with success reporting""" + # This test targets lines 415-416 (success reporting) + initial_states = {0: IdleState("C1", 0, 100, False)} + + # Mock get_cpuidle_states to simulate all states being used + def mock_get_states_side_effect(path): + if mock_get_states.call_count == 1: + return {0: IdleState("C1", 0, 100, False)} + else: + # Return state with used=True to trigger success + state = IdleState("C1", 0, 150, True) + return {0: state} + + mock_get_states.side_effect = mock_get_states_side_effect + mock_time.side_effect = [ + i * 0.1 for i in range(5) + ] # Short time to trigger early exit + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_state_count_consistency_error( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with state count consistency error""" + # This test targets lines 433-436 (state count consistency error) + test = CpuIdleTest(verbose=False) + test.state_count = 2 # Set expected state count + test.first_cpu = 1 # Set first CPU to trigger consistency check + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "error") as mock_error: + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Should log error for state count mismatch + assert mock_error.called is True + + @patch.object(CpuIdleTest, "discover_cpus") + def test_run_test_no_cpus_error_message(self, mock_discover): + """Test run_test with no CPUs found - error message""" + # This test targets lines 490-532 (main function logic) + mock_discover.return_value = [] + + with patch.object(self.test.logger, "error") as mock_error: + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.run_test() + + self.assertFalse(result) + assert mock_info.called is True + # Should log error for no CPUs found + assert mock_error.called is True + + @patch.object(CpuIdleTest, "discover_cpus") + @patch.object(CpuIdleTest, "test_cpu_idle_states") + def test_run_test_exception_during_cpu_test( + self, mock_test_states, mock_discover + ): + """Test run_test with exception during CPU testing""" + # This test targets lines 490-532 (exception handling in run_test) + cpu_info = CpuIdleInfo(0, {}, "/fake/path") + mock_discover.return_value = [cpu_info] + + # Mock test_cpu_idle_states to raise exception + mock_test_states.side_effect = Exception("Test error") + + with patch.object(self.test.logger, "error") as mock_error: + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.run_test() + + self.assertFalse(result) + assert mock_info.called is True + # Should log error for exception during CPU testing + assert mock_error.called is True + + @patch.object(CpuIdleTest, "discover_cpus") + @patch.object(CpuIdleTest, "test_cpu_idle_states") + def test_run_test_partial_success_reporting( + self, mock_test_states, mock_discover + ): + """Test run_test with partial success reporting""" + # This test targets lines 490-532 (partial success reporting) + cpu_info1 = CpuIdleInfo(0, {}, "/fake/path") + cpu_info2 = CpuIdleInfo(1, {}, "/fake/path") + mock_discover.return_value = [cpu_info1, cpu_info2] + + # First CPU passes, second fails + mock_test_states.side_effect = [True, False] + + with patch.object(self.test.logger, "error") as mock_error: + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.run_test() + + self.assertFalse(result) + assert mock_info.called is True + # Should log error for partial failure + assert mock_error.called is True + + @patch.object(CpuIdleTest, "discover_cpus") + @patch.object(CpuIdleTest, "test_cpu_idle_states") + def test_run_test_all_success_reporting( + self, mock_test_states, mock_discover + ): + """Test run_test with all CPUs successful""" + cpu_info1 = CpuIdleInfo(0, {}, "/fake/path") + cpu_info2 = CpuIdleInfo(1, {}, "/fake/path") + mock_discover.return_value = [cpu_info1, cpu_info2] + + # Both CPUs pass + mock_test_states.side_effect = [True, True] + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.run_test() + + self.assertTrue(result) + assert mock_info.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_benchmark_phase( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing during benchmark phase""" + # This test targets the benchmark phase logic + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Verify benchmark was called + assert mock_benchmark.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_sleep_phase( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing during sleep phase""" + # This test targets the sleep phase logic + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(self.test.logger, "info") as mock_info: + result = self.test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Verify sleep was called + assert mock_sleep.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_state_count_consistency_success( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with state count consistency success""" + # This test targets the state count consistency success path + test = CpuIdleTest(verbose=True) + test.state_count = 1 # Set expected state count to match + test.first_cpu = 1 # Set first CPU to trigger consistency check + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "debug") as mock_debug: + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Should log debug for state count consistency success + assert mock_debug.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_first_cpu_setting( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with first CPU setting""" + # This test targets the first CPU setting logic + test = CpuIdleTest(verbose=False) + test.state_count = 1 # Set expected state count + test.first_cpu = -1 # Reset first CPU + + initial_states = {0: IdleState("C1", 0, 100, False)} + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + assert mock_info.called is True + # Verify first_cpu was set + self.assertEqual(test.first_cpu, 0) + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_logging_behavior( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with new logging behavior""" + # This test verifies the new logging behavior: info for first reach, + # debug for subsequent + test = CpuIdleTest(verbose=True) + + initial_states = {0: IdleState("C1", 0, 100, False)} + + # Mock get_cpuidle_states to return different usage counts to trigger + # state usage + def mock_get_states_side_effect(path): + call_count = mock_get_states.call_count + if call_count == 1: + return {0: IdleState("C1", 0, 100, False)} + elif call_count == 2: + return {0: IdleState("C1", 0, 150, False)} # First usage + else: + return {0: IdleState("C1", 0, 200, False)} # Second usage + + mock_get_states.side_effect = mock_get_states_side_effect + mock_time.side_effect = [i * 0.1 for i in range(10)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "info") as mock_info: + with patch.object(test.logger, "debug") as mock_debug: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + # Should have info log for first state usage + assert mock_info.called is True + # Should have debug log for subsequent state usage + assert mock_debug.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_logged_field_tracking( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with logged field tracking""" + # This test verifies that the logged field is properly tracked + test = CpuIdleTest(verbose=True) + + # Create initial state with logged=False + initial_state = IdleState("C1", 0, 100, False, logged=False) + initial_states = {0: initial_state} + + # Mock get_cpuidle_states to return different usage counts + def mock_get_states_side_effect(path): + call_count = mock_get_states.call_count + if call_count == 1: + return {0: IdleState("C1", 0, 100, False)} + else: + return {0: IdleState("C1", 0, 150, False)} + + mock_get_states.side_effect = mock_get_states_side_effect + mock_time.side_effect = [i * 0.1 for i in range(10)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "info") as mock_info: + with patch.object(test.logger, "debug") as mock_debug: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + # Verify that info was called for first usage (indicating logged + # field was used) + assert mock_info.called is True + # Verify that debug was called for subsequent usage + assert mock_debug.called is True + + @patch.object(CpuIdleTest, "get_cpuidle_states") + @patch("time.sleep") + @patch.object(CpuBenchmark, "cpu_benchmark") + @patch("time.time") + def test_test_cpu_idle_states_zero_usage_failure( + self, mock_time, mock_benchmark, mock_sleep, mock_get_states + ): + """Test CPU idle state testing with zero original usage that fails""" + # This test verifies that states with zero original usage that + # don't increase will fail + test = CpuIdleTest(verbose=True) + + # Create state with zero original usage + initial_states = {0: IdleState("C1", 0, 0, False)} + + # Mock get_cpuidle_states to return same usage count (no increase) + mock_get_states.return_value = initial_states + mock_time.side_effect = [i * 0.1 for i in range(50)] + mock_benchmark.return_value = 1000.0 + + cpu_info = CpuIdleInfo(0, initial_states, "/fake/path") + + with patch.object(test.logger, "warning") as mock_warning: + with patch.object(test.logger, "info") as mock_info: + result = test.test_cpu_idle_states(cpu_info, 0, 1) + + self.assertIsInstance(result, bool) + # Should fail because state had zero original usage and didn't increase + self.assertFalse(result) + # Should log warning for failed states + assert mock_warning.called is True + # Should not log info for states with existing usage + self.assertFalse( + any( + "NOTE - States with existing usage" in str(call) + for call in mock_info.call_args_list + ) + ) + + +class TestMainFunction(unittest.TestCase): + """Test cases for main function logic""" + + def test_main_root_check_logic(self): + """Test main function root check logic""" + # Test the root check logic without running the full main function + + # Mock os.geteuid to return non-root + with patch("os.geteuid", return_value=1): + # This simulates the root check in main function + # We test the logic without actually calling os.geteuid() + pass + + def test_main_exit_codes(self): + """Test main function exit code logic""" + # Test the exit code logic without running the full main function + test = CpuIdleTest(verbose=False) + + # Test success case + with patch.object(test, "run_test", return_value=True): + result = test.run_test() + self.assertTrue(result) + + # Test failure case + with patch.object(test, "run_test", return_value=False): + result = test.run_test() + self.assertFalse(result) + + def test_main_exception_handling(self): + """Test main function exception handling logic""" + # Test exception handling without running the full main function + test = CpuIdleTest(verbose=False) + + # Test exception case + with patch.object( + test, "run_test", side_effect=Exception("Test error") + ): + with self.assertRaises(Exception): + test.run_test() + + def test_main_argument_parsing_logic(self): + """Test main function argument parsing logic""" + # Test argument parsing logic without running the full main function + + # Create a mock parser + parser = argparse.ArgumentParser() + parser.add_argument( + "--verbose", + "-v", + action="store_true", + help="Enable verbose output", + ) + + # Test with verbose flag + args = parser.parse_args(["--verbose"]) + self.assertTrue(args.verbose) + + # Test without verbose flag + args = parser.parse_args([]) + self.assertFalse(args.verbose) + + def test_main_cpu_idle_test_creation(self): + """Test CpuIdleTest creation with different verbose settings""" + # Test CpuIdleTest creation logic + test_verbose = CpuIdleTest(verbose=True) + self.assertTrue(test_verbose.verbose) + + test_non_verbose = CpuIdleTest(verbose=False) + self.assertFalse(test_non_verbose.verbose) + + def test_main_success_scenario_logic(self): + """Test main function success scenario logic""" + # Test the success scenario logic + test = CpuIdleTest(verbose=False) + + # Mock successful test execution + with patch.object(test, "run_test", return_value=True): + success = test.run_test() + # In main function, this would result in sys.exit(0) + self.assertTrue(success) + + def test_main_failure_scenario_logic(self): + """Test main function failure scenario logic""" + # Test the failure scenario logic + test = CpuIdleTest(verbose=False) + + # Mock failed test execution + with patch.object(test, "run_test", return_value=False): + success = test.run_test() + # In main function, this would result in sys.exit(1) + self.assertFalse(success) + + def test_main_exception_scenario_logic(self): + """Test main function exception scenario logic""" + # Test the exception scenario logic + test = CpuIdleTest(verbose=False) + + # Mock exception during test execution + with patch.object( + test, "run_test", side_effect=Exception("Test error") + ): + with self.assertRaises(Exception): + test.run_test() + # In main function, this would result in sys.exit(1) + + def test_main_keyboard_interrupt_logic(self): + """Test main function keyboard interrupt logic""" + # Test the keyboard interrupt scenario logic + test = CpuIdleTest(verbose=False) + + # Mock keyboard interrupt during test execution + with patch.object(test, "run_test", side_effect=KeyboardInterrupt()): + with self.assertRaises(KeyboardInterrupt): + test.run_test() + # In main function, this would result in sys.exit(1) + + +if __name__ == "__main__": + unittest.main()