zeus.policy.optimizer

Implementations of various optimization policies.

JITPowerLimitOptimizer and PruningGTSBatchSizeOptimizer are the implementations used in Zeus's publication.

GTSBatchSizeOptimizer

Bases: BatchSizeOptimizer

One Gaussian Thompson Sampling MAB for each job.
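
To make the interface concrete, here is a minimal usage sketch (not part of the library): it assumes `job` is a Job instance constructed elsewhere and `run_training` is a hypothetical callable that runs one recurrence of the job with the given batch size and returns its energy-time cost as a float.

from zeus.policy.optimizer import GTSBatchSizeOptimizer

def tune_batch_size(job, batch_sizes, run_training, num_recurrences=10):
    # One Gaussian Thompson Sampling bandit is kept per job, so the job only
    # needs to be registered once with its candidate batch sizes.
    bso = GTSBatchSizeOptimizer(learn_reward_precision=True, verbose=False)
    bso.register_job(job, batch_sizes)

    for _ in range(num_recurrences):
        bs = bso.predict(job)       # Thompson-sample a batch size for this recurrence.
        cost = run_training(bs)     # Assumed to return the energy-time cost (float).
        bso.observe(job, bs, cost)  # The optimizer learns from reward = -cost.
    return bso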

Source code in zeus/policy/optimizer.py
class GTSBatchSizeOptimizer(BatchSizeOptimizer):
    """One Gaussian Thompson Sampling MAB for each job."""

    # ruff: noqa: D417
    def __init__(
        self,
        learn_reward_precision: bool,
        reward_precision: float = 0.0,
        prior_mean: float = 0.0,
        prior_precision: float = 0.0,
        num_exploration: int = 1,
        seed: int = 123456,
        verbose: bool = True,
    ) -> None:
        """Initialze the optimizer.

        Refer to the constructor of [`GaussianTS`][zeus.policy.mab.GaussianTS]
        for descriptions of other arguments.

        Args:
            learn_reward_precision: Whether to learn the reward precision of
                each arm as we accumulate observations.
        """
        self.learn_reward_precision = learn_reward_precision
        self.reward_precision = reward_precision
        self.prior_mean = prior_mean
        self.prior_precision = prior_precision
        self.num_exploration = num_exploration
        self.seed = seed
        self.verbose = verbose

        # One MAB for each job.
        self.mabs: dict[Job, GaussianTS] = {}

        # Track the batch size range for each job.
        self.batch_sizes: dict[Job, list[int]] = {}

        # Observation history (batch size, reward) for each job.
        self.history: dict[Job, defaultdict[int, list[float]]] = {}

    @property
    def name(self) -> str:
        """Name of the batch size optimizer."""
        return "GaussianTS BSO"

    def register_job(self, job: Job, batch_sizes: list[int]) -> None:
        """Instantiate a new GaussianTS MAB for the new job."""
        # We do not want to reset the state related to this job if
        # anything already exists.
        if job in self.mabs:
            return
        self.mabs[job] = GaussianTS(
            arms=batch_sizes,
            reward_precision=self.reward_precision,
            prior_mean=self.prior_mean,
            prior_precision=self.prior_precision,
            num_exploration=self.num_exploration,
            seed=self.seed,
            verbose=self.verbose,
        )
        self.batch_sizes[job] = batch_sizes
        self.history[job] = defaultdict(list)
        if self.verbose:
            self._log(f"Registered {job}")

    def predict(self, job: Job) -> int:
        """Return the batch size to use for the job."""
        if self.verbose:
            self._log(f"Prediction for {job}")
        pred = self.mabs[job].predict()
        if self.verbose:
            self._log(f"{job} -> \033[31mBS = {pred}\033[0m")
        return pred

    def observe(
        self, job: Job, batch_size: int, cost: float, converged: bool | None = None
    ) -> None:
        """Learn from the cost of using the given batch size for the job."""
        if batch_size not in self.batch_sizes[job]:
            raise ValueError(f"Unknown batch size '{batch_size}' for {job}.")

        # No normalization needed since we learn a separate bandit for each job.
        reward = -cost

        # Add observation to history.
        self.history[job][batch_size].append(reward)

        # When we're not learning the reward precision, everything is
        # simple. We can just call `fit` with `reset=False` on the job's MAB instance.
        if not self.learn_reward_precision:
            self.mabs[job].fit([batch_size], [reward], reset=False)
            if self.verbose:
                self._log(f"{job} @ {batch_size}: reward = {reward:.2f}")

        # When we're learning the reward precision, we need to
        # 1. re-compute the precision of this arm based on the history,
        # 2. update the arm's reward precision
        # 3. and `fit` the new MAB instance on all past data.
        else:
            arm_rewards = np.array(self.history[job][batch_size])
            variance = np.var(arm_rewards)
            # When there is only one observation for the arm, the variance is zero.
            # NOTE: We might still want to have a pre-determined reward precision here
            #       because sampling from an infinite precision Gaussian distribution
            #       always returns the mean (the observation), and it will hamper
            #       exploration in the early stage.
            precision = np.inf if variance == 0.0 else np.reciprocal(variance)
            mab = self.mabs[job]
            mab.arm_reward_prec[batch_size] = precision
            mab.fit_arm(batch_size, arm_rewards, reset=True)
            self.mabs[job] = mab
            if self.verbose:
                arm_rewards_repr = ", ".join([f"{r:.2f}" for r in arm_rewards])
                self._log(
                    f"{job} @ {batch_size}: "
                    f"arm_rewards = [{arm_rewards_repr}], reward_prec = {precision}"
                )

name property

name

Name of the batch size optimizer.

__init__

__init__(
    learn_reward_precision,
    reward_precision=0.0,
    prior_mean=0.0,
    prior_precision=0.0,
    num_exploration=1,
    seed=123456,
    verbose=True,
)

Refer to the constructor of GaussianTS for descriptions of other arguments.

Parameters:

    learn_reward_precision (bool, required):
        Whether to learn the reward precision of each arm as we accumulate observations.

Source code in zeus/policy/optimizer.py
def __init__(
    self,
    learn_reward_precision: bool,
    reward_precision: float = 0.0,
    prior_mean: float = 0.0,
    prior_precision: float = 0.0,
    num_exploration: int = 1,
    seed: int = 123456,
    verbose: bool = True,
) -> None:
    """Initialze the optimizer.

    Refer to the constructor of [`GaussianTS`][zeus.policy.mab.GaussianTS]
    for descriptions of other arguments.

    Args:
        learn_reward_precision: Whether to learn the reward precision of
            each arm as we accumulate observations.
    """
    self.learn_reward_precision = learn_reward_precision
    self.reward_precision = reward_precision
    self.prior_mean = prior_mean
    self.prior_precision = prior_precision
    self.num_exploration = num_exploration
    self.seed = seed
    self.verbose = verbose

    # One MAB for each job.
    self.mabs: dict[Job, GaussianTS] = {}

    # Track the batch size range for each job.
    self.batch_sizes: dict[Job, list[int]] = {}

    # Observation history (batch size, reward) for each job.
    self.history: dict[Job, defaultdict[int, list[float]]] = {}

register_job

register_job(job, batch_sizes)

Instantiate a new GaussianTS MAB for the new job.

Source code in zeus/policy/optimizer.py
def register_job(self, job: Job, batch_sizes: list[int]) -> None:
    """Instantiate a new GaussianTS MAB for the new job."""
    # We do not want to reset the state related to this job if
    # anything already exists.
    if job in self.mabs:
        return
    self.mabs[job] = GaussianTS(
        arms=batch_sizes,
        reward_precision=self.reward_precision,
        prior_mean=self.prior_mean,
        prior_precision=self.prior_precision,
        num_exploration=self.num_exploration,
        seed=self.seed,
        verbose=self.verbose,
    )
    self.batch_sizes[job] = batch_sizes
    self.history[job] = defaultdict(list)
    if self.verbose:
        self._log(f"Registered {job}")

predict

predict(job)

Return the batch size to use for the job.

Source code in zeus/policy/optimizer.py
def predict(self, job: Job) -> int:
    """Return the batch size to use for the job."""
    if self.verbose:
        self._log(f"Prediction for {job}")
    pred = self.mabs[job].predict()
    if self.verbose:
        self._log(f"{job} -> \033[31mBS = {pred}\033[0m")
    return pred

observe

observe(job, batch_size, cost, converged=None)

Learn from the cost of using the given batch size for the job.

Source code in zeus/policy/optimizer.py
def observe(
    self, job: Job, batch_size: int, cost: float, converged: bool | None = None
) -> None:
    """Learn from the cost of using the given batch size for the job."""
    if batch_size not in self.batch_sizes[job]:
        raise ValueError(f"Unknown batch size '{batch_size}' for {job}.")

    # No normalization needed since we learn a separate bandit for each job.
    reward = -cost

    # Add observation to history.
    self.history[job][batch_size].append(reward)

    # When we're not learning the reward precision, everything is
    # simple. We can just call `fit` with `reset=False` on the job's MAB instance.
    if not self.learn_reward_precision:
        self.mabs[job].fit([batch_size], [reward], reset=False)
        if self.verbose:
            self._log(f"{job} @ {batch_size}: reward = {reward:.2f}")

    # When we're learning the reward precision, we need to
    # 1. re-compute the precision of this arm based on the history,
    # 2. update the arm's reward precision
    # 3. and `fit` the new MAB instance on all past data.
    else:
        arm_rewards = np.array(self.history[job][batch_size])
        variance = np.var(arm_rewards)
        # When there is only one observation for the arm, the variance is zero.
        # NOTE: We might still want to have a pre-determined reward precision here
        #       because sampling from an infinite precision Gaussian distribution
        #       always returns the mean (the observation), and it will hamper
        #       exploration in the early stage.
        precision = np.inf if variance == 0.0 else np.reciprocal(variance)
        mab = self.mabs[job]
        mab.arm_reward_prec[batch_size] = precision
        mab.fit_arm(batch_size, arm_rewards, reset=True)
        self.mabs[job] = mab
        if self.verbose:
            arm_rewards_repr = ", ".join([f"{r:.2f}" for r in arm_rewards])
            self._log(
                f"{job} @ {batch_size}: "
                f"arm_rewards = [{arm_rewards_repr}], reward_prec = {precision}"
            )

PruningExploreManager

Helper class that generates batch sizes to explore and prune.
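
A minimal sketch of the intended call order (illustrative only, assuming `run_job` is a hypothetical callable that returns a (cost, reached) pair for a batch size): next_batch_size and report_batch_size_result strictly alternate, and the surviving batch sizes are delivered through StopIteration.value.

from zeus.policy.optimizer import PruningExploreManager

def prune(batch_sizes, default_bs, run_job):
    manager = PruningExploreManager(sorted(batch_sizes), default=default_bs)
    while True:
        try:
            bs = manager.next_batch_size()
        except StopIteration as exc:
            # Exploration is over; the surviving batch sizes ride on the exception.
            return exc.value
        cost, reached = run_job(bs)
        manager.report_batch_size_result(bs, cost, reached)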

Source code in zeus/policy/optimizer.py
class PruningExploreManager:
    """Helper class that generates batch sizes to explore and prune."""

    def __init__(
        self,
        batch_sizes: list[int],
        default: int,
        num_pruning_rounds: int = 2,
    ) -> None:
        """Initialze the object.

        Args:
            batch_sizes: The initial set of batch sizes to prune from.
            default: The default batch size (b0) to begin exploration from.
            num_pruning_rounds: How many rounds to run pruning.
        """
        # Sanity checks.
        if default not in batch_sizes:
            raise ValueError(f"Default batch size {default} not in {batch_sizes}.")

        # Save arguments.
        self.batch_sizes = batch_sizes
        self.default = default
        self.num_pruning_rounds = num_pruning_rounds

        # State
        self.expecting = default

        # Generator that returns batch sizes.
        self.gen = self._exploration_engine()

    def _exploration_engine(
        self,
    ) -> Generator[int | None, tuple[int, float, bool], list[int]]:
        """Drive pruning exploration.

        Yields the batch size to be explored.
        The caller should `send` a tuple of (explored batch size, cost, whether reached).
        As a safety measure, the explored batch size must match the most recently yielded
        batch size, and otherwise a `RuntimeError` is raised.
        Finally, when exploration is over, returns a sorted list of batch sizes that
        survived pruning.
        """
        for _ in range(self.num_pruning_rounds):
            # A list of batch sizes that reached the target metric.
            good: list[int] = []

            # We first explore downwards from the default batch size, and then go upwards.
            idx = self.batch_sizes.index(self.default)
            down = sorted(self.batch_sizes[: idx + 1], reverse=True)
            up = sorted(self.batch_sizes[idx + 1 :])

            # We track the best cost because the default batch size is updated to the batch
            # size that performed the best.
            best_cost = np.inf

            for bs_list in [down, up]:
                for bs in bs_list:
                    # We tell the outside world to explore `bs`, and we expect the outside
                    # world to give us back the cost of that `bs`.
                    self.expecting = bs
                    batch_size, cost, reached = yield bs
                    if self.expecting != batch_size:
                        raise RuntimeError(
                            f"PruningExplorationManager: {self.expecting=}, {batch_size=}"
                        )
                    self.expecting = 0

                    # An empty `yield` to not proceed to the next batch size when the caller
                    # `send`s in the results.
                    yield

                    # Only batch sizes that reached the target metric are good.
                    if reached:
                        if best_cost > cost:
                            best_cost = cost
                            self.default = bs
                        good.append(bs)
                    # If the batch size did not reach the target metric, `break`ing here will
                    # allow us to move on to either the next direction of exploration (upwards)
                    # or end this round of pruning exploration.
                    else:
                        break

            self.expecting = 0
            self.batch_sizes = sorted(good)

        return sorted(self.batch_sizes)

    def next_batch_size(self) -> int:
        """Return the next batch size to explore.

        Raises `StopIteration` when the pruning exploration phase is over.
        The exception instance contains the final set of batch sizes to consider.
        Access it through `exception.value`.
        """
        batch_size = next(self.gen)
        assert batch_size is not None, "Call order may have been wrong."
        return batch_size

    def report_batch_size_result(
        self, batch_size: int, cost: float, reached: bool
    ) -> None:
        """Report whether the previous batch size reached the target metric.

        Args:
            batch_size: The batch size which this cost observation is from.
            cost: The energy-time cost of running the job with this batch size.
            reached: Whether the job reached the target metric.
        """
        none = self.gen.send((batch_size, cost, reached))
        assert none is None, "Call order may have been wrong."

__init__

__init__(batch_sizes, default, num_pruning_rounds=2)

Parameters:

    batch_sizes (list[int], required):
        The initial set of batch sizes to prune from.
    default (int, required):
        The default batch size (b0) to begin exploration from.
    num_pruning_rounds (int, default: 2):
        How many rounds to run pruning.

Source code in zeus/policy/optimizer.py
def __init__(
    self,
    batch_sizes: list[int],
    default: int,
    num_pruning_rounds: int = 2,
) -> None:
    """Initialze the object.

    Args:
        batch_sizes: The initial set of batch sizes to prune from.
        default: The default batch size (b0) to begin exploration from.
        num_pruning_rounds: How many rounds to run pruning.
    """
    # Sanity checks.
    if default not in batch_sizes:
        raise ValueError(f"Default batch size {default} not in {batch_sizes}.")

    # Save arguments.
    self.batch_sizes = batch_sizes
    self.default = default
    self.num_pruning_rounds = num_pruning_rounds

    # State
    self.expecting = default

    # Generator that returns batch sizes.
    self.gen = self._exploration_engine()

_exploration_engine

_exploration_engine()

Drive pruning exploration.

Yields the batch size to be explored. The caller should send a tuple of (explored batch size, cost, whether reached). As a safety measure, the explored batch size must match the most recently yielded batch size, and otherwise a RuntimeError is raised. Finally, when exploration is over, returns a sorted list of batch sizes that survived pruning.
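
The yield/send mechanics used here are standard Python generator machinery. As a small self-contained illustration (independent of Zeus), a generator's return value surfaces as the value attribute of the StopIteration it raises:

def engine():
    result = yield "first"  # The caller's send() delivers a value here.
    yield                   # Bare yield so that send() does not advance past this point.
    return [result]         # Delivered to the caller via StopIteration.value.

gen = engine()
prompt = next(gen)          # "first"
gen.send("observation")     # Resumes the generator; it stops at the bare yield.
try:
    next(gen)               # Exhausts the generator.
except StopIteration as exc:
    final = exc.value       # ["observation"]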

Source code in zeus/policy/optimizer.py
def _exploration_engine(
    self,
) -> Generator[int | None, tuple[int, float, bool], list[int]]:
    """Drive pruning exploration.

    Yields the batch size to be explored.
    The caller should `send` a tuple of (explored batch size, cost, whether reached).
    As a safety measure, the explored batch size must match the most recently yielded
    batch size, and otherwise a `RuntimeError` is raised.
    Finally, when exploration is over, returns a sorted list of batch sizes that
    survived pruning.
    """
    for _ in range(self.num_pruning_rounds):
        # A list of batch sizes that reached the target metric.
        good: list[int] = []

        # We first explore downwards from the default batch size, and then go upwards.
        idx = self.batch_sizes.index(self.default)
        down = sorted(self.batch_sizes[: idx + 1], reverse=True)
        up = sorted(self.batch_sizes[idx + 1 :])

        # We track the best cost because the default batch size is updated to the batch
        # size that performed the best.
        best_cost = np.inf

        for bs_list in [down, up]:
            for bs in bs_list:
                # We tell the outside world to explore `bs`, and we expect the outside
                # world to give us back the cost of that `bs`.
                self.expecting = bs
                batch_size, cost, reached = yield bs
                if self.expecting != batch_size:
                    raise RuntimeError(
                        f"PruningExplorationManager: {self.expecting=}, {batch_size=}"
                    )
                self.expecting = 0

                # An empty `yield` to not proceed to the next batch size when the caller
                # `send`s in the results.
                yield

                # Only batch sizes that reached the target metric are good.
                if reached:
                    if best_cost > cost:
                        best_cost = cost
                        self.default = bs
                    good.append(bs)
                # If the batch size did not reach the target metric, `break`ing here will
                # allow us to move on to either the next direction of exploration (upwards)
                # or end this round of pruning exploration.
                else:
                    break

        self.expecting = 0
        self.batch_sizes = sorted(good)

    return sorted(self.batch_sizes)

next_batch_size

next_batch_size()

Return the next batch size to explore.

Raises StopIteration when the pruning exploration phase is over. The exception instance contains the final set of batch sizes to consider; access it through exception.value.

Source code in zeus/policy/optimizer.py
def next_batch_size(self) -> int:
    """Return the next batch size to explore.

    Raises `StopIteration` when the pruning exploration phase is over.
    The exception instance contains the final set of batch sizes to consider.
    Access it through `exception.value`.
    """
    batch_size = next(self.gen)
    assert batch_size is not None, "Call order may have been wrong."
    return batch_size

report_batch_size_result

report_batch_size_result(batch_size, cost, reached)

Report whether the previous batch size reached the target metric.

Parameters:

    batch_size (int, required):
        The batch size which this cost observation is from.
    cost (float, required):
        The energy-time cost of running the job with this batch size.
    reached (bool, required):
        Whether the job reached the target metric.

Source code in zeus/policy/optimizer.py
def report_batch_size_result(
    self, batch_size: int, cost: float, reached: bool
) -> None:
    """Report whether the previous batch size reached the target metric.

    Args:
        batch_size: The batch size which this cost observation is from.
        cost: The energy-time cost of running the job with this batch size.
        reached: Whether the job reached the target metric.
    """
    none = self.gen.send((batch_size, cost, reached))
    assert none is None, "Call order may have been wrong."

PruningGTSBatchSizeOptimizer

Bases: BatchSizeOptimizer

One Gaussian Thompson Sampling MAB for each job with double pruning exploration.
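
The sketch below outlines how this optimizer is driven across recurrences of a job; it is illustrative only, assuming `job` is a Job instance with `default_bs` set and `run_training` is a hypothetical callable that returns a (cost, converged) pair for a batch size.

from zeus.policy.optimizer import PruningGTSBatchSizeOptimizer

def recurring_job_loop(job, batch_sizes, run_training, num_recurrences=20):
    bso = PruningGTSBatchSizeOptimizer(concurrency=False, verbose=False)
    bso.register_job(job, batch_sizes)  # Requires job.default_bs to be set.

    for _ in range(num_recurrences):
        # During pruning exploration this walks batch sizes down and then up from the
        # default; afterwards it Thompson-samples among the surviving batch sizes.
        bs = bso.predict(job)
        cost, converged = run_training(bs)
        # `converged` is required while the job is still in the pruning stage.
        bso.observe(job, bs, cost, converged)
    return bso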

Source code in zeus/policy/optimizer.py
class PruningGTSBatchSizeOptimizer(BatchSizeOptimizer):
    """One Gaussian Thompson Sampling MAB for each job with double pruning exploration."""

    def __init__(
        self,
        prior_mean: float = 0.0,
        prior_precision: float = 0.0,
        window_size: int = 0,
        concurrency: bool = False,
        seed: int = 123456,
        verbose: bool = True,
    ) -> None:
        """Initialze the optimizer.

        Refer to the constructor of [`GaussianTS`][zeus.policy.mab.GaussianTS]
        for descriptions of other arguments.

        Args:
            window_size: Size of the window for the MAB (for drift handling).
            concurrency: Whether to support concurrent job submissions.
        """
        self.prior_mean = prior_mean
        self.prior_precision = prior_precision
        self.window_size = window_size
        self.concurrency = concurrency
        self.seed = seed
        self.verbose = verbose

        # One MAB for each job.
        self.mabs: dict[Job, GaussianTS] = {}

        # One PruningExplorationManager for each job.
        self.exp_manager: dict[Job, PruningExploreManager] = {}

        # Observation history (batch size, reward) for each job.
        self.history: dict[Job, list[tuple[int, float]]] = {}

    @property
    def name(self) -> str:
        """Name of the batch size optimizer."""
        return "Pruning GaussianTS BSO"

    def register_job(self, job: Job, batch_sizes: list[int]) -> None:
        """Register the job."""
        # Sanity checks.
        if job.default_bs is None:
            raise ValueError(f"Default BS not specified for {job}.")
        if not batch_sizes:
            raise ValueError(f"Batch size list for {job} is empty.")

        # Set internal states.
        self.exp_manager[job] = PruningExploreManager(
            sorted(batch_sizes), job.default_bs
        )
        self.history[job] = []
        if self.verbose:
            self._log(f"Registered {job}")

    def predict(self, job: Job) -> int:
        """Return the batch size to use for the job."""
        # Try to see if the exploration manager has something.
        try:
            batch_size = self.exp_manager[job].next_batch_size()
            if self.verbose:
                self._log(f"{job} in pruning stage -> \033[31mBS = {batch_size}\033[0m")
        except StopIteration as exp:
            # Pruning stage is over.
            if job not in self.mabs:
                self._construct_mab(job, exp.value)
            batch_size = self.mabs[job].predict()
            if self.verbose:
                self._log(
                    f"{job} in Thompson Sampling stage -> \033[31mBS = {batch_size}\033[0m"
                )

        return batch_size

    def observe(
        self, job: Job, batch_size: int, cost: float, converged: bool | None = None
    ) -> None:
        """Learn from the cost of using the given batch size for the job."""
        # Add observation to history.
        self.history[job].append((batch_size, -cost))

        # We're in Thompson Sampling stage.
        if job in self.mabs:
            # Since we're learning the reward precision, we need to
            # 1. re-compute the precision of this arm based on the reward history,
            # 2. update the arm's reward precision
            # 3. and `fit` the new MAB instance on all the reward history.
            # Note that `arm_rewards` always has more than one entry (and hence a
            # non-zero variance) because we've been through pruning exploration.
            arm_rewards = np.array(self._get_history_for_bs(job, batch_size))
            precision = np.reciprocal(np.var(arm_rewards))
            mab = self.mabs[job]
            mab.arm_reward_prec[batch_size] = precision
            mab.fit_arm(batch_size, arm_rewards, reset=True)
            if self.verbose:
                arm_rewards_repr = ", ".join([f"{r:.2f}" for r in arm_rewards])
                self._log(
                    f"{job} @ {batch_size}: "
                    f"arm_rewards = [{arm_rewards_repr}], reward_prec = {precision}"
                )

        # We're in pruning stage.
        else:
            assert converged is not None
            # Log before we potentially error out.
            if self.verbose:
                self._log(
                    f"{job} in pruning stage, expecting BS {self.exp_manager[job].expecting}."
                    f" Current BS {batch_size} that did {'not ' * converged}converge."
                )

            # If we don't support concurrency, we can just pass the results to the
            # exploration manager, and the manager will err if the order of batch sizes
            # is screwed up.
            if not self.concurrency:
                self.exp_manager[job].report_batch_size_result(
                    batch_size, cost, converged
                )
                return

            # If we are supporting concurrency, there's a subtle issue.
            # Pruning exploration demands a specific order of trying out a batch size
            # and receiving the results (cost and whether reached). This breaks in the
            # following situation, for example:
            # 1. Job with BS 32 that is part of pruning exploration starts.
            # 2. Concurrent job comes in, and we launch it with the best known BS 64.
            # 3. Job with BS 64 finishes first, and calls bso.observe with BS 64.
            # This breaks the observation order assumption of PruningExploreManager.
            # Thus we check whether the current batch size is the one expected by
            # PruningExploreManager, and only if so, report the result to it.
            # Otherwise, we silently insert the cost observation into the BSO's history
            # (first line of this method) and don't touch the PruningExploreManager.
            if self.exp_manager[job].expecting == batch_size:
                self.exp_manager[job].report_batch_size_result(
                    batch_size, cost, converged
                )

    def _get_history_for_bs(self, job: Job, batch_size: int) -> list[float]:
        """Return the windowed history for the given job's batch size."""
        history = self.history[job]
        rewards = []
        # Collect rewards starting from the most recent ones and backwards.
        for bs, reward in reversed(history):
            if bs == batch_size:
                rewards.append(reward)
                if len(rewards) == self.window_size:
                    break
        # There's no need to return this in time order, but just in case.
        return list(reversed(rewards))

    def _construct_mab(self, job: Job, batch_sizes: list[int]) -> None:
        """When exploration is over, this method is called to construct and learn GTS."""
        # Sanity check.
        if not batch_sizes:
            raise ValueError(
                "Empty batch size set when constructing MAB. "
                "Probably all batch sizes have been pruned."
            )

        if self.verbose:
            self._log(f"Construct MAB for {job} with arms {batch_sizes}")

        mab = GaussianTS(
            arms=batch_sizes,  # The MAB only has "good" arms.
            reward_precision=0.0,
            prior_mean=self.prior_mean,
            prior_precision=self.prior_precision,
            num_exploration=2,
            seed=self.seed,
            verbose=self.verbose,
        )
        # Fit the arm for each good batch size.
        for batch_size in self.exp_manager[job].batch_sizes:
            arm_rewards = np.array(self._get_history_for_bs(job, batch_size))
            assert (
                len(arm_rewards) >= 2
            ), f"Number of observations for {batch_size} is {len(arm_rewards)}."
            mab.arm_reward_prec[batch_size] = np.reciprocal(np.var(arm_rewards))
            mab.fit_arm(batch_size, arm_rewards, reset=True)
        # Save the MAB.
        self.mabs[job] = mab

name property

name

Name of the batch size optimizer.

__init__

__init__(
    prior_mean=0.0,
    prior_precision=0.0,
    window_size=0,
    concurrency=False,
    seed=123456,
    verbose=True,
)

Refer to the constructor of GaussianTS for descriptions of other arguments.

Parameters:

    window_size (int, default: 0):
        Size of the window for the MAB (for drift handling).
    concurrency (bool, default: False):
        Whether to support concurrent job submissions.

Source code in zeus/policy/optimizer.py
def __init__(
    self,
    prior_mean: float = 0.0,
    prior_precision: float = 0.0,
    window_size: int = 0,
    concurrency: bool = False,
    seed: int = 123456,
    verbose: bool = True,
) -> None:
    """Initialze the optimizer.

    Refer to the constructor of [`GaussianTS`][zeus.policy.mab.GaussianTS]
    for descriptions of other arguments.

    Args:
        window_size: Size of the window for the MAB (for drift handling).
        concurrency: Whether to support concurrent job submissions.
    """
    self.prior_mean = prior_mean
    self.prior_precision = prior_precision
    self.window_size = window_size
    self.concurrency = concurrency
    self.seed = seed
    self.verbose = verbose

    # One MAB for each job.
    self.mabs: dict[Job, GaussianTS] = {}

    # One PruningExplorationManager for each job.
    self.exp_manager: dict[Job, PruningExploreManager] = {}

    # Observation history (batch size, reward) for each job.
    self.history: dict[Job, list[tuple[int, float]]] = {}

register_job

register_job(job, batch_sizes)

Register the job.

Source code in zeus/policy/optimizer.py
def register_job(self, job: Job, batch_sizes: list[int]) -> None:
    """Register the job."""
    # Sanity checks.
    if job.default_bs is None:
        raise ValueError(f"Default BS not specified for {job}.")
    if not batch_sizes:
        raise ValueError(f"Batch size list for {job} is empty.")

    # Set internal states.
    self.exp_manager[job] = PruningExploreManager(
        sorted(batch_sizes), job.default_bs
    )
    self.history[job] = []
    if self.verbose:
        self._log(f"Registered {job}")

predict

predict(job)

Return the batch size to use for the job.

Source code in zeus/policy/optimizer.py
def predict(self, job: Job) -> int:
    """Return the batch size to use for the job."""
    # Try to see if the exploration manager has something.
    try:
        batch_size = self.exp_manager[job].next_batch_size()
        if self.verbose:
            self._log(f"{job} in pruning stage -> \033[31mBS = {batch_size}\033[0m")
    except StopIteration as exp:
        # Pruning stage is over.
        if job not in self.mabs:
            self._construct_mab(job, exp.value)
        batch_size = self.mabs[job].predict()
        if self.verbose:
            self._log(
                f"{job} in Thompson Sampling stage -> \033[31mBS = {batch_size}\033[0m"
            )

    return batch_size

observe

observe(job, batch_size, cost, converged=None)

Learn from the cost of using the given batch size for the job.

Source code in zeus/policy/optimizer.py
def observe(
    self, job: Job, batch_size: int, cost: float, converged: bool | None = None
) -> None:
    """Learn from the cost of using the given batch size for the job."""
    # Add observation to history.
    self.history[job].append((batch_size, -cost))

    # We're in Thompson Sampling stage.
    if job in self.mabs:
        # Since we're learning the reward precision, we need to
        # 1. re-compute the precision of this arm based on the reward history,
        # 2. update the arm's reward precision
        # 3. and `fit` the new MAB instance on all the reward history.
        # Note that `arm_rewards` always has more than one entry (and hence a
        # non-zero variance) because we've been through pruning exploration.
        arm_rewards = np.array(self._get_history_for_bs(job, batch_size))
        precision = np.reciprocal(np.var(arm_rewards))
        mab = self.mabs[job]
        mab.arm_reward_prec[batch_size] = precision
        mab.fit_arm(batch_size, arm_rewards, reset=True)
        if self.verbose:
            arm_rewards_repr = ", ".join([f"{r:.2f}" for r in arm_rewards])
            self._log(
                f"{job} @ {batch_size}: "
                f"arm_rewards = [{arm_rewards_repr}], reward_prec = {precision}"
            )

    # We're in pruning stage.
    else:
        assert converged is not None
        # Log before we potentially error out.
        if self.verbose:
            self._log(
                f"{job} in pruning stage, expecting BS {self.exp_manager[job].expecting}."
                f" Current BS {batch_size} that did {'not ' * converged}converge."
            )

        # If we don't support concurrency, we can just pass the results to the
        # exploration manager, and the manager will err if the order of batch sizes
        # is screwed up.
        if not self.concurrency:
            self.exp_manager[job].report_batch_size_result(
                batch_size, cost, converged
            )
            return

        # If we are supporting concurrency, there's a subtle issue.
        # Pruning exploration demands a specific order of trying out a batch size
        # and receiving the results (cost and whether reached). This breaks in the
        # following situation, for example:
        # 1. Job with BS 32 that is part of pruning exploration starts.
        # 2. Concurrent job comes in, and we launch it with the best known BS 64.
        # 3. Job with BS 64 finishes first, and calls bso.observe with BS 64.
        # This breaks the observation order assumption of PruningExploreManager.
        # Thus we check whether the current batch size is the one expected by
        # PruningExploreManager, and only if so, report the result to it.
        # Otherwise, we silently insert the cost observation into the BSO's history
        # (first line of this method) and don't touch the PruningExploreManager.
        if self.exp_manager[job].expecting == batch_size:
            self.exp_manager[job].report_batch_size_result(
                batch_size, cost, converged
            )

_get_history_for_bs

_get_history_for_bs(job, batch_size)

Return the windowed history for the given job's batch size.

Source code in zeus/policy/optimizer.py
def _get_history_for_bs(self, job: Job, batch_size: int) -> list[float]:
    """Return the windowed history for the given job's batch size."""
    history = self.history[job]
    rewards = []
    # Collect rewards starting from the most recent ones and backwards.
    for bs, reward in reversed(history):
        if bs == batch_size:
            rewards.append(reward)
            if len(rewards) == self.window_size:
                break
    # There's no need to return this in time order, but just in case.
    return list(reversed(rewards))

_construct_mab

_construct_mab(job, batch_sizes)

When exploration is over, this method is called to construct the GaussianTS MAB and fit it on the observation history.

Source code in zeus/policy/optimizer.py
def _construct_mab(self, job: Job, batch_sizes: list[int]) -> None:
    """When exploration is over, this method is called to construct and learn GTS."""
    # Sanity check.
    if not batch_sizes:
        raise ValueError(
            "Empty batch size set when constructing MAB. "
            "Probably all batch sizes have been pruned."
        )

    if self.verbose:
        self._log(f"Construct MAB for {job} with arms {batch_sizes}")

    mab = GaussianTS(
        arms=batch_sizes,  # The MAB only has "good" arms.
        reward_precision=0.0,
        prior_mean=self.prior_mean,
        prior_precision=self.prior_precision,
        num_exploration=2,
        seed=self.seed,
        verbose=self.verbose,
    )
    # Fit the arm for each good batch size.
    for batch_size in self.exp_manager[job].batch_sizes:
        arm_rewards = np.array(self._get_history_for_bs(job, batch_size))
        assert (
            len(arm_rewards) >= 2
        ), f"Number of observations for {batch_size} is {len(arm_rewards)}."
        mab.arm_reward_prec[batch_size] = np.reciprocal(np.var(arm_rewards))
        mab.fit_arm(batch_size, arm_rewards, reset=True)
    # Save the MAB.
    self.mabs[job] = mab

JITPowerLimitOptimizer

Bases: PowerLimitOptimizer

Returns the best power limit to use for the job & batch size.

Source code in zeus/policy/optimizer.py
class JITPowerLimitOptimizer(PowerLimitOptimizer):
    """Returns the best power limit to use for the job & batch size."""

    def __init__(self, verbose: bool = True) -> None:
        """Initialize the object."""
        self.verbose = verbose

        self.best_pl: defaultdict[Job, dict[int, int]] = defaultdict(dict)
        self.best_cost: defaultdict[Job, dict[int, float]] = defaultdict(dict)
        self.observe_count: defaultdict[Job, defaultdict[int, int]] = defaultdict(
            lambda: defaultdict(int)
        )

    @property
    def name(self) -> str:
        """Name of the power limit optimizer."""
        return "JITPSO"

    def predict(self, job: Job, batch_size: int) -> int | None:
        """Return the best power limit for the job, or None if unknown."""
        pred = self.best_pl[job].get(batch_size)
        if self.verbose:
            self._log(
                f"{job} @ {batch_size} -> \033[31mPL = "
                f"{'needs profiling' if pred is None else str(pred) + 'W'}\033[0m"
            )
        return pred

    def observe(self, job: Job, batch_size: int, power_limit: int, cost: float) -> None:
        """Learn from the cost of using the given knobs for the job."""
        self.observe_count[job][batch_size] += 1
        prev_best_cost = self.best_cost[job].get(batch_size)
        if prev_best_cost is None or prev_best_cost > cost:
            self.best_pl[job][batch_size] = power_limit
            self.best_cost[job][batch_size] = cost

name property

name

Name of the power limit optimizer.

__init__

__init__(verbose=True)
Source code in zeus/policy/optimizer.py
def __init__(self, verbose: bool = True) -> None:
    """Initialize the object."""
    self.verbose = verbose

    self.best_pl: defaultdict[Job, dict[int, int]] = defaultdict(dict)
    self.best_cost: defaultdict[Job, dict[int, float]] = defaultdict(dict)
    self.observe_count: defaultdict[Job, defaultdict[int, int]] = defaultdict(
        lambda: defaultdict(int)
    )

predict

predict(job, batch_size)

Return the best power limit for the job, or None if unknown.

Source code in zeus/policy/optimizer.py
def predict(self, job: Job, batch_size: int) -> int | None:
    """Return the best power limit for the job, or None if unknown."""
    pred = self.best_pl[job].get(batch_size)
    if self.verbose:
        self._log(
            f"{job} @ {batch_size} -> \033[31mPL = "
            f"{'needs profiling' if pred is None else str(pred) + 'W'}\033[0m"
        )
    return pred

observe

observe(job, batch_size, power_limit, cost)

Learn from the cost of using the given knobs for the job.

Source code in zeus/policy/optimizer.py
def observe(self, job: Job, batch_size: int, power_limit: int, cost: float) -> None:
    """Learn from the cost of using the given knobs for the job."""
    self.observe_count[job][batch_size] += 1
    prev_best_cost = self.best_cost[job].get(batch_size)
    if prev_best_cost is None or prev_best_cost > cost:
        self.best_pl[job][batch_size] = power_limit
        self.best_cost[job][batch_size] = cost