diff --git a/.idea/misc.xml b/.idea/misc.xml index f6bca3d8ede6cc3253bc65e7e75929a468cea218..e30a8c698c3f29bd3b997b5aef6b74189c77328b 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,7 +3,7 @@ <component name="Black"> <option name="sdkName" value="Python 3.9 (motion detection)" /> </component> - <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (motion detection)" project-jdk-type="Python SDK" /> + <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (surveillance-system-edge-to-cloud-video-processing)" project-jdk-type="Python SDK" /> <component name="PyCharmProfessionalAdvertiser"> <option name="shown" value="true" /> </component> diff --git a/services/camera/src/camera.py b/services/camera/src/camera.py index 910e3b6299456040c77bd783fb78b4792d8dec21..912044f579d3914a8e04e0098493ab806f204289 100644 --- a/services/camera/src/camera.py +++ b/services/camera/src/camera.py @@ -6,10 +6,13 @@ import struct import time import datetime import os # Import os for environment variables + + from telemetry.tracerprovider import tracer, meter import imutils # pip install imutils import logging from opentelemetry.propagate import inject + # Configure logging logging.basicConfig( format='%(asctime)s - %(levelname)s - %(message)s', @@ -20,6 +23,40 @@ logging.basicConfig( ] ) +frametime_histogram = meter.create_histogram( + name="camera_frame_time", + description="Frames time", + unit="ms" +) + +sleep_time_between_frame = meter.create_histogram( + name="sleep_time_between_frame", + description="sleep_time_between_frame", + unit="ms" +) + +frame_rate:int = 30 +MIN_FRAME_TIME:float = 1.0 / frame_rate + +# Get environment variables (with defaults if not set) +camera = os.getenv("CAMERA", "false").lower() == "true" +animal_name = os.getenv("ANIMAL_NAME", "tiger") +appearance_rate = int(os.getenv("APPEARANCE_RATE", 600)) +host_ip = os.getenv("MDHOST", "localhost") +port = int(os.getenv("MDPORT", 9998)) + +# Map animal to the appropriate video filenames +animal_map = { + 'bear': ('footage/bear/no_bear.mp4', 'footage/bear/with_bear.mp4'), + 'tiger': ('footage/tiger/no_tiger.mp4', 'footage/tiger/with_tiger.mp4'), + 'wolf': ('footage/wolf/no_wolf.mp4', 'footage/wolf/with_wolf.mp4') +} + +if animal_name not in animal_map: + logging.error(f"No video available for {animal_name}") + exit(1) + +no_animal_video, with_animal_video = animal_map[animal_name] def generate_random_intervals(events_per_hour): # Total time in one hour (in seconds) @@ -42,123 +79,109 @@ def generate_random_intervals(events_per_hour): def main(): - # Get environment variables (with defaults if not set) - camera = os.getenv("CAMERA", "false").lower() == "true" - animal_name = os.getenv("ANIMAL_NAME", "tiger") - appearance_rate = int(os.getenv("APPEARANCE_RATE", 600)) - host_ip = os.getenv("MDHOST", "localhost") - port = int(os.getenv("MDPORT", 9998)) - - # Map animal to the appropriate video filenames - animal_map = { - 'bear': ('footage/bear/no_bear.mp4', 'footage/bear/with_bear.mp4'), - 'tiger': ('footage/tiger/no_tiger.mp4', 'footage/tiger/with_tiger.mp4'), - 'wolf': ('footage/wolf/no_wolf.mp4', 'footage/wolf/with_wolf.mp4') - } - - if animal_name not in animal_map: - logging.error(f"No video available for {animal_name}") - return - no_animal_video, with_animal_video = animal_map[animal_name] + + #Connect to motion detector and stream video while True: - client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client_socket = create_socket_and_connect(host_ip, port) + stream_video_clip(client_socket) - while True: - try: - logging.info(f"Attempting to connect to {host_ip}:{port}") - client_socket.connect((host_ip, port)) - logging.info(f"Connected to {host_ip}:{port}") - break - except Exception as e: - logging.warning(f"Cannot connect to motion detector: {e}") - time.sleep(1) - continue - - # Initialize FPS calculation - fps_start_time = time.time() - fps_frame_count = 0 - fps = 0 - fps_histo = meter.create_histogram( - name="camera_fps_histo", - description="Frames per second", - unit="fps" - ) - fps_count = meter.create_gauge( - name="camera_fps_gauge", - description="Frames per second", - unit="fps" - ) - - frame_rate = 30.0 - frame_interval = 1.0 / frame_rate - motion = False - if client_socket: - while True: - for interval in generate_random_intervals(appearance_rate): +def create_socket_and_connect(host_ip:str, port:int): + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + while True: + try: + logging.info(f"Attempting to connect to {host_ip}:{port}") + client_socket.connect((host_ip, port)) + logging.info(f"Connected to {host_ip}:{port}") + break + except Exception as e: + logging.warning(f"Cannot connect to motion detector: {e}") + time.sleep(1) + continue + return client_socket + + +def stream_video_clip(client_socket: socket): + if not client_socket: + return + + # Initialize FPS calculation + app_start_time = time.time() + while True: + # Time no_motion_clip_length between motion to sample fivode + with tracer.start_as_current_span("sending frame from camera") as frame_span: + try: + carrier = {} + inject(carrier) + for no_motion_clip_length in generate_random_intervals(appearance_rate): + # Alternate between video with motion and video without motion for motion in (False, True): + + # Initialize FPS calculation + frame_beginning_time = time.time() frame_number = 0 - if motion: - vid = cv2.VideoCapture(with_animal_video) - interval_frame_count= int(vid.get(cv2.CAP_PROP_FRAME_COUNT)) - else: - vid = cv2.VideoCapture(no_animal_video) - interval_frame_count = interval * 30 - logging.info(f"Motion: {motion} max frame count: {interval_frame_count}") - while (frame_number < interval_frame_count): - frame_number += 1 - carrier = {} - with tracer.start_as_current_span("sending frame from camera") as span: - try: - inject(carrier) - img, frame = vid.read() - if not img: - if not motion: - frame_number -= 1 - vid = cv2.VideoCapture(no_animal_video) - continue - else: - logging.info(f"Motion frames count: {frame_number}") - break - frame = imutils.resize(frame, width=640) - - start_time = time.time() - - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - data = { - 'frame': frame, - 'capture_time': current_time, - 'carrier': carrier - - } - a = pickle.dumps(data) - message = struct.pack("Q", len(a)) + a - client_socket.sendall(message) - - # Update FPS calculation - fps_frame_count += 1 - elapsed_time = time.time() - fps_start_time - if elapsed_time >= 10.0: - fps = round(fps_frame_count / elapsed_time, 2) - logging.info(f"FPS: {fps} (Total frames: {fps_frame_count}, Time: {elapsed_time})") - fps_histo.record(fps) - fps_count.set(fps) - fps_frame_count = 0 - fps_start_time = time.time() - - # Maintain the frame rate - end_time = time.time() - elapsed_time = end_time - start_time - if elapsed_time < frame_interval: - time.sleep(0.025) # frame_interval - elapsed_time - - except Exception as e: - logging.error(f"Error sending frame: {e}") - client_socket.close() - break + + # Open the appropriate video file + video_path = with_animal_video if motion else no_animal_video + with cv2.VideoCapture(video_path) as video_capture: + + # Determine the number of frames to process according to the clip length + if motion: + max_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT)) # Process the entire clip to avoid creating incoherent clip + else: + max_frames = no_motion_clip_length * frame_rate + + logging.info(f"Motion: {motion}, Max Frame Count: {max_frames}") + + while frame_number < max_frames: + frame_number += 1 + success, frame = video_capture.read() + frame_start_time = time.time() + + #Loop video clip if there are no more frame or if we cannot read the next frame. + # Used for looping the short clip where no animal are present + if not success: + if not motion: + frame_number -= 1 # Retry with same frame number + video_capture.set(cv2.CAP_PROP_POS_FRAMES, 0) # Reload video + continue + else: + logging.info(f"Motion frames processed: {frame_number}") + break + + # Resize frame for consistent processing + frame = imutils.resize(frame, width=640) + + # Prepare data packet + data_packet = { + 'frame': frame, + 'capture_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + 'carrier': carrier + } + + serialized_data = pickle.dumps(data_packet) + message = struct.pack("Q", len(serialized_data)) + serialized_data + client_socket.sendall(message) + + frame_number += 1 + + #Metric logging + frametime = time.time() - app_start_time + frametime_histogram.record(frametime) + + #Sleep if needed to maintain frame rate + elapsed_time = time.time() - frame_beginning_time + sleep_time = MIN_FRAME_TIME - elapsed_time + sleep_time_between_frame.record(sleep_time) + time.sleep(max(0.0, sleep_time)) + except Exception as error: + logging.error(f"Error sending frame: {error}") + client_socket.close() + return + + if __name__ == "__main__": diff --git a/services/camera/src/camera.txt b/services/camera/src/camera.txt deleted file mode 100644 index 2158f1435caa529469bc72b1e792614acd7063e3..0000000000000000000000000000000000000000 --- a/services/camera/src/camera.txt +++ /dev/null @@ -1,161 +0,0 @@ -import argparse -import random -import socket -import cv2 -import pickle -import struct -import time -import datetime -import os # Import os for environment variables -from tracerprovider import tracer, meter -import pyshine as ps # pip install pyshine -import imutils # pip install imutils -import logging - -# Configure logging -logging.basicConfig( - format='%(asctime)s - %(levelname)s - %(message)s', - level=logging.INFO, # You can change this to DEBUG, ERROR, etc. - handlers=[ - logging.StreamHandler(), # Log to console - logging.FileHandler("output/camera.log") # Log to a file - ] -) - - -def generate_random_intervals(events_per_hour): - # Total time in one hour (in seconds) - total_time = 3600 - - # Calculate average time between events - avg_interval = total_time / events_per_hour - - # Generate random factors for intervals - random_factors = [random.uniform(0.5, 1.5) for _ in range(events_per_hour)] - - # Normalize the random factors so that their sum equals total_time - total_factor = sum(random_factors) - normalized_intervals = [avg_interval * (factor / total_factor * events_per_hour) for factor in random_factors] - - # Return intervals rounded to a few decimal points (optional) - result = [round(interval, 2) for interval in normalized_intervals] - logging.info(f"Generated intervals: {result}") - return result - - -def main(): - # Get environment variables (with defaults if not set) - camera = os.getenv("CAMERA", "false").lower() == "true" - animal_name = os.getenv("ANIMAL_NAME", "tiger") - appearance_rate = int(os.getenv("APPEARANCE_RATE", 600)) - host_ip = os.getenv("MDHOST", "localhost") - port = int(os.getenv("MDPORT", 9998)) - - # Map animal to the appropriate video filenames - animal_map = { - 'bear': ('footage/bear/no_bear.mp4', 'footage/bear/with_bear.mp4'), - 'tiger': ('footage/tiger/no_tiger.mp4', 'footage/tiger/with_tiger.mp4'), - 'wolf': ('footage/wolf/no_wolf.mp4', 'footage/wolf/with_wolf.mp4') - } - - if animal_name not in animal_map: - logging.error(f"No video available for {animal_name}") - return - - no_animal_video, with_animal_video = animal_map[animal_name] - - while True: - client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - while True: - try: - logging.info(f"Attempting to connect to {host_ip}:{port}") - client_socket.connect((host_ip, port)) - logging.info(f"Connected to {host_ip}:{port}") - break - except Exception as e: - logging.warning(f"Cannot connect to motion detector: {e}") - time.sleep(1) - continue - - # Initialize FPS calculation - fps_start_time = time.time() - fps_frame_count = 0 - fps = 0 - fps_histo = meter.create_histogram( - name="camera_fps_histo", - description="Frames per second", - unit="fps" - ) - fps_count = meter.create_gauge( - name="camera_fps_gauge", - description="Frames per second", - unit="fps" - ) - - frame_rate = 30.0 - frame_interval = 1.0 / frame_rate - motion = False - if client_socket: - while True: - for interval in generate_random_intervals(appearance_rate): - interval_frame_count = interval * 30 - frame_number = 0 - if motion: - vid = cv2.VideoCapture(with_animal_video) - motion = False - else: - vid = cv2.VideoCapture(no_animal_video) - motion = True - logging.info(f"Motion: {motion}") - while (frame_number < interval_frame_count) or (not motion): - frame_number += 1 - with tracer.start_as_current_span("sending frame") as span: - try: - img, frame = vid.read() - if not img: - if motion: - vid = cv2.VideoCapture(no_animal_video) - continue - else: - logging.info(f"Motion frames count: {frame_number}") - break - frame = imutils.resize(frame, width=640) - - start_time = time.time() - - current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - data = { - 'frame': frame, - 'capture_time': current_time - } - a = pickle.dumps(data) - message = struct.pack("Q", len(a)) + a - client_socket.sendall(message) - - # Update FPS calculation - fps_frame_count += 1 - elapsed_time = time.time() - fps_start_time - if elapsed_time >= 10.0: - fps = round(fps_frame_count / elapsed_time, 2) - logging.info(f"FPS: {fps} (Total frames: {fps_frame_count}, Time: {elapsed_time})") - fps_histo.record(fps) - fps_count.set(fps) - fps_frame_count = 0 - fps_start_time = time.time() - - # Maintain the frame rate - end_time = time.time() - elapsed_time = end_time - start_time - if elapsed_time < frame_interval: - time.sleep(0.025) # frame_interval - elapsed_time - - except Exception as e: - logging.error(f"Error sending frame: {e}") - client_socket.close() - break - - -if __name__ == "__main__": - main() diff --git a/services/camera/src/old/__init__.py b/services/camera/src/old/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/services/camera/src/config.py b/services/camera/src/old/config.py similarity index 100% rename from services/camera/src/config.py rename to services/camera/src/old/config.py diff --git a/services/camera/src/logger.py b/services/camera/src/old/logger.py similarity index 100% rename from services/camera/src/logger.py rename to services/camera/src/old/logger.py diff --git a/services/camera/src/network.py b/services/camera/src/old/network.py similarity index 100% rename from services/camera/src/network.py rename to services/camera/src/old/network.py diff --git a/services/camera/src/utils.py b/services/camera/src/old/utils.py similarity index 100% rename from services/camera/src/utils.py rename to services/camera/src/old/utils.py diff --git a/services/camera/src/video.py b/services/camera/src/old/video.py similarity index 100% rename from services/camera/src/video.py rename to services/camera/src/old/video.py diff --git a/services/camera/src/telemetry/tracerprovider.py b/services/camera/src/telemetry/tracerprovider.py index 784a8a42609db39b1a0ced93206a52028f37295c..6e7d5c80f373161d4b6799e2dbc5d7a7b27c95ba 100644 --- a/services/camera/src/telemetry/tracerprovider.py +++ b/services/camera/src/telemetry/tracerprovider.py @@ -10,12 +10,7 @@ from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExport from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter file_name = "Camera " -# try: -# with open('../index.txt', 'r') as file: -# index_value = int(file.read().strip()) -# except (FileNotFoundError, ValueError) as e: -# print(f"Error reading index.txt: {e}") -# index_value = random.randint(1, 10) + index_value = os.environ.get("INDEX", str(random.randint(1, 10))) # Generate a random integer resource=Resource.create({SERVICE_NAME: f"{file_name}{index_value}"}) @@ -33,7 +28,7 @@ otlp_trace_exporter = OTLPSpanExporter(endpoint="otel-collector:4317", insecure= trace.get_tracer_provider().add_span_processor( BatchSpanProcessor(otlp_trace_exporter) ) -tracer = trace.get_tracer("camera tracer") +tracer = trace.get_tracer("camera_tracer") # Set up the meter provider for metrics meter_provider = MeterProvider() @@ -49,4 +44,4 @@ reader = PeriodicExportingMetricReader( ) meterProvider = MeterProvider(resource=resource, metric_readers=[reader]) metrics.set_meter_provider(meterProvider) -meter = metrics.get_meter("camera meter") \ No newline at end of file +meter = metrics.get_meter("camera_meter") \ No newline at end of file diff --git a/services/motion_detector/src/motion_detection.py b/services/motion_detector/src/motion_detection.py index 78f285197707c5d4618fafeb4da9f067d88fd658..c8c4d67f32c47103fb25b43c1253ffb4e628597d 100644 --- a/services/motion_detector/src/motion_detection.py +++ b/services/motion_detector/src/motion_detection.py @@ -42,7 +42,6 @@ def receive_frames(client_socket, frame_queue): unit="s" ) while True: - try: while len(data) < payload_size: packet = client_socket.recv(4 * 1024) # 4K @@ -76,11 +75,8 @@ def receive_frames(client_socket, frame_queue): def process_frames(addr, frame_queue, or_host_ip, or_port): logging.info(f"Starting frame processing thread for client: {addr}.") firstFrame = None - fps_start_time = time.time() fps_frame_count = 0 - detected = False - orclient_socket = None fps_histo = meter.create_histogram( name="md_fps_histo", description="Frames per second",