Skip to content

async_utils

zeus.util.async_utils

Utilities for asyncio.

create_task

1
create_task(coroutine, logger=None)

Create an asyncio.Task but ensure that exceptions are logged.

Reference: https://quantlane.com/blog/ensure-asyncio-task-exceptions-get-logged/

Parameters:

Name Type Description Default
coroutine Coroutine[Any, Any, T]

The coroutine to be wrapped.

required
logger Logger | None

The logger to be used for logging exceptions. If None, the logger with the name zeus.util.async_utils is used.

None
Source code in zeus/util/async_utils.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def create_task(
    coroutine: Coroutine[Any, Any, T],
    logger: logging.Logger | None = None,
) -> asyncio.Task[T]:
    """Schedule a coroutine as an `asyncio.Task` whose failures are logged.

    The task is created on the currently running event loop, and a
    done-callback is attached that reports any exception the task dies with.

    Reference: https://quantlane.com/blog/ensure-asyncio-task-exceptions-get-logged/

    Args:
        coroutine: The coroutine to be wrapped.
        logger: The logger to be used for logging exceptions. If `None`, the
            logger with the name `zeus.util.async_utils` is used.

    Returns:
        The newly created task, with the exception-logging callback attached.
    """
    # Bind the logger now so the callback only needs the finished task.
    on_done = functools.partial(
        _handle_task_exception,
        logger=logger or default_logger,
    )
    scheduled = asyncio.get_running_loop().create_task(coroutine)
    scheduled.add_done_callback(on_done)
    return scheduled

_handle_task_exception

1
_handle_task_exception(task, logger)

Print out exception and traceback when a task dies with an exception.

Source code in zeus/util/async_utils.py
51
52
53
54
55
56
57
58
59
60
def _handle_task_exception(task: asyncio.Task, logger: logging.Logger) -> None:
    """Log the exception and traceback of a task that ended with an error.

    The outcome of `task` is inspected by retrieving its result. A task that
    completed successfully or was cancelled produces no log output; any other
    exception is reported through `logger`.

    Args:
        task: The task whose outcome is inspected.
        logger: The logger that receives the error record.
    """
    try:
        # Retrieving the result re-raises whatever exception ended the task.
        task.result()
    except asyncio.CancelledError:
        # A cancelled task is not a failure, so stay silent.
        return
    except Exception:
        # Called from inside the handler so that `logger.exception` attaches
        # the active exception and its traceback to the record.
        logger.exception("Job task died with an exception!")