Skip to content

temperature

zeus.monitor.temperature

Monitor the temperature of GPUs.

TemperatureSample dataclass

A single temperature measurement sample.

Source code in zeus/monitor/temperature.py
24
25
26
27
28
29
30
@dataclass
class TemperatureSample:
    """A single temperature measurement sample.

    One instance pairs a GPU index with the temperature read from that GPU
    and the time at which the reading was taken.
    """

    # Time of the reading; the polling process fills this from `time()`
    # (presumably a Unix timestamp in seconds -- confirm against the import).
    timestamp: float
    # Index of the GPU this reading belongs to.
    gpu_index: int
    # Temperature reading; the `_c` suffix indicates degrees Celsius.
    temperature_c: int

TemperatureMonitor

Monitor GPU temperature over time.

This class provides:

1. Continuous temperature monitoring in a background process
2. Timeline export with deduplication
3. Point-in-time temperature queries

Note

The current implementation only supports cases where all GPUs are homogeneous (i.e., the same model).

Warning

Since the monitor spawns child processes, it should not be instantiated as a global variable. Refer to the "Safe importing of main module" section in the Python documentation for more details.

Source code in zeus/monitor/temperature.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
class TemperatureMonitor:
    """Monitor GPU temperature over time.

    This class provides:
    1. Continuous temperature monitoring in a background process
    2. Timeline export with deduplication
    3. Point-in-time temperature queries

    !!! Note
        The current implementation only supports cases where all GPUs are homogeneous
        (i.e., the same model).

    !!! Warning
        Since the monitor spawns child processes, **it should not be instantiated as a global variable**.
        Refer to the "Safe importing of main module" section in the
        [Python documentation](https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods)
        for more details.
    """

    def __init__(
        self,
        gpu_indices: list[int] | None = None,
        update_period: float = 1.0,
        max_samples_per_gpu: int | None = None,
    ) -> None:
        """Initialize the temperature monitor.

        Args:
            gpu_indices: Indices of the GPUs to monitor. If None, monitor all GPUs.
            update_period: Update period of the temperature monitor in seconds.
                Defaults to 1.0 second. Temperature typically doesn't change as
                rapidly as power, so a longer update period is reasonable.
            max_samples_per_gpu: Maximum number of temperature samples to keep per GPU
                in memory. If None (default), unlimited samples are kept.
        """
        if gpu_indices is not None and not gpu_indices:
            raise ValueError("`gpu_indices` must be either `None` or non-empty")

        # Get GPUs
        gpus = get_gpus(ensure_homogeneous=True)

        # Configure GPU indices
        self.gpu_indices = (
            gpu_indices if gpu_indices is not None else list(range(len(gpus)))
        )
        if not self.gpu_indices:
            raise ValueError("At least one GPU index must be specified")
        logger.info("Monitoring temperature of GPUs %s", self.gpu_indices)

        self.update_period = update_period

        # Temperature samples are collected for each device index.
        self.temperature_samples: dict[int, collections.deque[TemperatureSample]] = {}
        for gpu_idx in self.gpu_indices:
            self.temperature_samples[gpu_idx] = collections.deque(
                maxlen=max_samples_per_gpu
            )

        # Spawn temperature collector process
        atexit.register(self._stop)
        ctx = mp.get_context("spawn")
        self.temperature_queue = ctx.Queue()
        self.temperature_ready_event = ctx.Event()
        self.temperature_stop_event = ctx.Event()
        self.temperature_process = ctx.Process(
            target=_temperature_polling_process,
            kwargs=dict(
                gpu_indices=self.gpu_indices,
                data_queue=self.temperature_queue,
                ready_event=self.temperature_ready_event,
                stop_event=self.temperature_stop_event,
                update_period=update_period,
            ),
            daemon=True,
            name="zeus-temperature-monitor",
        )
        self.temperature_process.start()

        # Wait for subprocess to signal it's ready
        logger.info("Waiting for temperature monitoring subprocess to be ready...")
        if not self.temperature_ready_event.wait(timeout=10.0):
            logger.warning(
                "Temperature monitor subprocess did not signal ready within timeout"
            )
        logger.info("Temperature monitoring subprocess is ready")

    def _stop(self) -> None:
        """Stop the monitoring process."""
        if hasattr(self, "temperature_stop_event"):
            self.temperature_stop_event.set()

        if hasattr(self, "temperature_process") and self.temperature_process.is_alive():
            self.temperature_process.join(timeout=2.0)
            if self.temperature_process.is_alive():
                self.temperature_process.terminate()
                self.temperature_process.join(timeout=1.0)

    def _process_temperature_queue_data(self) -> None:
        """Process all pending temperature samples from the queue."""
        if not hasattr(self, "temperature_queue"):
            return

        while True:
            try:
                sample = self.temperature_queue.get_nowait()
                if sample == "STOP":
                    break
                assert isinstance(sample, TemperatureSample)
                self.temperature_samples[sample.gpu_index].append(sample)
            except Empty:
                break

    def get_temperature_timeline(
        self,
        gpu_index: int | None = None,
        start_time: float | None = None,
        end_time: float | None = None,
    ) -> dict[int, list[tuple[float, int]]]:
        """Get temperature timeline for specific GPU(s).

        Args:
            gpu_index: Specific GPU index, or None for all GPUs
            start_time: Start time filter (unix timestamp)
            end_time: End time filter (unix timestamp)

        Returns:
            Dictionary mapping GPU indices to timeline data.
            Timeline data is list of (timestamp, temperature_celsius) tuples.
        """
        # Process any pending queue data
        self._process_temperature_queue_data()

        # Determine which GPUs to query
        target_gpus = [gpu_index] if gpu_index is not None else self.gpu_indices

        result = {}
        for gpu_idx in target_gpus:
            if gpu_idx not in self.temperature_samples:
                continue

            # Extract timeline from samples
            timeline = []
            for sample in self.temperature_samples[gpu_idx]:
                # Apply time filters
                if start_time is not None and sample.timestamp < start_time:
                    continue
                if end_time is not None and sample.timestamp > end_time:
                    continue

                timeline.append((sample.timestamp, sample.temperature_c))

            # Sort by timestamp
            timeline.sort(key=lambda x: x[0])
            result[gpu_idx] = timeline

        return result

    def get_temperature(self, time: float | None = None) -> dict[int, int] | None:
        """Get the GPU temperature at a specific time point.

        Args:
            time: Time point to get the temperature at. If None, get the temperature
                at the last recorded time point.

        Returns:
            A dictionary mapping GPU indices to the temperature of the GPU at the
            specified time point. If there are no temperature readings, return None.
        """
        # Process any pending queue data
        self._process_temperature_queue_data()

        result = {}
        for gpu_idx in self.gpu_indices:
            samples = self.temperature_samples[gpu_idx]
            if not samples:
                return None

            if time is None:
                # Get the most recent sample
                latest_sample = samples[-1]
                result[gpu_idx] = latest_sample.temperature_c
            else:
                # Find the closest sample to the requested time using bisect
                timestamps = [sample.timestamp for sample in samples]
                pos = bisect.bisect_left(timestamps, time)

                if pos == 0:
                    closest_sample = samples[0]
                elif pos == len(samples):
                    closest_sample = samples[-1]
                else:
                    # Check the closest sample before and after the requested time
                    before = samples[pos - 1]
                    after = samples[pos]
                    closest_sample = (
                        before
                        if time - before.timestamp <= after.timestamp - time
                        else after
                    )
                result[gpu_idx] = closest_sample.temperature_c

        return result

__init__

__init__(
    gpu_indices=None,
    update_period=1.0,
    max_samples_per_gpu=None,
)

Parameters:

Name Type Description Default
gpu_indices list[int] | None

Indices of the GPUs to monitor. If None, monitor all GPUs.

None
update_period float

Update period of the temperature monitor in seconds. Defaults to 1.0 second. Temperature typically doesn't change as rapidly as power, so a longer update period is reasonable.

1.0
max_samples_per_gpu int | None

Maximum number of temperature samples to keep per GPU in memory. If None (default), unlimited samples are kept.

None
Source code in zeus/monitor/temperature.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def __init__(
    self,
    gpu_indices: list[int] | None = None,
    update_period: float = 1.0,
    max_samples_per_gpu: int | None = None,
) -> None:
    """Initialize the temperature monitor.

    Args:
        gpu_indices: Indices of the GPUs to monitor. If None, monitor all GPUs.
        update_period: Update period of the temperature monitor in seconds.
            Defaults to 1.0 second. Temperature typically doesn't change as
            rapidly as power, so a longer update period is reasonable.
        max_samples_per_gpu: Maximum number of temperature samples to keep per GPU
            in memory. If None (default), unlimited samples are kept.

    Raises:
        ValueError: If `gpu_indices` is an empty list, or if no GPU is
            available when `gpu_indices` is None.
    """
    if gpu_indices is not None and not gpu_indices:
        raise ValueError("`gpu_indices` must be either `None` or non-empty")

    # Get GPUs
    gpus = get_gpus(ensure_homogeneous=True)

    # Configure GPU indices. Copy the caller's list so that later mutation
    # on their side cannot desynchronize it from `temperature_samples`.
    self.gpu_indices = (
        list(gpu_indices) if gpu_indices is not None else list(range(len(gpus)))
    )
    if not self.gpu_indices:
        raise ValueError("At least one GPU index must be specified")
    logger.info("Monitoring temperature of GPUs %s", self.gpu_indices)

    self.update_period = update_period

    # Temperature samples are collected for each device index.
    # A bounded deque drops the oldest sample once `maxlen` is reached.
    self.temperature_samples: dict[int, collections.deque[TemperatureSample]] = {
        gpu_idx: collections.deque(maxlen=max_samples_per_gpu)
        for gpu_idx in self.gpu_indices
    }

    # Spawn temperature collector process. The exit hook is registered
    # first; `_stop` tolerates attributes that have not been set yet.
    atexit.register(self._stop)
    ctx = mp.get_context("spawn")
    self.temperature_queue = ctx.Queue()
    self.temperature_ready_event = ctx.Event()
    self.temperature_stop_event = ctx.Event()
    self.temperature_process = ctx.Process(
        target=_temperature_polling_process,
        kwargs=dict(
            gpu_indices=self.gpu_indices,
            data_queue=self.temperature_queue,
            ready_event=self.temperature_ready_event,
            stop_event=self.temperature_stop_event,
            update_period=update_period,
        ),
        daemon=True,
        name="zeus-temperature-monitor",
    )
    self.temperature_process.start()

    # Wait for subprocess to signal it's ready. Only report readiness
    # when the event was actually set; a timeout is reported on its own.
    logger.info("Waiting for temperature monitoring subprocess to be ready...")
    if self.temperature_ready_event.wait(timeout=10.0):
        logger.info("Temperature monitoring subprocess is ready")
    else:
        logger.warning(
            "Temperature monitor subprocess did not signal ready within timeout"
        )

_stop

_stop()

Stop the monitoring process.

Source code in zeus/monitor/temperature.py
119
120
121
122
123
124
125
126
127
128
def _stop(self) -> None:
    """Shut down the background polling process.

    Signals the stop event (when present), then gives the child up to two
    seconds to exit by itself before terminating it. Attributes that were
    never created are skipped, so this is safe on a half-built instance.
    """
    if hasattr(self, "temperature_stop_event"):
        self.temperature_stop_event.set()

    # Nothing to wait for when the process was never created or already exited.
    if not hasattr(self, "temperature_process"):
        return
    if not self.temperature_process.is_alive():
        return

    # Graceful path: let the child notice the stop event.
    self.temperature_process.join(timeout=2.0)
    if not self.temperature_process.is_alive():
        return

    # Forced path: the child did not exit in time.
    self.temperature_process.terminate()
    self.temperature_process.join(timeout=1.0)

_process_temperature_queue_data

_process_temperature_queue_data()

Process all pending temperature samples from the queue.

Source code in zeus/monitor/temperature.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def _process_temperature_queue_data(self) -> None:
    """Drain the sample queue into the per-GPU sample deques.

    Items are fetched without blocking. Draining ends when the queue is
    empty or when the `"STOP"` sentinel is read. Does nothing if the
    queue attribute has not been created.
    """
    if not hasattr(self, "temperature_queue"):
        return

    keep_draining = True
    while keep_draining:
        try:
            item = self.temperature_queue.get_nowait()
            if item == "STOP":
                # Sentinel sent by the polling process when it exits.
                keep_draining = False
            else:
                assert isinstance(item, TemperatureSample)
                self.temperature_samples[item.gpu_index].append(item)
        except Empty:
            keep_draining = False

get_temperature_timeline

get_temperature_timeline(
    gpu_index=None, start_time=None, end_time=None
)

Get temperature timeline for specific GPU(s).

Parameters:

Name Type Description Default
gpu_index int | None

Specific GPU index, or None for all GPUs

None
start_time float | None

Start time filter (unix timestamp)

None
end_time float | None

End time filter (unix timestamp)

None

Returns:

Type Description
dict[int, list[tuple[float, int]]]

Dictionary mapping GPU indices to timeline data. Timeline data is a list of (timestamp, temperature_celsius) tuples.

Source code in zeus/monitor/temperature.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def get_temperature_timeline(
    self,
    gpu_index: int | None = None,
    start_time: float | None = None,
    end_time: float | None = None,
) -> dict[int, list[tuple[float, int]]]:
    """Get temperature timeline for specific GPU(s).

    Args:
        gpu_index: Specific GPU index, or None for all GPUs
        start_time: Start time filter (unix timestamp)
        end_time: End time filter (unix timestamp)

    Returns:
        Dictionary mapping GPU indices to timeline data.
        Timeline data is list of (timestamp, temperature_celsius) tuples.
    """
    # Process any pending queue data
    self._process_temperature_queue_data()

    # Determine which GPUs to query
    target_gpus = [gpu_index] if gpu_index is not None else self.gpu_indices

    result = {}
    for gpu_idx in target_gpus:
        if gpu_idx not in self.temperature_samples:
            continue

        # Extract timeline from samples
        timeline = []
        for sample in self.temperature_samples[gpu_idx]:
            # Apply time filters
            if start_time is not None and sample.timestamp < start_time:
                continue
            if end_time is not None and sample.timestamp > end_time:
                continue

            timeline.append((sample.timestamp, sample.temperature_c))

        # Sort by timestamp
        timeline.sort(key=lambda x: x[0])
        result[gpu_idx] = timeline

    return result

get_temperature

get_temperature(time=None)

Get the GPU temperature at a specific time point.

Parameters:

Name Type Description Default
time float | None

Time point to get the temperature at. If None, get the temperature at the last recorded time point.

None

Returns:

Type Description
dict[int, int] | None

A dictionary mapping GPU indices to the temperature of the GPU at the specified time point. If there are no temperature readings, return None.

Source code in zeus/monitor/temperature.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def get_temperature(self, time: float | None = None) -> dict[int, int] | None:
    """Get the GPU temperature at a specific time point.

    Args:
        time: Time point to get the temperature at. If None, get the temperature
            at the last recorded time point.

    Returns:
        A dictionary mapping GPU indices to the temperature of the GPU at the
        specified time point. If there are no temperature readings, return None.
    """
    # Process any pending queue data
    self._process_temperature_queue_data()

    result = {}
    for gpu_idx in self.gpu_indices:
        samples = self.temperature_samples[gpu_idx]
        if not samples:
            return None

        if time is None:
            # Get the most recent sample
            latest_sample = samples[-1]
            result[gpu_idx] = latest_sample.temperature_c
        else:
            # Find the closest sample to the requested time using bisect
            timestamps = [sample.timestamp for sample in samples]
            pos = bisect.bisect_left(timestamps, time)

            if pos == 0:
                closest_sample = samples[0]
            elif pos == len(samples):
                closest_sample = samples[-1]
            else:
                # Check the closest sample before and after the requested time
                before = samples[pos - 1]
                after = samples[pos]
                closest_sample = (
                    before
                    if time - before.timestamp <= after.timestamp - time
                    else after
                )
            result[gpu_idx] = closest_sample.temperature_c

    return result

_temperature_polling_process

_temperature_polling_process(
    gpu_indices,
    data_queue,
    ready_event,
    stop_event,
    update_period,
)

Polling process for GPU temperature with deduplication.

Source code in zeus/monitor/temperature.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def _temperature_polling_process(
    gpu_indices: list[int],
    data_queue: mp.Queue,
    ready_event: EventClass,
    stop_event: EventClass,
    update_period: float,
) -> None:
    """Polling process for GPU temperature with deduplication.

    Reads the temperature of every GPU in `gpu_indices` once per
    `update_period` and puts a `TemperatureSample` on `data_queue` whenever
    a GPU's reading differs from its previous one. The string `"STOP"` is
    always put on the queue when the process exits, whatever the reason.

    Args:
        gpu_indices: Indices of the GPUs to poll.
        data_queue: Queue that receives samples and the final `"STOP"` sentinel.
        ready_event: Set once the GPUs have been acquired and polling starts.
        stop_event: Polling ends as soon as this event is set.
        update_period: Target time between two polling rounds, in seconds.

    Raises:
        Exception: Any error other than `ZeusGPUNotSupportedError` raised while
            polling is logged and re-raised.
    """
    try:
        # Get GPUs
        gpus = get_gpus()

        # Track previous temperature values for deduplication
        prev_temperature: dict[int, int] = {}

        # Signal that this process is ready to start monitoring
        ready_event.set()

        # Start polling loop
        while not stop_event.is_set():
            timestamp = time()

            for gpu_index in gpu_indices:
                try:
                    temperature_c = gpus.getGpuTemperature(gpu_index)

                    # Deduplication: only send if temperature changed
                    if (
                        gpu_index in prev_temperature
                        and prev_temperature[gpu_index] == temperature_c
                    ):
                        continue

                    prev_temperature[gpu_index] = temperature_c

                    # Create and send temperature sample
                    sample = TemperatureSample(
                        timestamp=timestamp,
                        gpu_index=gpu_index,
                        temperature_c=temperature_c,
                    )

                    data_queue.put(sample)
                except ZeusGPUNotSupportedError as e:
                    logger.warning(
                        "GPU %d temperature reading not supported: %s",
                        gpu_index,
                        e,
                    )
                    # Don't keep trying if it's not supported. Leaving only
                    # the inner loop would retry (and warn) every period, so
                    # end the whole process; `finally` still sends "STOP".
                    return
                except Exception as e:
                    logger.exception(
                        "Error polling temperature for GPU %d: %s",
                        gpu_index,
                        e,
                    )
                    raise

            # Wait for the remaining time. Waiting on the stop event instead
            # of sleeping lets a stop request end the wait immediately, even
            # with a long update period.
            elapsed = time() - timestamp
            sleep_time = update_period - elapsed
            if sleep_time > 0:
                stop_event.wait(timeout=sleep_time)

    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.exception(
            "Exiting temperature polling process due to error: %s",
            e,
        )
        # Bare raise keeps the original traceback untouched.
        raise
    finally:
        # Send stop signal
        data_queue.put("STOP")