diff --git a/services/__init__.py b/services/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/services/camera/Dockerfile b/services/camera/Dockerfile index 7f5e7d965dffc08b2c7e43352d63242ad0da6939..7b2c83d31cc373b751bbe0ff9b068478aae4c999 100644 --- a/services/camera/Dockerfile +++ b/services/camera/Dockerfile @@ -12,6 +12,8 @@ WORKDIR /app COPY . . # Install required Python packages +ENV DEBIAN_FRONTEND=noninteractive +RUN apt update && apt install -y libzmq3-dev && rm -rf /var/lib/apt/lists/* RUN pip install --no-cache-dir -r requirements.txt # Set environment variables with default values @@ -20,7 +22,6 @@ ENV CAMERA="false" \ APPEARANCE_RATE=600 \ MDHOST="localhost" \ MDPORT=9998 - # Expose the port (if needed) # EXPOSE <port_number> diff --git a/services/camera/requirements.txt b/services/camera/requirements.txt index ee390536a6c091cfd012df6cbdeb5271cc0a9611..16f8a5204cb0ecf81bb2585eb999ce34ce6c060a 100644 --- a/services/camera/requirements.txt +++ b/services/camera/requirements.txt @@ -7,4 +7,5 @@ opentelemetry-sdk opentelemetry-exporter-jaeger opentelemetry-exporter-otlp opentelemetry-exporter-otlp-proto-grpc -pyzmq \ No newline at end of file +pyzmq +cffi \ No newline at end of file diff --git a/services/camera/src/camera.py b/services/camera/src/camera.py index b1f5f72178c4a944520045e7bd637b008c8464c3..16c3f9c8d1e8884c3b10da994413cbafc34cc9de 100644 --- a/services/camera/src/camera.py +++ b/services/camera/src/camera.py @@ -1,16 +1,15 @@ import logging import os # Import os for environment variables import random -import time +from time import perf_counter, sleep, perf_counter_ns from contextlib import contextmanager import cv2 import imutils # pip install imutils import zmq from opentelemetry.propagate import inject -from zmq.backend.cffi import Socket -from services.common.camera_to_motiondetector_packet import CameraToMotionDetectorPacket +from camera_to_motiondetector_packet import CameraToMotionDetectorPacket from telemetry.tracerprovider import tracer, meter @@ -29,23 +28,23 @@ logging.basicConfig( level=logging.INFO, # You can change this to DEBUG, ERROR, etc. handlers=[ logging.StreamHandler(), # Log to console - logging.FileHandler("/app/output/camera.log") # Log to a file + #logging.FileHandler("/app/output/camera.log") # Log to a file ] ) + frametime_histogram = meter.create_gauge( name="camera_frame_time", description="Frames time", unit="ms" ) - sleep_time_between_frame = meter.create_gauge( name="camera_sleep_time_between_frame", description="sleep_time_between_frame", unit="ms" ) -frame_rate:int = 30 +frame_rate:int = 10 MIN_FRAME_TIME:float = 1.0 / frame_rate # Get environment variables (with defaults if not set) @@ -58,9 +57,9 @@ port = int(os.getenv("MDPORT", 9998)) # Map animal to the appropriate video filenames animal_map = { - 'bear': ('footage/bear/no_bear.mp4', 'footage/bear/with_bear.mp4'), - 'tiger': ('footage/tiger/no_tiger.mp4', 'footage/tiger/with_tiger.mp4'), - 'wolf': ('footage/wolf/no_wolf.mp4', 'footage/wolf/with_wolf.mp4') + 'bear': ('../footage/bear/no_bear.mp4', '../footage/bear/with_bear.mp4'), + 'tiger': ('../footage/tiger/no_tiger.mp4', '../footage/tiger/with_tiger.mp4'), + 'wolf': ('../footage/wolf/no_wolf.mp4', '../footage/wolf/with_wolf.mp4') } if animal_name not in animal_map: @@ -95,8 +94,8 @@ def main(): socket = context.socket(zmq.PAIR) logging.info(f"Attempting to connect to {host_ip}:{port}") socket.connect(f"tcp://{host_ip}:{port}") - stream_video_clip(socket) logging.info(f"Connected to {host_ip}:{port}") + stream_video_clip(socket) @@ -105,7 +104,7 @@ def stream_video_clip(socket): return # Initialize FPS calculation - app_start_time = time.time() + app_start_time = perf_counter() while True: # Time no_motion_clip_length between motion to sample fivode with tracer.start_as_current_span("sending frame from camera") as frame_span: @@ -116,14 +115,17 @@ def stream_video_clip(socket): # Alternate between video with motion and video without motion for motion in (False, True): - # Initialize FPS calculation - frame_beginning_time = time.time() + frame_number = 0 # Open the appropriate video file video_path = with_animal_video if motion else no_animal_video with videocapture(video_path) as video_capture: + if(video_capture is None): + logging.error("Could not open video capture.") + exit(1) + # Determine the number of frames to process according to the clip length if motion: max_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT)) # Process the entire clip to avoid creating incoherent clip @@ -133,9 +135,12 @@ def stream_video_clip(socket): logging.info(f"Motion: {motion}, Max Frame Count: {max_frames}") while frame_number < max_frames: + + # Initialize FPS calculation + frame_beginning_time = perf_counter() + frame_number += 1 success, frame = video_capture.read() - frame_start_time = time.time() #Loop video clip if there are no more frame or if we cannot read the next frame. # Used for looping the short clip where no animal are present @@ -153,18 +158,20 @@ def stream_video_clip(socket): packet= CameraToMotionDetectorPacket(frame, str(camera_index), carrier) socket.send(packet.pickle()) + logging.info(f"Frame sent: {frame_number}, {motion}, {frame_beginning_time}") frame_number += 1 #Metric logging - frametime = time.time() - app_start_time + frametime = perf_counter() - frame_beginning_time + logging.info(f"Frame time: {frametime}") frametime_histogram.set(frametime) - #Sleep if needed to maintain frame rate - elapsed_time = time.time() - frame_beginning_time - sleep_time = MIN_FRAME_TIME - elapsed_time + + sleep_time = MIN_FRAME_TIME - frametime + logging.info(f"fps: {1/frametime}, sleep time: {sleep_time}, fps at sleep time: {1/sleep_time}") sleep_time_between_frame.set(sleep_time) - time.sleep(max(0.0, sleep_time)) + sleep(max(0.0, sleep_time)) except Exception as error: logging.error(f"Error sending frame: {error}") socket.close() diff --git a/services/camera/src/camera_to_motiondetector_packet.py b/services/camera/src/camera_to_motiondetector_packet.py new file mode 100644 index 0000000000000000000000000000000000000000..fb108f4222b5f59772a9ec17420bdee6d9f3468c --- /dev/null +++ b/services/camera/src/camera_to_motiondetector_packet.py @@ -0,0 +1,27 @@ +import datetime +import pickle +import time +from time import perf_counter + +import cv2 +import numpy as np + + +class CameraToMotionDetectorPacket: + def __init__(self, opencv_frame: np.ndarray, source_camera_identifier: str, telemetry_carrier: dict): + self._opencv_frame: np.ndarray = opencv_frame + self.source_camera_identifier = source_camera_identifier + self.timestamp = perf_counter() + self.telemetry_carrier = telemetry_carrier + + def pickle (self): + #Make sure timestamp is up to date before sending + self.timestamp= perf_counter() + return pickle.dumps(self) + + def decode_frame(self): + return self._opencv_frame + + @staticmethod + def unpickle(buffer): + return pickle.loads(buffer) \ No newline at end of file diff --git a/services/camera/src/telemetry/tracerprovider.py b/services/camera/src/telemetry/tracerprovider.py index 6e7d5c80f373161d4b6799e2dbc5d7a7b27c95ba..37733660f4d26dc3a3b7ccf049ed54f453e595b5 100644 --- a/services/camera/src/telemetry/tracerprovider.py +++ b/services/camera/src/telemetry/tracerprovider.py @@ -14,34 +14,76 @@ file_name = "Camera " index_value = os.environ.get("INDEX", str(random.randint(1, 10))) # Generate a random integer resource=Resource.create({SERVICE_NAME: f"{file_name}{index_value}"}) + +DISABLE_OTEL = os.getenv("DISABLE_OTEL", "false").lower() == "true" + +if DISABLE_OTEL: + + class Dummy(): + ''' + Dummy that can be called and is also a context manager. + It will always return itself, so that you can chain calls. + ''' + + # Accessing attributes + def __getattr__(self, name): + return self + + # Callable + def __call__(self, *args, **kwargs): + return self + + # Context manager + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + pass + + # Indexing, Subscripting, Slicing + def __getitem__(self, *args, **kwargs): + return self + + def __setitem__(self, key, value): + self + + # String representation + def __str__(self) -> str: + return '<Just a Dummy>' + + + tracer = Dummy() + meter = Dummy() +else: + # Set up the tracer provider for tracing -trace.set_tracer_provider( - TracerProvider( - resource=resource + trace.set_tracer_provider( + TracerProvider( + resource=resource + ) ) -) -# Create an OpenTelemetry Collector exporter for exporting traces -otlp_trace_exporter = OTLPSpanExporter(endpoint="otel-collector:4317", insecure=True) + # Create an OpenTelemetry Collector exporter for exporting traces + otlp_trace_exporter = OTLPSpanExporter(endpoint="otel-collector:4317", insecure=True) -# Add the OpenTelemetry Collector exporter to the tracer provider -trace.get_tracer_provider().add_span_processor( - BatchSpanProcessor(otlp_trace_exporter) -) -tracer = trace.get_tracer("camera_tracer") + # Add the OpenTelemetry Collector exporter to the tracer provider + trace.get_tracer_provider().add_span_processor( + BatchSpanProcessor(otlp_trace_exporter) + ) + tracer = trace.get_tracer("camera_tracer") -# Set up the meter provider for metrics -meter_provider = MeterProvider() + # Set up the meter provider for metrics + meter_provider = MeterProvider() -# Create an OpenTelemetry Collector exporter for exporting metrics -otlp_metrics_exporter = OTLPMetricExporter(endpoint="http://otel-collector:4317", insecure=True) + # Create an OpenTelemetry Collector exporter for exporting metrics + otlp_metrics_exporter = OTLPMetricExporter(endpoint="http://otel-collector:4317", insecure=True) -# Register the OpenTelemetry Collector exporter with the metrics provider + # Register the OpenTelemetry Collector exporter with the metrics provider -reader = PeriodicExportingMetricReader( - OTLPMetricExporter(endpoint="http://otel-collector:4317",insecure=True) -) -meterProvider = MeterProvider(resource=resource, metric_readers=[reader]) -metrics.set_meter_provider(meterProvider) -meter = metrics.get_meter("camera_meter") \ No newline at end of file + reader = PeriodicExportingMetricReader( + OTLPMetricExporter(endpoint="http://otel-collector:4317",insecure=True) + ) + meterProvider = MeterProvider(resource=resource, metric_readers=[reader]) + metrics.set_meter_provider(meterProvider) + meter = metrics.get_meter("camera_meter") \ No newline at end of file diff --git a/services/common/motion_detector_to_object_recognizer_packet.py b/services/common/motion_detector_to_object_recognizer_packet.py index 805ab969000a733f05aeded4e0c39daf14c7888e..d4df1c065129a26b23af16dc8baaa1ccf2023b9e 100644 --- a/services/common/motion_detector_to_object_recognizer_packet.py +++ b/services/common/motion_detector_to_object_recognizer_packet.py @@ -16,17 +16,7 @@ class MotionDetectorToObjectRecognizerPacket: def pickle(self): self.timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - data = { - "opencv_frame": self._opencv_frame, - "source_camera_identifier": self.source_camera_identifier, - "timestamp": self.timestamp, - "camera_timestamp" : self.camera_timestamp, - "telemetry_carrier": self.telemetry_carrier, - "contour": self.contour - } - - return pickle.dumps(data) + return pickle.dumps(self) def decode_frame(self): @@ -34,11 +24,5 @@ class MotionDetectorToObjectRecognizerPacket: @staticmethod def unpickle(buffer): - data = pickle.loads(buffer) - - return MotionDetectorToObjectRecognizerPacket(cv2.imdecode(data["opencv_frame"], cv2.IMREAD_COLOR), - data["source_camera_identifier"], - data["camera_timestamp"], - data["telemetry_carrier"], - data["contour"]) + return pickle.loads(buffer) diff --git a/services/motion_detector/Dockerfile b/services/motion_detector/Dockerfile index 1c40c56d61b8ef39699bc565f8959db456322a43..b54ccefd15a4284b9ae1537e43ba24e621aa2c2e 100644 --- a/services/motion_detector/Dockerfile +++ b/services/motion_detector/Dockerfile @@ -1,5 +1,5 @@ # Use the official Python image as base -FROM python:3.12-slim as build +FROM python:3.12-slim-bookworm as build # Set the working directory inside the container WORKDIR /app @@ -13,6 +13,8 @@ WORKDIR /app COPY . . # Install required Python packages +ENV DEBIAN_FRONTEND=noninteractive +RUN apt update && apt install -y libzmq3-dev && rm -rf /var/lib/apt/lists/* RUN pip install --no-cache-dir -r requirements.txt # Set environment variables diff --git a/services/motion_detector/requirements.txt b/services/motion_detector/requirements.txt index b487e267490e85e22d95c84b991fdb8f6e0e642a..494ceb6f8789f3d7b4bc9e710b119fd5b782ef30 100644 --- a/services/motion_detector/requirements.txt +++ b/services/motion_detector/requirements.txt @@ -7,4 +7,5 @@ opentelemetry-exporter-jaeger opentelemetry-exporter-otlp opentelemetry-exporter-otlp-proto-grpc psutil -pyzmq \ No newline at end of file +pyzmq +cffi \ No newline at end of file diff --git a/services/motion_detector/src/camera_to_motiondetector_packet.py b/services/motion_detector/src/camera_to_motiondetector_packet.py new file mode 100644 index 0000000000000000000000000000000000000000..570fd48918c39820cff8096cc7969be4841ec61f --- /dev/null +++ b/services/motion_detector/src/camera_to_motiondetector_packet.py @@ -0,0 +1,26 @@ +import datetime +import pickle +import time + +import cv2 +import numpy as np + + +class CameraToMotionDetectorPacket: + def __init__(self, opencv_frame: np.ndarray, source_camera_identifier: str, telemetry_carrier: dict): + self._opencv_frame: np.ndarray = opencv_frame + self.source_camera_identifier = source_camera_identifier + self.timestamp = time.perf_counter() + self.telemetry_carrier = telemetry_carrier + + def pickle (self): + #Make sure timestamp is up to date before sending + self.timestamp= time.perf_counter() + return pickle.dumps(self) + + def decode_frame(self): + return self._opencv_frame + + @staticmethod + def unpickle(buffer): + return pickle.loads(buffer) \ No newline at end of file diff --git a/services/motion_detector/src/motion_detection.py b/services/motion_detector/src/motion_detection.py index 5b0b1459acd54f0f204f0fe3618cb3771ebcb256..6a4189accf54f00a9adf8e04e8542784492c5e4b 100644 --- a/services/motion_detector/src/motion_detection.py +++ b/services/motion_detector/src/motion_detection.py @@ -3,8 +3,9 @@ import datetime import logging # Import the logging module import os import pickle -import socket + import time +from time import perf_counter import cv2 import imutils @@ -12,8 +13,8 @@ import psutil import zmq from opentelemetry.propagate import extract, inject -from services.common.camera_to_motiondetector_packet import CameraToMotionDetectorPacket -from services.common.motion_detector_to_object_recognizer_packet import MotionDetectorToObjectRecognizerPacket +from camera_to_motiondetector_packet import CameraToMotionDetectorPacket +from motion_detector_to_object_recognizer_packet import MotionDetectorToObjectRecognizerPacket from telemetry.tracerprovider import tracer, meter # Configure logging @@ -22,10 +23,11 @@ logging.basicConfig( level=logging.INFO, # You can change this to DEBUG, ERROR, etc. handlers=[ logging.StreamHandler(), # Log to console - logging.FileHandler("/app/output/motion_detector.log") # Log to a file + #logging.FileHandler("/app/output/motion_detector.log") # Log to a file ] ) + fps_histo = meter.create_histogram( name="md_fps_histo", description="Frames per second", @@ -73,7 +75,6 @@ def detect_motion(frame, source_camera_identifier, trace_carrier): with tracer.start_as_current_span("processing the frame",context=extracted_context) as span: try: inject(trace_carrier) - logging.debug("Frame loaded for processing.") logging.debug(f"FRAME RECEIVE FROM CLIENT: {source_camera_identifier} | TIME: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) @@ -82,7 +83,7 @@ def detect_motion(frame, source_camera_identifier, trace_carrier): if first_frame is None: first_frame = gray - logging.info("Initialized reference frame for motion detection.") + logging.debug("Initialized reference frame for motion detection.") continue starting_processing_time = time.time() @@ -102,6 +103,9 @@ def detect_motion(frame, source_camera_identifier, trace_carrier): cv2.CHAIN_APPROX_SIMPLE) contours = imutils.grab_contours(contours) + if len(contours) == 0: + logging.debug("No contours detected.") + detected_cnt = 0 detected = False bounding_rectangle = () @@ -110,7 +114,7 @@ def detect_motion(frame, source_camera_identifier, trace_carrier): continue detected = True detected_cnt += 1 - logging.debug(f"Motion detected. Contour area: {cv2.contourArea(contour)}.") + logging.info(f"Motion detected. Contour area: {cv2.contourArea(contour)}.") (x, y, w, h) = cv2.boundingRect(contour) bounding_rectangle = (x, y, w, h) @@ -133,49 +137,53 @@ def main(): # Retrieve environment variables instead of command-line arguments or_host_ip = os.getenv('OR_HOST', 'localhost') # Default to 'localhost' or_port = int(os.getenv('OR_PORT', 9999)) # Default to 9999 - host_name = socket.gethostname() - host_ip = socket.gethostbyname(host_name) - logging.info(f'HOST IP: {host_ip}') + #host_name = socket.gethostname() + #host_ip = socket.gethostbyname(host_name) + #logging.info(f'HOST IP: {host_ip}') host_port = 9998 - context = zmq.Context + context = zmq.Context() camera_socket = context.socket(zmq.PAIR) - camera_socket.bind(f"tcp://{host_ip}:{host_port}") + camera_socket.bind(f"tcp://localhost:{host_port}") + logging.info(f'Camera socket bound to {host_port}') - or_socket_context = zmq.Context() - or_socket = or_socket_context.socket(zmq.PAIR) + or_socket = context.socket(zmq.PAIR) + logging.info(f"Connecting to {or_host_ip}:{or_port}") or_socket.connect(f"tcp://{or_host_ip}:{or_port}") + logging.info(f"connected to {or_host_ip}:{or_port}") while True: - try: - message = camera_socket.recv() - camera_packet: CameraToMotionDetectorPacket = pickle.loads(message) - - transmission_time = datetime.datetime.now() - camera_packet.timestamp - transmission_time_in_seconds = transmission_time.total_seconds() - c2e_transmission_time.set(transmission_time_in_seconds) - logging.info(f"Frame received with transmission time: {transmission_time_in_seconds} seconds.") - - starting_processing_time = time.time() - - frame = camera_packet.decode_frame() - flag, contours = detect_motion(frame, camera_packet.source_camera_identifier, - camera_packet.telemetry_carrier) - - if flag: - md_packet = MotionDetectorToObjectRecognizerPacket(frame, camera_packet.source_camera_identifier, - camera_packet.telemetry_carrier, contours) - pickled = md_packet.pickle() - logging.info(f"Packet size: {len(pickled)} bytes.") # Log the size of the packet - or_socket.send(pickled) - logging.info("Frame successfully sent to Object Recognizer.") - - processing_time.set(time.time() - starting_processing_time) - logging.debug("Processing time logged for frame.") - except Exception as error: - logging.error(f"Error in event loop: {error}") - time.sleep(1) - continue + + message = camera_socket.recv() + camera_packet: CameraToMotionDetectorPacket = pickle.loads(message) + + transmission_time = perf_counter() - camera_packet.timestamp + c2e_transmission_time.set(transmission_time) + logging.info(f"Frame received with transmission time: {transmission_time} seconds.") + + starting_processing_time = perf_counter() + + frame = camera_packet.decode_frame() + cv2.imshow("Frame", frame) + cv2.waitKey(1) + + if frame is None: + logging.error("Invalid frame received.") + + flag, contours = detect_motion(frame, camera_packet.source_camera_identifier, + camera_packet.telemetry_carrier) + #logging.INFO(str(flag),str(contours)) + if flag: + md_packet = MotionDetectorToObjectRecognizerPacket(frame, camera_packet.source_camera_identifier, + camera_packet.timestamp, + camera_packet.telemetry_carrier, contours) + pickled = md_packet.pickle() + logging.info(f"Packet size: {len(pickled)} bytes.") # Log the size of the packet + or_socket.send(pickled) + logging.info("Frame successfully sent to Object Recognizer.") + + processing_time.set(time.time() - starting_processing_time) + logging.debug("Processing time logged for frame.") diff --git a/services/motion_detector/src/motion_detector_to_object_recognizer_packet.py b/services/motion_detector/src/motion_detector_to_object_recognizer_packet.py new file mode 100644 index 0000000000000000000000000000000000000000..d4df1c065129a26b23af16dc8baaa1ccf2023b9e --- /dev/null +++ b/services/motion_detector/src/motion_detector_to_object_recognizer_packet.py @@ -0,0 +1,28 @@ +import datetime +import pickle + +import cv2 +import numpy as np + + +class MotionDetectorToObjectRecognizerPacket: + def __init__(self, opencv_frame: np.ndarray, source_camera_identifier: str, camera_timestamp, telemetry_carrier: dict, contour: np.ndarray): + _, self._opencv_frame = cv2.imencode('.jpg', opencv_frame) + self.source_camera_identifier = source_camera_identifier + self.timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + self.camera_timestamp = camera_timestamp + self.telemetry_carrier = telemetry_carrier + self.contour = contour # OpenCV contour data + + def pickle(self): + self.timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + return pickle.dumps(self) + + + def decode_frame(self): + return cv2.imdecode(self._opencv_frame, cv2.IMREAD_COLOR) + + @staticmethod + def unpickle(buffer): + return pickle.loads(buffer) + diff --git a/services/motion_detector/src/telemetry/tracerprovider.py b/services/motion_detector/src/telemetry/tracerprovider.py index 515d81cf54a6368b1f5f998d4dd3430ac7db7e3c..4e3a5d2d029aa4fc0448d585c504c8e6c8517fc8 100644 --- a/services/motion_detector/src/telemetry/tracerprovider.py +++ b/services/motion_detector/src/telemetry/tracerprovider.py @@ -11,42 +11,70 @@ from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter # Extract the file name without extension file_name = "Motion Detector " -# try: -# with open('../index.txt', 'r') as file: -# index_value = int(file.read().strip()) -# except (FileNotFoundError, ValueError) as e: -# print(f"Error reading index.txt: {e}") -# index_value = random.randint(1, 10) index_value = os.environ.get("INDEX", str(random.randint(1, 10))) resource=Resource.create({SERVICE_NAME: f"{file_name}{index_value}"}) -# Set up the tracer provider for tracing -trace.set_tracer_provider( - TracerProvider( - resource=resource +# Disable OpenTelemetry if running locally +DISABLE_OTEL = os.environ.get("DISABLE_OTEL", 'False') + +if DISABLE_OTEL: + + class Dummy(): + ''' + Dummy that can be called and is also a context manager. + It will always return itself, so that you can chain calls. + ''' + # Accessing attributes + def __getattr__(self, name): + return self + # Callable + def __call__(self, *args, **kwargs): + return self + # Context manager + def __enter__(self): + return self + def __exit__(self, *args, **kwargs): + pass + # Indexing, Subscripting, Slicing + def __getitem__(self, *args, **kwargs): + return self + def __setitem__(self, key, value): + self + # String representation + def __str__(self) -> str: + return '<Just a Dummy>' + + + tracer = Dummy() + meter = Dummy() +else: + # Set up the tracer provider for tracing + trace.set_tracer_provider( + TracerProvider( + resource=resource + ) ) -) -# Create an OpenTelemetry Collector exporter for exporting traces -otlp_trace_exporter = OTLPSpanExporter(endpoint="otel-collector:4317", insecure=True) + # Create an OpenTelemetry Collector exporter for exporting traces + otlp_trace_exporter = OTLPSpanExporter(endpoint="otel-collector:4317", insecure=True) -# Add the OpenTelemetry Collector exporter to the tracer provider -trace.get_tracer_provider().add_span_processor( - BatchSpanProcessor(otlp_trace_exporter) -) -tracer = trace.get_tracer("motion detector tracer") + # Add the OpenTelemetry Collector exporter to the tracer provider + trace.get_tracer_provider().add_span_processor( + BatchSpanProcessor(otlp_trace_exporter) + ) + tracer = trace.get_tracer("motion detector tracer") -# Set up the meter provider for metrics -meter_provider = MeterProvider() + # Set up the meter provider for metrics + meter_provider = MeterProvider() -# Create an OpenTelemetry Collector exporter for exporting metrics -otlp_metrics_exporter = OTLPMetricExporter(endpoint="http://otel-collector:4317", insecure=True) + # Create an OpenTelemetry Collector exporter for exporting metrics + otlp_metrics_exporter = OTLPMetricExporter(endpoint="http://otel-collector:4317", insecure=True) -# Register the OpenTelemetry Collector exporter with the metrics provider + # Register the OpenTelemetry Collector exporter with the metrics provider -reader = PeriodicExportingMetricReader( - OTLPMetricExporter(endpoint="http://otel-collector:4317",insecure=True) -) -meterProvider = MeterProvider(resource=resource, metric_readers=[reader]) -metrics.set_meter_provider(meterProvider) -meter = metrics.get_meter("motion detector meter") \ No newline at end of file + reader = PeriodicExportingMetricReader( + OTLPMetricExporter(endpoint="http://otel-collector:4317",insecure=True) + ) + meterProvider = MeterProvider(resource=resource, metric_readers=[reader]) + metrics.set_meter_provider(meterProvider) + meter = metrics.get_meter("motion detector meter") \ No newline at end of file diff --git a/services/object_recognizer/Dockerfile b/services/object_recognizer/Dockerfile index 36448ff717c2938f332dde530e0cf375ee5f7d57..28c5c9331b54169ddcaa4fcfa83edab06e9f7aa0 100644 --- a/services/object_recognizer/Dockerfile +++ b/services/object_recognizer/Dockerfile @@ -11,8 +11,9 @@ WORKDIR /app # Copy the server code into the container COPY . . -# Install required Python packages -RUN pip install --no-cache-dir -r requirements.txt +ENV DEBIAN_FRONTEND=noninteractive +RUN apt update && apt install -y libzmq3-dev && rm -rf /var/lib/apt/lists/* +RUN pip install --no-cache-dir -r requirements.txt # Expose port 5000 EXPOSE 5000 diff --git a/services/object_recognizer/requirements.txt b/services/object_recognizer/requirements.txt index deb5c7f7deea3f3fb5ca24f93484da86210de175..0e660b4b5a8493da1e90263c0509f62302d0b6cb 100644 --- a/services/object_recognizer/requirements.txt +++ b/services/object_recognizer/requirements.txt @@ -8,4 +8,5 @@ opentelemetry-exporter-otlp opentelemetry-exporter-otlp-proto-grpc docker flask -pyzmq \ No newline at end of file +pyzmq +cffi \ No newline at end of file diff --git a/services/object_recognizer/src/motion_detector_to_object_recognizer_packet.py b/services/object_recognizer/src/motion_detector_to_object_recognizer_packet.py new file mode 100644 index 0000000000000000000000000000000000000000..f4b8adf57c5534669c134587b5bb6879b3aa0890 --- /dev/null +++ b/services/object_recognizer/src/motion_detector_to_object_recognizer_packet.py @@ -0,0 +1,28 @@ +import datetime +import pickle + +import cv2 +import numpy as np + + +class MotionDetectorToObjectRecognizerPacket: + def __init__(self, opencv_frame: np.ndarray, source_camera_identifier: str, camera_timestamp, telemetry_carrier: dict, contour: np.ndarray): + _, self._opencv_frame = cv2.imencode('.jpg', opencv_frame) + self.source_camera_identifier = source_camera_identifier + self.timestamp = datetime.datetime.now() + self.camera_timestamp = camera_timestamp + self.telemetry_carrier = telemetry_carrier + self.contour = contour # OpenCV contour data + + def pickle(self): + self.timestamp = datetime.datetime.now() + return pickle.dumps(self) + + + def decode_frame(self): + return cv2.imdecode(self._opencv_frame, cv2.IMREAD_COLOR) + + @staticmethod + def unpickle(buffer): + return pickle.loads(buffer) + diff --git a/services/object_recognizer/src/object_recognizer.py b/services/object_recognizer/src/object_recognizer.py index 2999e955a1dddbebab03fb1d9e417ba17e968379..15c08d6780d148a4ddba4be6f58b482c5229be7f 100644 --- a/services/object_recognizer/src/object_recognizer.py +++ b/services/object_recognizer/src/object_recognizer.py @@ -8,7 +8,7 @@ import zmq from opentelemetry.propagate import extract from model.model import recognize -from services.common.motion_detector_to_object_recognizer_packet import MotionDetectorToObjectRecognizerPacket +from motion_detector_to_object_recognizer_packet import MotionDetectorToObjectRecognizerPacket from telemetry.tracerprovider import meter, tracer from webserver import run_flask @@ -103,6 +103,7 @@ def main(): while True: try: + logging.info("Waiting for new frame...") receive_frames(server_socket, frame_queue) except Exception as e: logging.error(f"Error in server loop: {e}")