Skip to content

testing

zeus.utils.testing

Utilities for testing.

ReplayZeusMonitor

Bases: ZeusMonitor

A mock ZeusMonitor that replays windows recorded by a real monitor.

This class is for testing only. Based on a CSV log file that records the time and energy measurements of ZeusMonitor measurement windows, users can drop-in replace ZeusMonitor with this class to replay the measurement windows and fast forward training and time/energy measurement.

The methods exposed are identical to or a superset of those of ZeusMonitor, but they behave differently. Instead of monitoring the GPU, it replays events from a log file. The log file generated by ZeusMonitor (log_file) is guaranteed to be compatible and will replay time and energy measurements just like how the real monitor experienced them. Note that in the case of concurrent ongoing measurement windows, the rows of the log file should record windows in the order of end_window calls.

Attributes:

Name Type Description
gpu_indices `list[int]`

Indices of all the CUDA devices to monitor.

Source code in zeus/utils/testing.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class ReplayZeusMonitor(ZeusMonitor):
    """A mock ZeusMonitor that replays windows recorded by a real monitor.

    This class is for testing only. Based on a CSV log file that records the time
    and energy measurements of `ZeusMonitor` measurement windows, users can drop-in
    replace `ZeusMonitor` with this class to replay the measurement windows and
    *fast forward* training and time/energy measurement.

    The methods exposed are identical to or a superset of those of `ZeusMonitor`,
    but they behave differently. Instead of monitoring the GPU, it replays events
    from a log file. The log file generated by `ZeusMonitor` (`log_file`) is
    guaranteed to be compatible and will replay time and energy measurements just
    like how the real monitor experienced them. Note that in the case of concurrent
    ongoing measurement windows, the rows of the log file should record windows in
    the order of `end_window` calls.

    Attributes:
        gpu_indices (`list[int]`): Indices of all the CUDA devices to monitor.
    """

    def __init__(
        self,
        gpu_indices: list[int] | None = None,
        approx_instant_energy: bool = False,
        log_file: str | Path | None = None,
        ignore_sync_execution: bool = False,
        match_window_name: bool = True,
    ) -> None:
        """Initialize the replay monitor.

        The log file should be a CSV file with the following header (e.g. gpu_indices=[0, 2]):
        `start_time,window_name,elapsed_time,gpu0_energy,gpu2_energy`

        Args:
            gpu_indices: Indices of all the CUDA devices to monitor. This should be consistent
                with the indices used in the log file. If `None`, GPU indices will be inferred
                from the log file header. Does not respect `CUDA_VISIBLE_DEVICES`.
                (Default: `None`)
            approx_instant_energy: Whether to approximate the instant energy consumption. Not used.
            log_file: Path to the log CSV file to replay events from. `None` is not allowed.
            ignore_sync_execution: Whether to ignore `sync_execution` calls. (Default: `False`)
            match_window_name: Whether to make sure window names match. (Default: `True`)

        Raises:
            ValueError: If `log_file` is `None`, or if GPU indices have to be inferred
                but an energy column in the header is not of the form `gpu<index>_energy`.
        """
        if log_file is None:
            raise ValueError("`log_file` cannot be `None` for `ReplayZeusMonitor`.")

        self.approx_instant_energy = approx_instant_energy
        # The file stays open for the lifetime of the monitor; `end_window` reads
        # one line from it per replayed window.
        self.log_file = open(log_file)
        self.ignore_sync_execution = ignore_sync_execution
        self.match_window_name = match_window_name

        # The header line is always consumed, even when `gpu_indices` is given, so
        # that later reads start at the first recorded window.
        header = self.log_file.readline()
        if gpu_indices is None:
            # Energy columns start at the fourth field. Strip the line terminator
            # first so that a trailing comma does not leave a lone "\n" field behind.
            columns = [col for col in header.strip().split(",")[3:] if col]
            try:
                gpu_indices = [int(col.split("_")[0][3:]) for col in columns]
            except ValueError as err:
                # Construction failed: do not leak the open file handle.
                self.log_file.close()
                raise ValueError(
                    f"Cannot infer GPU indices from log file header: {header!r}"
                ) from err
        self.nvml_gpu_indices = self.gpu_indices = gpu_indices

        self.logger = get_logger(type(self).__name__)
        self.logger.info(
            "Replaying from '%s' with GPU indices %s", log_file, gpu_indices
        )

        # Keep track of ongoing measurement windows.
        self.ongoing_windows = []

    def begin_window(self, key: str, sync_execution: bool = True) -> None:
        """Begin a new window.

        This method just pushes the key into a list of ongoing measurement windows,
        and just makes sure it's unique.

        Args:
            key: Name of the measurement window.
            sync_execution: Whether to synchronize CUDA before starting the measurement window.
                (Default: `True`)

        Raises:
            RuntimeError: If a window named `key` is already ongoing.
        """
        if key in self.ongoing_windows:
            raise RuntimeError(f"Window {key} is already ongoing.")
        self.ongoing_windows.append(key)

        if not self.ignore_sync_execution and sync_execution:
            sync_execution_fn(self.gpu_indices)

        self.logger.info("Measurement window '%s' started.", key)

    def end_window(
        self, key: str, sync_execution: bool = True, cancel: bool = False
    ) -> Measurement:
        """End an ongoing window.

        This method pops the key from a list of ongoing measurement windows and
        constructs a `Measurement` object corresponding to the name of the window
        from the log file. If the name of the window does not match the expected
        one, a `RuntimeError` is raised.

        Args:
            key: Name of the measurement window.
            sync_execution: Whether to synchronize CUDA before ending the measurement window.
                (Default: `True`)
            cancel: Whether to cancel the measurement window. This will not consume a
                line from the log file. (Default: `False`)

        Returns:
            A `Measurement` built from the next line of the log file, or one with zero
            time and zero energy for every GPU index when `cancel` is `True`.

        Raises:
            RuntimeError: If the window is not ongoing, the log file has no more lines,
                the line is malformed, the window name does not match (when
                `match_window_name` is set), or the number of measurements in the line
                does not match the number of GPU indices.
            ValueError: If a measurement field in the line is not a valid float.
        """
        try:
            self.ongoing_windows.remove(key)
        except ValueError:
            raise RuntimeError(f"Window {key} is not ongoing.") from None

        if not self.ignore_sync_execution and sync_execution:
            sync_execution_fn(self.gpu_indices)

        if cancel:
            self.logger.info("Measurement window '%s' cancelled.", key)
            return Measurement(
                time=0.0,
                gpu_energy={gpu_index: 0.0 for gpu_index in self.gpu_indices},
            )

        # Read the next line from the log file.
        line = self.log_file.readline()
        if not line:
            raise RuntimeError("No more lines in the log file.")
        fields = line.split(",")
        # A blank or truncated line has no window name column; report it like every
        # other log-format problem instead of failing on tuple unpacking.
        if len(fields) < 2:
            raise RuntimeError(f"Malformed line in the log file: {line!r}")
        # The first column (start time) is not needed for replay.
        window_name, nums = fields[1], fields[2:]
        if self.match_window_name and window_name != key:
            raise RuntimeError(f"Was expecting {window_name}, not {key}.")
        # One elapsed-time column plus one energy column per GPU.
        if len(nums) != len(self.gpu_indices) + 1:
            raise RuntimeError(
                f"Line has unexpected number of energy measurements: {line}"
            )
        time_consumption, *energy_consumptions = map(float, nums)
        energy = dict(zip(self.gpu_indices, energy_consumptions))
        measurement = Measurement(time=time_consumption, gpu_energy=energy)

        self.logger.info("Measurement window '%s' ended (%s).", key, measurement)

        return measurement

__init__

__init__(
    gpu_indices=None,
    approx_instant_energy=False,
    log_file=None,
    ignore_sync_execution=False,
    match_window_name=True,
)

The log file should be a CSV file with the following header (e.g. gpu_indices=[0, 2]): start_time,window_name,elapsed_time,gpu0_energy,gpu2_energy

Parameters:

Name Type Description Default
gpu_indices list[int] | None

Indices of all the CUDA devices to monitor. This should be consistent with the indices used in the log file. If None, GPU indices will be inferred from the log file header. Does not respect CUDA_VISIBLE_DEVICES. (Default: None)

None
approx_instant_energy bool

Whether to approximate the instant energy consumption. Not used.

False
log_file str | Path | None

Path to the log CSV file to replay events from. None is not allowed.

None
ignore_sync_execution bool

Whether to ignore sync_execution calls. (Default: False)

False
match_window_name bool

Whether to make sure window names match. (Default: True)

True
Source code in zeus/utils/testing.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def __init__(
    self,
    gpu_indices: list[int] | None = None,
    approx_instant_energy: bool = False,
    log_file: str | Path | None = None,
    ignore_sync_execution: bool = False,
    match_window_name: bool = True,
) -> None:
    """Initialize the replay monitor.

    The log file should be a CSV file with the following header (e.g. gpu_indices=[0, 2]):
    `start_time,window_name,elapsed_time,gpu0_energy,gpu2_energy`

    Args:
        gpu_indices: Indices of all the CUDA devices to monitor. This should be consistent
            with the indices used in the log file. If `None`, GPU indices will be inferred
            from the log file header. Does not respect `CUDA_VISIBLE_DEVICES`.
            (Default: `None`)
        approx_instant_energy: Whether to approximate the instant energy consumption. Not used.
        log_file: Path to the log CSV file to replay events from. `None` is not allowed.
        ignore_sync_execution: Whether to ignore `sync_execution` calls. (Default: `False`)
        match_window_name: Whether to make sure window names match. (Default: `True`)

    Raises:
        ValueError: If `log_file` is `None`, or if GPU indices have to be inferred
            but an energy column in the header is not of the form `gpu<index>_energy`.
    """
    if log_file is None:
        raise ValueError("`log_file` cannot be `None` for `ReplayZeusMonitor`.")

    self.approx_instant_energy = approx_instant_energy
    # The file stays open for the lifetime of the monitor; one line is read from
    # it per replayed window.
    self.log_file = open(log_file)
    self.ignore_sync_execution = ignore_sync_execution
    self.match_window_name = match_window_name

    # The header line is always consumed, even when `gpu_indices` is given, so
    # that later reads start at the first recorded window.
    header = self.log_file.readline()
    if gpu_indices is None:
        # Energy columns start at the fourth field. Strip the line terminator
        # first so that a trailing comma does not leave a lone "\n" field behind.
        columns = [col for col in header.strip().split(",")[3:] if col]
        try:
            gpu_indices = [int(col.split("_")[0][3:]) for col in columns]
        except ValueError as err:
            # Construction failed: do not leak the open file handle.
            self.log_file.close()
            raise ValueError(
                f"Cannot infer GPU indices from log file header: {header!r}"
            ) from err
    self.nvml_gpu_indices = self.gpu_indices = gpu_indices

    self.logger = get_logger(type(self).__name__)
    self.logger.info(
        "Replaying from '%s' with GPU indices %s", log_file, gpu_indices
    )

    # Keep track of ongoing measurement windows.
    self.ongoing_windows = []

begin_window

begin_window(key, sync_execution=True)

Begin a new window.

This method just pushes the key into a list of ongoing measurement windows, and just makes sure it's unique.

Parameters:

Name Type Description Default
key str

Name of the measurement window.

required
sync_execution bool

Whether to synchronize CUDA before starting the measurement window. (Default: True)

True
Source code in zeus/utils/testing.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def begin_window(self, key: str, sync_execution: bool = True) -> None:
    """Register `key` as an ongoing measurement window.

    Nothing is measured here: the window name is only recorded so that a later
    `end_window` call can find it, and duplicate names are rejected.

    Args:
        key: Name of the measurement window.
        sync_execution: Whether to synchronize CUDA before starting the measurement
            window. Has no effect when the monitor was created with
            `ignore_sync_execution=True`. (Default: `True`)

    Raises:
        RuntimeError: If a window named `key` is already ongoing.
    """
    active_windows = self.ongoing_windows
    if key in active_windows:
        raise RuntimeError(f"Window {key} is already ongoing.")
    active_windows.append(key)

    # Synchronization is skipped when it is globally ignored or not requested.
    skip_sync = self.ignore_sync_execution or not sync_execution
    if not skip_sync:
        sync_execution_fn(self.gpu_indices)

    self.logger.info("Measurement window '%s' started.", key)

end_window

end_window(key, sync_execution=True, cancel=False)

End an ongoing window.

This method pops the key from a list of ongoing measurement windows and constructs a Measurement object corresponding to the name of the window from the log file. If the name of the window does not match the expected one, a RuntimeError is raised.

Parameters:

Name Type Description Default
key str

Name of the measurement window.

required
sync_execution bool

Whether to synchronize CUDA before ending the measurement window. (Default: True)

True
cancel bool

Whether to cancel the measurement window. This will not consume a line from the log file. (Default: False)

False
Source code in zeus/utils/testing.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def end_window(
    self, key: str, sync_execution: bool = True, cancel: bool = False
) -> Measurement:
    """End an ongoing window.

    This method pops the key from a list of ongoing measurement windows and
    constructs a `Measurement` object corresponding to the name of the window
    from the log file. If the name of the window does not match the expected
    one, a `RuntimeError` is raised.

    Args:
        key: Name of the measurement window.
        sync_execution: Whether to synchronize CUDA before ending the measurement window.
            (Default: `True`)
        cancel: Whether to cancel the measurement window. This will not consume a
            line from the log file. (Default: `False`)

    Returns:
        A `Measurement` built from the next line of the log file, or one with zero
        time and zero energy for every GPU index when `cancel` is `True`.

    Raises:
        RuntimeError: If the window is not ongoing, the log file has no more lines,
            the line is malformed, the window name does not match (when
            `match_window_name` is set), or the number of measurements in the line
            does not match the number of GPU indices.
        ValueError: If a measurement field in the line is not a valid float.
    """
    try:
        self.ongoing_windows.remove(key)
    except ValueError:
        raise RuntimeError(f"Window {key} is not ongoing.") from None

    if not self.ignore_sync_execution and sync_execution:
        sync_execution_fn(self.gpu_indices)

    if cancel:
        self.logger.info("Measurement window '%s' cancelled.", key)
        return Measurement(
            time=0.0,
            gpu_energy={gpu_index: 0.0 for gpu_index in self.gpu_indices},
        )

    # Read the next line from the log file.
    line = self.log_file.readline()
    if not line:
        raise RuntimeError("No more lines in the log file.")
    fields = line.split(",")
    # A blank or truncated line has no window name column; report it like every
    # other log-format problem instead of failing on tuple unpacking.
    if len(fields) < 2:
        raise RuntimeError(f"Malformed line in the log file: {line!r}")
    # The first column (start time) is not needed for replay.
    window_name, nums = fields[1], fields[2:]
    if self.match_window_name and window_name != key:
        raise RuntimeError(f"Was expecting {window_name}, not {key}.")
    # One elapsed-time column plus one energy column per GPU.
    if len(nums) != len(self.gpu_indices) + 1:
        raise RuntimeError(
            f"Line has unexpected number of energy measurements: {line}"
        )
    time_consumption, *energy_consumptions = map(float, nums)
    energy = dict(zip(self.gpu_indices, energy_consumptions))
    measurement = Measurement(time=time_consumption, gpu_energy=energy)

    self.logger.info("Measurement window '%s' ended (%s).", key, measurement)

    return measurement