Progress Observer
The mastf.core.progress module provides functionality to observe and monitor the progress of Celery tasks. It includes a class named Observer that can be used to track the execution and completion of Celery tasks. This documentation provides an overview of the Observer class and its usage.
The Observer class is designed to monitor the progress of Celery tasks and provide real-time updates on their execution. It serves as a useful tool for tracking task progress, identifying potential issues, and ensuring the successful completion of tasks. Some key features are:
Task Progress Tracking: The Observer class allows you to monitor the progress of Celery tasks by tracking their state and providing updates during different stages of execution.
Real-Time Updates: It provides real-time updates on task progress, including information such as a debug message, status, elapsed time, and completion percentage. These updates can be used to display progress indicators or log task progress in external systems.
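As a rough illustration of how such updates might be consumed on the receiving side, the sketch below turns a state string and a metadata dictionary into a one-line text progress indicator. The function name render_progress and the keys current and total are assumptions made for this example; only the description key appears elsewhere in this documentation.

```python
def render_progress(state: str, meta: dict, width: int = 20) -> str:
    """Format one task update as a text progress bar.

    The "current" and "total" keys are assumed for illustration only.
    """
    # Fall back to 100 when no usable total is present.
    total = meta.get("total", 100) or 100
    # Clamp the position into the range [0, total].
    current = max(0, min(meta.get("current", 0), total))
    filled = width * current // total
    bar = "#" * filled + "-" * (width - filled)
    percent = 100 * current // total
    description = meta.get("description", "")
    return f"[{bar}] {percent:3d}% {state}: {description}"


update = {"current": 34, "total": 100, "description": "Some information..."}
print(render_progress("PROGRESS", update))
```

A frontend or logging hook could call such a helper each time it polls the task state.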
Usage Example:
from mastf.core.progress import Observer
from celery import shared_task

@shared_task(bind=True)
def my_shared_task(self, *args):
    observer = Observer(self)

    # Set the current completion percentage
    observer.pos = 34
    # or specify it directly
    observer.update("Some information...", current=34)

    # Let the observer increment the position itself
    observer.update("Further information...", increment=True, step=5)

    if not some_condition:
        # fail() should be called at the end of an unsuccessful task
        _, meta = observer.fail("Test not passed")
        # Always return a result as it will be used within the frontend
        return meta.get("description")

    try:
        do_something()
    except Exception as err:
        _, meta = observer.exception(err, "An error occurred!")
        return meta.get("description")

    # You can also return the updated metadata directly
    status, meta = observer.success("Finished task!")
    return meta
- class mastf.core.progress.Observer(task: Task, position: int = 0, scan_task=None, _logger: Logger = None)
Represents an observer of a task.
Use this class within a shared_task registered in your Celery worker. This class enables process tracking, e.g.:
@shared_task(bind=True)
def my_task(self, *args):
    observer = Observer(self)

    if some_condition_to_fail:
        # fail() sets an exception class that is used by Celery
        # to report that an issue was raised during execution
        status, meta = observer.fail("Condition not accepted!")
        return meta.get("description")

    # Always return the detail string as it will be used later on
    status, meta = observer.success("Condition passed!")
    return meta.get("description")
- Parameters:
task (Task) – The task being observed.
position (int, optional) – the initial progress position, defaults to 0
- exception(exception, msg: str, *args) -> tuple
Sets the task state to FAILURE and inserts an exception message.
- Parameters:
msg (str) – the message to format
exception (Exception) – the exception that was raised
- Returns:
the updated task state and meta information
- Return type:
tuple
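To make the returned tuple more concrete, here is a minimal sketch of how a failure state and its metadata could be assembled from an exception and a format message. It is not the mastf implementation: the helper name failure_meta and the keys exc_type and exc_message are hypothetical, and only the description key and the FAILURE state are taken from this documentation.

```python
def failure_meta(exc: BaseException, msg: str, *args) -> tuple:
    """Build a (state, meta) pair describing a failed task (illustrative)."""
    # Apply %-formatting only when arguments are supplied.
    description = msg % args if args else msg
    meta = {
        "description": description,
        "exc_type": type(exc).__name__,  # assumed key name
        "exc_message": str(exc),         # assumed key name
    }
    return "FAILURE", meta


try:
    raise ValueError("bad input")
except ValueError as err:
    state, meta = failure_meta(err, "An error occurred in %s", "step 2")
    print(state, meta["description"])
```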
- fail(msg: str, exc_type=<class 'RuntimeError'>, *args) -> tuple
Sets the task state to FAILURE and inserts the given message.
- Parameters:
msg (str) – the message to format
- Returns:
the updated task state and meta information
- Return type:
tuple
- increment(val: int = 1) -> int
Increments the current position by the given value and returns the updated position.
- Parameters:
val (int, optional) – The value to increment the current position by, defaults to 1
- Returns:
The updated position.
- Return type:
int
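The behaviour described for increment can be pictured with a small stand-in class. This is only a sketch under the assumption that the position is a plain integer counter; PositionCounter is a hypothetical name and does not exist in mastf.

```python
class PositionCounter:
    """Hypothetical stand-in for the observer's position handling."""

    def __init__(self, position: int = 0) -> None:
        self._pos = position

    @property
    def pos(self) -> int:
        """Return the current position."""
        return self._pos

    def increment(self, val: int = 1) -> int:
        """Add val to the position and return the updated position."""
        self._pos += val
        return self._pos


counter = PositionCounter(position=30)
counter.increment()            # default step of 1
latest = counter.increment(4)  # explicit step
print(latest, counter.pos)
```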
- property logger: Logger
Gets the underlying logger.
- Returns:
a linked task logger
- Return type:
Logger
- property pos: int
Gets the current position.
- Returns:
the current progress position
- Return type:
int
- success(msg: str = '', *args) -> tuple
Sets the task state to SUCCESS and inserts the given message.
- Parameters:
msg (str) – the message to format
- Returns:
the updated task state and meta information
- Return type:
tuple
- property task: Task
Gets the observed task.
- Returns:
the linked task
- Return type:
Task
- update(msg: str, *args, current: int = -1, increment: bool = True, step: int = 1, total: int = 100, state: str = 'PROGRESS', meta: dict = None, do_log: bool = False, log_level: str = 10) -> tuple
Update the current task state.
This method adds a description by applying msg % args to format additional parameters.
- Parameters:
msg (str) – the progress message
current (int, optional) – the current progress value (optional), defaults to -1
increment (bool, optional) – tells whether the internal counter should be incremented before using it, defaults to True
total (int, optional) – maximum value, defaults to 100
state (str, optional) – the current state’s string representation, defaults to PROGRESS
meta (dict, optional) – additional meta variables, defaults to None
- Returns:
the new task state and meta information
- Return type:
tuple
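The interplay of msg % args, current, increment and step can be sketched with a simplified observer and a stand-in task that records calls to update_state, the method Celery tasks expose for publishing custom states. Everything here is illustrative: RecordingTask and SketchObserver are hypothetical names, the keys current and total are assumed, and the rule that an explicit current takes precedence over incrementing is one plausible reading of the parameter descriptions, not confirmed behaviour.

```python
class RecordingTask:
    """Stand-in for a bound Celery task; it only records state updates."""

    def __init__(self):
        self.calls = []

    def update_state(self, state=None, meta=None):
        self.calls.append((state, meta))


class SketchObserver:
    """Hypothetical, simplified observer; not the mastf implementation."""

    def __init__(self, task, position=0):
        self.task = task
        self.pos = position

    def update(self, msg, *args, current=-1, increment=True, step=1,
               total=100, state="PROGRESS", meta=None):
        # Assumption: an explicit `current` wins over incrementing.
        if current >= 0:
            self.pos = current
        elif increment:
            self.pos += step

        data = dict(meta or {})
        # Format only when arguments were given, so a literal "%" in a
        # plain message does not raise.
        data["description"] = msg % args if args else msg
        data["current"] = self.pos  # assumed key name
        data["total"] = total       # assumed key name
        self.task.update_state(state=state, meta=data)
        return state, data


task = RecordingTask()
observer = SketchObserver(task)
observer.update("Scanning file %d of %d", 1, 3, current=34)
state, info = observer.update("Further information...", step=5)
print(state, info["current"], info["description"])
```

Returning the same state and metadata that were published keeps the calling task free to hand them straight back as its result.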