bench_executor.docker

This module holds the Docker class, which implements the API calls the Container class needs to control Docker containers. The similar docker-py module has had serious resource-leak issues for years.

  1#!/usr/bin/env python3
  2
  3"""
  4This module holds the Docker class which implements the necessary API calls
  5for the Container class to control Docker containers. The docker-py module
  6which is similar has serious issues with resource leaking for years.
  7"""
  8
  9import json
 10import subprocess
 11from time import sleep
 12from typing import List, Tuple
 13from bench_executor.logger import Logger
 14
 15
 16class Docker():
 17    """Client for the Docker CLI."""
 18
 19    def __init__(self, logger: Logger):
 20        """Creates an instance of the Docker class.
 21
 22        """
 23        self._logger = logger
 24
 25    def exec(self, container_id: str, command: str) -> int:
 26        """Execute a command inside a running Docker container.
 27
 28        Parameters
 29        ----------
 30        container_id : str
 31            ID of the Docker container.
 32        command : str
 33            Command to execute inside the Docker container.
 34
 35        Returns
 36        -------
 37        status_code : int
 38            The exit status code of the executed command.
 39        """
 40
 41        cmd = f'docker exec "{container_id}" {command}'
 42        self._logger.debug(f'Executing command in Docker container: {cmd}')
 43        status_code, output = subprocess.getstatusoutput(cmd)
 44
 45        return status_code
 46
 47    def wait(self, container_id: str) -> int:
 48        """Wait for Docker container to exit.
 49
 50        Parameters
 51        ----------
 52        container_id : str
 53            ID of the Docker container.
 54
 55        Returns
 56        -------
 57        status_code : int
 58            The exit status code of the Docker container.
 59        """
 60
 61        cmd = f'docker wait "{container_id}"'
 62        self._logger.debug(f'Waiting for Docker container: {cmd}')
 63        status_code, output = subprocess.getstatusoutput(cmd)
 64
 65        return status_code
 66
 67    def stop(self, container_id: str) -> bool:
 68        """Stop a running Docker container.
 69
 70        Parameters
 71        ----------
 72        container_id : str
 73            ID of the Docker container.
 74
 75        Returns
 76        -------
 77        success : bool
 78            True if stopping the container was successful.
 79        """
 80
 81        cmd = f'docker stop "{container_id}"'
 82        self._logger.debug(f'Stopping Docker container: {cmd}')
 83        status_code, output = subprocess.getstatusoutput(cmd)
 84
 85        if status_code != 0:
 86            return False
 87
 88        cmd = f'docker rm "{container_id}"'
 89        self._logger.debug(f'Removing Docker container: {cmd}')
 90        status_code, output = subprocess.getstatusoutput(cmd)
 91
 92        return status_code == 0
 93
 94    def logs(self, container_id: str) -> List[str]:
 95        """Retrieve the logs of a container.
 96
 97        Parameters
 98        ----------
 99        container_id : str
100            ID of the Docker container.
101
102        Returns
103        -------
104        logs : List[str]
105            List of loglines from the container.
106        """
107
108        cmd = f'docker logs "{container_id}"'
109        status_code, output = subprocess.getstatusoutput(cmd)
110
111        logs = []
112        for line in output.split('\n'):
113            logs.append(line.strip())
114
115        return logs
116
117    def pull(self, image: str) -> bool:
118        """Pull a Docker image from DockerHub or other registries.
119
120        If the image exists locally, pulling will be skipped without
121        the force flag.
122
123        Parameters
124        ----------
125        image : str
126            Name of the Docker container image.
127
128        Returns
129        -------
130        success : bool
131            True if starting the container was successful.
132        """
133        # Check if the image already exists
134        cmd = f'docker inspect "{image}"'
135        status_code, output = subprocess.getstatusoutput(cmd)
136        if status_code == 0:
137            return True
138
139        # Pull the image
140        cmd = f'docker pull -q "{image}"'
141        status_code, output = subprocess.getstatusoutput(cmd)
142        return status_code == 0
143
144    def run(self, image: str, command: str, name: str, detach: bool,
145            ports: dict, network: str, environment: dict,
146            volumes: List[str], must_pull: bool = True) -> Tuple[bool, str]:
147        """Start a Docker container.
148
149        Parameters
150        ----------
151        image : str
152            Name of the Docker container image.
153        command : str
154            Command to execute in the Docker container.
155        name : str
156            Canonical name to assign to the container.
157        detach : bool
158            Whether to detach from the container or not.
159        ports : dict
160            Ports to expose from the container to the host.
161        network : str
162            Name of the Docker network to attach the container to.
163        environment : dict
164            Environment variables to set.
165        volumes : List[str]
166            Volumes to mount on the container from the host.
167        must_pull: bool
168            Whether the image should be pulled first, default is True.
169
170        Returns
171        -------
172        success : bool
173            True if starting the container was successful.
174        container_id : str
175            ID of the container that was started.
176        """
177
178        # Make sure the image is available locally
179        if must_pull:
180            self.pull(image)
181
182        # Avoid race condition between removing and starting the same container
183        removing = False
184        while (True):
185            cmd = f'docker ps -a | grep "{name}"'
186            status_code, output = subprocess.getstatusoutput(cmd)
187            if status_code == 0 and not removing:
188                cmd = f'docker stop "{name}" && docker rm "{name}"'
189                subprocess.getstatusoutput(cmd)
190                self._logger.debug(f'Schedule container "{name}" for removal')
191                removing = True
192            elif status_code != 0:
193                break
194            sleep(0.1)
195
196        # Start container
197        cmd = f'docker run --name "{name}"'
198        if detach:
199            cmd += ' --detach'
200        for variable, value in environment.items():
201            cmd += f' --env "{variable}={value}"'
202        for host_port, container_port in ports.items():
203            cmd += f' -p "{host_port}:{container_port}"'
204        for volume in volumes:
205            cmd += f' -v "{volume}"'
206        cmd += f' --network "{network}"'
207        cmd += f' {image} {command}'
208        self._logger.debug(f'Starting Docker container: {cmd}')
209        status_code, container_id = subprocess.getstatusoutput(cmd)
210        container_id = container_id.strip()
211        self._logger.debug(f'Container "{container_id}" running')
212
213        return status_code == 0, container_id
214
215    def create_network(self, network: str) -> bool:
216        """Create a Docker container network.
217
218        If the network already exist, it will not be recreated.
219
220        Parameters
221        ----------
222        network : str
223            Name of the network.
224
225        Returns
226        -------
227        success : bool
228            True if the network was created
229        """
230
231        # Check if network exist
232        cmd = f'docker network ls | grep "{network}"'
233        status_code, output = subprocess.getstatusoutput(cmd)
234        if status_code == 0:
235            return True
236
237        # Create it as it does not exist yet
238        cmd = f'docker network create "{network}"'
239        status_code, output = subprocess.getstatusoutput(cmd)
240        self._logger.debug(f'Created network "{network}"')
241
242        return status_code == 0
243
244    def info(self) -> Tuple[bool, dict]:
245        """Dump the Docker daemon system information.
246
247        Returns
248        -------
249        success : bool
250            True if the command succeed.
251        info : dict
252            Docker daemon system information as dictionary.
253        """
254
255        # Check if network exist
256        cmd = 'docker info --format \'{{json .}}\''
257        status_code, output = subprocess.getstatusoutput(cmd)
258
259        if status_code != 0:
260            return False, {}
261
262        info = json.loads(output)
263        self._logger.debug('Docker daemon system information successfully')
264        return True, info
class Docker:
 17class Docker():
 18    """Client for the Docker CLI."""
 19
 20    def __init__(self, logger: Logger):
 21        """Creates an instance of the Docker class.
 22
 23        """
 24        self._logger = logger
 25
 26    def exec(self, container_id: str, command: str) -> int:
 27        """Execute a command inside a running Docker container.
 28
 29        Parameters
 30        ----------
 31        container_id : str
 32            ID of the Docker container.
 33        command : str
 34            Command to execute inside the Docker container.
 35
 36        Returns
 37        -------
 38        status_code : int
 39            The exit status code of the executed command.
 40        """
 41
 42        cmd = f'docker exec "{container_id}" {command}'
 43        self._logger.debug(f'Executing command in Docker container: {cmd}')
 44        status_code, output = subprocess.getstatusoutput(cmd)
 45
 46        return status_code
 47
 48    def wait(self, container_id: str) -> int:
 49        """Wait for Docker container to exit.
 50
 51        Parameters
 52        ----------
 53        container_id : str
 54            ID of the Docker container.
 55
 56        Returns
 57        -------
 58        status_code : int
 59            The exit status code of the Docker container.
 60        """
 61
 62        cmd = f'docker wait "{container_id}"'
 63        self._logger.debug(f'Waiting for Docker container: {cmd}')
 64        status_code, output = subprocess.getstatusoutput(cmd)
 65
 66        return status_code
 67
 68    def stop(self, container_id: str) -> bool:
 69        """Stop a running Docker container.
 70
 71        Parameters
 72        ----------
 73        container_id : str
 74            ID of the Docker container.
 75
 76        Returns
 77        -------
 78        success : bool
 79            True if stopping the container was successful.
 80        """
 81
 82        cmd = f'docker stop "{container_id}"'
 83        self._logger.debug(f'Stopping Docker container: {cmd}')
 84        status_code, output = subprocess.getstatusoutput(cmd)
 85
 86        if status_code != 0:
 87            return False
 88
 89        cmd = f'docker rm "{container_id}"'
 90        self._logger.debug(f'Removing Docker container: {cmd}')
 91        status_code, output = subprocess.getstatusoutput(cmd)
 92
 93        return status_code == 0
 94
 95    def logs(self, container_id: str) -> List[str]:
 96        """Retrieve the logs of a container.
 97
 98        Parameters
 99        ----------
100        container_id : str
101            ID of the Docker container.
102
103        Returns
104        -------
105        logs : List[str]
106            List of loglines from the container.
107        """
108
109        cmd = f'docker logs "{container_id}"'
110        status_code, output = subprocess.getstatusoutput(cmd)
111
112        logs = []
113        for line in output.split('\n'):
114            logs.append(line.strip())
115
116        return logs
117
118    def pull(self, image: str) -> bool:
119        """Pull a Docker image from DockerHub or other registries.
120
121        If the image exists locally, pulling will be skipped without
122        the force flag.
123
124        Parameters
125        ----------
126        image : str
127            Name of the Docker container image.
128
129        Returns
130        -------
131        success : bool
132            True if starting the container was successful.
133        """
134        # Check if the image already exists
135        cmd = f'docker inspect "{image}"'
136        status_code, output = subprocess.getstatusoutput(cmd)
137        if status_code == 0:
138            return True
139
140        # Pull the image
141        cmd = f'docker pull -q "{image}"'
142        status_code, output = subprocess.getstatusoutput(cmd)
143        return status_code == 0
144
145    def run(self, image: str, command: str, name: str, detach: bool,
146            ports: dict, network: str, environment: dict,
147            volumes: List[str], must_pull: bool = True) -> Tuple[bool, str]:
148        """Start a Docker container.
149
150        Parameters
151        ----------
152        image : str
153            Name of the Docker container image.
154        command : str
155            Command to execute in the Docker container.
156        name : str
157            Canonical name to assign to the container.
158        detach : bool
159            Whether to detach from the container or not.
160        ports : dict
161            Ports to expose from the container to the host.
162        network : str
163            Name of the Docker network to attach the container to.
164        environment : dict
165            Environment variables to set.
166        volumes : List[str]
167            Volumes to mount on the container from the host.
168        must_pull: bool
169            Whether the image should be pulled first, default is True.
170
171        Returns
172        -------
173        success : bool
174            True if starting the container was successful.
175        container_id : str
176            ID of the container that was started.
177        """
178
179        # Make sure the image is available locally
180        if must_pull:
181            self.pull(image)
182
183        # Avoid race condition between removing and starting the same container
184        removing = False
185        while (True):
186            cmd = f'docker ps -a | grep "{name}"'
187            status_code, output = subprocess.getstatusoutput(cmd)
188            if status_code == 0 and not removing:
189                cmd = f'docker stop "{name}" && docker rm "{name}"'
190                subprocess.getstatusoutput(cmd)
191                self._logger.debug(f'Schedule container "{name}" for removal')
192                removing = True
193            elif status_code != 0:
194                break
195            sleep(0.1)
196
197        # Start container
198        cmd = f'docker run --name "{name}"'
199        if detach:
200            cmd += ' --detach'
201        for variable, value in environment.items():
202            cmd += f' --env "{variable}={value}"'
203        for host_port, container_port in ports.items():
204            cmd += f' -p "{host_port}:{container_port}"'
205        for volume in volumes:
206            cmd += f' -v "{volume}"'
207        cmd += f' --network "{network}"'
208        cmd += f' {image} {command}'
209        self._logger.debug(f'Starting Docker container: {cmd}')
210        status_code, container_id = subprocess.getstatusoutput(cmd)
211        container_id = container_id.strip()
212        self._logger.debug(f'Container "{container_id}" running')
213
214        return status_code == 0, container_id
215
216    def create_network(self, network: str) -> bool:
217        """Create a Docker container network.
218
219        If the network already exist, it will not be recreated.
220
221        Parameters
222        ----------
223        network : str
224            Name of the network.
225
226        Returns
227        -------
228        success : bool
229            True if the network was created
230        """
231
232        # Check if network exist
233        cmd = f'docker network ls | grep "{network}"'
234        status_code, output = subprocess.getstatusoutput(cmd)
235        if status_code == 0:
236            return True
237
238        # Create it as it does not exist yet
239        cmd = f'docker network create "{network}"'
240        status_code, output = subprocess.getstatusoutput(cmd)
241        self._logger.debug(f'Created network "{network}"')
242
243        return status_code == 0
244
245    def info(self) -> Tuple[bool, dict]:
246        """Dump the Docker daemon system information.
247
248        Returns
249        -------
250        success : bool
251            True if the command succeed.
252        info : dict
253            Docker daemon system information as dictionary.
254        """
255
256        # Check if network exist
257        cmd = 'docker info --format \'{{json .}}\''
258        status_code, output = subprocess.getstatusoutput(cmd)
259
260        if status_code != 0:
261            return False, {}
262
263        info = json.loads(output)
264        self._logger.debug('Docker daemon system information successfully')
265        return True, info

Client for the Docker CLI.

Docker(logger: bench_executor.logger.Logger)
20    def __init__(self, logger: Logger):
21        """Creates an instance of the Docker class.
22
23        """
24        self._logger = logger

Creates an instance of the Docker class.

def exec(self, container_id: str, command: str) -> int:
26    def exec(self, container_id: str, command: str) -> int:
27        """Execute a command inside a running Docker container.
28
29        Parameters
30        ----------
31        container_id : str
32            ID of the Docker container.
33        command : str
34            Command to execute inside the Docker container.
35
36        Returns
37        -------
38        status_code : int
39            The exit status code of the executed command.
40        """
41
42        cmd = f'docker exec "{container_id}" {command}'
43        self._logger.debug(f'Executing command in Docker container: {cmd}')
44        status_code, output = subprocess.getstatusoutput(cmd)
45
46        return status_code

Execute a command inside a running Docker container.

Parameters
  • container_id (str): ID of the Docker container.
  • command (str): Command to execute inside the Docker container.
Returns
  • status_code (int): The exit status code of the executed command.
def wait(self, container_id: str) -> int:
48    def wait(self, container_id: str) -> int:
49        """Wait for Docker container to exit.
50
51        Parameters
52        ----------
53        container_id : str
54            ID of the Docker container.
55
56        Returns
57        -------
58        status_code : int
59            The exit status code of the Docker container.
60        """
61
62        cmd = f'docker wait "{container_id}"'
63        self._logger.debug(f'Waiting for Docker container: {cmd}')
64        status_code, output = subprocess.getstatusoutput(cmd)
65
66        return status_code

Wait for Docker container to exit.

Parameters
  • container_id (str): ID of the Docker container.
Returns
  • status_code (int): The exit status code of the Docker container.
def stop(self, container_id: str) -> bool:
68    def stop(self, container_id: str) -> bool:
69        """Stop a running Docker container.
70
71        Parameters
72        ----------
73        container_id : str
74            ID of the Docker container.
75
76        Returns
77        -------
78        success : bool
79            True if stopping the container was successful.
80        """
81
82        cmd = f'docker stop "{container_id}"'
83        self._logger.debug(f'Stopping Docker container: {cmd}')
84        status_code, output = subprocess.getstatusoutput(cmd)
85
86        if status_code != 0:
87            return False
88
89        cmd = f'docker rm "{container_id}"'
90        self._logger.debug(f'Removing Docker container: {cmd}')
91        status_code, output = subprocess.getstatusoutput(cmd)
92
93        return status_code == 0

Stop a running Docker container.

Parameters
  • container_id (str): ID of the Docker container.
Returns
  • success (bool): True if stopping and removing the container was successful.
def logs(self, container_id: str) -> List[str]:
 95    def logs(self, container_id: str) -> List[str]:
 96        """Retrieve the logs of a container.
 97
 98        Parameters
 99        ----------
100        container_id : str
101            ID of the Docker container.
102
103        Returns
104        -------
105        logs : List[str]
106            List of loglines from the container.
107        """
108
109        cmd = f'docker logs "{container_id}"'
110        status_code, output = subprocess.getstatusoutput(cmd)
111
112        logs = []
113        for line in output.split('\n'):
114            logs.append(line.strip())
115
116        return logs

Retrieve the logs of a container.

Parameters
  • container_id (str): ID of the Docker container.
Returns
  • logs (List[str]): List of loglines from the container.
def pull(self, image: str) -> bool:
118    def pull(self, image: str) -> bool:
119        """Pull a Docker image from DockerHub or other registries.
120
121        If the image exists locally, pulling will be skipped without
122        the force flag.
123
124        Parameters
125        ----------
126        image : str
127            Name of the Docker container image.
128
129        Returns
130        -------
131        success : bool
132            True if starting the container was successful.
133        """
134        # Check if the image already exists
135        cmd = f'docker inspect "{image}"'
136        status_code, output = subprocess.getstatusoutput(cmd)
137        if status_code == 0:
138            return True
139
140        # Pull the image
141        cmd = f'docker pull -q "{image}"'
142        status_code, output = subprocess.getstatusoutput(cmd)
143        return status_code == 0

Pull a Docker image from DockerHub or other registries.

If the image already exists locally, pulling is skipped.

Parameters
  • image (str): Name of the Docker container image.
Returns
  • success (bool): True if the image is available locally or was pulled successfully.
def run( self, image: str, command: str, name: str, detach: bool, ports: dict, network: str, environment: dict, volumes: List[str], must_pull: bool = True) -> Tuple[bool, str]:
145    def run(self, image: str, command: str, name: str, detach: bool,
146            ports: dict, network: str, environment: dict,
147            volumes: List[str], must_pull: bool = True) -> Tuple[bool, str]:
148        """Start a Docker container.
149
150        Parameters
151        ----------
152        image : str
153            Name of the Docker container image.
154        command : str
155            Command to execute in the Docker container.
156        name : str
157            Canonical name to assign to the container.
158        detach : bool
159            Whether to detach from the container or not.
160        ports : dict
161            Ports to expose from the container to the host.
162        network : str
163            Name of the Docker network to attach the container to.
164        environment : dict
165            Environment variables to set.
166        volumes : List[str]
167            Volumes to mount on the container from the host.
168        must_pull: bool
169            Whether the image should be pulled first, default is True.
170
171        Returns
172        -------
173        success : bool
174            True if starting the container was successful.
175        container_id : str
176            ID of the container that was started.
177        """
178
179        # Make sure the image is available locally
180        if must_pull:
181            self.pull(image)
182
183        # Avoid race condition between removing and starting the same container
184        removing = False
185        while (True):
186            cmd = f'docker ps -a | grep "{name}"'
187            status_code, output = subprocess.getstatusoutput(cmd)
188            if status_code == 0 and not removing:
189                cmd = f'docker stop "{name}" && docker rm "{name}"'
190                subprocess.getstatusoutput(cmd)
191                self._logger.debug(f'Schedule container "{name}" for removal')
192                removing = True
193            elif status_code != 0:
194                break
195            sleep(0.1)
196
197        # Start container
198        cmd = f'docker run --name "{name}"'
199        if detach:
200            cmd += ' --detach'
201        for variable, value in environment.items():
202            cmd += f' --env "{variable}={value}"'
203        for host_port, container_port in ports.items():
204            cmd += f' -p "{host_port}:{container_port}"'
205        for volume in volumes:
206            cmd += f' -v "{volume}"'
207        cmd += f' --network "{network}"'
208        cmd += f' {image} {command}'
209        self._logger.debug(f'Starting Docker container: {cmd}')
210        status_code, container_id = subprocess.getstatusoutput(cmd)
211        container_id = container_id.strip()
212        self._logger.debug(f'Container "{container_id}" running')
213
214        return status_code == 0, container_id

Start a Docker container.

Parameters
  • image (str): Name of the Docker container image.
  • command (str): Command to execute in the Docker container.
  • name (str): Canonical name to assign to the container.
  • detach (bool): Whether to detach from the container or not.
  • ports (dict): Ports to expose from the container to the host.
  • network (str): Name of the Docker network to attach the container to.
  • environment (dict): Environment variables to set.
  • volumes (List[str]): Volumes to mount on the container from the host.
  • must_pull (bool): Whether the image should be pulled first, default is True.
Returns
  • success (bool): True if starting the container was successful.
  • container_id (str): ID of the container that was started.
def create_network(self, network: str) -> bool:
216    def create_network(self, network: str) -> bool:
217        """Create a Docker container network.
218
219        If the network already exist, it will not be recreated.
220
221        Parameters
222        ----------
223        network : str
224            Name of the network.
225
226        Returns
227        -------
228        success : bool
229            True if the network was created
230        """
231
232        # Check if network exist
233        cmd = f'docker network ls | grep "{network}"'
234        status_code, output = subprocess.getstatusoutput(cmd)
235        if status_code == 0:
236            return True
237
238        # Create it as it does not exist yet
239        cmd = f'docker network create "{network}"'
240        status_code, output = subprocess.getstatusoutput(cmd)
241        self._logger.debug(f'Created network "{network}"')
242
243        return status_code == 0

Create a Docker container network.

If the network already exists, it will not be recreated.

Parameters
  • network (str): Name of the network.
Returns
  • success (bool): True if the network already exists or was created.
def info(self) -> Tuple[bool, dict]:
245    def info(self) -> Tuple[bool, dict]:
246        """Dump the Docker daemon system information.
247
248        Returns
249        -------
250        success : bool
251            True if the command succeed.
252        info : dict
253            Docker daemon system information as dictionary.
254        """
255
256        # Check if network exist
257        cmd = 'docker info --format \'{{json .}}\''
258        status_code, output = subprocess.getstatusoutput(cmd)
259
260        if status_code != 0:
261            return False, {}
262
263        info = json.loads(output)
264        self._logger.debug('Docker daemon system information successfully')
265        return True, info

Dump the Docker daemon system information.

Returns
  • success (bool): True if the command succeeded.
  • info (dict): Docker daemon system information as dictionary.