bench_executor.executor

This module holds the Executor class which is responsible for executing a case, collecting metrics, and exposing this functionality to the CLI. All features of this tool can be accessed through the Executor class; other classes should not be used directly.

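The typical flow is to construct an Executor, discover cases with list(), execute them with run(), and summarize with stats(). A minimal sketch of driving the module from Python, assuming a case directory laid out as this module expects; the path, interval, and callback below are illustrative, not part of the API:

    from bench_executor.executor import Executor

    def on_progress(resource: str, name: str, success: bool) -> None:
        # Called by the Executor after each completed step of a case
        print(f'[{resource}] {name}: {"OK" if success else "FAILED"}')

    executor = Executor('/path/to/cases', verbose=True,
                        progress_cb=on_progress)
    for case in executor.list():
        # Run 1 of each case; collect metrics every 0.1s
        if executor.run(case, interval=0.1, run=1, checkpoint=False):
            executor.stats(case)
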
  1#!/usr/bin/env python3
  2
  3"""
  4This module holds the Executor class which is responsible for executing a case,
  5collecting metrics, and exposing this functionality to the CLI.
  6All features of this tool can be accessed through the Executor class; other
  7classes should not be used directly.
  8"""
  9
 10import os
 11import sys
 12import json
 13import jsonschema
 14import importlib
 15import inspect
 16import shutil
 17from glob import glob
 18from datetime import datetime
 19from time import sleep
 20from typing import List, Dict, Any
 21from bench_executor.collector import Collector, METRICS_FILE_NAME
 22from bench_executor.stats import Stats
 23from bench_executor.logger import Logger, LOG_FILE_NAME
 24
 25METADATA_FILE = 'metadata.json'
 26SCHEMA_FILE = 'metadata.schema'
 27CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'config')
 28WAIT_TIME = 15  # seconds
 29CHECKPOINT_FILE_NAME = '.done'
 30
 31
 32# Dummy callback in case no callback was provided
 33def _progress_cb(resource: str, name: str, success: bool):
 34    pass
 35
 36
 37class Executor:
 38    """
 39    Executor class executes a case.
 40    """
 41
 42    def __init__(self, main_directory: str, verbose: bool = False,
 43                 progress_cb=_progress_cb):
 44        """Create an instance of the Executor class.
 45
 46        Parameters
 47        ----------
 48        main_directory : str
 49            The root directory of all the cases to execute.
 50        verbose : bool
 51            Enables verbose logs.
 52        progress_cb : function
 53            Callback to call when a step of the case is completed. By
 54            default, a dummy callback is provided if the argument is missing.
 55        """
 56        self._main_directory = os.path.abspath(main_directory)
 57        self._schema = {}
 58        self._resources: List[Dict[str, Any]] = []
 59        self._class_module_mapping: Dict[str, Any] = {}
 60        self._verbose = verbose
 61        self._progress_cb = progress_cb
 62        self._logger = Logger(__name__, self._main_directory, self._verbose)
 63
 64        self._init_resources()
 65
 66        with open(os.path.join(os.path.dirname(__file__), 'data',
 67                               SCHEMA_FILE)) as f:
 68            self._schema = json.load(f)
 69
 70    @property
 71    def main_directory(self) -> str:
 72        """The main directory of all the cases.
 73
 74        Returns
 75        -------
 76        main_directory : str
 77            The path to the main directory of the cases.
 78        """
 79        return self._main_directory
 80
 81    def _init_resources(self) -> None:
 82        """Initialize resources of a case
 83
 84        Resources are discovered automatically by analyzing Python modules.
 85        """
 86
 87        # Discover all modules to import
 88        sys.path.append(os.path.dirname(__file__))
 89        self._modules = list(filter(lambda x: x.endswith('.py')
 90                                    and '__init__' not in x
 91                                    and '__pycache__' not in x,
 92                                    os.listdir(os.path.dirname(__file__))))
 93
 94        # Discover all classes in each module
 95        for m in self._modules:
 96            module_name = os.path.splitext(m)[0]
 97            parent_module = os.path.split(os.path.dirname(__file__))[-1]
 98            import_name = '.'.join([parent_module, module_name])
 99            imported_module = importlib.import_module(import_name)
100            for name, cls in inspect.getmembers(imported_module,
101                                                inspect.isclass):
102                if name.startswith('_') or name[0].islower():
103                    continue
104
105                # Store class-module mapping for reverse look-up
106                self._class_module_mapping[name] = imported_module
107
108                # Discover all methods and their parameters in each class
109                methods: Dict[str, List[Dict[str, str]]] = {}
110                filt = filter(lambda x: '__init__' not in x,
111                              inspect.getmembers(cls, inspect.isfunction))
112                for method_name, method in filt:
113                    parameters = inspect.signature(method).parameters
114                    methods[method_name] = []
115                    for key in parameters.keys():
116                        if key == 'self':
117                            continue
118                        p = parameters[key]
119                        required = (p.default == inspect.Parameter.empty)
120                        methods[method_name].append({'name': p.name,
121                                                     'required': required})
122
123                if name not in map(lambda x: x['name'],
124                                   self._resources):
125                    self._resources.append({'name': name, 'commands': methods})
126
127    def _resources_all_names(self) -> list:
128        """Retrieve all resources' names in a case.
129
130        Returns
131        -------
132        names : list
133            List of all resources' names in a case.
134        """
135        names = []
136        for r in self._resources:
137            names.append(r['name'])
138
139        return names
140
141    def _resources_all_commands_by_name(self, name: str) -> list:
142        """Retrieve all commands of a resource.
143
144        Parameters
145        ----------
146        name : str
147            The resource's name.
148
149        Returns
150        -------
151        commands : list
152            List of commands for the resource.
153        """
154        commands = []
155        for r in filter(lambda x: x['name'] == name, self._resources):
156            commands += list(r['commands'].keys())  # type: ignore
157
158        return commands
159
160    def _resources_all_parameters_by_command(self, name: str,
161                                             command: str,
162                                             required_only=False) -> list:
163        """Retrieve all parameters of a command of a resource.
164
165        Parameters
166        ----------
167        name : str
168            The resource's name.
169        command : str
170            The command's name.
171        required_only : bool
172            Only return the required parameters of a command. By default,
173            all parameters are returned.
174
175        Returns
176        -------
177        parameters : list
178            List of parameters of the resource's command.
179
180        Raises
181        ------
182        KeyError : Exception
183            If the command cannot be found for the resource.
184        """
185        parameters = []
186        for r in filter(lambda x: x['name'] == name, self._resources):
187            try:
188                for p in r['commands'][command]:
189                    if required_only:
190                        if p['required']:
191                            parameters.append(p['name'])
192                    else:
193                        parameters.append(p['name'])
194            except KeyError as e:
195                self._logger.error(f'Command "{command}" not found for '
196                                   f'resource "{name}": {e}')
197                raise e
198
199        return parameters
200
201    def _validate_case(self, case: dict, path: str) -> bool:
202        """Validate a case's syntax.
203
204        Verify whether a case has a valid syntax. Report any errors
205        discovered through logging and return whether the validation
206        succeeded or not.
207
208        Parameters
209        ----------
210        case : dict
211            The case to validate.
212        path : str
213            The file path to the case.
214
215        Returns
216        -------
217        success : bool
218            Whether the validation of the case succeeded or not.
219        """
220        try:
221            # Verify schema
222            jsonschema.validate(case, self._schema)
223
224            # Verify values
225            for step in case['steps']:
226                # Check if resource is known
227                names = self._resources_all_names()
228                if step['resource'] not in names:
229                    msg = f'{path}: Unknown resource "{step["resource"]}"'
230                    self._logger.error(msg)
231                    return False
232
233                # Check if command is known
234                r = step['resource']
235                commands = self._resources_all_commands_by_name(r)
236                if commands is None or step['command'] not in commands:
237                    msg = f'{path}: Unknown command "{step["command"]}" ' + \
238                          f'for resource "{step["resource"]}"'
239                    self._logger.error(msg)
240                    return False
241
242                # Check if parameters are known
243                r = step['resource']
244                c = step['command']
245                parameters = self._resources_all_parameters_by_command(r, c)
246                if parameters is None:
247                    return False
248
249                for p in step['parameters'].keys():
250                    if p not in parameters:
251                        msg = f'{path}: Unknown parameter "{p}" for ' + \
252                              f'command "{step["command"]}" of resource ' + \
253                              f'"{step["resource"]}"'
254                        self._logger.error(msg)
255                        return False
256
257                # Check if all required parameters are provided
258                r = step['resource']
259                c = step['command']
260                parameters = \
261                    self._resources_all_parameters_by_command(r, c, True)
262                for p in parameters:
263                    if p not in step['parameters'].keys():
264                        msg = f'{path}: Missing required parameter "{p}" ' + \
265                              f'for command "{step["command"]}" ' + \
266                              f'of resource "{step["resource"]}"'
267                        self._logger.error(msg)
268                        return False
269
270        except jsonschema.ValidationError:
271            msg = f'{path}: JSON schema violation'
272            self._logger.error(msg)
273            return False
274
275        return True
276
277    def stats(self, case: dict) -> bool:
278        """Generate statistics for a case.
279
280        Generate statistics for an executed case. The case must have been
281        executed before statistics can be generated.
282
283        Parameters
284        ----------
285        case : dict
286            The case to generate statistics for.
287
288        Returns
289        -------
290        success : bool
291            Whether the statistics were generated successfully or not.
292
293        """
294        data = case['data']
295        directory = case['directory']
296        results_path = os.path.join(directory, 'results')
297
298        if not os.path.exists(results_path):
299            msg = f'Results do not exist for case "{data["name"]}"'
300            self._logger.error(msg)
301            return False
302
303        stats = Stats(results_path, len(data['steps']), directory,
304                      self._verbose)
305        self._logger.info('Generating stats...')
306        if not stats.statistics():
307            return False
308
309        self._logger.info('Generating aggregated data...')
310        return stats.aggregate()
311
312    def clean(self, case: dict) -> bool:
313        """Clean a case.
314
315        Clean up all results and metrics for a case to start it fresh.
316
317        Parameters
318        ----------
319        case : dict
320            The case to clean.
321
322        Returns
323        -------
324        success : bool
325            Whether the cleaning of the case succeeded or not.
326        """
327        # Checkpoints
328        checkpoint_file = os.path.join(case['directory'], CHECKPOINT_FILE_NAME)
329        if os.path.exists(checkpoint_file):
330            os.remove(checkpoint_file)
331
332        # Results: log files, metric measurements, run checkpoints
333        for result_dir in glob(f'{case["directory"]}/results'):
334            shutil.rmtree(result_dir)
335
336        # Data: persistent storage
337        for data_dir in glob(f'{case["directory"]}/data/*'):
338            if not data_dir.endswith('shared'):
339                shutil.rmtree(data_dir)
340
341        return True
342
343    def run(self, case: dict, interval: float,
344            run: int, checkpoint: bool) -> bool:
345        """Execute a case.
346
347        Execute all steps of a case while collecting metrics and logs.
348        The metrics are collected at a given interval and for a specific run of
349        the case to allow multiple executions of the same case. Checkpoints of
350        runs can be enabled to allow the executor to restart where it stopped
351        in case of a failure, electricity blackout, etc.
352        in case of a failure, power outage, etc.
353        Parameters
354        ----------
355        case : dict
356            The case to execute.
357        interval : float
358            The sample interval for the metrics collection.
359        run : int
360            The run number of the case.
361        checkpoint : bool
362            Enable checkpoints after each run to allow restarts.
363
364        Returns
365        -------
366        success : bool
367            Whether the case was executed successfully or not.
368        """
369        success = True
370        data = case['data']
371        directory = case['directory']
372        data_path = os.path.join(directory, 'data')
373        results_run_path = os.path.join(directory, 'results', f'run_{run}')
374        checkpoint_file = os.path.join(directory, CHECKPOINT_FILE_NAME)
375        run_checkpoint_file = os.path.join(results_run_path,
376                                           CHECKPOINT_FILE_NAME)
377        active_resources = []
378
379        # Make sure we start with a clean setup before the first run
380        if run == 1:
381            self.clean(case)
382
383        # create directories
384        os.umask(0)
385        os.makedirs(data_path, exist_ok=True)
386        os.makedirs(results_run_path, exist_ok=True)
387
388        # Initialize resources if needed
389        # Some resources have to perform an initialization step such as
390        # configuring database users, storage, etc. which is only done once
391        for step in data['steps']:
392            module = self._class_module_mapping[step['resource']]
393            resource = getattr(module, step['resource'])(data_path, CONFIG_DIR,
394                                                         directory,
395                                                         self._verbose)
396            if hasattr(resource, 'initialization'):
397                if not resource.initialization():
398                    self._logger.error('Failed to initialize resource '
399                                       f'{step["resource"]}')
400                    return False
401
402                self._logger.debug(f'Resource {step["resource"]} initialized')
403                self._progress_cb('Initializing', step['resource'], success)
404
405        # Launch metrics collection
406        collector = Collector(data['name'], results_run_path, interval,
407                              len(data['steps']), run, directory,
408                              self._verbose)
409
410        # Execute steps
411        for index, step in enumerate(data['steps']):
412            success = True
413            module = self._class_module_mapping[step['resource']]
414            resource = getattr(module, step['resource'])(data_path, CONFIG_DIR,
415                                                         directory,
416                                                         self._verbose)
417            active_resources.append(resource)
418
419            # Containers may need to start up first before executing a command
420            if hasattr(resource, 'wait_until_ready'):
421                if not resource.wait_until_ready():
422                    success = False
423                    self._logger.error('Waiting until resource '
424                                       f'"{step["resource"]}" is ready failed')
425                    self._progress_cb(step['resource'], step['name'], success)
426                    break
427                self._logger.debug(f'Resource {step["resource"]} ready')
428
429            # Execute command
430            command = getattr(resource, step['command'])
431            if not command(**step['parameters']):
432                success = False
433                msg = f'Executing command "{step["command"]}" ' + \
434                      f'failed for resource "{step["resource"]}"'
435                # Some steps, such as queries, are non-critical: they may
436                # fail without causing a complete case failure. Allow these
437                # failures if the may_fail key is set
438                if step.get('may_fail', False):
439                    self._logger.warning(msg)
440                    self._progress_cb(step['resource'], step['name'], success)
441                    continue
442                else:
443                    self._logger.error(msg)
444                    self._progress_cb(step['resource'], step['name'], success)
445                    break
446            self._logger.debug(f'Command "{step["command"]}" executed on '
447                               f'resource {step["resource"]}')
448
449            # Step complete
450            self._progress_cb(step['resource'], step['name'], success)
451
452            # Step finished, let metric collector know
453            if (index + 1) < len(data['steps']):
454                collector.next_step()
455
456        # Stop metrics collection
457        collector.stop()
458
459        # Stop active containers
460        for resource in active_resources:
461            if resource is not None and hasattr(resource, 'stop'):
462                resource.stop()
463
464        self._logger.debug('Cleaned up all resources')
465        self._progress_cb('Cleaner', 'Clean up resources', True)
466
467        # Mark checkpoint if necessary
468        if checkpoint and success:
469            self._logger.debug('Writing checkpoint...')
470            with open(checkpoint_file, 'w') as f:
471                d = datetime.now().replace(microsecond=0).isoformat()
472                f.write(f'{d}\n')
473
474        # Log file
475        os.makedirs(os.path.join(results_run_path), exist_ok=True)
476        shutil.move(os.path.join(directory, LOG_FILE_NAME),
477                    os.path.join(results_run_path, LOG_FILE_NAME))
478        self._logger.debug('Moved logs to run results path')
479
480        # Metrics measurements
481        for metrics_file in glob(f'{data_path}/*/{METRICS_FILE_NAME}'):
482            subdir = metrics_file.replace(f'{data_path}/', '') \
483                    .replace(f'/{METRICS_FILE_NAME}', '')
484            os.makedirs(os.path.join(results_run_path, subdir), exist_ok=True)
485            shutil.move(metrics_file, os.path.join(results_run_path, subdir,
486                                                   METRICS_FILE_NAME))
487        self._logger.debug('Moved metric measurements to run results path')
488
489        # Results: all 'output_file' and 'results_file' values
490        if success:
491            self._logger.debug('Copying generated files for run')
492            for step in data['steps']:
493                subdir = step['resource'].lower().replace('_', '')
494                parameters = step['parameters']
495                os.makedirs(os.path.join(results_run_path, subdir),
496                            exist_ok=True)
497                if parameters.get('results_file', False):
498                    results_file = parameters['results_file']
499                    p1 = os.path.join(directory, 'data/shared', results_file)
500                    p2 = os.path.join(results_run_path, subdir, results_file)
501                    try:
502                        shutil.move(p1, p2)
503                    except FileNotFoundError as e:
504                        msg = f'Cannot find results file "{p1}": {e}'
505                        self._logger.warning(msg)
506
507                if parameters.get('output_file', False) \
508                        and not parameters.get('multiple_files', False):
509                    output_dir = os.path.join(results_run_path, subdir)
510                    for out_file in glob(os.path.join(str(directory), 'data',
511                                         'shared', '*.nt')):
512                        out_path = os.path.join(output_dir,
513                                                os.path.basename(out_file))
514                        try:
515                            shutil.move(out_file, out_path)
516                        except FileNotFoundError as e:
517                            msg = f'Cannot find output file "{out_file}": {e}'
518                            self._logger.warning(msg)
519
520            # Run complete, mark it
521            run_checkpoint_file = os.path.join(results_run_path,
522                                               CHECKPOINT_FILE_NAME)
523            self._logger.debug('Writing run checkpoint...')
524            with open(run_checkpoint_file, 'w') as f:
525                d = datetime.now().replace(microsecond=0).isoformat()
526                f.write(f'{d}\n')
527
528        self._logger.debug(f'Cooling down for {WAIT_TIME}s')
529        self._progress_cb('Cooldown', f'Hardware cooldown period {WAIT_TIME}s',
530                          True)
531        sleep(WAIT_TIME)
532
533        return success
534
535    def list(self) -> list:
536        """List all cases in a root directory.
537
538        Retrieve a list of all discovered valid cases in a given directory.
539        Cases which do not pass validation are excluded and their
540        validation errors are reported through logging.
541
542        Returns
543        -------
544        cases : list
545            List of discovered cases.
546        """
547        cases = []
548
549        for directory in glob(self._main_directory):
550            for root, dirs, files in os.walk(directory):
551                for file in files:
552                    if os.path.basename(file) == METADATA_FILE:
553                        path = os.path.join(root, file)
554                        with open(path, 'r') as f:
555                            data = json.load(f)
556                            if self._validate_case(data, path):
557                                cases.append({
558                                    'directory': os.path.dirname(path),
559                                    'data': data
560                                })
561
562        return cases
METADATA_FILE = 'metadata.json'
SCHEMA_FILE = 'metadata.schema'
CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'config')
WAIT_TIME = 15
CHECKPOINT_FILE_NAME = '.done'
class Executor:

Executor class executes a case.

Executor(main_directory: str, verbose: bool = False, progress_cb=_progress_cb)

Create an instance of the Executor class.

Parameters
  • main_directory (str): The root directory of all the cases to execute.
  • verbose (bool): Enables verbose logs.
  • progress_cb (function): Callback to call when a step of the case is completed. By default, a dummy callback is provided if the argument is missing.
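
The callback must accept the same three positional arguments as the dummy _progress_cb shown in the source above; when omitted, that dummy is used. An illustrative sketch:

    from bench_executor.executor import Executor

    def progress(resource: str, name: str, success: bool) -> None:
        status = 'OK' if success else 'FAILED'
        print(f'{resource}: {name} [{status}]')

    # Falls back to the dummy callback when progress_cb is omitted
    executor = Executor('/path/to/cases', verbose=True, progress_cb=progress)
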
main_directory: str

The main directory of all the cases.

Returns
  • main_directory (str): The path to the main directory of the cases.
def stats(self, case: dict) -> bool:

Generate statistics for a case.

Generate statistics for an executed case. The case must have been executed before statistics can be generated.

Parameters
  • case (dict): The case to generate statistics for.
Returns
  • success (bool): Whether the statistics were generated successfully or not.
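
As the source shows, stats() fails when no results/ directory exists for the case, so it is meant to be called after run(). An illustrative sketch (the path and interval are assumptions):

    from bench_executor.executor import Executor

    executor = Executor('/path/to/cases')  # illustrative path
    case = executor.list()[0]
    if executor.run(case, interval=0.1, run=1, checkpoint=False):
        if not executor.stats(case):
            print(f'Statistics failed for "{case["data"]["name"]}"')
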
def clean(self, case: dict) -> bool:

Clean a case.

Clean up all results and metrics for a case to start it fresh.

Parameters
  • case (dict): The case to clean.
Returns
  • success (bool): Whether the cleaning of the case succeeded or not.
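
Per the source, clean() removes the case checkpoint, the results/ directory, and everything under data/ except the shared/ subdirectory. A short sketch (the path is illustrative):

    from bench_executor.executor import Executor

    executor = Executor('/path/to/cases')  # illustrative path
    for case in executor.list():
        executor.clean(case)  # results, metrics, and checkpoints are removed
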
def run(self, case: dict, interval: float, run: int, checkpoint: bool) -> bool:

Execute a case.

Execute all steps of a case while collecting metrics and logs. The metrics are collected at a given interval and for a specific run of the case to allow multiple executions of the same case. Checkpoints of runs can be enabled to allow the executor to restart where it stopped in case of a failure, power outage, etc.

Parameters
  • case (dict): The case to execute.
  • interval (float): The sample interval for the metrics collection.
  • run (int): The run number of the case.
  • checkpoint (bool): Enable checkpoints after each run to allow restarts.
Returns
  • success (bool): Whether the case was executed successfully or not.
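
Because run() cleans the case on run 1 and writes a .done checkpoint in each run's results directory on success, a driver can skip already-completed runs after an interruption. A sketch under those assumptions; the run count, interval, and path are illustrative:

    import os
    from bench_executor.executor import Executor

    N_RUNS = 3  # illustrative run count
    executor = Executor('/path/to/cases')  # illustrative path
    for case in executor.list():
        for run_number in range(1, N_RUNS + 1):
            done = os.path.join(case['directory'], 'results',
                                f'run_{run_number}', '.done')
            if os.path.exists(done):
                continue  # run finished in a previous attempt, skip it
            executor.run(case, interval=0.1, run=run_number, checkpoint=True)
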
def list(self) -> list:

List all cases in a root directory.

Retrieve a list of all discovered valid cases in a given directory. Cases which do not pass validation are excluded and their validation errors are reported through logging.

Returns
  • cases (list): List of discovered cases.
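
Each returned case is a dict with a 'directory' key (the folder containing metadata.json) and a 'data' key (the parsed metadata), as seen in the source above. A sketch (the path is illustrative):

    from bench_executor.executor import Executor

    executor = Executor('/path/to/cases')  # illustrative path
    for case in executor.list():
        print(case['directory'], '->', case['data']['name'])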