from flask import Blueprint, request, jsonify, Response, url_for
from functools import wraps
from ..services.docker_manager import DockerManager
from ..services.logger import DolostLogger
import os, json
from ..context import Context
# Construct the path to dockerfiles_templates
dockerfiles_path = os.path.join(Context.base_dir, 'dockerfiles_templates')
api_blueprint = Blueprint('api', __name__)
docker_manager = DockerManager.get_instance()
logger = DolostLogger.get_instance()
# Decorator for error handling
[docs]
def error_handling(func):
"""
Decorator for error handling.
Args:
func (function): The function to be decorated.
Returns:
function: The wrapper function for error handling.
"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f'Error during operation: {str(e)}')
raise
return jsonify({"status": "error", "message": str(e)}), 500
wrapper.__name__ = func.__name__
return wrapper
# Load existing operations from the JSON file
[docs]
def load_operations_db():
"""
Load existing operations from the JSON file.
Returns:
list: A list containing operations data loaded from the JSON file.
"""
try:
with open('operationsdb.json', 'r') as f:
operations = json.load(f)
except FileNotFoundError:
operations = []
return operations
# Update the JSON file with modified operations data
[docs]
def update_operations_db(data):
"""
Update the JSON file with modified operations data.
Args:
data (list): The modified operations data to be written to the JSON file.
"""
with open('operationsdb.json', 'w') as f:
json.dump(data, f, indent=4)
# Update an operation based on it's ID
[docs]
def update_operation_entry(operation_id, new_operation_content, update_decoys:bool=False):
"""
Update an operation based on its ID.
Args:
operation_id (int): The ID of the operation to be updated.
new_operation_content (dict): The new content for the operation.
update_decoys (bool, optional): Whether to update decoys data. Defaults to False.
"""
operations = load_operations_db()
for operation in operations:
if operation.get('id') == operation_id:
# Update the operation content
for key, value in new_operation_content.items():
operation[key] = value
# Update the decoys if update_decoys is True
if update_decoys:
operation['decoys'] = new_operation_content.get('decoys', operation.get('decoys', []))
break
update_operations_db(operations)
# Modify the operations's decoys
[docs]
def update_operation_decoys(operation_id, decoy):
"""
Modify the operation's decoys.
Args:
operation_id (int): The ID of the operation.
decoy (dict): Decoy information to be added to the operation.
"""
operations_data = load_operations_db()
operation = find_operation_by_id(operation_id)
# Ensure that the "decoys" key holds a list
if "decoys" not in operation or not isinstance(operation["decoys"], list):
operation["decoys"] = []
# Filter out any additional fields that are not needed for decoys
required_decoy_keys = ["Hostname", "Description", "IP", "Subnet", "Gateway", "DeceptionNetwork", "ServicePorts", "Service", "DecoyFiles"]
filtered_decoys = {key: decoy.get(key) for key in required_decoy_keys}
# Append the new operation decoys
operation["decoys"].append(filtered_decoys)
# Write the updated decoys into the file
return update_operation_entry(operation_id, operation, update_decoys=True)
# Generate ID for new operation based on the last ID in the JSON file
[docs]
def generate_new_id(operations):
"""
Generate ID for a new operation based on the last ID in the JSON file.
Args:
operations (list): List of existing operations.
Returns:
int: The ID for the new operation.
"""
if not operations:
return 1
else:
# We may need to implement a fail-safe approach
return operations[-1]['id'] + 1
# Get operation from DB using id
[docs]
def find_operation_by_id(operation_id):
"""
Get an operation from the database using its ID.
Args:
operation_id (int): The ID of the operation to find.
Returns:
dict: The operation data if found, otherwise None.
"""
operations = load_operations_db()
for operation in operations:
if operation.get('id') == operation_id:
return operation
return None # Return None if no operation with the given id is found
# Create a new operation
[docs]
@api_blueprint.route('/operations/new', methods=['POST'])
@error_handling
def new_operation():
"""
Create a new operation.
This endpoint allows the creation of a new operation. It expects a POST request with JSON data containing the necessary information for the new operation.
Returns:
jsonify: JSON response indicating the status of the operation creation.
- status (str): Status of the operation creation ("OK" or "error").
- message (str): Additional message providing details about the status.
- redirect (str, optional): URL to redirect the client to upon successful operation creation.
Raises:
BadRequest: If the request data is invalid or missing required fields.
"""
try:
required_keys = {
'preparation': ['name', 'objective', 'assets'],
'narrative': ['storytelling', 'deception_activities', 'monitoring'],
'closure_criteria': ['limits', 'end_date', 'commander'],
}
data = request.get_json()
# Filter out any additional fields that are not in the required keys
filtered_data = {}
for section, keys in required_keys.items():
if section in data:
filtered_data[section] = {key: data[section][key] for key in keys if key in data[section]}
else:
filtered_data[section] = {}
# Check if all required keys are present in the filtered data for each section
missing_keys = []
for section, keys in required_keys.items():
missing_keys.extend([f"Missing key '{key}' in section '{section}'" for key in keys if key not in filtered_data[section]])
if missing_keys:
return jsonify({"status": "error", "message": "\n".join(missing_keys)}), 400
# Check if the 'preparation' section is present and 'name' field is not empty
if not filtered_data['preparation'].get('name'):
return jsonify({"status": "error", "message": "The 'Operation Name' field under 'Preparation' section cannot be empty"}), 400
# Check if decoys are provided and filter out any additional keys
if 'decoys' in data:
decoys = data['decoys']
filtered_decoys = []
for decoy in decoys:
# Check if all required decoy keys are present
required_decoy_keys = ["Hostname", "Description", "IP", "Subnet", "Gateway", "DeceptionNetwork", "ServicePorts", "Service", "DecoyFiles"]
missing_decoy_keys = [key for key in required_decoy_keys if key not in decoy]
if missing_decoy_keys:
return jsonify({"status": "error", "message": f"Missing key(s) {', '.join(missing_decoy_keys)} in decoy"}), 400
filtered_decoys.append({key: decoy[key] for key in required_decoy_keys})
filtered_data['decoys'] = filtered_decoys
new_operation = filtered_data
actual_operations = load_operations_db()
# Generate ID for the new operation
new_id = generate_new_id(actual_operations)
# Add the new operation data to the list with the generated ID
new_operation['id'] = new_id
actual_operations.append(new_operation)
update_operations_db(actual_operations)
return jsonify({"status": "OK", "redirect": url_for('views.view_operation', operation_id=new_id)})
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 400
# Edit an operation
[docs]
@api_blueprint.route('/operations/<int:operation_id>/edit', methods=['POST'])
@error_handling
def edit_operation(operation_id):
"""
Edit an existing operation.
Args:
operation_id (int): The ID of the operation to edit.
Returns:
jsonify: JSON response indicating the status of the operation editing.
"""
try:
required_keys = {
'preparation': ['name', 'objective', 'assets'],
'narrative': ['storytelling', 'deception_activities', 'monitoring'],
'closure_criteria': ['limits', 'end_date', 'commander'],
}
data = request.get_json()
# Filter out any additional fields that are not in the required keys
filtered_data = {}
for section, keys in required_keys.items():
if section in data:
filtered_data[section] = {key: data[section][key] for key in keys if key in data[section]}
else:
filtered_data[section] = {}
# Check if all required keys are present in the filtered data for each section
missing_keys = []
for section, keys in required_keys.items():
missing_keys.extend([f"Missing key '{key}' in section '{section}'" for key in keys if key not in filtered_data[section]])
if missing_keys:
return jsonify({"status": "error", "message": "\n".join(missing_keys)}), 400
# Check if the 'preparation' section is present and 'name' field is not empty
if not filtered_data['preparation'].get('name'):
return jsonify({"status": "error", "message": "The 'Operation Name' field under 'Preparation' section cannot be empty"}), 400
edited_operation_data = filtered_data
update_operation_entry(operation_id, edited_operation_data, update_decoys=False)
return jsonify({"status": "OK", "redirect": url_for('views.view_operation', operation_id=operation_id)})
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
# Remove operation
[docs]
@api_blueprint.route('/operations/remove/<string:operation_id>', methods=['DELETE'])
@error_handling
def remove_operation(operation_id):
"""
Remove an operation.
Args:
operation_id (str): The ID of the operation to remove.
Returns:
jsonify: JSON response indicating the status of the operation removal.
"""
try:
operations_data = load_operations_db()
# Find the operation by ID
operation_to_remove = find_operation_by_id(int(operation_id))
if operation_to_remove:
# Delete the operation data
operations_data.remove(operation_to_remove)
update_operations_db(operations_data)
return jsonify({'message': f'We removed the operation {operation_id}'}), 202
else:
return jsonify({'message': f'Operation {operation_id} not found'}), 404
except Exception:
return jsonify({'message': f'We can not remove operation {operation_id}'}), 406
# Create a new decoy
[docs]
@api_blueprint.route('/operations/<int:operation_id>/decoy/new', methods=['POST'])
@error_handling
def new_decoy(operation_id):
"""
Create a new decoy for an operation.
Args:
operation_id (int): The ID of the operation to add the decoy to.
Returns:
jsonify: JSON response indicating the status of the new decoy creation.
"""
data = request.get_json()
update_operation_decoys(operation_id, data)
return jsonify({"status": "OK", "message": "New decoy saved"}), 200
# Helper function for deployment logic
[docs]
def deploy_service(image_name, dockerfile_path, decoy_files, service_info):
"""
Deploy a service using Docker.
Args:
image_name (str): The name of the Docker image.
dockerfile_path (str): The path to the Dockerfile.
decoy_files (str): The path to service files.
service_info (dict): Information about the service.
Returns:
None
"""
image_name = docker_manager.build_image(image_name=image_name, decoy_files=decoy_files, dockerfile_path=dockerfile_path)
docker_manager.run_container(image_name=image_name,**service_info)
logger.info(f"Deployed container: {service_info['hostname']}")
# Fetch current available Dockerfiles
[docs]
def get_supported_services():
"""
Fetches the current available Dockerfile services.
Returns:
list: A list of supported services.
"""
supported_services = []
for filename in os.listdir(dockerfiles_path):
if filename.startswith("Dockerfile-"):
service_name = filename.replace("Dockerfile-", "")
supported_services.append(service_name)
return supported_services
[docs]
@api_blueprint.route('/deploy', methods=['POST'])
@error_handling
def deploy_env():
"""
Endpoint for deploying the environment.
Returns:
Response: A response indicating the deployment status.
"""
data = request.get_json()
deploy_collector()
for decoy in data:
deploy_decoy(decoy)
return Response("Env Deployed", mimetype='text/plain')
[docs]
def deploy_collector():
"""
Deploy the collector service.
Returns:
None
"""
service_dockerfile_path = os.path.join(dockerfiles_path, 'Dockerfile-collector')
if not os.path.exists(service_dockerfile_path):
logger.error('Dockerfile not found for Collector')
return
collector_info = {
"hostname": "Collector",
"name": "Collector",
"network_name": "CollectorNetwork",
"ipv4_address": "200.100.0.247",
"subnet": "200.100.0.0/24",
"gateway": "200.100.0.1",
"ports": "{'514/tcp': 524, '514/udp': 524 }"
}
deploy_service(
image_name="dolost-collector",
dockerfile_path=service_dockerfile_path,
decoy_files="collector",
service_info=collector_info
)
[docs]
def deploy_decoy(decoy):
"""
Deploy a decoy service.
Args:
decoy (dict): Decoy information including hostname, network settings, and service details.
Returns:
None
"""
deploy_collector()
supported_services = get_supported_services()
decoyservice = decoy["Service"].lower()
if decoyservice not in supported_services:
logger.error(f"We do not yet offer the service: {decoyservice}")
return
decoy_info = {
"hostname": decoy["Hostname"],
"name": decoy["Hostname"],
"network_name": decoy["DeceptionNetwork"],
"ipv4_address": decoy["IP"],
"subnet": decoy["Subnet"],
"gateway": decoy["Gateway"],
"ports": decoy["ServicePorts"]
}
dockerfile_path = os.path.join(dockerfiles_path, f'Dockerfile-{decoyservice}')
deploy_service(
image_name=decoyservice,
dockerfile_path=dockerfile_path,
decoy_files=decoy["DecoyFiles"],
service_info=decoy_info
)
# Remove deployed decoys
[docs]
@api_blueprint.route('/decoys/clean', methods=['POST'])
@error_handling
def clean_env():
"""
Endpoint to remove deployed decoys.
Returns:
Response: Response indicating the success of the operation.
"""
# PENDING search for dolost tag
data = request.get_json()
for decoy in data:
clean_decoy(decoy)
logger.info(f"Removed decoys")
return Response("Decoys cleaned", mimetype='text/plain')
# Remove decoy
[docs]
def clean_decoy(decoy):
"""
Remove decoy.
Args:
decoy (dict): Decoy information.
Returns:
Response: Response indicating the success of the operation.
"""
decoy_to_clean = decoy["Hostname"]
docker_manager.clean_container(decoy_to_clean)
return Response("A decoy was cleaned", mimetype='text/plain')
# Remove collector
[docs]
@api_blueprint.route('/collector/clean', methods=['POST'])
@error_handling
def clean_collector():
"""
Remove collector.
Returns:
Response: Response indicating the success of the operation.
"""
collector_to_clean = "Collector"
docker_manager.clean_container(collector_to_clean)
logger.info(f"Removed Collector")
return Response("Collector cleaned", mimetype='text/plain')
# Remove networks
[docs]
@api_blueprint.route('/networks/clean', methods=['POST'])
@error_handling
def clean_networks():
"""
Remove networks.
Returns:
Response: Response indicating the success of the operation.
"""
docker_manager.clean_networks()
logger.info(f"Removed Networks")
return Response("Networks cleaned", mimetype='text/plain')
# Start an specific decoy
[docs]
@api_blueprint.route('/decoys/start', methods=['POST'])
@error_handling
def decoys_start():
"""
Start a specific decoy.
Returns:
Response: JSON response indicating the status of the operation.
"""
try:
data = request.get_json()
docker_manager.start(container_id=data['containerId'])
return jsonify({"status": "OK", "message": "The container has been started successfully"}), 200
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
# Stop an specific decoy
[docs]
@api_blueprint.route('/decoys/stop', methods=['POST'])
@error_handling
def decoys_stop():
"""
Stop a specific decoy.
Returns:
Response: JSON response indicating the status of the operation.
"""
try:
data = request.get_json()
docker_manager.stop(container_id=data['containerId'])
return jsonify({"status": "OK", "message": "The container has been stopped successfully"}), 200
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
# Deploy an specific decoy, including building and deployment
[docs]
@api_blueprint.route('/operations/<int:operation_id>/decoys/deploy', methods=['POST'])
@error_handling
def deploy_container(operation_id):
"""
Deploy a specific decoy, including building and deployment.
Args:
operation_id (int): The ID of the operation associated with the decoy.
Returns:
Response: JSON response indicating the status of the operation.
"""
try:
data = request.get_json()
operation_data = find_operation_by_id(operation_id)
decoy_data = None
for decoy in operation_data.get('decoys', []):
if "DolosT-" + decoy.get('Hostname') == data.get('containerHostname'):
decoy_data = decoy
break
if decoy_data:
deploy_decoy(decoy_data)
return jsonify({"status": "OK", "message": "The container has been deployed successfully"}), 200
else:
return jsonify({"status": "error", "message": "Decoy not found on stored decoys"}), 404
except Exception as e:
logger.error(f"Error deploying container: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500
# Undeploy an specific container decoy
[docs]
@api_blueprint.route('/operations/<int:operation_id>/decoys/undeploy', methods=['DELETE'])
@error_handling
def undeploy_container(operation_id):
"""
Undeploy a container decoy for an operation.
Args:
operation_id (int): The ID of the operation containing the decoy.
Returns:
jsonify: JSON response indicating the status of the container undeployment.
"""
try:
data = request.get_json()
operation_data = find_operation_by_id(operation_id)
decoy_data = None
for decoy in operation_data.get('decoys', []):
if "DolosT-" + decoy.get('Hostname') == data.get('containerHostname'):
decoy_data = decoy
break
if decoy_data:
clean_decoy(decoy_data)
return jsonify({"status": "OK", "message": "The container has been undeployed successfully"}), 200
else:
return jsonify({"status": "error", "message": "Decoy not found on stored decoys"}), 404
except Exception as e:
logger.error(f"Error undeploying container: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500
# Delete an specific decoy
[docs]
@api_blueprint.route('/operations/<int:operation_id>/decoys/delete', methods=['DELETE'])
@error_handling
def delete_decoy(operation_id):
"""
Delete a specific decoy from an operation.
Args:
operation_id (int): The ID of the operation containing the decoy.
Returns:
jsonify: JSON response indicating the status of the decoy deletion.
"""
try:
data = request.get_json()
decoy_to_delete = data["decoyHostname"]
decoy_to_delete = decoy_to_delete.replace("DolosT-", "")
operation = find_operation_by_id(operation_id)
operation['decoys'] = [decoy for decoy in operation['decoys'] if decoy["Hostname"] != decoy_to_delete]
print(operation)
update_operation_entry(operation_id, operation, update_decoys=True)
return jsonify({"status": "OK", "message": "The decoy has been deleted successfully"}), 200
except Exception as e:
logger.error(f"Error deleting decoy: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500
[docs]
@api_blueprint.route('/config/docker_client', methods=['POST'])
@error_handling
def modify_docker_client():
"""
Modify the Docker client configuration.
Returns:
jsonify: JSON response indicating the status of the Docker client configuration modification.
"""
def save_file(file_path, content):
with open(file_path, 'w') as f:
f.write(content)
# Check if the folder exists, if not, create it
if not os.path.exists(Context.CONFIG_FOLDER):
os.makedirs(Context.CONFIG_FOLDER)
data = request.get_json()
# Extract data from JSON payload
config_type = data.get('type')
dc = {}
if config_type == "env":
dc = {'from_env': True}
elif config_type == "tcp":
host = data.get('host')
port = data.get('port')
dc = {'tcp': f"tcp://{host}:{port}"}
elif config_type == "socket":
socketPath = data.get('socketPath')
dc = {'socket': socketPath}
elif config_type == 'tcp_ssl':
host = data.get('host')
port = data.get('port')
ssl_cert = data.get('sslCert')
ssl_key = data.get('sslKey')
ssl_ca = data.get('sslCa')
save_file(os.path.join(Context.CONFIG_FOLDER, 'ssl_cert.pem'), ssl_cert)
save_file(os.path.join(Context.CONFIG_FOLDER, 'ssl_key.pem'), ssl_key)
save_file(os.path.join(Context.CONFIG_FOLDER, 'ssl_ca.pem'), ssl_ca)
dc['tcp_ssl'] = {
'host': host,
'port': port,
'cert_path': os.path.join(Context.CONFIG_FOLDER, 'ssl_cert.pem'),
'key_path': os.path.join(Context.CONFIG_FOLDER, 'ssl_key.pem'),
'ca_path': os.path.join(Context.CONFIG_FOLDER, 'ssl_ca.pem')
}
else:
return jsonify({"status": "error", "message": "Invalid configuration sent"}), 400
try:
status = docker_manager.check_client_configuration(docker_client=dc)
if status:
docker_manager.configure_client(docker_client=dc)
docker_manager.check_connection()
return jsonify({"status": "OK", "message": "Docker client updated successfully"}), 200
else:
return jsonify({"status": "error", "message": "Could not connect to docker client"}), 400
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500