Source code for DOLOST.blueprints.websocket

from flask_socketio import Namespace, emit
from ..services.logger import DolostLogger
from ..services.docker_manager import ContainerStatsManager
from ..services.activity import ActivityViewer
import threading

[docs] class WebSocketNamespace(Namespace): """Namespace for WebSocket connections.""" def __init__(self, namespace=None): """ Initialize the WebSocketNamespace. :param str namespace: The namespace for the WebSocket. """ super().__init__(namespace) self.logger = DolostLogger.get_instance() self.activity_logs_thread = None self.thread_stop_event = threading.Event() # Event to signal thread termination
[docs] def on_connect(self, *args): """ Callback function triggered when a client connects. :param \*args: Variable length argument list. """ self.logger.trace("[WS] Client connected", ws=False) emit('heartbeat', {'data': 'Connected'})
[docs] def on_disconnect(self, *args): """ Callback function triggered when a client disconnects. :param \*args: Variable length argument list. """ self.logger.trace("[WS] Client disconnected", ws=False) if self.activity_logs_thread: self.thread_stop_event.set()
[docs] def on_request_data(self, json_data): """ Callback function triggered on request for data. :param dict json_data: JSON object containing the request data. Should contain 'container_id' key. """ if 'container_id' in json_data and isinstance(json_data['container_id'], str): container_id = json_data['container_id'] data = ContainerStatsManager().fetch_container_stats(container_id) emit('update_data', data) else: emit('update_data_error', {'error': 'Invalid or missing container_id'})
[docs] def send_activity_logs(self): """ Send activity logs to clients. Continuously fetches and emits activity logs until stop event is set. """ auxcount = 0 while not self.thread_stop_event.is_set(): logs = ActivityViewer.review_logs() self.emit('activity_logs', {'logs': logs}) observable_ips = ActivityViewer.review_observable_ips() self.emit('activity_observable_ips', {'observable_ips': observable_ips}) self.thread_stop_event.wait(5) # Wait for 5 second before sending next update # Auxiliary function turn on crond every 60 seconds if (auxcount == 0): ActivityViewer.turn_on_crond() auxcount = 60 auxcount = auxcount - 1
[docs] def on_request_activity(self): """ Callback function triggered on request for activity logs. Starts a new thread to continuously send activity logs to clients. """ if not self.activity_logs_thread or not self.activity_logs_thread.is_alive(): # Start a new thread only if it's not already running self.thread_stop_event.clear() # Clear the event flag self.activity_logs_thread = threading.Thread(target=self.send_activity_logs) self.activity_logs_thread.start()