bench_executor.container

This module holds the Container and ContainerManager classes. The Container class is responsible for abstracting the Docker containers and allow running containers easily and make sure they are initialized before using them. The Containermanager class allows to create container networks, list all running containers and stop them.

  1#!/usr/bin/env python3
  2
  3"""
  4This module holds the Container and ContainerManager classes. The Container
  5class is responsible for abstracting the Docker containers and allow running
  6containers easily and make sure they are initialized before using them.
  7The Containermanager class allows to create container networks, list all
  8running containers and stop them.
  9"""
 10
 11from time import time, sleep
 12from typing import List, Tuple, Optional
 13from bench_executor.logger import Logger
 14from bench_executor.docker import Docker
 15
 16WAIT_TIME = 1  # seconds
 17TIMEOUT_TIME = 600  # seconds
 18NETWORK_NAME = 'bench_executor'
 19
 20
 21class ContainerManager():
 22    """Manage containers and networks."""
 23
 24    def __init__(self, docker: Docker):
 25        """Creates an instance of the ContainerManager class."""
 26        self._docker = docker
 27
 28    def create_network(self, name: str):
 29        """Create a container network.
 30
 31        Parameters
 32        ----------
 33        name : str
 34            Name of the network
 35        """
 36        self._docker.create_network(name)
 37
 38
 39class Container():
 40    """Container abstracts a Docker container
 41
 42    Abstract how to run a command in a container, start or stop a container,
 43    or retrieve logs. Also allow to wait for a certain log entry to appear or
 44    exit successfully.
 45    """
 46
 47    def __init__(self, container: str, name: str, logger: Logger,
 48                 ports: dict = {}, environment: dict = {},
 49                 volumes: List[str] = []):
 50        """Creates an instance of the Container class.
 51
 52        Parameters
 53        ----------
 54        container : str
 55            Container ID.
 56        name : str
 57            Pretty name of the container.
 58        logger : Logger
 59            Logger class to use for container logs.
 60        ports : dict
 61            Ports mapping of the container onto the host.
 62        volumes : list
 63            Volumes mapping of the container onto the host.
 64        """
 65        self._docker = Docker(logger)
 66        self._manager = ContainerManager(self._docker)
 67        self._container_id: Optional[str] = None
 68        self._container_name = container
 69        self._name = name
 70        self._ports = ports
 71        self._volumes = volumes
 72        self._environment = environment
 73        self._proc_pid = None
 74        self._long_id = None
 75        self._cgroups_mode = None
 76        self._cgroups_dir = None
 77        self._started = False
 78        self._logger = logger
 79
 80        # create network if not exist
 81        self._manager.create_network(NETWORK_NAME)
 82
 83    @property
 84    def started(self) -> bool:
 85        """Indicates if the container is already started"""
 86        return self._started
 87
 88    @property
 89    def name(self) -> str:
 90        """The pretty name of the container"""
 91        return self._name
 92
 93    def run(self, command: str = '', detach=True) -> bool:
 94        """Run the container.
 95
 96        This is used for containers which are long running to provide services
 97        such as a database or endpoint.
 98
 99        Parameters
100        ----------
101        command : str
102            The command to execute in the container, optionally and defaults to
103            no command.
104        detach : bool
105            If the container may run in the background, default True.
106
107        Returns
108        -------
109        success : bool
110            Whether running the container was successfull or not.
111        """
112        e = self._environment
113        v = self._volumes
114        self._started, self._container_id = \
115            self._docker.run(self._container_name, command, self._name, detach,
116                             self._ports, NETWORK_NAME, e, v)
117
118        if not self._started:
119            self._logger.error(f'Starting container "{self._name}" failed!')
120        return self._started
121
122    def exec(self, command: str) -> Tuple[bool, List[str]]:
123        """Execute a command in the container.
124
125        Parameters
126        ----------
127        command : str
128            The command to execute in the container.
129
130        Returns
131        -------
132        success : bool
133            Whether the command was executed successfully or not.
134        logs : list
135            The logs of the container for executing the command.
136        """
137        logs: List[str] = []
138
139        if self._container_id is None:
140            self._logger.error('Container is not initialized yet')
141            return False, []
142        exit_code = self._docker.exec(self._container_id, command)
143        logs = self._docker.logs(self._container_id)
144        if exit_code == 0:
145            return True, logs
146
147        return False, logs
148
149    def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool:
150        """Run the container and wait for a log line to appear.
151
152        This blocks until the container's log contains the `log_line`.
153
154        Parameters
155        ----------
156        log_line : str
157            The log line to wait for in the logs.
158        command : str
159            The command to execute in the container, optionally and defaults to
160            no command.
161
162        Returns
163        -------
164        success : bool
165            Whether the container exited with status code 0 or not.
166        """
167        if not self.run(command):
168            self._logger.error(f'Command "{command}" failed')
169            return False
170
171        if self._container_id is None:
172            self._logger.error('Container is not initialized yet')
173            return False
174
175        start = time()
176        found_line = False
177        line_number = 0
178        while (True):
179            logs = self._docker.logs(self._container_id)
180            for index, line in enumerate(logs):
181                # Only print new lines when iterating
182                if index > line_number:
183                    line_number = index
184                    self._logger.debug(line)
185
186                if time() - start > TIMEOUT_TIME:
187                    msg = f'Starting container "{self._name}" timed out!'
188                    self._logger.error(msg)
189                    break
190
191                if log_line in line:
192                    found_line = True
193                    break
194
195            if found_line:
196                sleep(WAIT_TIME)
197                return True
198
199        # Logs are collected on success, log them on failure
200        self._logger.error(f'Waiting for container "{self._name}" failed!')
201        logs = self._docker.logs(self._container_id)
202        for line in logs:
203            self._logger.error(line)
204        return False
205
206    def run_and_wait_for_exit(self, command: str = '') -> bool:
207        """Run the container and wait for exit
208
209        This blocks until the container exit and gives a status code.
210
211        Parameters
212        ----------
213        command : str
214            The command to execute in the container, optionally and defaults to
215            no command.
216
217        Returns
218        -------
219       success : bool
220            Whether the container exited with status code 0 or not.
221        """
222        if not self.run(command):
223            return False
224
225        if self._container_id is None:
226            self._logger.error('Container is not initialized yet')
227            return False
228
229        status_code = self._docker.wait(self._container_id)
230        logs = self._docker.logs(self._container_id)
231        if logs is not None:
232            for line in logs:
233                # On success, logs are collected when the container is stopped.
234                if status_code != 0:
235                    self._logger.error(line)
236                else:
237                    self._logger.debug(line)
238
239        if status_code == 0:
240            self.stop()
241            return True
242
243        self._logger.error('Command failed while waiting for exit with status '
244                           f'code: {status_code}')
245        return False
246
247    def stop(self) -> bool:
248        """Stop a running container
249
250        Stops the container and removes it, including its volumes.
251
252        Returns
253        -------
254        success : bool
255            Whether stopping the container was successfull or not.
256        """
257
258        if self._container_id is None:
259            self._logger.error('Container is not initialized yet')
260            return False
261
262        self._docker.stop(self._container_id)
263        return True
WAIT_TIME = 1
TIMEOUT_TIME = 600
NETWORK_NAME = 'bench_executor'
class ContainerManager:
22class ContainerManager():
23    """Manage containers and networks."""
24
25    def __init__(self, docker: Docker):
26        """Creates an instance of the ContainerManager class."""
27        self._docker = docker
28
29    def create_network(self, name: str):
30        """Create a container network.
31
32        Parameters
33        ----------
34        name : str
35            Name of the network
36        """
37        self._docker.create_network(name)

Manage containers and networks.

ContainerManager(docker: bench_executor.docker.Docker)
25    def __init__(self, docker: Docker):
26        """Creates an instance of the ContainerManager class."""
27        self._docker = docker

Creates an instance of the ContainerManager class.

def create_network(self, name: str):
29    def create_network(self, name: str):
30        """Create a container network.
31
32        Parameters
33        ----------
34        name : str
35            Name of the network
36        """
37        self._docker.create_network(name)

Create a container network.

Parameters
  • name (str): Name of the network
class Container:
 40class Container():
 41    """Container abstracts a Docker container
 42
 43    Abstract how to run a command in a container, start or stop a container,
 44    or retrieve logs. Also allow to wait for a certain log entry to appear or
 45    exit successfully.
 46    """
 47
 48    def __init__(self, container: str, name: str, logger: Logger,
 49                 ports: dict = {}, environment: dict = {},
 50                 volumes: List[str] = []):
 51        """Creates an instance of the Container class.
 52
 53        Parameters
 54        ----------
 55        container : str
 56            Container ID.
 57        name : str
 58            Pretty name of the container.
 59        logger : Logger
 60            Logger class to use for container logs.
 61        ports : dict
 62            Ports mapping of the container onto the host.
 63        volumes : list
 64            Volumes mapping of the container onto the host.
 65        """
 66        self._docker = Docker(logger)
 67        self._manager = ContainerManager(self._docker)
 68        self._container_id: Optional[str] = None
 69        self._container_name = container
 70        self._name = name
 71        self._ports = ports
 72        self._volumes = volumes
 73        self._environment = environment
 74        self._proc_pid = None
 75        self._long_id = None
 76        self._cgroups_mode = None
 77        self._cgroups_dir = None
 78        self._started = False
 79        self._logger = logger
 80
 81        # create network if not exist
 82        self._manager.create_network(NETWORK_NAME)
 83
 84    @property
 85    def started(self) -> bool:
 86        """Indicates if the container is already started"""
 87        return self._started
 88
 89    @property
 90    def name(self) -> str:
 91        """The pretty name of the container"""
 92        return self._name
 93
 94    def run(self, command: str = '', detach=True) -> bool:
 95        """Run the container.
 96
 97        This is used for containers which are long running to provide services
 98        such as a database or endpoint.
 99
100        Parameters
101        ----------
102        command : str
103            The command to execute in the container, optionally and defaults to
104            no command.
105        detach : bool
106            If the container may run in the background, default True.
107
108        Returns
109        -------
110        success : bool
111            Whether running the container was successfull or not.
112        """
113        e = self._environment
114        v = self._volumes
115        self._started, self._container_id = \
116            self._docker.run(self._container_name, command, self._name, detach,
117                             self._ports, NETWORK_NAME, e, v)
118
119        if not self._started:
120            self._logger.error(f'Starting container "{self._name}" failed!')
121        return self._started
122
123    def exec(self, command: str) -> Tuple[bool, List[str]]:
124        """Execute a command in the container.
125
126        Parameters
127        ----------
128        command : str
129            The command to execute in the container.
130
131        Returns
132        -------
133        success : bool
134            Whether the command was executed successfully or not.
135        logs : list
136            The logs of the container for executing the command.
137        """
138        logs: List[str] = []
139
140        if self._container_id is None:
141            self._logger.error('Container is not initialized yet')
142            return False, []
143        exit_code = self._docker.exec(self._container_id, command)
144        logs = self._docker.logs(self._container_id)
145        if exit_code == 0:
146            return True, logs
147
148        return False, logs
149
150    def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool:
151        """Run the container and wait for a log line to appear.
152
153        This blocks until the container's log contains the `log_line`.
154
155        Parameters
156        ----------
157        log_line : str
158            The log line to wait for in the logs.
159        command : str
160            The command to execute in the container, optionally and defaults to
161            no command.
162
163        Returns
164        -------
165        success : bool
166            Whether the container exited with status code 0 or not.
167        """
168        if not self.run(command):
169            self._logger.error(f'Command "{command}" failed')
170            return False
171
172        if self._container_id is None:
173            self._logger.error('Container is not initialized yet')
174            return False
175
176        start = time()
177        found_line = False
178        line_number = 0
179        while (True):
180            logs = self._docker.logs(self._container_id)
181            for index, line in enumerate(logs):
182                # Only print new lines when iterating
183                if index > line_number:
184                    line_number = index
185                    self._logger.debug(line)
186
187                if time() - start > TIMEOUT_TIME:
188                    msg = f'Starting container "{self._name}" timed out!'
189                    self._logger.error(msg)
190                    break
191
192                if log_line in line:
193                    found_line = True
194                    break
195
196            if found_line:
197                sleep(WAIT_TIME)
198                return True
199
200        # Logs are collected on success, log them on failure
201        self._logger.error(f'Waiting for container "{self._name}" failed!')
202        logs = self._docker.logs(self._container_id)
203        for line in logs:
204            self._logger.error(line)
205        return False
206
207    def run_and_wait_for_exit(self, command: str = '') -> bool:
208        """Run the container and wait for exit
209
210        This blocks until the container exit and gives a status code.
211
212        Parameters
213        ----------
214        command : str
215            The command to execute in the container, optionally and defaults to
216            no command.
217
218        Returns
219        -------
220       success : bool
221            Whether the container exited with status code 0 or not.
222        """
223        if not self.run(command):
224            return False
225
226        if self._container_id is None:
227            self._logger.error('Container is not initialized yet')
228            return False
229
230        status_code = self._docker.wait(self._container_id)
231        logs = self._docker.logs(self._container_id)
232        if logs is not None:
233            for line in logs:
234                # On success, logs are collected when the container is stopped.
235                if status_code != 0:
236                    self._logger.error(line)
237                else:
238                    self._logger.debug(line)
239
240        if status_code == 0:
241            self.stop()
242            return True
243
244        self._logger.error('Command failed while waiting for exit with status '
245                           f'code: {status_code}')
246        return False
247
248    def stop(self) -> bool:
249        """Stop a running container
250
251        Stops the container and removes it, including its volumes.
252
253        Returns
254        -------
255        success : bool
256            Whether stopping the container was successfull or not.
257        """
258
259        if self._container_id is None:
260            self._logger.error('Container is not initialized yet')
261            return False
262
263        self._docker.stop(self._container_id)
264        return True

Container abstracts a Docker container

Abstract how to run a command in a container, start or stop a container, or retrieve logs. Also allow to wait for a certain log entry to appear or exit successfully.

Container( container: str, name: str, logger: bench_executor.logger.Logger, ports: dict = {}, environment: dict = {}, volumes: List[str] = [])
48    def __init__(self, container: str, name: str, logger: Logger,
49                 ports: dict = {}, environment: dict = {},
50                 volumes: List[str] = []):
51        """Creates an instance of the Container class.
52
53        Parameters
54        ----------
55        container : str
56            Container ID.
57        name : str
58            Pretty name of the container.
59        logger : Logger
60            Logger class to use for container logs.
61        ports : dict
62            Ports mapping of the container onto the host.
63        volumes : list
64            Volumes mapping of the container onto the host.
65        """
66        self._docker = Docker(logger)
67        self._manager = ContainerManager(self._docker)
68        self._container_id: Optional[str] = None
69        self._container_name = container
70        self._name = name
71        self._ports = ports
72        self._volumes = volumes
73        self._environment = environment
74        self._proc_pid = None
75        self._long_id = None
76        self._cgroups_mode = None
77        self._cgroups_dir = None
78        self._started = False
79        self._logger = logger
80
81        # create network if not exist
82        self._manager.create_network(NETWORK_NAME)

Creates an instance of the Container class.

Parameters
  • container (str): Container ID.
  • name (str): Pretty name of the container.
  • logger (Logger): Logger class to use for container logs.
  • ports (dict): Ports mapping of the container onto the host.
  • volumes (list): Volumes mapping of the container onto the host.
started: bool
84    @property
85    def started(self) -> bool:
86        """Indicates if the container is already started"""
87        return self._started

Indicates if the container is already started

name: str
89    @property
90    def name(self) -> str:
91        """The pretty name of the container"""
92        return self._name

The pretty name of the container

def run(self, command: str = '', detach=True) -> bool:
 94    def run(self, command: str = '', detach=True) -> bool:
 95        """Run the container.
 96
 97        This is used for containers which are long running to provide services
 98        such as a database or endpoint.
 99
100        Parameters
101        ----------
102        command : str
103            The command to execute in the container, optionally and defaults to
104            no command.
105        detach : bool
106            If the container may run in the background, default True.
107
108        Returns
109        -------
110        success : bool
111            Whether running the container was successfull or not.
112        """
113        e = self._environment
114        v = self._volumes
115        self._started, self._container_id = \
116            self._docker.run(self._container_name, command, self._name, detach,
117                             self._ports, NETWORK_NAME, e, v)
118
119        if not self._started:
120            self._logger.error(f'Starting container "{self._name}" failed!')
121        return self._started

Run the container.

This is used for containers which are long running to provide services such as a database or endpoint.

Parameters
  • command (str): The command to execute in the container, optionally and defaults to no command.
  • detach (bool): If the container may run in the background, default True.
Returns
  • success (bool): Whether running the container was successfull or not.
def exec(self, command: str) -> Tuple[bool, List[str]]:
123    def exec(self, command: str) -> Tuple[bool, List[str]]:
124        """Execute a command in the container.
125
126        Parameters
127        ----------
128        command : str
129            The command to execute in the container.
130
131        Returns
132        -------
133        success : bool
134            Whether the command was executed successfully or not.
135        logs : list
136            The logs of the container for executing the command.
137        """
138        logs: List[str] = []
139
140        if self._container_id is None:
141            self._logger.error('Container is not initialized yet')
142            return False, []
143        exit_code = self._docker.exec(self._container_id, command)
144        logs = self._docker.logs(self._container_id)
145        if exit_code == 0:
146            return True, logs
147
148        return False, logs

Execute a command in the container.

Parameters
  • command (str): The command to execute in the container.
Returns
  • success (bool): Whether the command was executed successfully or not.
  • logs (list): The logs of the container for executing the command.
def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool:
150    def run_and_wait_for_log(self, log_line: str, command: str = '') -> bool:
151        """Run the container and wait for a log line to appear.
152
153        This blocks until the container's log contains the `log_line`.
154
155        Parameters
156        ----------
157        log_line : str
158            The log line to wait for in the logs.
159        command : str
160            The command to execute in the container, optionally and defaults to
161            no command.
162
163        Returns
164        -------
165        success : bool
166            Whether the container exited with status code 0 or not.
167        """
168        if not self.run(command):
169            self._logger.error(f'Command "{command}" failed')
170            return False
171
172        if self._container_id is None:
173            self._logger.error('Container is not initialized yet')
174            return False
175
176        start = time()
177        found_line = False
178        line_number = 0
179        while (True):
180            logs = self._docker.logs(self._container_id)
181            for index, line in enumerate(logs):
182                # Only print new lines when iterating
183                if index > line_number:
184                    line_number = index
185                    self._logger.debug(line)
186
187                if time() - start > TIMEOUT_TIME:
188                    msg = f'Starting container "{self._name}" timed out!'
189                    self._logger.error(msg)
190                    break
191
192                if log_line in line:
193                    found_line = True
194                    break
195
196            if found_line:
197                sleep(WAIT_TIME)
198                return True
199
200        # Logs are collected on success, log them on failure
201        self._logger.error(f'Waiting for container "{self._name}" failed!')
202        logs = self._docker.logs(self._container_id)
203        for line in logs:
204            self._logger.error(line)
205        return False

Run the container and wait for a log line to appear.

This blocks until the container's log contains the log_line.

Parameters
  • log_line (str): The log line to wait for in the logs.
  • command (str): The command to execute in the container, optionally and defaults to no command.
Returns
  • success (bool): Whether the container exited with status code 0 or not.
def run_and_wait_for_exit(self, command: str = '') -> bool:
207    def run_and_wait_for_exit(self, command: str = '') -> bool:
208        """Run the container and wait for exit
209
210        This blocks until the container exit and gives a status code.
211
212        Parameters
213        ----------
214        command : str
215            The command to execute in the container, optionally and defaults to
216            no command.
217
218        Returns
219        -------
220       success : bool
221            Whether the container exited with status code 0 or not.
222        """
223        if not self.run(command):
224            return False
225
226        if self._container_id is None:
227            self._logger.error('Container is not initialized yet')
228            return False
229
230        status_code = self._docker.wait(self._container_id)
231        logs = self._docker.logs(self._container_id)
232        if logs is not None:
233            for line in logs:
234                # On success, logs are collected when the container is stopped.
235                if status_code != 0:
236                    self._logger.error(line)
237                else:
238                    self._logger.debug(line)
239
240        if status_code == 0:
241            self.stop()
242            return True
243
244        self._logger.error('Command failed while waiting for exit with status '
245                           f'code: {status_code}')
246        return False

Run the container and wait for exit

This blocks until the container exit and gives a status code.

Parameters


command : str The command to execute in the container, optionally and defaults to no command.

Returns


success : bool Whether the container exited with status code 0 or not.

def stop(self) -> bool:
248    def stop(self) -> bool:
249        """Stop a running container
250
251        Stops the container and removes it, including its volumes.
252
253        Returns
254        -------
255        success : bool
256            Whether stopping the container was successfull or not.
257        """
258
259        if self._container_id is None:
260            self._logger.error('Container is not initialized yet')
261            return False
262
263        self._docker.stop(self._container_id)
264        return True

Stop a running container

Stops the container and removes it, including its volumes.

Returns
  • success (bool): Whether stopping the container was successfull or not.