bench_executor.rmlstreamer
The RMLStreamer executes RML rules to generate high quality Linked Data from multiple originally (semi-)structured data sources in a streaming way.
Website: https://rml.io
Repository: https://github.com/RMLio/RMLStreamer
#!/usr/bin/env python3

"""
The RMLStreamer executes RML rules to generate high quality Linked Data
from multiple originally (semi-)structured data sources in a streaming way.

**Website**: https://rml.io<br>
**Repository**: https://github.com/RMLio/RMLStreamer
"""

import os
import errno
import shutil
import psutil
from glob import glob
from typing import Optional
from timeout_decorator import timeout, TimeoutError  # type: ignore
from rdflib import Graph, BNode, Namespace, Literal, RDF
from bench_executor.container import Container
from bench_executor.logger import Logger
R2RML = Namespace('http://www.w3.org/ns/r2rml#')
RML = Namespace('http://semweb.mmlab.be/ns/rml#')
D2RQ = Namespace('http://www.wiwiss.fu-berlin.de/suhl/bizer/D2RQ/0.1#')

VERSION = '2.5.0'  # standalone mode with RDB support
TIMEOUT = 6 * 3600  # 6 hours
IMAGE = f'kgconstruct/rmlstreamer:v{VERSION}'


class RMLStreamer(Container):
    """RMLStreamer container for executing RML mappings."""

    def __init__(self, data_path: str, config_path: str, directory: str,
                 verbose: bool):
        """Creates an instance of the RMLStreamer class.

        Parameters
        ----------
        data_path : str
            Path to the data directory of the case.
        config_path : str
            Path to the config directory of the case.
        directory : str
            Path to the directory to store logs.
        verbose : bool
            Enable verbose logs.
        """
        self._data_path = os.path.abspath(data_path)
        self._config_path = os.path.abspath(config_path)
        self._logger = Logger(__name__, directory, verbose)
        self._verbose = verbose
        super().__init__(IMAGE, 'RMLStreamer', self._logger,
                         volumes=[f'{self._data_path}/rmlstreamer:/data',
                                  f'{self._data_path}/shared:/data/shared'])

    @property
    def root_mount_directory(self) -> str:
        """Subdirectory in the root directory of the case for RMLStreamer.

        Returns
        -------
        subdirectory : str
            Subdirectory of the root directory for RMLStreamer.

        """
        return __name__.lower()

    @timeout(TIMEOUT)
    def _execute_with_timeout(self, arguments: list) -> bool:
        """Execute a mapping with a provided timeout.

        Parameters
        ----------
        arguments : list
            Arguments to supply to RMLStreamer.

        Returns
        -------
        success : bool
            Whether the execution was successful or not.
        """
        # Set Java heap to 50% of the total system memory instead of the
        # default 1/4
        max_heap = int(psutil.virtual_memory().total * 0.5)

        # Execute command
        cmd = f'java -Xmx{max_heap} -Xms{max_heap}' + \
              ' -jar /rmlstreamer/rmlstreamer.jar'
        cmd += f' {" ".join(arguments)}'

        self._logger.debug(f'Executing RMLStreamer with arguments '
                           f'{" ".join(arguments)}')

        return self.run_and_wait_for_exit(cmd)

    def execute(self, arguments: list) -> bool:
        """Execute RMLStreamer with given arguments.

        Parameters
        ----------
        arguments : list
            Arguments to supply to RMLStreamer.

        Returns
        -------
        success : bool
            Whether the execution succeeded or not. A timeout is logged as
            a warning and reported as a failure.
        """
        try:
            return self._execute_with_timeout(arguments)
        except TimeoutError:
            msg = f'Timeout ({TIMEOUT}s) reached for RMLStreamer'
            self._logger.warning(msg)

        return False

    def execute_mapping(self,
                        mapping_file: str,
                        output_file: str,
                        serialization: str,
                        rdb_username: Optional[str] = None,
                        rdb_password: Optional[str] = None,
                        rdb_host: Optional[str] = None,
                        rdb_port: Optional[int] = None,
                        rdb_name: Optional[str] = None,
                        rdb_type: Optional[str] = None) -> bool:
        """Execute a mapping file with RMLStreamer.

        N-Quads/N-Triples is currently the only supported serialization
        format for RMLStreamer.

        When all ``rdb_*`` parameters are supplied, the mapping is treated
        as an R2RML mapping and converted to RML before execution.

        Parameters
        ----------
        mapping_file : str
            Path to the mapping file to execute.
        output_file : str
            Name of the output file to store the triples in.
        serialization : str
            Serialization format to use. Not used by this method.
        rdb_username : Optional[str]
            Username for the database, required when a database is used as
            source.
        rdb_password : Optional[str]
            Password for the database, required when a database is used as
            source.
        rdb_host : Optional[str]
            Hostname for the database, required when a database is used as
            source.
        rdb_port : Optional[int]
            Port for the database, required when a database is used as source.
        rdb_name : Optional[str]
            Database name for the database, required when a database is used as
            source.
        rdb_type : Optional[str]
            Database type, required when a database is used as source.

        Returns
        -------
        success : bool
            Whether the execution was successful or not.

        Raises
        ------
        NotImplementedError
            If ``rdb_type`` is neither ``MySQL`` nor ``PostgreSQL``.
        """
        arguments = ['toFile', ' ',
                     '-o', '/data/output']
        mapping_file = os.path.join('/data/shared/', mapping_file)

        rdb_settings = (rdb_username, rdb_password, rdb_host, rdb_port,
                        rdb_name, rdb_type)
        if all(setting is not None for setting in rdb_settings):
            if rdb_type == 'MySQL':
                driver = 'jdbc:mysql'
            elif rdb_type == 'PostgreSQL':
                driver = 'jdbc:postgresql'
            else:
                raise NotImplementedError('RMLStreamer does not support RDB '
                                          f'"{rdb_type}"')
            dsn = f'{driver}://{rdb_host}:{rdb_port}/{rdb_name}'

            # Compatibility with R2RML mapping files
            # Replace rr:logicalTable with rml:logicalSource + D2RQ description
            # and rr:column with rml:reference
            g = Graph()
            g.bind('rr', R2RML)
            g.bind('rml', RML)
            g.bind('d2rq', D2RQ)
            g.bind('rdf', RDF)
            g.parse(os.path.join(self._data_path, 'shared',
                                 os.path.basename(mapping_file)))

            # rr:logicalTable --> rml:logicalSource
            # Snapshot the subjects first: the loop body mutates the graph.
            triples_maps = list(g.subjects(RDF.type, R2RML.TriplesMap))
            for triples_map_iri in triples_maps:
                logical_table_iri = g.value(triples_map_iri,
                                            R2RML.logicalTable)
                # Skip only this triples map; the others must still be
                # converted.
                if logical_table_iri is None:
                    continue

                table_name_literal = g.value(logical_table_iri,
                                             R2RML.tableName)
                if table_name_literal is None:
                    continue

                logical_source_iri = BNode()
                d2rq_rdb_iri = BNode()
                g.add((d2rq_rdb_iri, D2RQ.jdbcDSN, Literal(dsn)))
                g.add((d2rq_rdb_iri, D2RQ.jdbcDriver, Literal(driver)))
                g.add((d2rq_rdb_iri, D2RQ.username, Literal(rdb_username)))
                g.add((d2rq_rdb_iri, D2RQ.password, Literal(rdb_password)))
                g.add((d2rq_rdb_iri, RDF.type, D2RQ.Database))
                g.add((logical_source_iri, R2RML.sqlVersion, R2RML.SQL2008))
                g.add((logical_source_iri, R2RML.tableName,
                       table_name_literal))
                g.add((logical_source_iri, RML.source, d2rq_rdb_iri))
                g.add((logical_source_iri, RDF.type, RML.LogicalSource))
                g.add((triples_map_iri, RML.logicalSource, logical_source_iri))
                g.remove((triples_map_iri, R2RML.logicalTable,
                          logical_table_iri))
                g.remove((logical_table_iri, R2RML.tableName,
                          table_name_literal))
                g.remove((logical_table_iri, RDF.type, R2RML.LogicalTable))
                g.remove((logical_table_iri, R2RML.sqlVersion, R2RML.SQL2008))

            # rr:column --> rml:reference
            # Materialize the matches before rewriting them in place.
            for s, p, o in list(g.triples((None, R2RML.column, None))):
                g.add((s, RML.reference, o))
                g.remove((s, p, o))

            mapping_file = os.path.join('/', 'data',
                                        'mapping_converted.rml.ttl')
            destination = os.path.join(self._data_path, 'rmlstreamer',
                                       'mapping_converted.rml.ttl')
            g.serialize(destination=destination, format='turtle')

        arguments.append('-m')
        arguments.append(mapping_file)

        output_dir = os.path.join(self._data_path, 'rmlstreamer', 'output')
        os.makedirs(output_dir, exist_ok=True)
        success = self.execute(arguments)

        # Combine all output into a single file.
        # Duplicates may exist because RMLStreamer does not support duplicate
        # removal
        output_path = os.path.join(self._data_path, 'shared', output_file)
        try:
            os.remove(output_path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

        # Hidden part files first, then the visible ones; sorted so the
        # merged file is reproducible across runs.
        files = sorted(glob(os.path.join(output_dir, '.*')))
        files += sorted(glob(os.path.join(output_dir, '*')))
        # Stream the bytes instead of loading each part file into memory.
        with open(output_path, 'ab') as out_file:
            for gen_file in files:
                with open(gen_file, 'rb') as f:
                    shutil.copyfileobj(f, out_file)

        shutil.rmtree(output_dir, ignore_errors=True)

        return success
R2RML =
Namespace('http://www.w3.org/ns/r2rml#')
RML =
Namespace('http://semweb.mmlab.be/ns/rml#')
D2RQ =
Namespace('http://www.wiwiss.fu-berlin.de/suhl/bizer/D2RQ/0.1#')
VERSION =
'2.5.0'
TIMEOUT =
21600
IMAGE =
'kgconstruct/rmlstreamer:v2.5.0'
class RMLStreamer(Container):
    """RMLStreamer container for executing RML mappings."""

    def __init__(self, data_path: str, config_path: str, directory: str,
                 verbose: bool):
        """Creates an instance of the RMLStreamer class.

        Parameters
        ----------
        data_path : str
            Path to the data directory of the case.
        config_path : str
            Path to the config directory of the case.
        directory : str
            Path to the directory to store logs.
        verbose : bool
            Enable verbose logs.
        """
        self._data_path = os.path.abspath(data_path)
        self._config_path = os.path.abspath(config_path)
        self._logger = Logger(__name__, directory, verbose)
        self._verbose = verbose
        super().__init__(IMAGE, 'RMLStreamer', self._logger,
                         volumes=[f'{self._data_path}/rmlstreamer:/data',
                                  f'{self._data_path}/shared:/data/shared'])

    @property
    def root_mount_directory(self) -> str:
        """Subdirectory in the root directory of the case for RMLStreamer.

        Returns
        -------
        subdirectory : str
            Subdirectory of the root directory for RMLStreamer.

        """
        return __name__.lower()

    @timeout(TIMEOUT)
    def _execute_with_timeout(self, arguments: list) -> bool:
        """Execute a mapping with a provided timeout.

        Parameters
        ----------
        arguments : list
            Arguments to supply to RMLStreamer.

        Returns
        -------
        success : bool
            Whether the execution was successful or not.
        """
        # Set Java heap to 50% of the total system memory instead of the
        # default 1/4
        max_heap = int(psutil.virtual_memory().total * 0.5)

        # Execute command
        cmd = f'java -Xmx{max_heap} -Xms{max_heap}' + \
              ' -jar /rmlstreamer/rmlstreamer.jar'
        cmd += f' {" ".join(arguments)}'

        self._logger.debug(f'Executing RMLStreamer with arguments '
                           f'{" ".join(arguments)}')

        return self.run_and_wait_for_exit(cmd)

    def execute(self, arguments: list) -> bool:
        """Execute RMLStreamer with given arguments.

        Parameters
        ----------
        arguments : list
            Arguments to supply to RMLStreamer.

        Returns
        -------
        success : bool
            Whether the execution succeeded or not. A timeout is logged as
            a warning and reported as a failure.
        """
        try:
            return self._execute_with_timeout(arguments)
        except TimeoutError:
            msg = f'Timeout ({TIMEOUT}s) reached for RMLStreamer'
            self._logger.warning(msg)

        return False

    def execute_mapping(self,
                        mapping_file: str,
                        output_file: str,
                        serialization: str,
                        rdb_username: Optional[str] = None,
                        rdb_password: Optional[str] = None,
                        rdb_host: Optional[str] = None,
                        rdb_port: Optional[int] = None,
                        rdb_name: Optional[str] = None,
                        rdb_type: Optional[str] = None) -> bool:
        """Execute a mapping file with RMLStreamer.

        N-Quads/N-Triples is currently the only supported serialization
        format for RMLStreamer.

        When all ``rdb_*`` parameters are supplied, the mapping is treated
        as an R2RML mapping and converted to RML before execution.

        Parameters
        ----------
        mapping_file : str
            Path to the mapping file to execute.
        output_file : str
            Name of the output file to store the triples in.
        serialization : str
            Serialization format to use. Not used by this method.
        rdb_username : Optional[str]
            Username for the database, required when a database is used as
            source.
        rdb_password : Optional[str]
            Password for the database, required when a database is used as
            source.
        rdb_host : Optional[str]
            Hostname for the database, required when a database is used as
            source.
        rdb_port : Optional[int]
            Port for the database, required when a database is used as source.
        rdb_name : Optional[str]
            Database name for the database, required when a database is used as
            source.
        rdb_type : Optional[str]
            Database type, required when a database is used as source.

        Returns
        -------
        success : bool
            Whether the execution was successful or not.

        Raises
        ------
        NotImplementedError
            If ``rdb_type`` is neither ``MySQL`` nor ``PostgreSQL``.
        """
        arguments = ['toFile', ' ',
                     '-o', '/data/output']
        mapping_file = os.path.join('/data/shared/', mapping_file)

        rdb_settings = (rdb_username, rdb_password, rdb_host, rdb_port,
                        rdb_name, rdb_type)
        if all(setting is not None for setting in rdb_settings):
            if rdb_type == 'MySQL':
                driver = 'jdbc:mysql'
            elif rdb_type == 'PostgreSQL':
                driver = 'jdbc:postgresql'
            else:
                raise NotImplementedError('RMLStreamer does not support RDB '
                                          f'"{rdb_type}"')
            dsn = f'{driver}://{rdb_host}:{rdb_port}/{rdb_name}'

            # Compatibility with R2RML mapping files
            # Replace rr:logicalTable with rml:logicalSource + D2RQ description
            # and rr:column with rml:reference
            g = Graph()
            g.bind('rr', R2RML)
            g.bind('rml', RML)
            g.bind('d2rq', D2RQ)
            g.bind('rdf', RDF)
            g.parse(os.path.join(self._data_path, 'shared',
                                 os.path.basename(mapping_file)))

            # rr:logicalTable --> rml:logicalSource
            # Snapshot the subjects first: the loop body mutates the graph.
            triples_maps = list(g.subjects(RDF.type, R2RML.TriplesMap))
            for triples_map_iri in triples_maps:
                logical_table_iri = g.value(triples_map_iri,
                                            R2RML.logicalTable)
                # Skip only this triples map; the others must still be
                # converted.
                if logical_table_iri is None:
                    continue

                table_name_literal = g.value(logical_table_iri,
                                             R2RML.tableName)
                if table_name_literal is None:
                    continue

                logical_source_iri = BNode()
                d2rq_rdb_iri = BNode()
                g.add((d2rq_rdb_iri, D2RQ.jdbcDSN, Literal(dsn)))
                g.add((d2rq_rdb_iri, D2RQ.jdbcDriver, Literal(driver)))
                g.add((d2rq_rdb_iri, D2RQ.username, Literal(rdb_username)))
                g.add((d2rq_rdb_iri, D2RQ.password, Literal(rdb_password)))
                g.add((d2rq_rdb_iri, RDF.type, D2RQ.Database))
                g.add((logical_source_iri, R2RML.sqlVersion, R2RML.SQL2008))
                g.add((logical_source_iri, R2RML.tableName,
                       table_name_literal))
                g.add((logical_source_iri, RML.source, d2rq_rdb_iri))
                g.add((logical_source_iri, RDF.type, RML.LogicalSource))
                g.add((triples_map_iri, RML.logicalSource, logical_source_iri))
                g.remove((triples_map_iri, R2RML.logicalTable,
                          logical_table_iri))
                g.remove((logical_table_iri, R2RML.tableName,
                          table_name_literal))
                g.remove((logical_table_iri, RDF.type, R2RML.LogicalTable))
                g.remove((logical_table_iri, R2RML.sqlVersion, R2RML.SQL2008))

            # rr:column --> rml:reference
            # Materialize the matches before rewriting them in place.
            for s, p, o in list(g.triples((None, R2RML.column, None))):
                g.add((s, RML.reference, o))
                g.remove((s, p, o))

            mapping_file = os.path.join('/', 'data',
                                        'mapping_converted.rml.ttl')
            destination = os.path.join(self._data_path, 'rmlstreamer',
                                       'mapping_converted.rml.ttl')
            g.serialize(destination=destination, format='turtle')

        arguments.append('-m')
        arguments.append(mapping_file)

        output_dir = os.path.join(self._data_path, 'rmlstreamer', 'output')
        os.makedirs(output_dir, exist_ok=True)
        success = self.execute(arguments)

        # Combine all output into a single file.
        # Duplicates may exist because RMLStreamer does not support duplicate
        # removal
        output_path = os.path.join(self._data_path, 'shared', output_file)
        try:
            os.remove(output_path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

        # Hidden part files first, then the visible ones; sorted so the
        # merged file is reproducible across runs.
        files = sorted(glob(os.path.join(output_dir, '.*')))
        files += sorted(glob(os.path.join(output_dir, '*')))
        # Stream the bytes instead of loading each part file into memory.
        with open(output_path, 'ab') as out_file:
            for gen_file in files:
                with open(gen_file, 'rb') as f:
                    shutil.copyfileobj(f, out_file)

        shutil.rmtree(output_dir, ignore_errors=True)

        return success
RMLStreamer container for executing RML mappings.
RMLStreamer(data_path: str, config_path: str, directory: str, verbose: bool)
34 def __init__(self, data_path: str, config_path: str, directory: str, 35 verbose: bool): 36 """Creates an instance of the RMLStreamer class. 37 38 Parameters 39 ---------- 40 data_path : str 41 Path to the data directory of the case. 42 config_path : str 43 Path to the config directory of the case. 44 directory : str 45 Path to the directory to store logs. 46 verbose : bool 47 Enable verbose logs. 48 """ 49 self._data_path = os.path.abspath(data_path) 50 self._config_path = os.path.abspath(config_path) 51 self._logger = Logger(__name__, directory, verbose) 52 self._verbose = verbose 53 super().__init__(IMAGE, 'RMLStreamer', self._logger, 54 volumes=[f'{self._data_path}/rmlstreamer:/data', 55 f'{self._data_path}/shared:/data/shared'])
Creates an instance of the RMLStreamer class.
Parameters
- data_path (str): Path to the data directory of the case.
- config_path (str): Path to the config directory of the case.
- directory (str): Path to the directory to store logs.
- verbose (bool): Enable verbose logs.
root_mount_directory: str
57 @property 58 def root_mount_directory(self) -> str: 59 """Subdirectory in the root directory of the case for RMLStreamer. 60 61 Returns 62 ------- 63 subdirectory : str 64 Subdirectory of the root directory for RMLStreamer. 65 66 """ 67 return __name__.lower()
Subdirectory in the root directory of the case for RMLStreamer.
Returns
- subdirectory (str): Subdirectory of the root directory for RMLStreamer.
def
execute(self, arguments: list) -> bool:
91 def execute(self, arguments: list) -> bool: 92 """Execute RMLStreamer with given arguments. 93 94 Parameters 95 ---------- 96 arguments : list 97 Arguments to supply to RMLStreamer. 98 99 Returns 100 ------- 101 success : bool 102 Whether the execution succeeded or not. 103 """ 104 try: 105 return self._execute_with_timeout(arguments) 106 except TimeoutError: 107 msg = f'Timeout ({TIMEOUT}s) reached for RMLStreamer' 108 self._logger.warning(msg) 109 110 return False
Execute RMLStreamer with given arguments.
Parameters
- arguments (list): Arguments to supply to RMLStreamer.
Returns
- success (bool): Whether the execution succeeded or not.
def
execute_mapping( self, mapping_file: str, output_file: str, serialization: str, rdb_username: Optional[str] = None, rdb_password: Optional[str] = None, rdb_host: Optional[str] = None, rdb_port: Optional[int] = None, rdb_name: Optional[str] = None, rdb_type: Optional[str] = None) -> bool:
112 def execute_mapping(self, 113 mapping_file: str, 114 output_file: str, 115 serialization: str, 116 rdb_username: Optional[str] = None, 117 rdb_password: Optional[str] = None, 118 rdb_host: Optional[str] = None, 119 rdb_port: Optional[int] = None, 120 rdb_name: Optional[str] = None, 121 rdb_type: Optional[str] = None) -> bool: 122 """Execute a mapping file with RMLStreamer. 123 124 N-Quads/N-Triples is the only currently supported as serialization 125 format for RMLStreamer. 126 127 Parameters 128 ---------- 129 mapping_file : str 130 Path to the mapping file to execute. 131 output_file : str 132 Name of the output file to store the triples in. 133 serialization : str 134 Serialization format to use. 135 rdb_username : Optional[str] 136 Username for the database, required when a database is used as 137 source. 138 rdb_password : Optional[str] 139 Password for the database, required when a database is used as 140 source. 141 rdb_host : Optional[str] 142 Hostname for the database, required when a database is used as 143 source. 144 rdb_port : Optional[int] 145 Port for the database, required when a database is used as source. 146 rdb_name : Optional[str] 147 Database name for the database, required when a database is used as 148 source. 149 rdb_type : Optional[str] 150 Database type, required when a database is used as source. 151 152 Returns 153 ------- 154 success : bool 155 Whether the execution was successfull or not. 
156 """ 157 arguments = ['toFile', ' ', 158 '-o', '/data/output'] 159 mapping_file = os.path.join('/data/shared/', mapping_file) 160 161 if rdb_username is not None and rdb_password is not None \ 162 and rdb_host is not None and rdb_port is not None \ 163 and rdb_name is not None and rdb_type is not None: 164 if rdb_type == 'MySQL': 165 driver = 'jdbc:mysql' 166 elif rdb_type == 'PostgreSQL': 167 driver = 'jdbc:postgresql' 168 else: 169 raise NotImplementedError('RMLStreamer does not support RDB ' 170 f'"{rdb_type}"') 171 dsn = f'{driver}://{rdb_host}:{rdb_port}/{rdb_name}' 172 173 # Compatibility with R2RML mapping files 174 # Replace rr:logicalTable with rml:logicalSource + D2RQ description 175 # and rr:column with rml:reference 176 g = Graph() 177 g.bind('rr', R2RML) 178 g.bind('rml', RML) 179 g.bind('d2rq', D2RQ) 180 g.bind('rdf', RDF) 181 g.parse(os.path.join(self._data_path, 'shared', 182 os.path.basename(mapping_file))) 183 184 # rr:logicalTable --> rml:logicalSource 185 for triples_map_iri, p, o in g.triples((None, RDF.type, 186 R2RML.TriplesMap)): 187 logical_source_iri = BNode() 188 d2rq_rdb_iri = BNode() 189 logical_table_iri = g.value(triples_map_iri, 190 R2RML.logicalTable) 191 if logical_table_iri is None: 192 break 193 194 table_name_literal = g.value(logical_table_iri, 195 R2RML.tableName) 196 if table_name_literal is None: 197 break 198 199 g.add((d2rq_rdb_iri, D2RQ.jdbcDSN, Literal(dsn))) 200 g.add((d2rq_rdb_iri, D2RQ.jdbcDriver, Literal(driver))) 201 g.add((d2rq_rdb_iri, D2RQ.username, Literal(rdb_username))) 202 g.add((d2rq_rdb_iri, D2RQ.password, Literal(rdb_password))) 203 g.add((d2rq_rdb_iri, RDF.type, D2RQ.Database)) 204 g.add((logical_source_iri, R2RML.sqlVersion, R2RML.SQL2008)) 205 g.add((logical_source_iri, R2RML.tableName, 206 table_name_literal)) 207 g.add((logical_source_iri, RML.source, d2rq_rdb_iri)) 208 g.add((logical_source_iri, RDF.type, RML.LogicalSource)) 209 g.add((triples_map_iri, RML.logicalSource, logical_source_iri)) 210 
g.remove((triples_map_iri, R2RML.logicalTable, 211 logical_table_iri)) 212 g.remove((logical_table_iri, R2RML.tableName, 213 table_name_literal)) 214 g.remove((logical_table_iri, RDF.type, R2RML.LogicalTable)) 215 g.remove((logical_table_iri, R2RML.sqlVersion, R2RML.SQL2008)) 216 217 # rr:column --> rml:reference 218 for s, p, o in g.triples((None, R2RML.column, None)): 219 g.add((s, RML.reference, o)) 220 g.remove((s, p, o)) 221 222 mapping_file = os.path.join('/', 'data', 223 'mapping_converted.rml.ttl') 224 destination = os.path.join(self._data_path, 'rmlstreamer', 225 'mapping_converted.rml.ttl') 226 g.serialize(destination=destination, format='turtle') 227 228 arguments.append('-m') 229 arguments.append(mapping_file) 230 231 os.makedirs(os.path.join(self._data_path, 'rmlstreamer', 'output'), 232 exist_ok=True) 233 status_code = self.execute(arguments) 234 235 # Combine all output into a single file. 236 # Duplicates may exist because RMLStreamer does not support duplicate 237 # removal 238 output_path = os.path.join(self._data_path, 'shared', output_file) 239 try: 240 os.remove(output_path) 241 except OSError as e: 242 if e.errno != errno.ENOENT: 243 raise 244 245 with open(output_path, 'a') as out_file: 246 files = list(glob(os.path.join(self._data_path, 'rmlstreamer', 247 'output', '.*'))) 248 files += list(glob(os.path.join(self._data_path, 'rmlstreamer', 249 'output', '*'))) 250 for gen_file in files: 251 with open(gen_file, 'r') as f: 252 out_file.write(f.read()) 253 254 shutil.rmtree(os.path.join(self._data_path, 'rmlstreamer', 'output'), 255 ignore_errors=True) 256 257 return status_code
Execute a mapping file with RMLStreamer.
N-Quads/N-Triples is currently the only supported serialization format for RMLStreamer.
Parameters
- mapping_file (str): Path to the mapping file to execute.
- output_file (str): Name of the output file to store the triples in.
- serialization (str): Serialization format to use.
- rdb_username (Optional[str]): Username for the database, required when a database is used as source.
- rdb_password (Optional[str]): Password for the database, required when a database is used as source.
- rdb_host (Optional[str]): Hostname for the database, required when a database is used as source.
- rdb_port (Optional[int]): Port for the database, required when a database is used as source.
- rdb_name (Optional[str]): Database name for the database, required when a database is used as source.
- rdb_type (Optional[str]): Database type, required when a database is used as source.
Returns
- success (bool): Whether the execution was successful or not.