Skip to content

metric

zeus.metric

Track and export energy and power metrics via Prometheus.

MonitoringProcessState dataclass

Represents the state of a monitoring window.

Source code in zeus/metric.py
27
28
29
30
31
32
@dataclass
class MonitoringProcessState:
    """Represents the state of a monitoring window."""

    # Channel used to signal the monitoring subprocess (e.g., sending "stop").
    queue: mp.Queue
    # Handle to the spawned monitoring subprocess.
    proc: SpawnProcess
Metric

Bases: ABC

Abstract base class for all metric types in Zeus.

Defines a common interface for metrics, ensuring consistent behavior for begin_window and end_window operations.

Source code in zeus/metric.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class Metric(abc.ABC):
    """Abstract base class for all metric types in Zeus.

    Defines a common interface for metrics, ensuring consistent behavior
    for `begin_window` and `end_window` operations.
    """

    @abc.abstractmethod
    def begin_window(self, name: str, sync_execution: bool = True) -> None:
        """Start a new measurement window.

        Args:
            name (str): Name of the measurement window.
            sync_execution (bool): Whether to wait for asynchronously dispatched computations
                to finish before starting the measurement window.
        """
        pass

    @abc.abstractmethod
    def end_window(self, name: str, sync_execution: bool = True) -> None:
        """End the current measurement window and report metrics.

        Args:
            name (str): Name of the measurement window.
            sync_execution (bool): Whether to wait for asynchronously dispatched computations
                to finish before ending the measurement window.
        """
        pass

begin_window abstractmethod

begin_window(name, sync_execution=True)

Start a new measurement window.

Parameters:

Name Type Description Default
name str

Name of the measurement window.

required
sync_execution bool

Whether to wait for asynchronously dispatched computations to finish before starting the measurement window.

True
Source code in zeus/metric.py
42
43
44
45
46
47
48
49
50
51
@abc.abstractmethod
def begin_window(self, name: str, sync_execution: bool = True) -> None:
    """Start a new measurement window.

    Concrete metric classes must override this to begin tracking for `name`.

    Args:
        name (str): Name of the measurement window.
        sync_execution (bool): Whether to wait for asynchronously dispatched computations
            to finish before starting the measurement window.
    """
    pass

end_window abstractmethod

end_window(name, sync_execution=True)

End the current measurement window and report metrics.

Parameters:

Name Type Description Default
name str

Name of the measurement window.

required
sync_execution bool

Whether to wait for asynchronously dispatched computations to finish before ending the measurement window.

True
Source code in zeus/metric.py
53
54
55
56
57
58
59
60
61
62
@abc.abstractmethod
def end_window(self, name: str, sync_execution: bool = True) -> None:
    """End the current measurement window and report metrics.

    Concrete metric classes must override this to finalize and export the
    measurements collected for `name`.

    Args:
        name (str): Name of the measurement window.
        sync_execution (bool): Whether to wait for asynchronously dispatched computations
            to finish before ending the measurement window.
    """
    pass

EnergyHistogram

Bases: Metric

Measures the energy consumption of a code range and exports histogram metrics.

Tracks energy consumption for GPUs, CPUs, and DRAM as Prometheus Histogram metrics.

Source code in zeus/metric.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class EnergyHistogram(Metric):
    """Measures the energy consumption of a code range and exports histogram metrics.

    Tracks energy consumption for GPUs, CPUs, and DRAM as Prometheus Histogram metrics.
    """

    def __init__(
        self,
        cpu_indices: list,
        gpu_indices: list,
        pushgateway_url: str,
        job: str,
        gpu_bucket_range: Sequence[float] = [50.0, 100.0, 200.0, 500.0, 1000.0],
        cpu_bucket_range: Sequence[float] = [10.0, 50.0, 100.0, 500.0, 1000.0],
        dram_bucket_range: Sequence[float] = [5.0, 10.0, 20.0, 50.0, 150.0],
    ) -> None:
        """Initialize the EnergyHistogram class.

        Sets up the Prometheus Histogram metrics to track energy consumption for GPUs, CPUs, and DRAMs.
        The data will be collected and pushed to the Prometheus Push Gateway at regular intervals.

        Args:
            cpu_indices (list): List of CPU indices to monitor.
            gpu_indices (list): List of GPU indices to monitor.
            pushgateway_url (str): URL of the Prometheus Push Gateway where metrics will be pushed.
            job (str): Name of the Prometheus job to associate with the energy metrics.
            gpu_bucket_range (list[float], optional): Bucket ranges for GPU energy histograms.
                Defaults to [50.0, 100.0, 200.0, 500.0, 1000.0].
            cpu_bucket_range (list[float], optional): Bucket ranges for CPU energy histograms.
                Defaults to [10.0, 50.0, 100.0, 500.0, 1000.0].
            dram_bucket_range (list[float], optional): Bucket ranges for DRAM energy histograms.
                Defaults to [5.0, 10.0, 20.0, 50.0, 150.0].

        Raises:
            ValueError: If any of the bucket ranges (GPU, CPU, DRAM) is an empty list.
        """
        # Validate bucket ranges up front so we fail before any state is set up.
        if not gpu_bucket_range:
            raise ValueError(
                "GPU bucket range cannot be empty. Please provide a valid range or omit the argument to use defaults."
            )
        if not cpu_bucket_range:
            raise ValueError(
                "CPU bucket range cannot be empty. Please provide a valid range or omit the argument to use defaults."
            )
        if not dram_bucket_range:
            raise ValueError(
                "DRAM bucket range cannot be empty. Please provide a valid range or omit the argument to use defaults."
            )

        self.gpu_bucket_range = gpu_bucket_range
        self.cpu_bucket_range = cpu_bucket_range
        self.dram_bucket_range = dram_bucket_range
        self.cpu_indices = cpu_indices
        self.gpu_indices = gpu_indices
        self.pushgateway_url = pushgateway_url
        self.job = job
        # Dedicated registry so only this metric's histograms get pushed.
        self.registry = CollectorRegistry()

        # Initialize GPU histograms.
        if self.gpu_indices:
            self.gpu_histograms = Histogram(
                "energy_monitor_gpu_energy_joules",
                "GPU energy consumption",
                ["window", "index"],
                buckets=self.gpu_bucket_range,
                registry=self.registry,
            )
        # Initialize CPU histograms.
        if self.cpu_indices:
            self.cpu_histograms = Histogram(
                "energy_monitor_cpu_energy_joules",
                "CPU energy consumption",
                ["window", "index"],
                buckets=self.cpu_bucket_range,
                registry=self.registry,
            )
            # DRAM histograms only exist when at least one monitored CPU
            # exposes DRAM energy measurements.
            if any(cpu.supportsGetDramEnergyConsumption() for cpu in get_cpus().cpus):
                self.dram_histograms = Histogram(
                    "energy_monitor_dram_energy_joules",
                    "DRAM energy consumption",
                    ["window", "index"],
                    buckets=self.dram_bucket_range,
                    registry=self.registry,
                )

        # Cache bucket extrema so out-of-range observations can be flagged.
        self.max_gpu_bucket = max(self.gpu_bucket_range)
        self.max_cpu_bucket = max(self.cpu_bucket_range)
        self.max_dram_bucket = max(self.dram_bucket_range)

        self.min_gpu_bucket = min(self.gpu_bucket_range)
        self.min_cpu_bucket = min(self.cpu_bucket_range)
        self.min_dram_bucket = min(self.dram_bucket_range)

        self.energy_monitor = ZeusMonitor(
            cpu_indices=cpu_indices, gpu_indices=gpu_indices
        )

    def begin_window(self, name: str, sync_execution: bool = True) -> None:
        """Begin the energy monitoring window.

        Args:
            name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
            sync_execution (bool): Whether to execute synchronously. Defaults to True. If assigned True, calls sync_execution_fn with the defined gpu
        """
        if sync_execution:
            sync_execution_fn(self.gpu_indices)

        # Namespace the window so it cannot collide with user-created windows.
        self.energy_monitor.begin_window(
            f"__EnergyHistogram_{name}", sync_execution=sync_execution
        )

    def end_window(self, name: str, sync_execution: bool = True) -> None:
        """End the current energy monitoring window and record the energy data.

        Retrieves the energy consumption data (for GPUs, CPUs, and DRAMs) for the monitoring window
        and updates the corresponding Histogram metrics. The data is then pushed to the Prometheus Push Gateway.

        Args:
            name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
            sync_execution (bool): Whether to execute synchronously. Defaults to True.
        """
        if sync_execution:
            sync_execution_fn(self.gpu_indices)

        measurement = self.energy_monitor.end_window(
            f"__EnergyHistogram_{name}", sync_execution=sync_execution
        )

        # Out-of-range observations are still recorded (Prometheus histograms
        # have an implicit +Inf bucket); the warnings prompt users to tune
        # their bucket ranges.
        if measurement.gpu_energy:
            for gpu_index, gpu_energy in measurement.gpu_energy.items():
                self.gpu_histograms.labels(window=name, index=gpu_index).observe(
                    gpu_energy
                )
                if gpu_energy > self.max_gpu_bucket:
                    warnings.warn(
                        f"GPU {gpu_index} energy {gpu_energy} exceeds the maximum bucket value of {self.max_gpu_bucket}",
                        stacklevel=1,
                    )
                if gpu_energy < self.min_gpu_bucket:
                    warnings.warn(
                        f"GPU {gpu_index} energy {gpu_energy} is below the minimum bucket value of {self.min_gpu_bucket}",
                        stacklevel=1,
                    )

        if measurement.cpu_energy:
            for cpu_index, cpu_energy in measurement.cpu_energy.items():
                self.cpu_histograms.labels(window=name, index=cpu_index).observe(
                    cpu_energy
                )
                if cpu_energy > self.max_cpu_bucket:
                    warnings.warn(
                        f"CPU {cpu_index} energy {cpu_energy} exceeds the maximum bucket value of {self.max_cpu_bucket}",
                        stacklevel=1,
                    )
                if cpu_energy < self.min_cpu_bucket:
                    warnings.warn(
                        f"CPU {cpu_index} energy {cpu_energy} is below the minimum bucket value of {self.min_cpu_bucket}",
                        stacklevel=1,
                    )

        if measurement.dram_energy:
            for dram_index, dram_energy in measurement.dram_energy.items():
                self.dram_histograms.labels(window=name, index=dram_index).observe(
                    dram_energy
                )
                if dram_energy > self.max_dram_bucket:
                    warnings.warn(
                        f"DRAM {dram_index} energy {dram_energy} exceeds the maximum bucket value of {self.max_dram_bucket}",
                        stacklevel=1,
                    )
                if dram_energy < self.min_dram_bucket:
                    warnings.warn(
                        f"DRAM {dram_index} energy {dram_energy} is below the minimum bucket value of {self.min_dram_bucket}",
                        stacklevel=1,
                    )

        push_to_gateway(self.pushgateway_url, job=self.job, registry=self.registry)

__init__

__init__(
    cpu_indices,
    gpu_indices,
    pushgateway_url,
    job,
    gpu_bucket_range=[50.0, 100.0, 200.0, 500.0, 1000.0],
    cpu_bucket_range=[10.0, 50.0, 100.0, 500.0, 1000.0],
    dram_bucket_range=[5.0, 10.0, 20.0, 50.0, 150.0],
)

Sets up the Prometheus Histogram metrics to track energy consumption for GPUs, CPUs, and DRAMs. The data will be collected and pushed to the Prometheus Push Gateway at regular intervals.

Parameters:

Name Type Description Default
cpu_indices list

List of CPU indices to monitor.

required
gpu_indices list

List of GPU indices to monitor.

required
pushgateway_url str

URL of the Prometheus Push Gateway where metrics will be pushed.

required
job str

Name of the Prometheus job to associate with the energy metrics.

required
gpu_bucket_range list[float]

Bucket ranges for GPU energy histograms. Defaults to [50.0, 100.0, 200.0, 500.0, 1000.0].

[50.0, 100.0, 200.0, 500.0, 1000.0]
cpu_bucket_range list[float]

Bucket ranges for CPU energy histograms. Defaults to [10.0, 50.0, 100.0, 500.0, 1000.0].

[10.0, 50.0, 100.0, 500.0, 1000.0]
dram_bucket_range list[float]

Bucket ranges for DRAM energy histograms. Defaults to [5.0, 10.0, 20.0, 50.0, 150.0].

[5.0, 10.0, 20.0, 50.0, 150.0]

Raises:

Type Description
ValueError

If any of the bucket ranges (GPU, CPU, DRAM) is an empty list.

Source code in zeus/metric.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def __init__(
    self,
    cpu_indices: list,
    gpu_indices: list,
    pushgateway_url: str,
    job: str,
    gpu_bucket_range: Sequence[float] = [50.0, 100.0, 200.0, 500.0, 1000.0],
    cpu_bucket_range: Sequence[float] = [10.0, 50.0, 100.0, 500.0, 1000.0],
    dram_bucket_range: Sequence[float] = [5.0, 10.0, 20.0, 50.0, 150.0],
) -> None:
    """Initialize the EnergyHistogram class.

    Sets up the Prometheus Histogram metrics to track energy consumption for GPUs, CPUs, and DRAMs.
    The data will be collected and pushed to the Prometheus Push Gateway at regular intervals.

    Args:
        cpu_indices (list): List of CPU indices to monitor.
        gpu_indices (list): List of GPU indices to monitor.
        pushgateway_url (str): URL of the Prometheus Push Gateway where metrics will be pushed.
        job (str): Name of the Prometheus job to associate with the energy metrics.
        gpu_bucket_range (list[float], optional): Bucket ranges for GPU energy histograms.
            Defaults to [50.0, 100.0, 200.0, 500.0, 1000.0].
        cpu_bucket_range (list[float], optional): Bucket ranges for CPU energy histograms.
            Defaults to [10.0, 50.0, 100.0, 500.0, 1000.0].
        dram_bucket_range (list[float], optional): Bucket ranges for DRAM energy histograms.
            Defaults to [5.0, 10.0, 20.0, 50.0, 150.0].

    Raises:
        ValueError: If any of the bucket ranges (GPU, CPU, DRAM) is an empty list.
    """
    # Validate bucket ranges up front so we fail before any state is set up.
    if not gpu_bucket_range:
        raise ValueError(
            "GPU bucket range cannot be empty. Please provide a valid range or omit the argument to use defaults."
        )
    if not cpu_bucket_range:
        raise ValueError(
            "CPU bucket range cannot be empty. Please provide a valid range or omit the argument to use defaults."
        )
    if not dram_bucket_range:
        raise ValueError(
            "DRAM bucket range cannot be empty. Please provide a valid range or omit the argument to use defaults."
        )

    self.gpu_bucket_range = gpu_bucket_range
    self.cpu_bucket_range = cpu_bucket_range
    self.dram_bucket_range = dram_bucket_range
    self.cpu_indices = cpu_indices
    self.gpu_indices = gpu_indices
    self.pushgateway_url = pushgateway_url
    self.job = job
    # Dedicated registry so only this metric's histograms get pushed.
    self.registry = CollectorRegistry()

    # Initialize GPU histograms.
    if self.gpu_indices:
        self.gpu_histograms = Histogram(
            "energy_monitor_gpu_energy_joules",
            "GPU energy consumption",
            ["window", "index"],
            buckets=self.gpu_bucket_range,
            registry=self.registry,
        )
    # Initialize CPU histograms.
    if self.cpu_indices:
        self.cpu_histograms = Histogram(
            "energy_monitor_cpu_energy_joules",
            "CPU energy consumption",
            ["window", "index"],
            buckets=self.cpu_bucket_range,
            registry=self.registry,
        )
        # DRAM histograms only exist when at least one monitored CPU
        # exposes DRAM energy measurements.
        if any(cpu.supportsGetDramEnergyConsumption() for cpu in get_cpus().cpus):
            self.dram_histograms = Histogram(
                "energy_monitor_dram_energy_joules",
                "DRAM energy consumption",
                ["window", "index"],
                buckets=self.dram_bucket_range,
                registry=self.registry,
            )

    # Cache bucket extrema so out-of-range observations can be flagged.
    self.max_gpu_bucket = max(self.gpu_bucket_range)
    self.max_cpu_bucket = max(self.cpu_bucket_range)
    self.max_dram_bucket = max(self.dram_bucket_range)

    self.min_gpu_bucket = min(self.gpu_bucket_range)
    self.min_cpu_bucket = min(self.cpu_bucket_range)
    self.min_dram_bucket = min(self.dram_bucket_range)

    self.energy_monitor = ZeusMonitor(
        cpu_indices=cpu_indices, gpu_indices=gpu_indices
    )

begin_window

begin_window(name, sync_execution=True)

Begin the energy monitoring window.

Parameters:

Name Type Description Default
name str

The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.

required
sync_execution bool

Whether to execute synchronously. Defaults to True. If assigned True, calls sync_execution_fn with the defined gpu

True
Source code in zeus/metric.py
163
164
165
166
167
168
169
170
171
172
173
174
175
def begin_window(self, name: str, sync_execution: bool = True) -> None:
    """Begin the energy monitoring window.

    Args:
        name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
        sync_execution (bool): Whether to execute synchronously. Defaults to True. If assigned True, calls sync_execution_fn with the defined gpu
    """
    # Drain asynchronously dispatched GPU work first so the measurement
    # starts from a well-defined point in time.
    if sync_execution:
        sync_execution_fn(self.gpu_indices)

    # Namespace the window name to avoid clashes with user-created windows.
    window_label = f"__EnergyHistogram_{name}"
    self.energy_monitor.begin_window(window_label, sync_execution=sync_execution)

end_window

end_window(name, sync_execution=True)

End the current energy monitoring window and record the energy data.

Retrieves the energy consumption data (for GPUs, CPUs, and DRAMs) for the monitoring window and updates the corresponding Histogram metrics. The data is then pushed to the Prometheus Push Gateway.

Parameters:

Name Type Description Default
name str

The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.

required
sync_execution bool

Whether to execute synchronously. Defaults to True.

True
Source code in zeus/metric.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def end_window(self, name: str, sync_execution: bool = True) -> None:
    """End the current energy monitoring window and record the energy data.

    Retrieves the energy consumption data (for GPUs, CPUs, and DRAMs) for the monitoring window
    and updates the corresponding Histogram metrics. The data is then pushed to the Prometheus Push Gateway.

    Args:
        name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
        sync_execution (bool): Whether to execute synchronously. Defaults to True.
    """
    if sync_execution:
        sync_execution_fn(self.gpu_indices)

    measurement = self.energy_monitor.end_window(
        f"__EnergyHistogram_{name}", sync_execution=sync_execution
    )

    # Out-of-range observations are still recorded (Prometheus histograms
    # have an implicit +Inf bucket); the warnings prompt users to tune
    # their bucket ranges.
    if measurement.gpu_energy:
        for gpu_index, gpu_energy in measurement.gpu_energy.items():
            self.gpu_histograms.labels(window=name, index=gpu_index).observe(
                gpu_energy
            )
            if gpu_energy > self.max_gpu_bucket:
                warnings.warn(
                    f"GPU {gpu_index} energy {gpu_energy} exceeds the maximum bucket value of {self.max_gpu_bucket}",
                    stacklevel=1,
                )
            if gpu_energy < self.min_gpu_bucket:
                warnings.warn(
                    f"GPU {gpu_index} energy {gpu_energy} is below the minimum bucket value of {self.min_gpu_bucket}",
                    stacklevel=1,
                )

    if measurement.cpu_energy:
        for cpu_index, cpu_energy in measurement.cpu_energy.items():
            self.cpu_histograms.labels(window=name, index=cpu_index).observe(
                cpu_energy
            )
            if cpu_energy > self.max_cpu_bucket:
                warnings.warn(
                    f"CPU {cpu_index} energy {cpu_energy} exceeds the maximum bucket value of {self.max_cpu_bucket}",
                    stacklevel=1,
                )
            if cpu_energy < self.min_cpu_bucket:
                warnings.warn(
                    f"CPU {cpu_index} energy {cpu_energy} is below the minimum bucket value of {self.min_cpu_bucket}",
                    stacklevel=1,
                )

    if measurement.dram_energy:
        for dram_index, dram_energy in measurement.dram_energy.items():
            self.dram_histograms.labels(window=name, index=dram_index).observe(
                dram_energy
            )
            if dram_energy > self.max_dram_bucket:
                warnings.warn(
                    f"DRAM {dram_index} energy {dram_energy} exceeds the maximum bucket value of {self.max_dram_bucket}",
                    stacklevel=1,
                )
            if dram_energy < self.min_dram_bucket:
                warnings.warn(
                    f"DRAM {dram_index} energy {dram_energy} is below the minimum bucket value of {self.min_dram_bucket}",
                    stacklevel=1,
                )

    push_to_gateway(self.pushgateway_url, job=self.job, registry=self.registry)

EnergyCumulativeCounter

Bases: Metric

EnergyCumulativeCounter class to monitor and record cumulative energy consumption.

This class tracks GPU, CPU, and DRAM energy usage over time, and records the data as Prometheus Counter metrics. The energy consumption metrics are periodically updated and pushed to a Prometheus Push Gateway for monitoring and analysis.

The cumulative nature of the Counter ensures that energy values are always incremented over time, never reset, which is ideal for tracking continuously increasing values like energy usage.

Source code in zeus/metric.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
class EnergyCumulativeCounter(Metric):
    """EnergyCumulativeCounter class to monitor and record cumulative energy consumption.

    This class tracks GPU, CPU, and DRAM energy usage over time, and records the data as Prometheus Counter metrics.
    The energy consumption metrics are periodically updated and pushed to a Prometheus Push Gateway for monitoring and analysis.

    The cumulative nature of the Counter ensures that energy values are always incremented over time, never reset,
    which is ideal for tracking continuously increasing values like energy usage.
    """

    def __init__(
        self,
        cpu_indices: list,
        gpu_indices: list,
        update_period: int,
        pushgateway_url: str,
        job: str,
    ) -> None:
        """Initialize the EnergyCumulativeCounter.

        Args:
            cpu_indices (list): List of CPU indices to monitor.
            gpu_indices (list): List of GPU indices to monitor.
            update_period: The time interval (in seconds) at which energy measurements are updated.
            pushgateway_url: The URL for the Prometheus Push Gateway where the metrics will be pushed.
            job: The name of the job to be associated with the Prometheus metrics.
        """
        self.cpu_indices = cpu_indices
        self.gpu_indices = gpu_indices
        self.update_period = update_period
        self.pushgateway_url = pushgateway_url
        self.job = job
        # Maps window name -> state of its background monitoring subprocess.
        self.window_state: dict[str, MonitoringProcessState] = {}

    def begin_window(self, name: str, sync_execution: bool = False) -> None:
        """Begin the energy monitoring window.

        Starts a new multiprocessing process that monitors energy usage periodically
        and pushes the results to the Prometheus Push Gateway.

        Args:
            name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
            sync_execution (bool, optional): Whether to execute monitoring synchronously. Defaults to False.

        Raises:
            RuntimeError: If the monitoring process could not be started.
        """
        if sync_execution:
            sync_execution_fn(self.gpu_indices)

        # "spawn" gives the child a fresh interpreter, avoiding fork-related
        # issues with driver/monitoring state inherited from the parent.
        context = mp.get_context("spawn")
        queue = context.Queue()
        proc = context.Process(
            target=energy_monitoring_loop,
            args=(
                name,
                queue,
                self.cpu_indices,
                self.gpu_indices,
                self.update_period,
                self.pushgateway_url,
                self.job,
            ),
        )
        proc.start()
        if not proc.is_alive():
            raise RuntimeError(f"Failed to start monitoring process for {name}.")

        self.window_state[name] = MonitoringProcessState(queue=queue, proc=proc)

    def end_window(self, name: str, sync_execution: bool = False) -> None:
        """End the energy monitoring window.

        Signals the monitoring subprocess to stop and waits for it to exit,
        force-terminating it if it does not stop within the timeout.

        Args:
            name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
            sync_execution (bool, optional): Whether to execute monitoring synchronously. Defaults to False.

        Raises:
            ValueError: If no active monitoring process exists for `name`.
        """
        if name not in self.window_state:
            raise ValueError(f"No active monitoring process found for '{name}'.")

        if sync_execution:
            sync_execution_fn(self.gpu_indices)

        state = self.window_state.pop(name)
        state.queue.put("stop")
        state.proc.join(timeout=20)

        if state.proc.is_alive():
            state.proc.terminate()
            # Join again after terminating so the dead process is reaped
            # instead of lingering as a zombie.
            state.proc.join()

__init__

__init__(
    cpu_indices,
    gpu_indices,
    update_period,
    pushgateway_url,
    job,
)

Parameters:

Name Type Description Default
cpu_indices list

List of CPU indices to monitor.

required
gpu_indices list

List of GPU indices to monitor.

required
update_period int

The time interval (in seconds) at which energy measurements are updated.

required
pushgateway_url str

The URL for the Prometheus Push Gateway where the metrics will be pushed.

required
job str

The name of the job to be associated with the Prometheus metrics.

required
Source code in zeus/metric.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def __init__(
    self,
    cpu_indices: list,
    gpu_indices: list,
    update_period: int,
    pushgateway_url: str,
    job: str,
) -> None:
    """Initialize the EnergyCumulativeCounter.

    Args:
        cpu_indices (list): List of CPU indices to monitor.
        gpu_indices (list): List of GPU indices to monitor.
        update_period: The time interval (in seconds) at which energy measurements are updated.
        pushgateway_url: The URL for the Prometheus Push Gateway where the metrics will be pushed.
        job: The name of the job to be associated with the Prometheus metrics.
    """
    # Where the periodically collected metrics get pushed.
    self.pushgateway_url = pushgateway_url
    self.job = job
    # Hardware to monitor and how often to sample it.
    self.cpu_indices = cpu_indices
    self.gpu_indices = gpu_indices
    self.update_period = update_period
    # One background monitoring process per active window, keyed by name.
    self.window_state: dict[str, MonitoringProcessState] = {}

begin_window

begin_window(name, sync_execution=False)

Begin the energy monitoring window.

Starts a new multiprocessing process that monitors energy usage periodically and pushes the results to the Prometheus Push Gateway.

Parameters:

Name Type Description Default
name str

The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.

required
sync_execution bool

Whether to execute monitoring synchronously. Defaults to False.

False
Source code in zeus/metric.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
def begin_window(self, name: str, sync_execution: bool = False) -> None:
    """Begin the energy monitoring window.

    Launches a background process that periodically samples energy usage and
    pushes the results to the Prometheus Push Gateway.

    Args:
        name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
        sync_execution (bool, optional): Whether to execute monitoring synchronously. Defaults to False.

    Raises:
        RuntimeError: If the monitoring process could not be started.
    """
    if sync_execution:
        sync_execution_fn(self.gpu_indices)

    # "spawn" gives the child a fresh interpreter state.
    spawn_ctx = mp.get_context("spawn")
    command_queue = spawn_ctx.Queue()
    monitor_args = (
        name,
        command_queue,
        self.cpu_indices,
        self.gpu_indices,
        self.update_period,
        self.pushgateway_url,
        self.job,
    )
    monitor_proc = spawn_ctx.Process(
        target=energy_monitoring_loop, args=monitor_args
    )
    monitor_proc.start()
    if not monitor_proc.is_alive():
        raise RuntimeError(f"Failed to start monitoring process for {name}.")

    self.window_state[name] = MonitoringProcessState(
        queue=command_queue, proc=monitor_proc
    )

end_window

end_window(name, sync_execution=False)

End the energy monitoring window.

Parameters:

Name Type Description Default
name str

The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.

required
sync_execution bool

Whether to execute monitoring synchronously. Defaults to False.

False
Source code in zeus/metric.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
def end_window(self, name: str, sync_execution: bool = False) -> None:
    """End the energy monitoring window.

    Sends a stop signal to the monitoring process registered under ``name``,
    waits up to 20 seconds for it to exit, and terminates it if it is still
    running after the grace period.

    Args:
        name (str): The unique name of the measurement window. Must match between
            calls to 'begin_window' and 'end_window'.
        sync_execution (bool, optional): Whether to wait for asynchronously
            dispatched computations to finish before ending the measurement
            window. Defaults to False.

    Raises:
        ValueError: If no monitoring window named ``name`` is active.
    """
    # Validate first so an unknown window name fails fast.
    if name not in self.window_state:
        raise ValueError(f"No active monitoring process found for '{name}'.")

    if sync_execution:
        sync_execution_fn(self.gpu_indices)

    monitor = self.window_state.pop(name)
    monitor.queue.put("stop")  # ask the child loop to exit cleanly
    monitor.proc.join(timeout=20)  # grace period for a clean shutdown

    # Fall back to a hard kill if the process ignored the stop signal.
    if monitor.proc.is_alive():
        monitor.proc.terminate()

PowerGauge

Bases: Metric

PowerGauge class to monitor and record power consumption.

This class tracks GPU power usage in real time and records it as Prometheus Gauge metrics. The Gauge metric type is suitable for tracking values that can go up and down over time, like power consumption.

Power usage data is collected at regular intervals and pushed to a Prometheus Push Gateway for monitoring.

Source code in zeus/metric.py
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
class PowerGauge(Metric):
    """PowerGauge class to monitor and record power consumption.

    This class tracks GPU power usage in real time and records it as **Prometheus Gauge** metrics.
    The Gauge metric type is suitable for tracking values that can go up and down over time, like power consumption.

    Power usage data is collected at regular intervals and pushed to a Prometheus Push Gateway for monitoring.
    """

    def __init__(
        self,
        gpu_indices: list,
        update_period: int,
        pushgateway_url: str,
        job: str,
    ) -> None:
        """Initialize the PowerGauge metric.

        Args:
            gpu_indices (list[int]): List of GPU indices to monitor for power consumption.
            update_period (int): Interval (in seconds) between consecutive power measurements.
            pushgateway_url (str): URL of the Prometheus Push Gateway where Gauge metrics are pushed.
            job (str): Name of the Prometheus job to associate with the power metrics.
        """
        self.gpu_indices = gpu_indices
        self.update_period = update_period
        self.pushgateway_url = pushgateway_url
        self.job = job
        # Active measurement windows, keyed by window name.
        self.window_state: dict[str, MonitoringProcessState] = {}

    def begin_window(self, name: str, sync_execution: bool = False) -> None:
        """Begin the power monitoring window.

        Starts a new multiprocessing process that runs the power monitoring loop.
        The process collects real-time power consumption data and updates the corresponding
        Gauge metrics in Prometheus.

        Args:
            name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
            sync_execution (bool, optional): Whether to wait for asynchronously dispatched computations to finish before starting the window. Defaults to False.

        Raises:
            ValueError: If a monitoring window named `name` is already active.
            RuntimeError: If the monitoring process fails to start.
        """
        if name in self.window_state:
            raise ValueError(f"PowerGauge metric '{name}' already exists.")

        if sync_execution:
            sync_execution_fn(self.gpu_indices)

        # Spawn so the monitoring child starts from a clean interpreter state.
        context = mp.get_context("spawn")
        queue = context.Queue()
        proc = context.Process(
            target=power_monitoring_loop,
            args=(
                name,
                queue,
                self.gpu_indices,
                self.update_period,
                self.pushgateway_url,
                self.job,
            ),
        )
        proc.start()
        if not proc.is_alive():
            raise RuntimeError(
                f"Failed to start power monitoring process for '{name}'."
            )

        self.window_state[name] = MonitoringProcessState(queue=queue, proc=proc)

    def end_window(self, name: str, sync_execution: bool = False) -> None:
        """End the power monitoring window.

        Args:
            name (str): The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.
            sync_execution (bool, optional): Whether to wait for asynchronously dispatched computations to finish before ending the window. Defaults to False.

        Raises:
            ValueError: If no monitoring window named `name` is active.
        """
        # Raise a descriptive ValueError (instead of a bare KeyError from `pop`)
        # when the window was never started, consistent with the energy
        # counter's end_window.
        if name not in self.window_state:
            raise ValueError(f"No active monitoring process found for '{name}'.")

        if sync_execution:
            sync_execution_fn(self.gpu_indices)

        state = self.window_state.pop(name)
        state.queue.put("stop")  # ask the monitoring loop to exit
        state.proc.join(timeout=20)  # grace period for clean shutdown

        if state.proc.is_alive():
            state.proc.terminate()

__init__

__init__(gpu_indices, update_period, pushgateway_url, job)

Parameters:

Name Type Description Default
gpu_indices list[int]

List of GPU indices to monitor for power consumption.

required
update_period int

Interval (in seconds) between consecutive power measurements.

required
pushgateway_url str

URL of the Prometheus Push Gateway where Gauge metrics are pushed.

required
job str

Name of the Prometheus job to associate with the power metrics.

required
Source code in zeus/metric.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
def __init__(
    self,
    gpu_indices: list,
    update_period: int,
    pushgateway_url: str,
    job: str,
) -> None:
    """Initialize the PowerGauge metric.

    Args:
        gpu_indices (list[int]): GPU indices whose power draw should be monitored.
        update_period (int): Seconds between consecutive power measurements.
        pushgateway_url (str): Prometheus Push Gateway URL that Gauge metrics are pushed to.
        job (str): Prometheus job name to associate with the power metrics.
    """
    # What to monitor and how often.
    self.gpu_indices = gpu_indices
    self.update_period = update_period
    # Where the collected metrics get pushed.
    self.pushgateway_url = pushgateway_url
    self.job = job
    # Active measurement windows, keyed by window name.
    self.window_state: dict[str, MonitoringProcessState] = {}

begin_window

begin_window(name, sync_execution=False)

Begin the power monitoring window.

Starts a new multiprocessing process that runs the power monitoring loop. The process collects real-time power consumption data and updates the corresponding Gauge metrics in Prometheus.

Parameters:

Name Type Description Default
name str

The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.

required
sync_execution bool

Whether to wait for asynchronously dispatched computations to finish before starting the measurement window. Defaults to False.

False
Source code in zeus/metric.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
def begin_window(self, name: str, sync_execution: bool = False) -> None:
    """Begin the power monitoring window.

    Launches a separate process running the power monitoring loop, which
    polls real-time GPU power draw and publishes it as Prometheus Gauge
    metrics.

    Args:
        name (str): The unique name of the measurement window. Must match between
            calls to 'begin_window' and 'end_window'.
        sync_execution (bool, optional): Whether to wait for asynchronously
            dispatched computations to finish before starting the window.
            Defaults to False.

    Raises:
        ValueError: If a window named ``name`` is already being monitored.
        RuntimeError: If the monitoring process could not be started.
    """
    if name in self.window_state:
        raise ValueError(f"PowerGauge metric '{name}' already exists.")

    if sync_execution:
        sync_execution_fn(self.gpu_indices)

    # Use the spawn start method so the child begins with a fresh interpreter.
    spawn_ctx = mp.get_context("spawn")
    command_queue = spawn_ctx.Queue()
    monitor_proc = spawn_ctx.Process(
        target=power_monitoring_loop,
        args=(
            name,
            command_queue,
            self.gpu_indices,
            self.update_period,
            self.pushgateway_url,
            self.job,
        ),
    )
    monitor_proc.start()
    if not monitor_proc.is_alive():
        raise RuntimeError(
            f"Failed to start power monitoring process for '{name}'."
        )

    self.window_state[name] = MonitoringProcessState(
        queue=command_queue, proc=monitor_proc
    )

end_window

end_window(name, sync_execution=False)

End the power monitoring window.

Parameters:

Name Type Description Default
name str

The unique name of the measurement window. Must match between calls to 'begin_window' and 'end_window'.

required
sync_execution bool

Whether to wait for asynchronously dispatched computations to finish before ending the measurement window. Defaults to False.

False
Source code in zeus/metric.py
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
def end_window(self, name: str, sync_execution: bool = False) -> None:
    """End the power monitoring window.

    Signals the monitoring process to stop, waits up to 20 seconds for it to
    exit, and terminates it if it is still alive after the grace period.

    Args:
        name (str): The unique name of the measurement window. Must match between
            calls to 'begin_window' and 'end_window'.
        sync_execution (bool, optional): Whether to wait for asynchronously
            dispatched computations to finish before ending the window.
            Defaults to False.

    Raises:
        ValueError: If no monitoring window named ``name`` is active.
    """
    # Fail with a descriptive ValueError instead of the bare KeyError that
    # `pop` would raise, consistent with the energy counter's end_window.
    if name not in self.window_state:
        raise ValueError(f"No active monitoring process found for '{name}'.")

    if sync_execution:
        sync_execution_fn(self.gpu_indices)

    state = self.window_state.pop(name)
    state.queue.put("stop")  # ask the monitoring loop to exit
    state.proc.join(timeout=20)  # grace period for clean shutdown

    if state.proc.is_alive():
        state.proc.terminate()

energy_monitoring_loop

energy_monitoring_loop(
    name,
    pipe,
    cpu_indices,
    gpu_indices,
    update_period,
    pushgateway_url,
    job,
)

Runs in a separate process to collect and update energy consumption metrics (for GPUs, CPUs, and DRAM).

Parameters:

Name Type Description Default
name str

The user-defined name of the monitoring window (used as a label for Prometheus metrics).

required
pipe Queue

A multiprocessing queue for inter-process communication, used to signal when to stop the process.

required
cpu_indices list

List of CPU indices to monitor.

required
gpu_indices list

List of GPU indices to monitor.

required
update_period int

The interval (in seconds) between consecutive energy data updates.

required
pushgateway_url str

The URL of the Prometheus Push Gateway where the metrics will be pushed.

required
job str

The name of the Prometheus job associated with these metrics.

required
Source code in zeus/metric.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def energy_monitoring_loop(
    name: str,
    pipe: mp.Queue,
    cpu_indices: list,
    gpu_indices: list,
    update_period: int,
    pushgateway_url: str,
    job: str,
) -> None:
    """Runs in a separate process to collect and update energy consumption metrics (for GPUs, CPUs, and DRAM).

    Args:
        name (str): The user-defined name of the monitoring window (used as a label for Prometheus metrics).
        pipe (mp.Queue): A multiprocessing queue for inter-process communication, used to signal when to stop the process.
        cpu_indices (list): List of CPU indices to monitor.
        gpu_indices (list): List of GPU indices to monitor.
        update_period (int): The interval (in seconds) between consecutive energy data updates.
        pushgateway_url (str): The URL of the Prometheus Push Gateway where the metrics will be pushed.
        job (str): The name of the Prometheus job associated with these metrics.
    """
    registry = CollectorRegistry()
    energy_monitor = ZeusMonitor(cpu_indices=cpu_indices, gpu_indices=gpu_indices)
    gpu_counters = None
    cpu_counters = None
    dram_counters = None

    # Only create counters for the device kinds actually being monitored.
    if energy_monitor.gpu_indices:
        gpu_counters = Counter(
            "energy_monitor_gpu_energy_joules",
            "GPU energy consumption",
            ["window", "index"],
            registry=registry,
        )

    if energy_monitor.cpu_indices:
        cpu_counters = Counter(
            "energy_monitor_cpu_energy_joules",
            "CPU energy consumption",
            ["window", "index"],
            registry=registry,
        )
        # DRAM energy is only reported when at least one CPU supports it.
        if any(cpu.supportsGetDramEnergyConsumption() for cpu in get_cpus().cpus):
            dram_counters = Counter(
                "energy_monitor_dram_energy_joules",
                "DRAM energy consumption",
                ["window", "index"],
                registry=registry,
            )

    while True:
        # Any message on the queue (e.g., "stop") signals shutdown.
        if not pipe.empty():
            break
        # Begin and end monitoring window using sync_execution
        energy_monitor.begin_window(
            f"__EnergyCumulativeCounter_{name}", sync_execution=False
        )
        time.sleep(update_period)
        measurement = energy_monitor.end_window(
            f"__EnergyCumulativeCounter_{name}", sync_execution=False
        )

        if measurement.gpu_energy:
            for gpu_index, energy in measurement.gpu_energy.items():
                if gpu_counters:
                    gpu_counters.labels(window=name, index=gpu_index).inc(energy)

        if measurement.cpu_energy:
            for cpu_index, energy in measurement.cpu_energy.items():
                if cpu_counters:
                    cpu_counters.labels(window=name, index=cpu_index).inc(energy)

        if measurement.dram_energy:
            for dram_index, energy in measurement.dram_energy.items():
                if dram_counters:
                    dram_counters.labels(window=name, index=dram_index).inc(energy)
        # Push metrics to Prometheus. A transient gateway failure must not kill
        # the monitoring process, so report the error and keep monitoring
        # (mirrors the error handling in power_monitoring_loop).
        try:
            push_to_gateway(pushgateway_url, job=job, registry=registry)
        except Exception as e:
            print(f"Error pushing metrics: {e}")

power_monitoring_loop

power_monitoring_loop(
    name,
    pipe,
    gpu_indices,
    update_period,
    pushgateway_url,
    job,
)

Runs in a separate process and periodically collects power consumption data for each GPU and pushes the results to the Prometheus Push Gateway.

Parameters:

Name Type Description Default
name str

Unique name for the monitoring window (used as a label in Prometheus metrics).

required
pipe Queue

Queue to receive control signals (e.g., "stop").

required
gpu_indices list[int]

List of GPU indices to monitor for power consumption.

required
update_period int

Interval (in seconds) between consecutive power data polls.

required
pushgateway_url str

URL of the Prometheus Push Gateway where metrics are pushed.

required
job str

Name of the Prometheus job to associate with the metrics.

required
Source code in zeus/metric.py
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
def power_monitoring_loop(
    name: str,
    pipe: mp.Queue,
    gpu_indices: list[int],
    update_period: int,
    pushgateway_url: str,
    job: str,
) -> None:
    """Runs in a separate process and periodically collects power consumption data for each GPU and pushes the results to the Prometheus Push Gateway.

    Args:
        name (str): Unique name for the monitoring window (used as a label in Prometheus metrics).
        pipe (multiprocessing.Queue): Queue to receive control signals (e.g., "stop").
        gpu_indices (list[int]): List of GPU indices to monitor for power consumption.
        update_period (int): Interval (in seconds) between consecutive power data polls.
        pushgateway_url (str): URL of the Prometheus Push Gateway where metrics are pushed.
        job (str): Name of the Prometheus job to associate with the metrics.
    """
    power_monitor = PowerMonitor(gpu_indices=gpu_indices)
    registry = CollectorRegistry()

    gpu_gauges = Gauge(
        "power_monitor_gpu_power_watts",
        "Records power consumption for GPU over time",
        ["window", "index"],
        registry=registry,
    )

    # Keep polling until a control message (e.g., "stop") arrives on the queue.
    while pipe.empty():
        readings = power_monitor.get_power()

        # Record each GPU's latest power draw; never let a bad reading
        # crash the monitoring process.
        try:
            if readings:
                for index, watts in readings.items():
                    gpu_gauges.labels(window=name, index=index).set(watts)
        except Exception as e:
            print(f"Error during processing power measurement: {e}")

        # Push to the gateway best-effort; a transient failure is reported
        # and the loop continues.
        try:
            push_to_gateway(pushgateway_url, job=job, registry=registry)
        except Exception as e:
            print(f"Error pushing metrics: {e}")

        time.sleep(update_period)