Skip to content

simulate

zeus.simulate

A simulator for running trace-driven Zeus experiments.

Simulator

Simulates job execution optimized by Zeus.

Source code in zeus/simulate.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
class Simulator:
    """Simulates job execution optimized by Zeus."""

    def __init__(
        self,
        summary_train: str | pd.DataFrame,
        summary_power: str | pd.DataFrame,
        batch_size_optimizer: BatchSizeOptimizer,
        power_limit_optimizer: PowerLimitOptimizer,
        seed: int = 123456,
        verbose: bool = True,
    ) -> None:
        """Initialize the simulator.

        Args:
            summary_train: Path to or `pd.DataFrame` of the train trace.
            summary_power: Path to or `pd.DataFrame` of the power trace.
            batch_size_optimizer: The user is expected to construct
                the BSO with the desired policy and pass it into the simulator.
            power_limit_optimizer: The user is expected to construct
                the PLO with the desired policy and pass it into the simulator.
            seed: The random seed. Every invocation of any simulation method in this
                class is deterministic given the random seed, because the internal RNG is
                deepcopied before running the simulation.
            verbose: Whether to log out the internal states of the simulator.
        """
        # Generate relevant data.
        train_df = (
            pd.read_csv(summary_train)
            if isinstance(summary_train, str)
            else summary_train
        )
        power_df = (
            pd.read_csv(summary_power)
            if isinstance(summary_power, str)
            else summary_power
        )
        df = train_df.merge(power_df, how="inner")  # type: ignore
        df["TTA"] = df.target_epoch * df.time_per_epoch
        df["ETA"] = df.TTA * df.average_power
        # 'energy_per_epoch' is used to compare different power limits with the same batch size
        # when trying to figure out which power limit is the best one.
        df["energy_per_epoch"] = df.time_per_epoch * df.average_power
        self.df = df

        # Knob optimizers.
        self.bso = batch_size_optimizer
        self.plo = power_limit_optimizer

        # Save arguments.
        self.seed = seed
        self.verbose = verbose

    def simulate_one_job(
        self,
        job: Job,
        num_recurrence: int,
        beta_knob: float,
        eta_knob: float,
    ) -> list[HistoryEntry]:
        r"""Simulate a sequentially recurring job. Explore with early stopping.

        Args:
            job: Job spec to simulate.
            num_recurrence: How many times the job recurs.
            beta_knob: `beta_knob * min_eta` is the early stopping cost threshold.
                Set to `np.inf` to disable early stopping.
            eta_knob: $\eta$ used in the hybrid cost metric.
                $\textrm{cost} = \eta \cdot \textrm{ETA} + (1 - \eta) \cdot \textrm{MaxPower} \cdot \textrm{TTA}$

        Returns:
            A list of [`HistoryEntry`][zeus.analyze.HistoryEntry] objects for each job run.
        """
        # Copy all internal state so that simulation does not modify any
        # internal state and is deterministic w.r.t. the random seed.
        bso = deepcopy(self.bso)
        plo = deepcopy(self.plo)
        rng = np.random.default_rng(self.seed)

        # Figure out MAXPOWER.
        max_power = self.df.power_limit.max().item()
        if self.verbose:
            print(f"[Simulator] Max power = {max_power}W")

        # Track the minimum cost observed for the early stopping energy threshold.
        min_cost = np.inf

        # Simulate each job one at a time.
        history: list[HistoryEntry] = []

        if self.verbose:
            print(f"[Simulator] {job} recurring {num_recurrence} times.")

        # A new job. Profile the feasible batch size range.
        batch_sizes = self._profile_batch_size_range(job)

        # Register the job in the BSO.
        bso.register_job(job, batch_sizes)

        # Job recurs.
        for i in range(num_recurrence):
            if self.verbose:
                print(f"\nRecurrence: {i+1}")

            # Run the job until convergence. Upper bound the number of retries to 20.
            # Accumulate the cost of retries before convergence.
            cost_acc = 0.0
            for tries in range(1, 21):
                # Whether this run of the job needed to profile power.
                profiled_power = False

                # Fetch knobs to use.
                bs = bso.predict(job)
                pl = plo.predict(job, bs)

                # When the batch size is first explored, we need to profile power limit.
                if pl is None:
                    profiled_power = True
                    result = self._profile_power_limit(job, bs, eta_knob)
                    for pl, epe in result.items():
                        plo.observe(job, bs, pl, epe)
                    pl = plo.predict(job, bs)
                    assert pl

                # Run the job, potentially early stopping it.
                eta, tta, reached = self._run_job(
                    job=job,
                    batch_size=bs,
                    power_limit=pl,
                    rng=rng,
                    cost_ub=beta_knob * min_cost,
                    eta_knob=eta_knob,
                    profile_power=profiled_power,
                )

                # The job never ran because even one epoch exceeds the cost threshold.
                # Let the BSO observe that this batch size is bad, but since the job
                # did not run, do not add to the history and retry.
                if eta == 0 and tta == 0 and not reached:
                    bso.observe(job, bs, 100 * beta_knob * min_cost, False)
                    continue

                # Compute the hybrid cost metric.
                cost = zeus_cost(eta, tta, eta_knob, max_power)
                cost_acc += cost

                # Provide feedback to the BSO.
                bso.observe(job, bs, cost, reached)

                # Record history for analysis.
                history.append(HistoryEntry(bs, pl, eta, reached, tta))

                # Reached the target metric. Update min_cost and go on to the next recurrence.
                if reached:
                    if self.verbose:
                        print()
                        print(
                            f"[Simulator] Reached target metric in {tries} {'try' if tries == 1 else 'tries'}."
                        )
                    if min_cost > cost_acc:
                        if self.verbose:
                            print(
                                f"[Simulator] Minimum cost updated from {min_cost:.2f} to {cost_acc:.2f}."
                            )
                        min_cost = cost_acc
                    break
                # Didn't reach the target metric.
                # We assume that the default BS (set by the user) will always converge.
                # That is, reaching the target metric with the model should be a feasible task.
                if i == 0:
                    raise RuntimeError(
                        f"The default batch size {job.default_bs} did not converge."
                    )

            # Target metric was not reached in 20 tries. We consider this target metric to be unreachable.
            else:
                raise RuntimeError("Job did not reach the target metric in 20 tries.")

        if self.verbose:
            print()
            print(
                f"[Simulator] {job} (BS, PL, ETA, whether_reached, TTA) history: \n{history}"
            )

        return history

    def simulate_one_alibaba_group(
        self,
        job: Job,
        group_df: pd.DataFrame,
        beta_knob: float,
        eta_knob: float,
    ) -> list[HistoryEntry]:
        r"""Run simulation on one group in the Alibaba trace.

        Concurrent job submissions (jobs that start before the previous job
        finishes) are launched with the batch size known to be of minimum
        cost at that time. The BSO also observes the results of these jobs
        when they are done, and these jobs may well finish before a job that
        started before it. See `observe` in PruningGTSBatchSizeOptimizer for
        an example of handing such a scenario.

        Args:
            job: Job spec of this group.
            group_df: DataFrame of this group. Fields:
                - group: Group ID in trace. Identical across all rows.
                - dataset: Dataset name. Identical across all rows.
                - start_time: Job start time in the trace.
                - end_time: Job end time in the trace.
                - runtime_ratio: runtime of this job over the mean runtime
                    of all the jobs of this dataset. Captures intra-dataset
                    job runtime differences.
            beta_knob: `beta_knob * min_eta` is the early stopping cost threshold.
                Set to `np.inf` to disable early stopping.
            eta_knob: $\eta$ used in the hybrid cost metric.
                $\textrm{cost} = \eta \cdot \textrm{ETA} + (1 - \eta) \cdot \textrm{MaxPower} \cdot \textrm{TTA}$

        Returns:
            A list of [`HistoryEntry`][zeus.analyze.HistoryEntry] objects for each job run.
        """
        # Copy all internal state so that simulation does not modify any
        # internal state and is deterministic w.r.t. the random seed.
        bso = deepcopy(self.bso)
        plo = deepcopy(self.plo)
        rng = np.random.default_rng(self.seed)

        # Sanity check
        if job.default_bs is None:
            raise ValueError("You must give the job a default batch size.")

        # Figure out MAXPOWER.
        max_power = self.df.power_limit.max().item()
        if self.verbose:
            print(f"[Simulator] Max power = {max_power}W")

        # Track the minimum cost observed for the early stopping energy threshold.
        # Also track the corresponding minimum cost BS to handle concurrent jobs.
        min_cost = np.inf
        best_bs = job.default_bs

        # Simulate each job one at a time.
        history: list[HistoryEntry] = []

        if self.verbose:
            print(f"[Simulator] {job} recurring {len(group_df)} times.")

        # A new job. Profile the feasible batch size range.
        batch_sizes = self._profile_batch_size_range(job)

        # Register the job in the BSO.
        bso.register_job(job, batch_sizes)

        # List of jobs in flight.
        @dataclass
        class RunningJob:
            """Represents a job that is running.

            We know what's going to happen to this job when we launch it.
            Thus, pre-compute all results (using self.run_job) and have this
            instance hold the information. Then, jobs will be removed from the
            list of running jobs when the current time passes self.end_time and
            the result will be observed.
            """

            start_time: float
            end_time: float
            runtime_ratio: float
            batch_size: int
            power_limit: int
            reached: bool
            time: float
            energy: float
            cost: float

        running_jobs: list[RunningJob] = []

        # Jobs in group_df are already sorted by start_time.
        current_time = 0.0
        for rec_i, (_, job_row) in enumerate(group_df.iterrows()):
            if self.verbose:
                print(f"\nRecurrence: {rec_i+1}")

            # Update the current time.
            current_time = job_row.start_time
            if self.verbose:
                print(f"[Simulator] Current time is {current_time:.1f}")

            # Scan the in-flight job list to reap jobs that have finished.
            # We need a while loop here because we might have submitted a retry job
            # while reaping jobs that failed to reach the target metric, and that retry job
            # may finish before the current job.
            while any(map(lambda j: j.end_time <= current_time, running_jobs)):
                if self.verbose:
                    print(f"[Simulator] Running jobs: {running_jobs}")

                # We copy running_jobs because we want to mutate it as we traverse it.
                running_jobs_copy = deepcopy(running_jobs)

                # Sort the jobs in the order they end.
                for rjob in sorted(
                    running_jobs_copy, key=operator.attrgetter("end_time")
                ):
                    # We're only interested in jobs that finished at this point in time.
                    if rjob.end_time > current_time:
                        continue

                    # Job is finished at this point in time.
                    if self.verbose:
                        print(
                            f"[Simulator] Job(BS={rjob.batch_size},time={rjob.time},"
                            f"energy={rjob.energy},reached={rjob.reached}) finished"
                        )

                    # Remove the job from the in-flight job list.
                    running_jobs.remove(rjob)

                    # Let the BSO observe the cost for this batch size.
                    bso.observe(job, rjob.batch_size, rjob.cost, rjob.reached)

                    # If the job never ran because even one epoch exceeds the cost threshold,
                    # do not add to the history and retry.
                    if rjob.energy != 0 or rjob.time != 0 or rjob.reached:
                        # Record history for analysis.
                        history.append(
                            HistoryEntry(
                                rjob.batch_size,
                                rjob.power_limit,
                                rjob.energy
                                * rjob.runtime_ratio,  # Scale the energy of this job by the runtime ratio.
                                rjob.reached,
                                rjob.time
                                * rjob.runtime_ratio,  # Scale the runtime of this job by the runtime ratio.
                            )
                        )

                    # Reached target metric (no need to retry)
                    if rjob.reached:
                        if min_cost > rjob.cost:
                            if self.verbose:
                                print(
                                    f"[Simulator] Minimum cost updated from {min_cost:.2f} to {rjob.cost:.2f}"
                                )
                            min_cost = rjob.cost
                            best_bs = rjob.batch_size

                    # Didn't reach target metric. Need to run a retry job.
                    else:
                        profiled_power = False

                        # If there are jobs in-flight, we just run additional concurrent
                        # submissions with the best known knobs.
                        if running_jobs:
                            if self.verbose:
                                print(
                                    f"[Simulator] There are in-flight jobs. Use BS {best_bs}."
                                )
                            bs = best_bs
                            pl = plo.predict(job, bs)
                            assert pl, f"Power not profiled for best known BS {bs}"

                        # There are no jobs in-flight. Just submit the job normally.
                        else:
                            # Determine the knobs.
                            bs = bso.predict(job)
                            pl = plo.predict(job, bs)

                            if self.verbose:
                                print(
                                    f"[Simulator] There are no in-flight jobs. Use BSO's prediction {bs}."
                                )

                            # When the batch size is first explored, we need to profile power limit.
                            if pl is None:
                                profiled_power = True
                                result = self._profile_power_limit(job, bs, eta_knob)
                                for pl, epe in result.items():
                                    plo.observe(job, bs, pl, epe)
                                pl = plo.predict(job, bs)
                                assert pl

                        # Pre-compute the result of the job.
                        eta, tta, reached = self._run_job(
                            job=job,
                            batch_size=bs,
                            power_limit=pl,
                            rng=rng,
                            cost_ub=beta_knob * min_cost,
                            eta_knob=eta_knob,
                            profile_power=profiled_power,
                        )

                        # Compute the hybrid cost metric.
                        cost = zeus_cost(eta, tta, eta_knob, max_power)

                        # Create the RunningJob instance.
                        running_job = RunningJob(
                            start_time=rjob.end_time,
                            end_time=rjob.end_time
                            + (rjob.end_time - rjob.start_time),  # Assume same runtime.
                            runtime_ratio=rjob.runtime_ratio,
                            batch_size=bs,
                            power_limit=pl,
                            reached=reached,
                            time=tta,
                            energy=eta,
                            cost=cost,
                        )
                        running_jobs.append(running_job)

                        if self.verbose:
                            print(f"[Simulator] Started retry job {running_job}")

                        # We must break from the loop that scans the running_jobs list, because
                        # the job we just submitted might actually be the next job that finishes.
                        break

            # We're (finally) done reaping all finished jobs. Run the current job.
            profiled_power = False

            # If there are jobs in-flight, we just run additional concurrent
            # submissions with the best known knobs.
            if running_jobs:
                if self.verbose:
                    print(f"[Simulator] There are in-flight jobs. Use BS {best_bs}.")
                bs = best_bs
                pl = plo.predict(job, bs)
                assert pl is not None, f"Power not profiled for best known BS {bs}"

            # There are no jobs in-flight. Just submit the job.
            else:
                # Determine the knobs.
                bs = bso.predict(job)
                pl = plo.predict(job, bs)

                if self.verbose:
                    print(
                        f"[Simulator] There are no in-flight jobs. Use BSO's prediction {bs}."
                    )

                # When the batch size is first explored, we need to profile power limit.
                if pl is None:
                    profiled_power = True
                    result = self._profile_power_limit(job, bs, eta_knob)
                    for pl, epe in result.items():
                        plo.observe(job, bs, pl, epe)
                    pl = plo.predict(job, bs)
                    assert pl

            # Run the job, potentially early stopping it.
            eta, tta, reached = self._run_job(
                job=job,
                batch_size=bs,
                power_limit=pl,
                rng=rng,
                cost_ub=beta_knob * min_cost,
                eta_knob=eta_knob,
                profile_power=profiled_power,
            )

            # Compute the hybrid cost metric.
            cost = zeus_cost(eta, tta, eta_knob, max_power)

            # Create the RunningJob instance and enqueue.
            running_job = RunningJob(
                start_time=job_row.start_time,
                end_time=job_row.end_time,
                runtime_ratio=job_row.runtime_ratio,
                batch_size=bs,
                power_limit=pl,
                reached=reached,
                time=tta,
                energy=eta,
                cost=cost,
            )
            running_jobs.append(running_job)

            if self.verbose:
                print(f"[Simulator] Started job {running_job}")

        # Now, we're done iterating the rows of group_df.
        # Set the current time to infinity so that all running jobs finish.
        current_time = np.inf
        if self.verbose:
            print("\n[Simulator] Reap all jobs")

        # Scan the remaining in-flight job list to reap jobs that have finished.
        # Since current_time is infinity, this while loop will reap all the jobs ever launched.
        while any(map(lambda j: j.end_time <= current_time, running_jobs)):
            if self.verbose:
                print(f"[Simulator] Running jobs: {running_jobs}")

            # We copy running_jobs because we want to mutate it as we traverse it.
            running_jobs_copy = deepcopy(running_jobs)

            # Sort the jobs in the order they end.
            for rjob in sorted(running_jobs_copy, key=operator.attrgetter("end_time")):
                # We're only interested in jobs that finished at this point in time.
                if rjob.end_time > current_time:
                    continue

                # Job is finished at this point in time.
                if self.verbose:
                    print(
                        f"[Simulator] Job(BS={rjob.batch_size},time={rjob.time},"
                        f"energy={rjob.energy},reached={rjob.reached}) finished"
                    )

                # Remove the job from the in-flight job list.
                running_jobs.remove(rjob)

                # Let the BSO observe the cost for this batch size.
                bso.observe(job, rjob.batch_size, rjob.cost, rjob.reached)

                # If the job never ran because even one epoch exceeds the cost threshold,
                # do not add to the history and retry.
                if rjob.energy != 0 or rjob.time != 0 or rjob.reached:
                    # Record history for analysis.
                    history.append(
                        HistoryEntry(
                            rjob.batch_size,
                            rjob.power_limit,
                            rjob.energy
                            * rjob.runtime_ratio,  # Scale the energy of this job by the runtime ratio.
                            rjob.reached,
                            rjob.time
                            * rjob.runtime_ratio,  # Scale the runtime of this job by the runtime ratio.
                        )
                    )

                # Reached target metric (no need to retry)
                if rjob.reached:
                    if min_cost > rjob.cost:
                        if self.verbose:
                            print(
                                f"[Simulator] Minimum cost updated from {min_cost:.2f} to {rjob.cost:.2f}"
                            )
                        min_cost = rjob.cost
                        best_bs = rjob.batch_size

                # Didin't reach target metric. Need to run a retry job.
                else:
                    profiled_power = False

                    # If there are jobs in-flight, we just run additional concurrent
                    # submissions with the best known knobs.
                    if running_jobs:
                        if self.verbose:
                            print(
                                f"[Simulator] There are in-flight jobs. Use BS {best_bs}."
                            )
                        bs = best_bs
                        pl = plo.predict(job, bs)
                        assert pl, f"Power not profiled for best known BS {bs}"

                    # There are no jobs in-flight. Just submit the job normally.
                    else:
                        # Determine the knobs.
                        bs = bso.predict(job)
                        pl = plo.predict(job, bs)

                        if self.verbose:
                            print(
                                f"[Simulator] There are no in-flight jobs. Use BSO's prediction {bs}."
                            )

                        # When the batch size is first explored, we need to profile power limit.
                        if pl is None:
                            profiled_power = True
                            result = self._profile_power_limit(job, bs, eta_knob)
                            for pl, epe in result.items():
                                plo.observe(job, bs, pl, epe)
                            pl = plo.predict(job, bs)
                            assert pl

                    # Pre-compute the result of the job.
                    eta, tta, reached = self._run_job(
                        job=job,
                        batch_size=bs,
                        power_limit=pl,
                        rng=rng,
                        cost_ub=beta_knob * min_cost,
                        eta_knob=eta_knob,
                        profile_power=profiled_power,
                    )

                    # Compute the hybrid cost metric.
                    cost = zeus_cost(eta, tta, eta_knob, max_power)

                    # Create the RunningJob instance.
                    running_job = RunningJob(
                        start_time=rjob.end_time,
                        end_time=rjob.end_time
                        + (rjob.end_time - rjob.start_time),  # Assume same runtime.
                        runtime_ratio=rjob.runtime_ratio,
                        batch_size=bs,
                        power_limit=pl,
                        reached=reached,
                        time=tta,
                        energy=eta,
                        cost=cost,
                    )
                    running_jobs.append(running_job)

                    if self.verbose:
                        print(f"[Simulator] Started retry job {running_job}")

                    # We must break from the loop that scans the running_jobs list, because
                    # the job we just submitted might actually be the next job that finishes.
                    break

        return history

    def _run_job(
        self,
        job: Job,
        batch_size: int,
        power_limit: int,
        rng: np.random.Generator,
        cost_ub: float,
        eta_knob: float,
        profile_power: bool,
    ) -> tuple[float, float, bool]:
        r"""Simulate running the job and return the energy consumed and whether it converged.

        This method will randomly choose one of the possible training "paths". Then,
        based on cost_ub, it will compute the maximum number of epochs the job can run.
        If the path's target_epoch is smaller than or equal to the maximum number of
        epochs, the cost incurred until target_epoch is returned. Otherwise, the cost
        incurred until the maximum number of epochs is returned.

        It is important to note that the job may never run when the first epoch's cost
        is already expected to exceed the cost upper bound. In such a case, the returned
        time and energy consumptions will be zero. This case must be treated separately
        in the calling code.

        If profile_power is True, the first epoch will JIT-profile power limits coarsely
        by dividing the epoch evenly into len(available_power_limits) pieces. Thus the
        the first epoch's energy and time consumption will be slightly adjusted.

        Args:
            job: Job spec to run.
            batch_size: Batch size to use.
            power_limit: Power limit to use. Regardless of whether this run of this
                batch size requires power profiling, the simulator will input the optimal
                power limit for the batch size. The first epoch energy consumption from
                power profiling is adjusted in the latter half of this method based on the
                profile_power flag.
            rng: Random number generator used to sample one training path.
            cost_ub: Cost upper bound. The job is terminated when the next epoch is going
                to exceed the cost upper bound.
            eta_knob: $\eta$ used in the hybrid cost metric.
                $\textrm{cost} = \eta \cdot \textrm{ETA} + (1 - \eta) \cdot \textrm{MaxPower} \cdot \textrm{TTA}$
            profile_power: Whether this run of the job should profile power during the
                first epoch.

        Returns:
            Tuple of energy consumption, time consumption, and whether the job reached the target metric.
        """
        # df is filtered with job spec, BS, and PL. We sample one possible training path.
        # power_df is filtered with job spec and BS. We use this to compute the energy
        # consumption of profiling power during the first epoch.
        df = job.filter_df(self.df)
        power_df = df.loc[df.batch_size == batch_size]
        df = power_df.loc[df.power_limit == power_limit]
        path = df.sample(n=1, random_state=rng)

        # Max number of epochs is bound by either the cost upper bound or the user-specified
        # max_epochs, whichever is smaller.
        if cost_ub == np.inf:
            # cost_ub is infinity in two cases:
            # 1. The simulator has never observed any cost value in the early part of simulation.
            # 2. We're simulating with no early stopping, i.e. beta_knob is infinity.
            max_epochs = job.max_epochs
            if self.verbose:
                print(f"[run job] Cost UB is inf. {max_epochs=}")
        else:
            # Stop right before the first epoch when cost will cross the upper bound.
            cost_per_epoch = (
                eta_knob * path.energy_per_epoch.item()
                + (1 - eta_knob)
                * power_df.power_limit.max().item()
                * path.time_per_epoch.item()
            )
            max_epochs = min(cost_ub // cost_per_epoch, job.max_epochs)
            if self.verbose:
                print(f"[run job] {cost_ub=}")
                print(f"[run job] {cost_per_epoch=}")
                print(f"[run job] {max_epochs=}")

        # The job virtually never ran. Time and Energy being zero will be treated specially outside.
        # If the min_cost is so low, this might even prevent this BS from running at all.
        if max_epochs == 0:
            print(
                f"[run job] {job} cannot run even one epoch without exceeding the cost UB."
                f" BS {batch_size}, PL {power_limit}, {eta_knob=}"
            )
            return 0.0, 0.0, False

        def compute_energy_and_time(
            num_epochs: int, profile_power: bool
        ) -> tuple[float, float]:
            """Compute the energy and time consumed for running the job for num_epochs."""
            # This is the first run of this batch size, and we need to profile power
            # during the first epoch.
            if profile_power:
                # Note that power_df holds rows with all power limits. Evenly splitting the
                # epochs with the number of samples and running each slice with each power
                # limit consumes (1/N) * e_100 + (1/N) * e_125 + ... + (1/N) * e_250.
                # Also there are all runs 1, 2, ... included, but power info is actually
                # completely duplicated across different runs in the DataFrame.
                # Thus, taking the mean across the entire power_df gets us what we want.
                energy_first_epoch = power_df.energy_per_epoch.mean().item()
                energy_from_second_epoch = path.energy_per_epoch.item() * (
                    num_epochs - 1
                )
                energy_consumption = energy_first_epoch + energy_from_second_epoch
                time_first_epoch = power_df.time_per_epoch.mean().item()
                time_from_second_epoch = path.time_per_epoch.item() * (num_epochs - 1)
                time_consumption = time_first_epoch + time_from_second_epoch
            # Just run num_epochs with the given power limit. Simple.
            else:
                energy_consumption = path.energy_per_epoch.item() * num_epochs
                time_consumption = path.time_per_epoch.item() * num_epochs
            return energy_consumption, time_consumption

        # Job reached target metric.
        target_epoch = path.target_epoch.item()
        if path.target_epoch.notnull().item() and target_epoch <= max_epochs:
            eta, tta = compute_energy_and_time(target_epoch, profile_power)
            if self.verbose:
                print(
                    f"[run job] {job} @ {batch_size},{power_limit}W{' prof' if profile_power else ''} "
                    f"=> \033[31mReached in {int(target_epoch)} epochs, "
                    f"TTA {tta:.2f} seconds, ETA {eta:.2f}\033[0m"
                )
            return eta, tta, True

        # Job failed to reach the target metric.
        energy_consumption, time_consumption = compute_energy_and_time(
            max_epochs, profile_power
        )
        if self.verbose:
            print(
                f"[run job] {job} @ {batch_size},{power_limit}W{' prof' if profile_power else ''} "
                f"=> \033[31mFailed (stopped after {int(max_epochs)} epochs), "
                f"TTA {time_consumption:.2f} seconds, ETA {energy_consumption:.2f}\033[0m"
            )
        return energy_consumption, time_consumption, False

    def _profile_power_limit(
        self, job: Job, batch_size: int, eta_knob: float
    ) -> dict[int, float]:
        """Simulate running the job and profiling the power limit.

        Returns:
            Dictionary mapping PL to `energy_per_epoch`. PL is inserted in high to low order.
        """
        # Filter by job spec and BS.
        df = job.filter_df(self.df)
        df = df.loc[(df.batch_size == batch_size)]

        # Compute the epoch cost of each power limit (Equation 7).
        max_pl = df.power_limit.max().item()
        df = df.groupby(["power_limit"], as_index=False).mean(numeric_only=True)
        df["epoch_cost"] = (
            eta_knob * df["average_power"] + (1 - eta_knob) * max_pl
        ) * df["time_per_epoch"]

        # We'll be profiling energy from larger to smaller power limits.
        df = df.sort_values(by="power_limit", ascending=False)
        result = {rec.power_limit: rec.epoch_cost for rec in df.to_records(index=False)}
        if self.verbose:
            print(f"[PL profile] {job} @ {batch_size} => PL = {min(result, key=result.get)}W")  # type: ignore
        return result

    def _profile_batch_size_range(self, job: Job) -> list[int]:
        """Simulate profiling the available batch size range of the job.

        Returns:
            A list of feasible batch sizes.
        """
        df = self.df
        # Do not filter by target_metric here since we do not want to constrain
        # the feasible batch size range to only those that reached the target metric.
        df = df.loc[
            (df.dataset == job.dataset)
            & (df.network == job.network)
            & (df.optimizer == job.optimizer)
        ]
        result = sorted(list(df.batch_size.unique()))
        if self.verbose:
            print(f"[BS profile] {job} => BS = {result}")
        return result

__init__

__init__(summary_train, summary_power, batch_size_optimizer, power_limit_optimizer, seed=123456, verbose=True)

Parameters:

Name Type Description Default
summary_train str | DataFrame

Path to or pd.DataFrame of the train trace.

required
summary_power str | DataFrame

Path to or pd.DataFrame of the power trace.

required
batch_size_optimizer BatchSizeOptimizer

The user is expected to construct the BSO with the desired policy and pass it into the simulator.

required
power_limit_optimizer PowerLimitOptimizer

The user is expected to construct the PLO with the desired policy and pass it into the simulator.

required
seed int

The random seed. Every invocation of any simulation method in this class is deterministic given the random seed, because the internal RNG is deepcopied before running the simulation.

123456
verbose bool

Whether to log out the internal states of the simulator.

True
Source code in zeus/simulate.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def __init__(
    self,
    summary_train: str | pd.DataFrame,
    summary_power: str | pd.DataFrame,
    batch_size_optimizer: BatchSizeOptimizer,
    power_limit_optimizer: PowerLimitOptimizer,
    seed: int = 123456,
    verbose: bool = True,
) -> None:
    """Initialize the simulator.

    Args:
        summary_train: Path to or `pd.DataFrame` of the train trace.
        summary_power: Path to or `pd.DataFrame` of the power trace.
        batch_size_optimizer: The user is expected to construct
            the BSO with the desired policy and pass it into the simulator.
        power_limit_optimizer: The user is expected to construct
            the PLO with the desired policy and pass it into the simulator.
        seed: The random seed. Every invocation of any simulation method in this
            class is deterministic given the random seed, because the internal RNG is
            deepcopied before running the simulation.
        verbose: Whether to log out the internal states of the simulator.
    """
    # Generate relevant data.
    train_df = (
        pd.read_csv(summary_train)
        if isinstance(summary_train, str)
        else summary_train
    )
    power_df = (
        pd.read_csv(summary_power)
        if isinstance(summary_power, str)
        else summary_power
    )
    df = train_df.merge(power_df, how="inner")  # type: ignore
    df["TTA"] = df.target_epoch * df.time_per_epoch
    df["ETA"] = df.TTA * df.average_power
    # 'energy_per_epoch' is used to compare different power limits with the same batch size
    # when trying to figure out which power limit is the best one.
    df["energy_per_epoch"] = df.time_per_epoch * df.average_power
    self.df = df

    # Knob optimizers.
    self.bso = batch_size_optimizer
    self.plo = power_limit_optimizer

    # Save arguments.
    self.seed = seed
    self.verbose = verbose

simulate_one_job

simulate_one_job(job, num_recurrence, beta_knob, eta_knob)

Simulate a sequentially recurring job. Explore with early stopping.

Parameters:

Name Type Description Default
job Job

Job spec to simulate.

required
num_recurrence int

How many times the job recurs.

required
beta_knob float

beta_knob * min_eta is the early stopping cost threshold. Set to np.inf to disable early stopping.

required
eta_knob float

\(\eta\) used in the hybrid cost metric. \(\textrm{cost} = \eta \cdot \textrm{ETA} + (1 - \eta) \cdot \textrm{MaxPower} \cdot \textrm{TTA}\)

required

Returns:

Type Description
list[HistoryEntry]

A list of HistoryEntry objects for each job run.

Source code in zeus/simulate.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def simulate_one_job(
    self,
    job: Job,
    num_recurrence: int,
    beta_knob: float,
    eta_knob: float,
) -> list[HistoryEntry]:
    r"""Simulate a sequentially recurring job. Explore with early stopping.

    Args:
        job: Job spec to simulate.
        num_recurrence: How many times the job recurs.
        beta_knob: `beta_knob * min_eta` is the early stopping cost threshold.
            Set to `np.inf` to disable early stopping.
        eta_knob: $\eta$ used in the hybrid cost metric.
            $\textrm{cost} = \eta \cdot \textrm{ETA} + (1 - \eta) \cdot \textrm{MaxPower} \cdot \textrm{TTA}$

    Returns:
        A list of [`HistoryEntry`][zeus.analyze.HistoryEntry] objects for each job run.
    """
    # Copy all internal state so that simulation does not modify any
    # internal state and is deterministic w.r.t. the random seed.
    bso = deepcopy(self.bso)
    plo = deepcopy(self.plo)
    rng = np.random.default_rng(self.seed)

    # Figure out MAXPOWER.
    max_power = self.df.power_limit.max().item()
    if self.verbose:
        print(f"[Simulator] Max power = {max_power}W")

    # Track the minimum cost observed for the early stopping energy threshold.
    min_cost = np.inf

    # Simulate each job one at a time.
    history: list[HistoryEntry] = []

    if self.verbose:
        print(f"[Simulator] {job} recurring {num_recurrence} times.")

    # A new job. Profile the feasible batch size range.
    batch_sizes = self._profile_batch_size_range(job)

    # Register the job in the BSO.
    bso.register_job(job, batch_sizes)

    # Job recurs.
    for i in range(num_recurrence):
        if self.verbose:
            print(f"\nRecurrence: {i+1}")

        # Run the job until convergence. Upper bound the number of retries to 20.
        # Accumulate the cost of retries before convergence.
        cost_acc = 0.0
        for tries in range(1, 21):
            # Whether this run of the job needed to profile power.
            profiled_power = False

            # Fetch knobs to use.
            bs = bso.predict(job)
            pl = plo.predict(job, bs)

            # When the batch size is first explored, we need to profile power limit.
            if pl is None:
                profiled_power = True
                result = self._profile_power_limit(job, bs, eta_knob)
                for pl, epe in result.items():
                    plo.observe(job, bs, pl, epe)
                pl = plo.predict(job, bs)
                assert pl

            # Run the job, potentially early stopping it.
            eta, tta, reached = self._run_job(
                job=job,
                batch_size=bs,
                power_limit=pl,
                rng=rng,
                cost_ub=beta_knob * min_cost,
                eta_knob=eta_knob,
                profile_power=profiled_power,
            )

            # The job never ran because even one epoch exceeds the cost threshold.
            # Let the BSO observe that this batch size is bad, but since the job
            # did not run, do not add to the history and retry.
            if eta == 0 and tta == 0 and not reached:
                bso.observe(job, bs, 100 * beta_knob * min_cost, False)
                continue

            # Compute the hybrid cost metric.
            cost = zeus_cost(eta, tta, eta_knob, max_power)
            cost_acc += cost

            # Provide feedback to the BSO.
            bso.observe(job, bs, cost, reached)

            # Record history for analysis.
            history.append(HistoryEntry(bs, pl, eta, reached, tta))

            # Reached the target metric. Update min_cost and go on to the next recurrence.
            if reached:
                if self.verbose:
                    print()
                    print(
                        f"[Simulator] Reached target metric in {tries} {'try' if tries == 1 else 'tries'}."
                    )
                if min_cost > cost_acc:
                    if self.verbose:
                        print(
                            f"[Simulator] Minimum cost updated from {min_cost:.2f} to {cost_acc:.2f}."
                        )
                    min_cost = cost_acc
                break
            # Didn't reach the target metric.
            # We assume that the default BS (set by the user) will always converge.
            # That is, reaching the target metric with the model should be a feasible task.
            if i == 0:
                raise RuntimeError(
                    f"The default batch size {job.default_bs} did not converge."
                )

        # Target metric was not reached in 20 tries. We consider this target metric to be unreachable.
        else:
            raise RuntimeError("Job did not reach the target metric in 20 tries.")

    if self.verbose:
        print()
        print(
            f"[Simulator] {job} (BS, PL, ETA, whether_reached, TTA) history: \n{history}"
        )

    return history

simulate_one_alibaba_group

simulate_one_alibaba_group(job, group_df, beta_knob, eta_knob)

Run simulation on one group in the Alibaba trace.

Concurrent job submissions (jobs that start before the previous job finishes) are launched with the batch size known to be of minimum cost at that time. The BSO also observes the results of these jobs when they are done, and these jobs may well finish before a job that started before it. See observe in PruningGTSBatchSizeOptimizer for an example of handing such a scenario.

Parameters:

Name Type Description Default
job Job

Job spec of this group.

required
group_df DataFrame

DataFrame of this group. Fields: - group: Group ID in trace. Identical across all rows. - dataset: Dataset name. Identical across all rows. - start_time: Job start time in the trace. - end_time: Job end time in the trace. - runtime_ratio: runtime of this job over the mean runtime of all the jobs of this dataset. Captures intra-dataset job runtime differences.

required
beta_knob float

beta_knob * min_eta is the early stopping cost threshold. Set to np.inf to disable early stopping.

required
eta_knob float

\(\eta\) used in the hybrid cost metric. \(\textrm{cost} = \eta \cdot \textrm{ETA} + (1 - \eta) \cdot \textrm{MaxPower} \cdot \textrm{TTA}\)

required

Returns:

Type Description
list[HistoryEntry]

A list of HistoryEntry objects for each job run.

Source code in zeus/simulate.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
def simulate_one_alibaba_group(
    self,
    job: Job,
    group_df: pd.DataFrame,
    beta_knob: float,
    eta_knob: float,
) -> list[HistoryEntry]:
    r"""Run simulation on one group in the Alibaba trace.

    Concurrent job submissions (jobs that start before the previous job
    finishes) are launched with the batch size known to be of minimum
    cost at that time. The BSO also observes the results of these jobs
    when they are done, and these jobs may well finish before a job that
    started before it. See `observe` in PruningGTSBatchSizeOptimizer for
    an example of handing such a scenario.

    Args:
        job: Job spec of this group.
        group_df: DataFrame of this group. Fields:
            - group: Group ID in trace. Identical across all rows.
            - dataset: Dataset name. Identical across all rows.
            - start_time: Job start time in the trace.
            - end_time: Job end time in the trace.
            - runtime_ratio: runtime of this job over the mean runtime
                of all the jobs of this dataset. Captures intra-dataset
                job runtime differences.
        beta_knob: `beta_knob * min_eta` is the early stopping cost threshold.
            Set to `np.inf` to disable early stopping.
        eta_knob: $\eta$ used in the hybrid cost metric.
            $\textrm{cost} = \eta \cdot \textrm{ETA} + (1 - \eta) \cdot \textrm{MaxPower} \cdot \textrm{TTA}$

    Returns:
        A list of [`HistoryEntry`][zeus.analyze.HistoryEntry] objects for each job run.
    """
    # Copy all internal state so that simulation does not modify any
    # internal state and is deterministic w.r.t. the random seed.
    bso = deepcopy(self.bso)
    plo = deepcopy(self.plo)
    rng = np.random.default_rng(self.seed)

    # Sanity check
    if job.default_bs is None:
        raise ValueError("You must give the job a default batch size.")

    # Figure out MAXPOWER.
    max_power = self.df.power_limit.max().item()
    if self.verbose:
        print(f"[Simulator] Max power = {max_power}W")

    # Track the minimum cost observed for the early stopping energy threshold.
    # Also track the corresponding minimum cost BS to handle concurrent jobs.
    min_cost = np.inf
    best_bs = job.default_bs

    # Simulate each job one at a time.
    history: list[HistoryEntry] = []

    if self.verbose:
        print(f"[Simulator] {job} recurring {len(group_df)} times.")

    # A new job. Profile the feasible batch size range.
    batch_sizes = self._profile_batch_size_range(job)

    # Register the job in the BSO.
    bso.register_job(job, batch_sizes)

    # List of jobs in flight.
    @dataclass
    class RunningJob:
        """Represents a job that is running.

        We know what's going to happen to this job when we launch it.
        Thus, pre-compute all results (using self.run_job) and have this
        instance hold the information. Then, jobs will be removed from the
        list of running jobs when the current time passes self.end_time and
        the result will be observed.
        """

        start_time: float
        end_time: float
        runtime_ratio: float
        batch_size: int
        power_limit: int
        reached: bool
        time: float
        energy: float
        cost: float

    running_jobs: list[RunningJob] = []

    # Jobs in group_df are already sorted by start_time.
    current_time = 0.0
    for rec_i, (_, job_row) in enumerate(group_df.iterrows()):
        if self.verbose:
            print(f"\nRecurrence: {rec_i+1}")

        # Update the current time.
        current_time = job_row.start_time
        if self.verbose:
            print(f"[Simulator] Current time is {current_time:.1f}")

        # Scan the in-flight job list to reap jobs that have finished.
        # We need a while loop here because we might have submitted a retry job
        # while reaping jobs that failed to reach the target metric, and that retry job
        # may finish before the current job.
        while any(map(lambda j: j.end_time <= current_time, running_jobs)):
            if self.verbose:
                print(f"[Simulator] Running jobs: {running_jobs}")

            # We copy running_jobs because we want to mutate it as we traverse it.
            running_jobs_copy = deepcopy(running_jobs)

            # Sort the jobs in the order they end.
            for rjob in sorted(
                running_jobs_copy, key=operator.attrgetter("end_time")
            ):
                # We're only interested in jobs that finished at this point in time.
                if rjob.end_time > current_time:
                    continue

                # Job is finished at this point in time.
                if self.verbose:
                    print(
                        f"[Simulator] Job(BS={rjob.batch_size},time={rjob.time},"
                        f"energy={rjob.energy},reached={rjob.reached}) finished"
                    )

                # Remove the job from the in-flight job list.
                running_jobs.remove(rjob)

                # Let the BSO observe the cost for this batch size.
                bso.observe(job, rjob.batch_size, rjob.cost, rjob.reached)

                # If the job never ran because even one epoch exceeds the cost threshold,
                # do not add to the history and retry.
                if rjob.energy != 0 or rjob.time != 0 or rjob.reached:
                    # Record history for analysis.
                    history.append(
                        HistoryEntry(
                            rjob.batch_size,
                            rjob.power_limit,
                            rjob.energy
                            * rjob.runtime_ratio,  # Scale the energy of this job by the runtime ratio.
                            rjob.reached,
                            rjob.time
                            * rjob.runtime_ratio,  # Scale the runtime of this job by the runtime ratio.
                        )
                    )

                # Reached target metric (no need to retry)
                if rjob.reached:
                    if min_cost > rjob.cost:
                        if self.verbose:
                            print(
                                f"[Simulator] Minimum cost updated from {min_cost:.2f} to {rjob.cost:.2f}"
                            )
                        min_cost = rjob.cost
                        best_bs = rjob.batch_size

                # Didn't reach target metric. Need to run a retry job.
                else:
                    profiled_power = False

                    # If there are jobs in-flight, we just run additional concurrent
                    # submissions with the best known knobs.
                    if running_jobs:
                        if self.verbose:
                            print(
                                f"[Simulator] There are in-flight jobs. Use BS {best_bs}."
                            )
                        bs = best_bs
                        pl = plo.predict(job, bs)
                        assert pl, f"Power not profiled for best known BS {bs}"

                    # There are no jobs in-flight. Just submit the job normally.
                    else:
                        # Determine the knobs.
                        bs = bso.predict(job)
                        pl = plo.predict(job, bs)

                        if self.verbose:
                            print(
                                f"[Simulator] There are no in-flight jobs. Use BSO's prediction {bs}."
                            )

                        # When the batch size is first explored, we need to profile power limit.
                        if pl is None:
                            profiled_power = True
                            result = self._profile_power_limit(job, bs, eta_knob)
                            for pl, epe in result.items():
                                plo.observe(job, bs, pl, epe)
                            pl = plo.predict(job, bs)
                            assert pl

                    # Pre-compute the result of the job.
                    eta, tta, reached = self._run_job(
                        job=job,
                        batch_size=bs,
                        power_limit=pl,
                        rng=rng,
                        cost_ub=beta_knob * min_cost,
                        eta_knob=eta_knob,
                        profile_power=profiled_power,
                    )

                    # Compute the hybrid cost metric.
                    cost = zeus_cost(eta, tta, eta_knob, max_power)

                    # Create the RunningJob instance.
                    running_job = RunningJob(
                        start_time=rjob.end_time,
                        end_time=rjob.end_time
                        + (rjob.end_time - rjob.start_time),  # Assume same runtime.
                        runtime_ratio=rjob.runtime_ratio,
                        batch_size=bs,
                        power_limit=pl,
                        reached=reached,
                        time=tta,
                        energy=eta,
                        cost=cost,
                    )
                    running_jobs.append(running_job)

                    if self.verbose:
                        print(f"[Simulator] Started retry job {running_job}")

                    # We must break from the loop that scans the running_jobs list, because
                    # the job we just submitted might actually be the next job that finishes.
                    break

        # We're (finally) done reaping all finished jobs. Run the current job.
        profiled_power = False

        # If there are jobs in-flight, we just run additional concurrent
        # submissions with the best known knobs.
        if running_jobs:
            if self.verbose:
                print(f"[Simulator] There are in-flight jobs. Use BS {best_bs}.")
            bs = best_bs
            pl = plo.predict(job, bs)
            assert pl is not None, f"Power not profiled for best known BS {bs}"

        # There are no jobs in-flight. Just submit the job.
        else:
            # Determine the knobs.
            bs = bso.predict(job)
            pl = plo.predict(job, bs)

            if self.verbose:
                print(
                    f"[Simulator] There are no in-flight jobs. Use BSO's prediction {bs}."
                )

            # When the batch size is first explored, we need to profile power limit.
            if pl is None:
                profiled_power = True
                result = self._profile_power_limit(job, bs, eta_knob)
                for pl, epe in result.items():
                    plo.observe(job, bs, pl, epe)
                pl = plo.predict(job, bs)
                assert pl

        # Run the job, potentially early stopping it.
        eta, tta, reached = self._run_job(
            job=job,
            batch_size=bs,
            power_limit=pl,
            rng=rng,
            cost_ub=beta_knob * min_cost,
            eta_knob=eta_knob,
            profile_power=profiled_power,
        )

        # Compute the hybrid cost metric.
        cost = zeus_cost(eta, tta, eta_knob, max_power)

        # Create the RunningJob instance and enqueue.
        running_job = RunningJob(
            start_time=job_row.start_time,
            end_time=job_row.end_time,
            runtime_ratio=job_row.runtime_ratio,
            batch_size=bs,
            power_limit=pl,
            reached=reached,
            time=tta,
            energy=eta,
            cost=cost,
        )
        running_jobs.append(running_job)

        if self.verbose:
            print(f"[Simulator] Started job {running_job}")

    # Now, we're done iterating the rows of group_df.
    # Set the current time to infinity so that all running jobs finish.
    current_time = np.inf
    if self.verbose:
        print("\n[Simulator] Reap all jobs")

    # Scan the remaining in-flight job list to reap jobs that have finished.
    # Since current_time is infinity, this while loop will reap all the jobs ever launched.
    while any(map(lambda j: j.end_time <= current_time, running_jobs)):
        if self.verbose:
            print(f"[Simulator] Running jobs: {running_jobs}")

        # We copy running_jobs because we want to mutate it as we traverse it.
        running_jobs_copy = deepcopy(running_jobs)

        # Sort the jobs in the order they end.
        for rjob in sorted(running_jobs_copy, key=operator.attrgetter("end_time")):
            # We're only interested in jobs that finished at this point in time.
            if rjob.end_time > current_time:
                continue

            # Job is finished at this point in time.
            if self.verbose:
                print(
                    f"[Simulator] Job(BS={rjob.batch_size},time={rjob.time},"
                    f"energy={rjob.energy},reached={rjob.reached}) finished"
                )

            # Remove the job from the in-flight job list.
            running_jobs.remove(rjob)

            # Let the BSO observe the cost for this batch size.
            bso.observe(job, rjob.batch_size, rjob.cost, rjob.reached)

            # If the job never ran because even one epoch exceeds the cost threshold,
            # do not add to the history and retry.
            if rjob.energy != 0 or rjob.time != 0 or rjob.reached:
                # Record history for analysis.
                history.append(
                    HistoryEntry(
                        rjob.batch_size,
                        rjob.power_limit,
                        rjob.energy
                        * rjob.runtime_ratio,  # Scale the energy of this job by the runtime ratio.
                        rjob.reached,
                        rjob.time
                        * rjob.runtime_ratio,  # Scale the runtime of this job by the runtime ratio.
                    )
                )

            # Reached target metric (no need to retry)
            if rjob.reached:
                if min_cost > rjob.cost:
                    if self.verbose:
                        print(
                            f"[Simulator] Minimum cost updated from {min_cost:.2f} to {rjob.cost:.2f}"
                        )
                    min_cost = rjob.cost
                    best_bs = rjob.batch_size

            # Didin't reach target metric. Need to run a retry job.
            else:
                profiled_power = False

                # If there are jobs in-flight, we just run additional concurrent
                # submissions with the best known knobs.
                if running_jobs:
                    if self.verbose:
                        print(
                            f"[Simulator] There are in-flight jobs. Use BS {best_bs}."
                        )
                    bs = best_bs
                    pl = plo.predict(job, bs)
                    assert pl, f"Power not profiled for best known BS {bs}"

                # There are no jobs in-flight. Just submit the job normally.
                else:
                    # Determine the knobs.
                    bs = bso.predict(job)
                    pl = plo.predict(job, bs)

                    if self.verbose:
                        print(
                            f"[Simulator] There are no in-flight jobs. Use BSO's prediction {bs}."
                        )

                    # When the batch size is first explored, we need to profile power limit.
                    if pl is None:
                        profiled_power = True
                        result = self._profile_power_limit(job, bs, eta_knob)
                        for pl, epe in result.items():
                            plo.observe(job, bs, pl, epe)
                        pl = plo.predict(job, bs)
                        assert pl

                # Pre-compute the result of the job.
                eta, tta, reached = self._run_job(
                    job=job,
                    batch_size=bs,
                    power_limit=pl,
                    rng=rng,
                    cost_ub=beta_knob * min_cost,
                    eta_knob=eta_knob,
                    profile_power=profiled_power,
                )

                # Compute the hybrid cost metric.
                cost = zeus_cost(eta, tta, eta_knob, max_power)

                # Create the RunningJob instance.
                running_job = RunningJob(
                    start_time=rjob.end_time,
                    end_time=rjob.end_time
                    + (rjob.end_time - rjob.start_time),  # Assume same runtime.
                    runtime_ratio=rjob.runtime_ratio,
                    batch_size=bs,
                    power_limit=pl,
                    reached=reached,
                    time=tta,
                    energy=eta,
                    cost=cost,
                )
                running_jobs.append(running_job)

                if self.verbose:
                    print(f"[Simulator] Started retry job {running_job}")

                # We must break from the loop that scans the running_jobs list, because
                # the job we just submitted might actually be the next job that finishes.
                break

    return history

_run_job

_run_job(job, batch_size, power_limit, rng, cost_ub, eta_knob, profile_power)

Simulate running the job and return the energy consumed and whether it converged.

This method will randomly choose one of the possible training "paths". Then, based on cost_ub, it will compute the maximum number of epochs the job can run. If the path's target_epoch is smaller than or equal to the maximum number of epochs, the cost incurred until target_epoch is returned. Otherwise, the cost incurred until the maximum number of epochs is returned.

It is important to note that the job may never run when the first epoch's cost is already expected to exceed the cost upper bound. In such a case, the returned time and energy consumptions will be zero. This case must be treated separately in the calling code.

If profile_power is True, the first epoch will JIT-profile power limits coarsely by dividing the epoch evenly into len(available_power_limits) pieces. Thus the the first epoch's energy and time consumption will be slightly adjusted.

Parameters:

Name Type Description Default
job Job

Job spec to run.

required
batch_size int

Batch size to use.

required
power_limit int

Power limit to use. Regardless of whether this run of this batch size requires power profiling, the simulator will input the optimal power limit for the batch size. The first epoch energy consumption from power profiling is adjusted in the latter half of this method based on the profile_power flag.

required
rng Generator

Random number generator used to sample one training path.

required
cost_ub float

Cost upper bound. The job is terminated when the next epoch is going to exceed the cost upper bound.

required
eta_knob float

\(\eta\) used in the hybrid cost metric. \(\textrm{cost} = \eta \cdot \textrm{ETA} + (1 - \eta) \cdot \textrm{MaxPower} \cdot \textrm{TTA}\)

required
profile_power bool

Whether this run of the job should profile power during the first epoch.

required

Returns:

Type Description
tuple[float, float, bool]

Tuple of energy consumption, time consumption, and whether the job reached the target metric.

Source code in zeus/simulate.py
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
def _run_job(
    self,
    job: Job,
    batch_size: int,
    power_limit: int,
    rng: np.random.Generator,
    cost_ub: float,
    eta_knob: float,
    profile_power: bool,
) -> tuple[float, float, bool]:
    r"""Simulate running the job and return the energy consumed and whether it converged.

    This method will randomly choose one of the possible training "paths". Then,
    based on cost_ub, it will compute the maximum number of epochs the job can run.
    If the path's target_epoch is smaller than or equal to the maximum number of
    epochs, the cost incurred until target_epoch is returned. Otherwise, the cost
    incurred until the maximum number of epochs is returned.

    It is important to note that the job may never run when the first epoch's cost
    is already expected to exceed the cost upper bound. In such a case, the returned
    time and energy consumptions will be zero. This case must be treated separately
    in the calling code.

    If profile_power is True, the first epoch will JIT-profile power limits coarsely
    by dividing the epoch evenly into len(available_power_limits) pieces. Thus the
    the first epoch's energy and time consumption will be slightly adjusted.

    Args:
        job: Job spec to run.
        batch_size: Batch size to use.
        power_limit: Power limit to use. Regardless of whether this run of this
            batch size requires power profiling, the simulator will input the optimal
            power limit for the batch size. The first epoch energy consumption from
            power profiling is adjusted in the latter half of this method based on the
            profile_power flag.
        rng: Random number generator used to sample one training path.
        cost_ub: Cost upper bound. The job is terminated when the next epoch is going
            to exceed the cost upper bound.
        eta_knob: $\eta$ used in the hybrid cost metric.
            $\textrm{cost} = \eta \cdot \textrm{ETA} + (1 - \eta) \cdot \textrm{MaxPower} \cdot \textrm{TTA}$
        profile_power: Whether this run of the job should profile power during the
            first epoch.

    Returns:
        Tuple of energy consumption, time consumption, and whether the job reached the target metric.
    """
    # df is filtered with job spec, BS, and PL. We sample one possible training path.
    # power_df is filtered with job spec and BS. We use this to compute the energy
    # consumption of profiling power during the first epoch.
    df = job.filter_df(self.df)
    power_df = df.loc[df.batch_size == batch_size]
    df = power_df.loc[df.power_limit == power_limit]
    path = df.sample(n=1, random_state=rng)

    # Max number of epochs is bound by either the cost upper bound or the user-specified
    # max_epochs, whichever is smaller.
    if cost_ub == np.inf:
        # cost_ub is infinity in two cases:
        # 1. The simulator has never observed any cost value in the early part of simulation.
        # 2. We're simulating with no early stopping, i.e. beta_knob is infinity.
        max_epochs = job.max_epochs
        if self.verbose:
            print(f"[run job] Cost UB is inf. {max_epochs=}")
    else:
        # Stop right before the first epoch when cost will cross the upper bound.
        cost_per_epoch = (
            eta_knob * path.energy_per_epoch.item()
            + (1 - eta_knob)
            * power_df.power_limit.max().item()
            * path.time_per_epoch.item()
        )
        max_epochs = min(cost_ub // cost_per_epoch, job.max_epochs)
        if self.verbose:
            print(f"[run job] {cost_ub=}")
            print(f"[run job] {cost_per_epoch=}")
            print(f"[run job] {max_epochs=}")

    # The job virtually never ran. Time and Energy being zero will be treated specially outside.
    # If the min_cost is so low, this might even prevent this BS from running at all.
    if max_epochs == 0:
        print(
            f"[run job] {job} cannot run even one epoch without exceeding the cost UB."
            f" BS {batch_size}, PL {power_limit}, {eta_knob=}"
        )
        return 0.0, 0.0, False

    def compute_energy_and_time(
        num_epochs: int, profile_power: bool
    ) -> tuple[float, float]:
        """Compute the energy and time consumed for running the job for num_epochs."""
        # This is the first run of this batch size, and we need to profile power
        # during the first epoch.
        if profile_power:
            # Note that power_df holds rows with all power limits. Evenly splitting the
            # epochs with the number of samples and running each slice with each power
            # limit consumes (1/N) * e_100 + (1/N) * e_125 + ... + (1/N) * e_250.
            # Also there are all runs 1, 2, ... included, but power info is actually
            # completely duplicated across different runs in the DataFrame.
            # Thus, taking the mean across the entire power_df gets us what we want.
            energy_first_epoch = power_df.energy_per_epoch.mean().item()
            energy_from_second_epoch = path.energy_per_epoch.item() * (
                num_epochs - 1
            )
            energy_consumption = energy_first_epoch + energy_from_second_epoch
            time_first_epoch = power_df.time_per_epoch.mean().item()
            time_from_second_epoch = path.time_per_epoch.item() * (num_epochs - 1)
            time_consumption = time_first_epoch + time_from_second_epoch
        # Just run num_epochs with the given power limit. Simple.
        else:
            energy_consumption = path.energy_per_epoch.item() * num_epochs
            time_consumption = path.time_per_epoch.item() * num_epochs
        return energy_consumption, time_consumption

    # Job reached target metric.
    target_epoch = path.target_epoch.item()
    if path.target_epoch.notnull().item() and target_epoch <= max_epochs:
        eta, tta = compute_energy_and_time(target_epoch, profile_power)
        if self.verbose:
            print(
                f"[run job] {job} @ {batch_size},{power_limit}W{' prof' if profile_power else ''} "
                f"=> \033[31mReached in {int(target_epoch)} epochs, "
                f"TTA {tta:.2f} seconds, ETA {eta:.2f}\033[0m"
            )
        return eta, tta, True

    # Job failed to reach the target metric.
    energy_consumption, time_consumption = compute_energy_and_time(
        max_epochs, profile_power
    )
    if self.verbose:
        print(
            f"[run job] {job} @ {batch_size},{power_limit}W{' prof' if profile_power else ''} "
            f"=> \033[31mFailed (stopped after {int(max_epochs)} epochs), "
            f"TTA {time_consumption:.2f} seconds, ETA {energy_consumption:.2f}\033[0m"
        )
    return energy_consumption, time_consumption, False

_profile_power_limit

_profile_power_limit(job, batch_size, eta_knob)

Simulate running the job and profiling the power limit.

Returns:

Type Description
dict[int, float]

Dictionary mapping PL to energy_per_epoch. PL is inserted in high to low order.

Source code in zeus/simulate.py
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
def _profile_power_limit(
    self, job: Job, batch_size: int, eta_knob: float
) -> dict[int, float]:
    """Simulate running the job and profiling the power limit.

    Returns:
        Dictionary mapping PL to `energy_per_epoch`. PL is inserted in high to low order.
    """
    # Filter by job spec and BS.
    df = job.filter_df(self.df)
    df = df.loc[(df.batch_size == batch_size)]

    # Compute the epoch cost of each power limit (Equation 7).
    max_pl = df.power_limit.max().item()
    df = df.groupby(["power_limit"], as_index=False).mean(numeric_only=True)
    df["epoch_cost"] = (
        eta_knob * df["average_power"] + (1 - eta_knob) * max_pl
    ) * df["time_per_epoch"]

    # We'll be profiling energy from larger to smaller power limits.
    df = df.sort_values(by="power_limit", ascending=False)
    result = {rec.power_limit: rec.epoch_cost for rec in df.to_records(index=False)}
    if self.verbose:
        print(f"[PL profile] {job} @ {batch_size} => PL = {min(result, key=result.get)}W")  # type: ignore
    return result

_profile_batch_size_range

_profile_batch_size_range(job)

Simulate profiling the available batch size range of the job.

Returns:

Type Description
list[int]

A list of feasible batch sizes.

Source code in zeus/simulate.py
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
def _profile_batch_size_range(self, job: Job) -> list[int]:
    """Simulate profiling the available batch size range of the job.

    Returns:
        A list of feasible batch sizes.
    """
    df = self.df
    # Do not filter by target_metric here since we do not want to constrain
    # the feasible batch size range to only those that reached the target metric.
    df = df.loc[
        (df.dataset == job.dataset)
        & (df.network == job.network)
        & (df.optimizer == job.optimizer)
    ]
    result = sorted(list(df.batch_size.unique()))
    if self.verbose:
        print(f"[BS profile] {job} => BS = {result}")
    return result