Skip to content

common

zeus.optimizer.pipeline_frequency.common

Shared constants and models between the server and the client (optimizer).

PFOServerSettings

Bases: BaseSettings

PFO server settings, configurable via environment variables.

For instance, setting ZEUS_PFO_LOG_LEVEL=INFO will automatically set the log_level variable to "INFO".

Attributes:

Name Type Description
scheduler PyObject

Name of the FrequencyScheduler to use.

scheduler_args dict[str, Any]

Any extra arguments required by scheduler.__init__.

log_level str

Log level, e.g. "debug", "info".

dump_data bool

Whether the scheduler should dump internal state to the filesystem (for future inspection purposes).

dump_dir str

Directory to dump state in (if enabled)

max_job_idle_time int

Maximum time in seconds that a job can be idle for before its states are automatically deleted from the server.

Source code in zeus/optimizer/pipeline_frequency/common.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class PFOServerSettings(BaseSettings):
    """PFO server settings, configurable via environment variables.

    For instance, setting `ZEUS_PFO_LOG_LEVEL=INFO` will automatically set
    the `log_level` variable to `"INFO"`.

    Attributes:
        scheduler: Name of the `FrequencyScheduler` to use.
        scheduler_args: Any extra arguments required by `scheduler.__init__`.
        log_level: Log level, e.g. "debug", "info".
        dump_data: Whether the scheduler should dump internal state to the filesystem
            (for future inspection purposes).
        dump_dir: Directory to dump state in (if enabled)
        max_job_idle_time: Maximum time in seconds that a job can be idle for before
            its states are automatically deleted from the server.
    """

    scheduler: PyObject = "PointSolution"  # type: ignore
    scheduler_args: dict[str, Any] = {}
    log_level: str = "DEBUG"
    dump_data: bool = True
    dump_dir: str = "./dump"
    max_job_idle_time: int = 60 * 60 * 24 * 7  # 1 week

    @validator("scheduler", pre=True)
    def _fix_scheduler_import_path(cls, value):
        """Prepend `zeus.optimizer.pipeline_frequency.server.scheduler.` to the scheduler type name."""
        return f"zeus.optimizer.pipeline_frequency.server.scheduler.{value}"

    @validator("scheduler_args")
    def _validate_scheduler_args(cls, args, values):
        """Check whether args are as expected by the scheduler's constructor."""
        scheduler = values["scheduler"]
        full_args = args | dict(job_info=None, rank_infos=None, pfo_settings=None)
        constructor_args = inspect.signature(scheduler)
        try:
            constructor_args.bind(**full_args)
        except TypeError as e:
            raise ValueError(f"Invalid scheduler args: {e}") from None
        return args

    @validator("log_level")
    def _make_upper_case(cls, value):
        return value.upper()

    class Config:  # type: ignore
        """Configuration class read by pydantic."""

        env_prefix = "zeus_pfo_"

Config

Configuration class read by pydantic.

Source code in zeus/optimizer/pipeline_frequency/common.py
67
68
69
70
class Config:  # type: ignore
    """Configuration class read by pydantic."""

    env_prefix = "zeus_pfo_"

_fix_scheduler_import_path

_fix_scheduler_import_path(value)

Prepend zeus.optimizer.pipeline_frequency.server.scheduler. to the scheduler type name.

Source code in zeus/optimizer/pipeline_frequency/common.py
46
47
48
49
@validator("scheduler", pre=True)
def _fix_scheduler_import_path(cls, value):
    """Prepend `zeus.optimizer.pipeline_frequency.server.scheduler.` to the scheduler type name."""
    return f"zeus.optimizer.pipeline_frequency.server.scheduler.{value}"

_validate_scheduler_args

_validate_scheduler_args(args, values)

Check whether args are as expected by the scheduler's constructor.

Source code in zeus/optimizer/pipeline_frequency/common.py
51
52
53
54
55
56
57
58
59
60
61
@validator("scheduler_args")
def _validate_scheduler_args(cls, args, values):
    """Check whether args are as expected by the scheduler's constructor."""
    scheduler = values["scheduler"]
    full_args = args | dict(job_info=None, rank_infos=None, pfo_settings=None)
    constructor_args = inspect.signature(scheduler)
    try:
        constructor_args.bind(**full_args)
    except TypeError as e:
        raise ValueError(f"Invalid scheduler args: {e}") from None
    return args

JobInfo

Bases: BaseModel

Training job information reported to the server.

Attributes:

Name Type Description
job_id str

Globally unique ID of the training job, generated by the server. This field should be an empty string when sent to the server.

pp_degree int

Pipeline parallel degree.

dp_degree int

Data parallel degree.

tp_degree int

Tensor parallel degree.

world_size int

World size of the training job.

job_metadata Optional[str]

An optional arbitrary string that describes the job. This will be appended to the job ID if given. Typically for logging purposes.

Source code in zeus/optimizer/pipeline_frequency/common.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class JobInfo(BaseModel):
    """Training job information reported to the server.

    Attributes:
        job_id: Globally unique ID of the training job, generated by the server.
            This field should be an empty string when sent to the server.
        pp_degree: Pipeline parallel degree.
        dp_degree: Data parallel degree.
        tp_degree: Tensor parallel degree.
        world_size: World size of the training job.
        job_metadata: An optional arbitrary string that describes the job. This will
            be appended to the job ID if given. Typically for logging purposes.
    """

    job_id: str = ""
    pp_degree: int = Field(ge=1)
    dp_degree: int = Field(ge=1)
    tp_degree: int = Field(ge=1)
    world_size: int = Field(ge=1)
    job_metadata: Optional[str] = None

    @validator("job_id")
    def _check_empty_job_id(cls, job_id):
        assert not job_id
        return job_id

    @validator("world_size")
    def _check_world_size(cls, world_size, values):
        """Product of PP, DP, and TP degree would be identical to the world size."""
        assert (
            values["pp_degree"] * values["dp_degree"] * values["tp_degree"]
            == world_size
        )
        return world_size

    def set_job_id(self, scheduler_name: str):
        """Generate and set the job ID."""
        self.job_id = "+".join(
            [
                datetime.now().strftime("%F-%H-%M-%S"),
                f"dp{self.dp_degree}",
                f"pp{self.pp_degree}",
                f"tp{self.tp_degree}",
                scheduler_name,
            ]
        )
        if self.job_metadata:
            self.job_id += f"+{self.job_metadata}"

_check_world_size

_check_world_size(world_size, values)

Product of PP, DP, and TP degree would be identical to the world size.

Source code in zeus/optimizer/pipeline_frequency/common.py
 99
100
101
102
103
104
105
106
@validator("world_size")
def _check_world_size(cls, world_size, values):
    """Product of PP, DP, and TP degree would be identical to the world size."""
    assert (
        values["pp_degree"] * values["dp_degree"] * values["tp_degree"]
        == world_size
    )
    return world_size

set_job_id

set_job_id(scheduler_name)

Generate and set the job ID.

Source code in zeus/optimizer/pipeline_frequency/common.py
108
109
110
111
112
113
114
115
116
117
118
119
120
def set_job_id(self, scheduler_name: str):
    """Generate and set the job ID."""
    self.job_id = "+".join(
        [
            datetime.now().strftime("%F-%H-%M-%S"),
            f"dp{self.dp_degree}",
            f"pp{self.pp_degree}",
            f"tp{self.tp_degree}",
            scheduler_name,
        ]
    )
    if self.job_metadata:
        self.job_id += f"+{self.job_metadata}"

RankInfo

Bases: BaseModel

Information passed to the server from each rank.

Attributes:

Name Type Description
rank int

Global rank of the reporting process.

dp_rank int

Data parallel rank of the reporting procees.

pp_rank int

Pipeline parallel rank of the reporting procees.

tp_rank int

Tensor parallel rank of the reporting procees.

available_frequencies list[int]

List of available frequencies for the rank's GPU.

Source code in zeus/optimizer/pipeline_frequency/common.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
class RankInfo(BaseModel):
    """Information passed to the server from each rank.

    Attributes:
        rank: Global rank of the reporting process.
        dp_rank: Data parallel rank of the reporting procees.
        pp_rank: Pipeline parallel rank of the reporting procees.
        tp_rank: Tensor parallel rank of the reporting procees.
        available_frequencies: List of available frequencies for the rank's GPU.
    """

    rank: int = Field(ge=0)
    dp_rank: int = Field(ge=0)
    pp_rank: int = Field(ge=0)
    tp_rank: int = Field(ge=0)
    available_frequencies: list[int]

FrequencySchedule

Bases: BaseModel

Frequency schedule for one iteration.

frequencies is a list of tuples, where the first element is the name of the instruction and the second element is the frequency to use for that instruction.

Source code in zeus/optimizer/pipeline_frequency/common.py
141
142
143
144
145
146
147
148
149
class FrequencySchedule(BaseModel):
    """Frequency schedule for one iteration.

    `frequencies` is a list of tuples, where the first element is the name of the
    instruction and the second element is the frequency to use for that instruction.
    """

    rank: int = Field(ge=0)
    frequencies: list[tuple[str, int]]

ProfilingResult

Bases: BaseModel

Profiling results for a FrequencySchedule of a rank.

Attributes:

Name Type Description
rank int

Global rank of the reporting client.

iter_time list[float]

List of latency of all iterations within the profiling window in seconds.

iter_energy list[float]

List of energy consumption of all iterations within the profiling window in Joules.

time_breakdown dict[str, list[list[float]]]

Duration of each operation across multiple iterations. e.g. time_breakdown["forward"][i] is the list of latencies of all forward computations in the ith iteration.

energy_breakdown dict[str, list[list[float]]]

Energy consumption of each operation across multple iterations. Value has the same structure as time_breakdown.

Source code in zeus/optimizer/pipeline_frequency/common.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
class ProfilingResult(BaseModel):
    """Profiling results for a `FrequencySchedule` of a rank.

    Attributes:
        rank: Global rank of the reporting client.
        iter_time: List of latency of all iterations within the profiling window in seconds.
        iter_energy: List of energy consumption of all iterations within the profiling window in Joules.
        time_breakdown: Duration of each operation across multiple iterations.
            e.g. `time_breakdown["forward"][i]` is the list of latencies of all forward computations
            in the `i`th iteration.
        energy_breakdown: Energy consumption of each operation across multple iterations.
            Value has the same structure as `time_breakdown`.
    """

    rank: int = Field(ge=0)
    iter_time: list[float]
    iter_energy: list[float]
    time_breakdown: dict[str, list[list[float]]] = {}
    energy_breakdown: dict[str, list[list[float]]] = {}

OfflineProfilingResult

Bases: BaseModel

Profiling results generated from offline profiling each instruction.

Attributes:

Name Type Description
rank int

Global rank of the reporting client.

dp_rank int

Data parallel rank of the reporting procees.

pp_rank int

Pipeline parallel rank of the reporting procees.

tp_rank int

Tensor parallel rank of the reporting procees.

forward_time dict[int, float]

Dict that maps frequency to average forward computation time.

forward_energy dict[int, float]

Dict that maps frequency to average forward computation energy.

backward_time dict[int, float]

Dict that maps frequency to average backward computation time.

backward_energy dict[int, float]

Dict that maps frequency to average backward computation energy.

Source code in zeus/optimizer/pipeline_frequency/common.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
class OfflineProfilingResult(BaseModel):
    """Profiling results generated from offline profiling each instruction.

    Attributes:
        rank: Global rank of the reporting client.
        dp_rank: Data parallel rank of the reporting procees.
        pp_rank: Pipeline parallel rank of the reporting procees.
        tp_rank: Tensor parallel rank of the reporting procees.
        forward_time: Dict that maps frequency to average forward computation time.
        forward_energy: Dict that maps frequency to average forward computation energy.
        backward_time: Dict that maps frequency to average backward computation time.
        backward_energy: Dict that maps frequency to average backward computation energy.
    """

    rank: int = Field(ge=0)
    dp_rank: int = Field(ge=0)
    pp_rank: int = Field(ge=0)
    tp_rank: int = Field(ge=0)
    forward_time: dict[int, float]
    forward_energy: dict[int, float]
    backward_time: dict[int, float]
    backward_energy: dict[int, float]

InstructionProfilingResult

Bases: BaseModel

Time and energy profiling results for each instruction in each stage.

Source code in zeus/optimizer/pipeline_frequency/common.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class InstructionProfilingResult(BaseModel):
    """Time and energy profiling results for each instruction in each stage."""

    __root__: list[OfflineProfilingResult]

    def to_csv(self, filepath: str) -> None:
        """Serialize and save this object into a CSV file.

        Columns: rank, dp_rank, pp_rank, tp_rank, stage, instruction, frequency, time, energy
        Notes
            - `rank` is the global rank of the process.
            - `pp_rank` and `stage` are always the same, for backwards compatibility.
            - All ranks and `stage` are zero-indexed.
            - `instruction` is either "forward" or "backward".
            - `time` and `energy` are already averaged over profiling iterations.
        """
        if not filepath.endswith(".csv"):
            raise ValueError("Filepath does not end with '.csv'")

        # fmt: off
        headers = ["rank", "dp_rank", "pp_rank", "tp_rank", "stage", "instruction", "frequency", "time", "energy"]
        records: list[tuple[int, int, int, int, int, str, int, float, float]] = []
        for res in self.__root__:
            prefix = (res.rank, res.dp_rank, res.pp_rank, res.tp_rank, res.pp_rank)
            for freq in res.forward_time:
                records.append((*prefix, "forward", freq, res.forward_time[freq], res.forward_energy[freq]))
            for freq in res.backward_time:
                records.append((*prefix, "backward", freq, res.backward_time[freq], res.backward_energy[freq]))
        # fmt: on

        df = pd.DataFrame.from_records(records, columns=headers)
        df.to_csv(filepath, index=False)

to_csv

to_csv(filepath)

Serialize and save this object into a CSV file.

Columns: rank, dp_rank, pp_rank, tp_rank, stage, instruction, frequency, time, energy Notes - rank is the global rank of the process. - pp_rank and stage are always the same, for backwards compatibility. - All ranks and stage are zero-indexed. - instruction is either "forward" or "backward". - time and energy are already averaged over profiling iterations.

Source code in zeus/optimizer/pipeline_frequency/common.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def to_csv(self, filepath: str) -> None:
    """Serialize and save this object into a CSV file.

    Columns: rank, dp_rank, pp_rank, tp_rank, stage, instruction, frequency, time, energy
    Notes
        - `rank` is the global rank of the process.
        - `pp_rank` and `stage` are always the same, for backwards compatibility.
        - All ranks and `stage` are zero-indexed.
        - `instruction` is either "forward" or "backward".
        - `time` and `energy` are already averaged over profiling iterations.
    """
    if not filepath.endswith(".csv"):
        raise ValueError("Filepath does not end with '.csv'")

    # fmt: off
    headers = ["rank", "dp_rank", "pp_rank", "tp_rank", "stage", "instruction", "frequency", "time", "energy"]
    records: list[tuple[int, int, int, int, int, str, int, float, float]] = []
    for res in self.__root__:
        prefix = (res.rank, res.dp_rank, res.pp_rank, res.tp_rank, res.pp_rank)
        for freq in res.forward_time:
            records.append((*prefix, "forward", freq, res.forward_time[freq], res.forward_energy[freq]))
        for freq in res.backward_time:
            records.append((*prefix, "backward", freq, res.backward_time[freq], res.backward_energy[freq]))
    # fmt: on

    df = pd.DataFrame.from_records(records, columns=headers)
    df.to_csv(filepath, index=False)

save_prof async

save_prof(data, directory, schedule_num)

Save a list of ProfilingResults in the designated directory.

Source code in zeus/optimizer/pipeline_frequency/common.py
231
232
233
234
235
236
237
238
239
240
async def save_prof(
    data: list[ProfilingResult],
    directory: str,
    schedule_num: int,
) -> None:
    """Save a list of `ProfilingResult`s in the designated directory."""
    os.makedirs(directory, exist_ok=True)
    async with aiofiles.open(f"{directory}/{schedule_num}.prof.json", "w") as f:
        obj = _ProfilingResultList(__root__=data).json()
        await f.write(obj)

load_prof

load_prof(directory, schedule_num)

Load a list of ProfilingResults saved in the designated directory.

Source code in zeus/optimizer/pipeline_frequency/common.py
243
244
245
246
def load_prof(directory: str, schedule_num: int) -> list[ProfilingResult]:
    """Load a list of `ProfilingResult`s saved in the designated directory."""
    filepath = f"{directory}/{schedule_num}.prof.json"
    return _ProfilingResultList.parse_file(filepath).__root__

save_sched async

save_sched(data, directory, schedule_num)

Save a list of FrequencySchedules in the designated directory.

Source code in zeus/optimizer/pipeline_frequency/common.py
249
250
251
252
253
254
255
256
257
258
async def save_sched(
    data: list[FrequencySchedule],
    directory: str,
    schedule_num: int,
) -> None:
    """Save a list of `FrequencySchedule`s in the designated directory."""
    os.makedirs(directory, exist_ok=True)
    async with aiofiles.open(f"{directory}/{schedule_num}.sched.json", "w") as f:
        obj = _FrequencyScheduleList(__root__=data).json()
        await f.write(obj)

load_sched

load_sched(directory, schedule_num)

Load a list of FrequencySchedules saved in the designated directory.

Source code in zeus/optimizer/pipeline_frequency/common.py
261
262
263
264
def load_sched(directory: str, schedule_num: int) -> list[FrequencySchedule]:
    """Load a list of `FrequencySchedule`s saved in the designated directory."""
    filepath = f"{directory}/{schedule_num}.sched.json"
    return _FrequencyScheduleList.parse_file(filepath).__root__

save_ranks async

save_ranks(data, directory)

Save a list of RankInfos in the designated directory.

Source code in zeus/optimizer/pipeline_frequency/common.py
267
268
269
270
271
272
async def save_ranks(data: list[RankInfo], directory: str) -> None:
    """Save a list of `RankInfo`s in the designated directory."""
    os.makedirs(directory, exist_ok=True)
    async with aiofiles.open(f"{directory}/ranks.json", "w") as f:
        obj = _RankInfoList(__root__=data).json()
        await f.write(obj)

load_ranks

load_ranks(directory)

Load a list of RankInfos saved in the designated directory.

Source code in zeus/optimizer/pipeline_frequency/common.py
275
276
277
278
def load_ranks(directory: str) -> list[RankInfo]:
    """Load a list of `RankInfo`s saved in the designated directory."""
    filepath = f"{directory}/ranks.json"
    return _RankInfoList.parse_file(filepath).__root__