bench_executor.stats

This module holds the Stats class, which is responsible for generating
statistics from executed cases. It automatically aggregates all runs of an
executed case to generate `aggregated.csv` and `summary.csv` files which can
be used to compare various cases with each other.

- `aggregated.csv`: For each run of a case, the median execution time of each
  step is calculated. For each step, the results of the run with the median
  execution time are used to assemble the aggregated results.
- `summary.csv`: The summary is similar to the previous file, but provides a
  single result for each step to immediately see how long the step took, how
  many samples are provided for the step, etc.
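A minimal usage sketch, assuming a hypothetical case with 3 steps whose runs
live under a results/ directory containing run_1/, run_2/, ... subdirectories
(the paths and the step count are made up; the Stats API itself is shown in
the source below):

from bench_executor.stats import Stats

# Hypothetical case: 3 steps, runs stored in results/run_1, results/run_2, ...
stats = Stats(results_path='results', number_of_steps=3,
              directory='logs', verbose=False)

# Writes results/aggregated.csv and results/summary.csv
if stats.aggregate():
    # Writes results/stats.csv with median/mean/min/max/stdev per metric
    stats.statistics()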
#!/usr/bin/env python3
"""
This module holds the Stats class which is responsible for generating
statistics from executed cases. It will automatically aggregate all runs of an
executed case to generate `aggregated.csv` and `summary.csv` files which can
be used to compare various cases with each other.

- `aggregated.csv`: For each run of a case, the median execution time of each
  step is calculated. For each step, the results of the run with the median
  execution time are used to assemble the aggregated results.
- `summary.csv`: The summary is similar to the previous file, but provides a
  single result for each step to immediately see how long the step took, how
  many samples are provided for the step, etc.
"""

import os
import psutil
from glob import glob
from statistics import median, stdev, mean
from csv import DictWriter, DictReader
from typing import List, Optional
from bench_executor.collector import FIELDNAMES, METRICS_FILE_NAME
from bench_executor.logger import Logger

METRICS_AGGREGATED_FILE_NAME = 'aggregated.csv'
METRICS_SUMMARY_FILE_NAME = 'summary.csv'
METRICS_STATS_FILE_NAME = 'stats.csv'
FIELDNAMES_STRING = ['name']
FIELDNAMES_FLOAT = ['timestamp', 'cpu_user', 'cpu_system', 'cpu_idle',
                    'cpu_iowait', 'cpu_user_system']
FIELDNAMES_INT = ['run', 'index', 'step', 'version', 'memory_ram',
                  'memory_swap', 'memory_ram_swap', 'disk_read_count',
                  'disk_write_count', 'disk_read_bytes', 'disk_write_bytes',
                  'disk_read_time', 'disk_write_time', 'disk_busy_time',
                  'network_received_count', 'network_sent_count',
                  'network_received_bytes', 'network_sent_bytes',
                  'network_received_error', 'network_sent_error',
                  'network_received_drop', 'network_sent_drop']
FIELDNAMES_SUMMARY = [
    'name',
    'run',
    'number_of_samples',
    'step',
    'duration',
    'version',
    'cpu_user_diff',
    'cpu_system_diff',
    'cpu_user_system_diff',
    'cpu_idle_diff',
    'cpu_iowait_diff',
    'memory_ram_max',
    'memory_swap_max',
    'memory_ram_swap_max',
    'memory_ram_min',
    'memory_swap_min',
    'memory_ram_swap_min',
    'disk_read_count_diff',
    'disk_write_count_diff',
    'disk_read_bytes_diff',
    'disk_write_bytes_diff',
    'disk_read_time_diff',
    'disk_write_time_diff',
    'disk_busy_time_diff',
    'network_received_count_diff',
    'network_sent_count_diff',
    'network_received_bytes_diff',
    'network_sent_bytes_diff',
    'network_received_error_diff',
    'network_sent_error_diff',
    'network_received_drop_diff',
    'network_sent_drop_diff'
]
ROUND = 4

#
# Generate stats from the result runs by aggregating them on the
# median execution time of each step. Processing is done for each step per
# run and unnecessary values are skipped to reduce memory consumption.
#
# The median run is available in 'aggregated.csv' while a summarized version
# which only reports the diff or max value of each step is available in
# 'summary.csv'.
#


class Stats:
    """Generate statistics for an executed case."""

    def __init__(self, results_path: str, number_of_steps: int,
                 directory: str, verbose: bool):
        """Create an instance of the Stats class.

        Parameters
        ----------
        results_path : str
            The path to the results directory of the case.
        number_of_steps : int
            The number of steps of the case.
        directory : str
            The path to the directory where the logs must be stored.
        verbose : bool
            Enable verbose logs.
        """
        self._results_path: str = os.path.abspath(results_path)
        self._number_of_steps: int = number_of_steps
        self._logger = Logger(__name__, directory, verbose)
        self._parsed_data: dict = {}

        if not os.path.exists(results_path):
            msg = f'Results do not exist: {results_path}'
            self._logger.error(msg)
            raise ValueError(msg)

    def _parse_field(self, field, value):
        """Parse a metrics field value into a Python data type."""
        if field in FIELDNAMES_FLOAT:
            parse = float
        elif field in FIELDNAMES_INT:
            parse = int
        elif field in FIELDNAMES_STRING:
            parse = str
        else:
            msg = f'Field "{field}" type is unknown'
            self._logger.error(msg)
            raise ValueError(msg)

        # Return -1 as a sentinel for corrupt values which cannot be parsed
        # so the caller can skip the entry
        try:
            return parse(value)
        except (TypeError, ValueError):
            return -1

    def _parse_v2(self, run_path: str, fields: list = FIELDNAMES,
                  step: Optional[int] = None):
        """Parse the CSV metrics file in v2 format."""
        data = []

        # Drop the cache if memory usage is too high
        used_memory = psutil.virtual_memory().percent
        if used_memory > 85.0:
            self._logger.debug('Releasing memory of cache...')
            del self._parsed_data
            self._parsed_data = {}

        # Pull data from the cache if available
        if run_path in self._parsed_data:
            if step is not None:
                return list(filter(lambda x: x['step'] == step,
                                   self._parsed_data[run_path]))
            return self._parsed_data[run_path]

        metrics_file = os.path.join(run_path, METRICS_FILE_NAME)
        if not os.path.exists(metrics_file):
            self._logger.error(f'Metrics file "{metrics_file}" does not exist')
            return []

        # Filter the fields we want from above; this way we don't load all
        # the data into memory during processing.
        self._logger.debug('Reading metrics file...')
        with open(metrics_file, 'r') as f:
            reader = DictReader(f)
            for line in reader:
                corrupt: bool = False

                # Filter on field names
                filtered: dict = {}
                for key in fields:
                    if key in line:
                        filtered[key] = line[key]

                entry = {}
                for key, value in filtered.items():
                    v = self._parse_field(key, value)
                    if v == -1:
                        corrupt = True
                        msg = f'Corrupt entry {key} with value {value} in ' + \
                              f'{metrics_file}, skipped'
                        self._logger.info(msg)
                        break

                    entry[key] = v

                if not corrupt:
                    data.append(entry)

        self._parsed_data[run_path] = data
        if step is not None:
            return list(filter(lambda x: x['step'] == step, data))

        return data

    def statistics(self) -> bool:
        """Calculate basic statistics on the steps by aggregating them from
        all runs and applying standard deviation, median, min, max, mean for
        each measured metric.

        Returns
        -------
        success : bool
            Whether the statistics calculation was successful or not.
        """
        summary_by_step: dict = {}
        stats: list = []

        for run_path in glob(f'{self._results_path}/run_*/'):
            for step_index in range(self._number_of_steps):
                step_data = self._parse_v2(run_path,
                                           step=step_index + 1)
                # If a step failed and no data is available, do not crash
                if not step_data:
                    continue

                for field in FIELDNAMES:
                    if f'step_{step_index}' not in summary_by_step:
                        summary_by_step[f'step_{step_index}'] = {
                            'step': step_index,
                            'name': None,
                            'version': None,
                            'duration': [],
                            'number_of_samples': [],
                        }
                    sbs = summary_by_step[f'step_{step_index}']

                    # Some fields are not present in v2 while they are in v3+
                    if field not in step_data[0]:
                        continue

                    if 'memory' in field:
                        if f'{field}_min' not in sbs:
                            sbs[f'{field}_min'] = []
                        if f'{field}_max' not in sbs:
                            sbs[f'{field}_max'] = []
                    elif not any(name in field for name in ['index', 'version',
                                                            'step', 'name',
                                                            'run',
                                                            'timestamp']):
                        if f'{field}_diff' not in sbs:
                            sbs[f'{field}_diff'] = []

                    # Report the minimum and maximum memory peaks for this
                    # step
                    if 'memory' in field:
                        values = []
                        for data in step_data:
                            values.append(data[field])
                        sbs[f'{field}_min'].append(min(values))
                        sbs[f'{field}_max'].append(max(values))
                    # Skip fields which are not applicable
                    elif field in ['run']:
                        continue
                    # Leave some fields as they are
                    elif field in ['version', 'step', 'name']:
                        sbs[field] = step_data[0][field]
                    # All other fields are accumulated data values for which
                    # we report the diff for the step
                    else:
                        first = step_data[0][field]
                        last = step_data[-1][field]
                        diff = round(last - first, ROUND)
                        if field == 'index':
                            # diff will be 0 for 1 sample, but we have this
                            # sample, so include it
                            sbs['number_of_samples'].append(diff + 1)
                        elif field == 'timestamp':
                            sbs['duration'].append(diff)
                        else:
                            sbs[f'{field}_diff'].append(diff)

        stats_fieldnames = []
        for step in summary_by_step:
            stats_step = {}
            for field in summary_by_step[step]:
                if any(name in field for name in ['index', 'version', 'step',
                                                  'name', 'run', 'timestamp']):
                    stats_step[field] = summary_by_step[step][field]
                    if field not in stats_fieldnames:
                        stats_fieldnames.append(field)
                    continue

                if f'{field}_median' not in stats_fieldnames:
                    stats_fieldnames.append(f'{field}_median')
                    stats_fieldnames.append(f'{field}_average')
                    stats_fieldnames.append(f'{field}_max')
                    stats_fieldnames.append(f'{field}_min')
                    stats_fieldnames.append(f'{field}_stdev')
                    stats_fieldnames.append(f'{field}_values')

                try:
                    field_values = summary_by_step[step][field]
                    stats_step[f'{field}_median'] = median(field_values)
                    stats_step[f'{field}_average'] = mean(field_values)
                    stats_step[f'{field}_max'] = max(field_values)
                    stats_step[f'{field}_min'] = min(field_values)
                    stats_step[f'{field}_stdev'] = stdev(field_values)
                    stats_step[f'{field}_values'] = field_values
                except Exception as e:
                    self._logger.error(f'Generating stats failed for field '
                                       f'"{field}" of {step}: {e}')
            stats.append(stats_step)

        stats_file = os.path.join(self._results_path,
                                  METRICS_STATS_FILE_NAME)
        self._logger.debug('Generated stats')

        with open(stats_file, 'w') as f:
            writer = DictWriter(f, fieldnames=stats_fieldnames)
            writer.writeheader()
            for step in stats:
                writer.writerow(step)

        return True

    def aggregate(self) -> bool:
        """Aggregate the metrics of the different runs of a case.

        Find the median execution time of each step across all runs and
        extract the step from the run which has this median execution time to
        assemble an aggregated version and a summary version of the case's
        metrics.

        Returns
        -------
        success : bool
            Whether the aggregation was successful or not.
        """
        # Find each median step of all runs before extracting more data for
        # memory consumption reasons
        runs = []
        for run_path in glob(f'{self._results_path}/run_*/'):
            # Extract the run number
            run_folder: str = os.path.split(os.path.dirname(run_path))[-1]
            try:
                run_id: int = int(run_folder.replace('run_', ''))
            except ValueError:
                self._logger.error(f'Run "{run_folder}" is not a number')
                return False

            # Extract steps and timestamps of this run.
            # v3 is the same as v2 with an additional field
            data = self._parse_v2(run_path)
            self._logger.debug(f'Parsed metrics of run {run_id}')

            # Calculate the timestamp diff for each step
            step = 1
            timestamps = []
            step_end = 0.0
            step_begin = 0.0
            for entry in data:
                entry_step = entry['step']
                assert (entry_step >= step), 'Entry step decreased over time'

                # Next step
                if entry_step > step:
                    if entry_step - step > 1:
                        self._logger.warning(f"{entry_step - step} step(s) "
                                             "are missing between steps "
                                             f"[{step},{entry_step}]. "
                                             "Try increasing the sample time "
                                             "and re-run.")
                    # Calculate the diff of the current step if at least 2
                    # entries for the step exist; if not, the diff is 0.0 and
                    # we fall back to the step_begin timestamp which will
                    # make sure we use the run with the timestamp that is the
                    # median of all runs. For example: [4.5, 5.0, 6.5] will
                    # return run 2 as 5.0 is the median.
                    diff = step_end - step_begin
                    if diff == 0.0:
                        self._logger.warning(f'Only 1 entry for step {step} '
                                             'found, falling back to median '
                                             'timestamp instead of diff')
                        diff = step_begin

                    timestamps.append(diff)

                    # Reset for the next step
                    step = entry_step
                    step_begin = entry['timestamp']
                    step_end = entry['timestamp']
                # step_end keeps increasing until the step changes
                else:
                    step_end = entry['timestamp']
            # The final step does not cause an increment, add it manually
            timestamps.append(step_end - step_begin)
            runs.append((run_id, timestamps))

        self._logger.debug('Timestamp difference between steps calculated')

        # Statistics rely on an uneven number of runs
        assert (len(runs) % 2 != 0), 'Number of runs should never be even'

        # Runs are unsorted as glob does not have a fixed order, sort them
        # based on the run number in the tuple
        runs.sort(key=lambda element: element[0])
        self._logger.debug('Sorting runs complete')

        # Find the median for each step across runs
        timestamps_by_step: List[List[float]] = []
        for step_index in range(self._number_of_steps):
            timestamps_by_step.append([])

        for run in runs:
            run_id = run[0]
            timestamps = run[1]

            # Do not process incomplete runs
            if len(timestamps) != self._number_of_steps:
                msg = f'Number of steps ({self._number_of_steps}) does ' + \
                      'not match with extracted steps of ' + \
                      f'run ({len(timestamps)}). Skipping run {run_id}'
                self._logger.warning(msg)
                continue

            # Create a list of timestamps for each step from all runs
            for step_index in range(self._number_of_steps):
                timestamps_by_step[step_index].append(timestamps[step_index])
        self._logger.debug('Extracted median')

        # Create a list of our steps with the run_id which has the median
        # value for that step
        aggregated_entries = []
        summary_entries = []
        index_number = 1
        for step_index, step_timestamps in enumerate(timestamps_by_step):
            # If we do not have a single timestamp for a step, we cannot
            # process the data. This can happen when the steps are processed
            # faster than the configured sample time.
            if not step_timestamps:
                self._logger.error("Unable to aggregate because some steps "
                                   "have no measurements")
                return False

            # We ensure that the number of runs is always uneven so the
            # median is always a measured data point instead of the average
            # of 2 data points with an even number of runs
            try:
                median_run_id = timestamps_by_step[step_index] \
                    .index(median(step_timestamps)) + 1
            except ValueError:
                continue
            median_run_path = os.path.join(self._results_path,
                                           f'run_{median_run_id}')
            median_step_data = self._parse_v2(median_run_path,
                                              step=step_index + 1)

            # Rewrite the indexes to match the new number of samples
            for entry in median_step_data:
                entry['index'] = index_number

                aggregated_entries.append(entry)
                index_number += 1
        self._logger.debug('Generated median run from steps')

        # Summary data of a step: diff per step
        for step_index, step_timestamps in enumerate(timestamps_by_step):
            summary = {}
            try:
                median_run_id = timestamps_by_step[step_index] \
                    .index(median(step_timestamps)) + 1
            except ValueError:
                continue
            median_run_path = os.path.join(self._results_path,
                                           f'run_{median_run_id}')
            median_step_data = self._parse_v2(median_run_path,
                                              step=step_index + 1)
            # If a step failed and no data is available, do not crash
            if not median_step_data:
                continue

            for field in FIELDNAMES:
                # Some fields are not present in v2 while they are in v3+
                if field not in median_step_data[0]:
                    continue

                # Report the minimum and maximum memory peaks for this step
                if 'memory' in field:
                    values = []
                    for data in median_step_data:
                        values.append(data[field])
                    summary[f'{field}_min'] = min(values)
                    summary[f'{field}_max'] = max(values)
                # Leave some fields as they are
                elif field in ['version', 'step', 'name', 'run']:
                    summary[field] = median_step_data[0][field]
                # All other fields are accumulated data values for which we
                # report the diff for the step
                else:
                    first = median_step_data[0][field]
                    last = median_step_data[-1][field]
                    diff = round(last - first, ROUND)
                    if field == 'index':
                        # diff will be 0 for 1 sample, but we have this
                        # sample, so include it
                        summary['number_of_samples'] = diff + 1
                    elif field == 'timestamp':
                        summary['duration'] = diff
                    else:
                        summary[f'{field}_diff'] = diff
            summary_entries.append(summary)

        aggregated_file = os.path.join(self._results_path,
                                       METRICS_AGGREGATED_FILE_NAME)
        summary_file = os.path.join(self._results_path,
                                    METRICS_SUMMARY_FILE_NAME)
        self._logger.debug('Generated summary')

        # Store the aggregated data
        with open(aggregated_file, 'w') as f:
            writer = DictWriter(f, fieldnames=FIELDNAMES)
            writer.writeheader()
            for entry in aggregated_entries:
                writer.writerow(entry)

        # Store the summary data
        with open(summary_file, 'w') as f:
            writer = DictWriter(f, fieldnames=FIELDNAMES_SUMMARY)
            writer.writeheader()
            for entry in summary_entries:
                writer.writerow(entry)
        self._logger.debug('Wrote results')

        return True
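The run selection in aggregate() hinges on the fact that the median of an
uneven number of measurements is itself a measured data point. A small worked
example using the same statistics.median call, with the step durations from
the comment in the source:

from statistics import median

# Durations of one step across runs 1..3 (an uneven number of runs)
step_timestamps = [4.5, 5.0, 6.5]

# median() returns 5.0, an actual measurement, so index() locates the run
# that produced it: list index 1 -> run_2
median_run_id = step_timestamps.index(median(step_timestamps)) + 1
assert median_run_id == 2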
class Stats
Generate statistics for an executed case.
Stats.__init__(results_path: str, number_of_steps: int, directory: str, verbose: bool)
Create an instance of the Stats class.
Parameters
- results_path (str): The path to the results directory of the case.
- number_of_steps (int): The number of steps of the case.
- directory (str): The path to the directory where the logs must be stored.
- verbose (bool): Enable verbose logs.
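The constructor validates the results path up front and raises ValueError if
it does not exist. A sketch of that failure mode (the path here is
hypothetical):

from bench_executor.stats import Stats

try:
    Stats(results_path='does-not-exist', number_of_steps=3,
          directory='logs', verbose=False)
except ValueError as e:
    print(e)  # Results do not exist: does-not-exist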
Stats.statistics() -> bool
Calculate basic statistics on the steps by aggregating them from all runs and applying standard deviation, median, min, max, mean for each measured metric.
Returns
- success (bool): Whether the statistics calculation was successful or not.
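A sketch of consuming the generated stats.csv, assuming the hypothetical
results/ case from the earlier example; the column names follow the
<metric>_median, <metric>_average, <metric>_max, <metric>_min,
<metric>_stdev and <metric>_values pattern built in statistics():

from csv import DictReader

with open('results/stats.csv', 'r') as f:
    for row in DictReader(f):
        # One row per step; duration statistics are aggregated across runs
        print(row['step'], row['duration_median'], row['duration_stdev'])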
Stats.aggregate() -> bool
Aggregate the metrics of the different runs of a case.
Find the median execution time of each step across all runs and extract the step from the run which has this median execution time to assemble an aggregated version and a summary version of the case's metrics.
Returns
- success (bool): Whether the aggregation was successful or not.
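A sketch of consuming the generated summary.csv, again assuming the
hypothetical results/ case; the columns are the ones listed in
FIELDNAMES_SUMMARY:

from csv import DictReader

with open('results/summary.csv', 'r') as f:
    for row in DictReader(f):
        # One row per step of the median run
        print(row['step'], row['duration'], row['number_of_samples'])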