Mentions légales du service

Skip to content
Snippets Groups Projects
Commit 5a1c8fd5 authored by sidimohammedkaddour's avatar sidimohammedkaddour
Browse files

first protoype , only monitoring work

parents
No related branches found
No related tags found
No related merge requests found
Showing
with 1042 additions and 0 deletions
# DeployementManager.py
import threading
import subprocess
import sys
class DeployThread(threading.Thread):
    """Background thread that builds, optionally pushes, and then deploys service images.

    For every entry in ``services`` an image ``<docker_username>/<service>:<tag>``
    is built from ``./<service>``. When ``deploy_env`` is ``"cloud"`` the image is
    also pushed. Afterwards Docker Compose is used to start the stack.

    :param docker_username: Docker Hub account used as the image namespace.
    :param services: Iterable of service names; each is also its build directory.
    :param tag: Image tag applied to every service.
    :param deploy_env: ``"local"`` or ``"cloud"``.
    """

    def __init__(self, docker_username, services, tag, deploy_env):
        super().__init__()
        self.docker_username = docker_username
        self.services = services
        self.tag = tag
        self.deploy_env = deploy_env
        self._stop_event = threading.Event()

    def run(self):
        """Build (and push) every service, then deploy, unless stopped first."""
        for service in self.services:
            if self._stop_event.is_set():
                break
            self.build_and_push(service)
        self.deploy_images()

    def build_and_push(self, service):
        """Build the image for *service*; push it too when deploying to the cloud."""
        if self._stop_event.is_set():
            return
        image = f"{self.docker_username}/{service}:{self.tag}"
        print(f"Building Docker image for {service}...")
        self.run_command(["docker", "build", "-t", image, f"./{service}"])
        if self.deploy_env == "cloud":
            print(f"Pushing Docker image for {service} to Docker Hub...")
            self.run_command(["docker", "push", image])

    def deploy_images(self):
        """Start the stack with Docker Compose according to ``deploy_env``."""
        if self._stop_event.is_set():
            return
        if self.deploy_env == "local":
            print("Running Docker images locally with Docker Compose...")
            self.run_command(["docker-compose", "up", "-d", "--build"])
        elif self.deploy_env == "cloud":
            print("Pulling and running Docker images from Docker Hub with Docker Compose...")
            self.run_command(["docker-compose", "pull"])
            self.run_command(["docker-compose", "up", "-d", "--build"])
        else:
            print("Invalid DEPLOY_ENV value. Please use 'local' or 'cloud'.")
            sys.exit(1)

    def run_command(self, command):
        """Run *command* without a shell, print its output, and abort on failure.

        :param command: Either an argument list or a command string. A string is
            split with ``shlex.split`` so no shell ever interprets it.
        :raises SystemExit: with the command's return code when it fails, or 127
            when the executable cannot be started. Raised inside ``run`` this
            ends only this thread, not the whole process.
        """
        import shlex  # local import: only needed to tokenise string commands

        argv = shlex.split(command) if isinstance(command, str) else list(command)
        try:
            result = subprocess.run(argv, text=True, capture_output=True)
        except OSError as exc:
            # Without a shell a missing executable raises instead of returning 127.
            print(f"Error: {exc}")
            sys.exit(127)
        if result.returncode != 0:
            print(f"Error: {result.stderr}")
            sys.exit(result.returncode)
        print(result.stdout)

    def stop(self):
        """Ask the thread to skip any step that has not started yet."""
        self._stop_event.set()
# FeedBackMechanisme.py
import time
from MonitoringManager import MonitorThread
from config_loader import load_config
from DeployementManager import DeployThread
def create_monitor(prometheus_url,services, metric_name, interval, metrics, indicators, conditions):
    """Create a ``MonitorThread`` from the arguments, start it and return it.

    The seven arguments are forwarded positionally, unchanged and in
    declaration order.

    NOTE(review): the call in ``main`` passes (url, app name, interval,
    services, ...) positionally, which does not match the names ``services``,
    ``metric_name`` and ``interval`` used here. Confirm the intended names
    before calling this function with keyword arguments.
    """
    forwarded = (prometheus_url, services, metric_name, interval, metrics, indicators, conditions)
    worker = MonitorThread(*forwarded)
    worker.start()
    return worker
def stop_monitor(monitor_thread):
    """Signal *monitor_thread* to stop, then block until it has finished."""
    worker = monitor_thread
    worker.stop()
    worker.join()
def create_deploy_thread(docker_username, services, tag, deploy_env):
    """Create a ``DeployThread`` for the given settings, start it and return it."""
    worker = DeployThread(docker_username, services, tag, deploy_env)
    worker.start()
    return worker
def stop_deploy_thread(deploy_thread):
    """Signal *deploy_thread* to stop, then block until it has finished."""
    worker = deploy_thread
    worker.stop()
    worker.join()
def main(config_file):
    """Start deployment and per-application monitoring, then wait for Ctrl+C.

    The configuration is loaded from *config_file*. One deploy thread is started
    with the fixed settings below, and one monitor thread is started for every
    entry under ``applications``. On ``KeyboardInterrupt`` all monitor threads
    and then all deploy threads are stopped and joined.
    """
    config = load_config(config_file)
    prometheus_url = "http://127.0.0.1:9090/"

    # Deployment settings are fixed here; they are not read from the config file.
    docker_username = "medkaddour"
    services = ["camera", "motion_detector", "object_recognizer"]
    tag = "latest"
    deploy_env = "local"  # Or "cloud"
    deploy_threads = [create_deploy_thread(docker_username, services, tag, deploy_env)]

    monitor_threads = []
    for app in config['applications']:
        monitor_cfg = app['monitor']
        worker = create_monitor(
            prometheus_url,
            app['name'],
            monitor_cfg['interval'],
            app['services'],
            monitor_cfg['metrics'],
            monitor_cfg['indicators'],
            monitor_cfg['conditions'],
        )
        monitor_threads.append(worker)
        print(f"Started monitoring for {app['name']}")

    try:
        # Keep the main thread alive so Ctrl+C can be caught here.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        for worker in monitor_threads:
            stop_monitor(worker)
        for worker in deploy_threads:
            stop_deploy_thread(worker)
        print("Monitoring and deployment stopped.")
# Script entry point: run the feedback loop with the configuration file
# "config.yaml", resolved relative to the current working directory.
if __name__ == "__main__":
    config_file = "config.yaml"
    main(config_file)
import threading
from tabulate import tabulate
import time
from datetime import datetime, timedelta
from metrics import get_metrics
from preprocess import preprocess_metrics
from inference import infer_critical_metrics, infer_status
import pandas as pd
import plotly.graph_objs as go
import ast
class MonitorThread(threading.Thread):
    """Thread that periodically collects, evaluates and exports one application's metrics.

    Each cycle queries Prometheus through ``get_metrics``, cleans the samples
    with ``preprocess_metrics``, derives the critical metrics and the overall
    status, prints them, writes CSV snapshots under ``output/`` and refreshes
    the Plotly figure held in ``self.fig``.

    :param prometheus_url: Base URL of the Prometheus server.
    :param app_name: Name of the monitored application.
    :param interval: Seconds to sleep between two cycles.
    :param services: Service names forwarded to ``get_metrics``.
    :param metrics: Metric names to fetch.
    :param indicators: Indicator metric names, fetched together with ``metrics``.
    :param conditions: Strings holding Python tuple literals such as
        ``"('responstime_seconds', '<', 1)"``.
    """

    def __init__(self, prometheus_url, app_name, interval,services, metrics, indicators, conditions):
        super().__init__()
        self.prometheus_url = prometheus_url
        self.app_name = app_name
        self.interval = interval
        self.services = services
        self.metrics = metrics
        self.indicators = indicators
        # literal_eval only parses literals, so the config text is never executed.
        self.conditions = [ast.literal_eval(s) for s in conditions]
        self.criticalmetrics = {}
        self.app_status = ""
        self.running = True
        self.fig = go.FigureWidget()
        self.metric_data = []
        self.metric_df = pd.DataFrame()

    def run(self):
        """Poll, evaluate, print, export and plot until ``stop`` is called."""
        import os  # local import: only needed to create the export directory

        first = True
        # The query window starts 30 minutes before thread start and is never
        # advanced, so every cycle fetches from that fixed start up to "now".
        start_time = datetime.now() - timedelta(minutes=30)
        pd.set_option('display.max_columns', None)
        # to_csv does not create missing directories; do it once up front.
        os.makedirs("output", exist_ok=True)
        while self.running:
            end_time = datetime.now()
            self.metric_data, metric_df = get_metrics(
                self.prometheus_url, self.metrics + self.indicators, self.services, start_time, end_time)
            if self.metric_data:
                self.metric_df = metric_df
                preprocessed_data = preprocess_metrics(metric_df)
                self.criticalmetrics = infer_critical_metrics(preprocessed_data, self.indicators)
                self.app_status = infer_status(
                    self.metric_data, metric_df, preprocessed_data, self.indicators, self.conditions)
                # preprocess_metrics returns None for an empty frame; nothing to
                # print, export or plot in that case.
                if preprocessed_data is not None:
                    print(tabulate(preprocessed_data, headers='keys', tablefmt='psql'))
                    print(f"Status:{self.app_status}")
                    print(self.criticalmetrics)
                    self._export_csv(preprocessed_data, "preprocessed_data")
                    self._export_csv(metric_df, "metric_df")
                    with self.fig.batch_update():
                        self.plot_preprocessed_data(preprocessed_data, first)
                    # Only leave "first" mode once traces were really created.
                    if first and not preprocessed_data.empty:
                        first = False
            time.sleep(self.interval)

    def _export_csv(self, frame, prefix):
        """Write *frame* (index included as a column) to an hourly CSV under output/."""
        csv_filename = f"output/{prefix}_{datetime.now().strftime('%Y-%m-%d_%H')}.csv"
        frame.reset_index().to_csv(csv_filename, index=False)

    def stop(self):
        """Request termination; the loop exits after the current cycle and sleep."""
        self.running = False

    def plot_preprocessed_data(self, data, first):
        """Create the traces on the first call, refresh them on later calls.

        :param data: DataFrame whose columns are plotted against its index;
            ``None`` or an empty frame is ignored.
        :param first: ``True`` to rebuild all traces, ``False`` to update the
            existing traces whose name matches a column.
        """
        if data is None or data.empty:
            return
        if first:
            self.fig.data = []
            for column in data.columns:
                self.fig.add_trace(go.Scatter(x=data.index, y=data[column], mode='lines', name=column))
            self.fig.update_layout(title='Preprocessed Metrics Over Time',
                                   xaxis_title='Timestamp',
                                   yaxis_title='Value')
            return
        # Visit every trace, including the last one, and refresh x together with
        # y because the number of samples changes between cycles.
        for trace in self.fig.data:
            if trace.name in data.columns:
                trace.x = data.index
                trace.y = data[trace.name].values
\ No newline at end of file
# config.yaml
applications:
- name: "Surveillance System"
services:
- "motion_detector"
- "object_recognizer"
- "camera"
allowed_actions:
- "restart"
- "scale"
monitor:
interval: 1
metrics:
- "camera_fps_gauge"
- "md_fps_gauge"
- "md_processing_time_seconds"
- "or_processing_time_seconds"
- "md_e2c_transmission_time_seconds"
- "container_cpu_usage_seconds_total"
- "or_len_q_f"
indicators:
- "responstime_seconds"
- "framedropingrate"
conditions:
- "( ('responstime_seconds', '<',1))"
- "( ('framedropingrate', '<',5))"
\ No newline at end of file
# config_loader.py
import yaml
def load_config(config_file):
    """Parse the YAML file at *config_file* and return the resulting object.

    The file is read as UTF-8 so parsing does not depend on the platform's
    default encoding. ``yaml.safe_load`` is used, so only plain YAML data
    types are constructed.

    :param config_file: Path of the YAML file.
    :return: Whatever ``yaml.safe_load`` produces for the document.
    """
    with open(config_file, 'r', encoding='utf-8') as file:
        return yaml.safe_load(file)
import docker
def getdockerservicenamefromid(container_id):
    """Return the name of the Docker container identified by *container_id*.

    Only the part after the last ``/`` is used, so a path-like value such as
    ``/docker/<id>`` is accepted.

    :param container_id: Container id, optionally prefixed by a path.
    :return: The container name, or ``""`` when the lookup fails for any
        reason (unknown id, API error, Docker daemon unreachable, ...).
    """
    container_id = container_id.split("/")[-1]
    client = None
    try:
        # Creating the client can fail too, so it belongs inside the try.
        client = docker.from_env()
        return client.containers.get(container_id).name
    except Exception:
        # Best effort: every failure is reported to the caller as "no name".
        return ""
    finally:
        # A client is created per call; close it so connections do not pile up.
        if client is not None:
            client.close()
# inference.py
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from scipy.stats import pearsonr, spearmanr, kendalltau
def find_anomalies_isolation_forest(metrics_df, contamination=0.1):
    """
    Flag anomalous samples in every column independently with Isolation Forest.
    :param metrics_df: The input DataFrame containing metrics columns.
    :param contamination: The proportion of outliers in the data (Isolation Forest parameter).
    :return: A dictionary mapping each column name to a boolean pandas Series
             (same index as the column) where True marks an anomaly.
    """
    def _flag_outliers(series):
        # Standardise the single column, then fit and predict on the same data.
        scaled = StandardScaler().fit_transform(series.values.reshape(-1, 1))
        forest = IsolationForest(contamination=contamination, random_state=0)
        forest.fit(scaled)
        # predict() marks outliers with -1.
        return pd.Series(forest.predict(scaled) == -1, index=series.index)

    return {column: _flag_outliers(metrics_df[column]) for column in metrics_df.columns}
def find_anomalies_nbr(metrics_df, target_metric, threshold):
    """
    Measure how often each column is anomalous at the same time as the target.
    :param metrics_df: The input DataFrame containing metrics columns.
    :param target_metric: The name of the target metric column for anomaly detection.
    :param threshold: Minimum overlap fraction a column needs to be reported.
    :return: A dictionary mapping column names to the fraction of the target's
             anomalies that coincide with an anomaly of that column, keeping
             only fractions >= threshold. Empty when the target has no anomalies.
    """
    anomalies_dict = find_anomalies_isolation_forest(metrics_df)
    target_flags = anomalies_dict[target_metric]
    target_rows = target_flags[target_flags].index.tolist()
    if not target_rows:
        return {}
    target_set = set(target_rows)
    overlap = {}
    for column in metrics_df.columns:
        if column == target_metric:
            continue
        flags = anomalies_dict[column]
        shared = target_set & set(flags[flags].index)
        fraction = len(shared) / len(target_rows)
        if fraction >= threshold:
            overlap[column] = fraction
    return overlap
def compute_correlations(df, method='pearson'):
    """Compute the pairwise correlation and p-value matrices of *df*'s columns.

    For every pair of columns, rows where either value is NaN are dropped
    before the statistic is computed.

    :param df: DataFrame with numeric columns.
    :param method: ``'pearson'``, ``'spearman'`` or ``'kendall'``.
    :return: Tuple ``(corr_matrix, p_matrix)`` of square NumPy arrays indexed
        by column position.
    :raises ValueError: if *method* is not one of the supported names.
    """
    corr_functions = {'pearson': pearsonr, 'spearman': spearmanr, 'kendall': kendalltau}
    if method not in corr_functions:
        # Fail with a clear message instead of an unbound-variable error below.
        raise ValueError(f"Unsupported correlation method: {method}")
    corr_func = corr_functions[method]
    num_cols = df.shape[1]
    corr_matrix = np.zeros((num_cols, num_cols))
    p_matrix = np.zeros((num_cols, num_cols))
    for i in range(num_cols):
        for j in range(num_cols):
            col1, col2 = df.iloc[:, i], df.iloc[:, j]
            # Drop rows where either column has NaN values
            valid_idx = col1.notna() & col2.notna()
            corr, p_value = corr_func(col1[valid_idx], col2[valid_idx])
            corr_matrix[i, j] = corr
            p_matrix[i, j] = p_value
    return corr_matrix, p_matrix
def find_highly_correlated_metrics(df, metric, threshold):
    """List the columns whose absolute Pearson correlation with *metric* exceeds *threshold*.

    Only the Pearson matrix is computed; the Spearman and Kendall matrices
    were previously calculated as well but never used.

    :param df: DataFrame with numeric columns.
    :param metric: Name of the reference column.
    :param threshold: Absolute correlation that must be strictly exceeded.
    :return: List of column names, in column order, excluding *metric* itself.
    :raises ValueError: if *metric* is not a column of *df*.
    """
    if metric not in df.columns:
        raise ValueError(f"Metric '{metric}' not found in DataFrame columns")
    corr_matrix, _ = compute_correlations(df, method='pearson')
    metric_idx = df.columns.get_loc(metric)
    return [
        df.columns[i]
        for i in range(corr_matrix.shape[0])
        if i != metric_idx and abs(corr_matrix[metric_idx, i]) > threshold
    ]
def infer_critical_metrics(preprocessed_data,indicators):
    """Find, for every indicator column, the metrics that are anomalous together with it.

    A column counts as an indicator column when its name contains one of the
    *indicators* as a substring. Each such column is analysed with
    ``find_anomalies_nbr`` using an overlap threshold of 0.5.

    :param preprocessed_data: DataFrame of cleaned metrics, or ``None``.
    :param indicators: Indicator names to look for in the column names.
    :return: Dict mapping indicator column name to the result of
        ``find_anomalies_nbr``; ``None`` when *preprocessed_data* is ``None``.
    """
    if preprocessed_data is None:
        return None
    return {
        column: find_anomalies_nbr(preprocessed_data, column, 0.5)
        for indicator in indicators
        for column in preprocessed_data.columns
        if indicator in column
    }
def infer_status(metrics_data,metrics_df,preprocessed_metrics,indacators,conditions):
    """Evaluate the conditions against the latest sample of each matching metric.

    For every condition ``(name, operator, limit)`` each metric whose
    ``global_name`` equals ``name`` is looked up in *metrics_df* by its
    ``specific_name``; the last value of that column is compared with
    ``evaluate_condition``.

    :param metrics_data: Iterable of objects with ``global_name`` and ``specific_name``.
    :param metrics_df: DataFrame holding one column per ``specific_name``.
    :param preprocessed_metrics: Only checked for ``None``.
    :param indacators: Not used by this function.
    :param conditions: Sequence of ``(name, operator, limit)`` entries.
    :return: ANSI-coloured ``"Critical"`` as soon as one condition fails,
        ANSI-coloured ``"Good"`` otherwise, ``None`` when
        *preprocessed_metrics* is ``None``.
    """
    if preprocessed_metrics is None:
        return None
    for condition in conditions:
        for metric in metrics_data:
            if metric.global_name != condition[0]:
                continue
            if metric.specific_name not in metrics_df.columns:
                continue
            samples = metrics_df[metric.specific_name].values
            if not len(samples):
                continue
            # NOTE(review): the last sample may be NaN, which fails every
            # comparison except '!=' - confirm this is intended.
            latest = float(samples[-1])
            if not evaluate_condition(latest, condition[1], condition[2]):
                return "\033[91mCritical\033[0m"
    return "\033[92mGood\033[0m"
def evaluate_condition(real_value, operator, cond_value):
    """Compare *real_value* with *cond_value* using the textual *operator*.

    :param real_value: Left-hand operand.
    :param operator: One of ``'<'``, ``'<='``, ``'>'``, ``'>='``, ``'=='``, ``'!='``.
    :param cond_value: Right-hand operand.
    :return: Result of the comparison.
    :raises ValueError: if *operator* is not one of the supported symbols.
    """
    comparisons = (
        ('<', lambda left, right: left < right),
        ('<=', lambda left, right: left <= right),
        ('>', lambda left, right: left > right),
        ('>=', lambda left, right: left >= right),
        ('==', lambda left, right: left == right),
        ('!=', lambda left, right: left != right),
    )
    for symbol, compare in comparisons:
        if operator == symbol:
            return compare(real_value, cond_value)
    raise ValueError(f"Unsupported operator: {operator}")
\ No newline at end of file
# metrics.py
import time
from prometheus_api_client import PrometheusConnect
import pandas as pd
from dockerloader import getdockerservicenamefromid
import datetime
class Metric:
    """One time series returned by Prometheus together with its naming information."""

    def __init__(self, global_name, specific_name,service_name,values, description=False , unit_of_measure=False,):
        # Naming: the metric name as queried, and the per-service name built from it.
        self.global_name = global_name
        self.specific_name = specific_name
        self.service_name = service_name
        # The samples exactly as passed in by the caller.
        self.values = values
        # Optional descriptive fields; both default to False.
        self.description = description
        self.unit_of_measure = unit_of_measure
def get_metrics(prometheus_url, globalmetrics,services,start_time, end_time):
    """Fetch range data for the given metrics and assemble it into one DataFrame.

    Series scraped by the ``otel-collector`` job are named after their
    ``exported_job`` label. Series scraped by ``cadvisor`` are mapped to a
    container name through ``getdockerservicenamefromid`` and kept only when
    that name contains one of *services*.

    :param prometheus_url: Base URL of the Prometheus server.
    :param globalmetrics: Metric names to query.
    :param services: Service names used to filter cadvisor series.
    :param start_time: Start of the query range.
    :param end_time: End of the query range.
    :return: Tuple ``(metrics, metrics_df)``: the list of ``Metric`` objects
        and a timestamp-indexed DataFrame with one column per
        ``Metric.specific_name``.
    """
    prom = PrometheusConnect(url=prometheus_url, disable_ssl=True)
    metrics = []
    for metric in globalmetrics:
        # Retry until the query succeeds. Catch Exception rather than using a
        # bare except so KeyboardInterrupt/SystemExit can still end the loop.
        while True:
            try:
                data = prom.get_metric_range_data(
                    start_time=start_time, metric_name=metric,
                    end_time=end_time)
                break
            except Exception:
                print("error connecting to prometheus")
                time.sleep(0.1)
        for rec in data:
            labels = rec['metric']
            job = labels.get('job')
            # Series without the expected labels are skipped instead of
            # raising KeyError.
            if job == "otel-collector" and 'exported_job' in labels:
                exported_job = labels['exported_job']
                metrics.append(Metric(global_name=metric,
                                      specific_name=exported_job.replace(' ', '_') + "_" + metric,
                                      service_name=exported_job,
                                      values=rec['values']))
            if job == "cadvisor" and len(labels.get('id', '')) > 10:
                service_name = getdockerservicenamefromid(labels['id']).replace(' ', '_')
                for service in services:
                    if service in service_name:
                        metrics.append(Metric(global_name=metric,
                                              specific_name=service_name + "_" + metric,
                                              service_name=service_name,
                                              values=rec['values']))
    metrics_df = pd.DataFrame()
    for metric in metrics:
        try:
            metric_df = pd.DataFrame(metric.values, columns=['timestamp', metric.specific_name])
            metric_df['timestamp'] = pd.to_datetime(metric_df['timestamp'], unit='s')
            metric_df.set_index('timestamp', inplace=True)
            metric_df[metric.specific_name] = pd.to_numeric(metric_df[metric.specific_name], errors='coerce')
            # Series with 'total' in the name are replaced by the difference
            # between consecutive samples.
            if 'total' in metric.specific_name:
                metric_df = metric_df.diff()
            if metrics_df.empty:
                metrics_df = metric_df
            else:
                metrics_df = metrics_df.join(metric_df, how='outer')
        except Exception:
            # Best effort: a series that cannot be converted or joined is left
            # out of the frame.
            continue
    return (metrics, metrics_df)
# preprocess.py
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from prometheus_api_client import MetricRangeDataFrame
import numpy as np
def preprocess_metrics(metric_data):
    """Return a cleaned, fully numeric copy of *metric_data*.

    Every column is coerced to numeric (unparseable values become NaN), gaps
    are filled by linear interpolation, columns that are entirely NaN are
    dropped, and finally rows that still contain NaN are dropped. The input
    frame is not modified. No normalisation is applied.

    :param metric_data: DataFrame of raw metric samples, or ``None``.
    :return: The cleaned DataFrame, or ``None`` when the input is ``None`` or empty.
    """
    if metric_data is None or metric_data.empty:
        return None
    metric_df = metric_data.copy()  # work on a copy so the caller's frame stays untouched
    for col in metric_df.columns:
        metric_df[col] = pd.to_numeric(metric_df[col], errors='coerce')
    metric_df = metric_df.interpolate(method='linear')
    metric_df = metric_df.dropna(axis=1, how='all')
    return metric_df.dropna()
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import pearsonr, spearmanr, kendalltau
from pgmpy.estimators import HillClimbSearch, BicScore
from pgmpy.models import BayesianNetwork
# Step 1: Load Data
# The path is hard-coded to one specific CSV file under output/.
df = pd.read_csv('output/preprocessed_data_2024-07-08_15.csv')
# Step 2: Convert Columns to Numeric
# Values that cannot be parsed become NaN (errors='coerce').
df = df.apply(pd.to_numeric, errors='coerce')
# Step 3: Drop Non-Numeric Columns
# A column that is NaN in every row after the coercion is removed.
df = df.dropna(axis=1, how='all')
# Step 4: Compute Correlation Coefficients
def compute_correlations(df, method='pearson'):
    """Compute the pairwise correlation and p-value matrices of *df*'s columns.

    For every pair of columns, rows where either value is NaN are dropped
    before the statistic is computed.

    :param df: DataFrame with numeric columns.
    :param method: ``'pearson'``, ``'spearman'`` or ``'kendall'``.
    :return: Tuple ``(corr_matrix, p_matrix)`` of square NumPy arrays indexed
        by column position.
    :raises ValueError: if *method* is not one of the supported names.
    """
    corr_functions = {'pearson': pearsonr, 'spearman': spearmanr, 'kendall': kendalltau}
    if method not in corr_functions:
        # Fail with a clear message instead of an unbound-variable error below.
        raise ValueError(f"Unsupported correlation method: {method}")
    corr_func = corr_functions[method]
    num_cols = df.shape[1]
    corr_matrix = np.zeros((num_cols, num_cols))
    p_matrix = np.zeros((num_cols, num_cols))
    for i in range(num_cols):
        for j in range(num_cols):
            col1, col2 = df.iloc[:, i], df.iloc[:, j]
            # Keep only the rows where both columns hold a value.
            valid_idx = col1.notna() & col2.notna()
            corr, p_value = corr_func(col1[valid_idx], col2[valid_idx])
            corr_matrix[i, j] = corr
            p_matrix[i, j] = p_value
    return corr_matrix, p_matrix
# Compute Pearson, Spearman, and Kendall correlations
# Each call returns a (correlation matrix, p-value matrix) pair for all columns of df.
pearson_corr, pearson_p = compute_correlations(df, method='pearson')
spearman_corr, spearman_p = compute_correlations(df, method='spearman')
kendall_corr, kendall_p = compute_correlations(df, method='kendall')
def plot_heatmap(corr_matrix, title):
    """Show *corr_matrix* as an annotated heatmap titled *title*.

    Both axes are labelled with the columns of the module-level ``df``.
    """
    axis_labels = df.columns
    plt.figure(figsize=(10, 8))
    sns.heatmap(
        corr_matrix,
        annot=True,
        fmt='.2f',
        cmap='coolwarm',
        xticklabels=axis_labels,
        yticklabels=axis_labels,
    )
    plt.title(title)
    plt.show()
# Plot heatmaps
# One figure per correlation method; each plt.show() call is made in turn.
plot_heatmap(pearson_corr, 'Pearson Correlation')
plot_heatmap(spearman_corr, 'Spearman Correlation')
plot_heatmap(kendall_corr, 'Kendall Correlation')
# Step 6: Function to find highly correlated metrics
def find_highly_correlated_metrics(corr_matrix, metric, threshold):
    """List the columns whose absolute correlation with *metric* exceeds *threshold*.

    Column names and positions are taken from the module-level ``df``;
    *corr_matrix* is indexed by those positions.

    :raises ValueError: if *metric* is not a column of ``df``.
    """
    if metric not in df.columns:
        raise ValueError(f"Metric '{metric}' not found in DataFrame columns")
    metric_idx = df.columns.get_loc(metric)
    return [
        df.columns[i]
        for i in range(corr_matrix.shape[0])
        if i != metric_idx and abs(corr_matrix[metric_idx, i]) > threshold
    ]
# Example usage:
# Report every column whose absolute Pearson correlation with the chosen
# metric is above the threshold.
metric_to_check = 'Object_Recognizer__responstime_seconds' # Replace with your metric name
threshold_value = 0.8 # Replace with your threshold value
highly_correlated_metrics = find_highly_correlated_metrics(pearson_corr, metric_to_check, threshold_value)
print(
    f"Metrics highly correlated with '{metric_to_check}' (threshold > {threshold_value}): {highly_correlated_metrics}")
# Step 7: Function to find the Markov Blanket of a given metric
def find_markov_blanket(df, target_metric):
    """Learn a network structure from *df* and return the Markov blanket of *target_metric*.

    The structure is estimated with ``HillClimbSearch`` scored by ``BicScore``.

    :raises ValueError: if *target_metric* is not a node of the learned structure.
    """
    structure = HillClimbSearch(df).estimate(scoring_method=BicScore(df))
    if target_metric not in structure.nodes():
        raise ValueError(f"Metric '{target_metric}' not found in Bayesian Network nodes")
    return structure.get_markov_blanket(target_metric)
# Example usage:
# Learn the structure from df and print the Markov blanket of the chosen metric.
target_metric = 'Object_Recognizer__responstime_seconds' # Replace with your target metric
markov_blanket = find_markov_blanket(df, target_metric)
print(f"Markov Blanket of '{target_metric}': {markov_blanket}")
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
from tabulate import tabulate
# Load Data
# The path is hard-coded to one specific CSV file under output/.
df = pd.read_csv('output/preprocessed_data_2024-07-08_15.csv')
# Convert Columns to Numeric
# Values that cannot be parsed become NaN (errors='coerce').
df = df.apply(pd.to_numeric, errors='coerce')
# Drop Non-Numeric Columns
# A column that is NaN in every row after the coercion is removed.
df = df.dropna(axis=1, how='all')
# Drop Rows with NaN Values
df = df.dropna()
def find_anomalies_dbscan(df, target_metric, eps=0.5, min_samples=2):
    """
    Find rows that DBSCAN labels as noise when clustering all metrics together.
    The non-target columns are standardized; the target column is appended unscaled.
    The input DataFrame is left unmodified.
    :param df: The input DataFrame.
    :param target_metric: The name of the target metric column.
    :param eps: The maximum distance between two samples for one to be considered as in the neighborhood of the other (DBSCAN parameter).
    :param min_samples: The number of samples (or total weight) in a neighborhood for a point to be considered as a core point (DBSCAN parameter).
    :return: A tuple (anomalies, anomaly_metrics): the rows of df labeled as noise, and a list that is currently always empty.
    """
    metrics = df.columns.difference([target_metric])
    # Standardize the data
    scaled_data = StandardScaler().fit_transform(df[metrics])
    # Add the target metric to the scaled data
    target_data = df[target_metric].values.reshape(-1, 1)
    combined_data = np.hstack((scaled_data, target_data))
    # Apply DBSCAN
    labels = DBSCAN(eps=eps, min_samples=min_samples).fit(combined_data).labels_
    # Identify anomalies (points classified as noise, labeled as -1). Selecting
    # with a mask avoids writing a helper column into the caller's DataFrame.
    anomalies = df[labels == -1]
    anomaly_metrics = []
    return anomalies, anomaly_metrics
# Find anomalies in the sample DataFrame
# eps is raised to 1.5 here instead of the function default of 0.5.
anomalies, anomaly_metrics = find_anomalies_dbscan(df, 'Object_Recognizer__responstime_seconds', eps=1.5, min_samples=2)
print("Anomalies Detected:")
print(tabulate(anomalies, headers='keys', tablefmt='psql'))
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from tabulate import tabulate
def find_anomalies_isolation_forest(metrics_df, contamination=0.1):
    """
    Flag anomalous samples in every column independently with Isolation Forest.
    :param metrics_df: The input DataFrame containing metrics columns.
    :param contamination: The proportion of outliers in the data (Isolation Forest parameter).
    :return: A dictionary mapping each column name to a boolean pandas Series
             (same index as the column) where True marks an anomaly.
    """
    flags_by_column = {}
    for name in metrics_df.columns:
        series = metrics_df[name]
        # Standardise the single column, then fit and predict on the same data.
        scaled = StandardScaler().fit_transform(series.values.reshape(-1, 1))
        forest = IsolationForest(contamination=contamination, random_state=0)
        forest.fit(scaled)
        # predict() marks outliers with -1.
        flags_by_column[name] = pd.Series(forest.predict(scaled) == -1, index=series.index)
    return flags_by_column
def find_anomalies_nbr(metrics_df, target_metric, threshold):
    """
    Measure how often each column is anomalous at the same time as the target.
    :param metrics_df: The input DataFrame containing metrics columns.
    :param target_metric: The name of the target metric column for anomaly detection.
    :param threshold: Minimum overlap fraction a column needs to be reported.
    :return: A dictionary mapping column names to the fraction of the target's
             anomalies that coincide with an anomaly of that column, keeping
             only fractions >= threshold. Empty when the target has no anomalies.
    """
    anomalies_dict = find_anomalies_isolation_forest(metrics_df)
    target_flags = anomalies_dict[target_metric]
    target_rows = target_flags[target_flags].index.tolist()
    if not target_rows:
        return {}
    target_set = set(target_rows)
    overlap = {}
    for column in metrics_df.columns:
        if column == target_metric:
            continue
        flags = anomalies_dict[column]
        shared = target_set & set(flags[flags].index)
        fraction = len(shared) / len(target_rows)
        if fraction >= threshold:
            overlap[column] = fraction
    return overlap
# Example usage:
# Load one hard-coded CSV file, coerce every column to numeric, drop the
# columns that are entirely NaN and then every row that still contains NaN.
df = pd.read_csv('output/preprocessed_data_2024-07-08_15.csv')
df = df.apply(pd.to_numeric, errors='coerce')
df = df.dropna(axis=1, how='all')
df = df.dropna()
target_metric = 'Object_Recognizer__responstime_seconds'
# Keep only the metrics whose overlap with the target's anomalies is at least 0.8.
anomalies_nbr = find_anomalies_nbr(df, target_metric,0.8)
print("Intersection Anomalies Number:")
print(anomalies_nbr)
#!/bin/bash
# Define variables
# DOCKER_USERNAME is the image namespace, SERVICES lists the directories to
# build, and TAG is applied to every image.
DOCKER_USERNAME="medkaddour"
SERVICES=("camera" "motion_detector" "object_recognizer")
TAG="latest"
# The first command-line argument selects the target: "local" or "cloud".
DEPLOY_ENV=${1:-local} # Default to local if no argument is provided
# Function to build and push a Docker image
# Build the image for one service and, in cloud mode, push it to Docker Hub.
# Arguments: $1 - service name, also used as the build directory ./<service>.
# Returns non-zero as soon as the build or the push fails, so a failed build
# is never followed by a push.
build_and_push() {
  local service="$1"
  local image="${DOCKER_USERNAME}/${service}:${TAG}"
  echo "Building Docker image for ${service}..."
  docker build -t "${image}" "./${service}" || return 1
  if [ "$DEPLOY_ENV" == "cloud" ]; then
    echo "Pushing Docker image for ${service} to Docker Hub..."
    docker push "${image}" || return 1
  fi
}
# Step 1: Build Docker images for all services
for service in "${SERVICES[@]}"; do
  # Quote the name so it is passed as one word, and stop the script when
  # build_and_push reports a failure.
  build_and_push "${service}" || exit 1
done
# Step 2: Deploy the images
# "local" starts the stack directly; "cloud" pulls the images first; any
# other value is rejected with exit status 1.
case "$DEPLOY_ENV" in
  local)
    echo "Running Docker images locally with Docker Compose..."
    docker-compose up -d --build
    ;;
  cloud)
    echo "Pulling and running Docker images from Docker Hub with Docker Compose..."
    docker-compose pull
    docker-compose up -d --build
    ;;
  *)
    echo "Invalid DEPLOY_ENV value. Please use 'local' or 'cloud'."
    exit 1
    ;;
esac
echo "Deployment completed."
# Kubernetes Deployment for Object Recognizer
apiVersion: apps/v1
kind: Deployment
metadata:
name: object-recognizer
spec:
replicas: 1
selector:
matchLabels:
app: object-recognizer
template:
metadata:
labels:
app: object-recognizer
spec:
containers:
- name: object-recognizer
image: medkaddour/object_recognizer
ports:
- containerPort: 9999
# Kubernetes Deployment for Motion Detector
apiVersion: apps/v1
kind: Deployment
metadata:
name: motion-detector
spec:
replicas: 1
selector:
matchLabels:
app: motion-detector
template:
metadata:
labels:
app: motion-detector
spec:
containers:
- name: motion-detector
image: medkaddour/motion_detector
ports:
- containerPort: 9998
env:
- name: OBJECT_RECOGNIZER_URL
value: "http://object-recognizer:9999"
# Kubernetes Deployment for Camera
apiVersion: apps/v1
kind: Deployment
metadata:
name: camera
spec:
replicas: 1
selector:
matchLabels:
app: camera
template:
metadata:
labels:
app: camera
spec:
containers:
- name: camera
image: medkaddour/camera
env:
- name: MOTION_DETECTOR_URL
value: "http://motion-detector:9998"
# Add other environment variables as needed
# Kubernetes Deployment for Jaeger
apiVersion: apps/v1
kind: Deployment
metadata:
name: jaeger
spec:
replicas: 1
selector:
matchLabels:
app: jaeger
template:
metadata:
labels:
app: jaeger
spec:
containers:
- name: jaeger
image: jaegertracing/all-in-one
ports:
- containerPort: 16686
- containerPort: 6831
- containerPort: 14268
- containerPort: 14250
# Kubernetes Service for Jaeger
apiVersion: v1
kind: Service
metadata:
name: jaeger
spec:
selector:
app: jaeger
ports:
- protocol: TCP
port: 16686
targetPort: 16686
- protocol: UDP
port: 6831
targetPort: 6831
- port: 14268
targetPort: 14268
- port: 14250
targetPort: 14250
# Kubernetes Deployment for Zipkin
apiVersion: apps/v1
kind: Deployment
metadata:
name: zipkin
spec:
replicas: 1
selector:
matchLabels:
app: zipkin
template:
metadata:
labels:
app: zipkin
spec:
containers:
- name: zipkin
image: openzipkin/zipkin:latest
ports:
- containerPort: 9411
# Kubernetes Service for Zipkin
apiVersion: v1
kind: Service
metadata:
name: zipkin
spec:
selector:
app: zipkin
ports:
- port: 9411
targetPort: 9411
# Kubernetes Deployment for Otel Collector
apiVersion: apps/v1
kind: Deployment
metadata:
name: otel-collector
spec:
replicas: 1
selector:
matchLabels:
app: otel-collector
template:
metadata:
labels:
app: otel-collector
spec:
containers:
- name: otel-collector
image: otel/opentelemetry-collector-contrib
ports:
- containerPort: 1888
- containerPort: 8888
- containerPort: 8889
- containerPort: 13133
- containerPort: 4317
- containerPort: 55679
# Add other container ports as needed
volumeMounts:
- name: otel-collector-config
mountPath: /etc/otel-collector-config.yaml
subPath: otel-collector-config.yaml
volumes:
- name: otel-collector-config
configMap:
name: otel-collector-config
# Kubernetes Service for Prometheus
apiVersion: v1
kind: Service
metadata:
name: prometheus
spec:
selector:
app: prometheus
ports:
- port: 9090
targetPort: 9090
# Kubernetes ConfigMap for Otel Collector Configuration
apiVersion: v1
kind: ConfigMap
metadata:
name: otel-collector-config
data:
otel-collector-config.yaml: |
# Your Otel Collector configuration goes here
6
\ No newline at end of file
# ReadMe for Surveillance System with Edge to Cloud Video Processing
## Overview
This project demonstrates a video processing surveillance system designed to enhance safety in residential areas and along roads by identifying and alerting residents and authorities to the presence of dangerous animals. The system integrates edge devices and cloud servers to ensure quick and accurate responses while minimizing data transmission to the cloud. A feedback mechanism is implemented to monitor, understand, and adjust to changing conditions, ensuring the system adheres to defined Service Level Objectives (SLOs).
## System Components
### Camera
- **Function:** Captures video frames from a camera or video file.
- **Process:** Resizes frames, serializes them using pickle, and sends them over the network.
- **Connection:** Establishes a connection with the Motion Detection service to send video frames for motion detection.
### Motion Detection (Edge)
- **Technology Used:** OpenCV and socket programming.
- **Function:** Continuously receives video frames from cameras and applies motion detection algorithms to detect significant changes between consecutive frames.
- **Alert:** Triggers alerts when motion is detected and forwards relevant frames to the Object Recognition component for further analysis.
### Object Recognition (Cloud)
- **Technology Used:** YOLOv3 model.
- **Function:** Receives video frames from Motion Detection. Performs object detection to identify dangerous animals.
- **Alert:** Notifies affected areas if a dangerous animal is detected.
## Scenarios Affecting Performance
### Normal Conditions
- **Description:** System detects and processes video frames efficiently, meeting all SLOs without any issues.
### High Traffic Load
- **Description:** During peak hours or a significant increase in video feeds, the system experiences a higher traffic load.
- **Impact:** Potential decrease in frame rate.
### High Processing Time
- **Description:** Some nodes may experience longer processing times due to a lack of computational resources.
### Large Distance
- **Description:** Motion Detection deployed far from cameras.
- **Impact:** Delay in frame transmission.
## Metrics Collected
1. Number of frames received per second.
2. Execution time.
3. Duration of Edge-to-Cloud transmission.
4. CPU usage.
## Actions
1. **Scaling CPU on Cloud:** Adjusting computational resources on the cloud to handle increased load.
2. **Moving an instance of Motion Detector:** Repositioning the Motion Detection service to optimize performance.
## Service Level Objectives (SLOs)
1. **Response Time:** The time taken to detect and respond to an event.
2. **Frame Dropping Rate:** The frequency of dropped frames during transmission or processing.
## Feedback Mechanism
- **Role:** Monitors the application’s status in real-time, evaluates against SLOs, and identifies metrics significantly affecting SLOs.
- **Action:** In critical situations, applies corrective actions and records the impact on metrics to ensure optimal performance.
## Diagram
![use case: Video Processing Edge to Cloud Application](usecase.png)
*Figure 1: Real-time detection system for identifying and alerting dangerous animals.*
## References
- OpenCV library for motion detection.
- YOLOv3 model for object recognition.
---
Ensure all components are properly configured and connected before deploying the system. Regularly monitor performance metrics and utilize the feedback mechanism to maintain efficient and effective surveillance operations.
\ No newline at end of file
groups:
  - name: example
    rules:
      # Recording rule: stores the per-second rate of
      # container_cpu_usage_seconds_total under a new series name.
      # NOTE(review): rate() needs at least two samples inside the range, so a
      # [1s] window presumably yields no data unless the scrape interval is
      # well below one second - confirm against the scrape configuration.
      - record: container_cpu_usage_rate_per_second
        expr: rate(container_cpu_usage_seconds_total[1s])
usecase.png

116 KiB

0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment