bench_executor.collector

This module holds the Collector class which is responsible for collecting metrics during the execution of a case. It also collects hardware information for provenance reasons when comparing results from cases.

The following metrics are collected:

General

  • name: name of the case being executed.
  • index: incremental index for each collected sample.
  • step: Number of the step of a collected sample.
  • timestamp: The time when the sample was collected.
  • version: format version of the collected sample, currently v3.

CPU

  • cpu_user: CPU time spent in userspace.
  • cpu_system: CPU time spent in kernelspace.
  • cpu_user_system: sum of CPU time spent in userspace and kernelspace.
  • cpu_idle: CPU time spent in idle mode.
  • cpu_iowait: Time that the CPU has to wait for IO operations to complete.

Memory

  • memory_ram: Amount of RAM memory in use.
  • memory_swap: Amount of SWAP memory in use.
  • memory_ram_swap: Sum of the RAM and SWAP memory in use.

Disk

  • disk_read_count: Number of disk reads.
  • disk_write_count: Number of disk writes.
  • disk_read_bytes: Number of bytes read from disk.
  • disk_write_bytes: Number of bytes written to disk.
  • disk_read_time: Time spent to read from disk.
  • disk_write_time: Time spent to write to disk.
  • disk_busy_time: Time that the disk is busy and all actions are pending.

Network

  • network_received_count: Number of network packets received.
  • network_sent_count: Number of network packets sent.
  • network_received_bytes: Number of bytes received over network.
  • network_sent_bytes: Number of bytes sent over network.
  • network_received_error: Number of errors that occurred during receiving over network.
  • network_sent_error: Number of errors that occurred during sending over network.
  • network_received_drop: Number of packets dropped during receiving over network.
  • network_sent_drop: Number of packets dropped during sending over network.
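Each sample is written as one row of the metrics CSV using the field names above. A minimal sketch of loading such a file for analysis; the helper `load_metrics` and the example below are illustrative, not part of this module:

```python
import csv
from typing import Dict, List, Union


def load_metrics(path: str) -> List[Dict[str, Union[float, str]]]:
    """Load a metrics CSV and convert all numeric fields to floats."""
    samples = []
    with open(path, newline='') as f:
        for row in csv.DictReader(f):
            # Every field except 'name' holds a number
            samples.append({k: (v if k == 'name' else float(v))
                            for k, v in row.items()})
    return samples


# Example: peak RAM usage over all samples of a run
# samples = load_metrics('metrics.csv')
# peak_ram = max(s['memory_ram'] for s in samples)
```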
#!/usr/bin/env python3
"""
This module holds the Collector class which is responsible for collecting
metrics during the execution of a case. It also collects hardware information
for provenance reasons when comparing results from cases.

The following metrics are collected:

**General**
- `name`: name of the case being executed.
- `index`: incremental index for each collected sample.
- `step`: Number of the step of a collected sample.
- `timestamp`: The time when the sample was collected.
- `version`: format version of the collected sample, currently v3.

**CPU**
- `cpu_user`: CPU time spent in userspace.
- `cpu_system`: CPU time spent in kernelspace.
- `cpu_user_system`: sum of CPU time spent in userspace and kernelspace.
- `cpu_idle`: CPU time spent in idle mode.
- `cpu_iowait`: Time that the CPU has to wait for IO operations to complete.

**Memory**
- `memory_ram`: Amount of RAM memory in use.
- `memory_swap`: Amount of SWAP memory in use.
- `memory_ram_swap`: Sum of the RAM and SWAP memory in use.

**Disk**
- `disk_read_count`: Number of disk reads.
- `disk_write_count`: Number of disk writes.
- `disk_read_bytes`: Number of bytes read from disk.
- `disk_write_bytes`: Number of bytes written to disk.
- `disk_read_time`: Time spent to read from disk.
- `disk_write_time`: Time spent to write to disk.
- `disk_busy_time`: Time that the disk is busy and all actions are pending.

**Network**
- `network_received_count`: Number of network packets received.
- `network_sent_count`: Number of network packets sent.
- `network_received_bytes`: Number of bytes received over network.
- `network_sent_bytes`: Number of bytes sent over network.
- `network_received_error`: Number of errors that occurred during receiving
  over network.
- `network_sent_error`: Number of errors that occurred during sending over
  network.
- `network_received_drop`: Number of packets dropped during receiving over
  network.
- `network_sent_drop`: Number of packets dropped during sending over network.
"""

import os
import platform
import psutil as ps
from csv import DictWriter
from datetime import datetime, timezone
from time import time, sleep
from subprocess import run, CalledProcessError
from threading import Thread, Event
from typing import TYPE_CHECKING, Dict, Union, Optional, List
from bench_executor.logger import Logger
from bench_executor.docker import Docker

# psutil types are platform specific, provide stubs at runtime as checking is
# not done there
if TYPE_CHECKING:
    from psutil._common import sswap, snetio
    from psutil._pslinux import svmem, sdiskio
    from psutil._psaix import scputimes
else:
    from collections import namedtuple
    scputimes = namedtuple('scputimes', [])
    sswap = namedtuple('sswap', [])
    svmem = namedtuple('svmem', [])
    sdiskio = namedtuple('sdiskio', [])
    snetio = namedtuple('snetio', [])

#
# Hardware and case information is logged to 'case-info.txt' on construction.
#
# All data are stored in a CSV as 'metrics.csv'.
# These data are accumulated across all CPU cores, all memory banks, all
# network interfaces, etc. Individual devices are not logged.
#

CASE_INFO_FILE_NAME: str = 'case-info.txt'
METRICS_FILE_NAME: str = 'metrics.csv'
METRICS_VERSION: int = 3
FIELDNAMES: List[str] = [
    'name',
    'run',
    'index',
    'step',
    'timestamp',
    'version',
    'cpu_user',
    'cpu_system',
    'cpu_user_system',
    'cpu_idle',
    'cpu_iowait',
    'memory_ram',
    'memory_swap',
    'memory_ram_swap',
    'disk_read_count',
    'disk_write_count',
    'disk_read_bytes',
    'disk_write_bytes',
    'disk_read_time',
    'disk_write_time',
    'disk_busy_time',
    'network_received_count',
    'network_sent_count',
    'network_received_bytes',
    'network_sent_bytes',
    'network_received_error',
    'network_sent_error',
    'network_received_drop',
    'network_sent_drop'
]
ROUND: int = 4

step_id: int = 1


def _collect_metrics(stop_event: Event, name: str, run: int, metrics_path: str,
                     sample_interval: float, initial_timestamp: float,
                     initial_cpu: scputimes, initial_ram: svmem,
                     initial_swap: sswap, initial_disk_io: Optional[sdiskio],
                     initial_network_io: snetio):
    """Thread function to collect a sample at specific intervals"""
    global step_id
    index = 1
    row: Dict[str, Union[int, float, str]]

    # Create metrics file
    with open(metrics_path, 'w') as f:
        writer = DictWriter(f, fieldnames=FIELDNAMES)
        writer.writeheader()

        # Initial values
        row = {
            'name': name,
            'run': run,
            'index': index,
            'step': step_id,
            'timestamp': 0.0,
            'version': METRICS_VERSION,
            'cpu_user': 0.0,
            'cpu_system': 0.0,
            'cpu_user_system': 0.0,
            'cpu_idle': 0.0,
            'cpu_iowait': 0.0,
            'memory_ram': 0,
            'memory_swap': 0,
            'memory_ram_swap': 0,
            'disk_read_count': 0,
            'disk_write_count': 0,
            'disk_read_bytes': 0,
            'disk_write_bytes': 0,
            'disk_read_time': 0,
            'disk_write_time': 0,
            'disk_busy_time': 0,
            'network_received_count': 0,
            'network_sent_count': 0,
            'network_received_bytes': 0,
            'network_sent_bytes': 0,
            'network_received_error': 0,
            'network_sent_error': 0,
            'network_received_drop': 0,
            'network_sent_drop': 0
        }
        writer.writerow(row)
        index += 1
        # Honor the sample interval, compensating for start-up overhead.
        # Clamp to zero so a negative remainder never raises ValueError.
        sleep(max(0.0, sample_interval - (time() - initial_timestamp)))

        while not stop_event.wait(0):
            # Collect metrics
            timestamp = time()
            cpu: scputimes = ps.cpu_times()
            ram: svmem = ps.virtual_memory()
            swap: sswap = ps.swap_memory()
            disk_io: Optional[sdiskio] = ps.disk_io_counters()  # type: ignore
            network_io: snetio = ps.net_io_counters()

            # Write to file
            diff = round(timestamp - initial_timestamp, ROUND)
            cpu_user = round(cpu.user - initial_cpu.user, ROUND)
            cpu_system = round(cpu.system - initial_cpu.system, ROUND)
            cpu_idle = round(cpu.idle - initial_cpu.idle, ROUND)
            cpu_iowait = round(cpu.iowait - initial_cpu.iowait, ROUND)
            network_recv_count = \
                network_io.packets_recv - initial_network_io.packets_recv
            network_sent_count = \
                network_io.packets_sent - initial_network_io.packets_sent
            network_recv_bytes = \
                network_io.bytes_recv - initial_network_io.bytes_recv
            network_sent_bytes = \
                network_io.bytes_sent - initial_network_io.bytes_sent
            network_errin = \
                network_io.errin - initial_network_io.errin
            network_errout = \
                network_io.errout - initial_network_io.errout
            network_dropin = \
                network_io.dropin - initial_network_io.dropin
            network_dropout = \
                network_io.dropout - initial_network_io.dropout

            row = {
                'name': name,
                'run': run,
                'index': index,
                'step': step_id,
                'timestamp': diff,
                'version': METRICS_VERSION,
                'cpu_user': cpu_user,
                'cpu_system': cpu_system,
                'cpu_user_system': cpu_user + cpu_system,
                'cpu_idle': cpu_idle,
                'cpu_iowait': cpu_iowait,
                'memory_ram': ram.used,
                'memory_swap': swap.used,
                'memory_ram_swap': ram.used + swap.used,
                'network_received_count': network_recv_count,
                'network_sent_count': network_sent_count,
                'network_received_bytes': network_recv_bytes,
                'network_sent_bytes': network_sent_bytes,
                'network_received_error': network_errin,
                'network_sent_error': network_errout,
                'network_received_drop': network_dropin,
                'network_sent_drop': network_dropout
            }

            # Diskless machines will return None for disk IO
            if disk_io is not None and initial_disk_io is not None:
                row['disk_read_count'] = \
                    disk_io.read_count - initial_disk_io.read_count
                row['disk_write_count'] = \
                    disk_io.write_count - initial_disk_io.write_count
                row['disk_read_bytes'] = \
                    disk_io.read_bytes - initial_disk_io.read_bytes
                row['disk_write_bytes'] = \
                    disk_io.write_bytes - initial_disk_io.write_bytes
                row['disk_read_time'] = \
                    disk_io.read_time - initial_disk_io.read_time
                row['disk_write_time'] = \
                    disk_io.write_time - initial_disk_io.write_time
                row['disk_busy_time'] = \
                    disk_io.busy_time - initial_disk_io.busy_time
            writer.writerow(row)
            index += 1

            # Honor the sample interval, compensating for metrics logging
            # overhead. Clamp to zero to avoid a negative sleep argument.
            sleep(max(0.0, sample_interval - (time() - timestamp)))


class Collector:
    """Collect metrics samples at a given interval for a run of a case."""

    def __init__(self, case_name: str, results_run_path: str,
                 sample_interval: float, number_of_steps: int, run_id: int,
                 directory: str, verbose: bool):
        """
        Create an instance of the Collector class.

        Instantiating this class will automatically generate a `case-info.txt`
        file which describes the hardware used during collection of the
        metrics. The file describes:

        - **Case**:
            - Name of the case.
            - Timestamp when started.
            - Directory of the case.
            - Number of the run.
            - Number of steps in a case.
        - **Hardware**:
            - CPU name.
            - Number of CPU cores.
            - Minimum and maximum CPU core frequency.
            - Amount of RAM and SWAP memory.
            - Available disk storage.
            - Available network interfaces and their link speed.
        - **Docker**:
            - Version of the Docker daemon.
            - Docker root directory.
            - Docker storage driver.
            - Docker CgroupFS driver and version.

        Parameters
        ----------
        case_name : str
            Name of the case being executed.
        results_run_path : str
            Path to the results directory of the run currently being executed.
        sample_interval : float
            Sample interval in seconds for collecting metrics.
        number_of_steps : int
            The number of steps of the case that is being executed.
        run_id : int
            The number of the run that is being executed.
        directory : str
            Path to the directory to store logs.
        verbose : bool
            Enable verbose logs.
        """

        self._started: bool = False
        self._data_path: str = os.path.abspath(results_run_path)
        self._number_of_steps: int = number_of_steps
        self._stop_event: Event = Event()
        self._logger = Logger(__name__, directory, verbose)
        self._docker = Docker(self._logger)

        # Only Linux is supported
        if platform.system() != 'Linux':
            msg = f'"{platform.system()}" is not supported as OS'
            self._logger.error(msg)
            raise ValueError(msg)

        # Initialize step ID
        global step_id
        step_id = 1

        # System information: OS, kernel, architecture
        system_hostname = 'UNKNOWN'
        system_os_name = 'UNKNOWN'
        system_os_version = 'UNKNOWN'
        try:
            system_os_name = platform.freedesktop_os_release()['NAME']
            system_os_version = platform.freedesktop_os_release()['VERSION']
        except (OSError, KeyError):
            self._logger.warning('Cannot extract Freedesktop OS release data')
        system_hostname = platform.node()
        system_kernel = platform.platform()
        system_architecture = platform.uname().machine

        # CPU information: name, max frequency, core count
        cpu_name = 'UNKNOWN'
        try:
            # check=True is required, otherwise CalledProcessError is never
            # raised on a non-zero exit status
            raw = run(['lscpu'], capture_output=True, check=True)
            for line in raw.stdout.decode('utf-8').split('\n'):
                if 'Model name:' in line:
                    cpu_name = line.split(':')[1].strip()
                    break
        except CalledProcessError as e:
            self._logger.warning('Unable to determine CPU processor name: '
                                 f'{e}')

        cpu_cores = ps.cpu_count()
        cpu_min_freq = ps.cpu_freq().min
        cpu_max_freq = ps.cpu_freq().max

        # Memory information: RAM total, SWAP total
        memory_total = ps.virtual_memory().total
        swap_total = ps.swap_memory().total

        # Disk IO: name
        partitions: Dict[str, int] = {}
        for disk in ps.disk_partitions():
            # Skip Docker's overlayFS
            if disk.fstype and 'docker' not in disk.mountpoint:
                total = ps.disk_usage(disk.mountpoint).total
                partitions[disk.mountpoint] = total

        # Network IO: name, speed, MTU
        network_interfaces = ps.net_if_stats()

        # Docker daemon: version, storage driver, cgroupfs
        success, docker_info = self._docker.info()
        if not success:
            self._logger.error('Failed to retrieve Docker daemon information')

        # Write machine information to disk
        case_info_file = os.path.join(self._data_path, CASE_INFO_FILE_NAME)
        with open(case_info_file, 'w') as f:
            f.write('===> CASE <===\n')
            f.write(f'Name: {case_name}\n')
            f.write(f'Timestamp: {datetime.now(timezone.utc).isoformat()}\n')
            f.write(f'Directory: {directory}\n')
            f.write(f'Run: {run_id}\n')
            f.write(f'Number of steps: {self._number_of_steps}\n')
            f.write('\n')
            f.write('===> HARDWARE <===\n')
            f.write('System\n')
            f.write(f'\tHostname: {system_hostname}\n')
            f.write(f'\tOS name: {system_os_name}\n')
            f.write(f'\tOS version: {system_os_version}\n')
            f.write(f'\tKernel: {system_kernel}\n')
            f.write(f'\tArchitecture: {system_architecture}\n')
            f.write('CPU\n')
            f.write(f'\tName: {cpu_name}\n')
            f.write(f'\tCores: {cpu_cores}\n')
            # psutil reports CPU frequencies in MHz
            f.write(f'\tMinimum frequency: {int(cpu_min_freq)} MHz\n')
            f.write(f'\tMaximum frequency: {int(cpu_max_freq)} MHz\n')
            f.write('Memory\n')
            f.write(f'\tRAM memory: {int(memory_total / 10 ** 6)} MB\n')
            f.write(f'\tSWAP memory: {int(swap_total / 10 ** 6)} MB\n')
            f.write('Storage\n')
            for disk_name, size in partitions.items():
                f.write(f'\tDisk "{disk_name}": '
                        f'{round(size / 10 ** 9, 2)} GB\n')
            f.write('Network\n')
            for interface_name, stats in network_interfaces.items():
                speed = stats.speed
                if speed == 0:
                    f.write(f'\tInterface "{interface_name}"\n')
                else:
                    f.write(f'\tInterface "{interface_name}": {speed} Mbps\n')

            f.write('\n')
            f.write('===> DOCKER <===\n')
            f.write('Version: '
                    f'{docker_info.get("ServerVersion", "UNKNOWN")}\n')
            f.write('Root directory: '
                    f'{docker_info.get("DockerRootDir", "UNKNOWN")}\n')
            f.write('Drivers:\n')
            f.write('\tStorage: '
                    f'{docker_info.get("Driver", "UNKNOWN")}\n')
            f.write('\tCgroupFS: '
                    f'{docker_info.get("CgroupDriver", "UNKNOWN")} '
                    f'v{docker_info.get("CgroupVersion", "UNKNOWN")}\n')

        # Set initial metric values and start collection thread
        metrics_path = os.path.join(results_run_path, METRICS_FILE_NAME)
        initial_timestamp = time()
        initial_cpu = ps.cpu_times()
        initial_ram = ps.virtual_memory()
        initial_swap = ps.swap_memory()
        initial_disk_io = ps.disk_io_counters()
        initial_network_io = ps.net_io_counters()
        self._thread: Thread = Thread(target=_collect_metrics,
                                      daemon=True,
                                      args=(self._stop_event,
                                            case_name,
                                            run_id,
                                            metrics_path,
                                            sample_interval,
                                            initial_timestamp,
                                            initial_cpu,
                                            initial_ram,
                                            initial_swap,
                                            initial_disk_io,
                                            initial_network_io))
        self._thread.start()

    @property
    def name(self):
        """Name of the class: Collector"""
        return self.__class__.__name__

    def next_step(self):
        """Increment the step number by one.

        The step number must always be equal or lower than the number of steps
        in the case.
        """
        global step_id
        step_id += 1

        msg = f'Step ({step_id}) is higher than number of steps ' + \
              f'({self._number_of_steps})'
        assert (step_id <= self._number_of_steps), msg

    def stop(self):
        """End metrics collection.

        Signal the metrics collection thread to stop collecting any metrics.
        """
        self._stop_event.set()
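The core of `_collect_metrics` is a stop-event-driven sampling thread that subtracts its own overhead from the sleep between samples. A stripped-down, stdlib-only sketch of that pattern, sampling a plain counter instead of psutil metrics (the names `sample_loop`, `sample`, and `out` are illustrative, not part of the module):

```python
from threading import Thread, Event
from time import time, sleep
from typing import Callable, List


def sample_loop(stop_event: Event, interval: float,
                sample: Callable[[], float], out: List[float]) -> None:
    """Collect one sample per interval until stop_event is set."""
    while not stop_event.wait(0):
        start = time()
        out.append(sample())
        # Honor the sample interval, compensating for sampling overhead;
        # clamp to zero so a slow sample never causes a negative sleep
        sleep(max(0.0, interval - (time() - start)))


# Usage: collect samples at 10 ms intervals for ~60 ms, then stop
stop = Event()
results: List[float] = []
t = Thread(target=sample_loop, args=(stop, 0.01, time, results), daemon=True)
t.start()
sleep(0.06)
stop.set()
t.join()
```

Signalling the thread through an `Event` rather than killing it lets the loop finish writing its current row before exiting, which is why `Collector.stop()` only sets the event.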
CASE_INFO_FILE_NAME: str = 'case-info.txt'
METRICS_FILE_NAME: str = 'metrics.csv'
METRICS_VERSION: int = 3
FIELDNAMES: List[str] = ['name', 'run', 'index', 'step', 'timestamp', 'version', 'cpu_user', 'cpu_system', 'cpu_user_system', 'cpu_idle', 'cpu_iowait', 'memory_ram', 'memory_swap', 'memory_ram_swap', 'disk_read_count', 'disk_write_count', 'disk_read_bytes', 'disk_write_bytes', 'disk_read_time', 'disk_write_time', 'disk_busy_time', 'network_received_count', 'network_sent_count', 'network_received_bytes', 'network_sent_bytes', 'network_received_error', 'network_sent_error', 'network_received_drop', 'network_sent_drop']
ROUND: int = 4
step_id: int = 1
class Collector:
255class Collector():
256    """Collect metrics samples at a given interval for a run of a case."""
257
258    def __init__(self, case_name: str, results_run_path: str,
259                 sample_interval: float, number_of_steps: int, run_id: int,
260                 directory: str, verbose: bool):
261        """
262        Create an instance of the Collector class.
263
264        Instantiating this class will automatically generate a `case-info.txt`
265        file which describes the hardware used during collection of the
266        metrics. The file describes:
267
268        - **Case**:
269            - Name of the case.
270            - Timestamp when started.
271            - Directory of the case.
272            - Number of the run.
273            - Number of steps in a case.
274        - **Hardware**:
275            - CPU name.
276            - Number of CPU cores.
277            - Minimum and maximum CPU core frequency.
278            - Amount of RAM and SWAP memory
279            - Available disk storage.
280            - Available network interfaces and their link speed.
281        - **Docker**:
282            - Version of the Docker daemon
283            - Docker root directory
284            - Docker storage driver
285            - Docker CgroupFS driver and version
286
287        Parameters
288        ----------
289        case_name : str
290            Name of the case being executed.
291        results_run_path : str
292            Path to the results directory of the run currently being executed.
293        sample_interval : float
294            Sample interval in seconds for collecting metrics.
295        number_of_steps : int
296            The number of steps of the case that is being executed.
297        run_id : int
298            The number of the run that is being executed.
299        directory : str
300            Path to the directory to store logs.
301        verbose : bool
302            Enable verbose logs.
303        """
304
305        self._started: bool = False
306        self._data_path: str = os.path.abspath(results_run_path)
307        self._number_of_steps: int = number_of_steps
308        self._stop_event: Event = Event()
309        self._logger = Logger(__name__, directory, verbose)
310        self._docker = Docker(self._logger)
311
312        # Only Linux is supported
313        if platform.system() != 'Linux':
314            msg = f'"{platform.system()} is not supported as OS'
315            self._logger.error(msg)
316            raise ValueError(msg)
317
318        # Initialize step ID
319        global step_id
320        step_id = 1
321
322        # System information: OS, kernel, architecture
323        system_hostname = 'UNKNOWN'
324        system_os_name = 'UNKNOWN'
325        system_os_version = 'UNKNOWN'
326        try:
327            system_os_name = platform.freedesktop_os_release()['NAME']
328            system_os_version = platform.freedesktop_os_release()['VERSION']
329        except (OSError, KeyError):
330            self._logger.warning('Cannot extract Freedesktop OS release data')
331        system_hostname = platform.node()
332        system_kernel = platform.platform()
333        system_architecture = platform.uname().machine
334
335        # CPU information: name, max frequency, core count
336        cpu_name = 'UNKNOWN'
337        try:
338            raw = run(['lscpu'], capture_output=True)
339            for line in raw.stdout.decode('utf-8').split('\n'):
340                if 'Model name:' in line:
341                    cpu_name = line.split(':')[1].strip()
342                    break
343        except CalledProcessError as e:
344            self._logger.warning('Unable to determine CPU processor name: '
345                                 f'{e}')
346
347        cpu_cores = ps.cpu_count()
348        cpu_min_freq = ps.cpu_freq().min
349        cpu_max_freq = ps.cpu_freq().max
350
351        # Memory information: RAM total, SWAP total
352        memory_total = ps.virtual_memory().total
353        swap_total = ps.swap_memory().total
354
355        # Disk IO: name
356        partitions: Dict[str, int] = {}
357        for disk in ps.disk_partitions():
358            # Skip Docker's overlayFS
359            if disk.fstype and 'docker' not in disk.mountpoint:
360                total = ps.disk_usage(disk.mountpoint).total
361                partitions[disk.mountpoint] = total
362
363        # Network IO: name, speed, MTU
364        network_interfaces = ps.net_if_stats()
365
366        # Docker daemon: version, storage driver, cgroupfs
367        success, docker_info = self._docker.info()
368        if not success:
369            self._logger.error('Failed to retrieve Docker daemon information')
370
371        # Write machine information to disk
372        case_info_file = os.path.join(self._data_path, CASE_INFO_FILE_NAME)
373        with open(case_info_file, 'w') as f:
374            f.write('===> CASE <===\n')
375            f.write(f'Name: {case_name}\n')
376            f.write(f'Timestamp: {datetime.now(timezone.utc).isoformat()}\n')
377            f.write(f'Directory: {directory}\n')
378            f.write(f'Run: {run_id}\n')
379            f.write(f'Number of steps: {self._number_of_steps}\n')
380            f.write('\n')
381            f.write('===> HARDWARE <===\n')
382            f.write('System\n')
383            f.write(f'\tHostname: {system_hostname}\n')
384            f.write(f'\tOS name: {system_os_name}\n')
385            f.write(f'\tOS version: {system_os_version}\n')
386            f.write(f'\tKernel: {system_kernel}\n')
387            f.write(f'\tArchitecture: {system_architecture}\n')
388            f.write('CPU\n')
389            f.write(f'\tName: {cpu_name}\n')
390            f.write(f'\tCores: {cpu_cores}\n')
391            f.write(f'\tMinimum frequency: {int(cpu_min_freq)} Hz\n')
392            f.write(f'\tMaximum frequency: {int(cpu_max_freq)} Hz\n')
393            f.write('Memory\n')
394            f.write(f'\tRAM memory: {int(memory_total / 10 ** 6)} MB\n')
395            f.write(f'\tSWAP memory: {int(swap_total / 10 ** 6)} MB\n')
396            f.write('Storage\n')
397            for disk_name, size in partitions.items():
398                f.write(f'\tDisk "{disk_name}": '
399                        f'{round(size / 10 ** 9, 2)} GB\n')
400            f.write('Network\n')
401            for interface_name, stats in network_interfaces.items():
402                speed = stats.speed
403                if speed == 0:
404                    f.write(f'\tInterface "{interface_name}"\n')
405                else:
406                    f.write(f'\tInterface "{interface_name}": {speed} mbps\n')
407
408            f.write('\n')
409            f.write('===> DOCKER <===\n')
410            f.write('Version: '
411                    f'{docker_info.get("ServerVersion", "UNKNOWN")}\n')
412            f.write('Root directory: '
413                    f'{docker_info.get("DockerRootDir", "UNKNOWN")}\n')
414            f.write('Drivers:\n')
415            f.write('\tStorage: '
416                    f'{docker_info.get("Driver", "UNKNOWN")}\n')
417            f.write('\tCgroupFS: '
418                    f'{docker_info.get("CgroupDriver", "UNKNOWN")} '
419                    f'v{docker_info.get("CgroupVersion", "UNKNOWN")}\n')
420
421        # Set initial metric values and start collection thread
422        metrics_path = os.path.join(results_run_path, METRICS_FILE_NAME)
423        initial_timestamp = time()
424        initial_cpu = ps.cpu_times()
425        initial_ram = ps.virtual_memory().used
426        initial_swap = ps.swap_memory().used
427        initial_disk_io = ps.disk_io_counters()
428        initial_network_io = ps.net_io_counters()
429        self._thread: Thread = Thread(target=_collect_metrics,
430                                      daemon=True,
431                                      args=(self._stop_event,
432                                            case_name,
433                                            run_id,
434                                            metrics_path,
435                                            sample_interval,
436                                            initial_timestamp,
437                                            initial_cpu,
438                                            initial_ram,
439                                            initial_swap,
440                                            initial_disk_io,
441                                            initial_network_io))
442        self._thread.start()
443
444    @property
445    def name(self):
446        """Name of the class: Collector"""
447        return self.__class__.__name__
448
449    def next_step(self):
450        """Increment the step number by one.
451
452        The step number must always be equal or lower than the number of steps
453        in the case.
454        """
455        global step_id
456        step_id += 1
457
458        msg = f'Step ({step_id}) is higher than number of steps ' + \
459              f'({self._number_of_steps})'
460        assert (step_id <= self._number_of_steps), msg
461
462    def stop(self):
463        """End metrics collection.
464
465        Signal the metrics collection thread to stop collecting any metrics.
466        """
467        self._stop_event.set()

Collect metrics samples at a given interval for a run of a case.

Collector( case_name: str, results_run_path: str, sample_interval: float, number_of_steps: int, run_id: int, directory: str, verbose: bool)
258    def __init__(self, case_name: str, results_run_path: str,
259                 sample_interval: float, number_of_steps: int, run_id: int,
260                 directory: str, verbose: bool):
261        """
262        Create an instance of the Collector class.
263
264        Instantiating this class will automatically generate a `case-info.txt`
265        file which describes the hardware used during collection of the
266        metrics. The file describes:
267
268        - **Case**:
269            - Name of the case.
270            - Timestamp when started.
271            - Directory of the case.
272            - Number of the run.
273            - Number of steps in a case.
274        - **Hardware**:
275            - CPU name.
276            - Number of CPU cores.
277            - Minimum and maximum CPU core frequency.
278            - Amount of RAM and SWAP memory.
279            - Available disk storage.
280            - Available network interfaces and their link speed.
281        - **Docker**:
282            - Version of the Docker daemon.
283            - Docker root directory.
284            - Docker storage driver.
285            - Docker CgroupFS driver and version.
286
287        Parameters
288        ----------
289        case_name : str
290            Name of the case being executed.
291        results_run_path : str
292            Path to the results directory of the run currently being executed.
293        sample_interval : float
294            Sample interval in seconds for collecting metrics.
295        number_of_steps : int
296            The number of steps of the case that is being executed.
297        run_id : int
298            The number of the run that is being executed.
299        directory : str
300            Path to the directory to store logs.
301        verbose : bool
302            Enable verbose logs.
303        """
304
305        self._started: bool = False
306        self._data_path: str = os.path.abspath(results_run_path)
307        self._number_of_steps: int = number_of_steps
308        self._stop_event: Event = Event()
309        self._logger = Logger(__name__, directory, verbose)
310        self._docker = Docker(self._logger)
311
312        # Only Linux is supported
313        if platform.system() != 'Linux':
314            msg = f'"{platform.system()}" is not a supported OS'
315            self._logger.error(msg)
316            raise ValueError(msg)
317
318        # Initialize step ID
319        global step_id
320        step_id = 1
321
322        # System information: OS, kernel, architecture
323        system_hostname = 'UNKNOWN'
324        system_os_name = 'UNKNOWN'
325        system_os_version = 'UNKNOWN'
326        try:
327            system_os_name = platform.freedesktop_os_release()['NAME']
328            system_os_version = platform.freedesktop_os_release()['VERSION']
329        except (OSError, KeyError):
330            self._logger.warning('Cannot extract Freedesktop OS release data')
331        system_hostname = platform.node()
332        system_kernel = platform.platform()
333        system_architecture = platform.uname().machine
334
335        # CPU information: name, max frequency, core count
336        cpu_name = 'UNKNOWN'
337        try:
338            raw = run(['lscpu'], capture_output=True, check=True)
339            for line in raw.stdout.decode('utf-8').split('\n'):
340                if 'Model name:' in line:
341                    cpu_name = line.split(':')[1].strip()
342                    break
343        except (CalledProcessError, FileNotFoundError) as e:
344            self._logger.warning('Unable to determine CPU processor name: '
345                                 f'{e}')
346
347        cpu_cores = ps.cpu_count()
348        cpu_min_freq = ps.cpu_freq().min
349        cpu_max_freq = ps.cpu_freq().max
350
351        # Memory information: RAM total, SWAP total
352        memory_total = ps.virtual_memory().total
353        swap_total = ps.swap_memory().total
354
355        # Disk IO: name
356        partitions: Dict[str, int] = {}
357        for disk in ps.disk_partitions():
358            # Skip Docker's overlayFS
359            if disk.fstype and 'docker' not in disk.mountpoint:
360                total = ps.disk_usage(disk.mountpoint).total
361                partitions[disk.mountpoint] = total
362
363        # Network IO: name, speed, MTU
364        network_interfaces = ps.net_if_stats()
365
366        # Docker daemon: version, storage driver, cgroupfs
367        success, docker_info = self._docker.info()
368        if not success:
369            self._logger.error('Failed to retrieve Docker daemon information')
370
371        # Write machine information to disk
372        case_info_file = os.path.join(self._data_path, CASE_INFO_FILE_NAME)
373        with open(case_info_file, 'w') as f:
374            f.write('===> CASE <===\n')
375            f.write(f'Name: {case_name}\n')
376            f.write(f'Timestamp: {datetime.now(timezone.utc).isoformat()}\n')
377            f.write(f'Directory: {directory}\n')
378            f.write(f'Run: {run_id}\n')
379            f.write(f'Number of steps: {self._number_of_steps}\n')
380            f.write('\n')
381            f.write('===> HARDWARE <===\n')
382            f.write('System\n')
383            f.write(f'\tHostname: {system_hostname}\n')
384            f.write(f'\tOS name: {system_os_name}\n')
385            f.write(f'\tOS version: {system_os_version}\n')
386            f.write(f'\tKernel: {system_kernel}\n')
387            f.write(f'\tArchitecture: {system_architecture}\n')
388            f.write('CPU\n')
389            f.write(f'\tName: {cpu_name}\n')
390            f.write(f'\tCores: {cpu_cores}\n')
391            f.write(f'\tMinimum frequency: {int(cpu_min_freq)} MHz\n')
392            f.write(f'\tMaximum frequency: {int(cpu_max_freq)} MHz\n')
393            f.write('Memory\n')
394            f.write(f'\tRAM memory: {int(memory_total / 10 ** 6)} MB\n')
395            f.write(f'\tSWAP memory: {int(swap_total / 10 ** 6)} MB\n')
396            f.write('Storage\n')
397            for disk_name, size in partitions.items():
398                f.write(f'\tDisk "{disk_name}": '
399                        f'{round(size / 10 ** 9, 2)} GB\n')
400            f.write('Network\n')
401            for interface_name, stats in network_interfaces.items():
402                speed = stats.speed
403                if speed == 0:
404                    f.write(f'\tInterface "{interface_name}"\n')
405                else:
406                    f.write(f'\tInterface "{interface_name}": {speed} Mbps\n')
407
408            f.write('\n')
409            f.write('===> DOCKER <===\n')
410            f.write('Version: '
411                    f'{docker_info.get("ServerVersion", "UNKNOWN")}\n')
412            f.write('Root directory: '
413                    f'{docker_info.get("DockerRootDir", "UNKNOWN")}\n')
414            f.write('Drivers:\n')
415            f.write('\tStorage: '
416                    f'{docker_info.get("Driver", "UNKNOWN")}\n')
417            f.write('\tCgroupFS: '
418                    f'{docker_info.get("CgroupDriver", "UNKNOWN")} '
419                    f'v{docker_info.get("CgroupVersion", "UNKNOWN")}\n')
420
421        # Set initial metric values and start collection thread
422        metrics_path = os.path.join(results_run_path, METRICS_FILE_NAME)
423        initial_timestamp = time()
424        initial_cpu = ps.cpu_times()
425        initial_ram = ps.virtual_memory().used
426        initial_swap = ps.swap_memory().used
427        initial_disk_io = ps.disk_io_counters()
428        initial_network_io = ps.net_io_counters()
429        self._thread: Thread = Thread(target=_collect_metrics,
430                                      daemon=True,
431                                      args=(self._stop_event,
432                                            case_name,
433                                            run_id,
434                                            metrics_path,
435                                            sample_interval,
436                                            initial_timestamp,
437                                            initial_cpu,
438                                            initial_ram,
439                                            initial_swap,
440                                            initial_disk_io,
441                                            initial_network_io))
442        self._thread.start()

Create an instance of the Collector class.

Instantiating this class will automatically generate a case-info.txt file which describes the hardware used during collection of the metrics. The file describes:

  • Case:
    • Name of the case.
    • Timestamp when started.
    • Directory of the case.
    • Number of the run.
    • Number of steps in a case.
  • Hardware:
    • CPU name.
    • Number of CPU cores.
    • Minimum and maximum CPU core frequency.
    • Amount of RAM and SWAP memory.
    • Available disk storage.
    • Available network interfaces and their link speed.
  • Docker:
    • Version of the Docker daemon.
    • Docker root directory.
    • Docker storage driver.
    • Docker CgroupFS driver and version.
Parameters
  • case_name (str): Name of the case being executed.
  • results_run_path (str): Path to the results directory of the run currently being executed.
  • sample_interval (float): Sample interval in seconds for collecting metrics.
  • number_of_steps (int): The number of steps of the case that is being executed.
  • run_id (int): The number of the run that is being executed.
  • directory (str): Path to the directory to store logs.
  • verbose (bool): Enable verbose logs.
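The system rows of `case-info.txt` come straight from Python's standard `platform` module; a minimal standalone sketch of those queries (stdlib only, no psutil or Docker required):

```python
import platform

# Freedesktop OS release data only exists on Linux distributions that
# ship /etc/os-release; fall back to 'UNKNOWN' elsewhere, as Collector does.
os_name = os_version = 'UNKNOWN'
try:
    release = platform.freedesktop_os_release()  # Python 3.10+
    os_name = release.get('NAME', 'UNKNOWN')
    os_version = release.get('VERSION', 'UNKNOWN')
except (OSError, AttributeError):
    pass

info = {
    'Hostname': platform.node(),
    'OS name': os_name,
    'OS version': os_version,
    'Kernel': platform.platform(),
    'Architecture': platform.uname().machine,
}
for key, value in info.items():
    print(f'\t{key}: {value}')
```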
name
444    @property
445    def name(self):
446        """Name of the class: Collector"""
447        return self.__class__.__name__

Name of the class: Collector

def next_step(self):
449    def next_step(self):
450        """Increment the step number by one.
451
452        The step number must always be equal to or lower than the number
453        of steps in the case.
454        """
455        global step_id
456        step_id += 1
457
458        msg = f'Step ({step_id}) is higher than number of steps ' + \
459              f'({self._number_of_steps})'
460        assert (step_id <= self._number_of_steps), msg

Increment the step number by one.

The step number must always be equal to or lower than the number of steps in the case.
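The method relies on a module-level `step_id` counter that `__init__` initializes to 1; the invariant it enforces can be reproduced with a small self-contained counter (`StepCounter` is a hypothetical illustration, not part of this module):

```python
class StepCounter:
    """Track the current step and enforce it never exceeds the total."""

    def __init__(self, number_of_steps: int):
        self.number_of_steps = number_of_steps
        self.step_id = 1  # the Collector also starts counting at 1

    def next_step(self) -> None:
        self.step_id += 1
        assert self.step_id <= self.number_of_steps, (
            f'Step ({self.step_id}) is higher than number of steps '
            f'({self.number_of_steps})')

counter = StepCounter(number_of_steps=3)
counter.next_step()  # step 2
counter.next_step()  # step 3
# A fourth call would trip the assertion, just like Collector.next_step().
```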

def stop(self):
462    def stop(self):
463        """End metrics collection.
464
465        Signal the metrics collection thread to stop collecting any metrics.
466        """
467        self._stop_event.set()

End metrics collection.

Signal the metrics collection thread to stop collecting any metrics.
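`stop()` only sets a `threading.Event`; the collection thread polls it between samples and exits on its own. A minimal sketch of that pattern (the hypothetical `sample_loop` stands in for `_collect_metrics`, which writes psutil samples to the metrics file rather than a list):

```python
import time
from threading import Event, Thread

def sample_loop(stop_event: Event, sample_interval: float,
                samples: list) -> None:
    """Collect one sample per interval until the stop event is set."""
    while not stop_event.is_set():
        samples.append('sample')  # stand-in for a psutil measurement
        # wait() doubles as the inter-sample sleep and returns early
        # as soon as set() is called, so stop() takes effect immediately.
        stop_event.wait(timeout=sample_interval)

stop_event = Event()
samples: list = []
thread = Thread(target=sample_loop, daemon=True,
                args=(stop_event, 0.01, samples))
thread.start()

time.sleep(0.05)   # ... the case runs here ...
stop_event.set()   # what Collector.stop() does
thread.join()
```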
