bench_executor.collector
This module holds the Collector class which is responsible for collecting metrics during the execution of a case. It also collects hardware information for provenance reasons when comparing results from cases.
The following metrics are collected:

General

name
: Name of the case being executed.

run
: Number of the run being executed.

index
: Incremental index for each collected sample.

step
: Number of the step of a collected sample.

timestamp
: The time when the sample was collected.

version
: Format version of the collected sample, currently v3.

CPU

cpu_user
: CPU time spent in userspace.

cpu_system
: CPU time spent in kernelspace.

cpu_user_system
: Sum of the CPU time spent in userspace and kernelspace.

cpu_idle
: CPU time spent in idle mode.

cpu_iowait
: Time that the CPU has to wait for IO operations to complete.

Memory

memory_ram
: Amount of RAM memory in use.

memory_swap
: Amount of SWAP memory in use.

memory_ram_swap
: Sum of the RAM and SWAP memory in use.

Disk

disk_read_count
: Number of disk reads.

disk_write_count
: Number of disk writes.

disk_read_bytes
: Number of bytes read from disk.

disk_write_bytes
: Number of bytes written to disk.

disk_read_time
: Time spent reading from disk.

disk_write_time
: Time spent writing to disk.

disk_busy_time
: Time that the disk is busy and all actions are pending.

Network

network_received_count
: Number of network packets received.

network_sent_count
: Number of network packets sent.

network_received_bytes
: Number of bytes received over the network.

network_sent_bytes
: Number of bytes sent over the network.

network_received_error
: Number of errors that occurred while receiving over the network.

network_sent_error
: Number of errors that occurred while sending over the network.

network_received_drop
: Number of packets dropped while receiving over the network.

network_sent_drop
: Number of packets dropped while sending over the network.
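Each sample becomes one row in the metrics CSV, keyed by the column names above. A minimal sketch of writing and reading such rows with Python's standard `csv` module; the field subset and values here are illustrative, not taken from a real run:

```python
import csv
import io

# Subset of the documented metric columns, for illustration only.
fieldnames = ['name', 'index', 'step', 'timestamp', 'version', 'cpu_user']

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=fieldnames)
writer.writeheader()
# First row holds the zeroed initial values, later rows hold deltas.
writer.writerow({'name': 'case1', 'index': 1, 'step': 1,
                 'timestamp': 0.0, 'version': 3, 'cpu_user': 0.0})
writer.writerow({'name': 'case1', 'index': 2, 'step': 1,
                 'timestamp': 0.1, 'version': 3, 'cpu_user': 0.05})

buf.seek(0)
rows = list(csv.DictReader(buf))
print(rows[1]['cpu_user'])  # DictReader returns strings: '0.05'
```

Note that `DictReader` yields all values as strings, so consumers of the metrics file need to convert numeric columns themselves.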
```python
#!/usr/bin/env python3
"""
This module holds the Collector class which is responsible for collecting
metrics during the execution of a case. It also collects hardware information
for provenance reasons when comparing results from cases.

The following metrics are collected:

**General**
- `name`: name of the case being executed.
- `run`: number of the run being executed.
- `index`: incremental index for each collected sample.
- `step`: number of the step of a collected sample.
- `timestamp`: the time when the sample was collected.
- `version`: format version of the collected sample, currently v3.

**CPU**
- `cpu_user`: CPU time spent in userspace.
- `cpu_system`: CPU time spent in kernelspace.
- `cpu_user_system`: sum of the CPU time spent in userspace and kernelspace.
- `cpu_idle`: CPU time spent in idle mode.
- `cpu_iowait`: time that the CPU has to wait for IO operations to complete.

**Memory**
- `memory_ram`: amount of RAM memory in use.
- `memory_swap`: amount of SWAP memory in use.
- `memory_ram_swap`: sum of the RAM and SWAP memory in use.

**Disk**
- `disk_read_count`: number of disk reads.
- `disk_write_count`: number of disk writes.
- `disk_read_bytes`: number of bytes read from disk.
- `disk_write_bytes`: number of bytes written to disk.
- `disk_read_time`: time spent reading from disk.
- `disk_write_time`: time spent writing to disk.
- `disk_busy_time`: time that the disk is busy and all actions are pending.

**Network**
- `network_received_count`: number of network packets received.
- `network_sent_count`: number of network packets sent.
- `network_received_bytes`: number of bytes received over the network.
- `network_sent_bytes`: number of bytes sent over the network.
- `network_received_error`: number of errors that occurred while receiving
  over the network.
- `network_sent_error`: number of errors that occurred while sending over
  the network.
- `network_received_drop`: number of packets dropped while receiving over
  the network.
- `network_sent_drop`: number of packets dropped while sending over the
  network.
"""

import os
import platform
import psutil as ps
from csv import DictWriter
from datetime import datetime, timezone
from time import time, sleep
from subprocess import run, CalledProcessError
from threading import Thread, Event
from typing import TYPE_CHECKING, Dict, Union, Optional, List
from bench_executor.logger import Logger
from bench_executor.docker import Docker

# psutil types are platform specific, provide stubs at runtime as type
# checking is not done there
if TYPE_CHECKING:
    from psutil._common import sswap, snetio
    from psutil._pslinux import svmem, sdiskio
    from psutil._psaix import scputimes
else:
    from collections import namedtuple
    scputimes = namedtuple('scputimes', [])
    sswap = namedtuple('sswap', [])
    svmem = namedtuple('svmem', [])
    sdiskio = namedtuple('sdiskio', [])
    snetio = namedtuple('snetio', [])

#
# Hardware and case information is logged to 'case-info.txt' on construction.
#
# All data are stored in a CSV as 'metrics.csv'.
# These data are accumulated over all CPU cores, all memory banks, all
# network interfaces, etc. Individual devices are not logged.
#

CASE_INFO_FILE_NAME: str = 'case-info.txt'
METRICS_FILE_NAME: str = 'metrics.csv'
METRICS_VERSION: int = 3
FIELDNAMES: List[str] = [
    'name',
    'run',
    'index',
    'step',
    'timestamp',
    'version',
    'cpu_user',
    'cpu_system',
    'cpu_user_system',
    'cpu_idle',
    'cpu_iowait',
    'memory_ram',
    'memory_swap',
    'memory_ram_swap',
    'disk_read_count',
    'disk_write_count',
    'disk_read_bytes',
    'disk_write_bytes',
    'disk_read_time',
    'disk_write_time',
    'disk_busy_time',
    'network_received_count',
    'network_sent_count',
    'network_received_bytes',
    'network_sent_bytes',
    'network_received_error',
    'network_sent_error',
    'network_received_drop',
    'network_sent_drop'
]
ROUND: int = 4

step_id: int = 1


def _collect_metrics(stop_event: Event, name: str, run: int,
                     metrics_path: str, sample_interval: float,
                     initial_timestamp: float, initial_cpu: scputimes,
                     initial_ram: int, initial_swap: int,
                     initial_disk_io: Optional[sdiskio],
                     initial_network_io: snetio):
    """Thread function to collect a sample at specific intervals"""
    global step_id
    index = 1
    row: Dict[str, Union[int, float, str]]

    # Create metrics file
    with open(metrics_path, 'w') as f:
        writer = DictWriter(f, fieldnames=FIELDNAMES)
        writer.writeheader()

        # Initial values
        row = {
            'name': name,
            'run': run,
            'index': index,
            'step': step_id,
            'timestamp': 0.0,
            'version': METRICS_VERSION,
            'cpu_user': 0.0,
            'cpu_system': 0.0,
            'cpu_user_system': 0.0,
            'cpu_idle': 0.0,
            'cpu_iowait': 0.0,
            'memory_ram': 0,
            'memory_swap': 0,
            'memory_ram_swap': 0,
            'disk_read_count': 0,
            'disk_write_count': 0,
            'disk_read_bytes': 0,
            'disk_write_bytes': 0,
            'disk_read_time': 0,
            'disk_write_time': 0,
            'disk_busy_time': 0,
            'network_received_count': 0,
            'network_sent_count': 0,
            'network_received_bytes': 0,
            'network_sent_bytes': 0,
            'network_received_error': 0,
            'network_sent_error': 0,
            'network_received_drop': 0,
            'network_sent_drop': 0
        }
        writer.writerow(row)
        index += 1
        # Honor the sample interval: subtract the time already elapsed since
        # the initial snapshot and never sleep a negative amount.
        sleep(max(0.0, sample_interval - (time() - initial_timestamp)))

        while not stop_event.wait(0):
            # Collect metrics
            timestamp = time()
            cpu: scputimes = ps.cpu_times()
            ram: svmem = ps.virtual_memory()
            swap: sswap = ps.swap_memory()
            disk_io: Optional[sdiskio] = \
                ps.disk_io_counters()  # type: ignore
            network_io: snetio = ps.net_io_counters()

            # Write to file
            diff = round(timestamp - initial_timestamp, ROUND)
            cpu_user = round(cpu.user - initial_cpu.user, ROUND)
            cpu_system = round(cpu.system - initial_cpu.system, ROUND)
            cpu_idle = round(cpu.idle - initial_cpu.idle, ROUND)
            cpu_iowait = round(cpu.iowait - initial_cpu.iowait, ROUND)
            network_recv_count = \
                network_io.packets_recv - initial_network_io.packets_recv
            network_sent_count = \
                network_io.packets_sent - initial_network_io.packets_sent
            network_recv_bytes = \
                network_io.bytes_recv - initial_network_io.bytes_recv
            network_sent_bytes = \
                network_io.bytes_sent - initial_network_io.bytes_sent
            network_errin = \
                network_io.errin - initial_network_io.errin
            network_errout = \
                network_io.errout - initial_network_io.errout
            network_dropin = \
                network_io.dropin - initial_network_io.dropin
            network_dropout = \
                network_io.dropout - initial_network_io.dropout

            row = {
                'name': name,
                'run': run,
                'index': index,
                'step': step_id,
                'timestamp': diff,
                'version': METRICS_VERSION,
                'cpu_user': cpu_user,
                'cpu_system': cpu_system,
                'cpu_user_system': cpu_user + cpu_system,
                'cpu_idle': cpu_idle,
                'cpu_iowait': cpu_iowait,
                'memory_ram': ram.used,
                'memory_swap': swap.used,
                'memory_ram_swap': ram.used + swap.used,
                'network_received_count': network_recv_count,
                'network_sent_count': network_sent_count,
                'network_received_bytes': network_recv_bytes,
                'network_sent_bytes': network_sent_bytes,
                'network_received_error': network_errin,
                'network_sent_error': network_errout,
                'network_received_drop': network_dropin,
                'network_sent_drop': network_dropout
            }

            # Diskless machines will return None for disk IO
            if disk_io is not None and initial_disk_io is not None:
                row['disk_read_count'] = \
                    disk_io.read_count - initial_disk_io.read_count
                row['disk_write_count'] = \
                    disk_io.write_count - initial_disk_io.write_count
                row['disk_read_bytes'] = \
                    disk_io.read_bytes - initial_disk_io.read_bytes
                row['disk_write_bytes'] = \
                    disk_io.write_bytes - initial_disk_io.write_bytes
                row['disk_read_time'] = \
                    disk_io.read_time - initial_disk_io.read_time
                row['disk_write_time'] = \
                    disk_io.write_time - initial_disk_io.write_time
                row['disk_busy_time'] = \
                    disk_io.busy_time - initial_disk_io.busy_time
            writer.writerow(row)
            index += 1

            # Honor the sample interval: subtract the metrics logging
            # overhead and never sleep a negative amount.
            sleep(max(0.0, sample_interval - (time() - timestamp)))


class Collector():
    """Collect metrics samples at a given interval for a run of a case."""

    def __init__(self, case_name: str, results_run_path: str,
                 sample_interval: float, number_of_steps: int, run_id: int,
                 directory: str, verbose: bool):
        """
        Create an instance of the Collector class.

        Instantiating this class will automatically generate a
        `case-info.txt` file which describes the hardware used during
        collection of the metrics. The file describes:

        - **Case**:
            - Name of the case.
            - Timestamp when started.
            - Directory of the case.
            - Number of the run.
            - Number of steps in a case.
        - **Hardware**:
            - CPU name.
            - Number of CPU cores.
            - Minimum and maximum CPU core frequency.
            - Amount of RAM and SWAP memory.
            - Available disk storage.
            - Available network interfaces and their link speed.
        - **Docker**:
            - Version of the Docker daemon.
            - Docker root directory.
            - Docker storage driver.
            - Docker CgroupFS driver and version.

        Parameters
        ----------
        case_name : str
            Name of the case being executed.
        results_run_path : str
            Path to the results directory of the run currently being
            executed.
        sample_interval : float
            Sample interval in seconds for collecting metrics.
        number_of_steps : int
            The number of steps of the case that is being executed.
        run_id : int
            The number of the run that is being executed.
        directory : str
            Path to the directory to store logs.
        verbose : bool
            Enable verbose logs.
        """
        self._started: bool = False
        self._data_path: str = os.path.abspath(results_run_path)
        self._number_of_steps: int = number_of_steps
        self._stop_event: Event = Event()
        self._logger = Logger(__name__, directory, verbose)
        self._docker = Docker(self._logger)

        # Only Linux is supported
        if platform.system() != 'Linux':
            msg = f'"{platform.system()}" is not supported as OS'
            self._logger.error(msg)
            raise ValueError(msg)

        # Initialize step ID
        global step_id
        step_id = 1

        # System information: OS, kernel, architecture
        system_hostname = 'UNKNOWN'
        system_os_name = 'UNKNOWN'
        system_os_version = 'UNKNOWN'
        try:
            system_os_name = platform.freedesktop_os_release()['NAME']
            system_os_version = platform.freedesktop_os_release()['VERSION']
        except (OSError, KeyError):
            self._logger.warning('Cannot extract Freedesktop OS release '
                                 'data')
        system_hostname = platform.node()
        system_kernel = platform.platform()
        system_architecture = platform.uname().machine

        # CPU information: name, max frequency, core count
        cpu_name = 'UNKNOWN'
        try:
            raw = run(['lscpu'], capture_output=True, check=True)
            for line in raw.stdout.decode('utf-8').split('\n'):
                if 'Model name:' in line:
                    cpu_name = line.split(':')[1].strip()
                    break
        except (CalledProcessError, FileNotFoundError) as e:
            self._logger.warning('Unable to determine CPU processor name: '
                                 f'{e}')

        cpu_cores = ps.cpu_count()
        cpu_min_freq = ps.cpu_freq().min
        cpu_max_freq = ps.cpu_freq().max

        # Memory information: RAM total, SWAP total
        memory_total = ps.virtual_memory().total
        swap_total = ps.swap_memory().total

        # Disk IO: name
        partitions: Dict[str, int] = {}
        for disk in ps.disk_partitions():
            # Skip Docker's overlayFS
            if disk.fstype and 'docker' not in disk.mountpoint:
                total = ps.disk_usage(disk.mountpoint).total
                partitions[disk.mountpoint] = total

        # Network IO: name, speed, MTU
        network_interfaces = ps.net_if_stats()

        # Docker daemon: version, storage driver, cgroupfs
        success, docker_info = self._docker.info()
        if not success:
            self._logger.error('Failed to retrieve Docker daemon '
                               'information')

        # Write machine information to disk
        case_info_file = os.path.join(self._data_path, CASE_INFO_FILE_NAME)
        with open(case_info_file, 'w') as f:
            f.write('===> CASE <===\n')
            f.write(f'Name: {case_name}\n')
            f.write(f'Timestamp: {datetime.now(timezone.utc).isoformat()}\n')
            f.write(f'Directory: {directory}\n')
            f.write(f'Run: {run_id}\n')
            f.write(f'Number of steps: {self._number_of_steps}\n')
            f.write('\n')
            f.write('===> HARDWARE <===\n')
            f.write('System\n')
            f.write(f'\tHostname: {system_hostname}\n')
            f.write(f'\tOS name: {system_os_name}\n')
            f.write(f'\tOS version: {system_os_version}\n')
            f.write(f'\tKernel: {system_kernel}\n')
            f.write(f'\tArchitecture: {system_architecture}\n')
            f.write('CPU\n')
            f.write(f'\tName: {cpu_name}\n')
            f.write(f'\tCores: {cpu_cores}\n')
            # psutil reports CPU frequencies in MHz
            f.write(f'\tMinimum frequency: {int(cpu_min_freq)} MHz\n')
            f.write(f'\tMaximum frequency: {int(cpu_max_freq)} MHz\n')
            f.write('Memory\n')
            f.write(f'\tRAM memory: {int(memory_total / 10 ** 6)} MB\n')
            f.write(f'\tSWAP memory: {int(swap_total / 10 ** 6)} MB\n')
            f.write('Storage\n')
            for disk_name, size in partitions.items():
                f.write(f'\tDisk "{disk_name}": '
                        f'{round(size / 10 ** 9, 2)} GB\n')
            f.write('Network\n')
            for interface_name, stats in network_interfaces.items():
                speed = stats.speed
                if speed == 0:
                    f.write(f'\tInterface "{interface_name}"\n')
                else:
                    f.write(f'\tInterface "{interface_name}": '
                            f'{speed} Mbps\n')

            f.write('\n')
            f.write('===> DOCKER <===\n')
            f.write('Version: '
                    f'{docker_info.get("ServerVersion", "UNKNOWN")}\n')
            f.write('Root directory: '
                    f'{docker_info.get("DockerRootDir", "UNKNOWN")}\n')
            f.write('Drivers:\n')
            f.write('\tStorage: '
                    f'{docker_info.get("Driver", "UNKNOWN")}\n')
            f.write('\tCgroupFS: '
                    f'{docker_info.get("CgroupDriver", "UNKNOWN")} '
                    f'v{docker_info.get("CgroupVersion", "UNKNOWN")}\n')

        # Set initial metric values and start collection thread
        metrics_path = os.path.join(results_run_path, METRICS_FILE_NAME)
        initial_timestamp = time()
        initial_cpu = ps.cpu_times()
        initial_ram = ps.virtual_memory().used
        initial_swap = ps.swap_memory().used
        initial_disk_io = ps.disk_io_counters()
        initial_network_io = ps.net_io_counters()
        self._thread: Thread = Thread(target=_collect_metrics,
                                      daemon=True,
                                      args=(self._stop_event,
                                            case_name,
                                            run_id,
                                            metrics_path,
                                            sample_interval,
                                            initial_timestamp,
                                            initial_cpu,
                                            initial_ram,
                                            initial_swap,
                                            initial_disk_io,
                                            initial_network_io))
        self._thread.start()

    @property
    def name(self):
        """Name of the class: Collector"""
        return self.__class__.__name__

    def next_step(self):
        """Increment the step number by one.

        The step number must always be equal to or lower than the number of
        steps in the case.
        """
        global step_id
        step_id += 1

        msg = f'Step ({step_id}) is higher than number of steps ' + \
              f'({self._number_of_steps})'
        assert (step_id <= self._number_of_steps), msg

    def stop(self):
        """End metrics collection.

        Signal the metrics collection thread to stop collecting any metrics.
        """
        self._stop_event.set()
```
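Between samples the collection thread sleeps for the remainder of the sample interval, subtracting the time spent collecting and writing the sample. A self-contained sketch of that timing pattern; `next_sleep` is a hypothetical helper, not part of the module:

```python
def next_sleep(sample_interval: float, sample_started_at: float,
               now: float) -> float:
    """Remaining time to sleep so samples stay `sample_interval` seconds
    apart, clamped at zero when collection overhead exceeded the interval."""
    return max(0.0, sample_interval - (now - sample_started_at))

# Collection took 0.25 s of a 0.5 s interval: sleep the remaining 0.25 s.
print(next_sleep(0.5, 100.0, 100.25))  # 0.25
# Collection overran the interval: never sleep a negative amount.
print(next_sleep(0.5, 100.0, 101.0))   # 0.0
```

Clamping at zero matters because `time.sleep` raises `ValueError` for negative arguments.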
CASE_INFO_FILE_NAME: str = 'case-info.txt'

METRICS_FILE_NAME: str = 'metrics.csv'

METRICS_VERSION: int = 3

FIELDNAMES: List[str] = ['name', 'run', 'index', 'step', 'timestamp', 'version', 'cpu_user', 'cpu_system', 'cpu_user_system', 'cpu_idle', 'cpu_iowait', 'memory_ram', 'memory_swap', 'memory_ram_swap', 'disk_read_count', 'disk_write_count', 'disk_read_bytes', 'disk_write_bytes', 'disk_read_time', 'disk_write_time', 'disk_busy_time', 'network_received_count', 'network_sent_count', 'network_received_bytes', 'network_sent_bytes', 'network_received_error', 'network_sent_error', 'network_received_drop', 'network_sent_drop']

ROUND: int = 4

step_id: int = 1
class Collector:
Collect metrics samples at a given interval for a run of a case.
Collector(case_name: str, results_run_path: str, sample_interval: float, number_of_steps: int, run_id: int, directory: str, verbose: bool)
Create an instance of the Collector class.
Instantiating this class will automatically generate a case-info.txt
file which describes the hardware used during collection of the
metrics. The file describes:
- Case:
- Name of the case.
- Timestamp when started.
- Directory of the case.
- Number of the run.
- Number of steps in a case.
- Hardware:
- CPU name.
- Number of CPU cores.
- Minimum and maximum CPU core frequency.
- Amount of RAM and SWAP memory.
- Available disk storage.
- Available network interfaces and their link speed.
- Docker:
- Version of the Docker daemon.
- Docker root directory.
- Docker storage driver.
- Docker CgroupFS driver and version.
Parameters
- case_name (str): Name of the case being executed.
- results_run_path (str): Path to the results directory of the run currently being executed.
- sample_interval (float): Sample interval in seconds for collecting metrics.
- number_of_steps (int): The number of steps of the case that is being executed.
- run_id (int): The number of the run that is being executed.
- directory (str): Path to the directory to store logs.
- verbose (bool): Enable verbose logs.
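The CPU name recorded in `case-info.txt` is scraped from `lscpu` output by looking for the `Model name:` line. A sketch of that parsing step against canned text; the sample output and the `parse_model_name` helper are illustrative, not captured from a real machine:

```python
def parse_model_name(lscpu_output: str) -> str:
    """Return the value of the 'Model name:' line, or 'UNKNOWN'."""
    for line in lscpu_output.split('\n'):
        if 'Model name:' in line:
            # Everything after the first ':' is the CPU name.
            return line.split(':')[1].strip()
    return 'UNKNOWN'

sample = (
    'Architecture:        x86_64\n'
    'Model name:          Example CPU @ 2.40GHz\n'
    'CPU(s):              8\n'
)
print(parse_model_name(sample))  # Example CPU @ 2.40GHz
```

Falling back to `'UNKNOWN'` mirrors the constructor's behaviour when `lscpu` is unavailable or its output has no model line.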
name
Name of the class: Collector
def next_step(self):
Increment the step number by one.
The step number must always be equal to or lower than the number of steps in the case.
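`next_step` only increments a module-level counter and asserts it never exceeds the configured number of steps. A simplified stand-in (not the real class) that models this contract:

```python
class StepCounter:
    """Toy model of the Collector step counter: starts at 1 and may
    never exceed the number of steps in the case."""

    def __init__(self, number_of_steps: int):
        self.number_of_steps = number_of_steps
        self.step_id = 1

    def next_step(self) -> int:
        self.step_id += 1
        assert self.step_id <= self.number_of_steps, (
            f'Step ({self.step_id}) is higher than number of steps '
            f'({self.number_of_steps})')
        return self.step_id

counter = StepCounter(number_of_steps=3)
print(counter.next_step())  # 2
print(counter.next_step())  # 3
# A further call would fail the assertion.
```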
class scputimes(builtins.tuple):
scputimes()
Inherited Members
- builtins.tuple
- index
- count
class sswap(builtins.tuple):
sswap()
Inherited Members
- builtins.tuple
- index
- count
class svmem(builtins.tuple):
svmem()
Inherited Members
- builtins.tuple
- index
- count
class sdiskio(builtins.tuple):
sdiskio()
Inherited Members
- builtins.tuple
- index
- count
class snetio(builtins.tuple):
snetio()
Inherited Members
- builtins.tuple
- index
- count
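These stubs mirror psutil's snapshot tuples; each metric in a sample is the field-wise difference between the current snapshot and the one taken when collection started. A self-contained sketch using a hypothetical two-field miniature of `snetio`:

```python
from collections import namedtuple

# Hypothetical miniature of psutil's snetio counters, for illustration.
snetio = namedtuple('snetio', ['bytes_sent', 'bytes_recv'])

def delta(current: 'snetio', initial: 'snetio') -> dict:
    """Field-wise difference between two counter snapshots."""
    return {field: getattr(current, field) - getattr(initial, field)
            for field in current._fields}

initial = snetio(bytes_sent=1000, bytes_recv=5000)
current = snetio(bytes_sent=1500, bytes_recv=9000)
print(delta(current, initial))  # {'bytes_sent': 500, 'bytes_recv': 4000}
```

Because psutil's counters are cumulative since boot, subtracting the initial snapshot yields traffic attributable to the run itself.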