Skip to content

zeusd

zeus.utils.zeusd

Zeusd client library.

Provides ZeusdConfig and ZeusdClient, the entry points for communicating with a Zeusd daemon. Handles connection, discovery, authentication, and exposes typed methods for every Zeusd endpoint.

Typical usage:

from zeus.utils.zeusd import ZeusdConfig, ZeusdClient

client = ZeusdClient(ZeusdConfig.uds(socket_path="/var/run/zeusd.sock"))
print(client.gpu_ids)       # [0, 1, 2, 3]
print(client.can_read_gpu)  # True

snapshot = client.get_gpu_power()
print(snapshot.power_mw)    # {0: 75000, 1: 120000, ...}

ZeusdConnectionError

Bases: ZeusBaseError

Cannot reach the Zeusd daemon.

Source code in zeus/utils/zeusd.py
34
35
class ZeusdConnectionError(ZeusBaseError):
    """Signals that the Zeusd daemon could not be reached."""

ZeusdAuthError

Bases: ZeusBaseError

Authentication or authorization failure.

Source code in zeus/utils/zeusd.py
38
39
class ZeusdAuthError(ZeusBaseError):
    """Signals an authentication or authorization failure."""

ZeusdCapabilityError

Bases: ZeusBaseError

Requested capabilities exceed what the daemon offers.

Source code in zeus/utils/zeusd.py
42
43
class ZeusdCapabilityError(ZeusBaseError):
    """Signals that the requested capabilities exceed what the daemon offers."""

GpuPowerSnapshot dataclass

Instantaneous GPU power readings from the daemon.

Attributes:

Name Type Description
timestamp_ms int

Daemon-side Unix timestamp in milliseconds.

power_mw dict[int, int]

Mapping of GPU index to power draw in milliwatts.

Source code in zeus/utils/zeusd.py
46
47
48
49
50
51
52
53
54
55
56
@dataclass(frozen=True)
class GpuPowerSnapshot:
    """One point-in-time set of GPU power readings reported by the daemon.

    Fields cannot be reassigned after construction (frozen dataclass).

    Attributes:
        timestamp_ms: Unix time in milliseconds, measured on the daemon side.
        power_mw: Power draw in milliwatts, keyed by GPU index.
    """

    # Daemon-side Unix timestamp (milliseconds).
    timestamp_ms: int
    # GPU index -> power draw (milliwatts).
    power_mw: dict[int, int]

CpuDramPower dataclass

Power reading for a single CPU package.

Attributes:

Name Type Description
cpu_mw int

CPU package power in milliwatts.

dram_mw int | None

DRAM power in milliwatts, or None if unavailable.

Source code in zeus/utils/zeusd.py
59
60
61
62
63
64
65
66
67
68
69
@dataclass(frozen=True)
class CpuDramPower:
    """Power figures for one CPU package.

    Fields cannot be reassigned after construction (frozen dataclass).

    Attributes:
        cpu_mw: Power of the CPU package, in milliwatts.
        dram_mw: Power of the DRAM, in milliwatts; None when unavailable.
    """

    # CPU package power (milliwatts).
    cpu_mw: int
    # DRAM power (milliwatts), or None if no reading is available.
    dram_mw: int | None

CpuPowerSnapshot dataclass

Instantaneous CPU power readings from the daemon.

Attributes:

Name Type Description
timestamp_ms int

Daemon-side Unix timestamp in milliseconds.

power_mw dict[int, CpuDramPower]

Mapping of CPU index to power readings.

Source code in zeus/utils/zeusd.py
72
73
74
75
76
77
78
79
80
81
82
@dataclass(frozen=True)
class CpuPowerSnapshot:
    """One point-in-time set of CPU power readings reported by the daemon.

    Fields cannot be reassigned after construction (frozen dataclass).

    Attributes:
        timestamp_ms: Unix time in milliseconds, measured on the daemon side.
        power_mw: Power readings keyed by CPU index.
    """

    # Daemon-side Unix timestamp (milliseconds).
    timestamp_ms: int
    # CPU index -> CPU/DRAM power reading.
    power_mw: dict[int, CpuDramPower]

CpuEnergyResult dataclass

Cumulative energy for a single CPU package.

Attributes:

Name Type Description
cpu_energy_uj int | None

CPU package energy in microjoules, or None.

dram_energy_uj int | None

DRAM energy in microjoules, or None.

Source code in zeus/utils/zeusd.py
85
86
87
88
89
90
91
92
93
94
95
@dataclass(frozen=True)
class CpuEnergyResult:
    """Cumulative energy figures for one CPU package.

    Fields cannot be reassigned after construction (frozen dataclass).

    Attributes:
        cpu_energy_uj: Energy of the CPU package in microjoules, or None.
        dram_energy_uj: Energy of the DRAM in microjoules, or None.
    """

    # CPU package energy (microjoules), or None.
    cpu_energy_uj: int | None
    # DRAM energy (microjoules), or None.
    dram_energy_uj: int | None

ZeusdConfig dataclass

Connection configuration for a Zeusd daemon.

Use the classmethods tcp, uds, or from_env to construct.

Attributes:

Name Type Description
host_port str | None

host:port string (TCP mode). None for UDS.

socket_path str | None

Unix domain socket path (UDS mode). None for TCP.

token str | None

JWT token. Falls back to ZEUSD_TOKEN env var.

gpu_indices list[int] | None

GPU indices to stream (for PowerStreamingClient). None means all, empty list means skip. Ignored by ZeusdClient.

cpu_indices list[int] | None

CPU indices to stream (for PowerStreamingClient). None means all, empty list means skip. Ignored by ZeusdClient.

Source code in zeus/utils/zeusd.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
@dataclass(frozen=True)
class ZeusdConfig:
    """Connection configuration for a Zeusd daemon.

    Use the classmethods `tcp`, `uds`, or `from_env` to construct.

    Attributes:
        host_port: `host:port` string (TCP mode). None for UDS.
        socket_path: Unix domain socket path (UDS mode). None for TCP.
        token: JWT token. When left as None, the value of the `ZEUSD_TOKEN`
            environment variable (if set) is used instead; this is resolved
            once, when the config object is created.
        gpu_indices: GPU indices to stream (for `PowerStreamingClient`).
            None means all, empty list means skip. Ignored by `ZeusdClient`.
        cpu_indices: CPU indices to stream (for `PowerStreamingClient`).
            None means all, empty list means skip. Ignored by `ZeusdClient`.
    """

    host_port: str | None = None
    socket_path: str | None = None
    token: str | None = None
    gpu_indices: list[int] | None = None
    cpu_indices: list[int] | None = None

    def __post_init__(self) -> None:
        """Apply the documented `ZEUSD_TOKEN` fallback when no token is given."""
        if self.token is None:
            # The dataclass is frozen, so the generated __setattr__ would
            # raise; object.__setattr__ is the standard way to finish
            # initialization of a frozen instance.
            object.__setattr__(self, "token", os.environ.get("ZEUSD_TOKEN"))

    @classmethod
    def tcp(
        cls,
        host: str,
        port: int,
        *,
        token: str | None = None,
        gpu_indices: list[int] | None = None,
        cpu_indices: list[int] | None = None,
    ) -> ZeusdConfig:
        """Create a TCP connection config.

        Args:
            host: Hostname or IP of the Zeusd instance.
            port: TCP port.
            token: JWT token. Falls back to `ZEUSD_TOKEN` env var.
            gpu_indices: GPU indices to stream (for `PowerStreamingClient`).
            cpu_indices: CPU indices to stream (for `PowerStreamingClient`).

        Returns:
            A config whose `host_port` is `"{host}:{port}"`.
        """
        return cls(
            host_port=f"{host}:{port}",
            token=token,
            gpu_indices=gpu_indices,
            cpu_indices=cpu_indices,
        )

    @classmethod
    def uds(
        cls,
        socket_path: str,
        *,
        token: str | None = None,
        gpu_indices: list[int] | None = None,
        cpu_indices: list[int] | None = None,
    ) -> ZeusdConfig:
        """Create a Unix domain socket connection config.

        Args:
            socket_path: Path to the Zeusd Unix domain socket.
            token: JWT token. Falls back to `ZEUSD_TOKEN` env var.
            gpu_indices: GPU indices to stream (for `PowerStreamingClient`).
            cpu_indices: CPU indices to stream (for `PowerStreamingClient`).

        Returns:
            A config whose `socket_path` is set and `host_port` is None.
        """
        return cls(
            socket_path=socket_path,
            token=token,
            gpu_indices=gpu_indices,
            cpu_indices=cpu_indices,
        )

    @classmethod
    def from_env(cls) -> ZeusdConfig | None:
        """Create from environment variables.

        Tries `ZEUSD_SOCK_PATH` (UDS) first, then `ZEUSD_HOST_PORT` (TCP).
        `ZEUSD_HOST_PORT` should be `host:port`. `ZEUSD_TOKEN` is read for
        JWT authentication.

        Returns None if neither env var is set.
        """
        token = os.environ.get("ZEUSD_TOKEN")
        sock = os.environ.get("ZEUSD_SOCK_PATH")
        if sock is not None:
            return cls.uds(socket_path=sock, token=token)
        host_port = os.environ.get("ZEUSD_HOST_PORT")
        if host_port is not None:
            return cls(host_port=host_port, token=token)
        return None

    @property
    def _is_uds(self) -> bool:
        """Whether this config targets a Unix domain socket."""
        return self.socket_path is not None

    def make_client(self) -> httpx.Client:
        """Create an httpx.Client with the appropriate transport and auth."""
        headers = self._auth_headers()
        if self._is_uds:
            # UDS mode: route requests through the socket file.
            transport = httpx.HTTPTransport(uds=self.socket_path)
            return httpx.Client(transport=transport, headers=headers)
        return httpx.Client(headers=headers)

    def url(self, path: str) -> str:
        """Build the full URL for the given path.

        Args:
            path: Request path appended verbatim after the authority
                (callers in this module pass a leading `/`).

        Returns:
            `http://localhost{path}` in UDS mode, otherwise
            `http://{host_port}{path}`.
        """
        if self._is_uds:
            return f"http://localhost{path}"
        return f"http://{self.host_port}{path}"

    @property
    def endpoint(self) -> str:
        """Human-readable identifier for this connection."""
        if self._is_uds:
            return self.socket_path  # type: ignore[return-value]
        return self.host_port  # type: ignore[return-value]

    def _auth_headers(self) -> dict[str, str]:
        """Return the bearer `Authorization` header, or no headers without a token."""
        if self.token:
            return {"Authorization": f"Bearer {self.token}"}
        return {}

endpoint property

endpoint

Human-readable identifier for this connection.

tcp classmethod

tcp(host, port, *, token=None, gpu_indices=None, cpu_indices=None)

Create a TCP connection config.

Parameters:

Name Type Description Default
host str

Hostname or IP of the Zeusd instance.

required
port int

TCP port.

required
token str | None

JWT token. Falls back to ZEUSD_TOKEN env var.

None
gpu_indices list[int] | None

GPU indices to stream (for PowerStreamingClient).

None
cpu_indices list[int] | None

CPU indices to stream (for PowerStreamingClient).

None
Source code in zeus/utils/zeusd.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@classmethod
def tcp(
    cls,
    host: str,
    port: int,
    *,
    token: str | None = None,
    gpu_indices: list[int] | None = None,
    cpu_indices: list[int] | None = None,
) -> ZeusdConfig:
    """Build a config that reaches Zeusd over TCP.

    Args:
        host: Hostname or IP of the Zeusd instance.
        port: TCP port.
        token: JWT token, passed through to the constructor as given.
        gpu_indices: GPU indices to stream (for `PowerStreamingClient`).
        cpu_indices: CPU indices to stream (for `PowerStreamingClient`).

    Returns:
        A config whose `host_port` is `"{host}:{port}"`.
    """
    address = f"{host}:{port}"
    return cls(host_port=address, token=token, gpu_indices=gpu_indices, cpu_indices=cpu_indices)

uds classmethod

uds(socket_path, *, token=None, gpu_indices=None, cpu_indices=None)

Create a Unix domain socket connection config.

Parameters:

Name Type Description Default
socket_path str

Path to the Zeusd Unix domain socket.

required
token str | None

JWT token. Falls back to ZEUSD_TOKEN env var.

None
gpu_indices list[int] | None

GPU indices to stream (for PowerStreamingClient).

None
cpu_indices list[int] | None

CPU indices to stream (for PowerStreamingClient).

None
Source code in zeus/utils/zeusd.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@classmethod
def uds(
    cls,
    socket_path: str,
    *,
    token: str | None = None,
    gpu_indices: list[int] | None = None,
    cpu_indices: list[int] | None = None,
) -> ZeusdConfig:
    """Build a config that reaches Zeusd over a Unix domain socket.

    Args:
        socket_path: Path to the Zeusd Unix domain socket.
        token: JWT token, passed through to the constructor as given.
        gpu_indices: GPU indices to stream (for `PowerStreamingClient`).
        cpu_indices: CPU indices to stream (for `PowerStreamingClient`).

    Returns:
        A config with `socket_path` set.
    """
    settings = {
        "socket_path": socket_path,
        "token": token,
        "gpu_indices": gpu_indices,
        "cpu_indices": cpu_indices,
    }
    return cls(**settings)

from_env classmethod

from_env()

Create from environment variables.

Tries ZEUSD_SOCK_PATH (UDS) first, then ZEUSD_HOST_PORT (TCP). ZEUSD_HOST_PORT should be host:port. ZEUSD_TOKEN is read for JWT authentication.

Returns None if neither env var is set.

Source code in zeus/utils/zeusd.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
@classmethod
def from_env(cls) -> ZeusdConfig | None:
    """Build a config from the process environment.

    `ZEUSD_SOCK_PATH` (UDS) takes precedence over `ZEUSD_HOST_PORT` (TCP,
    expected as `host:port`). `ZEUSD_TOKEN`, when present, is passed along
    as the JWT token.

    Returns:
        The resulting config, or None if neither `ZEUSD_SOCK_PATH` nor
        `ZEUSD_HOST_PORT` is set.
    """
    env = os.environ
    jwt = env.get("ZEUSD_TOKEN")

    socket_file = env.get("ZEUSD_SOCK_PATH")
    if socket_file is not None:
        return cls.uds(socket_path=socket_file, token=jwt)

    address = env.get("ZEUSD_HOST_PORT")
    return None if address is None else cls(host_port=address, token=jwt)

make_client

make_client()

Create an httpx.Client with the appropriate transport and auth.

Source code in zeus/utils/zeusd.py
193
194
195
196
197
198
199
def make_client(self) -> httpx.Client:
    """Return a new httpx.Client carrying this config's auth headers.

    In UDS mode the client is bound to the socket path through an
    `httpx.HTTPTransport`; otherwise a default-transport client is returned.
    """
    auth = self._auth_headers()
    if not self._is_uds:
        return httpx.Client(headers=auth)
    socket_transport = httpx.HTTPTransport(uds=self.socket_path)
    return httpx.Client(transport=socket_transport, headers=auth)

url

url(path)

Build the full URL for the given path.

Source code in zeus/utils/zeusd.py
201
202
203
204
205
def url(self, path: str) -> str:
    """Return the absolute `http://` URL for `path` on this connection.

    UDS connections use `localhost` as the authority; TCP connections use
    the configured `host_port`.
    """
    authority = "localhost" if self._is_uds else self.host_port
    return f"http://{authority}{path}"

ZeusdClient

Authenticated client for a Zeusd daemon.

Handles connection, service discovery, and JWT authentication in one place. Provides typed methods for every Zeusd endpoint.

Parameters:

Name Type Description Default
config ZeusdConfig | None

Connection configuration. If None, tries environment variables: ZEUSD_SOCK_PATH (UDS) first, then ZEUSD_HOST_PORT (TCP).

None

Raises:

Type Description
ZeusdConnectionError

If the daemon is unreachable.

Source code in zeus/utils/zeusd.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
class ZeusdClient:
    """Authenticated client for a Zeusd daemon.

    Handles connection, service discovery, and JWT authentication in
    one place.  Provides typed methods for every Zeusd endpoint.

    The instance owns an `httpx.Client`.  Call `close()` when done, or use
    the instance as a context manager, to release its connections.

    Args:
        config: Connection configuration.  If None, tries environment
            variables: `ZEUSD_SOCK_PATH` (UDS) first, then
            `ZEUSD_HOST_PORT` (TCP).

    Raises:
        ZeusdConnectionError: If no connection is configured, the daemon is
            unreachable, or `/discover` does not answer with HTTP 200.
    """

    def __init__(self, config: ZeusdConfig | None = None) -> None:
        """Initialize the client, run discovery, and attempt authentication.

        Authentication problems (missing or rejected token) do not raise;
        they are recorded in `auth_error` and logged as a warning.
        """
        if config is None:
            config = ZeusdConfig.from_env()
            if config is None:
                raise ZeusdConnectionError(
                    "No Zeusd connection configured. Set ZEUSD_SOCK_PATH or ZEUSD_HOST_PORT, or pass a ZeusdConfig."
                )
        self._config = config
        self._client = config.make_client()

        try:
            resp = self._client.get(config.url("/discover"))
        except httpx.RequestError as exc:
            # The constructor is about to fail, so nobody else can close
            # the HTTP client we just created.
            self._client.close()
            raise ZeusdConnectionError(f"Cannot reach Zeusd at {config.endpoint}: {exc}") from exc
        if resp.status_code != 200:
            message = f"Zeusd at {config.endpoint} returned HTTP {resp.status_code} on /discover: {resp.text}"
            self._client.close()
            raise ZeusdConnectionError(message)
        data = resp.json()
        self._gpu_ids: list[int] = data.get("gpu_ids", [])
        self._cpu_ids: list[int] = data.get("cpu_ids", [])
        self._dram_available: list[bool] = data.get("dram_available", [])
        self._enabled_api_groups: set[str] = set(data.get("enabled_api_groups", []))
        self._auth_required: bool = data.get("auth_required", False)

        self._auth_error: str | None = None
        self._granted_scopes: frozenset[str] = frozenset()
        self._whoami_sub: str | None = None
        self._whoami_exp: int | None = None
        if self._auth_required:
            if not config.token:
                self._auth_error = (
                    f"Zeusd at {config.endpoint} requires authentication but "
                    "no token was provided. Set the ZEUSD_TOKEN environment "
                    "variable or pass token= in the config."
                )
            else:
                try:
                    whoami_resp = self._client.get(config.url("/auth/whoami"))
                except httpx.RequestError as exc:
                    # A transport failure here is a connectivity problem,
                    # not an auth problem: report it like /discover does.
                    self._client.close()
                    raise ZeusdConnectionError(f"Cannot reach Zeusd at {config.endpoint}: {exc}") from exc
                if whoami_resp.status_code == 401:
                    self._auth_error = f"Token rejected by Zeusd at {config.endpoint}: {whoami_resp.text}"
                elif whoami_resp.status_code != 200:
                    self._auth_error = (
                        f"Unexpected response from /auth/whoami at "
                        f"{config.endpoint} (HTTP {whoami_resp.status_code}): "
                        f"{whoami_resp.text}"
                    )
                else:
                    whoami = whoami_resp.json()
                    self._granted_scopes = frozenset(whoami.get("scopes", []))
                    self._whoami_sub = whoami.get("sub")
                    self._whoami_exp = whoami.get("exp")
                    logger.info(
                        "Authenticated with Zeusd at %s as user '%s' (scopes: %s)",
                        config.endpoint,
                        self._whoami_sub,
                        sorted(self._granted_scopes),
                    )
            if self._auth_error:
                logger.warning("Auth issue with Zeusd at %s: %s", config.endpoint, self._auth_error)

    def close(self) -> None:
        """Close the underlying httpx.Client."""
        self._client.close()

    def __enter__(self) -> ZeusdClient:
        """Return this client so it can be used in a `with` statement."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the client on leaving the `with` block; exceptions propagate."""
        self.close()

    @property
    def endpoint(self) -> str:
        """Human-readable identifier for this connection."""
        return self._config.endpoint

    @property
    def gpu_ids(self) -> list[int]:
        """GPU device indices available on this daemon."""
        # Return a copy so callers cannot mutate the discovered list.
        return list(self._gpu_ids)

    @property
    def cpu_ids(self) -> list[int]:
        """CPU device indices available on this daemon."""
        return list(self._cpu_ids)

    @property
    def dram_available(self) -> list[bool]:
        """Per-CPU DRAM energy availability, aligned with `cpu_ids`."""
        return list(self._dram_available)

    @property
    def auth_required(self) -> bool:
        """Whether this daemon requires JWT authentication."""
        return self._auth_required

    @property
    def auth_error(self) -> str | None:
        """Auth error message, or None if auth succeeded or is not required."""
        return self._auth_error

    @property
    def granted_scopes(self) -> frozenset[str]:
        """Scopes granted by the current token (empty if auth is off or failed)."""
        return self._granted_scopes

    def _can(self, api_group: str, scope: str) -> bool:
        """Whether `api_group` is enabled and, if auth is required, `scope` is granted."""
        if api_group not in self._enabled_api_groups:
            return False
        return not (self._auth_required and scope not in self._granted_scopes)

    @property
    def can_read_gpu(self) -> bool:
        """Whether GPU read endpoints are accessible."""
        return self._can("gpu-read", "gpu-read")

    @property
    def can_control_gpu(self) -> bool:
        """Whether GPU control endpoints are accessible."""
        return self._can("gpu-control", "gpu-control")

    @property
    def can_read_cpu(self) -> bool:
        """Whether CPU read endpoints are accessible."""
        return self._can("cpu-read", "cpu-read")

    @staticmethod
    def _join_ids(ids: list[int]) -> str:
        """Encode device indices as a comma-separated query value."""
        return ",".join(str(i) for i in ids)

    @staticmethod
    def _flag(value: bool) -> str:
        """Encode a boolean as the `true`/`false` query value."""
        return "true" if value else "false"

    def get_gpu_energy(self, gpu_ids: list[int]) -> dict[int, int]:
        """Get cumulative energy consumption per GPU.

        Args:
            gpu_ids: GPU indices to query.

        Returns:
            Mapping of GPU index to cumulative energy in millijoules.

        Raises:
            ZeusdError: If the daemon does not answer with HTTP 200.
        """
        resp = self._client.get(
            self._config.url("/gpu/get_cumulative_energy"),
            params={"gpu_ids": self._join_ids(gpu_ids)},
        )
        self._check(resp, "get_gpu_energy")
        data = resp.json()
        return {int(k): v["energy_mj"] for k, v in data.items()}

    def get_gpu_power(self, gpu_ids: list[int] | None = None) -> GpuPowerSnapshot:
        """Get instantaneous GPU power readings.

        Args:
            gpu_ids: GPU indices to query.  None means all.

        Returns:
            Snapshot with timestamp and per-GPU power in milliwatts.

        Raises:
            ZeusdError: If the daemon does not answer with HTTP 200.
        """
        params: dict[str, str] = {}
        if gpu_ids is not None:
            params["gpu_ids"] = self._join_ids(gpu_ids)
        resp = self._client.get(self._config.url("/gpu/get_power"), params=params)
        self._check(resp, "get_gpu_power")
        data = resp.json()
        return GpuPowerSnapshot(
            timestamp_ms=data["timestamp_ms"],
            power_mw={int(k): v for k, v in data["power_mw"].items()},
        )

    def set_power_limit(self, gpu_ids: list[int], power_limit_mw: int, block: bool = True) -> None:
        """Set the power management limit for the given GPUs.

        Args:
            gpu_ids: GPU indices to apply the limit to.
            power_limit_mw: Sent as the `power_limit_mw` query parameter.
            block: Sent as the `block` query parameter (`true`/`false`).
                NOTE(review): presumably controls whether the daemon waits
                for the operation to finish; confirm against the daemon API.

        Raises:
            ZeusdError: If the daemon does not answer with HTTP 200.
        """
        resp = self._client.post(
            self._config.url("/gpu/set_power_limit"),
            params={
                "gpu_ids": self._join_ids(gpu_ids),
                "power_limit_mw": str(power_limit_mw),
                "block": self._flag(block),
            },
        )
        self._check(resp, "set_power_limit")

    def set_persistence_mode(self, gpu_ids: list[int], enabled: bool, block: bool = True) -> None:
        """Set persistence mode for the given GPUs.

        Args:
            gpu_ids: GPU indices to configure.
            enabled: Sent as the `enabled` query parameter (`true`/`false`).
            block: Sent as the `block` query parameter (`true`/`false`).

        Raises:
            ZeusdError: If the daemon does not answer with HTTP 200.
        """
        resp = self._client.post(
            self._config.url("/gpu/set_persistence_mode"),
            params={
                "gpu_ids": self._join_ids(gpu_ids),
                "enabled": self._flag(enabled),
                "block": self._flag(block),
            },
        )
        self._check(resp, "set_persistence_mode")

    def set_gpu_locked_clocks(
        self,
        gpu_ids: list[int],
        min_clock_mhz: int,
        max_clock_mhz: int,
        block: bool = True,
    ) -> None:
        """Lock the GPU clock to a specified range (MHz).

        Args:
            gpu_ids: GPU indices to configure.
            min_clock_mhz: Lower bound of the clock range.
            max_clock_mhz: Upper bound of the clock range.
            block: Sent as the `block` query parameter (`true`/`false`).

        Raises:
            ZeusdError: If the daemon does not answer with HTTP 200.
        """
        resp = self._client.post(
            self._config.url("/gpu/set_gpu_locked_clocks"),
            params={
                "gpu_ids": self._join_ids(gpu_ids),
                "min_clock_mhz": str(min_clock_mhz),
                "max_clock_mhz": str(max_clock_mhz),
                "block": self._flag(block),
            },
        )
        self._check(resp, "set_gpu_locked_clocks")

    def reset_gpu_locked_clocks(self, gpu_ids: list[int], block: bool = True) -> None:
        """Reset locked GPU clocks to the default.

        Args:
            gpu_ids: GPU indices to reset.
            block: Sent as the `block` query parameter (`true`/`false`).

        Raises:
            ZeusdError: If the daemon does not answer with HTTP 200.
        """
        resp = self._client.post(
            self._config.url("/gpu/reset_gpu_locked_clocks"),
            params={
                "gpu_ids": self._join_ids(gpu_ids),
                "block": self._flag(block),
            },
        )
        self._check(resp, "reset_gpu_locked_clocks")

    def set_mem_locked_clocks(
        self,
        gpu_ids: list[int],
        min_clock_mhz: int,
        max_clock_mhz: int,
        block: bool = True,
    ) -> None:
        """Lock the memory clock to a specified range (MHz).

        Args:
            gpu_ids: GPU indices to configure.
            min_clock_mhz: Lower bound of the clock range.
            max_clock_mhz: Upper bound of the clock range.
            block: Sent as the `block` query parameter (`true`/`false`).

        Raises:
            ZeusdError: If the daemon does not answer with HTTP 200.
        """
        resp = self._client.post(
            self._config.url("/gpu/set_mem_locked_clocks"),
            params={
                "gpu_ids": self._join_ids(gpu_ids),
                "min_clock_mhz": str(min_clock_mhz),
                "max_clock_mhz": str(max_clock_mhz),
                "block": self._flag(block),
            },
        )
        self._check(resp, "set_mem_locked_clocks")

    def reset_mem_locked_clocks(self, gpu_ids: list[int], block: bool = True) -> None:
        """Reset locked memory clocks to the default.

        Args:
            gpu_ids: GPU indices to reset.
            block: Sent as the `block` query parameter (`true`/`false`).

        Raises:
            ZeusdError: If the daemon does not answer with HTTP 200.
        """
        resp = self._client.post(
            self._config.url("/gpu/reset_mem_locked_clocks"),
            params={
                "gpu_ids": self._join_ids(gpu_ids),
                "block": self._flag(block),
            },
        )
        self._check(resp, "reset_mem_locked_clocks")

    def get_cpu_energy(
        self,
        cpu_ids: list[int],
        cpu: bool = True,
        dram: bool = True,
    ) -> dict[int, CpuEnergyResult]:
        """Get cumulative energy consumption per CPU.

        Args:
            cpu_ids: CPU indices to query.
            cpu: Whether to include CPU package energy.
            dram: Whether to include DRAM energy.

        Returns:
            Mapping of CPU index to energy results.

        Raises:
            ZeusdError: If the daemon does not answer with HTTP 200.
        """
        resp = self._client.get(
            self._config.url("/cpu/get_cumulative_energy"),
            params={
                "cpu_ids": self._join_ids(cpu_ids),
                "cpu": self._flag(cpu),
                "dram": self._flag(dram),
            },
        )
        self._check(resp, "get_cpu_energy")
        data = resp.json()
        return {
            int(k): CpuEnergyResult(
                cpu_energy_uj=v.get("cpu_energy_uj"),
                dram_energy_uj=v.get("dram_energy_uj"),
            )
            for k, v in data.items()
        }

    def get_cpu_power(self, cpu_ids: list[int] | None = None) -> CpuPowerSnapshot:
        """Get instantaneous CPU power readings.

        Args:
            cpu_ids: CPU indices to query.  None means all.

        Returns:
            Snapshot with timestamp and per-CPU power in milliwatts.

        Raises:
            ZeusdError: If the daemon does not answer with HTTP 200.
        """
        params: dict[str, str] = {}
        if cpu_ids is not None:
            params["cpu_ids"] = self._join_ids(cpu_ids)
        resp = self._client.get(self._config.url("/cpu/get_power"), params=params)
        self._check(resp, "get_cpu_power")
        data = resp.json()
        return CpuPowerSnapshot(
            timestamp_ms=data["timestamp_ms"],
            power_mw={
                int(k): CpuDramPower(cpu_mw=v["cpu_mw"], dram_mw=v.get("dram_mw")) for k, v in data["power_mw"].items()
            },
        )

    def get_time(self) -> float:
        """Get daemon timestamp in seconds.

        Raises:
            ZeusdError: If the daemon does not answer with HTTP 200.
        """
        resp = self._client.get(self._config.url("/time"))
        self._check(resp, "get_time")
        # The daemon reports milliseconds; convert to seconds.
        return resp.json()["timestamp_ms"] / 1000.0

    def make_client(self) -> httpx.Client:
        """Create a new httpx.Client with this client's transport and auth.

        Used by `PowerStreamingClient` for SSE streaming connections
        where a dedicated, long-lived httpx.Client is needed.  The caller
        owns the returned client; `close()` on this object does not close it.
        """
        return self._config.make_client()

    def url(self, path: str) -> str:
        """Build the full URL for the given path.

        Used together with `make_client()` for streaming URLs.
        """
        return self._config.url(path)

    @staticmethod
    def _check(resp: httpx.Response, operation: str) -> None:
        """Raise ZeusdError if the response is not 200."""
        if resp.status_code != 200:
            # Import here to avoid circular import at module level.
            from zeus.device.exception import ZeusdError

            raise ZeusdError(f"Failed to {operation}: {resp.text}")

endpoint property

endpoint

Human-readable identifier for this connection.

gpu_ids property

gpu_ids

GPU device indices available on this daemon.

cpu_ids property

cpu_ids

CPU device indices available on this daemon.

dram_available property

dram_available

Per-CPU DRAM energy availability, aligned with cpu_ids.

auth_required property

auth_required

Whether this daemon requires JWT authentication.

auth_error property

auth_error

Auth error message, or None if auth succeeded or is not required.

granted_scopes property

granted_scopes

Scopes granted by the current token (empty if auth is off or failed).

can_read_gpu property

can_read_gpu

Whether GPU read endpoints are accessible.

can_control_gpu property

can_control_gpu

Whether GPU control endpoints are accessible.

can_read_cpu property

can_read_cpu

Whether CPU read endpoints are accessible.

__init__

__init__(config=None)
Source code in zeus/utils/zeusd.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def __init__(self, config: ZeusdConfig | None = None) -> None:
    """Initialize the client, run discovery, and attempt authentication.

    Args:
        config: Connection configuration.  If None, `ZeusdConfig.from_env()`
            is used.

    Raises:
        ZeusdConnectionError: If no connection is configured, the daemon
            cannot be reached (on `/discover` or `/auth/whoami`), or
            `/discover` does not answer with HTTP 200.  The HTTP client
            created here is closed before the error propagates.

    Authentication problems (missing or rejected token) do not raise; they
    are stored in `self._auth_error` and logged as a warning.
    """
    if config is None:
        config = ZeusdConfig.from_env()
        if config is None:
            raise ZeusdConnectionError(
                "No Zeusd connection configured. Set ZEUSD_SOCK_PATH or ZEUSD_HOST_PORT, or pass a ZeusdConfig."
            )
    self._config = config
    self._client = config.make_client()

    try:
        resp = self._client.get(config.url("/discover"))
    except httpx.RequestError as exc:
        # The constructor is about to fail, so nobody else can close the
        # HTTP client we just created.
        self._client.close()
        raise ZeusdConnectionError(f"Cannot reach Zeusd at {config.endpoint}: {exc}") from exc
    if resp.status_code != 200:
        message = f"Zeusd at {config.endpoint} returned HTTP {resp.status_code} on /discover: {resp.text}"
        self._client.close()
        raise ZeusdConnectionError(message)
    data = resp.json()
    self._gpu_ids: list[int] = data.get("gpu_ids", [])
    self._cpu_ids: list[int] = data.get("cpu_ids", [])
    self._dram_available: list[bool] = data.get("dram_available", [])
    self._enabled_api_groups: set[str] = set(data.get("enabled_api_groups", []))
    self._auth_required: bool = data.get("auth_required", False)

    self._auth_error: str | None = None
    self._granted_scopes: frozenset[str] = frozenset()
    self._whoami_sub: str | None = None
    self._whoami_exp: int | None = None
    if self._auth_required:
        if not config.token:
            self._auth_error = (
                f"Zeusd at {config.endpoint} requires authentication but "
                "no token was provided. Set the ZEUSD_TOKEN environment "
                "variable or pass token= in the config."
            )
        else:
            try:
                whoami_resp = self._client.get(config.url("/auth/whoami"))
            except httpx.RequestError as exc:
                # A transport failure here is a connectivity problem, not
                # an auth problem: report it the same way as /discover.
                self._client.close()
                raise ZeusdConnectionError(f"Cannot reach Zeusd at {config.endpoint}: {exc}") from exc
            if whoami_resp.status_code == 401:
                self._auth_error = f"Token rejected by Zeusd at {config.endpoint}: {whoami_resp.text}"
            elif whoami_resp.status_code != 200:
                self._auth_error = (
                    f"Unexpected response from /auth/whoami at "
                    f"{config.endpoint} (HTTP {whoami_resp.status_code}): "
                    f"{whoami_resp.text}"
                )
            else:
                whoami = whoami_resp.json()
                self._granted_scopes = frozenset(whoami.get("scopes", []))
                self._whoami_sub = whoami.get("sub")
                self._whoami_exp = whoami.get("exp")
                logger.info(
                    "Authenticated with Zeusd at %s as user '%s' (scopes: %s)",
                    config.endpoint,
                    self._whoami_sub,
                    sorted(self._granted_scopes),
                )
        if self._auth_error:
            logger.warning("Auth issue with Zeusd at %s: %s", config.endpoint, self._auth_error)

get_gpu_energy

get_gpu_energy(gpu_ids)

Get cumulative energy consumption per GPU.

Parameters:

Name Type Description Default
gpu_ids list[int]

GPU indices to query.

required

Returns:

Type Description
dict[int, int]

Mapping of GPU index to cumulative energy in millijoules.

Source code in zeus/utils/zeusd.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def get_gpu_energy(self, gpu_ids: list[int]) -> dict[int, int]:
    """Fetch the cumulative energy counter of each requested GPU.

    Args:
        gpu_ids: Indices of the GPUs whose counters should be read.

    Returns:
        Dictionary keyed by GPU index; each value is the ``energy_mj``
        field the daemon reported for that GPU (cumulative millijoules).
    """
    endpoint = self._config.url("/gpu/get_cumulative_energy")
    query = {"gpu_ids": ",".join(map(str, gpu_ids))}
    response = self._client.get(endpoint, params=query)
    self._check(response, "get_gpu_energy")

    # The JSON payload is keyed by stringified GPU index; convert back to int.
    energy_by_gpu: dict[int, int] = {}
    for raw_index, reading in response.json().items():
        energy_by_gpu[int(raw_index)] = reading["energy_mj"]
    return energy_by_gpu

get_gpu_power

get_gpu_power(gpu_ids=None)

Get instantaneous GPU power readings.

Parameters:

Name Type Description Default
gpu_ids list[int] | None

GPU indices to query. None means all.

None

Returns:

Type Description
GpuPowerSnapshot

Snapshot with timestamp and per-GPU power in milliwatts.

Source code in zeus/utils/zeusd.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
def get_gpu_power(self, gpu_ids: list[int] | None = None) -> GpuPowerSnapshot:
    """Read the current power draw of GPUs from the daemon.

    Args:
        gpu_ids: GPU indices to query.  When None, no ``gpu_ids`` filter is
            sent and the daemon is asked for all GPUs.

    Returns:
        Snapshot carrying the daemon's ``timestamp_ms`` and a mapping of
        GPU index to power in milliwatts.
    """
    # Only include the filter when the caller asked for specific GPUs.
    query: dict[str, str] = {} if gpu_ids is None else {"gpu_ids": ",".join(map(str, gpu_ids))}

    response = self._client.get(self._config.url("/gpu/get_power"), params=query)
    self._check(response, "get_gpu_power")

    payload = response.json()
    stamp = payload["timestamp_ms"]
    # JSON object keys are strings; restore integer GPU indices.
    readings: dict[int, int] = {}
    for raw_index, milliwatts in payload["power_mw"].items():
        readings[int(raw_index)] = milliwatts
    return GpuPowerSnapshot(timestamp_ms=stamp, power_mw=readings)

set_power_limit

set_power_limit(gpu_ids, power_limit_mw, block=True)

Set the power management limit for the given GPUs.

Source code in zeus/utils/zeusd.py
388
389
390
391
392
393
394
395
396
397
398
def set_power_limit(self, gpu_ids: list[int], power_limit_mw: int, block: bool = True) -> None:
    """Ask the daemon to apply a power management limit to the given GPUs.

    Args:
        gpu_ids: GPU indices the limit applies to.
        power_limit_mw: Limit value, sent as the ``power_limit_mw`` query parameter.
        block: Forwarded to the daemon as the ``block`` query parameter.
    """
    block_flag = "true" if block else "false"
    query = {
        "gpu_ids": ",".join(map(str, gpu_ids)),
        "power_limit_mw": str(power_limit_mw),
        "block": block_flag,
    }
    response = self._client.post(self._config.url("/gpu/set_power_limit"), params=query)
    self._check(response, "set_power_limit")

set_persistence_mode

set_persistence_mode(gpu_ids, enabled, block=True)

Set persistence mode for the given GPUs.

Source code in zeus/utils/zeusd.py
400
401
402
403
404
405
406
407
408
409
410
def set_persistence_mode(self, gpu_ids: list[int], enabled: bool, block: bool = True) -> None:
    """Ask the daemon to switch persistence mode on or off for the given GPUs.

    Args:
        gpu_ids: GPU indices to update.
        enabled: Desired state, sent as the ``enabled`` query parameter.
        block: Forwarded to the daemon as the ``block`` query parameter.
    """
    joined_ids = ",".join(map(str, gpu_ids))
    enabled_flag = "true" if enabled else "false"
    block_flag = "true" if block else "false"

    response = self._client.post(
        self._config.url("/gpu/set_persistence_mode"),
        params={"gpu_ids": joined_ids, "enabled": enabled_flag, "block": block_flag},
    )
    self._check(response, "set_persistence_mode")

set_gpu_locked_clocks

set_gpu_locked_clocks(gpu_ids, min_clock_mhz, max_clock_mhz, block=True)

Lock the GPU clock to a specified range (MHz).

Source code in zeus/utils/zeusd.py
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
def set_gpu_locked_clocks(
    self,
    gpu_ids: list[int],
    min_clock_mhz: int,
    max_clock_mhz: int,
    block: bool = True,
) -> None:
    """Ask the daemon to lock the GPU clock of the given GPUs to a range.

    Args:
        gpu_ids: GPU indices to update.
        min_clock_mhz: Lower bound of the range, in MHz.
        max_clock_mhz: Upper bound of the range, in MHz.
        block: Forwarded to the daemon as the ``block`` query parameter.
    """
    block_flag = "true" if block else "false"
    query = {
        "gpu_ids": ",".join(map(str, gpu_ids)),
        "min_clock_mhz": str(min_clock_mhz),
        "max_clock_mhz": str(max_clock_mhz),
        "block": block_flag,
    }
    endpoint = self._config.url("/gpu/set_gpu_locked_clocks")
    response = self._client.post(endpoint, params=query)
    self._check(response, "set_gpu_locked_clocks")

reset_gpu_locked_clocks

reset_gpu_locked_clocks(gpu_ids, block=True)

Reset locked GPU clocks to the default.

Source code in zeus/utils/zeusd.py
431
432
433
434
435
436
437
438
439
440
def reset_gpu_locked_clocks(self, gpu_ids: list[int], block: bool = True) -> None:
    """Ask the daemon to restore the default GPU clocks on the given GPUs.

    Args:
        gpu_ids: GPU indices whose locked GPU clocks should be reset.
        block: Forwarded to the daemon as the ``block`` query parameter.
    """
    joined_ids = ",".join(map(str, gpu_ids))
    block_flag = "true" if block else "false"
    endpoint = self._config.url("/gpu/reset_gpu_locked_clocks")
    response = self._client.post(endpoint, params={"gpu_ids": joined_ids, "block": block_flag})
    self._check(response, "reset_gpu_locked_clocks")

set_mem_locked_clocks

set_mem_locked_clocks(gpu_ids, min_clock_mhz, max_clock_mhz, block=True)

Lock the memory clock to a specified range (MHz).

Source code in zeus/utils/zeusd.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
def set_mem_locked_clocks(
    self,
    gpu_ids: list[int],
    min_clock_mhz: int,
    max_clock_mhz: int,
    block: bool = True,
) -> None:
    """Ask the daemon to lock the memory clock of the given GPUs to a range.

    Args:
        gpu_ids: GPU indices to update.
        min_clock_mhz: Lower bound of the range, in MHz.
        max_clock_mhz: Upper bound of the range, in MHz.
        block: Forwarded to the daemon as the ``block`` query parameter.
    """
    block_flag = "true" if block else "false"
    query = {
        "gpu_ids": ",".join(map(str, gpu_ids)),
        "min_clock_mhz": str(min_clock_mhz),
        "max_clock_mhz": str(max_clock_mhz),
        "block": block_flag,
    }
    endpoint = self._config.url("/gpu/set_mem_locked_clocks")
    response = self._client.post(endpoint, params=query)
    self._check(response, "set_mem_locked_clocks")

reset_mem_locked_clocks

reset_mem_locked_clocks(gpu_ids, block=True)

Reset locked memory clocks to the default.

Source code in zeus/utils/zeusd.py
461
462
463
464
465
466
467
468
469
470
def reset_mem_locked_clocks(self, gpu_ids: list[int], block: bool = True) -> None:
    """Ask the daemon to restore the default memory clocks on the given GPUs.

    Args:
        gpu_ids: GPU indices whose locked memory clocks should be reset.
        block: Forwarded to the daemon as the ``block`` query parameter.
    """
    joined_ids = ",".join(map(str, gpu_ids))
    block_flag = "true" if block else "false"
    endpoint = self._config.url("/gpu/reset_mem_locked_clocks")
    response = self._client.post(endpoint, params={"gpu_ids": joined_ids, "block": block_flag})
    self._check(response, "reset_mem_locked_clocks")

get_cpu_energy

get_cpu_energy(cpu_ids, cpu=True, dram=True)

Get cumulative energy consumption per CPU.

Parameters:

Name Type Description Default
cpu_ids list[int]

CPU indices to query.

required
cpu bool

Whether to include CPU package energy.

True
dram bool

Whether to include DRAM energy.

True

Returns:

Type Description
dict[int, CpuEnergyResult]

Mapping of CPU index to energy results.

Source code in zeus/utils/zeusd.py
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
def get_cpu_energy(
    self,
    cpu_ids: list[int],
    cpu: bool = True,
    dram: bool = True,
) -> dict[int, CpuEnergyResult]:
    """Fetch the cumulative energy counters of each requested CPU.

    Args:
        cpu_ids: Indices of the CPUs to query.
        cpu: Whether CPU package energy should be included.
        dram: Whether DRAM energy should be included.

    Returns:
        Dictionary keyed by CPU index.  Each ``CpuEnergyResult`` field is
        None when the daemon's entry lacks the corresponding key.
    """
    query = {
        "cpu_ids": ",".join(map(str, cpu_ids)),
        "cpu": "true" if cpu else "false",
        "dram": "true" if dram else "false",
    }
    response = self._client.get(self._config.url("/cpu/get_cumulative_energy"), params=query)
    self._check(response, "get_cpu_energy")

    # The JSON payload is keyed by stringified CPU index; convert back to int.
    results = {}
    for raw_index, entry in response.json().items():
        results[int(raw_index)] = CpuEnergyResult(
            cpu_energy_uj=entry.get("cpu_energy_uj"),
            dram_energy_uj=entry.get("dram_energy_uj"),
        )
    return results

get_cpu_power

get_cpu_power(cpu_ids=None)

Get instantaneous CPU power readings.

Parameters:

Name Type Description Default
cpu_ids list[int] | None

CPU indices to query. None means all.

None

Returns:

Type Description
CpuPowerSnapshot

Snapshot with timestamp and per-CPU power in milliwatts.

Source code in zeus/utils/zeusd.py
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
def get_cpu_power(self, cpu_ids: list[int] | None = None) -> CpuPowerSnapshot:
    """Read the current power draw of CPUs from the daemon.

    Args:
        cpu_ids: CPU indices to query.  When None, no ``cpu_ids`` filter is
            sent and the daemon is asked for all CPUs.

    Returns:
        Snapshot carrying the daemon's ``timestamp_ms`` and a mapping of
        CPU index to its power reading in milliwatts.
    """
    # Only include the filter when the caller asked for specific CPUs.
    query: dict[str, str] = {} if cpu_ids is None else {"cpu_ids": ",".join(map(str, cpu_ids))}

    response = self._client.get(self._config.url("/cpu/get_power"), params=query)
    self._check(response, "get_cpu_power")

    payload = response.json()
    stamp = payload["timestamp_ms"]
    readings = {}
    for raw_index, entry in payload["power_mw"].items():
        # "cpu_mw" is mandatory; "dram_mw" may be missing and then becomes None.
        readings[int(raw_index)] = CpuDramPower(cpu_mw=entry["cpu_mw"], dram_mw=entry.get("dram_mw"))
    return CpuPowerSnapshot(timestamp_ms=stamp, power_mw=readings)

get_time

get_time()

Get daemon timestamp in seconds.

Source code in zeus/utils/zeusd.py
528
529
530
531
532
def get_time(self) -> float:
    """Return the daemon's timestamp converted from milliseconds to seconds."""
    endpoint = self._config.url("/time")
    response = self._client.get(endpoint)
    self._check(response, "get_time")
    millis = response.json()["timestamp_ms"]
    return millis / 1000.0

make_client

make_client()

Create a new httpx.Client with this client's transport and auth.

Used by PowerStreamingClient for SSE streaming connections where a dedicated, long-lived httpx.Client is needed.

Source code in zeus/utils/zeusd.py
534
535
536
537
538
539
540
def make_client(self) -> httpx.Client:
    """Build a fresh httpx.Client from this client's configuration.

    Delegates to the config's ``make_client()``.  Intended for
    `PowerStreamingClient`, whose SSE streaming connections need their
    own dedicated, long-lived httpx.Client.
    """
    config = self._config
    return config.make_client()

url

url(path)

Build the full URL for the given path.

Used together with make_client() for streaming URLs.

Source code in zeus/utils/zeusd.py
542
543
544
545
546
547
def url(self, path: str) -> str:
    """Return the full URL for `path`, as built by the underlying config.

    Pairs with `make_client()` when constructing streaming URLs.
    """
    config = self._config
    return config.url(path)

_check staticmethod

_check(resp, operation)

Raise ZeusdError if the response is not 200.

Source code in zeus/utils/zeusd.py
549
550
551
552
553
554
555
556
@staticmethod
def _check(resp: httpx.Response, operation: str) -> None:
    """Raise ZeusdError if the response is not 200."""
    if resp.status_code != 200:
        # Import here to avoid circular import at module level.
        from zeus.device.exception import ZeusdError

        raise ZeusdError(f"Failed to {operation}: {resp.text}")

require_capabilities

require_capabilities(client, *, read_gpu=False, control_gpu=False, read_cpu=False, gpu_ids=None, cpu_ids=None)

Fail-fast validation that the daemon supports what the caller needs.

Checks that the required API groups are enabled, the required scopes are granted by the token, and that the requested device IDs are available on the daemon.

Parameters:

Name Type Description Default
client ZeusdClient

The ZeusdClient to validate against.

required
read_gpu bool

Require the gpu-read capability.

False
control_gpu bool

Require the gpu-control capability.

False
read_cpu bool

Require the cpu-read capability.

False
gpu_ids list[int] | None

GPU indices that must be available.

None
cpu_ids list[int] | None

CPU indices that must be available.

None

Raises:

Type Description
ZeusdAuthError

If the client recorded an authentication error.

ZeusdCapabilityError

If any requirement is not met.

Source code in zeus/utils/zeusd.py
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
def require_capabilities(
    client: ZeusdClient,
    *,
    read_gpu: bool = False,
    control_gpu: bool = False,
    read_cpu: bool = False,
    gpu_ids: list[int] | None = None,
    cpu_ids: list[int] | None = None,
) -> None:
    """Validate up front that the daemon can serve what the caller needs.

    An authentication problem recorded on the client is reported first.
    Otherwise every unmet capability and every unavailable device index is
    collected and reported together in a single error.

    Args:
        client: The ZeusdClient to validate against.
        read_gpu: Require the gpu-read capability.
        control_gpu: Require the gpu-control capability.
        read_cpu: Require the cpu-read capability.
        gpu_ids: GPU indices that must be available.  None skips the check.
        cpu_ids: CPU indices that must be available.  None skips the check.

    Raises:
        ZeusdAuthError: If ``client.auth_error`` is set.
        ZeusdCapabilityError: If any capability or device requirement is not met.
    """
    auth_problem = client.auth_error
    if auth_problem:
        raise ZeusdAuthError(auth_problem)

    problems: list[str] = []

    # (requested?, client flag attribute, scope name) -- checked in this order
    # so the combined message is stable.  Flags are looked up lazily, only
    # for capabilities the caller actually asked for.
    capability_checks = (
        (read_gpu, "can_read_gpu", "gpu-read"),
        (control_gpu, "can_control_gpu", "gpu-control"),
        (read_cpu, "can_read_cpu", "cpu-read"),
    )
    for wanted, flag_attr, scope in capability_checks:
        if wanted and not getattr(client, flag_attr):
            problems.append(_capability_reason(client, scope))

    # (label, requested indices, client attribute holding the available indices)
    device_checks = (
        ("GPU", gpu_ids, "gpu_ids"),
        ("CPU", cpu_ids, "cpu_ids"),
    )
    for label, requested, ids_attr in device_checks:
        if requested is None:
            continue
        present = set(getattr(client, ids_attr))
        absent = set(requested) - present
        if absent:
            problems.append(f"{label} indices {sorted(absent)} not available (available: {sorted(present)})")

    if not problems:
        return

    prefix = f"Zeusd at {client.endpoint}: "
    raise ZeusdCapabilityError(prefix + "; ".join(problems))

_capability_reason

_capability_reason(client, scope)

Build a human-readable reason why a capability is unavailable.

Source code in zeus/utils/zeusd.py
613
614
615
616
617
618
619
def _capability_reason(client: ZeusdClient, scope: str) -> str:
    """Explain, in human-readable form, why `scope` is unavailable on `client`.

    Checks in order: the API group being disabled on the server, then (only
    when the daemon requires auth) the token lacking the scope.  Falls back
    to a generic message when neither applies.
    """
    group_enabled = scope in client._enabled_api_groups
    if not group_enabled:
        return f"API group '{scope}' is not enabled on this server"

    generic = f"'{scope}' is not available"
    if not client.auth_required:
        return generic

    granted = client.granted_scopes
    if scope in granted:
        return generic
    return f"Token lacks required scope '{scope}' (granted: {sorted(granted)})"