Progress Observer

The mastf.core.progress module provides functionality to observe and monitor the progress of Celery tasks. It includes a class named Observer that can be used to track the execution and completion of Celery tasks. This documentation provides an overview of the Observer class and its usage.

The Observer class is designed to monitor the progress of Celery tasks and provide real-time updates on their execution. It serves as a useful tool for tracking task progress, identifying potential issues, and ensuring the successful completion of tasks. Some key features are:

  1. Task Progress Tracking:

    The Observer class allows you to monitor the progress of Celery tasks by tracking their state and providing updates during different stages of execution.

  2. Real-Time Updates:

    It provides real-time updates on task progress, including information such as debug message, status, elapsed time, and completion percentage. These updates can be used to display progress indicators or log task progress in external systems.

Usage Example:

 1from mastf.core.progress import Observer
 2from celery import shared_task
 3
 4@shared_task(bind=True)
 5def my_shared_task(self, *args):
 6    observer = Observer(self)
 7
 8    # set the current completion percentage
 9    observer.pos = 34
10    # otherwise specify it directly
11    observer.update("Some information...", current=34)
12
13    # Let the observer increment the position itself
14    observer.update("Further information...", increment=True, step=5)
15
16    if not some_condition:
17        # Fail should be called on the end of a non-successful task
18        _, meta = observer.fail("Test not passed")
19        # always return a result as it will be used within the frontend
20        return meta.get("description")
21
22    try:
23        so_something()
24    except Exception as err:
25        _, meta = observer.exception(err, "An error occurred!")
26        return meta.get("description")
27
28    # You can also return the updated metadata directly
29    status, meta = observer.success("Finished task!")
30    return meta
class mastf.core.progress.Observer(task: Task, position: int = 0, scan_task=None, _logger: Logger = None)[source]

Represents an observer of a task.

Use this class wihtin a shared_task registered in your celery worker. This class enables process tracking, e.g:

 1@shared_task(bind=True)
 2def my_task(self, *args):
 3    observer = Observer(self)
 4
 5    if some_condition_to_fail:
 6        # Fail will set a exception class that is used by celery
 7        # to report any issue was raised during execution
 8        status, meta = observer.fail("Condition not accepted!")
 9        return meta.get("description")
10
11    # Always return the detail string as it will be used later on
12    status, meta = observer.succes("Condition passed!")
13    return meta.get("description")
Parameters:
  • task (Task) – The task being observed.

  • position (int, optional) – the initial progress position, defaults to 0

create_meta() dict[source]

Creates the meta information about the current task state.

exception(exception, msg: str, *args) tuple[source]

Sets the task state to Failure and inserts an exception message.

Parameters:
  • msg (str) – the message to format

  • exception (? extends Exception) – the exception that was raised

Returns:

the updated task state and meta information

Return type:

tuple

fail(msg: str, exc_type=<class 'RuntimeError'>, *args) tuple[source]

Sets the task state to FALIURE and inserts the given message.

Parameters:

msg (str) – the message to format

Returns:

the updated task state and meta information

Return type:

tuple

increment(val: int = 1) int[source]

Increments the current position by the given value and returns the updated position.

Parameters:

val (int, optional) – The value to increment the current position by, defaults to 1

Returns:

The updated position.

Return type:

int

property logger: Logger

Gets the underlying logger.

Returns:

a linked task logger

Return type:

Task

property pos: int

Gets the current position.

Returns:

the current progress position

Return type:

int

success(msg: str = '', *args) tuple[source]

Sets the task state to SUCCESS and inserts the given message.

Parameters:

msg (str) – the message to format

Returns:

the updated task state and meta information

Return type:

tuple

property task: Task

Gets the observed task.

Returns:

the linked task

Return type:

Task

update(msg: str, *args, current: int = -1, increment: bool = True, step: int = 1, total: int = 100, state: str = 'PROGRESS', meta: dict = None, do_log: bool = False, log_level: str = 10) tuple[source]

Update the current task state.

This method will add a desciption by applying msg % args to format additional parameters.

Parameters:
  • msg (str) – the progress message

  • current (int, optional) – the current progress value (optional), defaults to -1

  • increment (bool, optional) – tells whether the internal counter should be incremented before using it, defaults to True

  • total (int, optional) – maximum value, defaults to 100

  • state (str, optional) – the current state’s string representation, defaults to PROGRESS

  • meta (dict, optional) – additional meta variables, defaults to None

Returns:

the new task state and meta information

Return type:

tuple