Mentions légales du service

Skip to content
Snippets Groups Projects
Commit 36e18c3d authored by AUTERNAUD Alex's avatar AUTERNAUD Alex
Browse files

global_planner_node to ROS

parent 782a7674
No related branches found
No related tags found
No related merge requests found
...@@ -172,7 +172,8 @@ include_directories( ...@@ -172,7 +172,8 @@ include_directories(
## Mark executable scripts (Python etc.) for installation ## Mark executable scripts (Python etc.) for installation
## in contrast to setup.py, you can choose the destination ## in contrast to setup.py, you can choose the destination
catkin_install_python(PROGRAMS scripts/behavior_generator_main.py catkin_install_python(PROGRAMS
scripts/behavior_generator_main.py
scripts/behavior_go_to_body_action_client_main.py scripts/behavior_go_to_body_action_client_main.py
scripts/behavior_go_to_body_action_server_main.py scripts/behavior_go_to_body_action_server_main.py
scripts/behavior_go_to_position_action_client_main.py scripts/behavior_go_to_position_action_client_main.py
...@@ -181,6 +182,7 @@ catkin_install_python(PROGRAMS scripts/behavior_generator_main.py ...@@ -181,6 +182,7 @@ catkin_install_python(PROGRAMS scripts/behavior_generator_main.py
scripts/behavior_look_at_body_action_server_main.py scripts/behavior_look_at_body_action_server_main.py
scripts/behavior_look_at_position_action_client_main.py scripts/behavior_look_at_position_action_client_main.py
scripts/behavior_look_at_position_action_server_main.py scripts/behavior_look_at_position_action_server_main.py
scripts/behavior_global_path_planner_main.py
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
) )
......
#!/usr/bin/env python3
import rospy
import pkg_resources
import yaml
from robot_behavior import GlobalPathPlannerNode
if __name__ == '__main__':
# Init node
rospy.init_node('GlobalPathPlannerNode', log_level=rospy.DEBUG, anonymous=True)
controller = GlobalPathPlannerNode()
controller.run()
controller.shutdown()
# rospy.spin()
from .behavior_global_path_planner import GlobalPathPlanner
from .behavior_generator_node import BehaviorGenerator from .behavior_generator_node import BehaviorGenerator
from .behavior_global_path_planner_node import GlobalPathPlannerNode
from .behavior_go_to_body_action_client import GoToBodyActionClient from .behavior_go_to_body_action_client import GoToBodyActionClient
from .behavior_go_to_body_action_server import GoToBodyActionServer from .behavior_go_to_body_action_server import GoToBodyActionServer
from .behavior_go_to_group_action_client import GoToGroupActionClient from .behavior_go_to_group_action_client import GoToGroupActionClient
......
# This file, part of Social MPC in the WP6 of the Spring project,
# is part of a project that has received funding from the
# European Union’s Horizon 2020 research and innovation programme
# under grant agreement No 871245.
#
# Copyright (C) 2020-2023 by Inria
# Authors : Victor Sanchez
# victor.sanchez@inria.fr
#
# Code inspired from : https://gitlab.inria.fr/spring/wp6_robot_behavior/Motor_Controller/-/blob/master/social_mpc/path.py?ref_type=heads
# From : Alex Auternaud, Timothée Wintz
# alex.auternaud@inria.fr
# timothee.wintz@inria.fr
import cv2
import skfmm
import numpy.ma as ma
import numpy as np
from robot_behavior.utils import constraint_angle, optimal_path_2d
# from drl_navigation.utils.social_spaces import SocialSpaces
import rospy
import exputils as eu
class GlobalPathPlanner():
def __init__(self, waypoint_resolution_in_meters=0.5, threshold_distance_arrival=1.0, threshold_distance_orientation=0.5, max_target_angle=np.pi/4):
self.state_config = None
self.threshold_distance_arrival = threshold_distance_arrival
self.max_target_angle = max_target_angle
self.threshold_distance_orientation = threshold_distance_orientation
self.waypoint_resolution_in_meters = waypoint_resolution_in_meters
self.distance = None
self.ssn = None
self.global_map_with_waypoints = None
self.map_with_subgoals=None
def set_state_config(self,
global_map_size=[[-11.26118716597557, 12.588813189417124], [-10.207198709249496, 14.492801658809185]],
global_map_height=494,
global_map_width=477,
global_map_scale=1/0.05
):
self.state_config=eu.AttrDict(global_map_size=global_map_size,
global_map_height=global_map_height,
global_map_width=global_map_width,
global_map_scale=global_map_scale
)
self.y_coordinates = np.linspace(global_map_size[1][0], global_map_size[1][1], self.state_config.global_map_height)
self.x_coordinates = np.linspace(global_map_size[0][0], global_map_size[0][1], self.state_config.global_map_width)
# self.ssn =SocialSpaces(
# bbox=global_map_size,
# map_shape=(self.state_config.global_map_height,
# self.state_config.global_map_width))
self.map_with_subgoals = np.zeros((global_map_width, global_map_height))
def set_global_map_for_waypoints(self, map):
self.global_map_with_waypoints = map
def position_to_px_coord(self, pos):
r''' Tranform real position to coordinate in pixel in the global map coordinate system '''
if self.state_config is None:
return None
scale = self.state_config.global_map_scale
x_min = self.state_config.global_map_size[0][0]
y_max = self.state_config.global_map_size[1][1]
i = (y_max - pos[1]) * scale
j = (pos[0] - x_min) * scale
return int(i), int(j)
def px_coord_grid(self):
r''' Tranform coordinate in pixel in the global map to real position coordinate '''
if self.state_config is None:
return None
scale = self.state_config.global_map_scale
x_min = self.state_config.global_map_size[0][0]
y_min = self.state_config.global_map_size[1][0]
x = [x_min + (j + 0.5) /
scale for j in range(self.state_config.global_map_width)]
y = [y_min + (i + 0.5) /
scale for i in range(self.state_config.global_map_height)]
return x, y
def get_distance_map(self, global_map, big_number=100):
r''' Set the distance map from sffmm
[see https://pythonhosted.org/scikit-fmm/ for more info about distance map]
'''
if self.state_config is None:
return None
#print(self.target)
t_i, t_j = self.position_to_px_coord(self.target)
phi = ma.MaskedArray(np.ones(global_map.shape), mask=(global_map > 0))
phi.data[t_i, t_j] = 0.0
dx = 1/self.state_config.global_map_scale
self.distance = skfmm.distance(phi, dx=dx)
def get_waypoints(self, pos):
r''' Return the next waypoint position from a given position
pos : [x,y,yaw] '''
#x, y = self.px_coord_grid()
if self.state_config is None:
return None, None, None, None, None
# x, y = self.ssn.x_coordinates, self.ssn.y_coordinates
x, y = self.x_coordinates, self.y_coordinates
optimal_path_2d_output = optimal_path_2d(
travel_time = np.flip(self.distance, axis=0),
starting_point = pos[:2],
dx=self.waypoint_resolution_in_meters,
coords=(x, y),
max_d_orientation=0.3)
if optimal_path_2d_output :
xl, yl, vxl, vyl, dl = optimal_path_2d_output
else :
return None
if len(xl) == 0 or len(yl) == 0 or len(vxl) == 0 or len(vyl) == 0:
return None
else :
return xl, yl, vxl, vyl, dl
def run(self, global_map, target_pose, robot_pose):
if robot_pose is None or (target_pose is None or global_map is None): # not ready
if robot_pose is None :
rospy.logwarn('Warning robot_pose is None')
if target_pose is None :
rospy.logwarn('Warning target_pose is None')
if global_map is None :
rospy.logwarn('Warning global_map is None')
self.distance = None
return None
self.target = [target_pose[0], target_pose[1], target_pose[2]]
robot_pose = [robot_pose[0], robot_pose[1], robot_pose[2]]
if np.linalg.norm(np.array(robot_pose[0:2]) - np.array(self.target[0:2]))<self.waypoint_resolution_in_meters:
return self.target[0:2]
self.get_distance_map(global_map)
get_waypoint_output = self.get_waypoints(robot_pose)
if get_waypoint_output :
xl, yl, vxl, vyl, dl = get_waypoint_output
# Victor's method
# wpt_pos_closest = [xl[0], yl[0]]
# Alex's method
target_dist = np.linalg.norm(np.array(robot_pose[:2]) - np.array(self.target[:2]))
waypoint_index = min(int(self.threshold_distance_arrival / self.waypoint_resolution_in_meters), len(dl)-1) # get the index of the next waypoint at threshold_distance_arrival with self.waypoint_resolution_in_meters step
if target_dist < self.threshold_distance_orientation: # if we are near the target, we only turn on ourself
wpt_pos_closest = [robot_pose[0], robot_pose[1], self.target[2]]
else :
angle = np.abs(constraint_angle(np.arctan2(yl[waypoint_index] - robot_pose[1], xl[waypoint_index] - robot_pose[0]) - robot_pose[2]))
# if the angle to the next point is too big, also first turn on ourself to its direction
if angle > self.max_target_angle:
wpt_pos_closest = [robot_pose[0], robot_pose[1], np.arctan2(yl[waypoint_index] - robot_pose[1], xl[waypoint_index] - robot_pose[0])]
else :
if target_dist < self.threshold_distance_arrival: # we are near the target
wpt_pos_closest = [self.target[0],
self.target[1],
np.arctan2(yl[-1] - robot_pose[1], xl[-1] - robot_pose[0])]
else: # nominal case else, point on shortest path with orientation towards target
wpt_pos_closest = [xl[waypoint_index], yl[waypoint_index], np.arctan2(yl[waypoint_index] - robot_pose[1], xl[waypoint_index] - robot_pose[0])]
#print(wpt_pos_closest)
self.get_map_with_subgoals(waypoints=(xl, yl))
return wpt_pos_closest
else :
return None
def get_map_with_subgoals(self, waypoints):
#self.global_map_with_waypoints = np.where(self.global_map_with_waypoints>0, 0 ,1)
self.global_map_with_waypoints = np.zeros((self.state_config.global_map_height, self.state_config.global_map_width))
for id in range(len(waypoints[0])):
if str(waypoints[0][id]) != 'nan':
x_px, y_px = self.position_to_px_coord(pos=(waypoints[0][id],waypoints[1][id]))
x_px = np.clip(x_px, 0, np.shape(self.global_map_with_waypoints)[0]-1)
y_px = np.clip(y_px, 0, np.shape(self.global_map_with_waypoints)[1]-1)
self.global_map_with_waypoints[x_px][y_px] = 1
self.global_map_with_waypoints = cv2.dilate(self.global_map_with_waypoints, kernel = np.ones((2,2)))
\ No newline at end of file
#!/usr/bin/env python3
import numpy as np
from collections import namedtuple
import pkg_resources
import tf
import os
import yaml
import cv2
from multiprocessing import Lock
import plotly.express as px
import plotly.graph_objects as go
import numpy as np
import rospy
from geometry_msgs.msg import Point, Pose, Quaternion, Twist, Vector3, PoseArray, Transform
from sensor_msgs.msg import JointState
from std_msgs.msg import Float64
from nav_msgs.msg import Odometry, OccupancyGrid, MapMetaData
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
import actionlib
from robot_behavior.behavior_global_path_planner import GlobalPathPlanner
from robot_behavior.utils import constraint_angle
from social_mpc.config.config import ControllerConfig
import ast
import sys
from subprocess import Popen, PIPE
import time
class GlobalPathPlannerNode:
def __init__(self):
# os.environ['XLA_PYTHON_CLIENT_PREALLOCATE'] = 'false'
mpc_config_name = rospy.get_param('~mpc_config_name', 'None')
mpc_config_path = None if mpc_config_name == 'None' else mpc_config_name
self.mpc_config_path = mpc_config_path
self.namespace_slam = rospy.get_param('~namespace_slam', '/slam')
self.map_frame = rospy.get_param('~map_frame', 'map')
self.robot_frame = rospy.get_param('~robot_frame', 'base_footprint')
self.global_occupancy_map_topic = rospy.get_param('~global_occupancy_map_topic', '/slam/global_map_rl_navigation')
self.read_config(filename=self.mpc_config_path)
self.global_path_planner = GlobalPathPlanner(
waypoint_resolution_in_meters=self.controller_config.orientation_distance_threshold,
threshold_distance_orientation=self.controller_config.orientation_distance_threshold,
threshold_distance_arrival=self.controller_config.waypoint_distance)
self.compute_waypoint_only_once = True
self.waypoint_list = None
self.waypoint_list_cursor = 0
self.joint_states_data = None
self.global_map_data = None
self.local_map_data = None
self.rtabmap_ready = False
self.global_map_static = None
self.go_to_position_target = None
self.go_to_done = False
# To get indexes of the joints
self.get_indexes = False
self.base_goal = False
self.goal_position = [5, 1]
self.wrong_target = False
self.x_wp = 0.
self.y_wp = 0.
self.x_final = 0.
self.y_final = 0.
self.th_final = 0.
self.yaw_wp = 0.
self.flag_waypoint_okay = False
self.robot_x_position = 0.
self.robot_y_position = 0.
self.robot_yaw = 0.
self.robot_position = [0, 0]
self.robot_position_former = [0, 0]
self.former_robot_position = None
self.former_yaw = None
self.former_local_map = None
self.timer_local_map_change = 0
self.former_local_map_step_function = None
self.timer_local_map_change_step_function = 0
self.lock_data_entrance = Lock()
# continuous flags
self.is_continuous_go_to = False
self.continuous_go_to_done = False
self.robot_config = None
self.controller_status_msg = None
self.timestamp_tracking= None
self.timestamp_tracking_latest = None
self.init_ros_subscriber_and_publicher()
self.tf_broadcaster = tf.TransformBroadcaster() # Publish Transform to look for human
self.tf_listener = tf.TransformListener() # Listen Transform to look for human
self._check_all_sensors_ready()
rospy.loginfo("BehaviorGenerator Initialization Ended")
def read_config(self, filename=None):
if filename is None:
filename = pkg_resources.resource_filename(
'social_mpc', 'config/social_mpc.yaml')
elif os.path.isfile(filename):
self.passed_config_loaded = True
else:
filename = pkg_resources.resource_filename(
'social_mpc', 'config/social_mpc.yaml')
config = yaml.load(open(filename), Loader=yaml.FullLoader)
self.controller_config = ControllerConfig(config)
self.goal_finder_enabled = self.controller_config.goal_finder_enabled
self.path_planner_enabled = self.controller_config.path_planner_enabled
self.update_goals_enabled = self.controller_config.update_goals_enabled
def init_ros_subscriber_and_publicher(self):
r''' Initialize the subscribers and publishers '''
self._subscribers = []
self._publishers = []
self._timers = []
### ROS Subscribers
self._subscribers.append(rospy.Subscriber(self.global_occupancy_map_topic, OccupancyGrid, callback=self._global_map_callback, queue_size=1))
self.time_per_ros_step = 2 # in sec
### ROS Publishers
# self._cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=1)
# self._publishers.append(self._cmd_vel_pub)
### Publishing map in Rviz
self._global_waypoint_map_pub = rospy.Publisher(self.namespace_slam + '/global_waypoint_map', OccupancyGrid, queue_size=10, latch=True)
self._publishers.append(self._global_waypoint_map_pub)
rospy.loginfo("Initializing the BehaviorGenerator")
# self._check_publishers_connection()
def run(self):
r''' Runs the ros wrapper '''
init_t = rospy.Time.now()
while not rospy.is_shutdown():
self.step(init_t)
def step(self, init_time):
r''' Step the ros wrapper'''
# Localizing the robot in the environment
self.get_robot_pose()
if self.global_map_static is not None :
if np.linalg.norm(np.array(self.robot_position_former) - np.array(self.robot_position)) > 0.05 :
# Case where the waypoints were not computed yet or when the goal position was changed
waypoint_target = self.global_path_planner.run(
global_map = cv2.dilate(self.global_map_static,kernel=cv2.getStructuringElement(cv2.MORPH_RECT,(8,8)),iterations = 1),
target_pose = [self.goal_position[0],self.goal_position[1], 0],
robot_pose=[self.robot_x_position, self.robot_y_position, self.robot_yaw]
)
if waypoint_target :
self.flag_waypoint_okay = True
self.x_wp, self.y_wp, self.yaw_wp = waypoint_target
print('Robot position : {} | Waypoint position : {}'.format(self.robot_position, waypoint_target))
else :
self.flag_waypoint_okay = False
rospy.logwarn("The waypoint was not computed properly")
if self.global_path_planner.global_map_with_waypoints is not None :
self.publish_global_waypoint_map(self.global_path_planner.global_map_with_waypoints)
else :
rospy.logwarn('waypoint map not ready !')
self.robot_position_former = self.robot_position
self.publish_tf_go_to_goals()
rospy.sleep(self.controller_config.path_loop_time)
def publish_tf_go_to_goals(self):
current_time = rospy.Time.now()
# to debug
quat_final = tf.transformations.quaternion_from_euler(0, 0, self.th_final)
self.x_final, self.y_final = self.goal_position
self.tf_broadcaster.sendTransform(
(self.x_final, self.y_final, 0),
quat_final,
current_time,
"final goal",
self.map_frame
)
if self.flag_waypoint_okay:
quat_wp = tf.transformations.quaternion_from_euler(0, 0, self.yaw_wp)
self.tf_broadcaster.sendTransform(
(self.x_wp, self.y_wp, 0),
quat_wp,
current_time,
"intermediate waypoint goal",
self.map_frame
)
def get_robot_pose(self):
r""" Function that compute the position of the robot on the map """
try:
map_2_robot = self.tf_listener.lookupTransform(self.map_frame, self.robot_frame, self.timestamp_tracking_latest)
except:
try:
map_2_robot = self.tf_listener.lookupTransform(self.map_frame, self.robot_frame, rospy.Time(0))
#rospy.logwarn("Could not get transformation between {} and {} at correct time. Using latest available.".format(self.map_frame, self.robot_frame))
except:
#rospy.logerr("Could not get transformation between {} and {}.".format(self.map_frame, self.robot_frame))
return None
self.robot_x_position, self.robot_y_position, _ = map_2_robot[0]
(_, _, yaw) = tf.transformations.euler_from_quaternion([map_2_robot[1][0],
map_2_robot[1][1],
map_2_robot[1][2],
map_2_robot[1][3]])
self.robot_yaw = float(constraint_angle(yaw))
self.robot_position = np.array([self.robot_x_position, self.robot_y_position])
def publish_global_waypoint_map(self, map):
self._global_waypoint_map_pub.publish(self.set_global_map_object(map))
def set_global_map_object(self, map):
occupancy_grid = OccupancyGrid()
occupancy_grid.info = MapMetaData()
occupancy_grid.info.map_load_time = rospy.Time.now()
occupancy_grid.info.resolution = self.global_map_resolution
occupancy_grid.info.width = self.global_map_width
occupancy_grid.info.height = self.global_map_height
occupancy_grid.info.origin = self.origin_global_map
occupancy_grid.data = (np.flip(map, axis=0)*100).flatten().astype(np.int8)
occupancy_grid.header.stamp = self.timestamp_global_map_cb
occupancy_grid.header.frame_id = self.map_frame
return occupancy_grid
def _global_map_callback(self, data):
r''' Function that computes the global map from the data and store it into the memory under self.global_map_static '''
if data:
with self.lock_data_entrance:
self.global_map_data = data
self.timestamp_global_map_cb = self.global_map_data.header.stamp
self.x_global_map = self.global_map_data.info.origin.position.x
self.y_global_map = self.global_map_data.info.origin.position.y
self.origin_global_map = self.global_map_data.info.origin
self.global_map_width = self.global_map_data.info.width
self.global_map_height = self.global_map_data.info.height
self.global_map_resolution = self.global_map_data.info.resolution
self.global_map_size = [[self.x_global_map, self.x_global_map + self.global_map_width*self.global_map_resolution],[self.y_global_map, self.y_global_map + self.global_map_height*self.global_map_resolution]]
self.last_shape_global_map = (self.global_map_height, self.global_map_width)
self.global_map_static = np.flip((np.asarray(self.global_map_data.data) / 100).reshape(self.last_shape_global_map), axis=0)
self.global_map_static_original = self.global_map_data.data
if self.global_path_planner.state_config is None :
self.global_path_planner.set_state_config(global_map_size=self.global_map_size,
global_map_height=self.global_map_height,
global_map_width=self.global_map_width,
global_map_scale=1/0.05
)
def shutdown(self):
rospy.loginfo("Stopping the BehaviorGenerator")
self.close()
rospy.loginfo("Killing the BehaviorGenerator node")
def close(self):
if self._subscribers:
for subscriber in self._subscribers:
subscriber.unregister()
if self._publishers:
for publisher in self._publishers:
if isinstance(publisher, dict):
for pub in publisher.values():
pub.unregister()
else:
publisher.unregister()
if self._timers:
for timer in self._timers:
timer.shutdown()
if self._action_server :
self._action_server.shutdown()
def _check_all_sensors_ready(self):
rospy.logdebug("START ALL SENSORS READY")
self._check_rtabmap_ready()
self._check_global_map_ready()
rospy.logdebug("ALL SENSORS READY")
def _check_global_map_ready(self):
self.global_map_data = None
rospy.logdebug("Waiting for {} to be READY...".format(self.global_occupancy_map_topic))
while self.global_map_data is None and not rospy.is_shutdown():
try:
self.global_map_data = rospy.wait_for_message(self.global_occupancy_map_topic, OccupancyGrid, timeout=5.0)
rospy.logdebug("Current {} READY=>".format(self.global_occupancy_map_topic))
except:
rospy.logerr("Current {} not ready yet, retrying for getting global map".format(self.global_occupancy_map_topic))
return self.global_map_data
def _check_rtabmap_ready(self):
rospy.logdebug("Waiting for rtabmap pose to be READY...")
while self.rtabmap_ready is None and not rospy.is_shutdown():
try:
self.tf_listener.waitForTransform(self.map_frame, self.robot_frame, rospy.Time(0), rospy.Duration(5.0))
self.rtabmap_ready = True
rospy.logdebug("Current rtabmap pose READY=>")
except:
rospy.logerr("Current rtabmap pose not ready yet, retrying for getting rtabmap pose")
return self.rtabmap_ready
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment