Skip to content

optimizer

zeus.optimizer.batch_size.server.optimizer

Batch size optimizer top-most layer that provides register/report/predict.

ZeusBatchSizeOptimizer

Batch size optimizer server. Manages which stage the job is in and call corresponding manager (pruning or mab).

Source code in zeus/optimizer/batch_size/server/optimizer.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
class ZeusBatchSizeOptimizer:
    """Batch size optimizer server. Manages which stage the job is in and call corresponding manager (pruning or mab)."""

    def __init__(self, service: ZeusService) -> None:
        """Initialize the server. Set the service, pruning manager, and mab.

        Args:
            service: ZeusService for interacting with database
        """
        self.service = service
        self.pruning_manager = PruningExploreManager(service)
        self.mab = GaussianTS(service)

    async def register_job(self, job: JobSpecFromClient) -> bool:
        """Register a job that user submitted. If the job id already exists, check if it is identical with previously registered configuration.

        Args:
            job: job configuration

        Returns:
            True if a job is registered, False if a job already exists and identical with previous configuration

        Raises:
            [`ZeusBSOJobConfigMismatchError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOJobConfigMismatchError]: In the case of existing job, if job configuration doesn't match with previously registered config
        """
        registered_job = None

        if job.job_id is None:
            # No id was supplied: generate one from the prefix plus a
            # time-based hash, retrying until it does not collide with an
            # existing job id in the database.
            while True:
                job.job_id = f"{job.job_id_prefix}-{hashlib.sha256(str(time.time()).encode()).hexdigest()[:8]}"
                if (await self.service.get_job(job.job_id)) is None:
                    break
        else:
            registered_job = await self.service.get_job(job.job_id)

        if registered_job is not None:
            # Job exists
            logger.info("Job(%s) already exists", job.job_id)
            registerd_job_config = JobSpecFromClient.parse_obj(registered_job.dict())

            # check if it is identical
            if registerd_job_config != job:
                raise ZeusBSOJobConfigMismatchError(
                    "JobSpec doesn't match with existing jobSpec. Use a new job_id for different configuration"
                )
            return False

        self.service.create_job(CreateJob.from_job_config(job))
        logger.info("Registered %s", job.job_id)

        return True

    async def predict(self, job_id: str) -> TrialId:
        """Return a batch size to use.

        Args:
            job_id: Id of job

        Returns:
            batch size to use

        Raises:
            [`ZeusBSOValueError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOValueError]: If the job id is unknown, or creating a mab failed due to no converged batch size
        """
        job = await self.service.get_job(job_id)

        if job is None:
            raise ZeusBSOValueError(
                f"Unknown job({job_id}). Please register the job first"
            )

        if job.stage == Stage.MAB:
            # If we are in MAB stage, use mab to get the next batch size
            arms = await self.service.get_arms(job_id)
            next_trial = await self.service.create_trial(
                CreateMabTrial(
                    job_id=job_id,
                    batch_size=self.mab.predict(
                        job_id, job.mab_prior_precision, job.mab_num_explorations, arms
                    ),
                )
            )
        else:
            # Pruning stage
            explorations = await self.service.get_explorations_of_job(job_id)
            # First check if pruning explorer can give us any batch size. Returns batch_size or MAB to indicate going to MAB stage
            res = await self.pruning_manager.next_batch_size(job, explorations)

            if isinstance(res, list):
                # MAB stage: construct MAB and update the job stage to MAB. Return the batch size from MAB
                logger.info("Constructing a MAB")
                arms = await self.mab.construct_mab(job, explorations, res)
                next_trial = await self.service.create_trial(
                    CreateMabTrial(
                        job_id=job_id,
                        batch_size=self.mab.predict(
                            job_id,
                            job.mab_prior_precision,
                            job.mab_num_explorations,
                            arms,
                        ),
                    )
                )
            else:
                next_trial = res

        return TrialId(
            job_id=next_trial.job_id,
            batch_size=next_trial.batch_size,
            trial_number=next_trial.trial_number,
        )

    async def report(self, result: TrainingResult) -> ReportResponse:
        """Report the training result. Stop train if the train is converged or reached max epochs or reached early stop threshold. Otherwise, keep training.

        Args:
            result: result of training [`TrainingResult`][zeus.optimizer.batch_size.common.TrainingResult].

        Returns:
            Decision on training [`ReportResponse`][zeus.optimizer.batch_size.common.ReportResponse].

        Raises:
            [`ZeusBSOServiceBadOperationError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOServiceBadOperationError]: If the job or the trial in `result` is unknown.
            [`ZeusBSOValueError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOValueError]: If the trial was already reported but its converged flag was never set.
        """
        # Default: no early-stop cost bound unless beta_knob/min_cost are set below.
        cost_ub = np.inf
        job = await self.service.get_job(result.job_id)
        if job is None:
            raise ZeusBSOServiceBadOperationError(f"Unknown job {result.job_id}")

        trial = await self.service.get_trial(
            ReadTrial(
                job_id=result.job_id,
                batch_size=result.batch_size,
                trial_number=result.trial_number,
            )
        )
        if trial is None:
            raise ZeusBSOServiceBadOperationError(f"Unknown trial {result}")

        if trial.status != TrialStatus.Dispatched:
            # result is already reported
            if trial.converged is None:
                raise ZeusBSOValueError(
                    f"Trial({trial.trial_number}) is already reported but converged is not set."
                )
            return ReportResponse(
                stop_train=True,
                converged=trial.converged,
                message=f"Result for this trial({trial.trial_number}) is already reported.",
            )

        if job.beta_knob is not None and job.min_cost is not None:  # Early stop enabled
            cost_ub = job.beta_knob * job.min_cost

        reported_cost = zeus_cost(
            result.energy,
            result.time,
            job.eta_knob,
            job.max_power,
        )

        within_cost_range = cost_ub >= reported_cost
        # Converged when the metric crossed the target in the job's preferred direction.
        converged = (
            job.higher_is_better_metric and job.target_metric <= result.metric
        ) or (not job.higher_is_better_metric and job.target_metric >= result.metric)

        if (
            within_cost_range
            and result.current_epoch < job.max_epochs
            and not converged
        ):
            # If it's not converged but below cost upper bound and haven't reached max_epochs, keep training

            return ReportResponse(
                stop_train=False,
                converged=False,
                message="Stop condition not met, keep training",
            )

        # Two cases below here (training ended)
        # 1. Converged == true
        # 2. reached max_epoch OR exceeded upper bound cost (error case)
        if converged and within_cost_range:
            message = "Train succeeded"
        elif not within_cost_range:
            message = f"""Batch Size({result.batch_size}) exceeded the cost upper bound: current cost({reported_cost}) >
                    beta_knob({job.beta_knob})*min_cost({job.min_cost})"""
        else:
            # not converged
            message = f"Train failed to converge within max_epoch({job.max_epochs})"

        trial_result = UpdateTrial(
            job_id=result.job_id,
            batch_size=result.batch_size,
            status=TrialStatus.Succeeded,
            trial_number=result.trial_number,
            time=result.time,
            energy=result.energy,
            converged=converged and within_cost_range,
        )

        if job.stage == Stage.MAB:
            await self.mab.report(job, trial_result)
        else:
            # Pruning stage
            # `"not" * (not converged)` evaluates to "not" when not converged
            # and "" when converged, filling the %s placeholder.
            logger.info(
                "%s in pruning stage, Current BS %s that did %s converge.",
                result.job_id,
                result.batch_size,
                "not" * (not converged),
            )
            # update trial
            self.service.update_trial(trial_result)

        assert trial_result.converged is not None, "This just set to boolean above."
        return ReportResponse(
            stop_train=True, converged=trial_result.converged, message=message
        )

    async def end_trial(self, trial_id: TrialId) -> None:
        """Mark the trial as finished. If status is still `Dispatched`, mark the trial as `Failed`.

        Args:
            trial_id: Unique identifier of trial

        Raises:
            [`ZeusBSOServerNotFound`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOServerNotFound]: If there is no corresponding trial.
        """
        trial = await self.service.get_trial(ReadTrial(**trial_id.dict()))

        if trial is not None:
            if trial.status == TrialStatus.Dispatched:
                self.service.update_trial(
                    UpdateTrial(
                        job_id=trial_id.job_id,
                        batch_size=trial_id.batch_size,
                        trial_number=trial_id.trial_number,
                        status=TrialStatus.Failed,
                    )
                )
        else:
            raise ZeusBSOServerNotFoundError(f"Could not find the trial: {trial_id}")

    async def delete_job(self, job_id: str) -> None:
        """Delete a job.

        Args:
            job_id: ID of a job.

        Raises:
            [`ZeusBSOServerNotFound`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOServerNotFound]: If no job with `job_id` exists, so nothing was deleted.
        """
        if not (await self.service.delete_job(job_id)):
            raise ZeusBSOServerNotFoundError("No job was deleted.")

__init__

__init__(service)

Parameters:

Name Type Description Default
service ZeusService

ZeusService for interacting with database

required
Source code in zeus/optimizer/batch_size/server/optimizer.py
40
41
42
43
44
45
46
47
48
def __init__(self, service: ZeusService) -> None:
    """Construct the optimizer server.

    Wires the shared database service into the two stage managers:
    the pruning explorer and the Gaussian Thompson-sampling MAB.

    Args:
        service: ZeusService for interacting with database
    """
    self.service = service
    self.mab = GaussianTS(service)
    self.pruning_manager = PruningExploreManager(service)

register_job async

register_job(job)

Register a job that user submitted. If the job id already exists, check if it is identical with previously registered configuration.

Parameters:

Name Type Description Default
job JobSpecFromClient

job configuration

required

Returns:

Type Description
bool

True if a job is registered, False if the job already exists and is identical to the previous configuration

Raises:

Type Description
[`ZeusBSOJobConfigMismatchError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOJobConfigMismatchError]

In the case of existing job, if job configuration doesn't match with previously registered config

Source code in zeus/optimizer/batch_size/server/optimizer.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
async def register_job(self, job: JobSpecFromClient) -> bool:
    """Register a job that user submitted. If the job id already exists, check if it is identical with previously registered configuration.

    Args:
        job: job configuration

    Returns:
        True if a job is registered, False if a job already exists and identical with previous configuration

    Raises:
        [`ZeusBSOJobConfigMismatchError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOJobConfigMismatchError]: In the case of existing job, if job configuration doesn't match with previously registered config
    """
    registered_job = None

    if job.job_id is None:
        # No id was supplied: generate one from the prefix plus a time-based
        # hash, retrying until it does not collide with an existing job id.
        while True:
            job.job_id = f"{job.job_id_prefix}-{hashlib.sha256(str(time.time()).encode()).hexdigest()[:8]}"
            if (await self.service.get_job(job.job_id)) is None:
                break
    else:
        registered_job = await self.service.get_job(job.job_id)

    if registered_job is not None:
        # Job exists
        logger.info("Job(%s) already exists", job.job_id)
        registered_job_config = JobSpecFromClient.parse_obj(registered_job.dict())

        # check if it is identical to the stored configuration
        if registered_job_config != job:
            raise ZeusBSOJobConfigMismatchError(
                "JobSpec doesn't match with existing jobSpec. Use a new job_id for different configuration"
            )
        return False

    self.service.create_job(CreateJob.from_job_config(job))
    logger.info("Registered %s", job.job_id)

    return True

predict async

predict(job_id)

Return a batch size to use.

Parameters:

Name Type Description Default
job_id str

Id of job

required

Returns:

Type Description
TrialId

batch size to use

Raises:

Type Description
[`ZeusBSOValueError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOValueError]

If the job id is unknown, or creating a mab failed due to no converged batch size

Source code in zeus/optimizer/batch_size/server/optimizer.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
async def predict(self, job_id: str) -> TrialId:
    """Return a batch size to use.

    Args:
        job_id: Id of job

    Returns:
        batch size to use

    Raises:
        [`ZeusBSOValueError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOValueError]: If the job id is unknown, or creating a mab failed due to no converged batch size
    """
    job = await self.service.get_job(job_id)

    if job is None:
        raise ZeusBSOValueError(
            f"Unknown job({job_id}). Please register the job first"
        )

    next_trial = None
    if job.stage == Stage.MAB:
        # Already in the MAB stage: fetch the current arms for sampling.
        arms = await self.service.get_arms(job_id)
    else:
        # Pruning stage: ask the explorer for either a concrete next trial
        # or a list of good batch sizes, which signals the MAB transition.
        explorations = await self.service.get_explorations_of_job(job_id)
        res = await self.pruning_manager.next_batch_size(job, explorations)

        if isinstance(res, list):
            # Transition: build the MAB from converged batch sizes,
            # then fall through to sampling from it.
            logger.info("Constructing a MAB")
            arms = await self.mab.construct_mab(job, explorations, res)
        else:
            next_trial = res

    if next_trial is None:
        # Either already in the MAB stage or we just transitioned into it:
        # let Thompson sampling pick the batch size and dispatch a trial.
        chosen_batch_size = self.mab.predict(
            job_id, job.mab_prior_precision, job.mab_num_explorations, arms
        )
        next_trial = await self.service.create_trial(
            CreateMabTrial(job_id=job_id, batch_size=chosen_batch_size)
        )

    return TrialId(
        job_id=next_trial.job_id,
        batch_size=next_trial.batch_size,
        trial_number=next_trial.trial_number,
    )

report async

report(result)

Report the training result. Stop train if the train is converged or reached max epochs or reached early stop threshold. Otherwise, keep training.

Parameters:

Name Type Description Default
result TrainingResult

result of training TrainingResult.

required

Returns:

Type Description
ReportResponse

Decision on training ReportResponse.

Source code in zeus/optimizer/batch_size/server/optimizer.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
async def report(self, result: TrainingResult) -> ReportResponse:
    """Report the training result. Stop train if the train is converged or reached max epochs or reached early stop threshold. Otherwise, keep training.

    Args:
        result: result of training [`TrainingResult`][zeus.optimizer.batch_size.common.TrainingResult].

    Returns:
        Decision on training [`ReportResponse`][zeus.optimizer.batch_size.common.ReportResponse].

    Raises:
        [`ZeusBSOServiceBadOperationError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOServiceBadOperationError]: If the job or the trial in `result` is unknown.
        [`ZeusBSOValueError`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOValueError]: If the trial was already reported but its converged flag was never set.
    """
    # Default: no early-stop cost bound unless beta_knob/min_cost are set below.
    cost_ub = np.inf
    job = await self.service.get_job(result.job_id)
    if job is None:
        raise ZeusBSOServiceBadOperationError(f"Unknown job {result.job_id}")

    trial = await self.service.get_trial(
        ReadTrial(
            job_id=result.job_id,
            batch_size=result.batch_size,
            trial_number=result.trial_number,
        )
    )
    if trial is None:
        raise ZeusBSOServiceBadOperationError(f"Unknown trial {result}")

    if trial.status != TrialStatus.Dispatched:
        # result is already reported
        if trial.converged is None:
            raise ZeusBSOValueError(
                f"Trial({trial.trial_number}) is already reported but converged is not set."
            )
        return ReportResponse(
            stop_train=True,
            converged=trial.converged,
            message=f"Result for this trial({trial.trial_number}) is already reported.",
        )

    if job.beta_knob is not None and job.min_cost is not None:  # Early stop enabled
        cost_ub = job.beta_knob * job.min_cost

    reported_cost = zeus_cost(
        result.energy,
        result.time,
        job.eta_knob,
        job.max_power,
    )

    within_cost_range = cost_ub >= reported_cost
    # Converged when the metric crossed the target in the job's preferred direction.
    converged = (
        job.higher_is_better_metric and job.target_metric <= result.metric
    ) or (not job.higher_is_better_metric and job.target_metric >= result.metric)

    if (
        within_cost_range
        and result.current_epoch < job.max_epochs
        and not converged
    ):
        # If it's not converged but below cost upper bound and haven't reached max_epochs, keep training

        return ReportResponse(
            stop_train=False,
            converged=False,
            message="Stop condition not met, keep training",
        )

    # Two cases below here (training ended)
    # 1. Converged == true
    # 2. reached max_epoch OR exceeded upper bound cost (error case)
    if converged and within_cost_range:
        message = "Train succeeded"
    elif not within_cost_range:
        message = f"""Batch Size({result.batch_size}) exceeded the cost upper bound: current cost({reported_cost}) >
                beta_knob({job.beta_knob})*min_cost({job.min_cost})"""
    else:
        # not converged
        message = f"Train failed to converge within max_epoch({job.max_epochs})"

    trial_result = UpdateTrial(
        job_id=result.job_id,
        batch_size=result.batch_size,
        status=TrialStatus.Succeeded,
        trial_number=result.trial_number,
        time=result.time,
        energy=result.energy,
        converged=converged and within_cost_range,
    )

    if job.stage == Stage.MAB:
        await self.mab.report(job, trial_result)
    else:
        # Pruning stage
        # `"not" * (not converged)` evaluates to "not" when not converged
        # and "" when converged, filling the %s placeholder.
        logger.info(
            "%s in pruning stage, Current BS %s that did %s converge.",
            result.job_id,
            result.batch_size,
            "not" * (not converged),
        )
        # update trial
        self.service.update_trial(trial_result)

    assert trial_result.converged is not None, "This just set to boolean above."
    return ReportResponse(
        stop_train=True, converged=trial_result.converged, message=message
    )

end_trial async

end_trial(trial_id)

Mark the trial as finished. If status is still Dispatched, mark the trial as Failed.

Parameters:

Name Type Description Default
trial_id TrialId

Unique identifier of trial

required

Raises:

Type Description
[`ZeusBSOServerNotFound`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOServerNotFound]

If there is no corresponding trial.

Source code in zeus/optimizer/batch_size/server/optimizer.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
async def end_trial(self, trial_id: TrialId) -> None:
    """Mark the trial as finished. If status is still `Dispatched` make the trial as `Failed`.

    Args:
        trial_id: Unique identifier of trial

    Raises:
        [`ZeusBSOServerNotFound`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOServerNotFound]: If there is no corresponding trial.
    """
    trial = await self.service.get_trial(ReadTrial(**trial_id.dict()))

    if trial is None:
        raise ZeusBSOServerNotFoundError(f"Could not find the trial: {trial_id}")

    # Only a still-dispatched trial needs a transition; any other status
    # means its result was already reported, so there is nothing to do.
    if trial.status == TrialStatus.Dispatched:
        self.service.update_trial(
            UpdateTrial(
                job_id=trial_id.job_id,
                batch_size=trial_id.batch_size,
                trial_number=trial_id.trial_number,
                status=TrialStatus.Failed,
            )
        )

delete_job async

delete_job(job_id)

Delete a job.

Parameters:

Name Type Description Default
job_id str

ID of a job.

required

Returns:

Type Description
None

None. Raises ZeusBSOServerNotFound if no matching job existed, so nothing was deleted.

Source code in zeus/optimizer/batch_size/server/optimizer.py
277
278
279
280
281
282
283
284
285
286
287
async def delete_job(self, job_id: str) -> None:
    """Delete a job.

    Args:
        job_id: ID of a job.

    Raises:
        [`ZeusBSOServerNotFound`][zeus.optimizer.batch_size.server.exceptions.ZeusBSOServerNotFound]: If no job with `job_id` exists, so nothing was deleted.
    """
    if not (await self.service.delete_job(job_id)):
        raise ZeusBSOServerNotFoundError("No job was deleted.")