Skip to content

service

zeus.optimizer.batch_size.server.services.service

Zeus batch size optimizer service layer.

ZeusService

Zeus Service that interacts with database using repository.

Provides application-layer methods to communicate with the database. Each method performs one or more database operations that have to be done together.

Source code in zeus/optimizer/batch_size/server/services/service.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
class ZeusService:
    """Zeus Service that interacts with database using repository.

    Provides application layer methods to communicate with database.
    Each method performs one or more database operations that have to be done together.
    """

    def __init__(self, db_session: AsyncSession):
        """Set up repositories to use to talk to database.

        Args:
            db_session: Database session shared by the batch size and job repositories.
        """
        self.bs_repo = BatchSizeStateRepository(db_session)
        self.job_repo = JobStateRepository(db_session)

    async def get_arms(self, job_id: str) -> list[GaussianTsArmState]:
        """Get GaussianTs arm states for all arms(job_id, batch size).

        Args:
            job_id: Job id

        Returns:
            list of arms
        """
        return await self.bs_repo.get_arms(job_id)

    async def get_arm(self, bs: BatchSizeBase) -> GaussianTsArmState | None:
        """Get arm state for one arm.

        Args:
            bs: (job_id, batch size) pair that represents one arm

        Returns:
            Result arm state or None if we cannot find that arm
        """
        return await self.bs_repo.get_arm(bs)

    async def get_explorations_of_job(self, job_id: str) -> ExplorationsPerJob:
        """Get all explorations we have done for that job.

        Args:
            job_id: Job id

        Returns:
            list of explorations per each batch size
        """
        return await self.bs_repo.get_explorations_of_job(job_id)

    def update_trial(self, updated_trial: UpdateTrial) -> None:
        """Update trial.

        (1) update the corresponding trial.
        (2) we update the min training cost observed so far if we have to.

        Every check runs before anything is written to the session, so a rejected
        update does not leave a partially applied change behind.

        Args:
            updated_trial: Result of training that batch size

        Raises:
            [`ZeusBSOServiceBadOperationError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOServiceBadOperationError]: When we didn't fetch the job or trial during this session. This operation should have
                    fetched the job and trial first. Also raised when the trial already has a result.
            `ZeusBSOValueError`: When the trial did not fail but energy or time is missing.
        """
        trial = self._get_trial(
            ReadTrial(
                job_id=updated_trial.job_id,
                batch_size=updated_trial.batch_size,
                trial_number=updated_trial.trial_number,
            )
        )
        if trial.status != TrialStatus.Dispatched:
            raise ZeusBSOServiceBadOperationError("Trial already has a result.")

        # Validate and compute the min-cost update *before* mutating the trial,
        # so an invalid request raises without touching the session.
        new_min_cost: UpdateJobMinCost | None = None
        if updated_trial.status != TrialStatus.Failed:
            job = self._get_job(updated_trial.job_id)
            if updated_trial.energy is None or updated_trial.time is None:
                raise ZeusBSOValueError(
                    "Energy and time should be set if the trial is not failed."
                )
            cur_cost = zeus_cost(
                updated_trial.energy, updated_trial.time, job.eta_knob, job.max_power
            )
            if job.min_cost is None or job.min_cost > cur_cost:
                new_min_cost = UpdateJobMinCost(
                    job_id=job.job_id,
                    min_cost=cur_cost,
                    min_cost_batch_size=updated_trial.batch_size,
                )

        self.bs_repo.updated_current_trial(updated_trial)

        # Record the job's new min cost if this trial beat the best seen so far.
        if new_min_cost is not None:
            self.job_repo.update_min(new_min_cost)

    def update_arm_state(
        self,
        arm: UpdateArm,
    ) -> None:
        """Update arm state.

        Args:
            arm: Updated arm state.

        Raises:
            `ZeusBSOServiceBadOperationError`: When we didn't fetch the job or trial during this session. This operation should have
                    fetched the job and trial first. Also, check if trial type is matching with fetched trial's type.
        """
        self._check_job_fetched(arm.trial.job_id)
        trial = self._get_trial(
            ReadTrial(
                job_id=arm.trial.job_id,
                batch_size=arm.trial.batch_size,
                trial_number=arm.trial.trial_number,
            )
        )
        if trial.type != TrialType.MAB:
            raise ZeusBSOServiceBadOperationError(
                "Cannot update an arm since this trial is not issued from MAB stage."
            )
        self.bs_repo.update_arm_state(arm.updated_arm)

    def update_exp_default_bs(self, updated_default_bs: UpdateExpDefaultBs) -> None:
        """Update the default batch size for exploration.

        Args:
            updated_default_bs: Job Id and new default batch size

        Raises:
            `ZeusBSOServiceBadOperationError`: When we didn't fetch the job during this session. This operation should have
                    fetched the job first.
        """
        self._check_job_fetched(updated_default_bs.job_id)
        self.job_repo.update_exp_default_bs(updated_default_bs)

    async def create_trial(
        self, trial: CreateExplorationTrial | CreateMabTrial | CreateConcurrentTrial
    ) -> ReadTrial:
        """Create a new trial.

        Args:
            trial: New trial to create.

        Returns:
            (job_id, batch size, trial_number) identifying the created trial.

        Raises:
            `ZeusBSOServiceBadOperationError`: When we didn't fetch the job during this session. This operation should have
                    fetched the job first.
        """
        self._check_job_fetched(trial.job_id)
        trial_number = await self.bs_repo.get_next_trial_number(trial.job_id)
        self.bs_repo.create_trial(
            CreateTrial(**trial.dict(), trial_number=trial_number)
        )
        return ReadTrial(
            job_id=trial.job_id, batch_size=trial.batch_size, trial_number=trial_number
        )

    def get_random_choices(self, choice: GetRandomChoices) -> np.ndarray[Any, Any]:
        """Get random choices based on job's seed.

        Returns a random permutation of `choice.choices` (sampled without replacement).
        If seed is not None (set by the user), we get the random choices from the generator that is stored
        in the database and write the advanced generator state back. Otherwise, we use a randomly seeded generator.

        Args:
            choice: Job id and list of choices

        Returns:
            result random choices

        Raises:
            `ZeusBSOServiceBadOperationError`: When we didn't fetch the job during this session. This operation should have
                    fetched the job first.
        """
        arr = np.array(choice.choices)
        rng, should_update = self._get_generator(choice.job_id)
        res = rng.choice(arr, len(arr), replace=False)

        if should_update:
            # Persist the advanced generator state so the next draw continues the
            # seeded sequence. `bit_generator.state` is the public state dict on every
            # NumPy version (JSON-serializable for the default PCG64), whereas
            # `Generator.__getstate__()` returns None on NumPy 2.x.
            self.job_repo.update_generator_state(
                UpdateGeneratorState(
                    job_id=choice.job_id, state=json.dumps(rng.bit_generator.state)
                )
            )

        return res

    def get_normal(self, arg: GetNormal) -> float:
        """Sample from normal distribution and update the generator state if seed was set.

        Args:
            arg: args for `numpy.random.normal`, which is loc(mean of distribution) and scale(stdev of distribution)

        Returns:
            Drawn sample.

        Raises:
            `ZeusBSOServiceBadOperationError`: When we didn't fetch the job during this session. This operation should have
                    fetched the job first.
        """
        rng, should_update = self._get_generator(arg.job_id)
        res = rng.normal(arg.loc, arg.scale)

        if should_update:
            # See `get_random_choices`: persist via the public `bit_generator.state`,
            # since `Generator.__getstate__()` returns None on NumPy 2.x.
            self.job_repo.update_generator_state(
                UpdateGeneratorState(
                    job_id=arg.job_id, state=json.dumps(rng.bit_generator.state)
                )
            )

        return res

    async def get_job(self, job_id: str) -> JobState | None:
        """Get job from database.

        Args:
            job_id: Job Id

        Returns:
            JobState if we found one, None if we couldn't find a job matching the job id.
        """
        return await self.job_repo.get_job(job_id)

    async def get_trial(self, trial: ReadTrial) -> Trial | None:
        """Get a trial from database.

        Args:
            trial: (Job Id, batch size, trial_number) triplet.

        Returns:
            Trial if we found one, None if we couldn't find a matching trial.
        """
        return await self.bs_repo.get_trial(trial)

    def create_job(self, new_job: CreateJob) -> None:
        """Create a new job.

        Args:
            new_job: Configuration of a new job
        """
        return self.job_repo.create_job(new_job)

    async def get_trial_results_of_bs(self, bs: BatchSizeBase) -> TrialResultsPerBs:
        """Load window size amount of results for a given batch size. If window size <= 0, load all of them.

        Args:
            bs: (job_id, batch size) pair.

        Returns:
            list of windowed measurements in descending order for that (job_id, batch size)

        Raises:
            `ZeusBSOServiceBadOperationError`: When we didn't fetch the job during this session. This operation should have
                    fetched the job first.
        """
        job = self._get_job(bs.job_id)
        return await self.bs_repo.get_trial_results_of_bs(
            BatchSizeBase(job_id=bs.job_id, batch_size=bs.batch_size),
            job.window_size,
        )

    def create_arms(self, new_arms: list[GaussianTsArmState]) -> None:
        """Create GaussianTs arms for the job.

        Does nothing if `new_arms` is empty.

        Args:
            new_arms: List of new arm states

        Raises:
            `ZeusBSOServiceBadOperationError`: When we didn't fetch the job during this session. This operation should have
                    fetched the job first.
        """
        if len(new_arms) != 0:
            # Only the first arm's job_id is checked.
            self._check_job_fetched(new_arms[0].job_id)
            self.bs_repo.create_arms(new_arms)

    def update_job_stage(self, updated_stage: UpdateJobStage) -> None:
        """Update the job stage (Pruning -> MAB).

        Args:
            updated_stage: Updated stage.

        Raises:
            `ZeusBSOServiceBadOperationError`: When we didn't fetch the job during this session. This operation should have
                    fetched the job first.
        """
        self._check_job_fetched(updated_stage.job_id)
        self.job_repo.update_stage(updated_stage)

    async def delete_job(self, job_id: str) -> bool:
        """Delete the job.

        Args:
            job_id: ID of the job.

        Returns:
            True if the job is deleted. False if none was deleted
        """
        return await self.job_repo.delete_job(job_id)

    def _get_generator(self, job_id: str) -> tuple[np_Generator, bool]:
        """Get generator based on job_id.

        If `mab_seed` is set, the generator is restored from the state stored in the
        database and the caller should persist the updated state after using it.
        Otherwise, a generator seeded from OS entropy is returned.

        Returns:
            Tuple of [Generator, if we should update state]

        Raises:
            `ZeusBSOServiceBadOperationError`: When the job was not fetched in this session.
            `ZeusBSOValueError`: When the seed is set but no generator state is stored.
        """
        job_state = self._get_job(job_id)

        # Seed from OS entropy rather than the wall clock: a seconds-resolution
        # timestamp gives identical sequences to calls made within the same second.
        rng = np.random.default_rng()

        should_update = job_state.mab_seed is not None
        if should_update:
            if job_state.mab_random_generator_state is None:
                raise ZeusBSOValueError(
                    "Seed is set but generator state is none. Should be impossible"
                )

            # Restore the persisted state; this overrides the entropy seed above.
            state = json.loads(job_state.mab_random_generator_state)
            rng.__setstate__(state)

        return (rng, should_update)

    def _get_job(self, job_id: str) -> JobState:
        """Get the job from the session. If we couldn't find the job, raise a `ZeusBSOServiceBadOperationError`."""
        res = self.job_repo.get_job_from_session(job_id)
        if res is None:
            raise ZeusBSOServiceBadOperationError(
                f"Should have fetched the job first or job does not exist(job_id = {job_id})"
            )
        return res

    def _get_trial(self, trial: ReadTrial) -> Trial:
        """Get the trial from the session. If we couldn't find the trial, raise a `ZeusBSOServiceBadOperationError`."""
        res = self.bs_repo.get_trial_from_session(trial)
        if res is None:
            raise ZeusBSOServiceBadOperationError(
                f"Should have fetched the trial first or trial does not exist(trial = {trial})"
            )
        return res

    def _check_job_fetched(self, job_id: str) -> None:
        """Check if we fetched the job in the current session. If we didn't, raise a `ZeusBSOServiceBadOperationError`."""
        if not self.job_repo.check_job_fetched(job_id):
            raise ZeusBSOServiceBadOperationError(
                f"check_job_fetched: {job_id} is not currently in the session"
            )

__init__

__init__(db_session)
Source code in zeus/optimizer/batch_size/server/services/service.py
59
60
61
62
def __init__(self, db_session: AsyncSession):
    """Create the repositories this service delegates to.

    Args:
        db_session: Session handed to both the batch size and job repositories.
    """
    session = db_session
    self.bs_repo = BatchSizeStateRepository(session)
    self.job_repo = JobStateRepository(session)

get_arms async

get_arms(job_id)

Get GaussianTs arm states for all arms(job_id, batch size).

Parameters:

Name Type Description Default
job_id str

Job id

required

Returns:

Type Description
list[GaussianTsArmState]

list of arms

Source code in zeus/optimizer/batch_size/server/services/service.py
64
65
66
67
68
69
70
71
72
73
async def get_arms(self, job_id: str) -> list[GaussianTsArmState]:
    """Fetch the GaussianTs arm state of every (job_id, batch size) arm of a job.

    Args:
        job_id: Identifier of the job whose arms are requested.

    Returns:
        The arm states returned by the batch size repository.
    """
    repo = self.bs_repo
    arms = await repo.get_arms(job_id)
    return arms

get_arm async

get_arm(bs)

Get arm state for one arm.

Parameters:

Name Type Description Default
bs BatchSizeBase

(job_id, batch size) pair that represents one arm

required

Returns:

Type Description
GaussianTsArmState | None

Result arm state or None if we cannot find that arm

Source code in zeus/optimizer/batch_size/server/services/service.py
75
76
77
78
79
80
81
82
83
84
async def get_arm(self, bs: BatchSizeBase) -> GaussianTsArmState | None:
    """Fetch the state of a single arm.

    Args:
        bs: The (job_id, batch size) pair identifying the arm.

    Returns:
        The arm state, or None when no such arm exists.
    """
    arm_state = await self.bs_repo.get_arm(bs)
    return arm_state

get_explorations_of_job async

get_explorations_of_job(job_id)

Get all explorations we have done for that job.

Parameters:

Name Type Description Default
job_id str

Job id

required

Returns:

Type Description
ExplorationsPerJob

list of explorations per each batch size

Source code in zeus/optimizer/batch_size/server/services/service.py
86
87
88
89
90
91
92
93
94
95
async def get_explorations_of_job(self, job_id: str) -> ExplorationsPerJob:
    """Collect every exploration performed so far for a job.

    Args:
        job_id: Identifier of the job.

    Returns:
        Explorations grouped per batch size.
    """
    explorations = await self.bs_repo.get_explorations_of_job(job_id)
    return explorations

update_trial

update_trial(updated_trial)

Update trial.

(1) update the corresponding trial. (2) we update the min training cost observed so far if we have to.

Parameters:

Name Type Description Default
updated_trial UpdateTrial

Result of training that batch size

required

Raises:

Type Description
[`ZeusBSOServiceBadOperationError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOServiceBadOperationError]

When we didn't fetch the job or trial during this session. This operation should have fetched the job and trial first. Also, check if trial type is matching with fetched trial's type.

Source code in zeus/optimizer/batch_size/server/services/service.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def update_trial(self, updated_trial: UpdateTrial) -> None:
    """Update trial.

    (1) update the corresponding trial.
    (2) we update the min training cost observed so far if we have to.

    Every check runs before anything is written to the session, so a rejected
    update does not leave a partially applied change behind.

    Args:
        updated_trial: Result of training that batch size

    Raises:
        [`ZeusBSOServiceBadOperationError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOServiceBadOperationError]: When we didn't fetch the job or trial during this session. This operation should have
                fetched the job and trial first. Also raised when the trial already has a result.
        `ZeusBSOValueError`: When the trial did not fail but energy or time is missing.
    """
    trial = self._get_trial(
        ReadTrial(
            job_id=updated_trial.job_id,
            batch_size=updated_trial.batch_size,
            trial_number=updated_trial.trial_number,
        )
    )
    if trial.status != TrialStatus.Dispatched:
        raise ZeusBSOServiceBadOperationError("Trial already has a result.")

    # Validate and compute the min-cost update *before* mutating the trial,
    # so an invalid request raises without touching the session.
    new_min_cost: UpdateJobMinCost | None = None
    if updated_trial.status != TrialStatus.Failed:
        job = self._get_job(updated_trial.job_id)
        if updated_trial.energy is None or updated_trial.time is None:
            raise ZeusBSOValueError(
                "Energy and time should be set if the trial is not failed."
            )
        cur_cost = zeus_cost(
            updated_trial.energy, updated_trial.time, job.eta_knob, job.max_power
        )
        if job.min_cost is None or job.min_cost > cur_cost:
            new_min_cost = UpdateJobMinCost(
                job_id=job.job_id,
                min_cost=cur_cost,
                min_cost_batch_size=updated_trial.batch_size,
            )

    self.bs_repo.updated_current_trial(updated_trial)

    # Record the job's new min cost if this trial beat the best seen so far.
    if new_min_cost is not None:
        self.job_repo.update_min(new_min_cost)

update_arm_state

update_arm_state(arm)

Update arm state.

Parameters:

Name Type Description Default
arm UpdateArm

Updated arm state.

required

Raises:

Type Description
`ZeusBSOServiceBadOperationError`

When we didn't fetch the job or trial during this session. This operation should have fetched the job and trial first. Also, check if trial type is matching with fetched trial's type.

Source code in zeus/optimizer/batch_size/server/services/service.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def update_arm_state(
    self,
    arm: UpdateArm,
) -> None:
    """Persist a new GaussianTs arm state produced by a MAB-stage trial.

    Args:
        arm: The originating trial together with the updated arm state.

    Raises:
        `ZeusBSOServiceBadOperationError`: If the job or the trial was not fetched in this
                session, or if the trial was not issued from the MAB stage.
    """
    source = arm.trial
    self._check_job_fetched(source.job_id)
    fetched = self._get_trial(
        ReadTrial(
            job_id=source.job_id,
            batch_size=source.batch_size,
            trial_number=source.trial_number,
        )
    )
    if fetched.type == TrialType.MAB:
        self.bs_repo.update_arm_state(arm.updated_arm)
        return
    raise ZeusBSOServiceBadOperationError(
        "Cannot update an arm since this trial is not issued from MAB stage."
    )

update_exp_default_bs

update_exp_default_bs(updated_default_bs)

Update the default batch size for exploration.

Parameters:

Name Type Description Default
updated_default_bs UpdateExpDefaultBs

Job Id and new default batch size

required

Raises:

Type Description
`ZeusBSOServiceBadOperationError`

When we didn't fetch the job during this session. This operation should have fetched the job first.

Source code in zeus/optimizer/batch_size/server/services/service.py
168
169
170
171
172
173
174
175
176
177
178
179
def update_exp_default_bs(self, updated_default_bs: UpdateExpDefaultBs) -> None:
    """Change the batch size used as the exploration default for a job.

    Args:
        updated_default_bs: The job id together with its new default batch size.

    Raises:
        `ZeusBSOServiceBadOperationError`: If the job has not been fetched in this session.
    """
    target_job = updated_default_bs.job_id
    self._check_job_fetched(target_job)
    self.job_repo.update_exp_default_bs(updated_default_bs)

create_trial async

create_trial(trial)

Create a new trial.

Parameters:

Name Type Description Default
trial CreateExplorationTrial | CreateMabTrial | CreateConcurrentTrial

New trial to create.

required

Raises:

Type Description
`ZeusBSOServiceBadOperationError`

When we didn't fetch the job during this session. This operation should have fetched the job first.

Source code in zeus/optimizer/batch_size/server/services/service.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
async def create_trial(
    self, trial: CreateExplorationTrial | CreateMabTrial | CreateConcurrentTrial
) -> ReadTrial:
    """Register a new trial under the job's next trial number.

    Args:
        trial: Description of the trial to create.

    Returns:
        The (job_id, batch size, trial_number) key of the created trial.

    Raises:
        `ZeusBSOServiceBadOperationError`: If the job has not been fetched in this session.
    """
    job_id = trial.job_id
    self._check_job_fetched(job_id)
    next_number = await self.bs_repo.get_next_trial_number(job_id)
    fields = trial.dict()
    self.bs_repo.create_trial(CreateTrial(**fields, trial_number=next_number))
    return ReadTrial(
        job_id=job_id, batch_size=trial.batch_size, trial_number=next_number
    )

get_random_choices

get_random_choices(choice)

Get random choices based on the job's seed.

If the seed is not None (set by the user), we get the random choices from the generator that is stored in the database. Otherwise, we get random choices from a randomly seeded generator.

Parameters:

Name Type Description Default
choice GetRandomChoices

Job id and list of choices

required

Returns:

Type Description
ndarray[Any, Any]

Resulting random choices

Raises:

Type Description
`ZeusBSOServiceBadOperationError`

When we didn't fetch the job during this session. This operation should have fetched the job first.

Source code in zeus/optimizer/batch_size/server/services/service.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def get_random_choices(self, choice: GetRandomChoices) -> np.ndarray[Any, Any]:
    """Get random choices based on job's seed.

    Returns a random permutation of `choice.choices` (sampled without replacement).
    If seed is not None (set by the user), we get the random choices from the generator that is stored
    in the database and write the advanced generator state back. Otherwise, we use a randomly seeded generator.

    Args:
        choice: Job id and list of choices

    Returns:
        result random choices

    Raises:
        `ZeusBSOServiceBadOperationError`: When we didn't fetch the job during this session. This operation should have
                fetched the job first.
    """
    arr = np.array(choice.choices)
    rng, should_update = self._get_generator(choice.job_id)
    res = rng.choice(arr, len(arr), replace=False)

    if should_update:
        # Persist the advanced generator state so the next draw continues the
        # seeded sequence. `bit_generator.state` is the public state dict on every
        # NumPy version (JSON-serializable for the default PCG64), whereas
        # `Generator.__getstate__()` returns None on NumPy 2.x.
        self.job_repo.update_generator_state(
            UpdateGeneratorState(
                job_id=choice.job_id, state=json.dumps(rng.bit_generator.state)
            )
        )

    return res

get_normal

get_normal(arg)

Sample from normal distribution and update the generator state if seed was set.

Parameters:

Name Type Description Default
arg GetNormal

args for numpy.random.normal, which is loc(mean of distribution) and scale(stdev of distribution)

required

Returns:

Type Description
float

Drawn sample.

Raises:

Type Description
`ZeusBSOServiceBadOperationError`

When we didn't fetch the job during this session. This operation should have fetched the job first.

Source code in zeus/optimizer/batch_size/server/services/service.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def get_normal(self, arg: GetNormal) -> float:
    """Sample from normal distribution and update the generator state if seed was set.

    Args:
        arg: args for `numpy.random.normal`, which is loc(mean of distribution) and scale(stdev of distribution)

    Returns:
        Drawn sample.

    Raises:
        `ZeusBSOServiceBadOperationError`: When we didn't fetch the job during this session. This operation should have
                fetched the job first.
    """
    rng, should_update = self._get_generator(arg.job_id)
    res = rng.normal(arg.loc, arg.scale)

    if should_update:
        # Persist the advanced generator state so the next draw continues the
        # seeded sequence. `bit_generator.state` is the public state dict on every
        # NumPy version (JSON-serializable for the default PCG64), whereas
        # `Generator.__getstate__()` returns None on NumPy 2.x.
        self.job_repo.update_generator_state(
            UpdateGeneratorState(
                job_id=arg.job_id, state=json.dumps(rng.bit_generator.state)
            )
        )

    return res

get_job async

get_job(job_id)

Get job from database.

Parameters:

Name Type Description Default
job_id str

Job Id

required

Returns:

Type Description
JobState | None

JobState if we found one, None if we couldn't find a job matching the job id.

Source code in zeus/optimizer/batch_size/server/services/service.py
258
259
260
261
262
263
264
265
266
267
async def get_job(self, job_id: str) -> JobState | None:
    """Look up a job in the database.

    Args:
        job_id: Identifier of the job.

    Returns:
        The matching JobState, or None when no job has that id.
    """
    found = await self.job_repo.get_job(job_id)
    return found

get_trial async

get_trial(trial)

Get a trial from database.

Parameters:

Name Type Description Default
trial ReadTrial

(Job Id, batch size, trial_number) triplet.

required

Returns:

Type Description
Trial | None

Trial if we found one, None if we couldn't find a matching trial.

Source code in zeus/optimizer/batch_size/server/services/service.py
269
270
271
272
273
274
275
276
277
278
async def get_trial(self, trial: ReadTrial) -> Trial | None:
    """Look up a single trial in the database.

    Args:
        trial: The (job id, batch size, trial_number) key of the trial.

    Returns:
        The matching Trial, or None when no such trial exists.
    """
    found = await self.bs_repo.get_trial(trial)
    return found

create_job

create_job(new_job)

Create a new job.

Parameters:

Name Type Description Default
new_job CreateJob

Configuration of a new job

required
Source code in zeus/optimizer/batch_size/server/services/service.py
280
281
282
283
284
285
286
def create_job(self, new_job: CreateJob) -> None:
    """Register a new job with the job repository.

    Args:
        new_job: Configuration of the job to create.
    """
    repo = self.job_repo
    return repo.create_job(new_job)

get_trial_results_of_bs async

get_trial_results_of_bs(bs)

Load window size amount of results for a given batch size. If window size <= 0, load all of them.

Parameters:

Name Type Description Default
bs BatchSizeBase

(job_id, batch size) pair.

required

Returns:

Type Description
TrialResultsPerBs

list of windowed measurements in descending order for that (job_id, batch size)

Raises:

Type Description
`ZeusBSOServiceBadOperationError`

When we didn't fetch the job during this session. This operation should have fetched the job first.

Source code in zeus/optimizer/batch_size/server/services/service.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
async def get_trial_results_of_bs(self, bs: BatchSizeBase) -> TrialResultsPerBs:
    """Fetch up to the job's window size of results for one batch size (all of them if window size <= 0).

    Args:
        bs: The (job_id, batch size) pair.

    Returns:
        Windowed measurements for that (job_id, batch size), in descending order.

    Raises:
        `ZeusBSOServiceBadOperationError`: If the job has not been fetched in this session.
    """
    window = self._get_job(bs.job_id).window_size
    key = BatchSizeBase(job_id=bs.job_id, batch_size=bs.batch_size)
    return await self.bs_repo.get_trial_results_of_bs(key, window)

create_arms

create_arms(new_arms)

Create GaussianTs arms for the job.

Parameters:

Name Type Description Default
new_arms list[GaussianTsArmState]

List of new arm states

required

Raises:

Type Description
`ZeusBSOServiceBadOperationError`

When we didn't fetch the job during this session. This operation should have fetched the job first.

Source code in zeus/optimizer/batch_size/server/services/service.py
307
308
309
310
311
312
313
314
315
316
317
318
319
def create_arms(self, new_arms: list[GaussianTsArmState]) -> None:
    """Insert new GaussianTs arms for a job; an empty list is a no-op.

    Args:
        new_arms: Arm states to create (only the first arm's job_id is checked).

    Raises:
        `ZeusBSOServiceBadOperationError`: If the job has not been fetched in this session.
    """
    if not new_arms:
        return
    self._check_job_fetched(new_arms[0].job_id)
    self.bs_repo.create_arms(new_arms)

update_job_stage

update_job_stage(updated_stage)

Update the job stage (Pruning -> MAB).

Parameters:

Name Type Description Default
updated_stage UpdateJobStage

Updated stage.

required

Raises:

Type Description
`ZeusBSOServiceBadOperationError`

When we didn't fetch the job during this session. This operation should have fetched the job first.

Source code in zeus/optimizer/batch_size/server/services/service.py
321
322
323
324
325
326
327
328
329
330
331
332
def update_job_stage(self, updated_stage: UpdateJobStage) -> None:
    """Move a job to its next stage (Pruning -> MAB).

    Args:
        updated_stage: The job id together with its new stage.

    Raises:
        `ZeusBSOServiceBadOperationError`: If the job has not been fetched in this session.
    """
    target_job = updated_stage.job_id
    self._check_job_fetched(target_job)
    self.job_repo.update_stage(updated_stage)

delete_job async

delete_job(job_id)

Delete the job.

Parameters:

Name Type Description Default
job_id str

ID of the job.

required

Returns:

Type Description
bool

True if the job is deleted. False if none was deleted

Source code in zeus/optimizer/batch_size/server/services/service.py
334
335
336
337
338
339
340
341
342
343
async def delete_job(self, job_id: str) -> bool:
    """Remove a job via the job repository.

    Args:
        job_id: Identifier of the job to delete.

    Returns:
        Whatever the repository reports: True if a job was deleted, False otherwise.
    """
    deleted = await self.job_repo.delete_job(job_id)
    return deleted

_get_generator

_get_generator(job_id)

Get the generator for job_id. If mab_seed is not None, the caller should update the stored state after using the generator.

Returns:

Type Description
tuple[Generator, bool]

Tuple of [Generator, if we should update state]

Source code in zeus/optimizer/batch_size/server/services/service.py
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def _get_generator(self, job_id: str) -> tuple[np_Generator, bool]:
    """Get generator based on job_id.

    If `mab_seed` is set, the generator is restored from the state stored in the
    database and the caller should persist the updated state after using it.
    Otherwise, a generator seeded from OS entropy is returned.

    Returns:
        Tuple of [Generator, if we should update state]

    Raises:
        `ZeusBSOServiceBadOperationError`: When the job was not fetched in this session.
        `ZeusBSOValueError`: When the seed is set but no generator state is stored.
    """
    job_state = self._get_job(job_id)

    # Seed from OS entropy rather than the wall clock: a seconds-resolution
    # timestamp gives identical sequences to calls made within the same second.
    rng = np.random.default_rng()

    should_update = job_state.mab_seed is not None
    if should_update:
        if job_state.mab_random_generator_state is None:
            raise ZeusBSOValueError(
                "Seed is set but generator state is none. Should be impossible"
            )

        # Restore the persisted state; this overrides the entropy seed above.
        state = json.loads(job_state.mab_random_generator_state)
        rng.__setstate__(state)

    return (rng, should_update)

_get_job

_get_job(job_id)

Get the job from the session. If we couldn't find the job, raise a ZeusBSOServiceBadOperationError.

Source code in zeus/optimizer/batch_size/server/services/service.py
367
368
369
370
371
372
373
374
def _get_job(self, job_id: str) -> JobState:
    """Return the job already loaded into the current session.

    Raises:
        `ZeusBSOServiceBadOperationError`: If the job is not in the session (never fetched, or nonexistent).
    """
    job = self.job_repo.get_job_from_session(job_id)
    if job is not None:
        return job
    raise ZeusBSOServiceBadOperationError(
        f"Should have fetched the job first or job does not exist(job_id = {job_id})"
    )

_get_trial

_get_trial(trial)

Get the trial from the session. If we couldn't find the trial, raise a ZeusBSOServiceBadOperationError.

Source code in zeus/optimizer/batch_size/server/services/service.py
376
377
378
379
380
381
382
383
def _get_trial(self, trial: ReadTrial) -> Trial:
    """Get the trial from the session. If we couldn't find the trial, raise a `ZeusBSOServiceBadOperationError`."""
    res = self.bs_repo.get_trial_from_session(trial)
    if res is None:
        raise ZeusBSOServiceBadOperationError(
            f"Should have fetched the trial first or trial does not exist(trial = {trial})"
        )
    return res

_check_job_fetched

_check_job_fetched(job_id)

Check if we fetched the job in the current session. If we didn't, raise a ZeusBSOServiceBadOperationError.

Source code in zeus/optimizer/batch_size/server/services/service.py
385
386
387
388
389
390
def _check_job_fetched(self, job_id: str) -> None:
    """Check if we fetched the job in the current session. If we didn't raise a `ZeusBSOServiceBadOperationError`."""
    if not self.job_repo.check_job_fetched(job_id):
        raise ZeusBSOServiceBadOperationError(
            f"check_job_fetched: {job_id} is not currently in the session"
        )