Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion safeeyes/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def main():
# Initialize the logging
utility.initialize_logging(args.debug)
utility.initialize_platform()
config = Config()
config = Config.load()
utility.cleanup_old_user_stylesheet()

if __running():
Expand Down
144 changes: 93 additions & 51 deletions safeeyes/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,31 @@
import logging
import threading
import time
import typing

from safeeyes import utility
from safeeyes.model import BreakType
from safeeyes.model import BreakQueue
from safeeyes.model import EventHook
from safeeyes.model import State
from safeeyes.model import Config


class SafeEyesCore:
"""Core of Safe Eyes runs the scheduler and notifies the breaks."""

def __init__(self, context):
scheduled_next_break_time: typing.Optional[datetime.datetime] = None
scheduled_next_break_timestamp: int = -1
running: bool = False
paused_time: float = -1
postpone_duration: int = 0
default_postpone_duration: int = 0
pre_break_warning_time: int = 0

_break_queue: typing.Optional[BreakQueue] = None

def __init__(self, context) -> None:
"""Create an instance of SafeEyesCore and initialize the variables."""
self.break_queue = None
self.postpone_duration = 0
self.default_postpone_duration = 0
self.pre_break_warning_time = 0
self.running = False
self.scheduled_next_break_timestamp = -1
self.scheduled_next_break_time = None
self.paused_time = -1
# This event is fired before <time-to-prepare> for a break
self.on_pre_break = EventHook()
# This event is fired just before the start of a break
Expand All @@ -64,32 +68,33 @@ def __init__(self, context):
self.context["postpone_button_disabled"] = False
self.context["state"] = State.WAITING

def initialize(self, config):
def initialize(self, config: Config):
"""Initialize the internal properties from configuration."""
logging.info("Initialize the core")
self.pre_break_warning_time = config.get("pre_break_warning_time")
self.break_queue = BreakQueue(config, self.context)
self._break_queue = BreakQueue.create(config, self.context)
self.default_postpone_duration = (
config.get("postpone_duration") * 60
) # Convert to seconds
self.postpone_duration = self.default_postpone_duration

def start(self, next_break_time=-1, reset_breaks=False):
def start(self, next_break_time=-1, reset_breaks=False) -> None:
"""Start Safe Eyes is it is not running already."""
if self.break_queue.is_empty():
if self._break_queue is None:
logging.info("No breaks defined, not starting the core")
return
with self.lock:
if not self.running:
logging.info("Start Safe Eyes core")
if reset_breaks:
logging.info("Reset breaks to start from the beginning")
self.break_queue.reset()
self._break_queue.reset()

self.running = True
self.scheduled_next_break_timestamp = int(next_break_time)
utility.start_thread(self.__scheduler_job)

def stop(self, is_resting=False):
def stop(self, is_resting=False) -> None:
"""Stop Safe Eyes if it is running."""
with self.lock:
if not self.running:
Expand All @@ -105,11 +110,11 @@ def stop(self, is_resting=False):
self.waiting_condition.notify_all()
self.waiting_condition.release()

def skip(self):
def skip(self) -> None:
"""User skipped the break using Skip button."""
self.context["skipped"] = True

def postpone(self, duration=-1):
def postpone(self, duration=-1) -> None:
"""User postponed the break using Postpone button."""
if duration > 0:
self.postpone_duration = duration
Expand All @@ -118,37 +123,51 @@ def postpone(self, duration=-1):
logging.debug("Postpone the break for %d seconds", self.postpone_duration)
self.context["postponed"] = True

def get_break_time(self, break_type=None):
def get_break_time(
self, break_type: typing.Optional[BreakType] = None
) -> typing.Optional[datetime.datetime]:
"""Returns the next break time."""
break_obj = self.break_queue.get_break(break_type)
if not break_obj:
return False
if self._break_queue is None:
return None
break_obj = self._break_queue.get_break_with_type(break_type)
if not break_obj or self.scheduled_next_break_time is None:
return None
time = self.scheduled_next_break_time + datetime.timedelta(
minutes=break_obj.time - self.break_queue.get_break().time
minutes=break_obj.time - self._break_queue.get_break().time
)
return time

def take_break(self, break_type=None):
def take_break(self, break_type: typing.Optional[BreakType] = None) -> None:
"""Calling this method stops the scheduler and show the next break
screen.
"""
if self.break_queue.is_empty():
if self._break_queue is None:
return
if not self.context["state"] == State.WAITING:
return
utility.start_thread(self.__take_break, break_type=break_type)

def has_breaks(self, break_type=None):
def has_breaks(self, break_type: typing.Optional[BreakType] = None) -> bool:
"""Check whether Safe Eyes has breaks or not.

Use the break_type to check for either short or long break.
"""
return not self.break_queue.is_empty(break_type)
if self._break_queue is None:
return False

if break_type is None:
return True

def __take_break(self, break_type=None):
return not self._break_queue.is_empty(break_type)

def __take_break(self, break_type: typing.Optional[BreakType] = None) -> None:
"""Show the next break screen."""
logging.info("Take a break due to external request")

if self._break_queue is None:
# This will only be called by self.take_break, which checks this
return

with self.lock:
if not self.running:
return
Expand All @@ -163,33 +182,35 @@ def __take_break(self, break_type=None):
time.sleep(1) # Wait for 1 sec to ensure the scheduler is dead
self.running = True

if break_type is not None and self.break_queue.get_break().type != break_type:
self.break_queue.next(break_type)
if break_type is not None and self._break_queue.get_break().type != break_type:
self._break_queue.next(break_type)
utility.execute_main_thread(self.__fire_start_break)

def __scheduler_job(self):
def __scheduler_job(self) -> None:
"""Scheduler task to execute during every interval."""
if not self.running:
return

if self._break_queue is None:
# This will only be called by methods which check this
return

current_time = datetime.datetime.now()
current_timestamp = current_time.timestamp()

if self.context["state"] == State.RESTING and self.paused_time > -1:
# Safe Eyes was resting
paused_duration = int(current_timestamp - self.paused_time)
self.paused_time = -1
if (
paused_duration
> self.break_queue.get_break(BreakType.LONG_BREAK).duration
):
next_long = self._break_queue.get_break_with_type(BreakType.LONG_BREAK)
if next_long is not None and paused_duration > next_long.duration:
logging.info(
"Skip next long break due to the pause %ds longer than break"
" duration",
paused_duration,
)
# Skip the next long break
self.break_queue.reset()
self._break_queue.reset()

if self.context["postponed"]:
# Previous break was postponed
Expand All @@ -204,7 +225,7 @@ def __scheduler_job(self):
self.scheduled_next_break_timestamp = -1
else:
# Use next break, convert to seconds
time_to_wait = self.break_queue.get_break().time * 60
time_to_wait = self._break_queue.get_break().time * 60

self.scheduled_next_break_time = current_time + datetime.timedelta(
seconds=time_to_wait
Expand All @@ -221,23 +242,31 @@ def __scheduler_job(self):
logging.info("Pre-break waiting is over")

if not self.running:
return
# This can be reached if another thread changed running while __wait_for was
# blocking
return # type: ignore[unreachable]
utility.execute_main_thread(self.__fire_pre_break)

def __fire_on_update_next_break(self, next_break_time):
def __fire_on_update_next_break(self, next_break_time: datetime.datetime) -> None:
"""Pass the next break information to the registered listeners."""
self.on_update_next_break.fire(self.break_queue.get_break(), next_break_time)
if self._break_queue is None:
# This will only be called by methods which check this
return
self.on_update_next_break.fire(self._break_queue.get_break(), next_break_time)

def __fire_pre_break(self):
def __fire_pre_break(self) -> None:
"""Show the notification and start the break after the notification."""
if self._break_queue is None:
# This will only be called by methods which check this
return
self.context["state"] = State.PRE_BREAK
if not self.on_pre_break.fire(self.break_queue.get_break()):
if not self.on_pre_break.fire(self._break_queue.get_break()):
# Plugins wanted to ignore this break
self.__start_next_break()
return
utility.start_thread(self.__wait_until_prepare)

def __wait_until_prepare(self):
def __wait_until_prepare(self) -> None:
logging.info(
"Wait for %d seconds before the break", self.pre_break_warning_time
)
Expand All @@ -247,12 +276,15 @@ def __wait_until_prepare(self):
return
utility.execute_main_thread(self.__fire_start_break)

def __postpone_break(self):
def __postpone_break(self) -> None:
self.__wait_for(self.postpone_duration)
utility.execute_main_thread(self.__fire_start_break)

def __fire_start_break(self):
break_obj = self.break_queue.get_break()
def __fire_start_break(self) -> None:
if self._break_queue is None:
# This will only be called by methods which check this
return
break_obj = self._break_queue.get_break()
# Show the break screen
if not self.on_start_break.fire(break_obj):
# Plugins want to ignore this break
Expand All @@ -261,6 +293,10 @@ def __fire_start_break(self):
if self.context["postponed"]:
# Plugins want to postpone this break
self.context["postponed"] = False

if self.scheduled_next_break_time is None:
raise Exception("this should never happen")

# Update the next break time
self.scheduled_next_break_time = (
self.scheduled_next_break_time
Expand All @@ -273,10 +309,13 @@ def __fire_start_break(self):
self.start_break.fire(break_obj)
utility.start_thread(self.__start_break)

def __start_break(self):
def __start_break(self) -> None:
"""Start the break screen."""
if self._break_queue is None:
# This will only be called by methods which check this
return
self.context["state"] = State.BREAK
break_obj = self.break_queue.get_break()
break_obj = self._break_queue.get_break()
countdown = break_obj.duration
total_break_time = countdown

Expand All @@ -292,7 +331,7 @@ def __start_break(self):
countdown -= 1
utility.execute_main_thread(self.__fire_stop_break)

def __fire_stop_break(self):
def __fire_stop_break(self) -> None:
# Loop terminated because of timeout (not skipped) -> Close the break alert
if not self.context["skipped"] and not self.context["postponed"]:
logging.info("Break is terminated automatically")
Expand All @@ -304,15 +343,18 @@ def __fire_stop_break(self):
self.context["postpone_button_disabled"] = False
self.__start_next_break()

def __wait_for(self, duration):
def __wait_for(self, duration: int) -> None:
"""Wait until someone wake up or the timeout happens."""
self.waiting_condition.acquire()
self.waiting_condition.wait(duration)
self.waiting_condition.release()

def __start_next_break(self):
def __start_next_break(self) -> None:
if self._break_queue is None:
# This will only be called by methods which check this
return
if not self.context["postponed"]:
self.break_queue.next()
self._break_queue.next()

if self.running:
# Schedule the break again
Expand Down
Loading