From da9e83e85ad4e29a20f1cdeb86c2944b423d5777 Mon Sep 17 00:00:00 2001
From: martin molli <martin.molli@inria.fr>
Date: Thu, 20 Mar 2025 16:52:11 +0100
Subject: [PATCH] Swapped socket for zmq

---
 services/camera/requirements.txt              |   3 +-
 services/camera/src/camera.py                 |  63 ++---
 services/common/__init__.py                   |   0
 .../common/camera_to_motiondetector_packet.py |  35 +++
 ...on_detector_to_object_recognizer_packet.py |  44 ++++
 services/motion_detector/requirements.txt     |   3 +-
 .../motion_detector/src/motion_detection.py   | 247 ++++++++----------
 services/object_recognizer/requirements.txt   |   3 +-
 .../src/object_recognizer.py                  | 160 +++++-------
 9 files changed, 273 insertions(+), 285 deletions(-)
 create mode 100644 services/common/__init__.py
 create mode 100644 services/common/camera_to_motiondetector_packet.py
 create mode 100644 services/common/motion_detector_to_object_recognizer_packet.py

diff --git a/services/camera/requirements.txt b/services/camera/requirements.txt
index 109b87e..ee39053 100644
--- a/services/camera/requirements.txt
+++ b/services/camera/requirements.txt
@@ -6,4 +6,5 @@ opentelemetry-api
 opentelemetry-sdk
 opentelemetry-exporter-jaeger
 opentelemetry-exporter-otlp
-opentelemetry-exporter-otlp-proto-grpc
\ No newline at end of file
+opentelemetry-exporter-otlp-proto-grpc
+pyzmq
\ No newline at end of file
diff --git a/services/camera/src/camera.py b/services/camera/src/camera.py
index 2c53c4d..b1f5f72 100644
--- a/services/camera/src/camera.py
+++ b/services/camera/src/camera.py
@@ -1,19 +1,18 @@
+import logging
+import os  # Import os for environment variables
 import random
-import socket
+import time
 from contextlib import contextmanager
 
 import cv2
-import pickle
-import struct
-import time
-import datetime
-import os  # Import os for environment variables
-
-
-from telemetry.tracerprovider import tracer, meter
 import imutils  # pip install imutils
-import logging
+import zmq
 from opentelemetry.propagate import inject
+
+
+from services.common.camera_to_motiondetector_packet import CameraToMotionDetectorPacket
+from telemetry.tracerprovider import tracer, meter
+
 
 @contextmanager
 def videocapture(*args, **kwargs):
@@ -53,6 +52,7 @@ MIN_FRAME_TIME:float = 1.0 / frame_rate
 camera = os.getenv("CAMERA", "false").lower() == "true"
 animal_name = os.getenv("ANIMAL_NAME", "tiger")
 appearance_rate = int(os.getenv("APPEARANCE_RATE", 600))
+camera_index = int(os.getenv("INDEX", -1))
 host_ip = os.getenv("MDHOST", "localhost")
 port = int(os.getenv("MDPORT", 9998))
 
@@ -91,28 +91,17 @@ def generate_random_intervals(events_per_hour):
 
 def main():
     #Connect to motion detector and stream video
-    while True:
-        client_socket = create_socket_and_connect(host_ip, port)
-        stream_video_clip(client_socket)
+    context = zmq.Context()
+    socket = context.socket(zmq.PAIR)
+    logging.info(f"Attempting to connect to {host_ip}:{port}")
+    socket.connect(f"tcp://{host_ip}:{port}")
+    logging.info(f"Connected to {host_ip}:{port}")
+    stream_video_clip(socket)
 
 
-def create_socket_and_connect(host_ip:str, port:int):
-    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    while True:
-        try:
-            logging.info(f"Attempting to connect to {host_ip}:{port}")
-            client_socket.connect((host_ip, port))
-            logging.info(f"Connected to {host_ip}:{port}")
-            break
-        except Exception as e:
-            logging.warning(f"Cannot connect to motion detector: {e}")
-            time.sleep(1)
-            continue
-    return client_socket
-
-
-def stream_video_clip(client_socket: socket):
-    if not client_socket:
+
+def stream_video_clip(socket):
+    if not socket:
         return
 
     # Initialize FPS calculation
@@ -162,16 +151,8 @@ def stream_video_clip(client_socket: socket):
                                 # Resize frame for consistent processing
                                 frame = imutils.resize(frame, width=640)
 
-                                # Prepare data packet
-                                data_packet = {
-                                    'frame': frame,
-                                    'capture_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
-                                    'carrier': carrier
-                                }
-
-                                serialized_data = pickle.dumps(data_packet)
-                                message = struct.pack("Q", len(serialized_data)) + serialized_data
-                                client_socket.sendall(message)
+                                packet= CameraToMotionDetectorPacket(frame, str(camera_index), carrier)
+                                socket.send(packet.pickle())
 
                                 frame_number += 1
 
@@ -186,7 +167,7 @@ def stream_video_clip(client_socket: socket):
                                 time.sleep(max(0.0, sleep_time))
             except Exception as error:
                 logging.error(f"Error sending frame: {error}")
-                client_socket.close()
+                socket.close()
                 return
 
 
diff --git a/services/common/__init__.py b/services/common/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/services/common/camera_to_motiondetector_packet.py b/services/common/camera_to_motiondetector_packet.py
new file mode 100644
index 0000000..c39cd44
--- /dev/null
+++ b/services/common/camera_to_motiondetector_packet.py
@@ -0,0 +1,35 @@
+import datetime
+import pickle
+
+import cv2
+import numpy as np
+
+
+class CameraToMotionDetectorPacket:
+    def __init__(self, opencv_frame: np.ndarray, source_camera_identifier: str, telemetry_carrier: dict):
+        self._opencv_frame: np.ndarray =  cv2.imencode('.jpg', opencv_frame)[1]
+        self.source_camera_identifier = source_camera_identifier
+        self.timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        self.telemetry_carrier = telemetry_carrier
+
+    def pickle (self):
+        #Make sure timestamp is up to date before sending
+        self.timestamp=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        data = {
+            "opencv_frame": self._opencv_frame,
+            "source_camera_identifier": self.source_camera_identifier,
+            "timestamp": self.timestamp,
+            "telemetry_carrier": self.telemetry_carrier
+        }
+
+        return pickle.dumps(data)
+
+    def decode_frame(self):
+        return cv2.imdecode(self._opencv_frame, cv2.IMREAD_COLOR)
+
+    @staticmethod
+    def unpickle(buffer):
+        data = pickle.loads(buffer)
+        packet = CameraToMotionDetectorPacket(cv2.imdecode(data["opencv_frame"], cv2.IMREAD_COLOR), data["source_camera_identifier"], data["telemetry_carrier"])
+        packet.timestamp = data["timestamp"]  # keep sender's capture time; __init__ resets it to now()
+        return packet
diff --git a/services/common/motion_detector_to_object_recognizer_packet.py b/services/common/motion_detector_to_object_recognizer_packet.py
new file mode 100644
index 0000000..805ab96
--- /dev/null
+++ b/services/common/motion_detector_to_object_recognizer_packet.py
@@ -0,0 +1,44 @@
+import datetime
+import pickle
+
+import cv2
+import numpy as np
+
+
+class MotionDetectorToObjectRecognizerPacket:
+    def __init__(self, opencv_frame: np.ndarray, source_camera_identifier: str, camera_timestamp, telemetry_carrier: dict, contour: np.ndarray):
+        _, self._opencv_frame = cv2.imencode('.jpg', opencv_frame)
+        self.source_camera_identifier = source_camera_identifier
+        self.timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        self.camera_timestamp = camera_timestamp
+        self.telemetry_carrier = telemetry_carrier
+        self.contour = contour  # OpenCV contour data
+
+    def pickle(self):
+        self.timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+        data = {
+            "opencv_frame": self._opencv_frame,
+            "source_camera_identifier": self.source_camera_identifier,
+            "timestamp": self.timestamp,
+            "camera_timestamp" : self.camera_timestamp,
+            "telemetry_carrier": self.telemetry_carrier,
+            "contour": self.contour
+        }
+
+        return pickle.dumps(data)
+
+
+    def decode_frame(self):
+        return cv2.imdecode(self._opencv_frame, cv2.IMREAD_COLOR)
+
+    @staticmethod
+    def unpickle(buffer):
+        data = pickle.loads(buffer)
+        packet = MotionDetectorToObjectRecognizerPacket(cv2.imdecode(data["opencv_frame"], cv2.IMREAD_COLOR),
+                                                        data["source_camera_identifier"], data["camera_timestamp"],
+                                                        data["telemetry_carrier"], data["contour"])
+        # Restore the sender's send time; __init__ would otherwise stamp it with now().
+        packet.timestamp = data["timestamp"]
+        return packet
+
diff --git a/services/motion_detector/requirements.txt b/services/motion_detector/requirements.txt
index f18cee3..b487e26 100644
--- a/services/motion_detector/requirements.txt
+++ b/services/motion_detector/requirements.txt
@@ -6,4 +6,5 @@ opentelemetry-sdk
 opentelemetry-exporter-jaeger
 opentelemetry-exporter-otlp
 opentelemetry-exporter-otlp-proto-grpc
-psutil
\ No newline at end of file
+psutil
+pyzmq
\ No newline at end of file
diff --git a/services/motion_detector/src/motion_detection.py b/services/motion_detector/src/motion_detection.py
index c8c4d67..5b0b145 100644
--- a/services/motion_detector/src/motion_detection.py
+++ b/services/motion_detector/src/motion_detection.py
@@ -1,20 +1,21 @@
 # Add logging for key events and metrics
-import os
 import datetime
-import socket
-import cv2
+import logging  # Import the logging module
+import os
 import pickle
-import struct
+import socket
 import time
-import threading
-import queue
-import psutil
+
+import cv2
 import imutils
-import logging  # Import the logging module
-from telemetry.tracerprovider import tracer , meter
-from opentelemetry import trace
+import psutil
+import zmq
 from opentelemetry.propagate import extract, inject
 
+from services.common.camera_to_motiondetector_packet import CameraToMotionDetectorPacket
+from services.common.motion_detector_to_object_recognizer_packet import MotionDetectorToObjectRecognizerPacket
+from telemetry.tracerprovider import tracer, meter
+
 # Configure logging
 logging.basicConfig(
     format='%(asctime)s - %(levelname)s - %(message)s',
@@ -25,107 +26,62 @@ logging.basicConfig(
     ]
 )
 
+fps_histo = meter.create_histogram(
+    name="md_fps_histo",
+    description="Frames per second",
+    unit="fps"
+)
+fps_count = meter.create_gauge(
+    name="md_fps_gauge",
+    description="Frames per second",
+    unit="fps"
+)
+processing_time = meter.create_gauge(
+    name="md_processing_time",
+    description="Processing time of one frame",
+    unit="s"
+)
+md_detected_motion = meter.create_gauge(
+    name="md_detected_motion",
+    description="Detected motions 1 or 0",
+)
+md_detected_motion_nbr = meter.create_gauge(
+    name="md_detected_motion_nbr",
+    description="number of detected motions from the starting of the service",
+)
+
+c2e_transmission_time = meter.create_gauge(
+    name="c2e_transmission_time",
+    description="transmission time of one frame from camera to motion detection",
+    unit="s"
+)
 
 def get_cpu_usage():
     usage = psutil.cpu_percent(interval=1)
     logging.debug(f"CPU usage retrieved: {usage}%")
     return usage
 
-
-def receive_frames(client_socket, frame_queue):
-    logging.info("Starting frame reception thread.")
-    data = b""
-    payload_size = struct.calcsize("Q")
-    c2e_transmission_time = meter.create_gauge(
-        name="c2e_transmission_time",
-        description="transmission time of one frame from camera to motion detection",
-        unit="s"
-    )
-    while True:
-        try:
-            while len(data) < payload_size:
-                packet = client_socket.recv(4 * 1024)  # 4K
-                if not packet:
-                    logging.warning("Client disconnected or no data received.")
-                    return
-                data += packet
-            packed_msg_size = data[:payload_size]
-            data = data[payload_size:]
-            msg_size = struct.unpack("Q", packed_msg_size)[0]
-            logging.debug(f"Expected frame size: {msg_size} bytes.")
-
-            while len(data) < msg_size:
-                data += client_socket.recv(4 * 1024)
-            frame_data = data[:msg_size]
-            data = data[msg_size:]
-
-            transmission_time = datetime.datetime.now() - datetime.datetime.strptime(
-                pickle.loads(frame_data)['capture_time'],
-                "%Y-%m-%d %H:%M:%S")
-            transmission_time_in_seconds = transmission_time.total_seconds()
-            c2e_transmission_time.set(transmission_time_in_seconds)
-            logging.info(f"Frame received with transmission time: {transmission_time_in_seconds} seconds.")
-
-            frame_queue.put(pickle.loads(frame_data))
-        except Exception as e:
-            logging.error(f"Error receiving frame: {e}")
-            return
-
-
-def process_frames(addr, frame_queue, or_host_ip, or_port):
-    logging.info(f"Starting frame processing thread for client: {addr}.")
-    firstFrame = None
+def detect_motion(frame, source_camera_identifier, trace_carrier):
+    first_frame = getattr(detect_motion, "_first_frame", None)  # persist reference frame across calls
     fps_frame_count = 0
-
-    fps_histo = meter.create_histogram(
-        name="md_fps_histo",
-        description="Frames per second",
-        unit="fps"
-    )
-    fps_count = meter.create_gauge(
-        name="md_fps_gauge",
-        description="Frames per second",
-        unit="fps"
-    )
-    processing_time = meter.create_gauge(
-        name="md_processing_time",
-        description="Processing time of one frame",
-        unit="s"
-    )
-    md_detected_motion = meter.create_gauge(
-        name="md_detected_motion",
-        description="Detected motions 1 or 0",
-    )
-    md_detected_motion_nbr = meter.create_gauge(
-        name="md_detected_motion_nbr",
-        description="number of detected motions from the starting of the service",
-    )
     md_detected_motion_nbr_rec=0
     fps_start_time = time.time()
+
     while True:
-        try:
-            # Wait for an item in the queue with a timeout
-            recieveddata = frame_queue.get(timeout=1)  # Waits for 1 second
-            logging.debug("Frame dequeued for processing.")
-        except queue.Empty:
-            logging.info(f"Queue is empty for {addr}, waiting for frames...")
-            continue
-        received_carrier=recieveddata['carrier']
-        extracted_context = extract(received_carrier)
-        carrier = {}
+        extracted_context = extract(trace_carrier)
+        trace_carrier = {}
         with tracer.start_as_current_span("processing the frame",context=extracted_context) as span:
             try:
-                inject(carrier)
-                frame = recieveddata['frame']
+                inject(trace_carrier)
                 logging.debug("Frame loaded for processing.")
-                text = f"CLIENT: {addr}"
+                logging.debug(f"FRAME RECEIVE FROM CLIENT: {source_camera_identifier} | TIME: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
 
                 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                 gray = cv2.GaussianBlur(gray, (21, 21), 0)
                 logging.debug("Frame converted to grayscale and blurred.")
 
-                if firstFrame is None:
-                    firstFrame = gray
+                if first_frame is None:
+                    first_frame = detect_motion._first_frame = gray
                     logging.info("Initialized reference frame for motion detection.")
                     continue
 
@@ -139,81 +95,88 @@ def process_frames(addr, frame_queue, or_host_ip, or_port):
                     fps_frame_count = 0
                     fps_start_time = time.time()
 
-                frameDelta = cv2.absdiff(firstFrame, gray)
-                thresh = cv2.threshold(frameDelta, 25, 255, cv2.THRESH_BINARY)[1]
+                frame_delta = cv2.absdiff(first_frame, gray)
+                thresh = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1]
                 thresh = cv2.dilate(thresh, None, iterations=2)
-                cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL,
+                contours = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL,
                                         cv2.CHAIN_APPROX_SIMPLE)
-                cnts = imutils.grab_contours(cnts)
+                contours = imutils.grab_contours(contours)
 
                 detected_cnt = 0
                 detected = False
-                for c in cnts:
-                    if cv2.contourArea(c) < 10000:
+                bounding_rectangle = ()
+                for contour in contours:
+                    if cv2.contourArea(contour) < 10000:
                         continue
                     detected = True
                     detected_cnt += 1
-                    logging.debug(f"Motion detected. Contour area: {cv2.contourArea(c)}.")
+                    logging.debug(f"Motion detected. Contour area: {cv2.contourArea(contour)}.")
 
-                    (x, y, w, h) = cv2.boundingRect(c)
-                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
-                    text = "md: motion detected"
+                    (x, y, w, h) = cv2.boundingRect(contour)
+                    bounding_rectangle = (x, y, w, h)
+                    #cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
+                    # md: motion detected
                 if detected:
-                    logging.info("Motion detected, preparing to send frame to Object Recognizer.")
+                    logging.info("Motion detected, updating metric.")
                     md_detected_motion_nbr_rec += 1
                     md_detected_motion_nbr.set(md_detected_motion_nbr_rec)
                     md_detected_motion.set(1)
-                    orclient_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                    orclient_socket.connect((or_host_ip, or_port))
-                    ordata = {
-                        "frame": frame,
-                        "capture_time": recieveddata["capture_time"],
-                        "sentfromedgetime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
-                        'carrier':carrier
-                    }
-                    while True:
-                        try:
-                            a = pickle.dumps(ordata)
-                            message = struct.pack("Q", len(a)) + a
-                            logging.info(f"Packet size: {len(message)} bytes.")  # Log the size of the packet
-                            orclient_socket.sendall(message)
-                            logging.info("Frame successfully sent to Object Recognizer.")
-                            break
-                        except Exception as e:
-                            logging.error(f"Sending frame to Object Recognizer failed: {e}")
-                            time.sleep(1)
-                            continue
-                    orclient_socket.close()
-
-                md_detected_motion.set(0)
-                processing_time.set(time.time() - starting_processing_time)
-                logging.debug("Processing time logged for frame.")
+                else:
+                    md_detected_motion.set(0)
+
+                return detected, bounding_rectangle
             except Exception as e:
                 logging.error(f"Error processing frame: {e}")
-
                 break
 
 def main():
     # Retrieve environment variables instead of command-line arguments
     or_host_ip = os.getenv('OR_HOST', 'localhost')  # Default to 'localhost'
     or_port = int(os.getenv('OR_PORT', 9999))  # Default to 9999
-
-    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     host_name = socket.gethostname()
     host_ip = socket.gethostbyname(host_name)
     logging.info(f'HOST IP: {host_ip}')
-    port = 9998
-    socket_address = (host_ip, port)
-    server_socket.bind(socket_address)
-    server_socket.listen()
-    logging.info(f"Listening at {socket_address}")
+    host_port = 9998
+
+    context = zmq.Context()
+    camera_socket = context.socket(zmq.PAIR)
+    camera_socket.bind(f"tcp://{host_ip}:{host_port}")
+
+    or_socket_context = zmq.Context()
+    or_socket = or_socket_context.socket(zmq.PAIR)
+    or_socket.connect(f"tcp://{or_host_ip}:{or_port}")
 
     while True:
-        client_socket, addr = server_socket.accept()
-        logging.info(f"Accepted connection from {addr}.")
-        frame_queue = queue.Queue()
-        threading.Thread(target=receive_frames, args=(client_socket, frame_queue)).start()
-        threading.Thread(target=process_frames, args=(addr, frame_queue, or_host_ip, or_port)).start()
+        try:
+            message = camera_socket.recv()
+            camera_packet: CameraToMotionDetectorPacket = CameraToMotionDetectorPacket.unpickle(message)
+
+            transmission_time = datetime.datetime.now() - datetime.datetime.strptime(camera_packet.timestamp, "%Y-%m-%d %H:%M:%S")
+            transmission_time_in_seconds = transmission_time.total_seconds()
+            c2e_transmission_time.set(transmission_time_in_seconds)
+            logging.info(f"Frame received with transmission time: {transmission_time_in_seconds} seconds.")
+
+            starting_processing_time = time.time()
+
+            frame = camera_packet.decode_frame()
+            flag, contours = detect_motion(frame, camera_packet.source_camera_identifier,
+                                           camera_packet.telemetry_carrier)
+
+            if flag:
+                md_packet = MotionDetectorToObjectRecognizerPacket(frame, camera_packet.source_camera_identifier,
+                                                                   camera_packet.timestamp, camera_packet.telemetry_carrier, contours)
+                pickled = md_packet.pickle()
+                logging.info(f"Packet size: {len(pickled)} bytes.")  # Log the size of the packet
+                or_socket.send(pickled)
+                logging.info("Frame successfully sent to Object Recognizer.")
+
+            processing_time.set(time.time() - starting_processing_time)
+            logging.debug("Processing time logged for frame.")
+        except Exception as error:
+            logging.error(f"Error in event loop: {error}")
+            time.sleep(1)
+            continue
+
 
 
 if __name__ == "__main__":
diff --git a/services/object_recognizer/requirements.txt b/services/object_recognizer/requirements.txt
index 2628b99..deb5c7f 100644
--- a/services/object_recognizer/requirements.txt
+++ b/services/object_recognizer/requirements.txt
@@ -7,4 +7,5 @@ opentelemetry-exporter-jaeger
 opentelemetry-exporter-otlp
 opentelemetry-exporter-otlp-proto-grpc
 docker
-flask
\ No newline at end of file
+flask
+pyzmq
\ No newline at end of file
diff --git a/services/object_recognizer/src/object_recognizer.py b/services/object_recognizer/src/object_recognizer.py
index 8375d9f..d4bc4f1 100644
--- a/services/object_recognizer/src/object_recognizer.py
+++ b/services/object_recognizer/src/object_recognizer.py
@@ -1,15 +1,16 @@
 import datetime
+import logging
+import queue
 import socket
 import threading
-import queue
-import pickle
-import struct
-import cv2
-import logging
+
+import zmq
 from opentelemetry.propagate import extract
+
 from model.model import recognize
-from webserver import run_flask
+from services.common.motion_detector_to_object_recognizer_packet import MotionDetectorToObjectRecognizerPacket
 from telemetry.tracerprovider import meter, tracer
+from webserver import run_flask
 
 # Configure logging
 logging.basicConfig(
@@ -21,91 +22,57 @@ logging.basicConfig(
     ]
 )
 
-
-# Function to detect objects in a frame
-def get_output_layers(net):
-    layer_names = net.getLayerNames()
-    try:
-        output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]
-    except Exception as e:
-        logging.error(f"Error getting output layers: {e}")
-        output_layers = [layer_names[i[0] - 1] for i in net.getUnconnectedOutLayers()]
-    return output_layers
-
-
-def draw_prediction(img, class_id, confidence, x, y, x_plus_w, y_plus_h, classes, COLORS):
-    try:
-        label = str(classes[class_id])
-        logging.info(f"{label} detected with confidence {confidence:.2f}")
-        color = COLORS[class_id]
-        cv2.rectangle(img, (x, y), (x_plus_w, y_plus_h), color, 2)
-        cv2.putText(img, label, (x - 10, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
-        framename = "output/res.jpg"
-        cv2.imwrite(framename, img)
-        logging.info(f"Frame saved with detection: {framename}")
-    except Exception as e:
-        logging.error(f"An error occurred while drawing prediction: {e}")
+#Define metrics
+len_q_gauge = meter.create_gauge(name="or_len_q", description="Queue length in object recognizer", unit="frames")
+processing_time = meter.create_gauge(name="or_processing_time", description="Processing time of one frame", unit="s")
+md_e2c_transmission_time = meter.create_gauge(name="md_e2c_transmission_time",
+                                              description="Transmission time from edge to cloud", unit="s")
+response_time = meter.create_gauge(name="response_time", description="Response time for frames", unit="s")
+dropped_frames = meter.create_counter(name="dropped_frame", description="Dropped frame", unit="frames")
 
 
-def process_frame(mddata):
-    received_carrier = mddata['carrier']
+def process_frame(packet: MotionDetectorToObjectRecognizerPacket):
+    received_carrier = packet.telemetry_carrier
     extracted_context = extract(received_carrier)
-    carrier = {}
     with tracer.start_as_current_span("detecting motion in the frame", context=extracted_context) as span:
-
-            recognize(mddata['frame'])
+        recognize(packet.decode_frame())
 
 def receive_frames(client_socket, frame_queue):
-    data = b""
-    payload_size = struct.calcsize("Q")
+    try:
+        packet = MotionDetectorToObjectRecognizerPacket.unpickle(client_socket.recv())
 
-    while True:
         try:
-            while len(data) < payload_size:
-                packet = client_socket.recv(4 * 1024)  # 4KB
-                if not packet:
-                    logging.warning("No more packets received, closing connection.")
-                    return
-                data += packet
-
-            packed_msg_size = data[:payload_size]
-            data = data[payload_size:]
-            msg_size = struct.unpack("Q", packed_msg_size)[0]
-            while len(data) < msg_size:
-                data += client_socket.recv(4 * 1024)
-
-            frame_data = data[:msg_size]
-            data = data[msg_size:]
-            mddata = pickle.loads(frame_data)
-            frame_queue.put(mddata)
+            frame_queue.put_nowait(packet)
+        except queue.Full:
+            dropped_frames.add(1)
+            logging.warning(f"Queue full! Dropping frame")
 
-            len_q_gauge.set(frame_queue.qsize())
-            transmission_time = datetime.datetime.now() - datetime.datetime.strptime(mddata['sentfromedgetime'],
-                                                                                     "%Y-%m-%d %H:%M:%S")
-            transmission_time_in_seconds = transmission_time.total_seconds()
-            md_e2c_transmission_time.set(transmission_time_in_seconds)
-            logging.info(f"Frame received. Transmission time: {transmission_time_in_seconds:.2f} seconds")
+        len_q_gauge.set(frame_queue.qsize())
+        transmission_time = datetime.datetime.now() - datetime.datetime.strptime(packet.timestamp, "%Y-%m-%d %H:%M:%S")
+        transmission_time_in_seconds = transmission_time.total_seconds()
+        md_e2c_transmission_time.set(transmission_time_in_seconds)
+        logging.info(f"Frame received. Transmission time: {transmission_time_in_seconds:.2f} seconds")
 
-        except Exception as e:
-            logging.error(f"Error receiving frame: {e}")
-            return
+    except Exception as e:
+        logging.error(f"Error receiving frame: {e}")
+        return
 
 
-def handle_client(frame_queue):
+def handle_client(frame_queue: queue.Queue[MotionDetectorToObjectRecognizerPacket]):
     while True:
         try:
-            mddata = frame_queue.get(timeout=1)  # Waits for 1 second
+            packet = frame_queue.get(timeout=1)  # Waits for 1 second
             logging.debug(f"Processing frame from queue. Queue size: {frame_queue.qsize()}")
             len_q_gauge.set(frame_queue.qsize())
 
             starting_processing_time = datetime.datetime.now()
-            process_frame(mddata)
+            process_frame(packet)
 
             processing_duration = (datetime.datetime.now() - starting_processing_time).total_seconds()
             processing_time.set(processing_duration)
             logging.info(f"Frame processed. Processing time: {processing_duration:.2f} seconds")
 
-            response_time_duration = (datetime.datetime.now() - datetime.datetime.strptime(mddata['capture_time'],
+            response_time_duration = (datetime.datetime.now() - datetime.datetime.strptime(packet.camera_timestamp,
                                                                                            "%Y-%m-%d %H:%M:%S")).total_seconds()
             response_time.set(response_time_duration)
             logging.info(f"Response time for frame: {response_time_duration:.2f} seconds")
@@ -117,36 +84,31 @@ def handle_client(frame_queue):
 
 
 # Setup server socket
-server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-host_name = socket.gethostname()
-host_ip = socket.gethostbyname(host_name)
-socket_address = (host_ip, 9999)
-server_socket.bind(socket_address)
-server_socket.listen()
-logging.info(f"Server started, listening at {socket_address}")
-
-flask_thread = threading.Thread(target=run_flask)
-flask_thread.daemon = True
-flask_thread.start()
-
-len_q_gauge = meter.create_gauge(name="or_len_q", description="Queue length in object recognizer", unit="f")
-processing_time = meter.create_gauge(name="or_processing_time", description="Processing time of one frame", unit="s")
-md_e2c_transmission_time = meter.create_gauge(name="md_e2c_transmission_time",
-                                              description="Transmission time from edge to cloud", unit="s")
-response_time = meter.create_gauge(name="response_time", description="Response time for frames", unit="s")
-
-client_threads = []
-frame_queue = queue.Queue()
+def main():
+    context = zmq.Context()
+    server_socket = context.socket(zmq.PAIR)
+    host_name = socket.gethostname()
+    host_ip = socket.gethostbyname(host_name)
+    server_address = f"tcp://{host_ip}:{9999}"
+    server_socket.bind(server_address)
+    logging.info(f"Server started, listening at {server_address}")
+
+    flask_thread = threading.Thread(target=run_flask)
+    flask_thread.daemon = True
+    flask_thread.start()
+    frame_queue = queue.Queue(maxsize=255)
+
+    process_thread = threading.Thread(target=handle_client, args=(frame_queue,))
+    process_thread.start()
 
-process_thread = threading.Thread(target=handle_client, args=(frame_queue,))
-process_thread.start()
-
-while True:
-    try:
-        client_socket, addr = server_socket.accept()
-        logging.info(f"New client connected: {addr}")
-        threading.Thread(target=receive_frames, args=(client_socket, frame_queue)).start()
-    except Exception as e:
-        logging.error(f"Error in server loop: {e}")
+    while True:
+        try:
+            # ZMQ sockets have no accept(); block on recv and enqueue each frame.
+            receive_frames(server_socket, frame_queue)
+        except Exception as e:
+            logging.error(f"Error in server loop: {e}")
+            server_socket.close()
+            break
 
-server_socket.close()
+if __name__ == "__main__":
+    main()
\ No newline at end of file
-- 
GitLab