bench_executor.rmlstreamer

The RMLStreamer executes RML rules to generate high quality Linked Data from multiple originally (semi-)structured data sources in a streaming way.

Website: https://rml.io
Repository: https://github.com/RMLio/RMLStreamer

  1#!/usr/bin/env python3
  2
  3"""
  4The RMLStreamer executes RML rules to generate high quality Linked Data
  5from multiple originally (semi-)structured data sources in a streaming way.
  6
  7**Website**: https://rml.io<br>
  8**Repository**: https://github.com/RMLio/RMLStreamer
  9"""
 10
 11import os
 12import errno
 13import shutil
 14import psutil
 15from glob import glob
 16from typing import Optional
 17from timeout_decorator import timeout, TimeoutError  # type: ignore
 18from rdflib import Graph, BNode, Namespace, Literal, RDF
 19from bench_executor.container import Container
 20from bench_executor.logger import Logger
# RDF vocabularies used when rewriting R2RML mappings into RML mappings.
R2RML = Namespace('http://www.w3.org/ns/r2rml#')
RML = Namespace('http://semweb.mmlab.be/ns/rml#')
D2RQ = Namespace('http://www.wiwiss.fu-berlin.de/suhl/bizer/D2RQ/0.1#')

# RMLStreamer release that is benchmarked; also selects the image tag below.
VERSION = '2.5.0'  # standalone mode with RDB support
# Upper bound, in seconds, for a single RMLStreamer execution.
TIMEOUT = 6 * 3600  # 6 hours
# Container image in which RMLStreamer is executed.
IMAGE = f'kgconstruct/rmlstreamer:v{VERSION}'
 28
 29
 30class RMLStreamer(Container):
 31    """RMLStreamer container for executing RML mappings."""
 32
 33    def __init__(self, data_path: str, config_path: str, directory: str,
 34                 verbose: bool):
 35        """Creates an instance of the RMLStreamer class.
 36
 37        Parameters
 38        ----------
 39        data_path : str
 40            Path to the data directory of the case.
 41        config_path : str
 42            Path to the config directory of the case.
 43        directory : str
 44            Path to the directory to store logs.
 45        verbose : bool
 46            Enable verbose logs.
 47        """
 48        self._data_path = os.path.abspath(data_path)
 49        self._config_path = os.path.abspath(config_path)
 50        self._logger = Logger(__name__, directory, verbose)
 51        self._verbose = verbose
 52        super().__init__(IMAGE, 'RMLStreamer', self._logger,
 53                         volumes=[f'{self._data_path}/rmlstreamer:/data',
 54                                  f'{self._data_path}/shared:/data/shared'])
 55
 56    @property
 57    def root_mount_directory(self) -> str:
 58        """Subdirectory in the root directory of the case for RMLStreamer.
 59
 60        Returns
 61        -------
 62        subdirectory : str
 63            Subdirectory of the root directory for RMLStreamer.
 64
 65        """
 66        return __name__.lower()
 67
 68    @timeout(TIMEOUT)
 69    def _execute_with_timeout(self, arguments: list) -> bool:
 70        """Execute a mapping with a provided timeout.
 71
 72        Returns
 73        -------
 74        success : bool
 75            Whether the execution was successfull or not.
 76        """
 77        # Set Java heap to 50% of available memory instead of the default 1/4
 78        max_heap = int(psutil.virtual_memory().total * 0.5)
 79
 80        # Execute command
 81        cmd = f'java -Xmx{max_heap} -Xms{max_heap}' + \
 82              ' -jar /rmlstreamer/rmlstreamer.jar'
 83        cmd += f' {" ".join(arguments)}'
 84
 85        self._logger.debug(f'Executing RMLStreamer with arguments '
 86                           f'{" ".join(arguments)}')
 87
 88        return self.run_and_wait_for_exit(cmd)
 89
 90    def execute(self, arguments: list) -> bool:
 91        """Execute RMLStreamer with given arguments.
 92
 93        Parameters
 94        ----------
 95        arguments : list
 96            Arguments to supply to RMLStreamer.
 97
 98        Returns
 99        -------
100        success : bool
101            Whether the execution succeeded or not.
102        """
103        try:
104            return self._execute_with_timeout(arguments)
105        except TimeoutError:
106            msg = f'Timeout ({TIMEOUT}s) reached for RMLStreamer'
107            self._logger.warning(msg)
108
109        return False
110
111    def execute_mapping(self,
112                        mapping_file: str,
113                        output_file: str,
114                        serialization: str,
115                        rdb_username: Optional[str] = None,
116                        rdb_password: Optional[str] = None,
117                        rdb_host: Optional[str] = None,
118                        rdb_port: Optional[int] = None,
119                        rdb_name: Optional[str] = None,
120                        rdb_type: Optional[str] = None) -> bool:
121        """Execute a mapping file with RMLStreamer.
122
123        N-Quads/N-Triples is the only currently supported as serialization
124        format for RMLStreamer.
125
126        Parameters
127        ----------
128        mapping_file : str
129            Path to the mapping file to execute.
130        output_file : str
131            Name of the output file to store the triples in.
132        serialization : str
133            Serialization format to use.
134        rdb_username : Optional[str]
135            Username for the database, required when a database is used as
136            source.
137        rdb_password : Optional[str]
138            Password for the database, required when a database is used as
139            source.
140        rdb_host : Optional[str]
141            Hostname for the database, required when a database is used as
142            source.
143        rdb_port : Optional[int]
144            Port for the database, required when a database is used as source.
145        rdb_name : Optional[str]
146            Database name for the database, required when a database is used as
147            source.
148        rdb_type : Optional[str]
149            Database type, required when a database is used as source.
150
151        Returns
152        -------
153        success : bool
154            Whether the execution was successfull or not.
155        """
156        arguments = ['toFile', ' ',
157                     '-o', '/data/output']
158        mapping_file = os.path.join('/data/shared/', mapping_file)
159
160        if rdb_username is not None and rdb_password is not None \
161                and rdb_host is not None and rdb_port is not None \
162                and rdb_name is not None and rdb_type is not None:
163            if rdb_type == 'MySQL':
164                driver = 'jdbc:mysql'
165            elif rdb_type == 'PostgreSQL':
166                driver = 'jdbc:postgresql'
167            else:
168                raise NotImplementedError('RMLStreamer does not support RDB '
169                                          f'"{rdb_type}"')
170            dsn = f'{driver}://{rdb_host}:{rdb_port}/{rdb_name}'
171
172            # Compatibility with R2RML mapping files
173            # Replace rr:logicalTable with rml:logicalSource + D2RQ description
174            # and rr:column with rml:reference
175            g = Graph()
176            g.bind('rr', R2RML)
177            g.bind('rml', RML)
178            g.bind('d2rq', D2RQ)
179            g.bind('rdf', RDF)
180            g.parse(os.path.join(self._data_path, 'shared',
181                                 os.path.basename(mapping_file)))
182
183            # rr:logicalTable --> rml:logicalSource
184            for triples_map_iri, p, o in g.triples((None, RDF.type,
185                                                    R2RML.TriplesMap)):
186                logical_source_iri = BNode()
187                d2rq_rdb_iri = BNode()
188                logical_table_iri = g.value(triples_map_iri,
189                                            R2RML.logicalTable)
190                if logical_table_iri is None:
191                    break
192
193                table_name_literal = g.value(logical_table_iri,
194                                             R2RML.tableName)
195                if table_name_literal is None:
196                    break
197
198                g.add((d2rq_rdb_iri, D2RQ.jdbcDSN, Literal(dsn)))
199                g.add((d2rq_rdb_iri, D2RQ.jdbcDriver, Literal(driver)))
200                g.add((d2rq_rdb_iri, D2RQ.username, Literal(rdb_username)))
201                g.add((d2rq_rdb_iri, D2RQ.password, Literal(rdb_password)))
202                g.add((d2rq_rdb_iri, RDF.type, D2RQ.Database))
203                g.add((logical_source_iri, R2RML.sqlVersion, R2RML.SQL2008))
204                g.add((logical_source_iri, R2RML.tableName,
205                       table_name_literal))
206                g.add((logical_source_iri, RML.source, d2rq_rdb_iri))
207                g.add((logical_source_iri, RDF.type, RML.LogicalSource))
208                g.add((triples_map_iri, RML.logicalSource, logical_source_iri))
209                g.remove((triples_map_iri, R2RML.logicalTable,
210                          logical_table_iri))
211                g.remove((logical_table_iri, R2RML.tableName,
212                          table_name_literal))
213                g.remove((logical_table_iri, RDF.type, R2RML.LogicalTable))
214                g.remove((logical_table_iri, R2RML.sqlVersion, R2RML.SQL2008))
215
216            # rr:column --> rml:reference
217            for s, p, o in g.triples((None, R2RML.column, None)):
218                g.add((s, RML.reference, o))
219                g.remove((s, p, o))
220
221            mapping_file = os.path.join('/', 'data',
222                                        'mapping_converted.rml.ttl')
223            destination = os.path.join(self._data_path, 'rmlstreamer',
224                                       'mapping_converted.rml.ttl')
225            g.serialize(destination=destination, format='turtle')
226
227        arguments.append('-m')
228        arguments.append(mapping_file)
229
230        os.makedirs(os.path.join(self._data_path, 'rmlstreamer', 'output'),
231                    exist_ok=True)
232        status_code = self.execute(arguments)
233
234        # Combine all output into a single file.
235        # Duplicates may exist because RMLStreamer does not support duplicate
236        # removal
237        output_path = os.path.join(self._data_path, 'shared', output_file)
238        try:
239            os.remove(output_path)
240        except OSError as e:
241            if e.errno != errno.ENOENT:
242                raise
243
244        with open(output_path, 'a') as out_file:
245            files = list(glob(os.path.join(self._data_path, 'rmlstreamer',
246                                           'output', '.*')))
247            files += list(glob(os.path.join(self._data_path, 'rmlstreamer',
248                                            'output', '*')))
249            for gen_file in files:
250                with open(gen_file, 'r') as f:
251                    out_file.write(f.read())
252
253        shutil.rmtree(os.path.join(self._data_path, 'rmlstreamer', 'output'),
254                      ignore_errors=True)
255
256        return status_code
R2RML = Namespace('http://www.w3.org/ns/r2rml#')
RML = Namespace('http://semweb.mmlab.be/ns/rml#')
D2RQ = Namespace('http://www.wiwiss.fu-berlin.de/suhl/bizer/D2RQ/0.1#')
VERSION = '2.5.0'
TIMEOUT = 21600
IMAGE = 'kgconstruct/rmlstreamer:v2.5.0'
class RMLStreamer(bench_executor.container.Container):
 31class RMLStreamer(Container):
 32    """RMLStreamer container for executing RML mappings."""
 33
 34    def __init__(self, data_path: str, config_path: str, directory: str,
 35                 verbose: bool):
 36        """Creates an instance of the RMLStreamer class.
 37
 38        Parameters
 39        ----------
 40        data_path : str
 41            Path to the data directory of the case.
 42        config_path : str
 43            Path to the config directory of the case.
 44        directory : str
 45            Path to the directory to store logs.
 46        verbose : bool
 47            Enable verbose logs.
 48        """
 49        self._data_path = os.path.abspath(data_path)
 50        self._config_path = os.path.abspath(config_path)
 51        self._logger = Logger(__name__, directory, verbose)
 52        self._verbose = verbose
 53        super().__init__(IMAGE, 'RMLStreamer', self._logger,
 54                         volumes=[f'{self._data_path}/rmlstreamer:/data',
 55                                  f'{self._data_path}/shared:/data/shared'])
 56
 57    @property
 58    def root_mount_directory(self) -> str:
 59        """Subdirectory in the root directory of the case for RMLStreamer.
 60
 61        Returns
 62        -------
 63        subdirectory : str
 64            Subdirectory of the root directory for RMLStreamer.
 65
 66        """
 67        return __name__.lower()
 68
 69    @timeout(TIMEOUT)
 70    def _execute_with_timeout(self, arguments: list) -> bool:
 71        """Execute a mapping with a provided timeout.
 72
 73        Returns
 74        -------
 75        success : bool
 76            Whether the execution was successfull or not.
 77        """
 78        # Set Java heap to 50% of available memory instead of the default 1/4
 79        max_heap = int(psutil.virtual_memory().total * 0.5)
 80
 81        # Execute command
 82        cmd = f'java -Xmx{max_heap} -Xms{max_heap}' + \
 83              ' -jar /rmlstreamer/rmlstreamer.jar'
 84        cmd += f' {" ".join(arguments)}'
 85
 86        self._logger.debug(f'Executing RMLStreamer with arguments '
 87                           f'{" ".join(arguments)}')
 88
 89        return self.run_and_wait_for_exit(cmd)
 90
 91    def execute(self, arguments: list) -> bool:
 92        """Execute RMLStreamer with given arguments.
 93
 94        Parameters
 95        ----------
 96        arguments : list
 97            Arguments to supply to RMLStreamer.
 98
 99        Returns
100        -------
101        success : bool
102            Whether the execution succeeded or not.
103        """
104        try:
105            return self._execute_with_timeout(arguments)
106        except TimeoutError:
107            msg = f'Timeout ({TIMEOUT}s) reached for RMLStreamer'
108            self._logger.warning(msg)
109
110        return False
111
112    def execute_mapping(self,
113                        mapping_file: str,
114                        output_file: str,
115                        serialization: str,
116                        rdb_username: Optional[str] = None,
117                        rdb_password: Optional[str] = None,
118                        rdb_host: Optional[str] = None,
119                        rdb_port: Optional[int] = None,
120                        rdb_name: Optional[str] = None,
121                        rdb_type: Optional[str] = None) -> bool:
122        """Execute a mapping file with RMLStreamer.
123
124        N-Quads/N-Triples is the only currently supported as serialization
125        format for RMLStreamer.
126
127        Parameters
128        ----------
129        mapping_file : str
130            Path to the mapping file to execute.
131        output_file : str
132            Name of the output file to store the triples in.
133        serialization : str
134            Serialization format to use.
135        rdb_username : Optional[str]
136            Username for the database, required when a database is used as
137            source.
138        rdb_password : Optional[str]
139            Password for the database, required when a database is used as
140            source.
141        rdb_host : Optional[str]
142            Hostname for the database, required when a database is used as
143            source.
144        rdb_port : Optional[int]
145            Port for the database, required when a database is used as source.
146        rdb_name : Optional[str]
147            Database name for the database, required when a database is used as
148            source.
149        rdb_type : Optional[str]
150            Database type, required when a database is used as source.
151
152        Returns
153        -------
154        success : bool
155            Whether the execution was successfull or not.
156        """
157        arguments = ['toFile', ' ',
158                     '-o', '/data/output']
159        mapping_file = os.path.join('/data/shared/', mapping_file)
160
161        if rdb_username is not None and rdb_password is not None \
162                and rdb_host is not None and rdb_port is not None \
163                and rdb_name is not None and rdb_type is not None:
164            if rdb_type == 'MySQL':
165                driver = 'jdbc:mysql'
166            elif rdb_type == 'PostgreSQL':
167                driver = 'jdbc:postgresql'
168            else:
169                raise NotImplementedError('RMLStreamer does not support RDB '
170                                          f'"{rdb_type}"')
171            dsn = f'{driver}://{rdb_host}:{rdb_port}/{rdb_name}'
172
173            # Compatibility with R2RML mapping files
174            # Replace rr:logicalTable with rml:logicalSource + D2RQ description
175            # and rr:column with rml:reference
176            g = Graph()
177            g.bind('rr', R2RML)
178            g.bind('rml', RML)
179            g.bind('d2rq', D2RQ)
180            g.bind('rdf', RDF)
181            g.parse(os.path.join(self._data_path, 'shared',
182                                 os.path.basename(mapping_file)))
183
184            # rr:logicalTable --> rml:logicalSource
185            for triples_map_iri, p, o in g.triples((None, RDF.type,
186                                                    R2RML.TriplesMap)):
187                logical_source_iri = BNode()
188                d2rq_rdb_iri = BNode()
189                logical_table_iri = g.value(triples_map_iri,
190                                            R2RML.logicalTable)
191                if logical_table_iri is None:
192                    break
193
194                table_name_literal = g.value(logical_table_iri,
195                                             R2RML.tableName)
196                if table_name_literal is None:
197                    break
198
199                g.add((d2rq_rdb_iri, D2RQ.jdbcDSN, Literal(dsn)))
200                g.add((d2rq_rdb_iri, D2RQ.jdbcDriver, Literal(driver)))
201                g.add((d2rq_rdb_iri, D2RQ.username, Literal(rdb_username)))
202                g.add((d2rq_rdb_iri, D2RQ.password, Literal(rdb_password)))
203                g.add((d2rq_rdb_iri, RDF.type, D2RQ.Database))
204                g.add((logical_source_iri, R2RML.sqlVersion, R2RML.SQL2008))
205                g.add((logical_source_iri, R2RML.tableName,
206                       table_name_literal))
207                g.add((logical_source_iri, RML.source, d2rq_rdb_iri))
208                g.add((logical_source_iri, RDF.type, RML.LogicalSource))
209                g.add((triples_map_iri, RML.logicalSource, logical_source_iri))
210                g.remove((triples_map_iri, R2RML.logicalTable,
211                          logical_table_iri))
212                g.remove((logical_table_iri, R2RML.tableName,
213                          table_name_literal))
214                g.remove((logical_table_iri, RDF.type, R2RML.LogicalTable))
215                g.remove((logical_table_iri, R2RML.sqlVersion, R2RML.SQL2008))
216
217            # rr:column --> rml:reference
218            for s, p, o in g.triples((None, R2RML.column, None)):
219                g.add((s, RML.reference, o))
220                g.remove((s, p, o))
221
222            mapping_file = os.path.join('/', 'data',
223                                        'mapping_converted.rml.ttl')
224            destination = os.path.join(self._data_path, 'rmlstreamer',
225                                       'mapping_converted.rml.ttl')
226            g.serialize(destination=destination, format='turtle')
227
228        arguments.append('-m')
229        arguments.append(mapping_file)
230
231        os.makedirs(os.path.join(self._data_path, 'rmlstreamer', 'output'),
232                    exist_ok=True)
233        status_code = self.execute(arguments)
234
235        # Combine all output into a single file.
236        # Duplicates may exist because RMLStreamer does not support duplicate
237        # removal
238        output_path = os.path.join(self._data_path, 'shared', output_file)
239        try:
240            os.remove(output_path)
241        except OSError as e:
242            if e.errno != errno.ENOENT:
243                raise
244
245        with open(output_path, 'a') as out_file:
246            files = list(glob(os.path.join(self._data_path, 'rmlstreamer',
247                                           'output', '.*')))
248            files += list(glob(os.path.join(self._data_path, 'rmlstreamer',
249                                            'output', '*')))
250            for gen_file in files:
251                with open(gen_file, 'r') as f:
252                    out_file.write(f.read())
253
254        shutil.rmtree(os.path.join(self._data_path, 'rmlstreamer', 'output'),
255                      ignore_errors=True)
256
257        return status_code

RMLStreamer container for executing RML mappings.

RMLStreamer(data_path: str, config_path: str, directory: str, verbose: bool)
34    def __init__(self, data_path: str, config_path: str, directory: str,
35                 verbose: bool):
36        """Creates an instance of the RMLStreamer class.
37
38        Parameters
39        ----------
40        data_path : str
41            Path to the data directory of the case.
42        config_path : str
43            Path to the config directory of the case.
44        directory : str
45            Path to the directory to store logs.
46        verbose : bool
47            Enable verbose logs.
48        """
49        self._data_path = os.path.abspath(data_path)
50        self._config_path = os.path.abspath(config_path)
51        self._logger = Logger(__name__, directory, verbose)
52        self._verbose = verbose
53        super().__init__(IMAGE, 'RMLStreamer', self._logger,
54                         volumes=[f'{self._data_path}/rmlstreamer:/data',
55                                  f'{self._data_path}/shared:/data/shared'])

Creates an instance of the RMLStreamer class.

Parameters
  • data_path (str): Path to the data directory of the case.
  • config_path (str): Path to the config directory of the case.
  • directory (str): Path to the directory to store logs.
  • verbose (bool): Enable verbose logs.
root_mount_directory: str
    @property
    def root_mount_directory(self) -> str:
        """Subdirectory in the root directory of the case for RMLStreamer.

        Returns
        -------
        subdirectory : str
            Lower-cased name of this module.

        """
        # NOTE(review): this is the full module name, not the 'rmlstreamer'
        # directory used for the volume mounts -- confirm this is intended.
        return __name__.lower()

Subdirectory in the root directory of the case for RMLStreamer.

Returns
  • subdirectory (str): Subdirectory of the root directory for RMLStreamer.
def execute(self, arguments: list) -> bool:
 91    def execute(self, arguments: list) -> bool:
 92        """Execute RMLStreamer with given arguments.
 93
 94        Parameters
 95        ----------
 96        arguments : list
 97            Arguments to supply to RMLStreamer.
 98
 99        Returns
100        -------
101        success : bool
102            Whether the execution succeeded or not.
103        """
104        try:
105            return self._execute_with_timeout(arguments)
106        except TimeoutError:
107            msg = f'Timeout ({TIMEOUT}s) reached for RMLStreamer'
108            self._logger.warning(msg)
109
110        return False

Execute RMLStreamer with given arguments.

Parameters
  • arguments (list): Arguments to supply to RMLStreamer.
Returns
  • success (bool): Whether the execution succeeded or not.
def execute_mapping( self, mapping_file: str, output_file: str, serialization: str, rdb_username: Optional[str] = None, rdb_password: Optional[str] = None, rdb_host: Optional[str] = None, rdb_port: Optional[int] = None, rdb_name: Optional[str] = None, rdb_type: Optional[str] = None) -> bool:
112    def execute_mapping(self,
113                        mapping_file: str,
114                        output_file: str,
115                        serialization: str,
116                        rdb_username: Optional[str] = None,
117                        rdb_password: Optional[str] = None,
118                        rdb_host: Optional[str] = None,
119                        rdb_port: Optional[int] = None,
120                        rdb_name: Optional[str] = None,
121                        rdb_type: Optional[str] = None) -> bool:
122        """Execute a mapping file with RMLStreamer.
123
124        N-Quads/N-Triples is the only currently supported as serialization
125        format for RMLStreamer.
126
127        Parameters
128        ----------
129        mapping_file : str
130            Path to the mapping file to execute.
131        output_file : str
132            Name of the output file to store the triples in.
133        serialization : str
134            Serialization format to use.
135        rdb_username : Optional[str]
136            Username for the database, required when a database is used as
137            source.
138        rdb_password : Optional[str]
139            Password for the database, required when a database is used as
140            source.
141        rdb_host : Optional[str]
142            Hostname for the database, required when a database is used as
143            source.
144        rdb_port : Optional[int]
145            Port for the database, required when a database is used as source.
146        rdb_name : Optional[str]
147            Database name for the database, required when a database is used as
148            source.
149        rdb_type : Optional[str]
150            Database type, required when a database is used as source.
151
152        Returns
153        -------
154        success : bool
155            Whether the execution was successfull or not.
156        """
157        arguments = ['toFile', ' ',
158                     '-o', '/data/output']
159        mapping_file = os.path.join('/data/shared/', mapping_file)
160
161        if rdb_username is not None and rdb_password is not None \
162                and rdb_host is not None and rdb_port is not None \
163                and rdb_name is not None and rdb_type is not None:
164            if rdb_type == 'MySQL':
165                driver = 'jdbc:mysql'
166            elif rdb_type == 'PostgreSQL':
167                driver = 'jdbc:postgresql'
168            else:
169                raise NotImplementedError('RMLStreamer does not support RDB '
170                                          f'"{rdb_type}"')
171            dsn = f'{driver}://{rdb_host}:{rdb_port}/{rdb_name}'
172
173            # Compatibility with R2RML mapping files
174            # Replace rr:logicalTable with rml:logicalSource + D2RQ description
175            # and rr:column with rml:reference
176            g = Graph()
177            g.bind('rr', R2RML)
178            g.bind('rml', RML)
179            g.bind('d2rq', D2RQ)
180            g.bind('rdf', RDF)
181            g.parse(os.path.join(self._data_path, 'shared',
182                                 os.path.basename(mapping_file)))
183
184            # rr:logicalTable --> rml:logicalSource
185            for triples_map_iri, p, o in g.triples((None, RDF.type,
186                                                    R2RML.TriplesMap)):
187                logical_source_iri = BNode()
188                d2rq_rdb_iri = BNode()
189                logical_table_iri = g.value(triples_map_iri,
190                                            R2RML.logicalTable)
191                if logical_table_iri is None:
192                    break
193
194                table_name_literal = g.value(logical_table_iri,
195                                             R2RML.tableName)
196                if table_name_literal is None:
197                    break
198
199                g.add((d2rq_rdb_iri, D2RQ.jdbcDSN, Literal(dsn)))
200                g.add((d2rq_rdb_iri, D2RQ.jdbcDriver, Literal(driver)))
201                g.add((d2rq_rdb_iri, D2RQ.username, Literal(rdb_username)))
202                g.add((d2rq_rdb_iri, D2RQ.password, Literal(rdb_password)))
203                g.add((d2rq_rdb_iri, RDF.type, D2RQ.Database))
204                g.add((logical_source_iri, R2RML.sqlVersion, R2RML.SQL2008))
205                g.add((logical_source_iri, R2RML.tableName,
206                       table_name_literal))
207                g.add((logical_source_iri, RML.source, d2rq_rdb_iri))
208                g.add((logical_source_iri, RDF.type, RML.LogicalSource))
209                g.add((triples_map_iri, RML.logicalSource, logical_source_iri))
210                g.remove((triples_map_iri, R2RML.logicalTable,
211                          logical_table_iri))
212                g.remove((logical_table_iri, R2RML.tableName,
213                          table_name_literal))
214                g.remove((logical_table_iri, RDF.type, R2RML.LogicalTable))
215                g.remove((logical_table_iri, R2RML.sqlVersion, R2RML.SQL2008))
216
217            # rr:column --> rml:reference
218            for s, p, o in g.triples((None, R2RML.column, None)):
219                g.add((s, RML.reference, o))
220                g.remove((s, p, o))
221
222            mapping_file = os.path.join('/', 'data',
223                                        'mapping_converted.rml.ttl')
224            destination = os.path.join(self._data_path, 'rmlstreamer',
225                                       'mapping_converted.rml.ttl')
226            g.serialize(destination=destination, format='turtle')
227
228        arguments.append('-m')
229        arguments.append(mapping_file)
230
231        os.makedirs(os.path.join(self._data_path, 'rmlstreamer', 'output'),
232                    exist_ok=True)
233        status_code = self.execute(arguments)
234
235        # Combine all output into a single file.
236        # Duplicates may exist because RMLStreamer does not support duplicate
237        # removal
238        output_path = os.path.join(self._data_path, 'shared', output_file)
239        try:
240            os.remove(output_path)
241        except OSError as e:
242            if e.errno != errno.ENOENT:
243                raise
244
245        with open(output_path, 'a') as out_file:
246            files = list(glob(os.path.join(self._data_path, 'rmlstreamer',
247                                           'output', '.*')))
248            files += list(glob(os.path.join(self._data_path, 'rmlstreamer',
249                                            'output', '*')))
250            for gen_file in files:
251                with open(gen_file, 'r') as f:
252                    out_file.write(f.read())
253
254        shutil.rmtree(os.path.join(self._data_path, 'rmlstreamer', 'output'),
255                      ignore_errors=True)
256
257        return status_code

Execute a mapping file with RMLStreamer.

N-Quads/N-Triples is currently the only serialization format supported by RMLStreamer.

Parameters
  • mapping_file (str): Path to the mapping file to execute.
  • output_file (str): Name of the output file to store the triples in.
  • serialization (str): Serialization format to use.
  • rdb_username (Optional[str]): Username for the database, required when a database is used as source.
  • rdb_password (Optional[str]): Password for the database, required when a database is used as source.
  • rdb_host (Optional[str]): Hostname for the database, required when a database is used as source.
  • rdb_port (Optional[int]): Port for the database, required when a database is used as source.
  • rdb_name (Optional[str]): Database name for the database, required when a database is used as source.
  • rdb_type (Optional[str]): Database type, required when a database is used as source.
Returns
  • success (bool): Whether the execution was successful or not.