diff --git a/images/cadvisor-ui-image.png b/images/cadvisor-ui-image.png new file mode 100644 index 0000000000000000000000000000000000000000..1f4274c7296172874da923029459cb877f0bec61 Binary files /dev/null and b/images/cadvisor-ui-image.png differ diff --git a/images/jaeger-ui-image.png b/images/jaeger-ui-image.png new file mode 100644 index 0000000000000000000000000000000000000000..bbbe32b348ed8fce539c4554131ebf10f95bf1ba Binary files /dev/null and b/images/jaeger-ui-image.png differ diff --git a/images/object-recognizer-ui-image.png b/images/object-recognizer-ui-image.png new file mode 100644 index 0000000000000000000000000000000000000000..43cce140e8b378d6e1d00a6af5a1a13fab9c509c Binary files /dev/null and b/images/object-recognizer-ui-image.png differ diff --git a/images/prometheus-ui-image.png b/images/prometheus-ui-image.png new file mode 100644 index 0000000000000000000000000000000000000000..8db24869abe6e9e65600db8828f051a9e29d82d6 Binary files /dev/null and b/images/prometheus-ui-image.png differ diff --git a/images/zipkin-ui-image.png b/images/zipkin-ui-image.png new file mode 100644 index 0000000000000000000000000000000000000000..b6e3c29b8d00cf0bba92d9716bf634e5359e6360 Binary files /dev/null and b/images/zipkin-ui-image.png differ diff --git a/readme.md b/readme.md index aee8cfb6ae05ba3df8ebd33f73c50ac45d685a05..906672130a781607e50662e18d305bc884b1255a 100644 --- a/readme.md +++ b/readme.md @@ -328,11 +328,21 @@ services: ``` 2. **Access the services**: - - Access Jaeger UI: [http://localhost:16686](http://localhost:16686) - - Access Zipkin UI: [http://localhost:9411](http://localhost:9411) - - Access Prometheus UI: [http://localhost:9090](http://localhost:9090) - - Access cAdvisor UI: [http://localhost:8080](http://localhost:8080) - - Access object recognizer interface : [http://localhost:5000](http://localhost:5000) + + - Access Jaeger UI: [http://localhost:16686](http://localhost:16686) +  + + - Access Zipkin UI: [http://localhost:9411](http://localhost:9411) +  + + - Access Prometheus UI: [http://localhost:9090](http://localhost:9090) +  + + - Access cAdvisor UI: [http://localhost:8080](http://localhost:8080) +  + + - Access Object Recognizer Interface: [http://localhost:5000](http://localhost:5000) +  ## **Deploying on K3S using Enoslib on Grid'5000** diff --git a/services/camera/src/camera.py b/services/camera/src/camera.py index c74b37194b797c417c3134085f9bfacdd233e6ce..eb2f091d239f01f1ab947717620aa89d0a133e4c 100644 --- a/services/camera/src/camera.py +++ b/services/camera/src/camera.py @@ -9,7 +9,7 @@ import os # Import os for environment variables from telemetry.tracerprovider import tracer, meter import imutils # pip install imutils import logging - +from opentelemetry.propagate import inject # Configure logging logging.basicConfig( format='%(asctime)s - %(levelname)s - %(message)s', @@ -108,8 +108,10 @@ def main(): logging.info(f"Motion: {motion}") while (frame_number < interval_frame_count) or (not motion): frame_number += 1 - with tracer.start_as_current_span("sending frame") as span: + carrier = {} + with tracer.start_as_current_span("sending frame from camera") as span: try: + inject(carrier) img, frame = vid.read() if not img: if motion: @@ -126,7 +128,9 @@ def main(): data = { 'frame': frame, - 'capture_time': current_time + 'capture_time': current_time, + 'carrier': carrier + } a = pickle.dumps(data) message = struct.pack("Q", len(a)) + a diff --git a/services/motion_detector/src/motion_detection.py b/services/motion_detector/src/motion_detection.py index a6b74122f9b02855748d55d5ac178e110fcfc560..c02ac1204cdf421a9b3c3513a78ce3b66d363243 100644 --- a/services/motion_detector/src/motion_detection.py +++ b/services/motion_detector/src/motion_detection.py @@ -11,7 +11,9 @@ import queue import psutil import imutils import logging # Import the logging module -from telemetry.tracerprovider import meter +from telemetry.tracerprovider import tracer , meter +from opentelemetry import trace +from opentelemetry.propagate import extract, inject # Configure logging logging.basicConfig( @@ -40,6 +42,7 @@ def receive_frames(client_socket, frame_queue): unit="s" ) while True: + try: while len(data) < payload_size: packet = client_socket.recv(4 * 1024) # 4K @@ -106,77 +109,87 @@ def process_frames(addr, frame_queue, or_host_ip, or_port): except queue.Empty: logging.info(f"Queue is empty for {addr}, waiting for frames...") continue - - frame = recieveddata['frame'] - logging.debug("Frame loaded for processing.") - text = f"CLIENT: {addr}" - - gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - gray = cv2.GaussianBlur(gray, (21, 21), 0) - logging.debug("Frame converted to grayscale and blurred.") - - if firstFrame is None: - firstFrame = gray - logging.info("Initialized reference frame for motion detection.") - continue - - starting_processing_time = time.time() - fps_frame_count += 1 - elapsed_time = time.time() - fps_start_time - if elapsed_time >= 10.0: - fps = round(fps_frame_count / elapsed_time, 2) - fps_count.set(fps) - logging.info(f"FPS: {fps}") - fps_frame_count = 0 - fps_start_time = time.time() - - frameDelta = cv2.absdiff(firstFrame, gray) - thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1] - thresh = cv2.dilate(thresh, None, iterations=2) - cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, - cv2.CHAIN_APPROX_SIMPLE) - cnts = imutils.grab_contours(cnts) - - detected_cnt = 0 - detected = False - for c in cnts: - if cv2.contourArea(c) < 10000: - continue - detected = True - detected_cnt += 1 - logging.debug(f"Motion detected. Contour area: {cv2.contourArea(c)}.") - - (x, y, w, h) = cv2.boundingRect(c) - cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2) - text = "md: motion detected" - if detected: - logging.info("Motion detected, preparing to send frame to Object Recognizer.") - md_detected_motion.set(1) - orclient_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - orclient_socket.connect((or_host_ip, or_port)) - ordata = { - "frame": frame, - "capture_time": recieveddata["capture_time"], - "sentfromedgetime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - } - while True: - try: - a = pickle.dumps(ordata) - message = struct.pack("Q", len(a)) + a - logging.info(f"Packet size: {len(message)} bytes.") # Log the size of the packet - orclient_socket.sendall(message) - logging.info("Frame successfully sent to Object Recognizer.") - break - except Exception as e: - logging.error(f"Sending frame to Object Recognizer failed: {e}") - time.sleep(1) + received_carrier=recieveddata['carrier'] + extracted_context = extract(received_carrier) + carrier = {} + with tracer.start_as_current_span("processing the frame",context=extracted_context) as span: + try: + inject(carrier) + frame = recieveddata['frame'] + logging.debug("Frame loaded for processing.") + text = f"CLIENT: {addr}" + + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + gray = cv2.GaussianBlur(gray, (21, 21), 0) + logging.debug("Frame converted to grayscale and blurred.") + + if firstFrame is None: + firstFrame = gray + logging.info("Initialized reference frame for motion detection.") continue - orclient_socket.close() - - md_detected_motion.set(0) - processing_time.set(time.time() - starting_processing_time) - logging.debug("Processing time logged for frame.") + starting_processing_time = time.time() + fps_frame_count += 1 + elapsed_time = time.time() - fps_start_time + if elapsed_time >= 10.0: + fps = round(fps_frame_count / elapsed_time, 2) + fps_count.set(fps) + logging.info(f"FPS: {fps}") + fps_frame_count = 0 + fps_start_time = time.time() + + frameDelta = cv2.absdiff(firstFrame, gray) + thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1] + thresh = cv2.dilate(thresh, None, iterations=2) + cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, + cv2.CHAIN_APPROX_SIMPLE) + cnts = imutils.grab_contours(cnts) + + detected_cnt = 0 + detected = False + for c in cnts: + if cv2.contourArea(c) < 10000: + continue + detected = True + detected_cnt += 1 + logging.debug(f"Motion detected. Contour area: {cv2.contourArea(c)}.") + + (x, y, w, h) = cv2.boundingRect(c) + cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2) + text = "md: motion detected" + if detected: + logging.info("Motion detected, preparing to send frame to Object Recognizer.") + + md_detected_motion.set(1) + orclient_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + orclient_socket.connect((or_host_ip, or_port)) + ordata = { + "frame": frame, + "capture_time": recieveddata["capture_time"], + "sentfromedgetime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + 'carrier':carrier + } + while True: + try: + a = pickle.dumps(ordata) + message = struct.pack("Q", len(a)) + a + logging.info(f"Packet size: {len(message)} bytes.") # Log the size of the packet + orclient_socket.sendall(message) + logging.info("Frame successfully sent to Object Recognizer.") + break + except Exception as e: + logging.error(f"Sending frame to Object Recognizer failed: {e}") + time.sleep(1) + continue + orclient_socket.close() + + md_detected_motion.set(0) + processing_time.set(time.time() - starting_processing_time) + logging.debug("Processing time logged for frame.") + except Exception as e: + logging.error(f"Error processing frame: {e}") + + break def main(): # Retrieve environment variables instead of command-line arguments diff --git a/services/motion_detector/src/telemetry/tracerprovider.py b/services/motion_detector/src/telemetry/tracerprovider.py index 68cb24d89702fe33fa301c44edbc0531e7fc0d03..515d81cf54a6368b1f5f998d4dd3430ac7db7e3c 100644 --- a/services/motion_detector/src/telemetry/tracerprovider.py +++ b/services/motion_detector/src/telemetry/tracerprovider.py @@ -33,7 +33,7 @@ otlp_trace_exporter = OTLPSpanExporter(endpoint="otel-collector:4317", insecure= trace.get_tracer_provider().add_span_processor( BatchSpanProcessor(otlp_trace_exporter) ) -tracer = trace.get_tracer("md tracer") +tracer = trace.get_tracer("motion detector tracer") # Set up the meter provider for metrics meter_provider = MeterProvider() @@ -49,4 +49,4 @@ reader = PeriodicExportingMetricReader( ) meterProvider = MeterProvider(resource=resource, metric_readers=[reader]) metrics.set_meter_provider(meterProvider) -meter = metrics.get_meter("md meter") \ No newline at end of file +meter = metrics.get_meter("motion detector meter") \ No newline at end of file diff --git a/services/object_recognizer/src/object_recognizer.py b/services/object_recognizer/src/object_recognizer.py index e698fb0e1807c0af6149a2e9eca104ba5a24f712..68106b2b6397f9487b3b3f4573df82841998cbb8 100644 --- a/services/object_recognizer/src/object_recognizer.py +++ b/services/object_recognizer/src/object_recognizer.py @@ -6,10 +6,10 @@ import pickle import struct import cv2 import logging - +from opentelemetry.propagate import extract from model.model import recognize from webserver import run_flask -from telemetry.tracerprovider import meter +from telemetry.tracerprovider import meter, tracer # Configure logging logging.basicConfig( @@ -48,7 +48,12 @@ def draw_prediction(img, class_id, confidence, x, y, x_plus_w, y_plus_h, classes def process_frame(mddata): - recognize(mddata['frame']) + received_carrier = mddata['carrier'] + extracted_context = extract(received_carrier) + carrier = {} + with tracer.start_as_current_span("detecting motion in the frame", context=extracted_context) as span: + + recognize(mddata['frame']) def receive_frames(client_socket, frame_queue): data = b"" diff --git a/services/object_recognizer/src/webserver.py b/services/object_recognizer/src/webserver.py index 37a550c9982c6a5e5bd02adc34f5f2a44635985f..99e673e98b5f2b46aaae356dd6bfdc32ede32f87 100644 --- a/services/object_recognizer/src/webserver.py +++ b/services/object_recognizer/src/webserver.py @@ -1,30 +1,68 @@ -from flask import Flask, send_file -import threading +from flask import Flask, send_file, render_template_string import os -import time # Flask application setup app = Flask(__name__) IMAGE_PATH = "/app/output/res.jpg" + # Disable browser caching by adding cache headers @app.after_request def add_no_cache_headers(response): response.cache_control.no_store = True return response + @app.route("/") def home(): return "<h1>Object Recognizer</h1><p>Go to <a href='/image'>/image</a> to see the latest detection.</p>" + @app.route("/image") def serve_image(): if os.path.exists(IMAGE_PATH): - return send_file(IMAGE_PATH, mimetype='image/jpeg') + html_template = """ + <!DOCTYPE html> + <html> + <head> + <title>Object Recognizer</title> + <script> + // Refresh the image every 5 seconds + function refreshImage() { + const image = document.getElementById('image'); + image.src = '/static_image?' + new Date().getTime(); // Append timestamp to bypass cache + } + + // Update the date-time every second + function updateDateTime() { + const now = new Date(); + document.getElementById('datetime').innerText = now.toLocaleString(); + } + + setInterval(refreshImage, 5000); // Refresh every 5 seconds + setInterval(updateDateTime, 1000); // Update date-time every second + </script> + </head> + <body> + <h1>Object Recognizer</h1> + <p id="datetime"></p> + <img id="image" src="/static_image" alt="Latest detection" width="600"> + </body> + </html> + """ + return render_template_string(html_template) else: return "<h1>No image available</h1>", 404 +@app.route("/static_image") +def static_image(): + if os.path.exists(IMAGE_PATH): + return send_file(IMAGE_PATH, mimetype='image/jpeg') + else: + return "", 404 + + # Flask server runner def run_flask(): - app.run(host="0.0.0.0", port=5000, debug=False, threaded=True) \ No newline at end of file + app.run(host="0.0.0.0", port=5000, debug=False, threaded=True)