Skip to content

Scheduler

Source code in blue/scheduler.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class Scheduler:
    def __init__(self, task=None, cancel_on_failure=False):
        """Initializes a new Job instance.

        This constructor validates the input types, assigns the scheduling task, and
        sets up the scheduler and failure policy.

        Parameters:
            task (callable, required): The function or callable object that this job
                will execute when run by the scheduler. Must accept no arguments.
            cancel_on_failure (bool): If True, the job will be automatically
                removed from the scheduler if it raises an unhandled exception during
                execution.

        Raises:
            TypeError: If `task` is not a callable object (function).
            TypeError: If `cancel_on_failure` is not a boolean.
        """
        if not is_function(task):
            raise TypeError('Object task must be callable')
        if not is_instance_of(cancel_on_failure, bool):
            raise TypeError('Object cancel_on_failure must be an instance of bool')
        self.task = task
        self.scheduler = schedule.Scheduler()
        self.cancel_on_failure = cancel_on_failure
        self.set_job()

    def set_job(self):
        self.job = self.scheduler.every(2).seconds

    def start(self):
        """Starts the job by scheduling its execution and beginning the main loop.

        This method takes the pre-configured job (`self.job`) and
        then starts a continuous monitoring thread to execute the scheduled tasks.

        Raises:
            TypeError: If `self.job` has not been initialized as an instance of
                `schedule.Job`.
        """
        if not is_instance_of(self.job, schedule.Job):
            raise TypeError('Object self.scheduler must be an instance of schedule.Job')
        self.scheduled = self.job.do(self.__background_job)
        self.stop_run_continuously = self.__run_continuously()

    def stop(self):
        """Stops the continuous background thread for the scheduler.

        This is achieved by calling the `set()` method on the thread termination
        event (`self.stop_run_continuously`), signaling the scheduler loop to exit
        gracefully.
        """
        self.stop_run_continuously.set()

    def __run_continuously(self):
        cease_continuous_run = threading.Event()

        class ScheduleThread(threading.Thread):
            @classmethod
            def run(cls):
                while not cease_continuous_run.is_set():
                    if self.scheduled.should_run:
                        self.scheduled.run()
                    time.sleep(1)

        continuous_thread = ScheduleThread()
        continuous_thread.daemon = True
        continuous_thread.start()
        return cease_continuous_run

    def catch_exceptions(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback

                print(traceback.format_exc())
                if args[0].cancel_on_failure:
                    return schedule.CancelJob

        return wrapper

    @catch_exceptions
    def __background_job(self):
        self.task()

__init__(task=None, cancel_on_failure=False)

Initializes a new Job instance.

This constructor validates the input types, assigns the scheduling task, and sets up the scheduler and failure policy.

Parameters:

Name Type Description Default
task (callable, required)

The function or callable object that this job will execute when run by the scheduler. Must accept no arguments.

None
cancel_on_failure bool

If True, the job will be automatically removed from the scheduler if it raises an unhandled exception during execution.

False

Raises:

Type Description
TypeError

If task is not a callable object (function).

TypeError

If cancel_on_failure is not a boolean.

Source code in blue/scheduler.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def __init__(self, task=None, cancel_on_failure=False):
    """Initializes a new Job instance.

    This constructor validates the input types, assigns the scheduling task, and
    sets up the scheduler and failure policy.

    Parameters:
        task (callable, required): The function or callable object that this job
            will execute when run by the scheduler. Must accept no arguments.
        cancel_on_failure (bool): If True, the job will be automatically
            removed from the scheduler if it raises an unhandled exception during
            execution.

    Raises:
        TypeError: If `task` is not a callable object (function).
        TypeError: If `cancel_on_failure` is not a boolean.
    """
    if not is_function(task):
        raise TypeError('Object task must be callable')
    if not is_instance_of(cancel_on_failure, bool):
        raise TypeError('Object cancel_on_failure must be an instance of bool')
    self.task = task
    self.scheduler = schedule.Scheduler()
    self.cancel_on_failure = cancel_on_failure
    self.set_job()

start()

Starts the job by scheduling its execution and beginning the main loop.

This method takes the pre-configured job (self.job) and then starts a continuous monitoring thread to execute the scheduled tasks.

Raises:

Type Description
TypeError

If self.job has not been initialized as an instance of schedule.Job.

Source code in blue/scheduler.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def start(self):
    """Starts the job by scheduling its execution and beginning the main loop.

    This method takes the pre-configured job (`self.job`) and
    then starts a continuous monitoring thread to execute the scheduled tasks.

    Raises:
        TypeError: If `self.job` has not been initialized as an instance of
            `schedule.Job`.
    """
    if not is_instance_of(self.job, schedule.Job):
        raise TypeError('Object self.scheduler must be an instance of schedule.Job')
    self.scheduled = self.job.do(self.__background_job)
    self.stop_run_continuously = self.__run_continuously()

stop()

Stops the continuous background thread for the scheduler.

This is achieved by calling the set() method on the thread termination event (self.stop_run_continuously), signaling the scheduler loop to exit gracefully.

Source code in blue/scheduler.py
53
54
55
56
57
58
59
60
def stop(self):
    """Stops the continuous background thread for the scheduler.

    This is achieved by calling the `set()` method on the thread termination
    event (`self.stop_run_continuously`), signaling the scheduler loop to exit
    gracefully.
    """
    self.stop_run_continuously.set()

Example - SessionCleanupScheduler

The SessionCleanupScheduler class demonstrates how you can customize the core scheduling logic by extending the existing Scheduler class. For job schedule examples, checkout https://schedule.readthedocs.io/.

Bases: Scheduler

Source code in blue/platform.py
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
class SessionCleanupScheduler(Scheduler):

    def __init__(self, platform, callback):
        """Initializes a scheduler for session cleanup.

        This constructor constructs for scheduler for platform clean up expired session, with an optional callback function.
        Built on top of scheduler.

        Parameters:
            platform (str): platform name
            callback (callable): function to execute on schedule
        """
        super().__init__(task=self.__session_cleanup)
        self.platform: Platform = platform
        self.callback = callback

    def __session_cleanup(self):
        """Performs session cleanup.

        Based on `session_expiration_duration` cleans an expired session using `last_activity_date` of session.

        """
        sessions = self.platform.get_sessions()
        deleted_sessions = []
        session_expiration_duration = self.platform.get_metadata('settings.session_expiration_duration')
        # default 3 days
        if pydash.is_empty(session_expiration_duration):
            session_expiration_duration = 3
        session_expiration_duration = pydash.to_integer(session_expiration_duration)
        session_expiration_duration = max(3, session_expiration_duration)
        for session in sessions:
            try:
                epoch = pydash.objects.get(session, 'last_activity_date', session['created_date'])
                elapsed = datetime.datetime.now() - datetime.datetime.fromtimestamp(epoch)
                pinned = pydash.objects.get(session, 'pinned', {})
                is_pinned = False
                for value in pinned.values():
                    if value is True:
                        is_pinned = True
                        break
                if elapsed.days >= session_expiration_duration and not is_pinned:
                    self.platform.delete_session(session['id'])
                    deleted_sessions.append(session['id'])
            except:
                pass
        if pydash.is_function(self.callback):
            self.callback(deleted_sessions)

    def set_job(self):
        """Sets time to execute scheduler"""
        self.job = self.scheduler.every().day.at('00:00')

__init__(platform, callback)

Initializes a scheduler for session cleanup.

This constructor constructs for scheduler for platform clean up expired session, with an optional callback function. Built on top of scheduler.

Parameters:

Name Type Description Default
platform str

platform name

required
callback callable

function to execute on schedule

required
Source code in blue/platform.py
474
475
476
477
478
479
480
481
482
483
484
485
486
def __init__(self, platform, callback):
    """Initializes a scheduler for session cleanup.

    This constructor constructs for scheduler for platform clean up expired session, with an optional callback function.
    Built on top of scheduler.

    Parameters:
        platform (str): platform name
        callback (callable): function to execute on schedule
    """
    super().__init__(task=self.__session_cleanup)
    self.platform: Platform = platform
    self.callback = callback

__session_cleanup()

Performs session cleanup.

Based on session_expiration_duration cleans an expired session using last_activity_date of session.

Source code in blue/platform.py
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
def __session_cleanup(self):
    """Performs session cleanup.

    Based on `session_expiration_duration` cleans an expired session using `last_activity_date` of session.

    """
    sessions = self.platform.get_sessions()
    deleted_sessions = []
    session_expiration_duration = self.platform.get_metadata('settings.session_expiration_duration')
    # default 3 days
    if pydash.is_empty(session_expiration_duration):
        session_expiration_duration = 3
    session_expiration_duration = pydash.to_integer(session_expiration_duration)
    session_expiration_duration = max(3, session_expiration_duration)
    for session in sessions:
        try:
            epoch = pydash.objects.get(session, 'last_activity_date', session['created_date'])
            elapsed = datetime.datetime.now() - datetime.datetime.fromtimestamp(epoch)
            pinned = pydash.objects.get(session, 'pinned', {})
            is_pinned = False
            for value in pinned.values():
                if value is True:
                    is_pinned = True
                    break
            if elapsed.days >= session_expiration_duration and not is_pinned:
                self.platform.delete_session(session['id'])
                deleted_sessions.append(session['id'])
        except:
            pass
    if pydash.is_function(self.callback):
        self.callback(deleted_sessions)

set_job()

Sets time to execute scheduler

Source code in blue/platform.py
520
521
522
def set_job(self):
    """Sets time to execute scheduler"""
    self.job = self.scheduler.every().day.at('00:00')
Last update: 2025-10-08