From 0b7e9887429c5fced789487f6dad61b9b9534397 Mon Sep 17 00:00:00 2001 From: deltragon Date: Wed, 23 Jul 2025 21:14:09 +0200 Subject: [PATCH 1/7] Config: split loading out of constructor --- safeeyes/__main__.py | 2 +- safeeyes/model.py | 99 ++++++++++++++++++----------- safeeyes/tests/test_model.py | 118 +++++++++++++++++++---------------- 3 files changed, 127 insertions(+), 92 deletions(-) diff --git a/safeeyes/__main__.py b/safeeyes/__main__.py index e8de3724..fe5c9e28 100755 --- a/safeeyes/__main__.py +++ b/safeeyes/__main__.py @@ -112,7 +112,7 @@ def main(): # Initialize the logging utility.initialize_logging(args.debug) utility.initialize_platform() - config = Config() + config = Config.load() utility.cleanup_old_user_stylesheet() if __running(): diff --git a/safeeyes/model.py b/safeeyes/model.py index b19080f1..fd5ccfff 100644 --- a/safeeyes/model.py +++ b/safeeyes/model.py @@ -20,11 +20,13 @@ plugins. """ +import copy import logging import random from enum import Enum from dataclasses import dataclass from typing import Optional, Union +import typing from packaging.version import parse @@ -297,45 +299,63 @@ def fire(self, *args, **keywargs): class Config: """The configuration of Safe Eyes.""" - def __init__(self, init=True): + __user_config: dict[str, typing.Any] + __system_config: dict[str, typing.Any] + + @classmethod + def load(cls) -> "Config": # Read the config files - self.__user_config = utility.load_json(utility.CONFIG_FILE_PATH) - self.__system_config = utility.load_json(utility.SYSTEM_CONFIG_FILE_PATH) + user_config = utility.load_json(utility.CONFIG_FILE_PATH) + system_config = utility.load_json(utility.SYSTEM_CONFIG_FILE_PATH) # If there any breaking changes in long_breaks, short_breaks or any other keys, - # use the __force_upgrade list - self.__force_upgrade = [] - # self.__force_upgrade = ['long_breaks', 'short_breaks'] - - if init: - # if create_startup_entry finds a broken autostart symlink, it will repair - # it - utility.create_startup_entry(force=False) - if self.__user_config is None: - utility.initialize_safeeyes() - self.__user_config = self.__system_config - self.save() + # use the force_upgrade_keys list + force_upgrade_keys: list[str] = [] + # force_upgrade_keys = ['long_breaks', 'short_breaks'] + + # if create_startup_entry finds a broken autostart symlink, it will repair + # it + utility.create_startup_entry(force=False) + if user_config is None: + utility.initialize_safeeyes() + user_config = copy.deepcopy(system_config) + cfg = cls(user_config, system_config) + cfg.save() + return cfg + else: + system_config_version = system_config["meta"]["config_version"] + meta_obj = user_config.get("meta", None) + if meta_obj is None: + # Corrupted user config + user_config = copy.deepcopy(system_config) else: - system_config_version = self.__system_config["meta"]["config_version"] - meta_obj = self.__user_config.get("meta", None) - if meta_obj is None: - # Corrupted user config - self.__user_config = self.__system_config - else: - user_config_version = str(meta_obj.get("config_version", "0.0.0")) - if parse(user_config_version) != parse(system_config_version): - # Update the user config - self.__merge_dictionary( - self.__user_config, self.__system_config - ) - self.__user_config = self.__system_config - - utility.merge_plugins(self.__user_config) - self.save() - - def __merge_dictionary(self, old_dict, new_dict): + user_config_version = str(meta_obj.get("config_version", "0.0.0")) + if parse(user_config_version) != parse(system_config_version): + # Update the user config + new_user_config = copy.deepcopy(system_config) + cls.__merge_dictionary( + user_config, new_user_config, force_upgrade_keys + ) + user_config = new_user_config + + utility.merge_plugins(user_config) + + cfg = cls(user_config, system_config) + cfg.save() + return cfg + + def __init__( + self, + user_config: dict[str, typing.Any], + system_config: dict[str, typing.Any], + ): + self.__user_config = user_config + self.__system_config = system_config + + @classmethod + def __merge_dictionary(cls, old_dict, new_dict, force_upgrade_keys: list[str]): """Merge the dictionaries.""" for key in new_dict: - if key == "meta" or key in self.__force_upgrade: + if key == "meta" or key in force_upgrade_keys: continue if key in old_dict: new_value = new_dict[key] @@ -343,15 +363,18 @@ def __merge_dictionary(self, old_dict, new_dict): if type(new_value) is type(old_value): # Both properties have same type if isinstance(new_value, dict): - self.__merge_dictionary(old_value, new_value) + cls.__merge_dictionary(old_value, new_value, force_upgrade_keys) else: new_dict[key] = old_value - def clone(self): - config = Config(init=False) + def clone(self) -> "Config": + config = Config( + user_config=copy.deepcopy(self.__user_config), + system_config=self.__system_config, + ) return config - def save(self): + def save(self) -> None: """Save the configuration to file.""" utility.write_json(utility.CONFIG_FILE_PATH, self.__user_config) diff --git a/safeeyes/tests/test_model.py b/safeeyes/tests/test_model.py index 7ff82818..089321ef 100644 --- a/safeeyes/tests/test_model.py +++ b/safeeyes/tests/test_model.py @@ -52,15 +52,18 @@ def test_break_long(self) -> None: class TestBreakQueue: def test_create_empty(self) -> None: - config = { - "short_breaks": [], - "long_breaks": [], - "short_break_interval": 15, - "long_break_interval": 75, - "long_break_duration": 60, - "short_break_duration": 15, - "random_order": False, - } + config = model.Config( + user_config={ + "short_breaks": [], + "long_breaks": [], + "short_break_interval": 15, + "long_break_interval": 75, + "long_break_duration": 60, + "short_break_duration": 15, + "random_order": False, + }, + system_config={}, + ) context: dict[str, typing.Any] = {} @@ -82,19 +85,22 @@ def get_bq_only_short( model, "_", lambda message: "translated!: " + message, raising=False ) - config = { - "short_breaks": [ - {"name": "break 1"}, - {"name": "break 2"}, - {"name": "break 3"}, - ], - "long_breaks": [], - "short_break_interval": 15, - "long_break_interval": 75, - "long_break_duration": 60, - "short_break_duration": 15, - "random_order": random_seed is not None, - } + config = model.Config( + user_config={ + "short_breaks": [ + {"name": "break 1"}, + {"name": "break 2"}, + {"name": "break 3"}, + ], + "long_breaks": [], + "short_break_interval": 15, + "long_break_interval": 75, + "long_break_duration": 60, + "short_break_duration": 15, + "random_order": random_seed is not None, + }, + system_config={}, + ) context: dict[str, typing.Any] = { "session": {}, @@ -112,19 +118,22 @@ def get_bq_only_long( model, "_", lambda message: "translated!: " + message, raising=False ) - config = { - "short_breaks": [], - "long_breaks": [ - {"name": "long break 1"}, - {"name": "long break 2"}, - {"name": "long break 3"}, - ], - "short_break_interval": 15, - "long_break_interval": 75, - "long_break_duration": 60, - "short_break_duration": 15, - "random_order": random_seed is not None, - } + config = model.Config( + user_config={ + "short_breaks": [], + "long_breaks": [ + {"name": "long break 1"}, + {"name": "long break 2"}, + {"name": "long break 3"}, + ], + "short_break_interval": 15, + "long_break_interval": 75, + "long_break_duration": 60, + "short_break_duration": 15, + "random_order": random_seed is not None, + }, + system_config={}, + ) context: dict[str, typing.Any] = { "session": {}, @@ -142,24 +151,27 @@ def get_bq_full( model, "_", lambda message: "translated!: " + message, raising=False ) - config = { - "short_breaks": [ - {"name": "break 1"}, - {"name": "break 2"}, - {"name": "break 3"}, - {"name": "break 4"}, - ], - "long_breaks": [ - {"name": "long break 1"}, - {"name": "long break 2"}, - {"name": "long break 3"}, - ], - "short_break_interval": 15, - "long_break_interval": 75, - "long_break_duration": 60, - "short_break_duration": 15, - "random_order": random_seed is not None, - } + config = model.Config( + user_config={ + "short_breaks": [ + {"name": "break 1"}, + {"name": "break 2"}, + {"name": "break 3"}, + {"name": "break 4"}, + ], + "long_breaks": [ + {"name": "long break 1"}, + {"name": "long break 2"}, + {"name": "long break 3"}, + ], + "short_break_interval": 15, + "long_break_interval": 75, + "long_break_duration": 60, + "short_break_duration": 15, + "random_order": random_seed is not None, + }, + system_config={}, + ) context: dict[str, typing.Any] = { "session": {}, From c6732e10fd0e8f06270e80dd5712bd3dc205764e Mon Sep 17 00:00:00 2001 From: deltragon Date: Sun, 8 Jun 2025 13:57:29 +0200 Subject: [PATCH 2/7] remove dead code --- safeeyes/model.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/safeeyes/model.py b/safeeyes/model.py index fd5ccfff..772cd7f0 100644 --- a/safeeyes/model.py +++ b/safeeyes/model.py @@ -96,15 +96,6 @@ def __init__(self, config, context): self.__build_longs() self.__build_shorts() - # Interface guarantees that short_interval >= 1 - # And that long_interval is a multiple of short_interval - short_interval = config.get("short_break_interval") - long_interval = config.get("long_break_interval") - self.__cycle_len = int(long_interval / short_interval) - # To count every long break as a cycle in .next() if there are no short breaks - if self.__short_queue is None: - self.__cycle_len = 1 - # Restore the last break from session if not self.is_empty(): last_break = context["session"].get("break") From 7da4ecb75598b70d85e234a85785d1b814985766 Mon Sep 17 00:00:00 2001 From: deltragon Date: Wed, 4 Jun 2025 16:25:55 +0200 Subject: [PATCH 3/7] typing: add types to core and breakqueue --- safeeyes/core.py | 82 +++++++++++++----------- safeeyes/model.py | 120 ++++++++++++++++++++++++----------- safeeyes/tests/test_model.py | 11 ++-- 3 files changed, 136 insertions(+), 77 deletions(-) diff --git a/safeeyes/core.py b/safeeyes/core.py index 44a68efb..01fc5218 100644 --- a/safeeyes/core.py +++ b/safeeyes/core.py @@ -22,27 +22,30 @@ import logging import threading import time +import typing from safeeyes import utility from safeeyes.model import BreakType from safeeyes.model import BreakQueue from safeeyes.model import EventHook from safeeyes.model import State +from safeeyes.model import Config class SafeEyesCore: """Core of Safe Eyes runs the scheduler and notifies the breaks.""" - def __init__(self, context): + break_queue: BreakQueue + scheduled_next_break_time: typing.Optional[datetime.datetime] = None + scheduled_next_break_timestamp: int = -1 + running: bool = False + paused_time: float = -1 + postpone_duration: int = 0 + default_postpone_duration: int = 0 + pre_break_warning_time: int = 0 + + def __init__(self, context) -> None: """Create an instance of SafeEyesCore and initialize the variables.""" - self.break_queue = None - self.postpone_duration = 0 - self.default_postpone_duration = 0 - self.pre_break_warning_time = 0 - self.running = False - self.scheduled_next_break_timestamp = -1 - self.scheduled_next_break_time = None - self.paused_time = -1 # This event is fired before for a break self.on_pre_break = EventHook() # This event is fired just before the start of a break @@ -64,7 +67,7 @@ def __init__(self, context): self.context["postpone_button_disabled"] = False self.context["state"] = State.WAITING - def initialize(self, config): + def initialize(self, config: Config): """Initialize the internal properties from configuration.""" logging.info("Initialize the core") self.pre_break_warning_time = config.get("pre_break_warning_time") @@ -74,9 +77,10 @@ def initialize(self, config): ) # Convert to seconds self.postpone_duration = self.default_postpone_duration - def start(self, next_break_time=-1, reset_breaks=False): + def start(self, next_break_time=-1, reset_breaks=False) -> None: """Start Safe Eyes is it is not running already.""" if self.break_queue.is_empty(): + logging.info("No breaks defined, not starting the core") return with self.lock: if not self.running: @@ -89,7 +93,7 @@ def start(self, next_break_time=-1, reset_breaks=False): self.scheduled_next_break_timestamp = int(next_break_time) utility.start_thread(self.__scheduler_job) - def stop(self, is_resting=False): + def stop(self, is_resting=False) -> None: """Stop Safe Eyes if it is running.""" with self.lock: if not self.running: @@ -105,11 +109,11 @@ def stop(self, is_resting=False): self.waiting_condition.notify_all() self.waiting_condition.release() - def skip(self): + def skip(self) -> None: """User skipped the break using Skip button.""" self.context["skipped"] = True - def postpone(self, duration=-1): + def postpone(self, duration=-1) -> None: """User postponed the break using Postpone button.""" if duration > 0: self.postpone_duration = duration @@ -118,17 +122,19 @@ def postpone(self, duration=-1): logging.debug("Postpone the break for %d seconds", self.postpone_duration) self.context["postponed"] = True - def get_break_time(self, break_type=None): + def get_break_time( + self, break_type: typing.Optional[BreakType] = None + ) -> typing.Optional[datetime.datetime]: """Returns the next break time.""" - break_obj = self.break_queue.get_break(break_type) - if not break_obj: - return False + break_obj = self.break_queue.get_break_with_type(break_type) + if not break_obj or self.scheduled_next_break_time is None: + return None time = self.scheduled_next_break_time + datetime.timedelta( minutes=break_obj.time - self.break_queue.get_break().time ) return time - def take_break(self, break_type=None): + def take_break(self, break_type: typing.Optional[BreakType] = None) -> None: """Calling this method stops the scheduler and show the next break screen. """ @@ -138,14 +144,14 @@ def take_break(self, break_type=None): return utility.start_thread(self.__take_break, break_type=break_type) - def has_breaks(self, break_type=None): + def has_breaks(self, break_type: typing.Optional[BreakType] = None) -> bool: """Check whether Safe Eyes has breaks or not. Use the break_type to check for either short or long break. """ return not self.break_queue.is_empty(break_type) - def __take_break(self, break_type=None): + def __take_break(self, break_type: typing.Optional[BreakType] = None) -> None: """Show the next break screen.""" logging.info("Take a break due to external request") @@ -167,7 +173,7 @@ def __take_break(self, break_type=None): self.break_queue.next(break_type) utility.execute_main_thread(self.__fire_start_break) - def __scheduler_job(self): + def __scheduler_job(self) -> None: """Scheduler task to execute during every interval.""" if not self.running: return @@ -179,10 +185,8 @@ def __scheduler_job(self): # Safe Eyes was resting paused_duration = int(current_timestamp - self.paused_time) self.paused_time = -1 - if ( - paused_duration - > self.break_queue.get_break(BreakType.LONG_BREAK).duration - ): + next_long = self.break_queue.get_break_with_type(BreakType.LONG_BREAK) + if next_long is not None and paused_duration > next_long.duration: logging.info( "Skip next long break due to the pause %ds longer than break" " duration", @@ -221,14 +225,16 @@ def __scheduler_job(self): logging.info("Pre-break waiting is over") if not self.running: - return + # This can be reached if another thread changed running while __wait_for was + # blocking + return # type: ignore[unreachable] utility.execute_main_thread(self.__fire_pre_break) - def __fire_on_update_next_break(self, next_break_time): + def __fire_on_update_next_break(self, next_break_time: datetime.datetime) -> None: """Pass the next break information to the registered listeners.""" self.on_update_next_break.fire(self.break_queue.get_break(), next_break_time) - def __fire_pre_break(self): + def __fire_pre_break(self) -> None: """Show the notification and start the break after the notification.""" self.context["state"] = State.PRE_BREAK if not self.on_pre_break.fire(self.break_queue.get_break()): @@ -237,7 +243,7 @@ def __fire_pre_break(self): return utility.start_thread(self.__wait_until_prepare) - def __wait_until_prepare(self): + def __wait_until_prepare(self) -> None: logging.info( "Wait for %d seconds before the break", self.pre_break_warning_time ) @@ -247,11 +253,11 @@ def __wait_until_prepare(self): return utility.execute_main_thread(self.__fire_start_break) - def __postpone_break(self): + def __postpone_break(self) -> None: self.__wait_for(self.postpone_duration) utility.execute_main_thread(self.__fire_start_break) - def __fire_start_break(self): + def __fire_start_break(self) -> None: break_obj = self.break_queue.get_break() # Show the break screen if not self.on_start_break.fire(break_obj): @@ -261,6 +267,10 @@ def __fire_start_break(self): if self.context["postponed"]: # Plugins want to postpone this break self.context["postponed"] = False + + if self.scheduled_next_break_time is None: + raise Exception("this should never happen") + # Update the next break time self.scheduled_next_break_time = ( self.scheduled_next_break_time @@ -273,7 +283,7 @@ def __fire_start_break(self): self.start_break.fire(break_obj) utility.start_thread(self.__start_break) - def __start_break(self): + def __start_break(self) -> None: """Start the break screen.""" self.context["state"] = State.BREAK break_obj = self.break_queue.get_break() @@ -292,7 +302,7 @@ def __start_break(self): countdown -= 1 utility.execute_main_thread(self.__fire_stop_break) - def __fire_stop_break(self): + def __fire_stop_break(self) -> None: # Loop terminated because of timeout (not skipped) -> Close the break alert if not self.context["skipped"] and not self.context["postponed"]: logging.info("Break is terminated automatically") @@ -304,13 +314,13 @@ def __fire_stop_break(self): self.context["postpone_button_disabled"] = False self.__start_next_break() - def __wait_for(self, duration): + def __wait_for(self, duration: int) -> None: """Wait until someone wake up or the timeout happens.""" self.waiting_condition.acquire() self.waiting_condition.wait(duration) self.waiting_condition.release() - def __start_next_break(self): + def __start_next_break(self) -> None: if not self.context["postponed"]: self.break_queue.next() diff --git a/safeeyes/model.py b/safeeyes/model.py index 772cd7f0..94281b35 100644 --- a/safeeyes/model.py +++ b/safeeyes/model.py @@ -39,35 +39,56 @@ from safeeyes.translations import translate as _ +class BreakType(Enum): + """Type of Safe Eyes breaks.""" + + SHORT_BREAK = 1 + LONG_BREAK = 2 + + class Break: """An entity class which represents a break.""" - def __init__(self, break_type, name, time, duration, image, plugins): + type: BreakType + name: str + time: int + duration: int + image: typing.Optional[str] # path + plugins: dict + + def __init__( + self, + break_type: BreakType, + name: str, + time: int, + duration: int, + image: typing.Optional[str], + plugins: dict, + ): self.type = break_type self.name = name self.duration = duration self.image = image self.plugins = plugins self.time = time - self.next = None - def __str__(self): + def __str__(self) -> str: return 'Break: {{name: "{}", type: {}, duration: {}}}\n'.format( self.name, self.type, self.duration ) - def __repr__(self): + def __repr__(self) -> str: return str(self) - def is_long_break(self): + def is_long_break(self) -> bool: """Check whether this break is a long break.""" return self.type == BreakType.LONG_BREAK - def is_short_break(self): + def is_short_break(self) -> bool: """Check whether this break is a short break.""" return self.type == BreakType.SHORT_BREAK - def plugin_enabled(self, plugin_id, is_plugin_enabled): + def plugin_enabled(self, plugin_id: str, is_plugin_enabled: bool) -> bool: """Check whether this break supports the given plugin.""" if self.plugins: return plugin_id in self.plugins @@ -75,19 +96,18 @@ def plugin_enabled(self, plugin_id, is_plugin_enabled): return is_plugin_enabled -class BreakType(Enum): - """Type of Safe Eyes breaks.""" - - SHORT_BREAK = 1 - LONG_BREAK = 2 - - class BreakQueue: - def __init__(self, config, context): + __current_break: typing.Optional[Break] = None + __current_long: int = 0 + __current_short: int = 0 + __short_break_time: int + __long_break_time: int + __is_random_order: bool + __long_queue: typing.Optional[list[Break]] + __short_queue: typing.Optional[list[Break]] + + def __init__(self, config: "Config", context) -> None: self.context = context - self.__current_break = None - self.__current_long = 0 - self.__current_short = 0 self.__short_break_time = config.get("short_break_interval") self.__long_break_time = config.get("long_break_interval") self.__is_random_order = config.get("random_order") @@ -106,7 +126,15 @@ def __init__(self, config, context): while brk != current_break and brk.name != last_break: brk = self.next() - def get_break(self, break_type=None): + def get_break(self) -> Break: + if self.__current_break is None: + self.__current_break = self.next() + + return self.__current_break + + def get_break_with_type( + self, break_type: typing.Optional[BreakType] = None + ) -> typing.Optional[Break]: if self.__current_break is None: self.__current_break = self.next() @@ -122,32 +150,38 @@ def get_break(self, break_type=None): return None return self.__short_queue[self.__current_short] - def is_long_break(self): + def is_long_break(self) -> bool: return ( self.__current_break is not None and self.__current_break.type == BreakType.LONG_BREAK ) - def next(self, break_type=None): + def next(self, break_type: typing.Optional[BreakType] = None) -> Break: break_obj = None shorts = self.__short_queue longs = self.__long_queue # Reset break that has just ended if self.is_long_break(): - self.__current_break.time = self.__long_break_time + # __current_break is checked by is_long_break + self.__current_break.time = self.__long_break_time # type: ignore[union-attr] if self.__current_long == 0 and self.__is_random_order: # Shuffle queue self.__build_longs() elif self.__current_break: # Reduce the break time from the next long break (default) if longs: + if shorts is None: + raise Exception( + "this may not happen, either short or long breaks must be" + " defined" + ) longs[self.__current_long].time -= shorts[self.__current_short].time if self.__current_short == 0 and self.__is_random_order: self.__build_shorts() if self.is_empty(): - return None + raise Exception("this should never be called when the queue is empty") if shorts is None: break_obj = self.__next_long() @@ -166,14 +200,16 @@ def next(self, break_type=None): return break_obj - def reset(self): - for break_object in self.__short_queue: - break_object.time = self.__short_break_time + def reset(self) -> None: + if self.__short_queue: + for break_object in self.__short_queue: + break_object.time = self.__short_break_time - for break_object in self.__long_queue: - break_object.time = self.__long_break_time + if self.__long_queue: + for break_object in self.__long_queue: + break_object.time = self.__long_break_time - def is_empty(self, break_type=None): + def is_empty(self, break_type=None) -> bool: """Check if the given break type is empty or not. If the break_type is None, check for both short and long breaks. @@ -185,8 +221,12 @@ def is_empty(self, break_type=None): else: return self.__short_queue is None and self.__long_queue is None - def __next_short(self): + def __next_short(self) -> Break: shorts = self.__short_queue + + if shorts is None: + raise Exception("this may only be called when there are short breaks") + break_obj = shorts[self.__current_short] self.context["break_type"] = "short" @@ -195,8 +235,12 @@ def __next_short(self): return break_obj - def __next_long(self): + def __next_long(self) -> Break: longs = self.__long_queue + + if longs is None: + raise Exception("this may only be called when there are long breaks") + break_obj = longs[self.__current_long] self.context["break_type"] = "long" @@ -205,7 +249,9 @@ def __next_long(self): return break_obj - def __build_queue(self, break_type, break_configs, break_time, break_duration): + def __build_queue( + self, break_type, break_configs, break_time, break_duration + ) -> typing.Optional[list[Break]]: """Build a queue of breaks.""" size = len(break_configs) @@ -218,8 +264,8 @@ def __build_queue(self, break_type, break_configs, break_time, break_duration): else: breaks_order = break_configs - queue = [None] * size - for i, break_config in enumerate(breaks_order): + queue: list[Break] = [] + for break_config in breaks_order: name = _(break_config["name"]) duration = break_config.get("duration", break_duration) image = break_config.get("image") @@ -232,11 +278,11 @@ def __build_queue(self, break_type, break_configs, break_time, break_duration): continue break_obj = Break(break_type, name, interval, duration, image, plugins) - queue[i] = break_obj + queue.append(break_obj) return queue - def __build_shorts(self): + def __build_shorts(self) -> None: self.__short_queue = self.__build_queue( BreakType.SHORT_BREAK, self.__config.get("short_breaks"), @@ -244,7 +290,7 @@ def __build_shorts(self): self.__config.get("short_break_duration"), ) - def __build_longs(self): + def __build_longs(self) -> None: self.__long_queue = self.__build_queue( BreakType.LONG_BREAK, self.__config.get("long_breaks"), diff --git a/safeeyes/tests/test_model.py b/safeeyes/tests/test_model.py index 089321ef..1dc47b19 100644 --- a/safeeyes/tests/test_model.py +++ b/safeeyes/tests/test_model.py @@ -30,7 +30,7 @@ def test_break_short(self) -> None: time=15, duration=15, image=None, - plugins=None, + plugins={}, ) assert b.is_short_break() @@ -43,7 +43,7 @@ def test_break_long(self) -> None: time=75, duration=60, image=None, - plugins=None, + plugins={}, ) assert not b.is_short_break() @@ -72,8 +72,11 @@ def test_create_empty(self) -> None: assert bq.is_empty() assert bq.is_empty(model.BreakType.LONG_BREAK) assert bq.is_empty(model.BreakType.SHORT_BREAK) - assert bq.next() is None - assert bq.get_break() is None + + with pytest.raises(Exception, match="this should never be called"): + bq.next() + with pytest.raises(Exception, match="this should never be called"): + bq.get_break() def get_bq_only_short( self, monkeypatch: pytest.MonkeyPatch, random_seed: typing.Optional[int] = None From 455f92e621bfff6be819a35cd93956d2d6ec0a00 Mon Sep 17 00:00:00 2001 From: deltragon Date: Sun, 8 Jun 2025 14:01:08 +0200 Subject: [PATCH 4/7] core: make break_queue protected --- safeeyes/core.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/safeeyes/core.py b/safeeyes/core.py index 01fc5218..784d1f17 100644 --- a/safeeyes/core.py +++ b/safeeyes/core.py @@ -35,7 +35,6 @@ class SafeEyesCore: """Core of Safe Eyes runs the scheduler and notifies the breaks.""" - break_queue: BreakQueue scheduled_next_break_time: typing.Optional[datetime.datetime] = None scheduled_next_break_timestamp: int = -1 running: bool = False @@ -44,6 +43,8 @@ class SafeEyesCore: default_postpone_duration: int = 0 pre_break_warning_time: int = 0 + _break_queue: BreakQueue + def __init__(self, context) -> None: """Create an instance of SafeEyesCore and initialize the variables.""" # This event is fired before for a break @@ -71,7 +72,7 @@ def initialize(self, config: Config): """Initialize the internal properties from configuration.""" logging.info("Initialize the core") self.pre_break_warning_time = config.get("pre_break_warning_time") - self.break_queue = BreakQueue(config, self.context) + self._break_queue = BreakQueue(config, self.context) self.default_postpone_duration = ( config.get("postpone_duration") * 60 ) # Convert to seconds @@ -79,7 +80,7 @@ def initialize(self, config: Config): def start(self, next_break_time=-1, reset_breaks=False) -> None: """Start Safe Eyes is it is not running already.""" - if self.break_queue.is_empty(): + if self._break_queue.is_empty(): logging.info("No breaks defined, not starting the core") return with self.lock: @@ -87,7 +88,7 @@ def start(self, next_break_time=-1, reset_breaks=False) -> None: logging.info("Start Safe Eyes core") if reset_breaks: logging.info("Reset breaks to start from the beginning") - self.break_queue.reset() + self._break_queue.reset() self.running = True self.scheduled_next_break_timestamp = int(next_break_time) @@ -126,11 +127,11 @@ def get_break_time( self, break_type: typing.Optional[BreakType] = None ) -> typing.Optional[datetime.datetime]: """Returns the next break time.""" - break_obj = self.break_queue.get_break_with_type(break_type) + break_obj = self._break_queue.get_break_with_type(break_type) if not break_obj or self.scheduled_next_break_time is None: return None time = self.scheduled_next_break_time + datetime.timedelta( - minutes=break_obj.time - self.break_queue.get_break().time + minutes=break_obj.time - self._break_queue.get_break().time ) return time @@ -138,7 +139,7 @@ def take_break(self, break_type: typing.Optional[BreakType] = None) -> None: """Calling this method stops the scheduler and show the next break screen. """ - if self.break_queue.is_empty(): + if self._break_queue.is_empty(): return if not self.context["state"] == State.WAITING: return @@ -149,7 +150,7 @@ def has_breaks(self, break_type: typing.Optional[BreakType] = None) -> bool: Use the break_type to check for either short or long break. """ - return not self.break_queue.is_empty(break_type) + return not self._break_queue.is_empty(break_type) def __take_break(self, break_type: typing.Optional[BreakType] = None) -> None: """Show the next break screen.""" @@ -169,8 +170,8 @@ def __take_break(self, break_type: typing.Optional[BreakType] = None) -> None: time.sleep(1) # Wait for 1 sec to ensure the scheduler is dead self.running = True - if break_type is not None and self.break_queue.get_break().type != break_type: - self.break_queue.next(break_type) + if break_type is not None and self._break_queue.get_break().type != break_type: + self._break_queue.next(break_type) utility.execute_main_thread(self.__fire_start_break) def __scheduler_job(self) -> None: @@ -185,7 +186,7 @@ def __scheduler_job(self) -> None: # Safe Eyes was resting paused_duration = int(current_timestamp - self.paused_time) self.paused_time = -1 - next_long = self.break_queue.get_break_with_type(BreakType.LONG_BREAK) + next_long = self._break_queue.get_break_with_type(BreakType.LONG_BREAK) if next_long is not None and paused_duration > next_long.duration: logging.info( "Skip next long break due to the pause %ds longer than break" @@ -193,7 +194,7 @@ def __scheduler_job(self) -> None: paused_duration, ) # Skip the next long break - self.break_queue.reset() + self._break_queue.reset() if self.context["postponed"]: # Previous break was postponed @@ -208,7 +209,7 @@ def __scheduler_job(self) -> None: self.scheduled_next_break_timestamp = -1 else: # Use next break, convert to seconds - time_to_wait = self.break_queue.get_break().time * 60 + time_to_wait = self._break_queue.get_break().time * 60 self.scheduled_next_break_time = current_time + datetime.timedelta( seconds=time_to_wait @@ -232,12 +233,12 @@ def __scheduler_job(self) -> None: def __fire_on_update_next_break(self, next_break_time: datetime.datetime) -> None: """Pass the next break information to the registered listeners.""" - self.on_update_next_break.fire(self.break_queue.get_break(), next_break_time) + self.on_update_next_break.fire(self._break_queue.get_break(), next_break_time) def __fire_pre_break(self) -> None: """Show the notification and start the break after the notification.""" self.context["state"] = State.PRE_BREAK - if not self.on_pre_break.fire(self.break_queue.get_break()): + if not self.on_pre_break.fire(self._break_queue.get_break()): # Plugins wanted to ignore this break self.__start_next_break() return @@ -258,7 +259,7 @@ def __postpone_break(self) -> None: utility.execute_main_thread(self.__fire_start_break) def __fire_start_break(self) -> None: - break_obj = self.break_queue.get_break() + break_obj = self._break_queue.get_break() # Show the break screen if not self.on_start_break.fire(break_obj): # Plugins want to ignore this break @@ -286,7 +287,7 @@ def __fire_start_break(self) -> None: def __start_break(self) -> None: """Start the break screen.""" self.context["state"] = State.BREAK - break_obj = self.break_queue.get_break() + break_obj = self._break_queue.get_break() countdown = break_obj.duration total_break_time = countdown @@ -322,7 +323,7 @@ def __wait_for(self, duration: int) -> None: def __start_next_break(self) -> None: if not self.context["postponed"]: - self.break_queue.next() + self._break_queue.next() if self.running: # Schedule the break again From 521906218b0883951ec5929b5a2856dd3a62d865 Mon Sep 17 00:00:00 2001 From: deltragon Date: Thu, 24 Jul 2025 11:48:02 +0200 Subject: [PATCH 5/7] tests: make random break tests independent of actual order --- safeeyes/tests/test_model.py | 135 +++++++++++++++++++++++++++-------- 1 file changed, 105 insertions(+), 30 deletions(-) diff --git a/safeeyes/tests/test_model.py b/safeeyes/tests/test_model.py index 1dc47b19..3b4cdae0 100644 --- a/safeeyes/tests/test_model.py +++ b/safeeyes/tests/test_model.py @@ -223,13 +223,33 @@ def test_only_short_next_break_random( random_seed = 5 bq = self.get_bq_only_short(monkeypatch, random_seed) - next = bq.get_break() - assert next.name == "translated!: break 3" - - assert bq.next().name == "translated!: break 2" - assert bq.next().name == "translated!: break 1" - assert bq.next().name == "translated!: break 3" - assert bq.next().name == "translated!: break 1" + breaks = [] + breaks.append(bq.get_break().name) + breaks.append(bq.next().name) + breaks.append(bq.next().name) + + assert sorted(breaks) == [ + "translated!: break 1", + "translated!: break 2", + "translated!: break 3", + ] + + prev_breaks = breaks + + for i in range(3): + breaks = [] + breaks.append(bq.next().name) + breaks.append(bq.next().name) + breaks.append(bq.next().name) + + assert sorted(breaks) == [ + "translated!: break 1", + "translated!: break 2", + "translated!: break 3", + ] + + assert prev_breaks != breaks + prev_breaks = breaks def test_create_only_long(self, monkeypatch: pytest.MonkeyPatch) -> None: bq = self.get_bq_only_long(monkeypatch) @@ -270,13 +290,33 @@ def test_only_long_next_break_random(self, monkeypatch: pytest.MonkeyPatch) -> N random_seed = 5 bq = self.get_bq_only_long(monkeypatch, random_seed) - next = bq.get_break() - assert next.name == "translated!: long break 3" + breaks = [] + breaks.append(bq.get_break().name) + breaks.append(bq.next().name) + breaks.append(bq.next().name) - assert bq.next().name == "translated!: long break 2" - assert bq.next().name == "translated!: long break 1" - assert bq.next().name == "translated!: long break 3" - assert bq.next().name == "translated!: long break 1" + assert sorted(breaks) == [ + "translated!: long break 1", + "translated!: long break 2", + "translated!: long break 3", + ] + + prev_breaks = breaks + + for i in range(3): + breaks = [] + breaks.append(bq.next().name) + breaks.append(bq.next().name) + breaks.append(bq.next().name) + + assert sorted(breaks) == [ + "translated!: long break 1", + "translated!: long break 2", + "translated!: long break 3", + ] + + assert prev_breaks != breaks + prev_breaks = breaks def test_create_full(self, monkeypatch: pytest.MonkeyPatch) -> None: bq = self.get_bq_full(monkeypatch) @@ -346,20 +386,55 @@ def test_full_next_break_random(self, monkeypatch: pytest.MonkeyPatch) -> None: random_seed = 5 bq = self.get_bq_full(monkeypatch, random_seed) - next = bq.get_break() - assert next.name == "translated!: break 1" - - assert bq.next().name == "translated!: break 2" - assert bq.next().name == "translated!: break 4" - assert bq.next().name == "translated!: break 3" - assert bq.next().name == "translated!: long break 3" - assert bq.next().name == "translated!: break 2" - assert bq.next().name == "translated!: break 1" - assert bq.next().name == "translated!: break 4" - assert bq.next().name == "translated!: break 3" - assert bq.next().name == "translated!: long break 2" - assert bq.next().name == "translated!: break 2" - assert bq.next().name == "translated!: break 4" - assert bq.next().name == "translated!: break 1" - assert bq.next().name == "translated!: break 3" - assert bq.next().name == "translated!: long break 1" + first = True + + prev_breaks: list[list[str]] = [] + prev_long_breaks: list[list[str]] = [] + + for i in range(5): + long_breaks = [] + for i in range(3): + breaks = [] + if first: + first = False + breaks.append(bq.get_break().name) + else: + breaks.append(bq.next().name) + breaks.append(bq.next().name) + breaks.append(bq.next().name) + breaks.append(bq.next().name) + long_breaks.append(bq.next().name) + + # assert that we used all breaks in this iteration + assert sorted(breaks) == [ + "translated!: break 1", + "translated!: break 2", + "translated!: break 3", + "translated!: break 4", + ] + + # assert that not all the iterations are exactly the same order + # (this may happen randomly sometimes of course - at least one + # should be different) + assert self.at_least_one_different(prev_breaks, breaks) + prev_breaks.append(breaks) + + assert sorted(long_breaks) == [ + "translated!: long break 1", + "translated!: long break 2", + "translated!: long break 3", + ] + assert self.at_least_one_different(prev_long_breaks, long_breaks) + prev_long_breaks.append(long_breaks) + + def at_least_one_different( + self, previous: list[list[str]], current: list[str] + ) -> bool: + if len(previous) == 0: + return True + + for prev in previous: + if prev != current: + return True + + return False From 7ad97f021e645d6cc28a1b0ad7d79cdea5ae2965 Mon Sep 17 00:00:00 2001 From: deltragon Date: Sun, 8 Jun 2025 14:46:31 +0200 Subject: [PATCH 6/7] typing: only instantiate BreakQueue when there are breaks --- safeeyes/core.py | 39 +++++++++-- safeeyes/model.py | 121 ++++++++++++++++++++++------------- safeeyes/tests/test_model.py | 32 ++++----- 3 files changed, 129 insertions(+), 63 deletions(-) diff --git a/safeeyes/core.py b/safeeyes/core.py index 784d1f17..386ea1ad 100644 --- a/safeeyes/core.py +++ b/safeeyes/core.py @@ -43,7 +43,7 @@ class SafeEyesCore: default_postpone_duration: int = 0 pre_break_warning_time: int = 0 - _break_queue: BreakQueue + _break_queue: typing.Optional[BreakQueue] = None def __init__(self, context) -> None: """Create an instance of SafeEyesCore and initialize the variables.""" @@ -72,7 +72,7 @@ def initialize(self, config: Config): """Initialize the internal properties from configuration.""" logging.info("Initialize the core") self.pre_break_warning_time = config.get("pre_break_warning_time") - self._break_queue = BreakQueue(config, self.context) + self._break_queue = BreakQueue.create(config, self.context) self.default_postpone_duration = ( config.get("postpone_duration") * 60 ) # Convert to seconds @@ -80,7 +80,7 @@ def initialize(self, config: Config): def start(self, next_break_time=-1, reset_breaks=False) -> None: """Start Safe Eyes is it is not running already.""" - if self._break_queue.is_empty(): + if self._break_queue is None: logging.info("No breaks defined, not starting the core") return with self.lock: @@ -127,6 +127,8 @@ def get_break_time( self, break_type: typing.Optional[BreakType] = None ) -> typing.Optional[datetime.datetime]: """Returns the next break time.""" + if self._break_queue is None: + return None break_obj = self._break_queue.get_break_with_type(break_type) if not break_obj or self.scheduled_next_break_time is None: return None @@ -139,7 +141,7 @@ def take_break(self, break_type: typing.Optional[BreakType] = None) -> None: """Calling this method stops the scheduler and show the next break screen. """ - if self._break_queue.is_empty(): + if self._break_queue is None: return if not self.context["state"] == State.WAITING: return @@ -150,12 +152,22 @@ def has_breaks(self, break_type: typing.Optional[BreakType] = None) -> bool: Use the break_type to check for either short or long break. """ + if self._break_queue is None: + return False + + if break_type is None: + return True + return not self._break_queue.is_empty(break_type) def __take_break(self, break_type: typing.Optional[BreakType] = None) -> None: """Show the next break screen.""" logging.info("Take a break due to external request") + if self._break_queue is None: + # This will only be called by self.take_break, which checks this + return + with self.lock: if not self.running: return @@ -179,6 +191,10 @@ def __scheduler_job(self) -> None: if not self.running: return + if self._break_queue is None: + # This will only be called by methods which check this + return + current_time = datetime.datetime.now() current_timestamp = current_time.timestamp() @@ -233,10 +249,16 @@ def __scheduler_job(self) -> None: def __fire_on_update_next_break(self, next_break_time: datetime.datetime) -> None: """Pass the next break information to the registered listeners.""" + if self._break_queue is None: + # This will only be called by methods which check this + return self.on_update_next_break.fire(self._break_queue.get_break(), next_break_time) def __fire_pre_break(self) -> None: """Show the notification and start the break after the notification.""" + if self._break_queue is None: + # This will only be called by methods which check this + return self.context["state"] = State.PRE_BREAK if not self.on_pre_break.fire(self._break_queue.get_break()): # Plugins wanted to ignore this break @@ -259,6 +281,9 @@ def __postpone_break(self) -> None: utility.execute_main_thread(self.__fire_start_break) def __fire_start_break(self) -> None: + if self._break_queue is None: + # This will only be called by methods which check this + return break_obj = self._break_queue.get_break() # Show the break screen if not self.on_start_break.fire(break_obj): @@ -286,6 +311,9 @@ def __fire_start_break(self) -> None: def __start_break(self) -> None: """Start the break screen.""" + if self._break_queue is None: + # This will only be called by methods which check this + return self.context["state"] = State.BREAK break_obj = self._break_queue.get_break() countdown = break_obj.duration @@ -322,6 +350,9 @@ def __wait_for(self, duration: int) -> None: self.waiting_condition.release() def __start_next_break(self) -> None: + if self._break_queue is None: + # This will only be called by methods which check this + return if not self.context["postponed"]: self._break_queue.next() diff --git a/safeeyes/model.py b/safeeyes/model.py index 94281b35..8e383179 100644 --- a/safeeyes/model.py +++ b/safeeyes/model.py @@ -106,25 +106,70 @@ class BreakQueue: __long_queue: typing.Optional[list[Break]] __short_queue: typing.Optional[list[Break]] - def __init__(self, config: "Config", context) -> None: - self.context = context - self.__short_break_time = config.get("short_break_interval") - self.__long_break_time = config.get("long_break_interval") - self.__is_random_order = config.get("random_order") - self.__config = config + @classmethod + def create(cls, config: "Config", context) -> typing.Optional["BreakQueue"]: + short_break_time = config.get("short_break_interval") + long_break_time = config.get("long_break_interval") + is_random_order = config.get("random_order") + + short_queue = cls.__build_queue( + BreakType.SHORT_BREAK, + config.get("short_breaks"), + short_break_time, + config.get("short_break_duration"), + is_random_order, + ) + + long_queue = cls.__build_queue( + BreakType.LONG_BREAK, + config.get("long_breaks"), + long_break_time, + config.get("long_break_duration"), + is_random_order, + ) + + if short_queue is None and long_queue is None: + return None - self.__build_longs() - self.__build_shorts() + return cls( + context, + short_break_time, + long_break_time, + is_random_order, + short_queue, + long_queue, + ) + + def __init__( + self, + context, + short_break_time: int, + long_break_time: int, + is_random_order: bool, + short_queue: typing.Optional[list[Break]], + long_queue: typing.Optional[list[Break]], + ) -> None: + """Constructor for BreakQueue. Do not call this directly. + + Instead, use BreakQueue.create() instead. + short_queue and long_queue must not both be None, and must not be an empty + list. + """ + self.context = context + self.__short_break_time = short_break_time + self.__long_break_time = long_break_time + self.__is_random_order = is_random_order + self.__short_queue = short_queue + self.__long_queue = long_queue # Restore the last break from session - if not self.is_empty(): - last_break = context["session"].get("break") - if last_break is not None: - current_break = self.get_break() - if last_break != current_break.name: + last_break = context["session"].get("break") + if last_break is not None: + current_break = self.get_break() + if last_break != current_break.name: + brk = self.next() + while brk != current_break and brk.name != last_break: brk = self.next() - while brk != current_break and brk.name != last_break: - brk = self.next() def get_break(self) -> Break: if self.__current_break is None: @@ -167,7 +212,8 @@ def next(self, break_type: typing.Optional[BreakType] = None) -> Break: self.__current_break.time = self.__long_break_time # type: ignore[union-attr] if self.__current_long == 0 and self.__is_random_order: # Shuffle queue - self.__build_longs() + if self.__long_queue is not None: + random.shuffle(self.__long_queue) elif self.__current_break: # Reduce the break time from the next long break (default) if longs: @@ -178,10 +224,8 @@ def next(self, break_type: typing.Optional[BreakType] = None) -> Break: ) longs[self.__current_long].time -= shorts[self.__current_short].time if self.__current_short == 0 and self.__is_random_order: - self.__build_shorts() - - if self.is_empty(): - raise Exception("this should never be called when the queue is empty") + if self.__short_queue is not None: + random.shuffle(self.__short_queue) if shorts is None: break_obj = self.__next_long() @@ -209,17 +253,14 @@ def reset(self) -> None: for break_object in self.__long_queue: break_object.time = self.__long_break_time - def is_empty(self, break_type=None) -> bool: - """Check if the given break type is empty or not. - - If the break_type is None, check for both short and long breaks. - """ + def is_empty(self, break_type: BreakType) -> bool: + """Check if the given break type is empty or not.""" if break_type == BreakType.SHORT_BREAK: return self.__short_queue is None elif break_type == BreakType.LONG_BREAK: return self.__long_queue is None else: - return self.__short_queue is None and self.__long_queue is None + typing.assert_never(break_type) def __next_short(self) -> Break: shorts = self.__short_queue @@ -249,8 +290,13 @@ def __next_long(self) -> Break: return break_obj + @staticmethod def __build_queue( - self, break_type, break_configs, break_time, break_duration + break_type: BreakType, + break_configs: list[dict], + break_time: int, + break_duration: int, + is_random_order: bool, ) -> typing.Optional[list[Break]]: """Build a queue of breaks.""" size = len(break_configs) @@ -259,7 +305,7 @@ def __build_queue( # No breaks return None - if self.__is_random_order: + if is_random_order: breaks_order = random.sample(break_configs, size) else: breaks_order = break_configs @@ -280,23 +326,10 @@ def __build_queue( break_obj = Break(break_type, name, interval, duration, image, plugins) queue.append(break_obj) - return queue - - def __build_shorts(self) -> None: - self.__short_queue = self.__build_queue( - BreakType.SHORT_BREAK, - self.__config.get("short_breaks"), - self.__short_break_time, - self.__config.get("short_break_duration"), - ) + if len(queue) == 0: + return None - def __build_longs(self) -> None: - self.__long_queue = self.__build_queue( - BreakType.LONG_BREAK, - self.__config.get("long_breaks"), - self.__long_break_time, - self.__config.get("long_break_duration"), - ) + return queue class State(Enum): diff --git a/safeeyes/tests/test_model.py b/safeeyes/tests/test_model.py index 3b4cdae0..52a7b9e6 100644 --- a/safeeyes/tests/test_model.py +++ b/safeeyes/tests/test_model.py @@ -67,16 +67,9 @@ def test_create_empty(self) -> None: context: dict[str, typing.Any] = {} - bq = model.BreakQueue(config, context) + bq = model.BreakQueue.create(config, context) - assert bq.is_empty() - assert bq.is_empty(model.BreakType.LONG_BREAK) - assert bq.is_empty(model.BreakType.SHORT_BREAK) - - with pytest.raises(Exception, match="this should never be called"): - bq.next() - with pytest.raises(Exception, match="this should never be called"): - bq.get_break() + assert bq is None def get_bq_only_short( self, monkeypatch: pytest.MonkeyPatch, random_seed: typing.Optional[int] = None @@ -109,7 +102,11 @@ def get_bq_only_short( "session": {}, } - return model.BreakQueue(config, context) + bq = model.BreakQueue.create(config, context) + + assert bq is not None + + return bq def get_bq_only_long( self, monkeypatch: pytest.MonkeyPatch, random_seed: typing.Optional[int] = None @@ -142,7 +139,11 @@ def get_bq_only_long( "session": {}, } - return model.BreakQueue(config, context) + bq = model.BreakQueue.create(config, context) + + assert bq is not None + + return bq def get_bq_full( self, monkeypatch: pytest.MonkeyPatch, random_seed: typing.Optional[int] = None @@ -180,12 +181,15 @@ def get_bq_full( "session": {}, } - return model.BreakQueue(config, context) + bq = model.BreakQueue.create(config, context) + + assert bq is not None + + return bq def test_create_only_short(self, monkeypatch: pytest.MonkeyPatch) -> None: bq = self.get_bq_only_short(monkeypatch) - assert not bq.is_empty() assert not bq.is_empty(model.BreakType.SHORT_BREAK) assert bq.is_empty(model.BreakType.LONG_BREAK) @@ -254,7 +258,6 @@ def test_only_short_next_break_random( def test_create_only_long(self, monkeypatch: pytest.MonkeyPatch) -> None: bq = self.get_bq_only_long(monkeypatch) - assert not bq.is_empty() assert not bq.is_empty(model.BreakType.LONG_BREAK) assert bq.is_empty(model.BreakType.SHORT_BREAK) @@ -321,7 +324,6 @@ def test_only_long_next_break_random(self, monkeypatch: pytest.MonkeyPatch) -> N def test_create_full(self, monkeypatch: pytest.MonkeyPatch) -> None: bq = self.get_bq_full(monkeypatch) - assert not bq.is_empty() assert not bq.is_empty(model.BreakType.LONG_BREAK) assert not bq.is_empty(model.BreakType.SHORT_BREAK) From ce238c4b7d1f3e827309d5d3949b5ecc9dfae3e9 Mon Sep 17 00:00:00 2001 From: deltragon Date: Sun, 8 Jun 2025 15:08:39 +0200 Subject: [PATCH 7/7] typing: BreakQueue: __current_break is always set --- safeeyes/model.py | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/safeeyes/model.py b/safeeyes/model.py index 8e383179..9d02e70a 100644 --- a/safeeyes/model.py +++ b/safeeyes/model.py @@ -97,7 +97,7 @@ def plugin_enabled(self, plugin_id: str, is_plugin_enabled: bool) -> bool: class BreakQueue: - __current_break: typing.Optional[Break] = None + __current_break: Break __current_long: int = 0 __current_short: int = 0 __short_break_time: int @@ -162,6 +162,9 @@ def __init__( self.__short_queue = short_queue self.__long_queue = long_queue + # load first break + self.__set_next_break() + # Restore the last break from session last_break = context["session"].get("break") if last_break is not None: @@ -172,17 +175,11 @@ def __init__( brk = self.next() def get_break(self) -> Break: - if self.__current_break is None: - self.__current_break = self.next() - return self.__current_break def get_break_with_type( self, break_type: typing.Optional[BreakType] = None ) -> typing.Optional[Break]: - if self.__current_break is None: - self.__current_break = self.next() - if break_type is None or self.__current_break.type == break_type: return self.__current_break @@ -196,25 +193,27 @@ def get_break_with_type( return self.__short_queue[self.__current_short] def is_long_break(self) -> bool: - return ( - self.__current_break is not None - and self.__current_break.type == BreakType.LONG_BREAK - ) + return self.__current_break.type == BreakType.LONG_BREAK def next(self, break_type: typing.Optional[BreakType] = None) -> Break: - break_obj = None + """Advance to the next break, and return that break. + + If break_type is given, advance to the next break with that type. + If the last break in the queue is reached, this resets the internal index to + the first break again, and shuffle if needed. + """ shorts = self.__short_queue longs = self.__long_queue + previous_break = self.__current_break # Reset break that has just ended - if self.is_long_break(): - # __current_break is checked by is_long_break - self.__current_break.time = self.__long_break_time # type: ignore[union-attr] + if previous_break.is_long_break(): + previous_break.time = self.__long_break_time if self.__current_long == 0 and self.__is_random_order: # Shuffle queue if self.__long_queue is not None: random.shuffle(self.__long_queue) - elif self.__current_break: + else: # Reduce the break time from the next long break (default) if longs: if shorts is None: @@ -227,6 +226,14 @@ def next(self, break_type: typing.Optional[BreakType] = None) -> Break: if self.__short_queue is not None: random.shuffle(self.__short_queue) + self.__set_next_break(break_type) + + return self.__current_break + + def __set_next_break(self, break_type: typing.Optional[BreakType] = None) -> None: + shorts = self.__short_queue + longs = self.__long_queue + if shorts is None: break_obj = self.__next_long() elif longs is None: @@ -242,8 +249,6 @@ def next(self, break_type: typing.Optional[BreakType] = None) -> Break: self.__current_break = break_obj self.context["session"]["break"] = self.__current_break.name - return break_obj - def reset(self) -> None: if self.__short_queue: for break_object in self.__short_queue: