import docker
import os, shutil, tempfile, ast, json
from .logger import DolostLogger
from ..context import Context
[docs]
class DockerManager:
"""
A class for managing the interactions with docker's API
Attributes:
_instance (cls): Instance of the class.
"""
_instance = None
[docs]
@classmethod
def get_instance(cls, docker_client=None):
"""
Get an instance of the DockerManager class.
If an instance does not exist, create a new one. If a `docker_client` is provided,
configure the Docker client and check the connection.
Args:
docker_client (docker.client.DockerClient, optional): An instance of DockerClient to use for interaction.
Returns:
DockerManager: An instance of the DockerManager class.
"""
if cls._instance is None:
cls._instance = cls(docker_client=docker_client)
elif docker_client is not None:
cls._instance.configure_client(docker_client)
cls._instance.check_connection()
return cls._instance
def __init__(self, docker_client=None):
"""
Initialize a DockerManager instance.
Args:
docker_client: The way to connect to docker
Returns:
None
"""
self.logger = DolostLogger.get_instance()
self.client = None
if docker_client:
self.configure_client(docker_client)
[docs]
def check_connection(self):
"""
Check if the Docker connection is successful.
"""
if self.client is None:
self.logger.error("[Docker] client is not set")
try:
self.client.ping()
self.logger.debug("[Docker] connection successful")
return True
except Exception as e:
self.logger.error(f"Failed to connect to Docker: {e}")
return False
[docs]
def check_client_configuration(self, docker_client):
"""
Check before configuring Docker client based on the provided configuration.
Args:
docker_client: Configuration for connecting to Docker client
Returns:
None
"""
try:
if 'from_env' in docker_client:
client = docker.from_env()
self.logger.trace("Docker client configured from env")
elif 'tcp' in docker_client:
client = docker.DockerClient(base_url=docker_client['tcp'])
self.logger.trace("Docker client configured from TCP")
elif 'ssl' in docker_client:
tls_config = docker.tls.TLSConfig(
client_cert=(docker_client['tcp_ssl']['cert_path'], docker_client['tcp_ssl']['key_path']),
ca_cert=docker_client['tcp_ssl']['ca_path']
)
client = docker.DockerClient(base_url=f"tcp://{docker_client['tcp_ssl']['host']}:{docker_client['tcp_ssl']['port']}", tls=tls_config)
self.logger.trace("Docker client configured from TCP+SSL")
elif 'socket' in docker_client:
client = docker.DockerClient(base_url=docker_client['socket'])
self.logger.trace("Docker client configured from SOCKET")
else:
raise ValueError("Invalid Docker client configuration")
return False
client.ping()
self.logger.debug("Docker connection successful")
return True
except Exception as e:
self.logger.error(f"Failed to configure Docker client: {e}")
return False
[docs]
def get_current_client_config(self):
"""
Get the current Docker client configuration.
Returns:
dict or None: The current Docker client configuration loaded from the config file,
or None if the config file does not exist.
"""
docker_client = None
if os.path.exists(Context.CONFIG_PATH):
with open(Context.CONFIG_PATH, 'r') as config_file:
docker_client = json.load(config_file)
return docker_client
[docs]
def build_context(self, source_service_folder, tmp_location, dockerfile_used):
"""
Copy files to a temporary folder for Docker build context.
Args:
source_service_folder (str): Path to the source directory.
tmp_location (str): Temporary directory to store the Docker build context.
dockerfile_used (str): Path to the Dockerfile to be used.
Returns:
None
"""
# Copy files to tmp folder
tmp_app_destination = os.path.join(tmp_location, "app")
# Verify if tmp location already exist
if not os.path.exists(tmp_app_destination):
os.makedirs(tmp_app_destination)
# Copy service content to tmp_folder
for item in os.listdir(source_service_folder):
s = os.path.join(source_service_folder, item)
d = os.path.join(tmp_app_destination, item)
if os.path.isdir(s):
shutil.copytree(s, d, dirs_exist_ok=True)
else:
shutil.copy2(s, d)
# Copy Dockerfile to tmp_folder
tmp_dockerfile_destination = tmp_location + "/Dockerfile"
shutil.copy(dockerfile_used, tmp_dockerfile_destination)
[docs]
def build_image(self, image_name=None, decoy_files=None, dockerfile_path=".",):
"""
Build a Docker image based on the specified Dockerfile and service files.
Args:
image_name (str, optional): Name to tag the built image. If None, a default name will be used.
decoy_files (str, optional): Path to the service files.
dockerfile_path (str, optional): Path to the Dockerfile. Defaults to '.'.
Returns:
str: The image's id.
"""
source_service_folder = os.path.join(Context.base_dir, f'decoyfiles/{decoy_files}')
# Temp directory to copy to the Docker image
with tempfile.TemporaryDirectory() as tmp_dir_name:
self.build_context(source_service_folder=source_service_folder, tmp_location=tmp_dir_name, dockerfile_used=dockerfile_path)
if (image_name == "dolost-collector"):
image_name = "collector_image:dolost"
else:
image_name = f"decoy_image_{decoy_files}:dolost"
try:
self.logger.info(f"Building Docker image '{image_name}'...")
self.logger.trace(tmp_dir_name)
build_output = self.client.api.build(
path=tmp_dir_name,
tag=image_name,
forcerm=True,
decode=True
)
# Iterate through the output and print progress
for event in build_output:
if 'stream' in event:
lines = event['stream'].split('\n')
for line in lines:
if line.strip():
if "Step" in line.strip():
self.logger.debug(line.strip())
else:
self.logger.trace(line.strip())
if 'aux' in event and 'ID' in event['aux']:
image_id = event['aux']['ID']
self.logger.info(f"Image '{image_name}' successfully built!")
return image_name
except docker.errors.BuildError as e:
self.logger.error(f"Error building Docker image: {e}")
except docker.errors.APIError as e:
self.logger.error(f"Error accessing Docker API: {e}")
[docs]
def run_container(self, image_name, hostname, name, network_name=None, ipv4_address=None, subnet=None, gateway=None, ports=None ):
"""
Run a Docker container based on the built image, with custom hostname and container name.
Args:
image_name (str): Name of the Docker image to run the container from.
hostname (str): The container's decoy hostname.
name (str): The container's name, used for management purposes.
network_name (str, optional): The name of the network to connect the container to.
ipv4_address (str, optional): The IPv4 address to assign to the container.
subnet (str, optional): The subnet address to assign to the container's network.
gateway (str, optional): The gateway IP address for the container's network.
ports (dict): The ports configuration for mapping to containers.
Returns:
None
"""
# Rename the name paramenter for the container
name = f"DolosT-{name}"
network_name =f"DolosT-{network_name}"
# Check if the container already exist
dolost_containers = self.get_containers()
for dolost_container in dolost_containers:
if name == dolost_container['Names'][0].lstrip('/'):
self.logger.info(f"Container {name} already deployed. Skiping...")
return
if network_name:
filters = {'name': [network_name]}
network = self.client.api.networks(filters=filters)
if network:
self.logger.info(f"Network {network_name} already deployed. Skiping...")
else:
self.logger.info(f"Deploying network {network_name} ...")
# Create the specified internal network if not already exists
# Define the IPAM config, including the subnet
ipam_pool = docker.types.IPAMPool(
subnet = subnet,
gateway = gateway
)
ipam_config = docker.types.IPAMConfig(
pool_configs=[ipam_pool]
)
# Not working BINDING + Internal or Host
# If the network is not to the Collector leave it with internal access only
#if (network_name == "DolosTCollectorNetwork"):
# NetworkDriver="bridge"
#else:
# NetworkDriver="host"
network = self.client.api.create_network(network_name, ipam=ipam_config)
network_id = network["Id"]
# Define extra_hosts for setting the gateway
extra_hosts = {hostname: ipv4_address, "gateway": gateway} if network_name else None
#Convert ports var from str to dict
ports = ast.literal_eval(ports)
log_config = {
"type": "syslog",
"config": {
"syslog-address": f"udp://200.100.0.247:514",
# "syslog-format": "rfc5424", # Avoid sending the host's hostname
"tag": hostname,
}
}
# Defining log configuratión and port binding
host_config = self.client.api.create_host_config(
log_config=log_config,
port_bindings=ports,
)
endpoint_config = self.client.api.create_endpoint_config(
ipv4_address=ipv4_address,
)
networking_config = self.client.api.create_networking_config({
network_name: endpoint_config,
})
# Continue with deploying the service without network
container = self.client.api.create_container(
image_name,
detach=True,
hostname=hostname,
name=name,
host_config=host_config,
networking_config=networking_config
# extra_hosts=extra_hosts, # Pass extra_hosts here
)
#self.client.api.connect_container_to_network(
# container=container["Id"],
# net_id=network_id,
# ipv4_address=ipv4_address,
#)
#Once we have the container, start it
starting = self.client.api.start(container=container.get('Id'))
self.logger.debug(f"Container ID: {container.get('Id')}")
[docs]
def start(self, container_id):
"""
Starts a Docker container.
Args:
container_id (str): The ID of the container to start.
Returns:
None
"""
try:
self.client.api.start(container_id)
self.logger.info(f"Container {container_id} started successfully.")
except docker.errors.APIError as e:
self.logger.error(f"Failed to start container {container_id}: {e}")
[docs]
def stop(self, container_id):
"""
Stops a Docker container.
Args:
container_id (str): The ID of the container to stop.
Returns:
None
"""
try:
self.client.api.stop(container_id)
self.logger.info(f"Container {container_id} stopped successfully.")
except docker.errors.APIError as e:
self.logger.error(f"Failed to stop container {container_id}: {e}")
[docs]
def clean_container(self, name):
"""
Stop and remove a Docker container with the specified name.
Args:
name (str): The name of the Docker container to be cleaned.
Returns:
None
"""
name = f"DolosT-{name}"
dolost_containers = self.get_containers()
for container in dolost_containers:
if name == container['Names'][0].lstrip('/'):
self.logger.info(f"Cleaning Docker container '{name}'...")
self.client.api.remove_container(container=name, force=True)
self.logger.info(f"Container '{name}' successfully cleaned!")
[docs]
def clean_networks(self):
"""
Remove unused Docker networks with the prefix DolosT-.
Returns:
None
"""
network_name_prefix = "DolosT-"
dolost_networks = self.client.networks.list(names="DolosT-*")
for network in dolost_networks:
if network.containers == []:
try:
self.client.api.remove_network(network.id)
self.logger.info("Network " + network.name + " successfully cleaned!")
except:
self.logger.info("Network " + network.name + " cannot be cleaned, maybe its in use")
[docs]
def create_network(self, network_name, subnet, gateway):
"""
Create a Docker network with the specified name, subnet, and gateway if it does not already exist.
Args:
network_name (str): The name of the network.
subnet (str): The subnet in CIDR notation (e.g., '10.0.0.0/24').
gateway (str): The gateway IP address for the network.
Returns:
None
"""
try:
networks = self.client.networks.get(network_name)
except docker.errors.NotFound:
ipam_pool = docker.types.IPAMPool(subnet=subnet, gateway=gateway)
ipam_config = docker.types.IPAMConfig(pool_configs=[ipam_pool])
self.client.networks.create(network_name, driver="bridge", ipam=ipam_config)
[docs]
def connect_to_network(self, container_name, network_name, ipv4_address, gateway):
"""
Connect a Docker container to a network with the specified name, IP address, and gateway.
Args:
container_name (str): The name of the container to connect to the network.
network_name (str): The name of the network.
ipv4_address (str): The IPv4 address to assign to the container.
gateway (str): The gateway IP address for the container's network.
Returns:
None
"""
networks = self.client.networks.get(network_name)
# Use extra_hosts to set the gateway
extra_hosts = {container_name: ipv4_address, "gateway": gateway}
networks.connect(container=container_name, ipv4_address=ipv4_address, extra_hosts=extra_hosts)
[docs]
def get_containers(self, all:bool=True):
"""
Get the docker containers
Args:
all (bool): Retrieve all the containers, active or inactive ones.
Returns:
containers (list): The list of containers
"""
return self.client.api.containers(all=all)
[docs]
class ContainerStatsManager(DockerManager):
"""
A class for managing container statistics retrieval and calculations.
Inherits from DockerManager.
Attributes:
logger (DolostLogger, optional): The logger instance for logging messages.
image_name (str, optional): The name of the Docker image.
decoy_files (str, optional): The name of the service files directory.
dockerfile_path (str, optional): The path to the Dockerfile.
Methods:
fetch_container_stats(container_id):
Fetches stats for a specific container identified by its ID.
calculate_cpu_percentage(precpu_stats, cpu_stats):
Calculates the CPU usage percentage based on the provided CPU stats.
calculate_memory_percentage(memory_stats):
Calculates the memory usage percentage based on the provided memory stats.
parse_container_stats(stats):
Parses the raw container stats and returns a dictionary containing calculated statistics.
"""
def __init__(self):
"""
Initialize a ContainerStatsManager instance.
Returns:
None
"""
docker_manager_instance = DockerManager.get_instance()
self.logger = docker_manager_instance.logger
self.client = docker_manager_instance.client
[docs]
def fetch_container_stats(self, container_id):
"""
Fetches stats for a specific container identified by its ID.
Args:
container_id (str): The ID of the container.
Returns:
dict: A dictionary containing various container statistics.
"""
stats = self.client.api.stats(container_id, stream=False)
return self.parse_container_stats(stats)
[docs]
def calculate_cpu_percentage(self, precpu_stats, cpu_stats):
"""
Calculates the CPU usage percentage based on the provided CPU stats.
Args:
precpu_stats (dict): The previous CPU stats.
cpu_stats (dict): The current CPU stats.
Returns:
float: The CPU usage percentage.
"""
cpu_delta = cpu_stats['cpu_usage']['total_usage'] - precpu_stats['cpu_usage']['total_usage']
system_delta = cpu_stats['system_cpu_usage'] - precpu_stats['system_cpu_usage']
number_cpus = cpu_stats['online_cpus']
cpu_percentage = (cpu_delta / system_delta) * number_cpus * 100.0 if system_delta > 0 and cpu_delta > 0 else 0
return cpu_percentage
[docs]
def calculate_memory_percentage(self, memory_stats):
"""
Calculates the memory usage percentage based on the provided memory stats.
Args:
memory_stats (dict): The memory stats.
Returns:
float: The memory usage percentage.
"""
memory_usage = memory_stats['usage']
memory_limit = memory_stats['limit']
memory_percentage = (memory_usage / memory_limit) * 100.0 if memory_limit > 0 else 0
return memory_percentage
[docs]
def parse_container_stats(self, stats):
"""
Parses the raw container stats and returns a dictionary containing calculated statistics.
Args:
stats (dict): The raw container stats.
Returns:
dict: A dictionary containing various container statistics.
"""
cpu_percentage = self.calculate_cpu_percentage(stats['precpu_stats'], stats['cpu_stats'])
memory_percentage = self.calculate_memory_percentage(stats['memory_stats'])
network_rx = stats['networks']['eth0']['rx_bytes'] if 'networks' in stats else 0
network_tx = stats['networks']['eth0']['tx_bytes'] if 'networks' in stats else 0
disk_read, disk_write = 0, 0
try:
if 'blkio_stats' in stats and 'io_service_bytes_recursive' in stats['blkio_stats']:
for stat in stats['blkio_stats']['io_service_bytes_recursive']:
if stat['op'] == 'Read':
disk_read += stat['value']
elif stat['op'] == 'Write':
disk_write += stat['value']
except TypeError:
pass
return {
'cpu_percentage': cpu_percentage,
'memory_percentage': memory_percentage,
'network_rx': network_rx,
'network_tx': network_tx,
# 'disk_read': disk_read,
# 'disk_write': disk_write,
}