power

zeus.monitor.power

Monitor the power usage of GPUs.

PowerDomain

Bases: Enum

Power measurement domains with different update characteristics.

Source code in zeus/monitor/power.py
class PowerDomain(Enum):
    """Power measurement domains with different update characteristics."""

    DEVICE_INSTANT = "device_instant"
    DEVICE_AVERAGE = "device_average"
    MEMORY_AVERAGE = "memory_average"

PowerSample dataclass

A single power measurement sample.

Source code in zeus/monitor/power.py
@dataclass
class PowerSample:
    """A single power measurement sample."""

    timestamp: float
    gpu_index: int
    power_mw: float

PowerMonitor

Enhanced PowerMonitor with multiple power domains and timeline export.

This class provides:

1. Multiple power domains: device instant, device average, and memory average
2. Timeline export with independent deduplication per domain
3. Separate processes for each power domain (2-3 processes depending on GPU support)
4. Backward compatibility with existing PowerMonitor interface

Note

The current implementation only supports cases where all GPUs are homogeneous (i.e., the same model).

Warning

Since the monitor spawns child processes, it should not be instantiated as a global variable. Refer to the "Safe importing of main module" section in the Python documentation for more details.
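
Example

A minimal usage sketch (not from the Zeus source): it assumes an NVML-capable NVIDIA machine, constructs the monitor under the if __name__ == "__main__" guard as the warning requires, and uses an illustrative sleep in place of a real workload.

import time

from zeus.monitor.power import PowerDomain, PowerMonitor

if __name__ == "__main__":
    monitor = PowerMonitor()  # Monitor all GPUs; the update period is inferred.

    start = time.time()
    time.sleep(10.0)  # Replace with your training or inference workload.
    end = time.time()

    # Energy per GPU over the interval, integrated from device instant power.
    print("Energy:", monitor.get_energy(start, end))

    # Timeline of (timestamp, power_watts) tuples for one domain and GPU 0.
    timeline = monitor.get_power_timeline(
        PowerDomain.DEVICE_INSTANT, gpu_index=0, start_time=start, end_time=end
    )
    print("Collected", len(timeline.get(0, [])), "samples for GPU 0")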

Source code in zeus/monitor/power.py
class PowerMonitor:
    """Enhanced PowerMonitor with multiple power domains and timeline export.

    This class provides:
    1. Multiple power domains: device instant, device average, and memory average
    2. Timeline export with independent deduplication per domain
    3. Separate processes for each power domain (2-3 processes depending on GPU support)
    4. Backward compatibility with existing PowerMonitor interface

    !!! Note
        The current implementation only supports cases where all GPUs are homogeneous
        (i.e., the same model).

    !!! Warning
        Since the monitor spawns child processes, **it should not be instantiated as a global variable**.
        Refer to the "Safe importing of main module" section in the
        [Python documentation](https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods)
        for more details.
    """

    def __init__(
        self,
        gpu_indices: list[int] | None = None,
        update_period: float | None = None,
        max_samples_per_gpu: int | None = None,
    ) -> None:
        """Initialize the enhanced power monitor.

        Args:
            gpu_indices: Indices of the GPUs to monitor. If None, monitor all GPUs.
            update_period: Update period of the power monitor in seconds. If None,
                infer the update period by max speed polling the power counter for
                each GPU model.
            max_samples_per_gpu: Maximum number of power samples to keep per GPU per domain
                in memory. If None (default), unlimited samples are kept.
        """
        if gpu_indices is not None and not gpu_indices:
            raise ValueError("`gpu_indices` must be either `None` or non-empty")

        # Get GPUs
        gpus = get_gpus(ensure_homogeneous=True)

        # Configure GPU indices
        self.gpu_indices = (
            gpu_indices if gpu_indices is not None else list(range(len(gpus)))
        )
        if not self.gpu_indices:
            raise ValueError("At least one GPU index must be specified")
        logger.info("Monitoring power usage of GPUs %s", self.gpu_indices)

        # Infer update period from GPU instant power, if necessary
        if update_period is None:
            update_period = infer_counter_update_period(self.gpu_indices)
        elif update_period < 0.05:
            logger.warning(
                "An update period of %g might be too fast, which may lead to unexpected "
                "NVML errors (e.g., NotSupported) and/or zero values being returned. "
                "If you see these, consider increasing to >= 0.05.",
                update_period,
            )
        self.update_period = update_period

        # Inter-process communication - separate unbounded queue per domain
        self.data_queues: dict[PowerDomain, mp.Queue] = {}
        self.ready_events: dict[PowerDomain, EventClass] = {}
        self.stop_events: dict[PowerDomain, EventClass] = {}
        self.processes: dict[PowerDomain, SpawnProcess] = {}

        # Determine which domains are supported
        self.supported_domains = self._determine_supported_domains()
        logger.info(
            "Supported power domains: %s", [d.value for d in self.supported_domains]
        )

        # Power samples are collected for each power domain and device index.
        self.samples: dict[PowerDomain, dict[int, collections.deque[PowerSample]]] = {}
        for domain in self.supported_domains:
            self.samples[domain] = {}
            for gpu_idx in self.gpu_indices:
                self.samples[domain][gpu_idx] = collections.deque(
                    maxlen=max_samples_per_gpu
                )

        # Spawn collector processes for each supported domain
        atexit.register(self._stop)
        ctx = mp.get_context("spawn")
        for domain in self.supported_domains:
            self.data_queues[domain] = ctx.Queue()
            self.ready_events[domain] = ctx.Event()
            self.stop_events[domain] = ctx.Event()
            self.processes[domain] = ctx.Process(
                target=_domain_polling_process,
                kwargs=dict(
                    power_domain=domain,
                    gpu_indices=self.gpu_indices,
                    data_queue=self.data_queues[domain],
                    ready_event=self.ready_events[domain],
                    stop_event=self.stop_events[domain],
                    update_period=update_period,
                ),
                daemon=True,
                name=f"zeus-power-monitor-{domain.value}",
            )
        for process in self.processes.values():
            process.start()

        # Wait for all subprocesses to signal they're ready
        logger.info("Waiting for all power monitoring subprocesses to be ready...")
        for domain in self.supported_domains:
            if not self.ready_events[domain].wait(timeout=10.0):
                logger.warning(
                    "Power monitor subprocess for %s did not signal ready within timeout",
                    domain.value,
                )
        logger.info("All power monitoring subprocesses are ready")

    def _determine_supported_domains(self) -> list[PowerDomain]:
        """Determine which power domains are supported by the current GPUs."""
        supported = []
        gpus = get_gpus(ensure_homogeneous=True)
        methods = {
            PowerDomain.DEVICE_INSTANT: gpus.getInstantPowerUsage,
            PowerDomain.DEVICE_AVERAGE: gpus.getAveragePowerUsage,
            PowerDomain.MEMORY_AVERAGE: gpus.getAverageMemoryPowerUsage,
        }

        # Just check the first GPU for support, since all GPUs are homogeneous.
        for domain, method in methods.items():
            try:
                _ = method(0)
                supported.append(domain)
                logger.info("Power domain %s is supported", domain.value)
            except ZeusGPUNotSupportedError:
                logger.info("Power domain %s is not supported", domain.value)
            except Exception as e:
                logger.warning(
                    "Unexpected error while checking for %s support on GPU %d: %s",
                    domain.value,
                    self.gpu_indices[0],
                    e,
                )

        return supported

    def _stop(self) -> None:
        """Stop all monitoring processes."""
        # First, signal all processes to stop
        for domain in PowerDomain:
            if domain in self.stop_events:
                self.stop_events[domain].set()

        # Then, wait for each process to complete
        for domain in PowerDomain:
            if domain in self.processes and self.processes[domain].is_alive():
                self.processes[domain].join(timeout=2.0)
                if self.processes[domain].is_alive():
                    self.processes[domain].terminate()
                    self.processes[domain].join(timeout=1.0)

        self.processes.clear()

    def _process_queue_data(self, domain: PowerDomain) -> None:
        """Process all pending samples from a specific domain's queue."""
        if domain not in self.data_queues:
            return

        while True:
            try:
                sample = self.data_queues[domain].get_nowait()
                if sample == "STOP":
                    break
                assert isinstance(sample, PowerSample)
                self.samples[domain][sample.gpu_index].append(sample)
            except Empty:
                break

    def _process_all_queue_data(self) -> None:
        """Process all pending samples from all domain queues."""
        for domain in self.supported_domains:
            self._process_queue_data(domain)

    def get_power_timeline(
        self,
        power_domain: PowerDomain,
        gpu_index: int | None = None,
        start_time: float | None = None,
        end_time: float | None = None,
    ) -> dict[int, list[tuple[float, float]]]:
        """Get power timeline for specific power domain and GPU(s).

        Args:
            power_domain: Power domain to query
            gpu_index: Specific GPU index, or None for all GPUs
            start_time: Start time filter (unix timestamp)
            end_time: End time filter (unix timestamp)

        Returns:
            Dictionary mapping GPU indices to timeline data with deduplication.
            Timeline data is list of (timestamp, power_watts) tuples.
        """
        if power_domain not in self.supported_domains:
            return {}

        # Process any pending queue data for this domain
        self._process_queue_data(power_domain)

        # Determine which GPUs to query
        target_gpus = [gpu_index] if gpu_index is not None else self.gpu_indices

        result = {}
        for gpu_idx in target_gpus:
            if gpu_idx not in self.samples[power_domain]:
                continue

            # Extract timeline from samples
            timeline = []
            for sample in self.samples[power_domain][gpu_idx]:
                # Apply time filters
                if start_time is not None and sample.timestamp < start_time:
                    continue
                if end_time is not None and sample.timestamp > end_time:
                    continue

                timeline.append(
                    (sample.timestamp, sample.power_mw / 1000.0)
                )  # Convert to watts

            # Sort by timestamp
            timeline.sort(key=lambda x: x[0])
            result[gpu_idx] = timeline

        return result

    def get_all_power_timelines(
        self,
        gpu_index: int | None = None,
        start_time: float | None = None,
        end_time: float | None = None,
    ) -> dict[str, dict[int, list[tuple[float, float]]]]:
        """Get all power timelines organized by power domain.

        Args:
            gpu_index: Specific GPU index, or None for all GPUs
            start_time: Start time filter (unix timestamp)
            end_time: End time filter (unix timestamp)

        Returns:
            Dictionary with power domain names as keys and each value is a dict
            mapping GPU indices to timeline data.
        """
        result = {}
        for domain in self.supported_domains:
            result[domain.value] = self.get_power_timeline(
                domain, gpu_index, start_time, end_time
            )
        return result

    def get_energy(self, start_time: float, end_time: float) -> dict[int, float] | None:
        """Get the energy used by the GPUs between two times (backward compatibility).

        Uses device instant power for energy calculation.

        Args:
            start_time: Start time of the interval, from time.time().
            end_time: End time of the interval, from time.time().

        Returns:
            A dictionary mapping GPU indices to the energy used by the GPU between the
            two times. If there are no power readings, return None.
        """
        timelines = self.get_power_timeline(
            PowerDomain.DEVICE_INSTANT, start_time=start_time, end_time=end_time
        )

        if not timelines:
            return None

        energy_result = {}
        for gpu_idx, timeline in timelines.items():
            if not timeline or len(timeline) < 2:
                energy_result[gpu_idx] = 0.0
                continue

            timestamps = [t[0] for t in timeline]
            powers = [t[1] for t in timeline]

            try:
                energy_result[gpu_idx] = float(auc(timestamps, powers))
            except ValueError:
                energy_result[gpu_idx] = 0.0

        return energy_result

    def get_power(self, time: float | None = None) -> dict[int, float] | None:
        """Get the instant power usage of the GPUs at a specific time point.

        Uses device instant power for compatibility.

        Args:
            time: Time point to get the power usage at. If None, get the power usage
                at the last recorded time point.

        Returns:
            A dictionary mapping GPU indices to the power usage of the GPU at the
            specified time point. If there are no power readings, return None.
        """
        if PowerDomain.DEVICE_INSTANT not in self.supported_domains:
            raise ValueError(
                "PowerDomain.DEVICE_INSTANT is not supported by the current GPUs."
            )

        # Process any pending queue data
        self._process_all_queue_data()

        result = {}
        for gpu_idx in self.gpu_indices:
            samples = self.samples[PowerDomain.DEVICE_INSTANT][gpu_idx]
            if not samples:
                return None

            if time is None:
                # Get the most recent sample
                latest_sample = samples[-1]
                result[gpu_idx] = latest_sample.power_mw / 1000.0  # Convert to watts
            else:
                # Find the closest sample to the requested time using bisect
                timestamps = [sample.timestamp for sample in samples]
                pos = bisect.bisect_left(timestamps, time)

                if pos == 0:
                    closest_sample = samples[0]
                elif pos == len(samples):
                    closest_sample = samples[-1]
                else:
                    # Check the closest sample before and after the requested time
                    before = samples[pos - 1]
                    after = samples[pos]
                    closest_sample = (
                        before
                        if time - before.timestamp <= after.timestamp - time
                        else after
                    )
                result[gpu_idx] = closest_sample.power_mw / 1000.0  # To Watts

        return result

__init__

__init__(
    gpu_indices=None,
    update_period=None,
    max_samples_per_gpu=None,
)

Parameters:

Name Type Description Default
gpu_indices list[int] | None

Indices of the GPUs to monitor. If None, monitor all GPUs.

None
update_period float | None

Update period of the power monitor in seconds. If None, infer the update period by max speed polling the power counter for each GPU model.

None
max_samples_per_gpu int | None

Maximum number of power samples to keep per GPU per domain in memory. If None (default), unlimited samples are kept.

None
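
A hedged construction sketch (the values are illustrative, not recommendations): monitor GPUs 0 and 1, poll every 0.1 s, and keep at most 10,000 samples per GPU per domain.

from zeus.monitor.power import PowerMonitor

if __name__ == "__main__":
    monitor = PowerMonitor(
        gpu_indices=[0, 1],          # Subset of GPUs to monitor; None monitors all.
        update_period=0.1,           # Seconds between polls; None infers it per GPU model.
        max_samples_per_gpu=10_000,  # Cap in-memory samples; None keeps everything.
    )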
Source code in zeus/monitor/power.py
def __init__(
    self,
    gpu_indices: list[int] | None = None,
    update_period: float | None = None,
    max_samples_per_gpu: int | None = None,
) -> None:
    """Initialize the enhanced power monitor.

    Args:
        gpu_indices: Indices of the GPUs to monitor. If None, monitor all GPUs.
        update_period: Update period of the power monitor in seconds. If None,
            infer the update period by max speed polling the power counter for
            each GPU model.
        max_samples_per_gpu: Maximum number of power samples to keep per GPU per domain
            in memory. If None (default), unlimited samples are kept.
    """
    if gpu_indices is not None and not gpu_indices:
        raise ValueError("`gpu_indices` must be either `None` or non-empty")

    # Get GPUs
    gpus = get_gpus(ensure_homogeneous=True)

    # Configure GPU indices
    self.gpu_indices = (
        gpu_indices if gpu_indices is not None else list(range(len(gpus)))
    )
    if not self.gpu_indices:
        raise ValueError("At least one GPU index must be specified")
    logger.info("Monitoring power usage of GPUs %s", self.gpu_indices)

    # Infer update period from GPU instant power, if necessary
    if update_period is None:
        update_period = infer_counter_update_period(self.gpu_indices)
    elif update_period < 0.05:
        logger.warning(
            "An update period of %g might be too fast, which may lead to unexpected "
            "NVML errors (e.g., NotSupported) and/or zero values being returned. "
            "If you see these, consider increasing to >= 0.05.",
            update_period,
        )
    self.update_period = update_period

    # Inter-process communication - separate unbounded queue per domain
    self.data_queues: dict[PowerDomain, mp.Queue] = {}
    self.ready_events: dict[PowerDomain, EventClass] = {}
    self.stop_events: dict[PowerDomain, EventClass] = {}
    self.processes: dict[PowerDomain, SpawnProcess] = {}

    # Determine which domains are supported
    self.supported_domains = self._determine_supported_domains()
    logger.info(
        "Supported power domains: %s", [d.value for d in self.supported_domains]
    )

    # Power samples are collected for each power domain and device index.
    self.samples: dict[PowerDomain, dict[int, collections.deque[PowerSample]]] = {}
    for domain in self.supported_domains:
        self.samples[domain] = {}
        for gpu_idx in self.gpu_indices:
            self.samples[domain][gpu_idx] = collections.deque(
                maxlen=max_samples_per_gpu
            )

    # Spawn collector processes for each supported domain
    atexit.register(self._stop)
    ctx = mp.get_context("spawn")
    for domain in self.supported_domains:
        self.data_queues[domain] = ctx.Queue()
        self.ready_events[domain] = ctx.Event()
        self.stop_events[domain] = ctx.Event()
        self.processes[domain] = ctx.Process(
            target=_domain_polling_process,
            kwargs=dict(
                power_domain=domain,
                gpu_indices=self.gpu_indices,
                data_queue=self.data_queues[domain],
                ready_event=self.ready_events[domain],
                stop_event=self.stop_events[domain],
                update_period=update_period,
            ),
            daemon=True,
            name=f"zeus-power-monitor-{domain.value}",
        )
    for process in self.processes.values():
        process.start()

    # Wait for all subprocesses to signal they're ready
    logger.info("Waiting for all power monitoring subprocesses to be ready...")
    for domain in self.supported_domains:
        if not self.ready_events[domain].wait(timeout=10.0):
            logger.warning(
                "Power monitor subprocess for %s did not signal ready within timeout",
                domain.value,
            )
    logger.info("All power monitoring subprocesses are ready")

_determine_supported_domains

_determine_supported_domains()

Determine which power domains are supported by the current GPUs.

Source code in zeus/monitor/power.py
def _determine_supported_domains(self) -> list[PowerDomain]:
    """Determine which power domains are supported by the current GPUs."""
    supported = []
    gpus = get_gpus(ensure_homogeneous=True)
    methods = {
        PowerDomain.DEVICE_INSTANT: gpus.getInstantPowerUsage,
        PowerDomain.DEVICE_AVERAGE: gpus.getAveragePowerUsage,
        PowerDomain.MEMORY_AVERAGE: gpus.getAverageMemoryPowerUsage,
    }

    # Just check the first GPU for support, since all GPUs are homogeneous.
    for domain, method in methods.items():
        try:
            _ = method(0)
            supported.append(domain)
            logger.info("Power domain %s is supported", domain.value)
        except ZeusGPUNotSupportedError:
            logger.info("Power domain %s is not supported", domain.value)
        except Exception as e:
            logger.warning(
                "Unexpected error while checking for %s support on GPU %d: %s",
                domain.value,
                self.gpu_indices[0],
                e,
            )

    return supported

_stop

_stop()

Stop all monitoring processes.

Source code in zeus/monitor/power.py
def _stop(self) -> None:
    """Stop all monitoring processes."""
    # First, signal all processes to stop
    for domain in PowerDomain:
        if domain in self.stop_events:
            self.stop_events[domain].set()

    # Then, wait for each process to complete
    for domain in PowerDomain:
        if domain in self.processes and self.processes[domain].is_alive():
            self.processes[domain].join(timeout=2.0)
            if self.processes[domain].is_alive():
                self.processes[domain].terminate()
                self.processes[domain].join(timeout=1.0)

    self.processes.clear()

_process_queue_data

_process_queue_data(domain)

Process all pending samples from a specific domain's queue.

Source code in zeus/monitor/power.py
def _process_queue_data(self, domain: PowerDomain) -> None:
    """Process all pending samples from a specific domain's queue."""
    if domain not in self.data_queues:
        return

    while True:
        try:
            sample = self.data_queues[domain].get_nowait()
            if sample == "STOP":
                break
            assert isinstance(sample, PowerSample)
            self.samples[domain][sample.gpu_index].append(sample)
        except Empty:
            break

_process_all_queue_data

_process_all_queue_data()

Process all pending samples from all domain queues.

Source code in zeus/monitor/power.py
def _process_all_queue_data(self) -> None:
    """Process all pending samples from all domain queues."""
    for domain in self.supported_domains:
        self._process_queue_data(domain)

get_power_timeline

get_power_timeline(
    power_domain,
    gpu_index=None,
    start_time=None,
    end_time=None,
)

Get power timeline for specific power domain and GPU(s).

Parameters:

Name Type Description Default
power_domain PowerDomain

Power domain to query

required
gpu_index int | None

Specific GPU index, or None for all GPUs

None
start_time float | None

Start time filter (unix timestamp)

None
end_time float | None

End time filter (unix timestamp)

None

Returns:

Type Description
dict[int, list[tuple[float, float]]]

Dictionary mapping GPU indices to timeline data with deduplication. Timeline data is a list of (timestamp, power_watts) tuples.
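
Continuing the class-level example above (monitor, start, and end are assumed from that sketch), a hedged illustration of the return shape; the domain and GPU index are illustrative:

timeline = monitor.get_power_timeline(
    PowerDomain.DEVICE_AVERAGE,
    gpu_index=0,          # None returns every monitored GPU.
    start_time=start,     # Unix timestamps, e.g. from time.time().
    end_time=end,
)
# Shape: {0: [(1718000000.01, 312.5), (1718000000.11, 318.0), ...]}
for ts, watts in timeline.get(0, []):
    print(f"{ts:.2f}: {watts:.1f} W")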

Source code in zeus/monitor/power.py
def get_power_timeline(
    self,
    power_domain: PowerDomain,
    gpu_index: int | None = None,
    start_time: float | None = None,
    end_time: float | None = None,
) -> dict[int, list[tuple[float, float]]]:
    """Get power timeline for specific power domain and GPU(s).

    Args:
        power_domain: Power domain to query
        gpu_index: Specific GPU index, or None for all GPUs
        start_time: Start time filter (unix timestamp)
        end_time: End time filter (unix timestamp)

    Returns:
        Dictionary mapping GPU indices to timeline data with deduplication.
        Timeline data is list of (timestamp, power_watts) tuples.
    """
    if power_domain not in self.supported_domains:
        return {}

    # Process any pending queue data for this domain
    self._process_queue_data(power_domain)

    # Determine which GPUs to query
    target_gpus = [gpu_index] if gpu_index is not None else self.gpu_indices

    result = {}
    for gpu_idx in target_gpus:
        if gpu_idx not in self.samples[power_domain]:
            continue

        # Extract timeline from samples
        timeline = []
        for sample in self.samples[power_domain][gpu_idx]:
            # Apply time filters
            if start_time is not None and sample.timestamp < start_time:
                continue
            if end_time is not None and sample.timestamp > end_time:
                continue

            timeline.append(
                (sample.timestamp, sample.power_mw / 1000.0)
            )  # Convert to watts

        # Sort by timestamp
        timeline.sort(key=lambda x: x[0])
        result[gpu_idx] = timeline

    return result

get_all_power_timelines

get_all_power_timelines(
    gpu_index=None, start_time=None, end_time=None
)

Get all power timelines organized by power domain.

Parameters:

Name Type Description Default
gpu_index int | None

Specific GPU index, or None for all GPUs

None
start_time float | None

Start time filter (unix timestamp)

None
end_time float | None

End time filter (unix timestamp)

None

Returns:

Type Description
dict[str, dict[int, list[tuple[float, float]]]]

Dictionary with power domain names as keys, where each value is a dict mapping GPU indices to timeline data.
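
One way to use the domain-keyed structure is to export every timeline for offline analysis; a hedged sketch continuing the example above (the output path is illustrative):

import json

all_timelines = monitor.get_all_power_timelines(start_time=start, end_time=end)
# e.g. {"device_instant": {0: [(ts, watts), ...]}, "device_average": {0: [...]}, ...}
with open("power_timelines.json", "w") as f:
    json.dump(all_timelines, f)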

Source code in zeus/monitor/power.py
def get_all_power_timelines(
    self,
    gpu_index: int | None = None,
    start_time: float | None = None,
    end_time: float | None = None,
) -> dict[str, dict[int, list[tuple[float, float]]]]:
    """Get all power timelines organized by power domain.

    Args:
        gpu_index: Specific GPU index, or None for all GPUs
        start_time: Start time filter (unix timestamp)
        end_time: End time filter (unix timestamp)

    Returns:
        Dictionary with power domain names as keys and each value is a dict
        mapping GPU indices to timeline data.
    """
    result = {}
    for domain in self.supported_domains:
        result[domain.value] = self.get_power_timeline(
            domain, gpu_index, start_time, end_time
        )
    return result

get_energy

get_energy(start_time, end_time)

Get the energy used by the GPUs between two times (backward compatibility).

Uses device instant power for energy calculation.

Parameters:

Name Type Description Default
start_time float

Start time of the interval, from time.time().

required
end_time float

End time of the interval, from time.time().

required

Returns:

Type Description
dict[int, float] | None

A dictionary mapping GPU indices to the energy used by the GPU between the two times. If there are no power readings, return None.
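
Because the underlying timeline stores power in watts against timestamps in seconds, the integrated value is in joules. A short sketch continuing the example above:

energy = monitor.get_energy(start, end)
if energy is None:
    print("No power readings were collected in this window.")
else:
    for gpu_idx, joules in energy.items():
        print(f"GPU {gpu_idx}: {joules:.1f} J over {end - start:.1f} s")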

Source code in zeus/monitor/power.py
def get_energy(self, start_time: float, end_time: float) -> dict[int, float] | None:
    """Get the energy used by the GPUs between two times (backward compatibility).

    Uses device instant power for energy calculation.

    Args:
        start_time: Start time of the interval, from time.time().
        end_time: End time of the interval, from time.time().

    Returns:
        A dictionary mapping GPU indices to the energy used by the GPU between the
        two times. If there are no power readings, return None.
    """
    timelines = self.get_power_timeline(
        PowerDomain.DEVICE_INSTANT, start_time=start_time, end_time=end_time
    )

    if not timelines:
        return None

    energy_result = {}
    for gpu_idx, timeline in timelines.items():
        if not timeline or len(timeline) < 2:
            energy_result[gpu_idx] = 0.0
            continue

        timestamps = [t[0] for t in timeline]
        powers = [t[1] for t in timeline]

        try:
            energy_result[gpu_idx] = float(auc(timestamps, powers))
        except ValueError:
            energy_result[gpu_idx] = 0.0

    return energy_result

get_power

get_power(time=None)

Get the instant power usage of the GPUs at a specific time point.

Uses device instant power for compatibility.

Parameters:

Name Type Description Default
time float | None

Time point to get the power usage at. If None, get the power usage at the last recorded time point.

None

Returns:

Type Description
dict[int, float] | None

A dictionary mapping GPU indices to the power usage of the GPU at the specified time point. If there are no power readings, return None.
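
A hedged sketch continuing the example above; it reads the most recent sample per GPU and then the sample closest to a given timestamp:

latest = monitor.get_power()         # Most recent reading per GPU, in watts.
at_start = monitor.get_power(start)  # Reading closest to the start timestamp.
if latest is not None:
    for gpu_idx, watts in latest.items():
        print(f"GPU {gpu_idx}: {watts:.1f} W")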

Source code in zeus/monitor/power.py
def get_power(self, time: float | None = None) -> dict[int, float] | None:
    """Get the instant power usage of the GPUs at a specific time point.

    Uses device instant power for compatibility.

    Args:
        time: Time point to get the power usage at. If None, get the power usage
            at the last recorded time point.

    Returns:
        A dictionary mapping GPU indices to the power usage of the GPU at the
        specified time point. If there are no power readings, return None.
    """
    if PowerDomain.DEVICE_INSTANT not in self.supported_domains:
        raise ValueError(
            "PowerDomain.DEVICE_INSTANT is not supported by the current GPUs."
        )

    # Process any pending queue data
    self._process_all_queue_data()

    result = {}
    for gpu_idx in self.gpu_indices:
        samples = self.samples[PowerDomain.DEVICE_INSTANT][gpu_idx]
        if not samples:
            return None

        if time is None:
            # Get the most recent sample
            latest_sample = samples[-1]
            result[gpu_idx] = latest_sample.power_mw / 1000.0  # Convert to watts
        else:
            # Find the closest sample to the requested time using bisect
            timestamps = [sample.timestamp for sample in samples]
            pos = bisect.bisect_left(timestamps, time)

            if pos == 0:
                closest_sample = samples[0]
            elif pos == len(samples):
                closest_sample = samples[-1]
            else:
                # Check the closest sample before and after the requested time
                before = samples[pos - 1]
                after = samples[pos]
                closest_sample = (
                    before
                    if time - before.timestamp <= after.timestamp - time
                    else after
                )
            result[gpu_idx] = closest_sample.power_mw / 1000.0  # To Watts

    return result

infer_counter_update_period

infer_counter_update_period(gpu_indicies)

Infer the update period of the NVML power counter.

NVML counters can update as slowly as 10 Hz depending on the GPU model, so there is no need to poll them much faster than that. This function infers the update period for each unique GPU model and selects the fastest-updating period detected. It then returns half of that period so that the counter is polled at least twice per update period.
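
As an illustrative worked example of the half-then-clamp arithmetic described above (the numbers are made up):

detected_period = 0.25                     # Fastest counter update period found, in seconds.
polling_period = detected_period / 2.0     # Poll at least twice per counter update -> 0.125 s.
polling_period = min(polling_period, 0.1)  # Never poll slower than ten times per second.
assert polling_period == 0.1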

Source code in zeus/monitor/power.py
def infer_counter_update_period(gpu_indicies: list[int]) -> float:
    """Infer the update period of the NVML power counter.

    NVML counters can update as slowly as 10 Hz depending on the GPU model, so
    there's no need to poll them much faster than that. This function infers the
    update period for each unique GPU model and selects the fastest-updating
    period detected. Then, it returns half the period to ensure that the
    counter is polled at least twice per update period.
    """
    logger = get_logger(__name__)

    gpus = get_gpus()

    # For each unique GPU model, infer the update period.
    update_period = float("inf")
    gpu_models_covered = set()
    for index in gpu_indicies:
        if (model := gpus.getName(index)) not in gpu_models_covered:
            logger.info(
                "Detected %s, inferring NVML power counter update period.", model
            )
            gpu_models_covered.add(model)
            detected_period = _infer_counter_update_period_single(index)
            logger.info(
                "Counter update period for %s is %.2f s",
                model,
                detected_period,
            )
            update_period = min(update_period, detected_period)

    # Target half the update period so the counter is polled at least twice per update.
    update_period /= 2.0

    # Anything less than ten times a second is probably too slow.
    if update_period > 0.1:
        logger.warning(
            "Inferred update period (%.2f s) is too long. Using 0.1 s instead.",
            update_period,
        )
        update_period = 0.1
    return update_period

_infer_counter_update_period_single

_infer_counter_update_period_single(gpu_index)

Infer the update period of the NVML power counter for a single GPU.

Source code in zeus/monitor/power.py
def _infer_counter_update_period_single(gpu_index: int) -> float:
    """Infer the update period of the NVML power counter for a single GPU."""
    gpus = get_gpus()

    # Collect 1000 samples of the power counter with timestamps.
    time_power_samples: list[tuple[float, int]] = [(0.0, 0) for _ in range(1000)]
    for i in range(len(time_power_samples)):
        time_power_samples[i] = (
            time(),
            gpus.getInstantPowerUsage(gpu_index),
        )

    # Find the timestamps when the power readings changed.
    time_power_samples = time_power_samples[10:]
    changed_times = []
    prev_power = time_power_samples[0][1]
    for t, p in time_power_samples:
        if p != prev_power:
            changed_times.append(t)
            prev_power = p

    # Compute the minimum time difference between power change timestamps.
    intervals = [
        time2 - time1 for time1, time2 in zip(changed_times, changed_times[1:])
    ]
    if len(intervals) == 0:
        return 0.1
    return min(intervals)

_domain_polling_process

_domain_polling_process(
    power_domain,
    gpu_indices,
    data_queue,
    ready_event,
    stop_event,
    update_period,
)

Polling process for a specific power domain with deduplication.

Source code in zeus/monitor/power.py
def _domain_polling_process(
    power_domain: PowerDomain,
    gpu_indices: list[int],
    data_queue: mp.Queue,
    ready_event: EventClass,
    stop_event: EventClass,
    update_period: float,
) -> None:
    """Polling process for a specific power domain with deduplication."""
    try:
        # Get GPUs
        gpus = get_gpus(ensure_homogeneous=True)

        # Determine the GPU method to call based on domain
        power_methods = {
            PowerDomain.DEVICE_INSTANT: gpus.getInstantPowerUsage,
            PowerDomain.DEVICE_AVERAGE: gpus.getAveragePowerUsage,
            PowerDomain.MEMORY_AVERAGE: gpus.getAverageMemoryPowerUsage,
        }
        try:
            power_method = power_methods[power_domain]
        except KeyError:
            raise ValueError(f"Unknown power domain: {power_domain}") from None

        # Track previous power values for deduplication
        prev_power: dict[int, float] = {}

        # Signal that this process is ready to start monitoring
        ready_event.set()

        # Start polling loop
        num_not_supported_encounter = 0
        while not stop_event.is_set():
            timestamp = time()

            for gpu_index in gpu_indices:
                try:
                    power_mw = power_method(gpu_index)

                    # Sometimes, if we poll too fast, power can return 0. Skip.
                    if power_mw <= 0:
                        logger.warning(
                            "GPU %d power domain %s encountered %g mW measurement. "
                            "Skipping. Polling frequency may be too high.",
                            gpu_index,
                            power_domain.value,
                            power_mw,
                        )
                        continue

                    # Deduplication: only send if power changed
                    if gpu_index in prev_power and prev_power[gpu_index] == power_mw:
                        continue

                    prev_power[gpu_index] = power_mw

                    # Create and send power sample
                    sample = PowerSample(
                        timestamp=timestamp,
                        gpu_index=gpu_index,
                        power_mw=power_mw,
                    )

                    data_queue.put(sample)
                except ZeusGPUNotSupportedError as e:
                    # When polling at a high frequency, NVML sometimes raises
                    # a NotSupported error.
                    num_not_supported_encounter += 1
                    if num_not_supported_encounter > 10:
                        num_not_supported_encounter = 0
                        logger.warning(
                            "GPU %d domain %s encountered 10 NotSupported errors. "
                            "This may indicate a polling frequency that is too high. "
                            "Consider increasing the update period. "
                            "Exception: '%s'",
                            gpu_index,
                            power_domain.value,
                            e,
                        )
                except Exception as e:
                    logger.exception(
                        "Error polling power for GPU %d in domain %s: %s",
                        gpu_index,
                        power_domain.value,
                        e,
                    )
                    raise e

            # Sleep for the remaining time
            elapsed = time() - timestamp
            sleep_time = update_period - elapsed
            if sleep_time > 0:
                sleep(sleep_time)

    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.exception(
            "Exiting polling process for domain %s due to error: %s",
            power_domain.value,
            e,
        )
        raise e
    finally:
        # Send stop signal
        data_queue.put("STOP")