Skip to content

router

zeus.optimizer.pipeline_frequency.server.router

Pipeline frequency optimizer server FastAPI router.

LoggingRoute

Bases: APIRoute

Route handler that logs all requests and responses at DEBUG level.

Source code in zeus/optimizer/pipeline_frequency/server/router.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class LoggingRoute(APIRoute):
    """Route handler that logs all requests and responses at DEBUG level."""

    def get_route_handler(self) -> Callable:
        """Wrap the original handler with debug messages.

        Returns:
            A coroutine function that awaits the handler built by the parent
            class and, when DEBUG logging is enabled, logs the request method,
            URL, JSON body (or "None" when the body is empty), and the decoded
            response body before returning the response unchanged.
        """
        # Local import: only the level constant is needed here.
        import logging

        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            response: Response = await original_route_handler(request)
            # The log arguments are evaluated eagerly, so without this guard the
            # request body would be parsed and the response body decoded on
            # every request even when the DEBUG record is discarded.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(
                    "%s %s: %s -> %s",
                    request.method,
                    request.url,
                    await request.json() if await request.body() else "None",
                    response.body.decode(response.charset),
                )
            return response

        return custom_route_handler

get_route_handler

get_route_handler()

Wrap the original handler with debug messages.

Source code in zeus/optimizer/pipeline_frequency/server/router.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def get_route_handler(self) -> Callable:
    """Wrap the original handler with debug messages.

    Returns:
        A coroutine function that awaits the handler built by the parent
        class and, when DEBUG logging is enabled, logs the request method,
        URL, JSON body (or "None" when the body is empty), and the decoded
        response body before returning the response unchanged.
    """
    # Local import: only the level constant is needed here.
    import logging

    original_route_handler = super().get_route_handler()

    async def custom_route_handler(request: Request) -> Response:
        response: Response = await original_route_handler(request)
        # The log arguments are evaluated eagerly, so without this guard the
        # request body would be parsed and the response body decoded on
        # every request even when the DEBUG record is discarded.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                "%s %s: %s -> %s",
                request.method,
                request.url,
                await request.json() if await request.body() else "None",
                response.body.decode(response.charset),
            )
        return response

    return custom_route_handler

startup_hook async

startup_hook()

Startup hook.

Source code in zeus/optimizer/pipeline_frequency/server/router.py
60
61
62
63
64
@app.on_event("startup")
async def startup_hook():
    """Log the configured scheduler's name and initialize the global job manager."""
    scheduler_name = settings.scheduler.__name__
    logger.info("Using scheduler `%s`", scheduler_name)
    # The job manager is built from the same settings object that was just logged.
    init_global_job_manager(settings)

register_job async

register_job(
    job_info, job_manager=Depends(get_global_job_manager)
)

Register the training job's information in the server.

Source code in zeus/optimizer/pipeline_frequency/server/router.py
67
68
69
70
71
72
73
74
@app.post(REGISTER_JOB_URL, response_model=str)
async def register_job(
    job_info: JobInfo, job_manager: JobManager = Depends(get_global_job_manager)
) -> str:
    """Assign an ID to the submitted job, register it, and return the ID.

    Args:
        job_info: Information about the training job being registered.
        job_manager: Job manager obtained from `get_global_job_manager`.

    Returns:
        The `job_id` attribute of `job_info` after `set_job_id` has been called.
    """
    scheduler_name = settings.scheduler.__name__
    # The job ID must be set before the job info is handed to the manager.
    job_info.set_job_id(scheduler_name=scheduler_name)
    job_manager.register_job(job_info)
    job_id = job_info.job_id
    return job_id

register_rank async

register_rank(
    job_id,
    rank_info,
    job_manager=Depends(get_global_job_manager),
)

Register each rank's information in the server.

Source code in zeus/optimizer/pipeline_frequency/server/router.py
77
78
79
80
81
82
83
84
@app.post(REGISTER_RANK_URL)
async def register_rank(
    job_id: str,
    rank_info: RankInfo,
    job_manager: JobManager = Depends(get_global_job_manager),
) -> None:
    """Pass one rank's information for the given job on to the job manager.

    Args:
        job_id: ID of the job the rank belongs to.
        rank_info: Information about the rank being registered.
        job_manager: Job manager obtained from `get_global_job_manager`.
    """
    job_manager.register_rank(job_id, rank_info)
    return None

get_frequency_schedule async

get_frequency_schedule(
    job_id,
    rank,
    job_manager=Depends(get_global_job_manager),
)

Return the next frequency schedule for the rank.

Source code in zeus/optimizer/pipeline_frequency/server/router.py
87
88
89
90
91
92
93
94
@app.get(GET_FREQUENCY_SCHEDULE_URL, response_model=FrequencySchedule)
async def get_frequency_schedule(
    job_id: str,
    rank: int,
    job_manager: JobManager = Depends(get_global_job_manager),
) -> FrequencySchedule:
    """Fetch the next frequency schedule for a rank from the job manager.

    Args:
        job_id: ID of the job the rank belongs to.
        rank: Rank whose schedule is requested.
        job_manager: Job manager obtained from `get_global_job_manager`.

    Returns:
        The schedule produced by awaiting `job_manager.get_frequency_schedule`.
    """
    schedule = await job_manager.get_frequency_schedule(job_id, rank)
    return schedule

report_profiling_result async

report_profiling_result(
    job_id,
    profiling_result,
    job_manager=Depends(get_global_job_manager),
)

Report the profiling result for the most recent frequency schedule.

Source code in zeus/optimizer/pipeline_frequency/server/router.py
 97
 98
 99
100
101
102
103
104
@app.post(REPORT_PROFILING_RESULT_URL)
async def report_profiling_result(
    job_id: str,
    profiling_result: ProfilingResult,
    job_manager: JobManager = Depends(get_global_job_manager),
) -> None:
    """Pass a profiling result for the given job on to the job manager.

    Args:
        job_id: ID of the job the result belongs to.
        profiling_result: Profiling result for the most recent frequency schedule.
        job_manager: Job manager obtained from `get_global_job_manager`.
    """
    job_manager.report_profiling_result(job_id, profiling_result)
    return None