bench_executor.executor
This module holds the Executor class which is responsible for executing a case, collecting metrics, and exposing this functionality to the CLI. All features of this tool can be accessed through the Executor class; other classes should not be used directly.
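In broad strokes, a typical workflow discovers cases, executes them, and generates statistics, all through the Executor class. A minimal sketch, assuming a placeholder cases directory and sampling interval:

```python
from bench_executor.executor import Executor

# Hypothetical root directory containing one subdirectory per case,
# each with a 'metadata.json' file describing its steps
executor = Executor('/path/to/cases', verbose=True)

for case in executor.list():                # discover and validate cases
    if executor.run(case, interval=0.1, run=1, checkpoint=True):
        executor.stats(case)                # aggregate collected metrics
```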
Module source:

```python
#!/usr/bin/env python3

"""
This module holds the Executor class which is responsible for executing a
case, collecting metrics, and exposing this functionality to the CLI.
All features of this tool can be accessed through the Executor class; other
classes should not be used directly.
"""

import os
import sys
import json
import jsonschema
import importlib
import inspect
import shutil
from glob import glob
from datetime import datetime
from time import sleep
from typing import List, Dict, Any
from bench_executor.collector import Collector, METRICS_FILE_NAME
from bench_executor.stats import Stats
from bench_executor.logger import Logger, LOG_FILE_NAME

METADATA_FILE = 'metadata.json'
SCHEMA_FILE = 'metadata.schema'
CONFIG_DIR = os.path.join(os.path.dirname(__file__), 'config')
WAIT_TIME = 15  # seconds
CHECKPOINT_FILE_NAME = '.done'


# Dummy callback in case no callback was provided
def _progress_cb(resource: str, name: str, success: bool):
    pass


class Executor:
    """
    Executor class executes a case.
    """

    def __init__(self, main_directory: str, verbose: bool = False,
                 progress_cb=_progress_cb):
        """Create an instance of the Executor class.

        Parameters
        ----------
        main_directory : str
            The root directory of all the cases to execute.
        verbose : bool
            Enables verbose logs.
        progress_cb : function
            Callback to call when a step of the case is completed. By
            default, a dummy callback is provided if the argument is missing.
        """
        self._main_directory = os.path.abspath(main_directory)
        self._schema = {}
        self._resources: List[Dict[str, Any]] = []
        self._class_module_mapping: Dict[str, Any] = {}
        self._verbose = verbose
        self._progress_cb = progress_cb
        self._logger = Logger(__name__, self._main_directory, self._verbose)

        self._init_resources()

        with open(os.path.join(os.path.dirname(__file__), 'data',
                               SCHEMA_FILE)) as f:
            self._schema = json.load(f)

    @property
    def main_directory(self) -> str:
        """The main directory of all the cases.

        Returns
        -------
        main_directory : str
            The path to the main directory of the cases.
        """
        return self._main_directory

    def _init_resources(self) -> None:
        """Initialize resources of a case.

        Resources are discovered automatically by analyzing Python modules.
        """

        # Discover all modules to import
        sys.path.append(os.path.dirname(__file__))
        self._modules = list(filter(lambda x: x.endswith('.py')
                                    and '__init__' not in x
                                    and '__pycache__' not in x,
                                    os.listdir(os.path.dirname(__file__))))

        # Discover all classes in each module
        for m in self._modules:
            module_name = os.path.splitext(m)[0]
            parent_module = os.path.split(os.path.dirname(__file__))[-1]
            import_name = '.'.join([parent_module, module_name])
            imported_module = importlib.import_module(import_name)
            for name, cls in inspect.getmembers(imported_module,
                                                inspect.isclass):
                if name.startswith('_') or name[0].islower():
                    continue

                # Store class-module mapping for reverse look-up
                self._class_module_mapping[name] = imported_module

                # Discover all methods and their parameters in each class
                methods: Dict[str, List[Dict[str, Any]]] = {}
                filt = filter(lambda x: '__init__' not in x,
                              inspect.getmembers(cls, inspect.isfunction))
                for method_name, method in filt:
                    parameters = inspect.signature(method).parameters
                    methods[method_name] = []
                    for key in parameters.keys():
                        if key == 'self':
                            continue
                        p = parameters[key]
                        required = (p.default == inspect.Parameter.empty)
                        methods[method_name].append({'name': p.name,
                                                     'required': required})

                # Only register a resource once
                if name not in [r['name'] for r in self._resources]:
                    self._resources.append({'name': name,
                                            'commands': methods})

    def _resources_all_names(self) -> list:
        """Retrieve all resources' names in a case.

        Returns
        -------
        names : list
            List of all resources' names in a case.
        """
        names = []
        for r in self._resources:
            names.append(r['name'])

        return names

    def _resources_all_commands_by_name(self, name: str) -> list:
        """Retrieve all commands of a resource.

        Parameters
        ----------
        name : str
            The resource's name.

        Returns
        -------
        commands : list
            List of commands for the resource.
        """
        commands = []
        for r in filter(lambda x: x['name'] == name, self._resources):
            commands += list(r['commands'].keys())  # type: ignore

        return commands

    def _resources_all_parameters_by_command(self, name: str,
                                             command: str,
                                             required_only=False) -> list:
        """Retrieve all parameters of a command of a resource.

        Parameters
        ----------
        name : str
            The resource's name.
        command : str
            The command's name.
        required_only : bool
            Only return the required parameters of a command. By default,
            all parameters are returned.

        Returns
        -------
        parameters : list
            List of parameters of the resource's command.

        Raises
        ------
        KeyError : Exception
            If the command cannot be found for the resource.
        """
        parameters = []
        for r in filter(lambda x: x['name'] == name, self._resources):
            try:
                for p in r['commands'][command]:
                    if required_only:
                        if p['required']:
                            parameters.append(p['name'])
                    else:
                        parameters.append(p['name'])
            except KeyError as e:
                self._logger.error(f'Command "{command}" not found for '
                                   f'resource "{name}": {e}')
                raise e

        return parameters

    def _validate_case(self, case: dict, path: str) -> bool:
        """Validate a case's syntax.

        Verify if a case has a valid syntax or not. Report any errors
        discovered through logging and return whether the validation
        succeeded or not.

        Parameters
        ----------
        case : dict
            The case to validate.
        path : str
            The file path to the case.

        Returns
        -------
        success : bool
            Whether the validation of the case succeeded or not.
        """
        try:
            # Verify schema
            jsonschema.validate(case, self._schema)

            # Verify values
            for step in case['steps']:
                # Check if resource is known
                names = self._resources_all_names()
                if step['resource'] not in names:
                    msg = f'{path}: Unknown resource "{step["resource"]}"'
                    self._logger.error(msg)
                    return False

                # Check if command is known
                r = step['resource']
                commands = self._resources_all_commands_by_name(r)
                if commands is None or step['command'] not in commands:
                    msg = f'{path}: Unknown command "{step["command"]}" ' + \
                          f'for resource "{step["resource"]}"'
                    self._logger.error(msg)
                    return False

                # Check if parameters are known
                r = step['resource']
                c = step['command']
                parameters = self._resources_all_parameters_by_command(r, c)
                if parameters is None:
                    return False

                for p in step['parameters'].keys():
                    if p not in parameters:
                        msg = f'{path}: Unknown parameter "{p}" for ' + \
                              f'command "{step["command"]}" of resource ' + \
                              f'"{step["resource"]}"'
                        self._logger.error(msg)
                        return False

                # Check if all required parameters are provided
                r = step['resource']
                c = step['command']
                parameters = \
                    self._resources_all_parameters_by_command(r, c, True)
                for p in parameters:
                    if p not in step['parameters'].keys():
                        msg = f'{path}: Missing required parameter "{p}" ' + \
                              f'for command "{step["command"]}" ' + \
                              f'of resource "{step["resource"]}"'
                        self._logger.error(msg)
                        return False

        except jsonschema.ValidationError:
            msg = f'{path}: JSON schema violation'
            self._logger.error(msg)
            return False

        return True

    def stats(self, case: dict) -> bool:
        """Generate statistics for a case.

        Generate statistics for an executed case. The case must have been
        executed before statistics can be generated.

        Parameters
        ----------
        case : dict
            The case to generate statistics for.

        Returns
        -------
        success : bool
            Whether the statistics were generated successfully or not.
        """
        data = case['data']
        directory = case['directory']
        results_path = os.path.join(directory, 'results')

        if not os.path.exists(results_path):
            msg = f'Results do not exist for case "{data["name"]}"'
            self._logger.error(msg)
            return False

        stats = Stats(results_path, len(data['steps']), directory,
                      self._verbose)
        self._logger.info('Generating stats...')
        if not stats.statistics():
            return False

        self._logger.info('Generating aggregated data...')
        return stats.aggregate()

    def clean(self, case: dict) -> bool:
        """Clean a case.

        Clean up all results and metrics for a case to start it fresh.

        Parameters
        ----------
        case : dict
            The case to clean.

        Returns
        -------
        success : bool
            Whether the cleaning of the case succeeded or not.
        """
        # Checkpoints
        checkpoint_file = os.path.join(case['directory'],
                                       CHECKPOINT_FILE_NAME)
        if os.path.exists(checkpoint_file):
            os.remove(checkpoint_file)

        # Results: log files, metric measurements, run checkpoints
        for result_dir in glob(f'{case["directory"]}/results'):
            shutil.rmtree(result_dir)

        # Data: persistent storage
        for data_dir in glob(f'{case["directory"]}/data/*'):
            if not data_dir.endswith('shared'):
                shutil.rmtree(data_dir)

        return True

    def run(self, case: dict, interval: float,
            run: int, checkpoint: bool) -> bool:
        """Execute a case.

        Execute all steps of a case while collecting metrics and logs.
        The metrics are collected at a given interval and for a specific run
        of the case to allow multiple executions of the same case.
        Checkpoints of runs can be enabled to allow the executor to restart
        where it stopped in case of a failure such as a crash or power
        outage.

        Parameters
        ----------
        case : dict
            The case to execute.
        interval : float
            The sample interval for the metrics collection.
        run : int
            The run number of the case.
        checkpoint : bool
            Enable checkpoints after each run to allow restarts.

        Returns
        -------
        success : bool
            Whether the case was executed successfully or not.
        """
        success = True
        data = case['data']
        directory = case['directory']
        data_path = os.path.join(directory, 'data')
        results_run_path = os.path.join(directory, 'results', f'run_{run}')
        checkpoint_file = os.path.join(directory, CHECKPOINT_FILE_NAME)
        run_checkpoint_file = os.path.join(results_run_path,
                                           CHECKPOINT_FILE_NAME)
        active_resources = []

        # Make sure we start with a clean setup before the first run
        if run == 1:
            self.clean(case)

        # Create directories
        os.umask(0)
        os.makedirs(data_path, exist_ok=True)
        os.makedirs(results_run_path, exist_ok=True)

        # Initialize resources if needed.
        # Some resources have to perform an initialization step, such as
        # configuring database users and storage, which is only done once.
        for step in data['steps']:
            module = self._class_module_mapping[step['resource']]
            resource = getattr(module, step['resource'])(data_path,
                                                         CONFIG_DIR,
                                                         directory,
                                                         self._verbose)
            if hasattr(resource, 'initialization'):
                if not resource.initialization():
                    self._logger.error('Failed to initialize resource '
                                       f'{step["resource"]}')
                    return False

                self._logger.debug(f'Resource {step["resource"]} '
                                   'initialized')
                self._progress_cb('Initializing', step['resource'], success)

        # Launch metrics collection
        collector = Collector(data['name'], results_run_path, interval,
                              len(data['steps']), run, directory,
                              self._verbose)

        # Execute steps
        for index, step in enumerate(data['steps']):
            success = True
            module = self._class_module_mapping[step['resource']]
            resource = getattr(module, step['resource'])(data_path,
                                                         CONFIG_DIR,
                                                         directory,
                                                         self._verbose)
            active_resources.append(resource)

            # Containers may need to start up first before executing a
            # command
            if hasattr(resource, 'wait_until_ready'):
                if not resource.wait_until_ready():
                    success = False
                    self._logger.error('Waiting until resource '
                                       f'"{step["resource"]}" is ready '
                                       'failed')
                    self._progress_cb(step['resource'], step['name'],
                                      success)
                    break
                self._logger.debug(f'Resource {step["resource"]} ready')

            # Execute command
            command = getattr(resource, step['command'])
            if not command(**step['parameters']):
                success = False
                msg = f'Executing command "{step["command"]}" ' + \
                      f'failed for resource "{step["resource"]}"'
                # Some steps, like queries, are non-critical: they may fail
                # but should not cause a complete case failure. Allow these
                # failures if the may_fail key is present.
                if step.get('may_fail', False):
                    self._logger.warning(msg)
                    self._progress_cb(step['resource'], step['name'],
                                      success)
                    continue
                else:
                    self._logger.error(msg)
                    self._progress_cb(step['resource'], step['name'],
                                      success)
                    break
            self._logger.debug(f'Command "{step["command"]}" executed on '
                               f'resource {step["resource"]}')

            # Step complete
            self._progress_cb(step['resource'], step['name'], success)

            # Step finished, let metric collector know
            if (index + 1) < len(data['steps']):
                collector.next_step()

        # Stop metrics collection
        collector.stop()

        # Stop active containers
        for resource in active_resources:
            if resource is not None and hasattr(resource, 'stop'):
                resource.stop()

        self._logger.debug('Cleaned up all resources')
        self._progress_cb('Cleaner', 'Clean up resources', True)

        # Mark checkpoint if necessary
        if checkpoint and success:
            self._logger.debug('Writing checkpoint...')
            with open(checkpoint_file, 'w') as f:
                d = datetime.now().replace(microsecond=0).isoformat()
                f.write(f'{d}\n')

        # Log file
        os.makedirs(results_run_path, exist_ok=True)
        shutil.move(os.path.join(directory, LOG_FILE_NAME),
                    os.path.join(results_run_path, LOG_FILE_NAME))
        self._logger.debug('Moved logs to run results path')

        # Metrics measurements
        for metrics_file in glob(f'{data_path}/*/{METRICS_FILE_NAME}'):
            subdir = metrics_file.replace(f'{data_path}/', '') \
                    .replace(f'/{METRICS_FILE_NAME}', '')
            os.makedirs(os.path.join(results_run_path, subdir),
                        exist_ok=True)
            shutil.move(metrics_file,
                        os.path.join(results_run_path, subdir,
                                     METRICS_FILE_NAME))
        self._logger.debug('Moved metric measurements to run results path')

        # Results: all 'output_file' and 'results_file' values
        if success:
            self._logger.debug('Copying generated files for run')
            for step in data['steps']:
                subdir = step['resource'].lower().replace('_', '')
                parameters = step['parameters']
                os.makedirs(os.path.join(results_run_path, subdir),
                            exist_ok=True)
                if parameters.get('results_file', False):
                    results_file = parameters['results_file']
                    p1 = os.path.join(directory, 'data/shared', results_file)
                    p2 = os.path.join(results_run_path, subdir, results_file)
                    try:
                        shutil.move(p1, p2)
                    except FileNotFoundError as e:
                        msg = f'Cannot find results file "{p1}": {e}'
                        self._logger.warning(msg)

                if parameters.get('output_file', False) \
                        and not parameters.get('multiple_files', False):
                    output_dir = os.path.join(results_run_path, subdir)
                    for out_file in glob(os.path.join(str(directory), 'data',
                                                      'shared', '*.nt')):
                        out_path = os.path.join(output_dir,
                                                os.path.basename(out_file))
                        try:
                            shutil.move(out_file, out_path)
                        except FileNotFoundError as e:
                            msg = 'Cannot find output file ' + \
                                  f'"{out_file}": {e}'
                            self._logger.warning(msg)

        # Run complete, mark it
        run_checkpoint_file = os.path.join(results_run_path,
                                           CHECKPOINT_FILE_NAME)
        self._logger.debug('Writing run checkpoint...')
        with open(run_checkpoint_file, 'w') as f:
            d = datetime.now().replace(microsecond=0).isoformat()
            f.write(f'{d}\n')

        self._logger.debug(f'Cooling down for {WAIT_TIME}s')
        self._progress_cb('Cooldown',
                          f'Hardware cooldown period {WAIT_TIME}s', True)
        sleep(WAIT_TIME)

        return success

    def list(self) -> list:
        """List all cases in a root directory.

        Retrieve a list of all discovered valid cases in a given directory.
        Cases that do not pass validation are excluded, and their validation
        errors are reported through logging.

        Returns
        -------
        cases : list
            List of discovered cases.
        """
        cases = []

        for directory in glob(self._main_directory):
            for root, dirs, files in os.walk(directory):
                for file in files:
                    if os.path.basename(file) == METADATA_FILE:
                        path = os.path.join(root, file)
                        with open(path, 'r') as f:
                            data = json.load(f)
                            if self._validate_case(data, path):
                                cases.append({
                                    'directory': os.path.dirname(path),
                                    'data': data
                                })

        return cases
```
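For reference, `_validate_case` checks each step of a case against the discovered resources, their commands, and their parameters. A hypothetical case, shown here as the Python dict produced by parsing a `metadata.json` file (the resource and command names are illustrative assumptions, not part of this module):

```python
case_data = {
    'name': 'example-case',
    'steps': [
        {
            'resource': 'SomeResource',   # must match a discovered class
            'name': 'Load data',          # human-readable step name
            'command': 'load',            # must be a method of the class
            'parameters': {'results_file': 'out.csv'},
            'may_fail': False,            # optional: tolerate step failure
        },
    ],
}
```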
class Executor

The Executor class executes a case.
`__init__(main_directory: str, verbose: bool = False, progress_cb=_progress_cb)`
Create an instance of the Executor class.
Parameters
- main_directory (str): The root directory of all the cases to execute.
- verbose (bool): Enables verbose logs.
- progress_cb (function): Callback to call when a step of the case is completed. By default, a dummy callback is provided if the argument is missing.
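For illustration, a custom progress callback must accept the same three arguments as the dummy callback; a sketch (the callback body and directory path are assumptions):

```python
def progress_cb(resource: str, name: str, success: bool):
    # Hypothetical callback: report each completed step on stdout
    status = 'OK' if success else 'FAILED'
    print(f'[{status}] {resource}: {name}')

executor = Executor('/path/to/cases', verbose=False,
                    progress_cb=progress_cb)
```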
`main_directory -> str` (property)
The main directory of all the cases.
Returns
- main_directory (str): The path to the main directory of the cases.
`stats(case: dict) -> bool`
Generate statistics for a case.
Generate statistics for an executed case. The case must have been executed before statistics can be generated.
Parameters
- case (dict): The case to generate statistics for.
Returns
- success (bool): Whether the statistics were generated successfully or not.
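A minimal sketch, assuming `executor` and `case` were obtained as in the overview example:

```python
# stats() fails if no 'results' directory was produced by a prior run()
if executor.stats(case):
    print(f'Statistics ready for "{case["data"]["name"]}"')
```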
`clean(case: dict) -> bool`
Clean a case.
Clean up all results and metrics for a case to start it fresh.
Parameters
- case (dict): The case to clean.
Returns
- success (bool): Whether the cleaning of the case succeeded or not.
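A sketch, assuming `executor` and `case` as above; note that `run()` already calls `clean()` automatically when the run number is 1:

```python
# Remove the case checkpoint, the results directory, and all non-shared
# data directories, then start over from a clean slate
executor.clean(case)
executor.run(case, interval=0.1, run=1, checkpoint=False)
```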
`run(case: dict, interval: float, run: int, checkpoint: bool) -> bool`
Execute a case.
Execute all steps of a case while collecting metrics and logs. The metrics are collected at a given interval and for a specific run of the case to allow multiple executions of the same case. Checkpoints of runs can be enabled to allow the executor to restart where it stopped in case of a failure such as a crash or power outage.
Parameters
- case (dict): The case to execute.
- interval (float): The sample interval for the metrics collection.
- run (int): The run number of the case.
- checkpoint (bool): Enable checkpoints after each run to allow restarts.
Returns
- success (bool): Whether the case was executed successfully or not.
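A sketch of executing the same case multiple times (the number of runs and the interval are assumptions). With `checkpoint=True`, a `.done` marker file is written in the case directory after each successful run, which allows an interrupted experiment to be resumed:

```python
# Sample metrics every 0.5 seconds for each of 5 runs of the case
for run_number in range(1, 6):
    if not executor.run(case, interval=0.5, run=run_number,
                        checkpoint=True):
        break  # a failed critical step aborts the remaining runs
```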
`list() -> list`
List all cases in a root directory.
Retrieve a list of all discovered valid cases in a given directory. Cases that do not pass validation are excluded, and their validation errors are reported through logging.
Returns
- cases (list): List of discovered cases.
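A sketch of inspecting discovered cases; each entry holds the case's directory and its parsed `metadata.json` contents:

```python
for case in executor.list():
    data = case['data']
    print(f'{data["name"]}: {len(data["steps"])} step(s) '
          f'in {case["directory"]}')
```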