Skip to content

price

zeus.monitor.price

Electricity price providers used for price-aware optimizers.

ZeusElectricityPriceHTTPError

Bases: ZeusBaseError

Exception when HTTP request to electricity price provider fails.

Source code in zeus/monitor/price.py
37
38
39
40
41
42
class ZeusElectricityPriceHTTPError(ZeusBaseError):
    """Exception when HTTP request to electricity price provider fails."""

    def __init__(self, message: str) -> None:
        """Initialize HTTP request exception."""
        super().__init__(message)

__init__

__init__(message)
Source code in zeus/monitor/price.py
40
41
42
def __init__(self, message: str) -> None:
    """Initialize HTTP request exception."""
    super().__init__(message)

ZeusElectricityPriceNotFoundError

Bases: ZeusBaseError

Exception when electricity price measurement could not be retrieved.

Source code in zeus/monitor/price.py
45
46
47
48
49
50
class ZeusElectricityPriceNotFoundError(ZeusBaseError):
    """Exception when electricity price measurement could not be retrieved."""

    def __init__(self, message: str) -> None:
        """Initialize price not found exception."""
        super().__init__(message)

__init__

__init__(message)
Source code in zeus/monitor/price.py
48
49
50
def __init__(self, message: str) -> None:
    """Initialize price not found exception."""
    super().__init__(message)

ElectricityPriceProvider

Bases: ABC

Abstract class for implementing ways to fetch electricity price.

Source code in zeus/monitor/price.py
53
54
55
56
57
58
59
class ElectricityPriceProvider(abc.ABC):
    """Abstract class for implementing ways to fetch electricity price."""

    @abc.abstractmethod
    def get_current_electricity_prices(self) -> dict[str, list]:
        """Abstract method for fetching the current electricity price of the set location of the class."""
        pass

get_current_electricity_prices abstractmethod

get_current_electricity_prices()

Abstract method for fetching the current electricity price of the set location of the class.

Source code in zeus/monitor/price.py
56
57
58
59
@abc.abstractmethod
def get_current_electricity_prices(self) -> dict[str, list]:
    """Abstract method for fetching the current electricity price of the set location of the class."""
    pass

OpenEIClient

Bases: ElectricityPriceProvider

Electricity Price Provider with OpenEI API.

Reference:

  1. OpenEI
  2. OpenEI Utility Rates API
Source code in zeus/monitor/price.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
class OpenEIClient(ElectricityPriceProvider):
    """Electricity Price Provider with OpenEI API.

    Reference:

    1. [OpenEI](https://openei.org/wiki/Main_Page)
    2. [OpenEI Utility Rates API](https://apps.openei.org/services/doc/rest/util_rates/?version=7)
    """

    def __init__(
        self,
        location: tuple[float, float],
        label: str,
        sector: Literal[
            "Residential", "Commercial", "Industrial", "Lighting"
        ] = "Residential",
        radius: int = 0,
    ) -> None:
        """Initializes OpenEI Utility Rates Provider.

        Args:
            location: tuple of latitude and longitude (latitude, longitude)
            label: unique identifier of a particular variant of a utility company's rate
            sector: depends on which sector of electricity is relevant to you
            radius: search radius for utility rates from the location
        """
        self.lat, self.long = location
        self.label = label
        self.sector = sector
        self.radius = radius

    def search_json(self, data, key_name, target_value, return_value):
        """Recursively search for a key in a nested JSON and return the return_value field if found."""
        results = []

        if isinstance(data, dict):
            for key, val in data.items():
                # Check if the current dictionary contains the matching key-value pair
                if key == key_name and val == target_value:
                    # If "energyratestructure" exists at the same level, add it to results
                    if return_value in data:
                        results.append(data[return_value])
                    else:
                        results.append(None)

                # Recursively search deeper in nested dictionaries
                results.extend(
                    self.search_json(val, key_name, target_value, return_value)
                )

        elif isinstance(data, list):
            for item in data:
                results.extend(
                    self.search_json(item, key_name, target_value, return_value)
                )

        return results

    def get_current_electricity_prices(self) -> dict[str, list]:
        """Fetches current carbon intensity of the location of the class."""
        try:
            url = (
                "https://api.openei.org/utility_rates?version=latest&format=json"
                + f"&api_key=tJASWWgPhBRpiZCwfhtKV2A3gyNxbDfvQvdI5Wa7&lat={self.lat}"
                + f"&lon={self.long}&radius={self.radius}"
                + f"&detail=full&sector={self.sector}"
            )
            resp = requests.get(url)
            data = resp.json()

        except requests.exceptions.RequestException as e:
            raise ZeusElectricityPriceHTTPError(
                f"Failed to retrieve current electricity price measurement: {e}"
            ) from e

        try:
            if "label" not in json.dumps(data):
                raise ZeusElectricityPriceNotFoundError(
                    f"No rates found for lat, lon: [{self.lat}, {self.long}]."
                )

            energy_rate_structure = self.search_json(
                data, "label", self.label, "energyratestructure"
            )
            energy_weekday_schedule = self.search_json(
                data, "label", self.label, "energyweekdayschedule"
            )
            energy_weekend_schedule = self.search_json(
                data, "label", self.label, "energyweekendschedule"
            )

            if (
                not energy_rate_structure
                or not energy_weekday_schedule
                or not energy_weekend_schedule
            ):
                raise ZeusElectricityPriceNotFoundError(
                    f"No rates found for the label: {self.label}."
                )

            rate_data = {
                "energy_rate_structure": energy_rate_structure[0],
                "energy_weekday_schedule": energy_weekday_schedule[0],
                "energy_weekend_schedule": energy_weekend_schedule[0],
            }
            return rate_data

        except (KeyError, ValueError) as e:
            logger.error(
                "Error occurred while processing electricity price data: %s", e
            )
            raise ZeusElectricityPriceNotFoundError(
                "Failed to process electricity price data."
            ) from e

__init__

__init__(location, label, sector='Residential', radius=0)

Parameters:

Name Type Description Default
location tuple[float, float]

tuple of latitude and longitude (latitude, longitude)

required
label str

unique identifier of a particular variant of a utility company's rate

required
sector Literal['Residential', 'Commercial', 'Industrial', 'Lighting']

depends on which sector of electricity is relevant to you

'Residential'
radius int

search radius for utility rates from the location

0
Source code in zeus/monitor/price.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def __init__(
    self,
    location: tuple[float, float],
    label: str,
    sector: Literal[
        "Residential", "Commercial", "Industrial", "Lighting"
    ] = "Residential",
    radius: int = 0,
) -> None:
    """Initializes OpenEI Utility Rates Provider.

    Args:
        location: tuple of latitude and longitude (latitude, longitude)
        label: unique identifier of a particular variant of a utility company's rate
        sector: depends on which sector of electricity is relevant to you
        radius: search radius for utility rates from the location
    """
    self.lat, self.long = location
    self.label = label
    self.sector = sector
    self.radius = radius

search_json

search_json(data, key_name, target_value, return_value)

Recursively search for a key in a nested JSON and return the return_value field if found.

Source code in zeus/monitor/price.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def search_json(self, data, key_name, target_value, return_value):
    """Recursively search for a key in a nested JSON and return the return_value field if found."""
    results = []

    if isinstance(data, dict):
        for key, val in data.items():
            # Check if the current dictionary contains the matching key-value pair
            if key == key_name and val == target_value:
                # If "energyratestructure" exists at the same level, add it to results
                if return_value in data:
                    results.append(data[return_value])
                else:
                    results.append(None)

            # Recursively search deeper in nested dictionaries
            results.extend(
                self.search_json(val, key_name, target_value, return_value)
            )

    elif isinstance(data, list):
        for item in data:
            results.extend(
                self.search_json(item, key_name, target_value, return_value)
            )

    return results

get_current_electricity_prices

get_current_electricity_prices()

Fetches current carbon intensity of the location of the class.

Source code in zeus/monitor/price.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def get_current_electricity_prices(self) -> dict[str, list]:
    """Fetches current carbon intensity of the location of the class."""
    try:
        url = (
            "https://api.openei.org/utility_rates?version=latest&format=json"
            + f"&api_key=tJASWWgPhBRpiZCwfhtKV2A3gyNxbDfvQvdI5Wa7&lat={self.lat}"
            + f"&lon={self.long}&radius={self.radius}"
            + f"&detail=full&sector={self.sector}"
        )
        resp = requests.get(url)
        data = resp.json()

    except requests.exceptions.RequestException as e:
        raise ZeusElectricityPriceHTTPError(
            f"Failed to retrieve current electricity price measurement: {e}"
        ) from e

    try:
        if "label" not in json.dumps(data):
            raise ZeusElectricityPriceNotFoundError(
                f"No rates found for lat, lon: [{self.lat}, {self.long}]."
            )

        energy_rate_structure = self.search_json(
            data, "label", self.label, "energyratestructure"
        )
        energy_weekday_schedule = self.search_json(
            data, "label", self.label, "energyweekdayschedule"
        )
        energy_weekend_schedule = self.search_json(
            data, "label", self.label, "energyweekendschedule"
        )

        if (
            not energy_rate_structure
            or not energy_weekday_schedule
            or not energy_weekend_schedule
        ):
            raise ZeusElectricityPriceNotFoundError(
                f"No rates found for the label: {self.label}."
            )

        rate_data = {
            "energy_rate_structure": energy_rate_structure[0],
            "energy_weekday_schedule": energy_weekday_schedule[0],
            "energy_weekend_schedule": energy_weekend_schedule[0],
        }
        return rate_data

    except (KeyError, ValueError) as e:
        logger.error(
            "Error occurred while processing electricity price data: %s", e
        )
        raise ZeusElectricityPriceNotFoundError(
            "Failed to process electricity price data."
        ) from e

EnergyCostMeasurement dataclass

Measurement result of one window.

Attributes:

Name Type Description
time float

Time elapsed (in seconds) during the measurement window.

gpu_energy dict[int, float]

Maps GPU indices to the energy consumed (in Joules) during the measurement window. GPU indices are from the DL framework's perspective after applying CUDA_VISIBLE_DEVICES.

gpu_energy_cost dict[int, float]

Maps GPU indices to the electricity cost (in $) during the measurement window. GPU indices are from the DL framework's perspective after applying CUDA_VISIBLE_DEVICES.

cpu_energy dict[int, float] | None

Maps CPU indices to the energy consumed (in Joules) during the measurement window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d). This can be 'None' if CPU measurement is not available.

cpu_energy_cost dict[int, float] | None

Maps CPU indices to the electricity cost (in $) during the measurement window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d). This can be 'None' if CPU measurement is not available.

dram_energy dict[int, float] | None

Maps CPU indices to the energy consumed (in Joules) during the measurement window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d) and DRAM measurements are taken from sub-packages within each powerzone. This can be 'None' if CPU measurement is not available or DRAM measurement is not available.

dram_energy_cost dict[int, float] | None

Maps CPU indices to the electricity cost (in $) during the measurement window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d). This can be 'None' if CPU measurement is not available or DRAM measurement is not available.

Source code in zeus/monitor/price.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@dataclass
class EnergyCostMeasurement:
    """Measurement result of one window.

    Attributes:
        time: Time elapsed (in seconds) during the measurement window.
        gpu_energy: Maps GPU indices to the energy consumed (in Joules) during the
            measurement window. GPU indices are from the DL framework's perspective
            after applying `CUDA_VISIBLE_DEVICES`.
        gpu_energy_cost: Maps GPU indices to the electricity cost (in $) during the
            measurement window. GPU indices are from the DL framework's perspective
            after applying `CUDA_VISIBLE_DEVICES`.
        cpu_energy: Maps CPU indices to the energy consumed (in Joules) during the measurement
            window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d). This can
            be 'None' if CPU measurement is not available.
        cpu_energy_cost: Maps CPU indices to the electricity cost (in $) during the measurement
            window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d). This can
            be 'None' if CPU measurement is not available.
        dram_energy: Maps CPU indices to the energy consumed (in Joules) during the measurement
            window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d) and DRAM
            measurements are taken from sub-packages within each powerzone. This can be 'None' if
            CPU measurement is not available or DRAM measurement is not available.
        dram_energy_cost: Maps CPU indices to the electricity cost (in $) during the measurement
            window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d). This can be 'None' if
            CPU measurement is not available or DRAM measurement is not available.
    """

    time: float
    gpu_energy: dict[int, float]
    gpu_energy_cost: dict[int, float]
    cpu_energy: dict[int, float] | None = None
    cpu_energy_cost: dict[int, float] | None = None
    dram_energy: dict[int, float] | None = None
    dram_energy_cost: dict[int, float] | None = None

Op

Bases: Enum

Enum used to communicate between EnergyCostMonitor and _polling_process.

Source code in zeus/monitor/price.py
214
215
216
217
218
219
class Op(Enum):
    """Enum used to communicate between EnergyCostMonitor and _polling_process."""

    BEGIN = 0
    END = 1
    NEXTITER = 2

EnergyCostMonitor

Measure the energy, energy cost, and time consumption of a block of code.

Works for multi-GPU and heterogeneous GPU types. Aware of CUDA_VISIBLE_DEVICES. For instance, if CUDA_VISIBLE_DEVICES=2,3, GPU index 1 passed into gpu_indices will be interpreted as CUDA device 3.

You can mark the beginning and end of a measurement window, during which the energy cost, GPU energy, and time consumed will be recorded. Multiple concurrent measurement windows are supported.

Source code in zeus/monitor/price.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
class EnergyCostMonitor:
    """Measure the energy, energy cost, and time consumption of a block of code.

    Works for multi-GPU and heterogeneous GPU types. Aware of `CUDA_VISIBLE_DEVICES`.
    For instance, if `CUDA_VISIBLE_DEVICES=2,3`, GPU index `1` passed into `gpu_indices`
    will be interpreted as CUDA device `3`.

    You can mark the beginning and end of a measurement window, during which the energy cost,
    GPU energy, and time consumed will be recorded. Multiple concurrent measurement windows
    are supported.
    """

    def __init__(
        self,
        electricity_price_provider: ElectricityPriceProvider,
        gpu_indices: list[int] | None = None,
        cpu_indices: list[int] | None = None,
        sync_execution_with: Literal["torch", "jax"] = "torch",
    ) -> None:
        """Initializes Energy Cost Monitor.

        Args:
            electricity_price_provider: provider for which electricity price values will be fetched from
            gpu_indices: Indices of all the CUDA devices to monitor. Time/Energy measurements
                will begin and end at the same time for these GPUs (i.e., synchronized).
                If None, all the GPUs available will be used. `CUDA_VISIBLE_DEVICES`
                is respected if set, e.g., GPU index `1` passed into `gpu_indices` when
                `CUDA_VISIBLE_DEVICES=2,3` will be interpreted as CUDA device `3`.
                `CUDA_VISIBLE_DEVICES`s formatted with comma-separated indices are supported.
            cpu_indices: Indices of the CPU packages to monitor. If None, all CPU packages will
                be used.
            sync_execution_with: Deep learning framework to use to synchronize CPU/GPU computations.
                Defaults to `"torch"`, in which case `torch.cuda.synchronize` will be used.
                See [`sync_execution`][zeus.utils.framework.sync_execution] for more details.
        """
        self.zeus_monitor = ZeusMonitor(
            gpu_indices=gpu_indices,
            cpu_indices=cpu_indices,
            sync_execution_with=sync_execution_with,
            approx_instant_energy=True,
        )
        self.electricity_price_provider = electricity_price_provider
        self.current_keys = set()

        # set up process and shared queues
        self.context = mp.get_context("spawn")
        self.command_q = self.context.Queue()
        self.finished_q = self.context.Queue()

    def begin_window(self, key: str, sync_execution: bool = True) -> None:
        """Begin a new measurement window.

        Args:
            key: Unique name of the measurement window.
            sync_execution: Whether to wait for asynchronously dispatched computations
                to finish before starting the measurement window. For instance, PyTorch
                and JAX will run GPU computations asynchronously, and waiting them to
                finish is necessary to ensure that the measurement window captures all
                and only the computations dispatched within the window.
        """
        # check if key is already used
        if key in self.current_keys:
            raise ValueError(f"Measurement window '{key}' already exists")
        self.current_keys.add(key)

        # start window
        self.zeus_monitor.begin_window(key, sync_execution=sync_execution)

        # if there were previously no active windows, start polling process
        if len(self.current_keys) == 1:
            self.polling_process = self.context.Process(
                target=_polling_process,
                args=(
                    self.command_q,
                    self.finished_q,
                    self.zeus_monitor.gpu_indices,
                    self.zeus_monitor.cpu_indices,
                    self.electricity_price_provider,
                ),
            )
            self.polling_process.start()

        # start subwindows
        self.command_q.put((Op.BEGIN, key))

    def end_window(
        self, key: str, sync_execution: bool = True
    ) -> EnergyCostMeasurement:
        """End a measurement window and return the time, energy consumption, and energy cost.

        Args:
            key: Name of an active measurement window.
            sync_execution: Whether to wait for asynchronously dispatched computations
                to finish before starting the measurement window. For instance, PyTorch
                and JAX will run GPU computations asynchronously, and waiting them to
                finish is necessary to ensure that the measurement window captures all
                and only the computations dispatched within the window.
        """
        # check if begin_window has been called with key before
        if key not in self.current_keys:
            raise ValueError(f"Measurement window '{key}' does not exist")

        # end window
        self.command_q.put((Op.END, key))
        (
            gpu_energy_cost,
            cpu_energy_cost,
            dram_energy_cost,
        ) = self.finished_q.get()
        self.current_keys.remove(key)

        overall_measurement = self.zeus_monitor.end_window(
            key, sync_execution=sync_execution
        )

        measurement = EnergyCostMeasurement(
            time=overall_measurement.time,
            gpu_energy=overall_measurement.gpu_energy,
            cpu_energy=overall_measurement.cpu_energy,
            dram_energy=overall_measurement.dram_energy,
            gpu_energy_cost=gpu_energy_cost,
            cpu_energy_cost=cpu_energy_cost or None,
            dram_energy_cost=dram_energy_cost or None,
        )

        return measurement

__init__

__init__(
    electricity_price_provider,
    gpu_indices=None,
    cpu_indices=None,
    sync_execution_with="torch",
)

Parameters:

Name Type Description Default
electricity_price_provider ElectricityPriceProvider

provider for which electricity price values will be fetched from

required
gpu_indices list[int] | None

Indices of all the CUDA devices to monitor. Time/Energy measurements will begin and end at the same time for these GPUs (i.e., synchronized). If None, all the GPUs available will be used. CUDA_VISIBLE_DEVICES is respected if set, e.g., GPU index 1 passed into gpu_indices when CUDA_VISIBLE_DEVICES=2,3 will be interpreted as CUDA device 3. CUDA_VISIBLE_DEVICESs formatted with comma-separated indices are supported.

None
cpu_indices list[int] | None

Indices of the CPU packages to monitor. If None, all CPU packages will be used.

None
sync_execution_with Literal['torch', 'jax']

Deep learning framework to use to synchronize CPU/GPU computations. Defaults to "torch", in which case torch.cuda.synchronize will be used. See sync_execution for more details.

'torch'
Source code in zeus/monitor/price.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def __init__(
    self,
    electricity_price_provider: ElectricityPriceProvider,
    gpu_indices: list[int] | None = None,
    cpu_indices: list[int] | None = None,
    sync_execution_with: Literal["torch", "jax"] = "torch",
) -> None:
    """Initializes Energy Cost Monitor.

    Args:
        electricity_price_provider: provider for which electricity price values will be fetched from
        gpu_indices: Indices of all the CUDA devices to monitor. Time/Energy measurements
            will begin and end at the same time for these GPUs (i.e., synchronized).
            If None, all the GPUs available will be used. `CUDA_VISIBLE_DEVICES`
            is respected if set, e.g., GPU index `1` passed into `gpu_indices` when
            `CUDA_VISIBLE_DEVICES=2,3` will be interpreted as CUDA device `3`.
            `CUDA_VISIBLE_DEVICES`s formatted with comma-separated indices are supported.
        cpu_indices: Indices of the CPU packages to monitor. If None, all CPU packages will
            be used.
        sync_execution_with: Deep learning framework to use to synchronize CPU/GPU computations.
            Defaults to `"torch"`, in which case `torch.cuda.synchronize` will be used.
            See [`sync_execution`][zeus.utils.framework.sync_execution] for more details.
    """
    self.zeus_monitor = ZeusMonitor(
        gpu_indices=gpu_indices,
        cpu_indices=cpu_indices,
        sync_execution_with=sync_execution_with,
        approx_instant_energy=True,
    )
    self.electricity_price_provider = electricity_price_provider
    self.current_keys = set()

    # set up process and shared queues
    self.context = mp.get_context("spawn")
    self.command_q = self.context.Queue()
    self.finished_q = self.context.Queue()

begin_window

begin_window(key, sync_execution=True)

Begin a new measurement window.

Parameters:

Name Type Description Default
key str

Unique name of the measurement window.

required
sync_execution bool

Whether to wait for asynchronously dispatched computations to finish before starting the measurement window. For instance, PyTorch and JAX will run GPU computations asynchronously, and waiting them to finish is necessary to ensure that the measurement window captures all and only the computations dispatched within the window.

True
Source code in zeus/monitor/price.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def begin_window(self, key: str, sync_execution: bool = True) -> None:
    """Begin a new measurement window.

    Args:
        key: Unique name of the measurement window.
        sync_execution: Whether to wait for asynchronously dispatched computations
            to finish before starting the measurement window. For instance, PyTorch
            and JAX will run GPU computations asynchronously, and waiting them to
            finish is necessary to ensure that the measurement window captures all
            and only the computations dispatched within the window.
    """
    # check if key is already used
    if key in self.current_keys:
        raise ValueError(f"Measurement window '{key}' already exists")
    self.current_keys.add(key)

    # start window
    self.zeus_monitor.begin_window(key, sync_execution=sync_execution)

    # if there were previously no active windows, start polling process
    if len(self.current_keys) == 1:
        self.polling_process = self.context.Process(
            target=_polling_process,
            args=(
                self.command_q,
                self.finished_q,
                self.zeus_monitor.gpu_indices,
                self.zeus_monitor.cpu_indices,
                self.electricity_price_provider,
            ),
        )
        self.polling_process.start()

    # start subwindows
    self.command_q.put((Op.BEGIN, key))

end_window

end_window(key, sync_execution=True)

End a measurement window and return the time, energy consumption, and energy cost.

Parameters:

Name Type Description Default
key str

Name of an active measurement window.

required
sync_execution bool

Whether to wait for asynchronously dispatched computations to finish before starting the measurement window. For instance, PyTorch and JAX will run GPU computations asynchronously, and waiting them to finish is necessary to ensure that the measurement window captures all and only the computations dispatched within the window.

True
Source code in zeus/monitor/price.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def end_window(
    self, key: str, sync_execution: bool = True
) -> EnergyCostMeasurement:
    """End a measurement window and return the time, energy consumption, and energy cost.

    Args:
        key: Name of an active measurement window.
        sync_execution: Whether to wait for asynchronously dispatched computations
            to finish before starting the measurement window. For instance, PyTorch
            and JAX will run GPU computations asynchronously, and waiting them to
            finish is necessary to ensure that the measurement window captures all
            and only the computations dispatched within the window.
    """
    # check if begin_window has been called with key before
    if key not in self.current_keys:
        raise ValueError(f"Measurement window '{key}' does not exist")

    # end window
    self.command_q.put((Op.END, key))
    (
        gpu_energy_cost,
        cpu_energy_cost,
        dram_energy_cost,
    ) = self.finished_q.get()
    self.current_keys.remove(key)

    overall_measurement = self.zeus_monitor.end_window(
        key, sync_execution=sync_execution
    )

    measurement = EnergyCostMeasurement(
        time=overall_measurement.time,
        gpu_energy=overall_measurement.gpu_energy,
        cpu_energy=overall_measurement.cpu_energy,
        dram_energy=overall_measurement.dram_energy,
        gpu_energy_cost=gpu_energy_cost,
        cpu_energy_cost=cpu_energy_cost or None,
        dram_energy_cost=dram_energy_cost or None,
    )

    return measurement

get_time_info

get_time_info()

Retrieve the month, day_type (weekend or weekday), and hour.

Source code in zeus/monitor/price.py
24
25
26
27
28
29
30
31
32
33
34
def get_time_info() -> tuple[str, str, int]:
    """Retrieve the month, day_type (weekend or weekday), and hour."""
    now = datetime.now()

    month = now.strftime("%B")

    day_type = "Weekend" if now.weekday() >= 5 else "Weekday"

    hour = now.hour

    return month, day_type, hour