bench_executor.docker
This module holds the Docker class, which implements the API calls the Container class needs to control Docker containers. The similar docker-py module has had serious resource leak issues for years.
#!/usr/bin/env python3

"""
This module holds the Docker class, which implements the API calls the
Container class needs to control Docker containers. The similar docker-py
module has had serious resource leak issues for years.
"""

import json
import subprocess
from time import sleep
from typing import List, Tuple
from bench_executor.logger import Logger


class Docker():
    """Client for the Docker CLI.

    Every operation shells out to the ``docker`` binary through
    ``subprocess`` and derives its result from the CLI exit status.
    """

    def __init__(self, logger: Logger):
        """Creates an instance of the Docker class.

        Parameters
        ----------
        logger : Logger
            Logger that receives the debug messages of this client.
        """
        self._logger = logger

    def exec(self, container_id: str, command: str) -> int:
        """Execute a command inside a running Docker container.

        Parameters
        ----------
        container_id : str
            ID of the Docker container.
        command : str
            Command to execute inside the Docker container. It is appended
            unquoted to the shell command line.

        Returns
        -------
        status_code : int
            The exit status code of the ``docker exec`` invocation.
        """

        cmd = f'docker exec "{container_id}" {command}'
        self._logger.debug(f'Executing command in Docker container: {cmd}')
        status_code, _ = subprocess.getstatusoutput(cmd)

        return status_code

    def wait(self, container_id: str) -> int:
        """Wait for Docker container to exit.

        Parameters
        ----------
        container_id : str
            ID of the Docker container.

        Returns
        -------
        status_code : int
            The exit status code of the Docker container. If the
            ``docker wait`` call itself fails, or its output cannot be
            parsed, the exit status of the CLI call is returned instead.
        """

        cmd = f'docker wait "{container_id}"'
        self._logger.debug(f'Waiting for Docker container: {cmd}')
        status_code, output = subprocess.getstatusoutput(cmd)

        # A failing CLI call has no container exit code to report
        if status_code != 0:
            return status_code

        # `docker wait` prints the exit code of the container; the exit
        # status of the CLI only tells whether waiting itself succeeded
        try:
            return int(output.strip().splitlines()[-1])
        except (IndexError, ValueError):
            self._logger.debug(f'Cannot parse container exit code: {output}')
            return status_code

    def stop(self, container_id: str) -> bool:
        """Stop and remove a running Docker container.

        Parameters
        ----------
        container_id : str
            ID of the Docker container.

        Returns
        -------
        success : bool
            True if the container was stopped and removed successfully.
        """

        cmd = f'docker stop "{container_id}"'
        self._logger.debug(f'Stopping Docker container: {cmd}')
        status_code, _ = subprocess.getstatusoutput(cmd)

        # Do not try to remove a container that could not be stopped
        if status_code != 0:
            return False

        cmd = f'docker rm "{container_id}"'
        self._logger.debug(f'Removing Docker container: {cmd}')
        status_code, _ = subprocess.getstatusoutput(cmd)

        return status_code == 0

    def logs(self, container_id: str) -> List[str]:
        """Retrieve the logs of a container.

        The exit status of ``docker logs`` is not checked, whatever the
        CLI printed is returned.

        Parameters
        ----------
        container_id : str
            ID of the Docker container.

        Returns
        -------
        logs : List[str]
            List of loglines from the container, stripped of surrounding
            whitespace.
        """

        cmd = f'docker logs "{container_id}"'
        _, output = subprocess.getstatusoutput(cmd)

        return [line.strip() for line in output.split('\n')]

    def pull(self, image: str) -> bool:
        """Pull a Docker image from DockerHub or other registries.

        If the image already exists locally, pulling is skipped.

        Parameters
        ----------
        image : str
            Name of the Docker container image.

        Returns
        -------
        success : bool
            True if the image is available locally or was pulled
            successfully.
        """
        # Check if the image already exists. `docker image inspect` only
        # matches images, plain `docker inspect` also succeeds for a
        # container, network or volume carrying the same name.
        cmd = f'docker image inspect "{image}"'
        status_code, _ = subprocess.getstatusoutput(cmd)
        if status_code == 0:
            return True

        # Pull the image
        cmd = f'docker pull -q "{image}"'
        self._logger.debug(f'Pulling Docker image: {cmd}')
        status_code, output = subprocess.getstatusoutput(cmd)
        if status_code != 0:
            self._logger.debug(f'Pulling image "{image}" failed: {output}')

        return status_code == 0

    def run(self, image: str, command: str, name: str, detach: bool,
            ports: dict, network: str, environment: dict,
            volumes: List[str], must_pull: bool = True) -> Tuple[bool, str]:
        """Start a Docker container.

        An existing container with the same name is stopped and removed
        first.

        Parameters
        ----------
        image : str
            Name of the Docker container image.
        command : str
            Command to execute in the Docker container.
        name : str
            Canonical name to assign to the container.
        detach : bool
            Whether to detach from the container or not.
        ports : dict
            Ports to expose from the container to the host, as
            host port -> container port.
        network : str
            Name of the Docker network to attach the container to.
        environment : dict
            Environment variables to set.
        volumes : List[str]
            Volumes to mount on the container from the host.
        must_pull: bool
            Whether the image should be pulled first, default is True.

        Returns
        -------
        success : bool
            True if starting the container was successful.
        container_id : str
            Stripped output of ``docker run``. NOTE(review): presumably
            the container ID when detached, and the CLI error text on
            failure; confirm against the Docker CLI reference.
        """

        # Make sure the image is available locally. A failed pull is only
        # reported, the start is still attempted.
        if must_pull and not self.pull(image):
            self._logger.debug(f'Image "{image}" could not be pulled')

        # Avoid race condition between removing and starting the same
        # container. The lookup has to match the name exactly: grepping
        # the `docker ps -a` table also hits other containers whose name,
        # image or command merely contains the name, in which case the
        # loop never terminates.
        # NOTE: the loop has no timeout, it polls until the name is free.
        exists_cmd = f'docker container inspect "{name}"'
        removing = False
        while True:
            status_code, _ = subprocess.getstatusoutput(exists_cmd)
            if status_code != 0:
                break
            if not removing:
                cmd = f'docker stop "{name}" && docker rm "{name}"'
                subprocess.getstatusoutput(cmd)
                self._logger.debug(f'Schedule container "{name}" for removal')
                removing = True
            sleep(0.1)

        # Start container
        parts = [f'docker run --name "{name}"']
        if detach:
            parts.append('--detach')
        for variable, value in environment.items():
            parts.append(f'--env "{variable}={value}"')
        for host_port, container_port in ports.items():
            parts.append(f'-p "{host_port}:{container_port}"')
        for volume in volumes:
            parts.append(f'-v "{volume}"')
        parts.append(f'--network "{network}"')
        parts.append(f'{image} {command}')
        cmd = ' '.join(parts)
        self._logger.debug(f'Starting Docker container: {cmd}')
        status_code, output = subprocess.getstatusoutput(cmd)
        container_id = output.strip()

        # Only claim the container is running when the CLI succeeded
        if status_code == 0:
            self._logger.debug(f'Container "{container_id}" running')
        else:
            self._logger.debug(f'Starting container "{name}" failed: '
                               f'{container_id}')

        return status_code == 0, container_id

    def create_network(self, network: str) -> bool:
        """Create a Docker container network.

        If the network already exists, it will not be recreated.

        Parameters
        ----------
        network : str
            Name of the network.

        Returns
        -------
        success : bool
            True if the network already exists or was created.
        """

        # Check if network exists. `docker network inspect` matches the
        # network exactly, grepping `docker network ls` also matches any
        # network whose name merely contains the requested name.
        cmd = f'docker network inspect "{network}"'
        status_code, _ = subprocess.getstatusoutput(cmd)
        if status_code == 0:
            return True

        # Create it as it does not exist yet
        cmd = f'docker network create "{network}"'
        status_code, output = subprocess.getstatusoutput(cmd)
        if status_code == 0:
            self._logger.debug(f'Created network "{network}"')
        else:
            self._logger.debug(f'Creating network "{network}" failed: '
                               f'{output}')

        return status_code == 0

    def info(self) -> Tuple[bool, dict]:
        """Dump the Docker daemon system information.

        Returns
        -------
        success : bool
            True if the command succeeded and its output was valid JSON.
        info : dict
            Docker daemon system information as dictionary, empty on
            failure.
        """

        # Capture stdout separately: warnings the CLI prints on stderr
        # must not end up in the text handed to the JSON parser
        cmd = 'docker info --format \'{{json .}}\''
        result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)

        if result.returncode != 0:
            return False, {}

        try:
            info = json.loads(result.stdout)
        except json.JSONDecodeError:
            self._logger.debug('Docker daemon system information is not '
                               'valid JSON')
            return False, {}

        self._logger.debug('Docker daemon system information successfully')
        return True, info
class
Docker:
class Docker():
    """Client for the Docker CLI.

    Every operation shells out to the ``docker`` binary through
    ``subprocess`` and derives its result from the CLI exit status.
    """

    def __init__(self, logger: Logger):
        """Creates an instance of the Docker class.

        Parameters
        ----------
        logger : Logger
            Logger that receives the debug messages of this client.
        """
        self._logger = logger

    def exec(self, container_id: str, command: str) -> int:
        """Execute a command inside a running Docker container.

        Parameters
        ----------
        container_id : str
            ID of the Docker container.
        command : str
            Command to execute inside the Docker container. It is appended
            unquoted to the shell command line.

        Returns
        -------
        status_code : int
            The exit status code of the ``docker exec`` invocation.
        """

        cmd = f'docker exec "{container_id}" {command}'
        self._logger.debug(f'Executing command in Docker container: {cmd}')
        status_code, _ = subprocess.getstatusoutput(cmd)

        return status_code

    def wait(self, container_id: str) -> int:
        """Wait for Docker container to exit.

        Parameters
        ----------
        container_id : str
            ID of the Docker container.

        Returns
        -------
        status_code : int
            The exit status code of the Docker container. If the
            ``docker wait`` call itself fails, or its output cannot be
            parsed, the exit status of the CLI call is returned instead.
        """

        cmd = f'docker wait "{container_id}"'
        self._logger.debug(f'Waiting for Docker container: {cmd}')
        status_code, output = subprocess.getstatusoutput(cmd)

        # A failing CLI call has no container exit code to report
        if status_code != 0:
            return status_code

        # `docker wait` prints the exit code of the container; the exit
        # status of the CLI only tells whether waiting itself succeeded
        try:
            return int(output.strip().splitlines()[-1])
        except (IndexError, ValueError):
            self._logger.debug(f'Cannot parse container exit code: {output}')
            return status_code

    def stop(self, container_id: str) -> bool:
        """Stop and remove a running Docker container.

        Parameters
        ----------
        container_id : str
            ID of the Docker container.

        Returns
        -------
        success : bool
            True if the container was stopped and removed successfully.
        """

        cmd = f'docker stop "{container_id}"'
        self._logger.debug(f'Stopping Docker container: {cmd}')
        status_code, _ = subprocess.getstatusoutput(cmd)

        # Do not try to remove a container that could not be stopped
        if status_code != 0:
            return False

        cmd = f'docker rm "{container_id}"'
        self._logger.debug(f'Removing Docker container: {cmd}')
        status_code, _ = subprocess.getstatusoutput(cmd)

        return status_code == 0

    def logs(self, container_id: str) -> List[str]:
        """Retrieve the logs of a container.

        The exit status of ``docker logs`` is not checked, whatever the
        CLI printed is returned.

        Parameters
        ----------
        container_id : str
            ID of the Docker container.

        Returns
        -------
        logs : List[str]
            List of loglines from the container, stripped of surrounding
            whitespace.
        """

        cmd = f'docker logs "{container_id}"'
        _, output = subprocess.getstatusoutput(cmd)

        return [line.strip() for line in output.split('\n')]

    def pull(self, image: str) -> bool:
        """Pull a Docker image from DockerHub or other registries.

        If the image already exists locally, pulling is skipped.

        Parameters
        ----------
        image : str
            Name of the Docker container image.

        Returns
        -------
        success : bool
            True if the image is available locally or was pulled
            successfully.
        """
        # Check if the image already exists. `docker image inspect` only
        # matches images, plain `docker inspect` also succeeds for a
        # container, network or volume carrying the same name.
        cmd = f'docker image inspect "{image}"'
        status_code, _ = subprocess.getstatusoutput(cmd)
        if status_code == 0:
            return True

        # Pull the image
        cmd = f'docker pull -q "{image}"'
        self._logger.debug(f'Pulling Docker image: {cmd}')
        status_code, output = subprocess.getstatusoutput(cmd)
        if status_code != 0:
            self._logger.debug(f'Pulling image "{image}" failed: {output}')

        return status_code == 0

    def run(self, image: str, command: str, name: str, detach: bool,
            ports: dict, network: str, environment: dict,
            volumes: List[str], must_pull: bool = True) -> Tuple[bool, str]:
        """Start a Docker container.

        An existing container with the same name is stopped and removed
        first.

        Parameters
        ----------
        image : str
            Name of the Docker container image.
        command : str
            Command to execute in the Docker container.
        name : str
            Canonical name to assign to the container.
        detach : bool
            Whether to detach from the container or not.
        ports : dict
            Ports to expose from the container to the host, as
            host port -> container port.
        network : str
            Name of the Docker network to attach the container to.
        environment : dict
            Environment variables to set.
        volumes : List[str]
            Volumes to mount on the container from the host.
        must_pull: bool
            Whether the image should be pulled first, default is True.

        Returns
        -------
        success : bool
            True if starting the container was successful.
        container_id : str
            Stripped output of ``docker run``. NOTE(review): presumably
            the container ID when detached, and the CLI error text on
            failure; confirm against the Docker CLI reference.
        """

        # Make sure the image is available locally. A failed pull is only
        # reported, the start is still attempted.
        if must_pull and not self.pull(image):
            self._logger.debug(f'Image "{image}" could not be pulled')

        # Avoid race condition between removing and starting the same
        # container. The lookup has to match the name exactly: grepping
        # the `docker ps -a` table also hits other containers whose name,
        # image or command merely contains the name, in which case the
        # loop never terminates.
        # NOTE: the loop has no timeout, it polls until the name is free.
        exists_cmd = f'docker container inspect "{name}"'
        removing = False
        while True:
            status_code, _ = subprocess.getstatusoutput(exists_cmd)
            if status_code != 0:
                break
            if not removing:
                cmd = f'docker stop "{name}" && docker rm "{name}"'
                subprocess.getstatusoutput(cmd)
                self._logger.debug(f'Schedule container "{name}" for removal')
                removing = True
            sleep(0.1)

        # Start container
        parts = [f'docker run --name "{name}"']
        if detach:
            parts.append('--detach')
        for variable, value in environment.items():
            parts.append(f'--env "{variable}={value}"')
        for host_port, container_port in ports.items():
            parts.append(f'-p "{host_port}:{container_port}"')
        for volume in volumes:
            parts.append(f'-v "{volume}"')
        parts.append(f'--network "{network}"')
        parts.append(f'{image} {command}')
        cmd = ' '.join(parts)
        self._logger.debug(f'Starting Docker container: {cmd}')
        status_code, output = subprocess.getstatusoutput(cmd)
        container_id = output.strip()

        # Only claim the container is running when the CLI succeeded
        if status_code == 0:
            self._logger.debug(f'Container "{container_id}" running')
        else:
            self._logger.debug(f'Starting container "{name}" failed: '
                               f'{container_id}')

        return status_code == 0, container_id

    def create_network(self, network: str) -> bool:
        """Create a Docker container network.

        If the network already exists, it will not be recreated.

        Parameters
        ----------
        network : str
            Name of the network.

        Returns
        -------
        success : bool
            True if the network already exists or was created.
        """

        # Check if network exists. `docker network inspect` matches the
        # network exactly, grepping `docker network ls` also matches any
        # network whose name merely contains the requested name.
        cmd = f'docker network inspect "{network}"'
        status_code, _ = subprocess.getstatusoutput(cmd)
        if status_code == 0:
            return True

        # Create it as it does not exist yet
        cmd = f'docker network create "{network}"'
        status_code, output = subprocess.getstatusoutput(cmd)
        if status_code == 0:
            self._logger.debug(f'Created network "{network}"')
        else:
            self._logger.debug(f'Creating network "{network}" failed: '
                               f'{output}')

        return status_code == 0

    def info(self) -> Tuple[bool, dict]:
        """Dump the Docker daemon system information.

        Returns
        -------
        success : bool
            True if the command succeeded and its output was valid JSON.
        info : dict
            Docker daemon system information as dictionary, empty on
            failure.
        """

        # Capture stdout separately: warnings the CLI prints on stderr
        # must not end up in the text handed to the JSON parser
        cmd = 'docker info --format \'{{json .}}\''
        result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)

        if result.returncode != 0:
            return False, {}

        try:
            info = json.loads(result.stdout)
        except json.JSONDecodeError:
            self._logger.debug('Docker daemon system information is not '
                               'valid JSON')
            return False, {}

        self._logger.debug('Docker daemon system information successfully')
        return True, info
Client for the Docker CLI.
Docker(logger: bench_executor.logger.Logger)
def __init__(self, logger: Logger):
    """Set up a Docker CLI client.

    Parameters
    ----------
    logger : Logger
        Logger stored on the instance as ``_logger``.
    """
    self._logger = logger
Creates an instance of the Docker class.
def
exec(self, container_id: str, command: str) -> int:
def exec(self, container_id: str, command: str) -> int:
    """Run a command inside a running Docker container.

    Parameters
    ----------
    container_id : str
        ID of the Docker container.
    command : str
        Command line to run in the container, appended unquoted to the
        ``docker exec`` shell command.

    Returns
    -------
    status_code : int
        Exit status of the ``docker exec`` invocation.
    """
    cli = f'docker exec "{container_id}" {command}'
    self._logger.debug(f'Executing command in Docker container: {cli}')

    # Only the exit status matters here, the captured output is dropped
    exit_status = subprocess.getstatusoutput(cli)[0]
    return exit_status
Execute a command inside a running Docker container.
Parameters
- container_id (str): ID of the Docker container.
- command (str): Command to execute inside the Docker container.
Returns
- status_code (int): The exit status code of the executed command.
def
wait(self, container_id: str) -> int:
def wait(self, container_id: str) -> int:
    """Wait for Docker container to exit.

    Parameters
    ----------
    container_id : str
        ID of the Docker container.

    Returns
    -------
    status_code : int
        The exit status code of the Docker container. If the
        ``docker wait`` call itself fails, or its output cannot be
        parsed, the exit status of the CLI call is returned instead.
    """

    cmd = f'docker wait "{container_id}"'
    self._logger.debug(f'Waiting for Docker container: {cmd}')
    status_code, output = subprocess.getstatusoutput(cmd)

    # A failing CLI call has no container exit code to report
    if status_code != 0:
        return status_code

    # `docker wait` prints the exit code of the container; the exit
    # status of the CLI only tells whether waiting itself succeeded
    try:
        return int(output.strip().splitlines()[-1])
    except (IndexError, ValueError):
        self._logger.debug(f'Cannot parse container exit code: {output}')
        return status_code
Wait for Docker container to exit.
Parameters
- container_id (str): ID of the Docker container.
Returns
- status_code (int): The exit status code of the Docker container.
def
stop(self, container_id: str) -> bool:
def stop(self, container_id: str) -> bool:
    """Stop a running Docker container and remove it afterwards.

    Parameters
    ----------
    container_id : str
        ID of the Docker container.

    Returns
    -------
    success : bool
        True if both ``docker stop`` and ``docker rm`` exited with
        status 0, False otherwise.
    """
    stop_cli = f'docker stop "{container_id}"'
    self._logger.debug(f'Stopping Docker container: {stop_cli}')
    stopped = subprocess.getstatusoutput(stop_cli)[0] == 0

    # Removal is only attempted once the container has been stopped
    if not stopped:
        return False

    rm_cli = f'docker rm "{container_id}"'
    self._logger.debug(f'Removing Docker container: {rm_cli}')
    return subprocess.getstatusoutput(rm_cli)[0] == 0
Stop a running Docker container.
Parameters
- container_id (str): ID of the Docker container.
Returns
- success (bool): True if the container was stopped and removed successfully.
def
logs(self, container_id: str) -> List[str]:
def logs(self, container_id: str) -> List[str]:
    """Fetch the logs of a container.

    The exit status of ``docker logs`` is not inspected, so whatever the
    CLI printed is returned.

    Parameters
    ----------
    container_id : str
        ID of the Docker container.

    Returns
    -------
    logs : List[str]
        The output split on newlines, each line stripped of surrounding
        whitespace.
    """
    _, raw = subprocess.getstatusoutput(f'docker logs "{container_id}"')
    return [entry.strip() for entry in raw.split('\n')]
Retrieve the logs of a container.
Parameters
- container_id (str): ID of the Docker container.
Returns
- logs (List[str]): List of loglines from the container.
def
pull(self, image: str) -> bool:
def pull(self, image: str) -> bool:
    """Pull a Docker image from DockerHub or other registries.

    If the image already exists locally, pulling is skipped.

    Parameters
    ----------
    image : str
        Name of the Docker container image.

    Returns
    -------
    success : bool
        True if the image is available locally or was pulled
        successfully.
    """
    # Check if the image already exists. `docker image inspect` only
    # matches images, plain `docker inspect` also succeeds for a
    # container, network or volume carrying the same name.
    cmd = f'docker image inspect "{image}"'
    status_code, _ = subprocess.getstatusoutput(cmd)
    if status_code == 0:
        return True

    # Pull the image
    cmd = f'docker pull -q "{image}"'
    self._logger.debug(f'Pulling Docker image: {cmd}')
    status_code, output = subprocess.getstatusoutput(cmd)
    if status_code != 0:
        self._logger.debug(f'Pulling image "{image}" failed: {output}')

    return status_code == 0
Pull a Docker image from DockerHub or other registries.
If the image already exists locally, pulling is skipped.
Parameters
- image (str): Name of the Docker container image.
Returns
- success (bool): True if the image is available locally or was pulled successfully.
def
run( self, image: str, command: str, name: str, detach: bool, ports: dict, network: str, environment: dict, volumes: List[str], must_pull: bool = True) -> Tuple[bool, str]:
def run(self, image: str, command: str, name: str, detach: bool,
        ports: dict, network: str, environment: dict,
        volumes: List[str], must_pull: bool = True) -> Tuple[bool, str]:
    """Start a Docker container.

    An existing container with the same name is stopped and removed
    first.

    Parameters
    ----------
    image : str
        Name of the Docker container image.
    command : str
        Command to execute in the Docker container.
    name : str
        Canonical name to assign to the container.
    detach : bool
        Whether to detach from the container or not.
    ports : dict
        Ports to expose from the container to the host, as
        host port -> container port.
    network : str
        Name of the Docker network to attach the container to.
    environment : dict
        Environment variables to set.
    volumes : List[str]
        Volumes to mount on the container from the host.
    must_pull: bool
        Whether the image should be pulled first, default is True.

    Returns
    -------
    success : bool
        True if starting the container was successful.
    container_id : str
        Stripped output of ``docker run``. NOTE(review): presumably the
        container ID when detached, and the CLI error text on failure;
        confirm against the Docker CLI reference.
    """

    # Make sure the image is available locally. A failed pull is only
    # reported, the start is still attempted.
    if must_pull and not self.pull(image):
        self._logger.debug(f'Image "{image}" could not be pulled')

    # Avoid race condition between removing and starting the same
    # container. The lookup has to match the name exactly: grepping the
    # `docker ps -a` table also hits other containers whose name, image
    # or command merely contains the name, in which case the loop never
    # terminates.
    # NOTE: the loop has no timeout, it polls until the name is free.
    exists_cmd = f'docker container inspect "{name}"'
    removing = False
    while True:
        status_code, _ = subprocess.getstatusoutput(exists_cmd)
        if status_code != 0:
            break
        if not removing:
            cmd = f'docker stop "{name}" && docker rm "{name}"'
            subprocess.getstatusoutput(cmd)
            self._logger.debug(f'Schedule container "{name}" for removal')
            removing = True
        sleep(0.1)

    # Start container
    parts = [f'docker run --name "{name}"']
    if detach:
        parts.append('--detach')
    for variable, value in environment.items():
        parts.append(f'--env "{variable}={value}"')
    for host_port, container_port in ports.items():
        parts.append(f'-p "{host_port}:{container_port}"')
    for volume in volumes:
        parts.append(f'-v "{volume}"')
    parts.append(f'--network "{network}"')
    parts.append(f'{image} {command}')
    cmd = ' '.join(parts)
    self._logger.debug(f'Starting Docker container: {cmd}')
    status_code, output = subprocess.getstatusoutput(cmd)
    container_id = output.strip()

    # Only claim the container is running when the CLI succeeded
    if status_code == 0:
        self._logger.debug(f'Container "{container_id}" running')
    else:
        self._logger.debug(f'Starting container "{name}" failed: '
                           f'{container_id}')

    return status_code == 0, container_id
Start a Docker container.
Parameters
- image (str): Name of the Docker container image.
- command (str): Command to execute in the Docker container.
- name (str): Canonical name to assign to the container.
- detach (bool): Whether to detach from the container or not.
- ports (dict): Ports to expose from the container to the host.
- network (str): Name of the Docker network to attach the container to.
- environment (dict): Environment variables to set.
- volumes (List[str]): Volumes to mount on the container from the host.
- must_pull (bool): Whether the image should be pulled first, default is True.
Returns
- success (bool): True if starting the container was successful.
- container_id (str): ID of the container that was started.
def
create_network(self, network: str) -> bool:
def create_network(self, network: str) -> bool:
    """Create a Docker container network.

    If the network already exists, it will not be recreated.

    Parameters
    ----------
    network : str
        Name of the network.

    Returns
    -------
    success : bool
        True if the network already exists or was created.
    """

    # Check if network exists. `docker network inspect` matches the
    # network exactly, grepping `docker network ls` also matches any
    # network whose name merely contains the requested name.
    cmd = f'docker network inspect "{network}"'
    status_code, _ = subprocess.getstatusoutput(cmd)
    if status_code == 0:
        return True

    # Create it as it does not exist yet
    cmd = f'docker network create "{network}"'
    status_code, output = subprocess.getstatusoutput(cmd)
    if status_code == 0:
        self._logger.debug(f'Created network "{network}"')
    else:
        self._logger.debug(f'Creating network "{network}" failed: {output}')

    return status_code == 0
Create a Docker container network.
If the network already exists, it will not be recreated.
Parameters
- network (str): Name of the network.
Returns
- success (bool): True if the network already exists or was created successfully.
def
info(self) -> Tuple[bool, dict]:
def info(self) -> Tuple[bool, dict]:
    """Dump the Docker daemon system information.

    Returns
    -------
    success : bool
        True if the command succeeded and its output was valid JSON.
    info : dict
        Docker daemon system information as dictionary, empty on
        failure.
    """

    # Capture stdout separately: warnings the CLI prints on stderr must
    # not end up in the text handed to the JSON parser
    cmd = 'docker info --format \'{{json .}}\''
    result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True)

    if result.returncode != 0:
        return False, {}

    try:
        info = json.loads(result.stdout)
    except json.JSONDecodeError:
        self._logger.debug('Docker daemon system information is not '
                           'valid JSON')
        return False, {}

    self._logger.debug('Docker daemon system information successfully')
    return True, info
Dump the Docker daemon system information.
Returns
- success (bool): True if the command succeeded.
- info (dict): Docker daemon system information as dictionary.