bench_executor.stats

This module holds the Stats class which is responsible for generating statistics from executed cases. It automatically aggregates all runs of an executed case to generate the aggregated.csv and summary.csv files which can be used to compare various cases with each other (a short usage sketch follows the list below).

  • aggregated.csv: For each run of a case, the median execution time of each step is calculated. For each step, the results of the run with the median execution time are used to assemble the aggregated results.
  • summary.csv: The summary is similar to the previous file, but provides a single result for each step to immediately see how long the step took, how many samples are provided for the step, etc.
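A minimal usage sketch of the module (the paths, case name, and step count are illustrative assumptions, not part of the module):

    from bench_executor.stats import Stats

    # Hypothetical case layout: results/my-case/run_1 ... run_5
    stats = Stats(results_path='results/my-case', number_of_steps=4,
                  directory='logs', verbose=False)
    stats.aggregate()   # writes aggregated.csv and summary.csv
    stats.statistics()  # writes stats.csv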
#!/usr/bin/env python3
"""
This module holds the Stats class which is responsible for generating
statistics from executed cases. It will automatically aggregate all runs of an
executed case to generate `aggregated.csv` and `summary.csv` files which can
be used to compare various cases with each other.

- `aggregated.csv`: For each run of a case, the median execution time of each
  step is calculated. For each step, the results of the run with the median
  execution time are used to assemble the aggregated results.
- `summary.csv`: The summary is similar to the previous file, but provides a
  single result for each step to immediately see how long the step took, how
  many samples are provided for the step, etc.
"""

import os
import psutil
from glob import glob
from statistics import median, stdev, mean
from csv import DictWriter, DictReader
from typing import List, Optional
from bench_executor.collector import FIELDNAMES, METRICS_FILE_NAME
from bench_executor.logger import Logger

METRICS_AGGREGATED_FILE_NAME = 'aggregated.csv'
METRICS_SUMMARY_FILE_NAME = 'summary.csv'
METRICS_STATS_FILE_NAME = 'stats.csv'
FIELDNAMES_STRING = ['name']
FIELDNAMES_FLOAT = ['timestamp', 'cpu_user', 'cpu_system', 'cpu_idle',
                    'cpu_iowait', 'cpu_user_system']
FIELDNAMES_INT = ['run', 'index', 'step', 'version', 'memory_ram',
                  'memory_swap', 'memory_ram_swap', 'disk_read_count',
                  'disk_write_count', 'disk_read_bytes', 'disk_write_bytes',
                  'disk_read_time', 'disk_write_time', 'disk_busy_time',
                  'network_received_count', 'network_sent_count',
                  'network_received_bytes', 'network_sent_bytes',
                  'network_received_error', 'network_sent_error',
                  'network_received_drop', 'network_sent_drop']
FIELDNAMES_SUMMARY = [
    'name',
    'run',
    'number_of_samples',
    'step',
    'duration',
    'version',
    'cpu_user_diff',
    'cpu_system_diff',
    'cpu_user_system_diff',
    'cpu_idle_diff',
    'cpu_iowait_diff',
    'memory_ram_max',
    'memory_swap_max',
    'memory_ram_swap_max',
    'memory_ram_min',
    'memory_swap_min',
    'memory_ram_swap_min',
    'disk_read_count_diff',
    'disk_write_count_diff',
    'disk_read_bytes_diff',
    'disk_write_bytes_diff',
    'disk_read_time_diff',
    'disk_write_time_diff',
    'disk_busy_time_diff',
    'network_received_count_diff',
    'network_sent_count_diff',
    'network_received_bytes_diff',
    'network_sent_bytes_diff',
    'network_received_error_diff',
    'network_sent_error_diff',
    'network_received_drop_diff',
    'network_sent_drop_diff'
]
ROUND = 4

#
# Generate stats from the result runs by aggregating them on the
# median execution time for each step. Processing is done for each step per
# run and unnecessary values are skipped to reduce the memory consumption.
#
# The median run is available in 'aggregated.csv' while a summarized version,
# which only reports the diff or max value of each step, is stored in
# 'summary.csv'.
#


class Stats():
    """Generate statistics for an executed case."""

    def __init__(self, results_path: str, number_of_steps: int,
                 directory: str, verbose: bool):
        """Create an instance of the Stats class.

        Parameters
        ----------
        results_path : str
            The path to the results directory of the case
        number_of_steps : int
            The number of steps of the case
        directory : str
            The path to the directory where the logs must be stored.
        verbose : bool
            Enable verbose logs.
        """
        self._results_path: str = os.path.abspath(results_path)
        self._number_of_steps: int = number_of_steps
        self._logger = Logger(__name__, directory, verbose)
        self._parsed_data: dict = {}

        if not os.path.exists(results_path):
            msg = f'Results do not exist: {results_path}'
            self._logger.error(msg)
            raise ValueError(msg)

    def _parse_field(self, field, value):
        """Parse the value of a metrics field into a Python data type."""
        try:
            if field in FIELDNAMES_FLOAT:
                return float(value)
            elif field in FIELDNAMES_INT:
                return int(value)
            elif field in FIELDNAMES_STRING:
                return str(value)
            else:
                msg = f'Field "{field}" type is unknown'
                self._logger.error(msg)
                raise ValueError(msg)
        except TypeError:
            return -1

    def _parse_v2(self, run_path: str, fields: list = FIELDNAMES,
                  step: Optional[int] = None):
        """Parse the CSV metrics file in v2 format."""
        data = []

        # Drop cache if memory usage is too high
        used_memory = psutil.virtual_memory().percent
        if used_memory > 85.0:
            self._logger.debug('Releasing memory of cache...')
            del self._parsed_data
            self._parsed_data = {}

        # Pull data from cache if available
        if run_path in self._parsed_data:
            if step is not None:
                return list(filter(lambda x: x['step'] == step,
                                   self._parsed_data[run_path]))
            return self._parsed_data[run_path]

        metrics_file = os.path.join(run_path, METRICS_FILE_NAME)
        if not os.path.exists(metrics_file):
            self._logger.error(f'Metrics file "{metrics_file}" does not exist')
            return []

        # Filter the fields we want from above, this way we don't load all
        # the data in memory during processing.
        self._logger.debug('Reading metrics file...')
        with open(metrics_file, 'r') as f:
            reader = DictReader(f)
            for line in reader:
                corrupt: bool = False

                # Filter on field names
                filtered: dict = {}
                for key in fields:
                    if key in line:
                        filtered[key] = line[key]

                entry = {}
                for key, value in filtered.items():
                    v = self._parse_field(key, value)
                    if v == -1:
                        corrupt = True
                        msg = f'Corrupt entry {key} with value {value} in ' + \
                              f'{metrics_file}, skipped'
                        self._logger.info(msg)
                        break

                    entry[key] = v

                if not corrupt:
                    data.append(entry)

        self._parsed_data[run_path] = data
        if step is not None:
            return list(filter(lambda x: x['step'] == step, data))

        return data

    def statistics(self) -> bool:
        """Calculate basic statistics on the steps by aggregating them from
        all runs and applying standard deviation, median, min, max, and mean
        for each measured metric.

        Returns
        -------
        success : bool
            Whether the statistics calculation was successful or not.
        """
        summary_by_step: dict = {}
        stats: list = []

        for run_path in glob(f'{self._results_path}/run_*/'):
            for step_index in range(self._number_of_steps):
                step_data = self._parse_v2(run_path,
                                           step=step_index + 1)
                # If a step failed and no data is available, do not crash
                if not step_data:
                    continue

                for field in FIELDNAMES:
                    if f'step_{step_index}' not in summary_by_step:
                        summary_by_step[f'step_{step_index}'] = {
                            'step': step_index,
                            'name': None,
                            'version': None,
                            'duration': [],
                            'number_of_samples': [],
                        }
                    sbs = summary_by_step[f'step_{step_index}']

                    # Some fields are not present on v2 while they are in v3+
                    if field not in step_data[0]:
                        continue

                    if 'memory' in field:
                        if f'{field}_min' not in sbs:
                            sbs[f'{field}_min'] = []
                        if f'{field}_max' not in sbs:
                            sbs[f'{field}_max'] = []
                    elif not any(name in field for name in ['index', 'version',
                                                            'step', 'name',
                                                            'run',
                                                            'timestamp']):
                        if f'{field}_diff' not in sbs:
                            sbs[f'{field}_diff'] = []

                    # Report max memory peak for this step
                    if 'memory' in field:
                        values = []
                        for data in step_data:
                            values.append(data[field])
                        sbs[f'{field}_min'].append(min(values))
                        sbs[f'{field}_max'].append(max(values))
                    # Skip fields which are not applicable
                    elif field in ['run']:
                        continue
                    # Leave some fields like they are
                    elif field in ['version', 'step', 'name']:
                        sbs[field] = step_data[0][field]
                    # All other fields are accumulated data values for which we
                    # report the diff for the step
                    else:
                        first = step_data[0][field]
                        last = step_data[-1][field]
                        diff = round(last - first, ROUND)
                        if field == 'index':
                            # diff will be 0 for 1 sample, but we have this
                            # sample, so include it
                            sbs['number_of_samples'].append(diff + 1)
                        elif field == 'timestamp':
                            sbs['duration'].append(diff)
                        else:
                            sbs[f'{field}_diff'].append(diff)

        stats_fieldnames = []
        for step in summary_by_step:
            stats_step = {}
            for field in summary_by_step[step]:
                if any(name in field for name in ['index', 'version', 'step',
                                                  'name', 'run', 'timestamp']):
                    stats_step[field] = summary_by_step[step][field]
                    if field not in stats_fieldnames:
                        stats_fieldnames.append(field)
                    continue

                if f'{field}_median' not in stats_fieldnames:
                    stats_fieldnames.append(f'{field}_median')
                    stats_fieldnames.append(f'{field}_average')
                    stats_fieldnames.append(f'{field}_max')
                    stats_fieldnames.append(f'{field}_min')
                    stats_fieldnames.append(f'{field}_stdev')
                    stats_fieldnames.append(f'{field}_values')

                try:
                    field_values = summary_by_step[step][field]
                    stats_step[f'{field}_median'] = median(field_values)
                    stats_step[f'{field}_average'] = mean(field_values)
                    stats_step[f'{field}_max'] = max(field_values)
                    stats_step[f'{field}_min'] = min(field_values)
                    stats_step[f'{field}_stdev'] = stdev(field_values)
                    stats_step[f'{field}_values'] = field_values
                except Exception as e:
                    print(step, field, summary_by_step[step][field])
                    self._logger.error(f'Generating stats failed: {e}')
            stats.append(stats_step)

        stats_file = os.path.join(self._results_path,
                                  METRICS_STATS_FILE_NAME)
        self._logger.debug('Generated stats')

        with open(stats_file, 'w') as f:
            writer = DictWriter(f, fieldnames=stats_fieldnames)
            writer.writeheader()
            for step in stats:
                writer.writerow(step)

        return True

    def aggregate(self) -> bool:
        """Aggregate the metrics of the different runs of a case.

        Find the median execution time of each step across all runs and extract
        the step from the run which has this median execution time to assemble
        an aggregated version and summary version of the case's metrics.

        Returns
        -------
        success : bool
            Whether the aggregation was successful or not.
        """
        # Find each median step of all runs before extracting more data for
        # memory consumption reasons
        runs = []
        for run_path in glob(f'{self._results_path}/run_*/'):
            # Extract run number
            try:
                run_folder: str = os.path.split(os.path.dirname(run_path))[-1]
                run_id: int = int(run_folder.replace('run_', ''))
            except ValueError:
                self._logger.error(f'Run "{run_folder}" is not a number')
                return False

            # Extract steps and timestamps of this run.
            # v3 is the same as v2 with an additional field
            data = self._parse_v2(run_path)
            self._logger.debug(f'Parsed metrics of run {run_id}')

            # Calculate timestamp diff for each step
            step = 1
            timestamps = []
            step_end = 0.0
            step_begin = 0.0
            for entry in data:
                entry_step = entry['step']
                assert (entry_step >= step), 'Entry step decreased over time'

                # Next step
                if entry_step > step:
                    if entry_step - step > 1:
                        self._logger.warning(f"{entry_step - step} step(s) "
                                             "are missing between steps "
                                             f"[{step},{entry_step}]. "
                                             "Try increasing the sample time "
                                             "and re-run.")
                    # Calculate diff of current step if at least 2 entries
                    # for the step exist, if not the diff is 0.0 and we fall
                    # back to the step_begin timestamp which will make sure we
                    # use the run with the timestamp that is the median of all
                    # runs. For example: [4.5, 5.0, 6.5] will return run 2 as
                    # 5.0 is the median.
                    diff = step_end - step_begin
                    if diff == 0.0:
                        self._logger.warning(f'Only 1 entry for step {step} '
                                             f'found, falling back to median '
                                             f'timestamp instead of diff')
                        diff = step_begin

                    timestamps.append(diff)

                    # Reset for next step
                    step = entry_step
                    step_begin = entry['timestamp']
                    step_end = entry['timestamp']
                # step_end keeps increasing until the step changes
                else:
                    step_end = entry['timestamp']
            # Final step does not cause an increment, add manually
            timestamps.append(step_end - step_begin)
            runs.append((run_id, timestamps))

            self._logger.debug('Timestamp difference between steps calculated')

        # Statistics rely on uneven number of runs
        assert (len(runs) % 2 != 0), 'Number of runs should never be even'

        # Runs are unsorted as glob does not have a fixed order, sort them
        # based on run number in tuple
        runs.sort(key=lambda element: element[0])
        self._logger.debug('Sorting runs complete')

        # Find median for each step across runs
        timestamps_by_step: List[List[float]] = []
        for step_index in range(self._number_of_steps):
            timestamps_by_step.append([])

        for run in runs:
            run_id = run[0]
            timestamps = run[1]

            # Do not process incomplete runs
            if (len(timestamps) != self._number_of_steps):
                msg = f'Number of steps ({self._number_of_steps}) does ' + \
                      'not match with extracted steps of ' + \
                      f'run ({len(timestamps)}). Skipping run {run_id}'
                self._logger.warning(msg)
                continue

            # Create list of timestamps for each step from all runs
            for step_index in range(self._number_of_steps):
                timestamps_by_step[step_index].append(timestamps[step_index])
        self._logger.debug('Extracted median')

        # Create a list of our steps with the run_id which has the median value
        # for that step
        aggregated_entries = []
        summary_entries = []
        index_number = 1
        for step_index, step_timestamps in enumerate(timestamps_by_step):
            # If we do not have a single timestamp for a step, we cannot
            # process the data. This can happen when the steps are processed
            # faster than the configured sample time.
            if not step_timestamps:
                self._logger.error("Unable to aggregate because some steps "
                                   "have no measurements")
                return False

            # We ensure that the number of runs is always uneven so the median
            # is always a measured data point instead of the average of 2 data
            # points with even number of runs
            try:
                median_run_id = timestamps_by_step[step_index] \
                    .index(median(step_timestamps)) + 1
            except ValueError:
                continue
            median_run_path = os.path.join(self._results_path,
                                           f'run_{median_run_id}')
            median_step_data = self._parse_v2(median_run_path,
                                              step=step_index + 1)

            # Rewrite indexes to match new number of samples
            for entry in median_step_data:
                entry['index'] = index_number

                aggregated_entries.append(entry)
                index_number += 1
        self._logger.debug('Generated median run from steps')

        # Summary data of a step: diff per step
        for step_index, step_timestamps in enumerate(timestamps_by_step):
            summary = {}
            try:
                median_run_id = timestamps_by_step[step_index] \
                    .index(median(step_timestamps)) + 1
            except ValueError:
                continue
            median_run_path = os.path.join(self._results_path,
                                           f'run_{median_run_id}')
            median_step_data = self._parse_v2(median_run_path,
                                              step=step_index + 1)
            # If a step failed and no data is available, do not crash
            if not median_step_data:
                continue

            for field in FIELDNAMES:
                # Some fields are not present on v2 while they are in v3+
                if field not in median_step_data[0]:
                    continue

                # Report max memory peak for this step
                if 'memory' in field:
                    values = []
                    for data in median_step_data:
                        values.append(data[field])
                    summary[f'{field}_min'] = min(values)
                    summary[f'{field}_max'] = max(values)
                # Leave some fields like they are
                elif field in ['version', 'step', 'name', 'run']:
                    summary[field] = median_step_data[0][field]
                # All other fields are accumulated data values for which we
                # report the diff for the step
                else:
                    first = median_step_data[0][field]
                    last = median_step_data[-1][field]
                    diff = round(last - first, ROUND)
                    if field == 'index':
                        # diff will be 0 for 1 sample, but we have this sample,
                        # so include it
                        summary['number_of_samples'] = diff + 1
                    elif field == 'timestamp':
                        summary['duration'] = diff
                    else:
                        summary[f'{field}_diff'] = diff
            summary_entries.append(summary)

        aggregated_file = os.path.join(self._results_path,
                                       METRICS_AGGREGATED_FILE_NAME)
        summary_file = os.path.join(self._results_path,
                                    METRICS_SUMMARY_FILE_NAME)
        self._logger.debug('Generated summary')

        # Store aggregated data
        with open(aggregated_file, 'w') as f:
            writer = DictWriter(f, fieldnames=FIELDNAMES)
            writer.writeheader()
            for entry in aggregated_entries:
                writer.writerow(entry)

        # Store summary data
        with open(summary_file, 'w') as f:
            writer = DictWriter(f, fieldnames=FIELDNAMES_SUMMARY)
            writer.writeheader()
            for entry in summary_entries:
                writer.writerow(entry)
        self._logger.debug('Wrote results')

        return True
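
The run selection in aggregate() hinges on the fact that the median of an odd-length list is itself one of the measurements. A standalone sketch of that selection step, reusing the [4.5, 5.0, 6.5] example from the comment in the source above:

    from statistics import median

    # Step durations for one step across runs 1, 2 and 3
    step_timestamps = [4.5, 5.0, 6.5]

    # With an uneven number of runs, median() returns an actual data point,
    # so index() recovers the run that produced it (run ids start at 1)
    median_run_id = step_timestamps.index(median(step_timestamps)) + 1
    print(median_run_id)  # 2, because 5.0 is the median duration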
class Stats:

Generate statistics for an executed case.

Stats(results_path: str, number_of_steps: int, directory: str, verbose: bool)

Create an instance of the Stats class.

Parameters
  • results_path (str): The path to the results directory of the case
  • number_of_steps (int): The number of steps of the case
  • directory (str): The path to the directory where the logs must be stored.
  • verbose (bool): Enable verbose logs.
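
The constructor validates results_path and raises a ValueError when it does not exist. A small sketch (the paths are deliberately bogus, and this assumes the log directory is writable):

    from bench_executor.stats import Stats

    try:
        Stats(results_path='/no/such/results', number_of_steps=3,
              directory='/tmp/logs', verbose=False)
    except ValueError as e:
        print(e)  # Results do not exist: /no/such/results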
def statistics(self) -> bool:

Calculate basic statistics on the steps by aggregating them from all runs and applying standard deviation, median, min, max, and mean for each measured metric.

Returns
  • success (bool): Whether the statistics calculation was successful or not.
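
A sketch of reading stats.csv back after a successful call; the columns follow the <field>_median/_average/_max/_min/_stdev/_values pattern built by statistics(), and duration is one metric that is always aggregated (paths and arguments are illustrative):

    from csv import DictReader
    from bench_executor.stats import Stats

    stats = Stats('results/my-case', 4, 'logs', False)  # illustrative values
    if stats.statistics():
        with open('results/my-case/stats.csv') as f:
            for row in DictReader(f):
                # One row per step; duration_median is the median step
                # duration across all runs
                print(row['step'], row.get('duration_median'))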
def aggregate(self) -> bool:

Aggregate the metrics of the different runs of a case.

Find the median execution time of each step across all runs and extract the step from the run which has this median execution time to assemble an aggregated version and summary version of the case's metrics.

Returns
  • success (bool): Whether the aggregation was successful or not.
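
A sketch of a full aggregation pass (again with illustrative paths); note that aggregate() asserts an uneven number of run_* directories so the median step duration always maps to a real run:

    from csv import DictReader
    from bench_executor.stats import Stats

    stats = Stats('results/my-case', 4, 'logs', False)  # illustrative values
    if stats.aggregate():
        # summary.csv holds one row per step with duration, number_of_samples,
        # *_diff counters and memory *_min/*_max columns (FIELDNAMES_SUMMARY)
        with open('results/my-case/summary.csv') as f:
            for row in DictReader(f):
                print(row['step'], row['duration'], row['number_of_samples'])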