bench_executor.rmlmapper_jar
The RMLMapper executes RML rules to generate high-quality Linked Data from multiple originally (semi-)structured data sources.
Website: https://rml.io
Repository: https://github.com/RMLio/rmlmapper-java
#!/usr/bin/env python3

"""
The RMLMapper executes RML rules to generate high-quality Linked Data
from multiple originally (semi-)structured data sources.

**Website**: https://rml.io<br>
**Repository**: https://github.com/RMLio/rmlmapper-java
"""

import os
import shlex
import signal
import subprocess
from typing import Optional

import psutil
from timeout_decorator import timeout, TimeoutError  # type: ignore

from bench_executor.logger import Logger

VERSION = '6.2.1'
TIMEOUT = 6 * 3600  # 6 hours


class RMLMapperJar():
    """Run the RMLMapper jar locally to execute R2RML and RML mappings."""

    def __init__(self, data_path: str, config_path: str, directory: str,
                 verbose: bool):
        """Creates an instance of the RMLMapperJar class.

        Parameters
        ----------
        data_path : str
            Path to the data directory of the case.
        config_path : str
            Path to the config directory of the case.
        directory : str
            Path to the directory to store logs.
        verbose : bool
            Enable verbose logs.
        """
        self._data_path = os.path.abspath(data_path)
        self._config_path = os.path.abspath(config_path)
        self._logger = Logger(__name__, directory, verbose)
        self._verbose = verbose

        os.makedirs(os.path.join(self._data_path, 'rmlmapper'), exist_ok=True)

    @property
    def root_mount_directory(self) -> str:
        """Subdirectory in the root directory of the case for RMLMapper.

        Returns
        -------
        subdirectory : str
            Subdirectory of the root directory for RMLMapper.

        """
        return __name__.lower()

    @staticmethod
    def _redact_arguments(arguments: list) -> list:
        """Return a copy of *arguments* with the value after ``-p`` masked.

        ``execute_mapping`` passes the database password right after
        ``-p``; masking it keeps the password out of the debug log.
        """
        redacted = list(arguments)
        for index, argument in enumerate(redacted[:-1]):
            if argument == '-p':
                redacted[index + 1] = '****'
        return redacted

    @staticmethod
    def _kill_process_group(process: subprocess.Popen) -> None:
        """Send SIGKILL to every process in *process*'s process group.

        *process* must have been started with ``start_new_session=True`` so
        that its PID is also its process-group ID.
        """
        try:
            os.killpg(process.pid, signal.SIGKILL)
        except ProcessLookupError:
            pass  # The group has already exited.

    @timeout(TIMEOUT)
    def _execute_with_timeout(self, arguments: list) -> bool:
        """Execute RMLMapper with a timeout of ``TIMEOUT`` seconds.

        Parameters
        ----------
        arguments : list
            Arguments to supply to RMLMapper. They are joined with spaces
            into a shell command line, so they must already be shell-quoted.

        Returns
        -------
        success : bool
            Whether the execution was successful or not.
        """
        # Set Java heap to 90% of available memory instead of the default
        # 1/4. The JVM documents -Xmx/-Xms values as multiples of 1024 bytes.
        max_heap = int(psutil.virtual_memory().total * 0.9) // 1024 * 1024

        # Execute command
        cmd = f'java -Xmx{max_heap} -Xms{max_heap} -jar rmlmapper.jar'
        if self._verbose:
            cmd += ' -vvvvvvvvvvvvv'
        cmd += f' {" ".join(arguments)}'

        self._logger.debug(f'Executing RMLMapper with arguments '
                           f'{" ".join(self._redact_arguments(arguments))}')

        # Run the shell in its own session (= process group). If the timeout
        # decorator interrupts us, the whole tree, JVM included, is killed;
        # killing only the /bin/sh wrapper could leave the JVM running.
        with subprocess.Popen(cmd, shell=True, text=True, errors='replace',
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
                              start_new_session=True) as process:
            try:
                output, _ = process.communicate()
            except BaseException:
                self._kill_process_group(process)
                raise

        # Mirror subprocess.getstatusoutput(): drop one trailing newline.
        if output.endswith('\n'):
            output = output[:-1]
        self._logger.debug(output)
        return process.returncode == 0

    def execute(self, arguments: list) -> bool:
        """Execute RMLMapper with given arguments.

        Parameters
        ----------
        arguments : list
            Arguments to supply to RMLMapper.

        Returns
        -------
        success : bool
            Whether the execution succeeded or not. ``False`` when the
            timeout was reached.
        """
        try:
            return self._execute_with_timeout(arguments)
        except TimeoutError:
            msg = f'Timeout ({TIMEOUT}s) reached for RMLMapper'
            self._logger.warning(msg)

        return False

    def execute_mapping(self,
                        mapping_file: str,
                        output_file: str,
                        serialization: str,
                        rdb_username: Optional[str] = None,
                        rdb_password: Optional[str] = None,
                        rdb_host: Optional[str] = None,
                        rdb_port: Optional[int] = None,
                        rdb_name: Optional[str] = None,
                        rdb_type: Optional[str] = None) -> bool:
        """Execute a mapping file with RMLMapper.

        N-Quads and N-Triples are currently supported as serialization
        formats for RMLMapper.

        Database access is only configured when *all* ``rdb_*`` parameters
        are provided; a partial configuration is ignored with a warning.

        Parameters
        ----------
        mapping_file : str
            Path to the mapping file to execute.
        output_file : str
            Name of the output file to store the triples in.
        serialization : str
            Serialization format to use.
        rdb_username : Optional[str]
            Username for the database, required when a database is used as
            source.
        rdb_password : Optional[str]
            Password for the database, required when a database is used as
            source.
        rdb_host : Optional[str]
            Hostname for the database, required when a database is used as
            source.
        rdb_port : Optional[int]
            Port for the database, required when a database is used as source.
        rdb_name : Optional[str]
            Database name for the database, required when a database is used as
            source.
        rdb_type : Optional[str]
            Database type ('MySQL' or 'PostgreSQL'), required when a database
            is used as source.

        Returns
        -------
        success : bool
            Whether the execution was successful or not.

        Raises
        ------
        ValueError
            If a database is configured with an unsupported ``rdb_type``.
        """
        # _execute_with_timeout joins these arguments into a shell command
        # line, so every value is shell-quoted to survive spaces and shell
        # metacharacters (e.g. the '&' in the MySQL DSN parameters).
        arguments = [
            '-m', shlex.quote(os.path.join('/data/shared/', mapping_file)),
            '-s', shlex.quote(serialization),
            '-o', shlex.quote(os.path.join('/data/shared/', output_file)),
            '-d',  # Enable duplicate removal
        ]

        if rdb_username is not None and rdb_password is not None \
                and rdb_host is not None and rdb_port is not None \
                and rdb_name is not None and rdb_type is not None:
            parameters = ''
            if rdb_type == 'MySQL':
                protocol = 'jdbc:mysql'
                parameters = '?allowPublicKeyRetrieval=true&useSSL=false'
            elif rdb_type == 'PostgreSQL':
                protocol = 'jdbc:postgresql'
            else:
                raise ValueError(f'Unknown RDB type: "{rdb_type}"')
            rdb_dsn = (f'{protocol}://{rdb_host}:{rdb_port}/'
                       f'{rdb_name}{parameters}')
            arguments += ['-u', shlex.quote(rdb_username),
                          '-p', shlex.quote(rdb_password),
                          '-dsn', shlex.quote(rdb_dsn)]
        elif any(value is not None for value in (rdb_username, rdb_password,
                                                  rdb_host, rdb_port,
                                                  rdb_name, rdb_type)):
            # Without all six settings no database can be configured; make
            # that visible instead of silently dropping the given ones.
            self._logger.warning('Incomplete database configuration for '
                                 'RMLMapper: ignoring all rdb_* parameters')

        return self.execute(arguments)
VERSION =
'6.2.1'
TIMEOUT =
21600
class
RMLMapperJar:
class RMLMapperJar():
    """Run the RMLMapper jar locally to execute R2RML and RML mappings."""

    def __init__(self, data_path: str, config_path: str, directory: str,
                 verbose: bool):
        """Creates an instance of the RMLMapperJar class.

        Parameters
        ----------
        data_path : str
            Path to the data directory of the case.
        config_path : str
            Path to the config directory of the case.
        directory : str
            Path to the directory to store logs.
        verbose : bool
            Enable verbose logs.
        """
        self._data_path = os.path.abspath(data_path)
        self._config_path = os.path.abspath(config_path)
        self._logger = Logger(__name__, directory, verbose)
        self._verbose = verbose

        os.makedirs(os.path.join(self._data_path, 'rmlmapper'), exist_ok=True)

    @property
    def root_mount_directory(self) -> str:
        """Subdirectory in the root directory of the case for RMLMapper.

        Returns
        -------
        subdirectory : str
            Subdirectory of the root directory for RMLMapper.

        """
        return __name__.lower()

    @staticmethod
    def _redact_arguments(arguments: list) -> list:
        """Return a copy of *arguments* with the value after ``-p`` masked.

        ``execute_mapping`` passes the database password right after
        ``-p``; masking it keeps the password out of the debug log.
        """
        redacted = list(arguments)
        for index, argument in enumerate(redacted[:-1]):
            if argument == '-p':
                redacted[index + 1] = '****'
        return redacted

    @staticmethod
    def _kill_process_group(process: subprocess.Popen) -> None:
        """Send SIGKILL to every process in *process*'s process group.

        *process* must have been started with ``start_new_session=True`` so
        that its PID is also its process-group ID.
        """
        import signal

        try:
            os.killpg(process.pid, signal.SIGKILL)
        except ProcessLookupError:
            pass  # The group has already exited.

    @timeout(TIMEOUT)
    def _execute_with_timeout(self, arguments: list) -> bool:
        """Execute RMLMapper with a timeout of ``TIMEOUT`` seconds.

        Parameters
        ----------
        arguments : list
            Arguments to supply to RMLMapper. They are joined with spaces
            into a shell command line, so they must already be shell-quoted.

        Returns
        -------
        success : bool
            Whether the execution was successful or not.
        """
        # Set Java heap to 90% of available memory instead of the default
        # 1/4. The JVM documents -Xmx/-Xms values as multiples of 1024 bytes.
        max_heap = int(psutil.virtual_memory().total * 0.9) // 1024 * 1024

        # Execute command
        cmd = f'java -Xmx{max_heap} -Xms{max_heap} -jar rmlmapper.jar'
        if self._verbose:
            cmd += ' -vvvvvvvvvvvvv'
        cmd += f' {" ".join(arguments)}'

        self._logger.debug(f'Executing RMLMapper with arguments '
                           f'{" ".join(self._redact_arguments(arguments))}')

        # Run the shell in its own session (= process group). If the timeout
        # decorator interrupts us, the whole tree, JVM included, is killed;
        # killing only the /bin/sh wrapper could leave the JVM running.
        with subprocess.Popen(cmd, shell=True, text=True, errors='replace',
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
                              start_new_session=True) as process:
            try:
                output, _ = process.communicate()
            except BaseException:
                self._kill_process_group(process)
                raise

        # Mirror subprocess.getstatusoutput(): drop one trailing newline.
        if output.endswith('\n'):
            output = output[:-1]
        self._logger.debug(output)
        return process.returncode == 0

    def execute(self, arguments: list) -> bool:
        """Execute RMLMapper with given arguments.

        Parameters
        ----------
        arguments : list
            Arguments to supply to RMLMapper.

        Returns
        -------
        success : bool
            Whether the execution succeeded or not. ``False`` when the
            timeout was reached.
        """
        try:
            return self._execute_with_timeout(arguments)
        except TimeoutError:
            msg = f'Timeout ({TIMEOUT}s) reached for RMLMapper'
            self._logger.warning(msg)

        return False

    def execute_mapping(self,
                        mapping_file: str,
                        output_file: str,
                        serialization: str,
                        rdb_username: Optional[str] = None,
                        rdb_password: Optional[str] = None,
                        rdb_host: Optional[str] = None,
                        rdb_port: Optional[int] = None,
                        rdb_name: Optional[str] = None,
                        rdb_type: Optional[str] = None) -> bool:
        """Execute a mapping file with RMLMapper.

        N-Quads and N-Triples are currently supported as serialization
        formats for RMLMapper.

        Database access is only configured when *all* ``rdb_*`` parameters
        are provided; a partial configuration is ignored with a warning.

        Parameters
        ----------
        mapping_file : str
            Path to the mapping file to execute.
        output_file : str
            Name of the output file to store the triples in.
        serialization : str
            Serialization format to use.
        rdb_username : Optional[str]
            Username for the database, required when a database is used as
            source.
        rdb_password : Optional[str]
            Password for the database, required when a database is used as
            source.
        rdb_host : Optional[str]
            Hostname for the database, required when a database is used as
            source.
        rdb_port : Optional[int]
            Port for the database, required when a database is used as source.
        rdb_name : Optional[str]
            Database name for the database, required when a database is used as
            source.
        rdb_type : Optional[str]
            Database type ('MySQL' or 'PostgreSQL'), required when a database
            is used as source.

        Returns
        -------
        success : bool
            Whether the execution was successful or not.

        Raises
        ------
        ValueError
            If a database is configured with an unsupported ``rdb_type``.
        """
        import shlex

        # _execute_with_timeout joins these arguments into a shell command
        # line, so every value is shell-quoted to survive spaces and shell
        # metacharacters (e.g. the '&' in the MySQL DSN parameters).
        arguments = [
            '-m', shlex.quote(os.path.join('/data/shared/', mapping_file)),
            '-s', shlex.quote(serialization),
            '-o', shlex.quote(os.path.join('/data/shared/', output_file)),
            '-d',  # Enable duplicate removal
        ]

        if rdb_username is not None and rdb_password is not None \
                and rdb_host is not None and rdb_port is not None \
                and rdb_name is not None and rdb_type is not None:
            parameters = ''
            if rdb_type == 'MySQL':
                protocol = 'jdbc:mysql'
                parameters = '?allowPublicKeyRetrieval=true&useSSL=false'
            elif rdb_type == 'PostgreSQL':
                protocol = 'jdbc:postgresql'
            else:
                raise ValueError(f'Unknown RDB type: "{rdb_type}"')
            rdb_dsn = (f'{protocol}://{rdb_host}:{rdb_port}/'
                       f'{rdb_name}{parameters}')
            arguments += ['-u', shlex.quote(rdb_username),
                          '-p', shlex.quote(rdb_password),
                          '-dsn', shlex.quote(rdb_dsn)]
        elif any(value is not None for value in (rdb_username, rdb_password,
                                                  rdb_host, rdb_port,
                                                  rdb_name, rdb_type)):
            # Without all six settings no database can be configured; make
            # that visible instead of silently dropping the given ones.
            self._logger.warning('Incomplete database configuration for '
                                 'RMLMapper: ignoring all rdb_* parameters')

        return self.execute(arguments)
Runs the RMLMapper jar locally (via `java -jar rmlmapper.jar`) to execute R2RML and RML mappings.
RMLMapperJar(data_path: str, config_path: str, directory: str, verbose: bool)
def __init__(self, data_path: str, config_path: str, directory: str,
             verbose: bool):
    """Set up paths, logging and the output directory for RMLMapperJar.

    Parameters
    ----------
    data_path : str
        Path to the data directory of the case; stored as an absolute path.
    config_path : str
        Path to the config directory of the case; stored as an absolute path.
    directory : str
        Path to the directory to store logs.
    verbose : bool
        Enable verbose logs.
    """
    absolute_data = os.path.abspath(data_path)
    self._data_path = absolute_data
    absolute_config = os.path.abspath(config_path)
    self._config_path = absolute_config
    self._logger = Logger(__name__, directory, verbose)
    self._verbose = verbose

    # Ensure a 'rmlmapper' subdirectory exists inside the data directory.
    rmlmapper_dir = os.path.join(absolute_data, 'rmlmapper')
    os.makedirs(rmlmapper_dir, exist_ok=True)
Creates an instance of the RMLMapperJar class.
Parameters
- data_path (str): Path to the data directory of the case.
- config_path (str): Path to the config directory of the case.
- directory (str): Path to the directory to store logs.
- verbose (bool): Enable verbose logs.
root_mount_directory: str
@property
def root_mount_directory(self) -> str:
    """Name of the case's root subdirectory used by RMLMapper.

    Returns
    -------
    subdirectory : str
        The lower-cased name of the module that defines this class.
    """
    module_name: str = __name__
    return module_name.lower()
Subdirectory in the root directory of the case for RMLMapper.
Returns
- subdirectory (str): Subdirectory of the root directory for RMLMapper.
def
execute(self, arguments: list) -> bool:
def execute(self, arguments: list) -> bool:
    """Run RMLMapper with the given arguments, bounded by ``TIMEOUT``.

    Parameters
    ----------
    arguments : list
        Arguments to supply to RMLMapper.

    Returns
    -------
    success : bool
        The result of ``_execute_with_timeout``, or ``False`` when the
        timeout was reached (a warning is logged in that case).
    """
    succeeded = False
    try:
        succeeded = self._execute_with_timeout(arguments)
    except TimeoutError:
        self._logger.warning(f'Timeout ({TIMEOUT}s) reached for RMLMapper')
    return succeeded
Execute RMLMapper with given arguments.
Parameters
- arguments (list): Arguments to supply to RMLMapper.
Returns
- success (bool): Whether the execution succeeded or not.
def
execute_mapping( self, mapping_file: str, output_file: str, serialization: str, rdb_username: Optional[str] = None, rdb_password: Optional[str] = None, rdb_host: Optional[str] = None, rdb_port: Optional[int] = None, rdb_name: Optional[str] = None, rdb_type: Optional[str] = None) -> bool:
def execute_mapping(self,
                    mapping_file: str,
                    output_file: str,
                    serialization: str,
                    rdb_username: Optional[str] = None,
                    rdb_password: Optional[str] = None,
                    rdb_host: Optional[str] = None,
                    rdb_port: Optional[int] = None,
                    rdb_name: Optional[str] = None,
                    rdb_type: Optional[str] = None) -> bool:
    """Execute a mapping file with RMLMapper.

    N-Quads and N-Triples are currently supported as serialization
    formats for RMLMapper.

    Database access is only configured when *all* ``rdb_*`` parameters
    are provided; a partial configuration is ignored with a warning.

    Parameters
    ----------
    mapping_file : str
        Path to the mapping file to execute.
    output_file : str
        Name of the output file to store the triples in.
    serialization : str
        Serialization format to use.
    rdb_username : Optional[str]
        Username for the database, required when a database is used as
        source.
    rdb_password : Optional[str]
        Password for the database, required when a database is used as
        source.
    rdb_host : Optional[str]
        Hostname for the database, required when a database is used as
        source.
    rdb_port : Optional[int]
        Port for the database, required when a database is used as source.
    rdb_name : Optional[str]
        Database name for the database, required when a database is used as
        source.
    rdb_type : Optional[str]
        Database type ('MySQL' or 'PostgreSQL'), required when a database
        is used as source.

    Returns
    -------
    success : bool
        Whether the execution was successful or not.

    Raises
    ------
    ValueError
        If a database is configured with an unsupported ``rdb_type``.
    """
    import shlex

    # _execute_with_timeout joins these arguments into a shell command
    # line, so every value is shell-quoted to survive spaces and shell
    # metacharacters (e.g. the '&' in the MySQL DSN parameters).
    arguments = [
        '-m', shlex.quote(os.path.join('/data/shared/', mapping_file)),
        '-s', shlex.quote(serialization),
        '-o', shlex.quote(os.path.join('/data/shared/', output_file)),
        '-d',  # Enable duplicate removal
    ]

    if rdb_username is not None and rdb_password is not None \
            and rdb_host is not None and rdb_port is not None \
            and rdb_name is not None and rdb_type is not None:
        parameters = ''
        if rdb_type == 'MySQL':
            protocol = 'jdbc:mysql'
            parameters = '?allowPublicKeyRetrieval=true&useSSL=false'
        elif rdb_type == 'PostgreSQL':
            protocol = 'jdbc:postgresql'
        else:
            raise ValueError(f'Unknown RDB type: "{rdb_type}"')
        rdb_dsn = (f'{protocol}://{rdb_host}:{rdb_port}/'
                   f'{rdb_name}{parameters}')
        arguments += ['-u', shlex.quote(rdb_username),
                      '-p', shlex.quote(rdb_password),
                      '-dsn', shlex.quote(rdb_dsn)]
    elif any(value is not None for value in (rdb_username, rdb_password,
                                              rdb_host, rdb_port,
                                              rdb_name, rdb_type)):
        # Without all six settings no database can be configured; make
        # that visible instead of silently dropping the given ones.
        self._logger.warning('Incomplete database configuration for '
                             'RMLMapper: ignoring all rdb_* parameters')

    return self.execute(arguments)
Execute a mapping file with RMLMapper.
N-Quads and N-Triples are currently supported as serialization formats for RMLMapper.
Parameters
- mapping_file (str): Path to the mapping file to execute.
- output_file (str): Name of the output file to store the triples in.
- serialization (str): Serialization format to use.
- rdb_username (Optional[str]): Username for the database, required when a database is used as source.
- rdb_password (Optional[str]): Password for the database, required when a database is used as source.
- rdb_host (Optional[str]): Hostname for the database, required when a database is used as source.
- rdb_port (Optional[int]): Port for the database, required when a database is used as source.
- rdb_name (Optional[str]): Database name for the database, required when a database is used as source.
- rdb_type (Optional[str]): Database type, required when a database is used as source.
Returns
- success (bool): Whether the execution was successful or not.