Skip to content

carbon

zeus.monitor.carbon

Carbon intensity providers used for carbon-aware optimizers.

ZeusCarbonIntensityHTTPError

Bases: ZeusBaseError

Exception when HTTP request to carbon intensity provider fails.

Source code in zeus/monitor/carbon.py
40
41
42
43
44
45
class ZeusCarbonIntensityHTTPError(ZeusBaseError):
    """Exception when HTTP request to carbon intensity provider fails."""

    def __init__(self, message: str) -> None:
        """Initialize HTTP request exception."""
        super().__init__(message)

__init__

__init__(message)
Source code in zeus/monitor/carbon.py
43
44
45
def __init__(self, message: str) -> None:
    """Initialize HTTP request exception."""
    super().__init__(message)

ZeusCarbonIntensityNotFoundError

Bases: ZeusBaseError

Exception when carbon intensity measurement could not be retrieved.

Source code in zeus/monitor/carbon.py
48
49
50
51
52
53
class ZeusCarbonIntensityNotFoundError(ZeusBaseError):
    """Exception when carbon intensity measurement could not be retrieved."""

    def __init__(self, message: str) -> None:
        """Initialize carbon not found exception."""
        super().__init__(message)

__init__

__init__(message)
Source code in zeus/monitor/carbon.py
51
52
53
def __init__(self, message: str) -> None:
    """Initialize carbon not found exception."""
    super().__init__(message)

CarbonIntensityProvider

Bases: ABC

Abstract class for implementing ways to fetch carbon intensity.

Source code in zeus/monitor/carbon.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class CarbonIntensityProvider(abc.ABC):
    """Abstract class for implementing ways to fetch carbon intensity."""

    @abc.abstractmethod
    def get_current_carbon_intensity(self) -> float:
        """Abstract method for fetching the current carbon intensity of the set location of the class."""
        pass

    @abc.abstractmethod
    def get_recent_carbon_intensity(self) -> dict[datetime, float]:
        """Abstract method for fetching the current carbon intensity of the set location of the class."""
        pass

    @property
    @abc.abstractmethod
    def update_period(self) -> timedelta:
        """Abstract method for how long each carbon intensity value in the history dict remains current."""
        pass

    @property
    @abc.abstractmethod
    def history_length(self) -> int:
        """Abstract method for how many carbon intensity values are in the history dict."""
        pass

update_period abstractmethod property

update_period

Abstract method for how long each carbon intensity value in the history dict remains current.

history_length abstractmethod property

history_length

Abstract method for how many carbon intensity values are in the history dict.

get_current_carbon_intensity abstractmethod

get_current_carbon_intensity()

Abstract method for fetching the current carbon intensity of the set location of the class.

Source code in zeus/monitor/carbon.py
59
60
61
62
@abc.abstractmethod
def get_current_carbon_intensity(self) -> float:
    """Abstract method for fetching the current carbon intensity of the set location of the class."""
    pass

get_recent_carbon_intensity abstractmethod

get_recent_carbon_intensity()

Abstract method for fetching the current carbon intensity of the set location of the class.

Source code in zeus/monitor/carbon.py
64
65
66
67
@abc.abstractmethod
def get_recent_carbon_intensity(self) -> dict[datetime, float]:
    """Abstract method for fetching the current carbon intensity of the set location of the class."""
    pass

ElectrictyMapsClient

Bases: CarbonIntensityProvider

Carbon Intensity Provider with ElectricityMaps API.

Reference:

  1. ElectricityMaps
  2. ElectricityMaps API
  3. ElectricityMaps GitHub
Source code in zeus/monitor/carbon.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class ElectrictyMapsClient(CarbonIntensityProvider):
    """Carbon Intensity Provider with ElectricityMaps API.

    Reference:

    1. [ElectricityMaps](https://www.electricitymaps.com/)
    2. [ElectricityMaps API](https://static.electricitymaps.com/api/docs/index.html)
    3. [ElectricityMaps GitHub](https://github.com/electricitymaps/electricitymaps-contrib)
    """

    def __init__(
        self,
        location: tuple[float, float],
        estimate: bool = False,
        emission_factor_type: Literal["direct", "lifecycle"] = "direct",
    ) -> None:
        """Iniitializes ElectricityMaps Carbon Provider.

        Args:
            location: tuple of latitude and longitude (latitude, longitude)
            estimate: bool to toggle whether carbon intensity is estimated or not
            emission_factor_type: emission factor to be measured (`direct` or `lifestyle`)
        """
        self.lat, self.long = location
        self.estimate = estimate
        self.emission_factor_type = emission_factor_type

    def get_current_carbon_intensity(self) -> float:
        """Fetches current carbon intensity of the location of the class.

        In some locations, there is no recent carbon intensity data. `self.estimate` can be used to approximate the carbon intensity in such cases.
        """
        try:
            url = (
                f"https://api.electricitymap.org/v3/carbon-intensity/latest?lat={self.lat}&lon={self.long}"
                + f"&disableEstimations={not self.estimate}&emissionFactorType={self.emission_factor_type}"
            )
            resp = requests.get(url)
        except requests.exceptions.RequestException as e:
            raise ZeusCarbonIntensityHTTPError(
                f"Failed to retrieve current carbon intensity measurement: {e}"
            ) from e

        try:
            return resp.json()["carbonIntensity"]
        except KeyError as e:
            # Raise exception when carbonIntensity does not exist in response
            raise ZeusCarbonIntensityNotFoundError(
                f"Current carbon intensity measurement not found at `({self.lat}, {self.long})` "
                f"with estimate set to `{self.estimate}` and emission_factor_type set to `{self.emission_factor_type}`\n"
                f"JSON Response: {resp.text}"
            ) from e

    def get_recent_carbon_intensity(self) -> dict[datetime, float]:
        """Fetches recent (within last 24 hours) carbon intensity of the location of the class.

        In some locations, there is no recent carbon intensity data. `self.estimate` can be used to approximate the carbon intensity in such cases.
        """
        try:
            url = (
                f"https://api.electricitymap.org/v3/carbon-intensity/history?lat={self.lat}&lon={self.long}"
                + f"&disableEstimations={not self.estimate}&emissionFactorType={self.emission_factor_type}"
            )
            resp = requests.get(url)
        except requests.exceptions.RequestException as e:
            raise ZeusCarbonIntensityHTTPError(
                f"Failed to retrieve recent carbon intensity measurement: {e}"
            ) from e

        try:
            recent_carbon_intensities: dict[datetime, float] = {
                parser.parse(measurement["datetime"]): measurement["carbonIntensity"]
                for measurement in resp.json()["history"]
            }
            return recent_carbon_intensities
        except KeyError as e:
            # Raise exception when carbonIntensity does not exist in response
            raise ZeusCarbonIntensityNotFoundError(
                f"Recent carbon intensity measurement not found at `({self.lat}, {self.long})` "
                f"with estimate set to `{self.estimate}` and emission_factor_type set to `{self.emission_factor_type}`\n"
                f"JSON Response: {resp.text}"
            ) from e

    @property
    def update_period(self) -> timedelta:
        """Returns timedelta for how long each carbon intensity value in the history dict remains current."""
        return timedelta(hours=1)

    @property
    def history_length(self) -> int:
        """Returns number of carbon intensity values in history dict."""
        return 24

update_period property

update_period

Returns timedelta for how long each carbon intensity value in the history dict remains current.

history_length property

history_length

Returns number of carbon intensity values in history dict.

__init__

__init__(
    location, estimate=False, emission_factor_type="direct"
)

Parameters:

Name Type Description Default
location tuple[float, float]

tuple of latitude and longitude (latitude, longitude)

required
estimate bool

bool to toggle whether carbon intensity is estimated or not

False
emission_factor_type Literal['direct', 'lifecycle']

emission factor to be measured (direct or lifestyle)

'direct'
Source code in zeus/monitor/carbon.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def __init__(
    self,
    location: tuple[float, float],
    estimate: bool = False,
    emission_factor_type: Literal["direct", "lifecycle"] = "direct",
) -> None:
    """Iniitializes ElectricityMaps Carbon Provider.

    Args:
        location: tuple of latitude and longitude (latitude, longitude)
        estimate: bool to toggle whether carbon intensity is estimated or not
        emission_factor_type: emission factor to be measured (`direct` or `lifestyle`)
    """
    self.lat, self.long = location
    self.estimate = estimate
    self.emission_factor_type = emission_factor_type

get_current_carbon_intensity

get_current_carbon_intensity()

Fetches current carbon intensity of the location of the class.

In some locations, there is no recent carbon intensity data. self.estimate can be used to approximate the carbon intensity in such cases.

Source code in zeus/monitor/carbon.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def get_current_carbon_intensity(self) -> float:
    """Fetches current carbon intensity of the location of the class.

    In some locations, there is no recent carbon intensity data. `self.estimate` can be used to approximate the carbon intensity in such cases.
    """
    try:
        url = (
            f"https://api.electricitymap.org/v3/carbon-intensity/latest?lat={self.lat}&lon={self.long}"
            + f"&disableEstimations={not self.estimate}&emissionFactorType={self.emission_factor_type}"
        )
        resp = requests.get(url)
    except requests.exceptions.RequestException as e:
        raise ZeusCarbonIntensityHTTPError(
            f"Failed to retrieve current carbon intensity measurement: {e}"
        ) from e

    try:
        return resp.json()["carbonIntensity"]
    except KeyError as e:
        # Raise exception when carbonIntensity does not exist in response
        raise ZeusCarbonIntensityNotFoundError(
            f"Current carbon intensity measurement not found at `({self.lat}, {self.long})` "
            f"with estimate set to `{self.estimate}` and emission_factor_type set to `{self.emission_factor_type}`\n"
            f"JSON Response: {resp.text}"
        ) from e

get_recent_carbon_intensity

get_recent_carbon_intensity()

Fetches recent (within last 24 hours) carbon intensity of the location of the class.

In some locations, there is no recent carbon intensity data. self.estimate can be used to approximate the carbon intensity in such cases.

Source code in zeus/monitor/carbon.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def get_recent_carbon_intensity(self) -> dict[datetime, float]:
    """Fetches recent (within last 24 hours) carbon intensity of the location of the class.

    In some locations, there is no recent carbon intensity data. `self.estimate` can be used to approximate the carbon intensity in such cases.
    """
    try:
        url = (
            f"https://api.electricitymap.org/v3/carbon-intensity/history?lat={self.lat}&lon={self.long}"
            + f"&disableEstimations={not self.estimate}&emissionFactorType={self.emission_factor_type}"
        )
        resp = requests.get(url)
    except requests.exceptions.RequestException as e:
        raise ZeusCarbonIntensityHTTPError(
            f"Failed to retrieve recent carbon intensity measurement: {e}"
        ) from e

    try:
        recent_carbon_intensities: dict[datetime, float] = {
            parser.parse(measurement["datetime"]): measurement["carbonIntensity"]
            for measurement in resp.json()["history"]
        }
        return recent_carbon_intensities
    except KeyError as e:
        # Raise exception when carbonIntensity does not exist in response
        raise ZeusCarbonIntensityNotFoundError(
            f"Recent carbon intensity measurement not found at `({self.lat}, {self.long})` "
            f"with estimate set to `{self.estimate}` and emission_factor_type set to `{self.emission_factor_type}`\n"
            f"JSON Response: {resp.text}"
        ) from e

CarbonEmissionMeasurement dataclass

Measurement result of one window.

Attributes:

Name Type Description
time float

Time elapsed (in seconds) during the measurement window.

gpu_energy dict[int, float]

Maps GPU indices to the energy consumed (in Joules) during the measurement window. GPU indices are from the DL framework's perspective after applying CUDA_VISIBLE_DEVICES.

gpu_carbon_emission dict[int, float]

Maps GPU indices to the carbon emission produced (in gCO2eq) during the measurement window. GPU indices are from the DL framework's perspective after applying CUDA_VISIBLE_DEVICES.

cpu_energy dict[int, float] | None

Maps CPU indices to the energy consumed (in Joules) during the measurement window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d). This can be 'None' if CPU measurement is not available.

cpu_carbon_emission dict[int, float] | None

Maps CPU indices to the carbon emission produced (in gCO2eq) during the measurement window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d). This can be 'None' if CPU measurement is not available.

dram_energy dict[int, float] | None

Maps CPU indices to the energy consumed (in Joules) during the measurement window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d) and DRAM measurements are taken from sub-packages within each powerzone. This can be 'None' if CPU measurement is not available or DRAM measurement is not available.

dram_carbon_emission dict[int, float] | None

Maps CPU indices to the carbon emission produced (in gCO2eq) during the measurement window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d). This can be 'None' if CPU measurement is not available or DRAM measurement is not available.

Source code in zeus/monitor/carbon.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
@dataclass
class CarbonEmissionMeasurement:
    """Measurement result of one window.

    Attributes:
        time: Time elapsed (in seconds) during the measurement window.
        gpu_energy: Maps GPU indices to the energy consumed (in Joules) during the
            measurement window. GPU indices are from the DL framework's perspective
            after applying `CUDA_VISIBLE_DEVICES`.
        gpu_carbon_emission: Maps GPU indices to the carbon emission produced (in gCO2eq) during the
            measurement window. GPU indices are from the DL framework's perspective
            after applying `CUDA_VISIBLE_DEVICES`.
        cpu_energy: Maps CPU indices to the energy consumed (in Joules) during the measurement
            window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d). This can
            be 'None' if CPU measurement is not available.
        cpu_carbon_emission: Maps CPU indices to the carbon emission produced (in gCO2eq) during the measurement
            window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d). This can
            be 'None' if CPU measurement is not available.
        dram_energy: Maps CPU indices to the energy consumed (in Joules) during the measurement
            window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d) and DRAM
            measurements are taken from sub-packages within each powerzone. This can be 'None' if
            CPU measurement is not available or DRAM measurement is not available.
        dram_carbon_emission: Maps CPU indices to the carbon emission produced (in gCO2eq) during the measurement
            window. Each CPU index refers to one powerzone exposed by RAPL (intel-rapl:d). This can be 'None' if
            CPU measurement is not available or DRAM measurement is not available.
    """

    time: float
    gpu_energy: dict[int, float]
    gpu_carbon_emission: dict[int, float]
    cpu_energy: dict[int, float] | None = None
    cpu_carbon_emission: dict[int, float] | None = None
    dram_energy: dict[int, float] | None = None
    dram_carbon_emission: dict[int, float] | None = None

Op

Bases: Enum

Enum used to communicate between CarbonEmissionMonitor and _polling_process.

Source code in zeus/monitor/carbon.py
212
213
214
215
216
217
class Op(Enum):
    """Enum used to communicate between CarbonEmissionMonitor and _polling_process."""

    BEGIN = 0
    END = 1
    NEXTITER = 2

CarbonEmissionMonitor

Measure the carbon emission, GPU energy, and time consumption of a block of code.

Works for multi-GPU and heterogeneous GPU types. Aware of CUDA_VISIBLE_DEVICES. For instance, if CUDA_VISIBLE_DEVICES=2,3, GPU index 1 passed into gpu_indices will be interpreted as CUDA device 3.

You can mark the beginning and end of a measurement window, during which the carbon emission, GPU energy, and time consumed will be recorded. Multiple concurrent measurement windows are supported.

Note

carbon_intensity_provider must have estimate turned on because during some hours, ElectricityMaps does not have carbon intensity values available and has to rely on estimation.

Source code in zeus/monitor/carbon.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
class CarbonEmissionMonitor:
    """Measure the carbon emission, GPU energy, and time consumption of a block of code.

    Works for multi-GPU and heterogeneous GPU types. Aware of `CUDA_VISIBLE_DEVICES`.
    For instance, if `CUDA_VISIBLE_DEVICES=2,3`, GPU index `1` passed into `gpu_indices`
    will be interpreted as CUDA device `3`.

    You can mark the beginning and end of a measurement window, during which the carbon
    emission, GPU energy, and time consumed will be recorded. Multiple concurrent
    measurement windows are supported.

    !!! Note
        `carbon_intensity_provider` must have `estimate` turned on because during some hours,
        ElectricityMaps does not have carbon intensity values available and has to rely on
        estimation.
    """

    def __init__(
        self,
        carbon_intensity_provider: CarbonIntensityProvider,
        gpu_indices: list[int] | None = None,
        cpu_indices: list[int] | None = None,
        sync_execution_with: Literal["torch", "jax"] = "torch",
    ) -> None:
        """Iniitializes Carbon Emission Monitor.

        Args:
            carbon_intensity_provider: provider for which carbon intensity values will be fetched from
            gpu_indices: Indices of all the CUDA devices to monitor. Time/Energy measurements
                will begin and end at the same time for these GPUs (i.e., synchronized).
                If None, all the GPUs available will be used. `CUDA_VISIBLE_DEVICES`
                is respected if set, e.g., GPU index `1` passed into `gpu_indices` when
                `CUDA_VISIBLE_DEVICES=2,3` will be interpreted as CUDA device `3`.
                `CUDA_VISIBLE_DEVICES`s formatted with comma-separated indices are supported.
            cpu_indices: Indices of the CPU packages to monitor. If None, all CPU packages will
                be used.
            sync_execution_with: Deep learning framework to use to synchronize CPU/GPU computations.
                Defaults to `"torch"`, in which case `torch.cuda.synchronize` will be used.
                See [`sync_execution`][zeus.utils.framework.sync_execution] for more details.
        """
        self.zeus_monitor = ZeusMonitor(
            gpu_indices=gpu_indices,
            cpu_indices=cpu_indices,
            sync_execution_with=sync_execution_with,
        )
        self.carbon_intensity_provider = carbon_intensity_provider
        self.current_keys = set()

        # set up process and shared queues
        self.context = mp.get_context("spawn")
        self.command_q = self.context.Queue()
        self.finished_q = self.context.Queue()

    def begin_window(self, key: str, sync_execution: bool = True) -> None:
        """Begin a new measurement window.

        Args:
            key: Unique name of the measurement window.
            sync_execution: Whether to wait for asynchronously dispatched computations
                to finish before starting the measurement window. For instance, PyTorch
                and JAX will run GPU computations asynchronously, and waiting them to
                finish is necessary to ensure that the measurement window captures all
                and only the computations dispatched within the window.
        """
        # check if key is already used
        if key in self.current_keys:
            raise ValueError(f"Measurement window '{key}' already exists")
        self.current_keys.add(key)

        # start window
        self.zeus_monitor.begin_window(key, sync_execution=sync_execution)

        # if there were previously no active windows, start polling process
        if len(self.current_keys) == 1:
            self.polling_process = self.context.Process(
                target=_polling_process,
                args=(
                    self.command_q,
                    self.finished_q,
                    self.zeus_monitor.gpu_indices,
                    self.zeus_monitor.cpu_indices,
                    self.carbon_intensity_provider,
                ),
            )
            self.polling_process.start()

        # start subwindows
        self.command_q.put((Op.BEGIN, key))

    def end_window(
        self, key: str, sync_execution: bool = True
    ) -> CarbonEmissionMeasurement:
        """End a measurement window and return the time, energy consumption, and carbon emission.

        Args:
            key: Name of an active measurement window.
            sync_execution: Whether to wait for asynchronously dispatched computations
                to finish before starting the measurement window. For instance, PyTorch
                and JAX will run GPU computations asynchronously, and waiting them to
                finish is necessary to ensure that the measurement window captures all
                and only the computations dispatched within the window.
        """
        # check if begin_window has been called with key before
        if key not in self.current_keys:
            raise ValueError(f"Measurement window '{key}' does not exist")

        # end window
        self.command_q.put((Op.END, key))
        (
            gpu_carbon_emissions,
            cpu_carbon_emissions,
            dram_carbon_emissions,
        ) = self.finished_q.get()
        self.current_keys.remove(key)

        overall_measurement = self.zeus_monitor.end_window(
            key, sync_execution=sync_execution
        )

        measurement = CarbonEmissionMeasurement(
            time=overall_measurement.time,
            gpu_energy=overall_measurement.gpu_energy,
            cpu_energy=overall_measurement.cpu_energy,
            dram_energy=overall_measurement.dram_energy,
            gpu_carbon_emission=gpu_carbon_emissions,
            cpu_carbon_emission=cpu_carbon_emissions or None,
            dram_carbon_emission=dram_carbon_emissions or None,
        )

        return measurement

__init__

__init__(
    carbon_intensity_provider,
    gpu_indices=None,
    cpu_indices=None,
    sync_execution_with="torch",
)

Parameters:

Name Type Description Default
carbon_intensity_provider CarbonIntensityProvider

provider for which carbon intensity values will be fetched from

required
gpu_indices list[int] | None

Indices of all the CUDA devices to monitor. Time/Energy measurements will begin and end at the same time for these GPUs (i.e., synchronized). If None, all the GPUs available will be used. CUDA_VISIBLE_DEVICES is respected if set, e.g., GPU index 1 passed into gpu_indices when CUDA_VISIBLE_DEVICES=2,3 will be interpreted as CUDA device 3. CUDA_VISIBLE_DEVICESs formatted with comma-separated indices are supported.

None
cpu_indices list[int] | None

Indices of the CPU packages to monitor. If None, all CPU packages will be used.

None
sync_execution_with Literal['torch', 'jax']

Deep learning framework to use to synchronize CPU/GPU computations. Defaults to "torch", in which case torch.cuda.synchronize will be used. See sync_execution for more details.

'torch'
Source code in zeus/monitor/carbon.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def __init__(
    self,
    carbon_intensity_provider: CarbonIntensityProvider,
    gpu_indices: list[int] | None = None,
    cpu_indices: list[int] | None = None,
    sync_execution_with: Literal["torch", "jax"] = "torch",
) -> None:
    """Iniitializes Carbon Emission Monitor.

    Args:
        carbon_intensity_provider: provider for which carbon intensity values will be fetched from
        gpu_indices: Indices of all the CUDA devices to monitor. Time/Energy measurements
            will begin and end at the same time for these GPUs (i.e., synchronized).
            If None, all the GPUs available will be used. `CUDA_VISIBLE_DEVICES`
            is respected if set, e.g., GPU index `1` passed into `gpu_indices` when
            `CUDA_VISIBLE_DEVICES=2,3` will be interpreted as CUDA device `3`.
            `CUDA_VISIBLE_DEVICES`s formatted with comma-separated indices are supported.
        cpu_indices: Indices of the CPU packages to monitor. If None, all CPU packages will
            be used.
        sync_execution_with: Deep learning framework to use to synchronize CPU/GPU computations.
            Defaults to `"torch"`, in which case `torch.cuda.synchronize` will be used.
            See [`sync_execution`][zeus.utils.framework.sync_execution] for more details.
    """
    self.zeus_monitor = ZeusMonitor(
        gpu_indices=gpu_indices,
        cpu_indices=cpu_indices,
        sync_execution_with=sync_execution_with,
    )
    self.carbon_intensity_provider = carbon_intensity_provider
    self.current_keys = set()

    # set up process and shared queues
    self.context = mp.get_context("spawn")
    self.command_q = self.context.Queue()
    self.finished_q = self.context.Queue()

begin_window

begin_window(key, sync_execution=True)

Begin a new measurement window.

Parameters:

Name Type Description Default
key str

Unique name of the measurement window.

required
sync_execution bool

Whether to wait for asynchronously dispatched computations to finish before starting the measurement window. For instance, PyTorch and JAX will run GPU computations asynchronously, and waiting them to finish is necessary to ensure that the measurement window captures all and only the computations dispatched within the window.

True
Source code in zeus/monitor/carbon.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
def begin_window(self, key: str, sync_execution: bool = True) -> None:
    """Begin a new measurement window.

    Args:
        key: Unique name of the measurement window.
        sync_execution: Whether to wait for asynchronously dispatched computations
            to finish before starting the measurement window. For instance, PyTorch
            and JAX will run GPU computations asynchronously, and waiting them to
            finish is necessary to ensure that the measurement window captures all
            and only the computations dispatched within the window.
    """
    # check if key is already used
    if key in self.current_keys:
        raise ValueError(f"Measurement window '{key}' already exists")
    self.current_keys.add(key)

    # start window
    self.zeus_monitor.begin_window(key, sync_execution=sync_execution)

    # if there were previously no active windows, start polling process
    if len(self.current_keys) == 1:
        self.polling_process = self.context.Process(
            target=_polling_process,
            args=(
                self.command_q,
                self.finished_q,
                self.zeus_monitor.gpu_indices,
                self.zeus_monitor.cpu_indices,
                self.carbon_intensity_provider,
            ),
        )
        self.polling_process.start()

    # start subwindows
    self.command_q.put((Op.BEGIN, key))

end_window

end_window(key, sync_execution=True)

End a measurement window and return the time, energy consumption, and carbon emission.

Parameters:

Name Type Description Default
key str

Name of an active measurement window.

required
sync_execution bool

Whether to wait for asynchronously dispatched computations to finish before starting the measurement window. For instance, PyTorch and JAX will run GPU computations asynchronously, and waiting them to finish is necessary to ensure that the measurement window captures all and only the computations dispatched within the window.

True
Source code in zeus/monitor/carbon.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def end_window(
    self, key: str, sync_execution: bool = True
) -> CarbonEmissionMeasurement:
    """End a measurement window and return the time, energy consumption, and carbon emission.

    Args:
        key: Name of an active measurement window.
        sync_execution: Whether to wait for asynchronously dispatched computations
            to finish before starting the measurement window. For instance, PyTorch
            and JAX will run GPU computations asynchronously, and waiting them to
            finish is necessary to ensure that the measurement window captures all
            and only the computations dispatched within the window.
    """
    # check if begin_window has been called with key before
    if key not in self.current_keys:
        raise ValueError(f"Measurement window '{key}' does not exist")

    # end window
    self.command_q.put((Op.END, key))
    (
        gpu_carbon_emissions,
        cpu_carbon_emissions,
        dram_carbon_emissions,
    ) = self.finished_q.get()
    self.current_keys.remove(key)

    overall_measurement = self.zeus_monitor.end_window(
        key, sync_execution=sync_execution
    )

    measurement = CarbonEmissionMeasurement(
        time=overall_measurement.time,
        gpu_energy=overall_measurement.gpu_energy,
        cpu_energy=overall_measurement.cpu_energy,
        dram_energy=overall_measurement.dram_energy,
        gpu_carbon_emission=gpu_carbon_emissions,
        cpu_carbon_emission=cpu_carbon_emissions or None,
        dram_carbon_emission=dram_carbon_emissions or None,
    )

    return measurement

get_ip_lat_long

get_ip_lat_long()

Retrieve the latitude and longitude of the current IP position.

Source code in zeus/monitor/carbon.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def get_ip_lat_long() -> tuple[float, float]:
    """Retrieve the latitude and longitude of the current IP position."""
    try:
        ip_url = "http://ipinfo.io/json"
        resp = requests.get(ip_url)
        loc = resp.json()["loc"]
        lat, long = map(float, loc.split(","))
        logger.info("Retrieved latitude and longitude: %s, %s", lat, long)
        return lat, long
    except requests.exceptions.RequestException as e:
        logger.exception(
            "Failed to retrieve current latitude and longitude of IP: %s", e
        )
        raise