Monitors the energy and time consumption inside a training loop.
Skip the first skip_steps
steps and profile for the next profile_steps
. Does
nothing after that, before you call reset
.
You can check whether profiling is done (i.e., skip_steps + profile_steps
passed)
through is_done
and query results through
total_energy
,
avg_energy
,
total_time
, and
avg_time
.
Integration example
zeus_ctx = zeus.monitor.ZeusMonitorContext(skip_steps=10, profile_steps=10)
for step, (x, y) in enumerate(train_dataloader):
print(f"Training step {step}.")
# Wrap the code range of one training step.
# zeus_ctx will ignore the first 10 steps and measure the next 10 steps.
with zeus_ctx.step():
training_step(x, y)
# An alternative way if you don't want to use context managers.
zeus_ctx.start_step()
training_step(x, y)
zeus_ctx.finish_step()
# Check if 20 steps passed and query results.
if zeus_ctx.is_done:
print(
f"{zeus_ctx.profile_steps} training steps "
f"consumed {zeus_ctx.total_energy} Joules in {zeus_ctx.total_time} seconds."
)
zeus_ctx.reset()
Source code in zeus/monitor.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238 | class ZeusMonitorContext:
"""Monitors the energy and time consumption inside a training loop.
Skip the first `skip_steps` steps and profile for the next `profile_steps`. Does
nothing after that, before you call [`reset`][zeus.monitor.ZeusMonitorContext.reset].
You can check whether profiling is done (i.e., `skip_steps + profile_steps` passed)
through [`is_done`][zeus.monitor.ZeusMonitorContext.is_done] and query results through
[`total_energy`][zeus.monitor.ZeusMonitorContext.total_energy],
[`avg_energy`][zeus.monitor.ZeusMonitorContext.avg_energy],
[`total_time`][zeus.monitor.ZeusMonitorContext.total_time], and
[`avg_time`][zeus.monitor.ZeusMonitorContext.avg_time].
## Integration example
```python
zeus_ctx = zeus.monitor.ZeusMonitorContext(skip_steps=10, profile_steps=10)
for step, (x, y) in enumerate(train_dataloader):
print(f"Training step {step}.")
# Wrap the code range of one training step.
# zeus_ctx will ignore the first 10 steps and measure the next 10 steps.
with zeus_ctx.step():
training_step(x, y)
# An alternative way if you don't want to use context managers.
zeus_ctx.start_step()
training_step(x, y)
zeus_ctx.finish_step()
# Check if 20 steps passed and query results.
if zeus_ctx.is_done:
print(
f"{zeus_ctx.profile_steps} training steps "
f"consumed {zeus_ctx.total_energy} Joules in {zeus_ctx.total_time} seconds."
)
zeus_ctx.reset()
```
"""
def __init__(
self,
skip_steps: int = 10,
profile_steps: int = 10,
device_id: int = 0,
zeus_monitor_path: str = "zeus_monitor",
zeus_monitor_sleep_ms: int = 0,
zeus_monitor_log_dir: str | None = None,
) -> None:
"""Create a Zeus monitor context.
Args:
skip_steps: The number of steps to skip when profiling energy consumption.
profile_steps: The number of steps to profile and average over for energy consumption.
device_id: CUDA device ID to run the monitor for.
zeus_monitor_path: `argv[0]` to use when spawning the Zeus monitor.
zeus_monitor_sleep_ms: How long the Zeus monitor should sleep after sampling power.
zeus_monitor_log_dir: The directory to put the monitor log file. A temporary file is
used if not specified.
"""
# Save arguments.
self.skip_steps = skip_steps
self.profile_steps = profile_steps
self.device_id = device_id
# Spawn the Zeus monitor.
if zeus_monitor_log_dir:
self._power_csv = f"{zeus_monitor_log_dir}/gpu{self.device_id}.power.csv"
else:
self._power_csv = tempfile.mkstemp(
suffix=f"+gpu{self.device_id}.power.csv"
)[1]
self._monitor = subprocess.Popen(
args=[
zeus_monitor_path,
self._power_csv,
"0",
str(zeus_monitor_sleep_ms),
str(self.device_id),
],
stdin=subprocess.DEVNULL,
)
self._time_origin = time.monotonic()
# Make sure the monitor is eventually stopped.
def exit_hook():
self._monitor.send_signal(signal.SIGINT)
time.sleep(2.0)
self._monitor.kill()
atexit.register(exit_hook)
# Set internal profiling states.
self._started_steps: int = 0
self._finished_steps: int = 0
self._profile_start_times: list[float] = []
self._profile_end_times: list[float] = []
self._metric_cache: dict[str, float] = {}
def start_step(self) -> None:
"""Mark the beginning of one step."""
self._started_steps += 1
if (
self.skip_steps
< self._started_steps
<= self.skip_steps + self.profile_steps
):
torch.cuda.synchronize()
current_time = time.monotonic()
self._profile_start_times.append(current_time - self._time_origin)
def finish_step(self) -> None:
"""Mark the end of one step."""
self._finished_steps += 1
if (
self.skip_steps
< self._finished_steps
<= self.skip_steps + self.profile_steps
):
torch.cuda.synchronize()
current_time = time.monotonic()
self._profile_end_times.append(current_time - self._time_origin)
@contextmanager
def step(self) -> Generator[None, None, None]:
"""Wrap one training step to mark start and finish times."""
try:
self.start_step()
yield
finally:
self.finish_step()
@property
def is_done(self) -> bool:
"""Return whether the specified profiling steps are done."""
if self._started_steps != self._finished_steps:
raise RuntimeError(
"`is_done` should be called outside of the code range marked as a 'step'."
)
return self._finished_steps >= self.skip_steps + self.profile_steps
@property
def total_energy(self) -> float:
"""Return the total energy consumption of `profile_steps` steps."""
if not self.is_done:
raise RuntimeError("Metrics are accessible only after profiling is done.")
try:
return self._metric_cache["total_energy"]
except KeyError:
metric = sum(
compute_energy(self._power_csv, start=start, end=end)
for start, end in zip(
self._profile_start_times, self._profile_end_times
)
)
self._metric_cache["total_energy"] = metric
return metric
@property
def avg_energy(self) -> float:
"""Return the average energy consumption over `profiler_steps` steps."""
if not self.is_done:
raise RuntimeError("Metrics are accessible only after profiling is done.")
try:
return self._metric_cache["avg_energy"]
except KeyError:
metric = self.total_energy / self.profile_steps
self._metric_cache["avg_energy"] = metric
return metric
@property
def total_time(self) -> float:
"""Return the total time consumption of `profile_steps` steps."""
if not self.is_done:
raise RuntimeError("Metrics are accessible only after profiling is done.")
try:
return self._metric_cache["total_time"]
except KeyError:
metric = sum(
end - start
for start, end in zip(
self._profile_start_times, self._profile_end_times
)
)
self._metric_cache["total_time"] = metric
return metric
@property
def avg_time(self) -> float:
"""Return the average time consumption over `profiler_steps` steps."""
if not self.is_done:
raise RuntimeError("Metrics are accessible only after profiling is done.")
try:
return self._metric_cache["avg_time"]
except KeyError:
metric = self.total_time / self.profile_steps
self._metric_cache["avg_time"] = metric
return metric
def reset(self) -> None:
"""Reset internal profile states."""
self._started_steps = 0
self._finished_steps = 0
self._profile_start_times = []
self._profile_end_times = []
self._metric_cache = {}
|
is_done
property
Return whether the specified profiling steps are done.
total_energy
property
Return the total energy consumption of profile_steps
steps.
avg_energy
property
Return the average energy consumption over profiler_steps
steps.
total_time
property
Return the total time consumption of profile_steps
steps.
avg_time
property
Return the average time consumption over profiler_steps
steps.
__init__
__init__(
skip_steps=10,
profile_steps=10,
device_id=0,
zeus_monitor_path="zeus_monitor",
zeus_monitor_sleep_ms=0,
zeus_monitor_log_dir=None,
)
Parameters:
Name |
Type |
Description |
Default |
skip_steps |
int
|
The number of steps to skip when profiling energy consumption. |
10
|
profile_steps |
int
|
The number of steps to profile and average over for energy consumption. |
10
|
device_id |
int
|
CUDA device ID to run the monitor for. |
0
|
zeus_monitor_path |
str
|
argv[0] to use when spawning the Zeus monitor.
|
'zeus_monitor'
|
zeus_monitor_sleep_ms |
int
|
How long the Zeus monitor should sleep after sampling power. |
0
|
zeus_monitor_log_dir |
str | None
|
The directory to put the monitor log file. A temporary file is
used if not specified. |
None
|
Source code in zeus/monitor.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130 | def __init__(
self,
skip_steps: int = 10,
profile_steps: int = 10,
device_id: int = 0,
zeus_monitor_path: str = "zeus_monitor",
zeus_monitor_sleep_ms: int = 0,
zeus_monitor_log_dir: str | None = None,
) -> None:
"""Create a Zeus monitor context.
Args:
skip_steps: The number of steps to skip when profiling energy consumption.
profile_steps: The number of steps to profile and average over for energy consumption.
device_id: CUDA device ID to run the monitor for.
zeus_monitor_path: `argv[0]` to use when spawning the Zeus monitor.
zeus_monitor_sleep_ms: How long the Zeus monitor should sleep after sampling power.
zeus_monitor_log_dir: The directory to put the monitor log file. A temporary file is
used if not specified.
"""
# Save arguments.
self.skip_steps = skip_steps
self.profile_steps = profile_steps
self.device_id = device_id
# Spawn the Zeus monitor.
if zeus_monitor_log_dir:
self._power_csv = f"{zeus_monitor_log_dir}/gpu{self.device_id}.power.csv"
else:
self._power_csv = tempfile.mkstemp(
suffix=f"+gpu{self.device_id}.power.csv"
)[1]
self._monitor = subprocess.Popen(
args=[
zeus_monitor_path,
self._power_csv,
"0",
str(zeus_monitor_sleep_ms),
str(self.device_id),
],
stdin=subprocess.DEVNULL,
)
self._time_origin = time.monotonic()
# Make sure the monitor is eventually stopped.
def exit_hook():
self._monitor.send_signal(signal.SIGINT)
time.sleep(2.0)
self._monitor.kill()
atexit.register(exit_hook)
# Set internal profiling states.
self._started_steps: int = 0
self._finished_steps: int = 0
self._profile_start_times: list[float] = []
self._profile_end_times: list[float] = []
self._metric_cache: dict[str, float] = {}
|
start_step
Mark the beginning of one step.
Source code in zeus/monitor.py
132
133
134
135
136
137
138
139
140
141
142 | def start_step(self) -> None:
"""Mark the beginning of one step."""
self._started_steps += 1
if (
self.skip_steps
< self._started_steps
<= self.skip_steps + self.profile_steps
):
torch.cuda.synchronize()
current_time = time.monotonic()
self._profile_start_times.append(current_time - self._time_origin)
|
finish_step
Mark the end of one step.
Source code in zeus/monitor.py
144
145
146
147
148
149
150
151
152
153
154 | def finish_step(self) -> None:
"""Mark the end of one step."""
self._finished_steps += 1
if (
self.skip_steps
< self._finished_steps
<= self.skip_steps + self.profile_steps
):
torch.cuda.synchronize()
current_time = time.monotonic()
self._profile_end_times.append(current_time - self._time_origin)
|
step
Wrap one training step to mark start and finish times.
Source code in zeus/monitor.py
156
157
158
159
160
161
162
163 | @contextmanager
def step(self) -> Generator[None, None, None]:
"""Wrap one training step to mark start and finish times."""
try:
self.start_step()
yield
finally:
self.finish_step()
|
reset
Reset internal profile states.
Source code in zeus/monitor.py
232
233
234
235
236
237
238 | def reset(self) -> None:
"""Reset internal profile states."""
self._started_steps = 0
self._finished_steps = 0
self._profile_start_times = []
self._profile_end_times = []
self._metric_cache = {}
|