Mentions légales du service

Skip to content
Snippets Groups Projects
Commit 22b149bc authored by AUTERNAUD Alex's avatar AUTERNAUD Alex
Browse files

GoalFinder and GlobalPathPlanner

parent 3e5f60c0
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
import rospy
import pkg_resources
import yaml
from robot_behavior import GoalFinderNode
if __name__ == '__main__':
# Init node
rospy.init_node('GoalFinderNode', log_level=rospy.DEBUG, anonymous=True)
controller = GoalFinderNode()
controller.run()
controller.shutdown()
# rospy.spin()
from .behavior_global_path_planner import GlobalPathPlanner
from .behavior_goal_finder import GoalFinder
from .behavior_generator_node import BehaviorGenerator
from .behavior_global_path_planner_node import GlobalPathPlannerNode
from .behavior_goal_finder_node import GoalFinderNode
from .behavior_go_to_body_action_client import GoToBodyActionClient
from .behavior_go_to_body_action_server import GoToBodyActionServer
from .behavior_go_to_group_action_client import GoToGroupActionClient
......
......@@ -90,14 +90,12 @@ class GlobalPathPlanner():
'''
if self.state_config is None:
return None
#print(self.target)
t_i, t_j = self.position_to_px_coord(self.target)
phi = ma.MaskedArray(np.ones(global_map.shape), mask=(global_map > 0))
phi.data[t_i, t_j] = 0.0
dx = 1/self.state_config.global_map_scale
self.distance = skfmm.distance(phi, dx=dx)
def get_waypoints(self, pos):
r''' Return the next waypoint position from a given position
......@@ -141,8 +139,8 @@ class GlobalPathPlanner():
self.target = [target_pose[0], target_pose[1], target_pose[2]]
robot_pose = [robot_pose[0], robot_pose[1], robot_pose[2]]
if np.linalg.norm(np.array(robot_pose[0:2]) - np.array(self.target[0:2]))<self.waypoint_resolution_in_meters:
return self.target[0:2]
# if np.linalg.norm(np.array(robot_pose[0:2]) - np.array(self.target[0:2]))<self.waypoint_resolution_in_meters:
# return self.target[0:2]
self.get_distance_map(global_map)
......@@ -172,7 +170,6 @@ class GlobalPathPlanner():
else: # nominal case else, point on shortest path with orientation towards target
wpt_pos_closest = [xl[waypoint_index], yl[waypoint_index], np.arctan2(yl[waypoint_index] - robot_pose[1], xl[waypoint_index] - robot_pose[0])]
#print(wpt_pos_closest)
self.get_map_with_subgoals(waypoints=(xl, yl))
return wpt_pos_closest
else :
......
......@@ -77,7 +77,7 @@ class GlobalPathPlannerNode:
self._check_all_sensors_ready()
rospy.loginfo("BehaviorGenerator Initialization Ended")
rospy.loginfo("GlobalPathPlannerNode Initialization Ended")
def read_config(self, filename=None):
......@@ -101,6 +101,7 @@ class GlobalPathPlannerNode:
r''' Initialize the subscribers and publishers '''
self._subscribers = []
self._publishers = []
self._action_server = []
self._timers = []
### ROS Subscribers
......@@ -122,59 +123,66 @@ class GlobalPathPlannerNode:
def run(self):
r''' Runs the ros wrapper '''
# At time step 0, final goal = robot position
self.get_robot_pose()
self.x_final = self.robot_x_position
self.y_final = self.robot_y_position
self.yaw_final = self.robot_yaw
init_t = rospy.Time.now()
while not rospy.is_shutdown():
self.step(init_t)
def step(self, init_time):
r''' Step the ros wrapper'''
# Localizing the robot in the environment
self.get_robot_pose()
# Get the final goal tf
self.get_final_goal_pose()
if self.global_map_static is not None :
if np.linalg.norm(np.array(self.robot_position_former) - np.array(self.robot_position)) > 0.05 :
# Case where the waypoints were not computed yet or when the goal position was changed
waypoint_target = self.global_path_planner.run(
global_map = cv2.dilate(self.global_map_static,kernel=cv2.getStructuringElement(cv2.MORPH_RECT,(8,8)),iterations = 1),
target_pose = [self.goal_position[0],self.goal_position[1], 0],
robot_pose=[self.robot_x_position, self.robot_y_position, self.robot_yaw]
)
if waypoint_target :
self.flag_waypoint_okay = True
self.x_wp, self.y_wp, self.yaw_wp = waypoint_target
print('Robot position : {} | Waypoint position : {}'.format(self.robot_position, waypoint_target))
else :
self.flag_waypoint_okay = False
rospy.logwarn("The waypoint was not computed properly")
if self.global_path_planner.global_map_with_waypoints is not None :
self.publish_global_waypoint_map(self.global_path_planner.global_map_with_waypoints)
else :
rospy.logwarn('waypoint map not ready !')
self.robot_position_former = self.robot_position
# if np.linalg.norm(np.array(self.robot_position_former) - np.array(self.robot_position)) > 0.05 :
# Case where the waypoints were not computed yet or when the goal position was changed
waypoint_target = self.global_path_planner.run(
global_map = cv2.dilate(self.global_map_static,kernel=cv2.getStructuringElement(cv2.MORPH_RECT,(8,8)),iterations = 1),
target_pose = [self.x_final, self.y_final, self.yaw_final],
robot_pose=[self.robot_x_position, self.robot_y_position, self.robot_yaw]
)
if waypoint_target:
self.flag_waypoint_okay = True
self.x_wp, self.y_wp, self.yaw_wp = waypoint_target
rospy.logdebug('Robot position : x {:.2f} \t y {:.2f} \t yaw {:.2f}'.format(self.robot_x_position, self.robot_y_position, self.robot_yaw))
rospy.logdebug('Goal position : x {:.2f} \t y {:.2f} \t yaw {:.2f}'.format(self.x_final, self.y_final, self.yaw_final))
else:
self.flag_waypoint_okay = False
rospy.logwarn("The waypoint was not computed properly")
if self.global_path_planner.global_map_with_waypoints is not None :
self.publish_global_waypoint_map(self.global_path_planner.global_map_with_waypoints)
else :
rospy.logwarn('waypoint map not ready !')
self.robot_position_former = self.robot_position
self.publish_tf_go_to_goals()
rospy.sleep(self.controller_config.path_loop_time)
def publish_tf_go_to_goals(self):
current_time = rospy.Time.now()
# to debug
quat_final = tf.transformations.quaternion_from_euler(0, 0, self.yaw_final)
self.x_final, self.y_final = self.goal_position
self.tf_broadcaster.sendTransform(
(self.x_final, self.y_final, 0),
quat_final,
current_time,
"final goal",
self.map_frame
)
# quat_final = tf.transformations.quaternion_from_euler(0, 0, self.yaw_final)
# self.x_final, self.y_final = self.goal_position
# self.tf_broadcaster.sendTransform(
# (self.x_final, self.y_final, 0),
# quat_final,
# current_time,
# "final goal",
# self.map_frame
# )
if self.flag_waypoint_okay:
quat_wp = tf.transformations.quaternion_from_euler(0, 0, self.yaw_wp)
self.tf_broadcaster.sendTransform(
......@@ -184,7 +192,21 @@ class GlobalPathPlannerNode:
"intermediate waypoint goal",
self.map_frame
)
def get_final_goal_pose(self):
try:
map_2_final_goal = self.tf_listener.lookupTransform(self.map_frame, 'final goal', rospy.Time(0))
#rospy.logwarn("Could not get transformation between {} and {} at correct time. Using latest available.".format(self.map_frame, self.robot_frame))
except:
#rospy.logerr("Could not get transformation between {} and {}.".format(self.map_frame, self.robot_frame))
return None
self.x_final, self.y_final, _ = map_2_final_goal[0]
(_, _, yaw) = tf.transformations.euler_from_quaternion([map_2_final_goal[1][0],
map_2_final_goal[1][1],
map_2_final_goal[1][2],
map_2_final_goal[1][3]])
self.yaw_final = float(constraint_angle(yaw))
def get_robot_pose(self):
......@@ -240,7 +262,6 @@ class GlobalPathPlannerNode:
self.global_map_size = [[self.x_global_map, self.x_global_map + self.global_map_width*self.global_map_resolution],[self.y_global_map, self.y_global_map + self.global_map_height*self.global_map_resolution]]
self.last_shape_global_map = (self.global_map_height, self.global_map_width)
self.global_map_static = np.flip((np.asarray(self.global_map_data.data) / 100).reshape(self.last_shape_global_map), axis=0)
self.global_map_static_original = self.global_map_data.data
if self.global_path_planner.state_config is None :
self.global_path_planner.set_state_config(global_map_size=self.global_map_size,
global_map_height=self.global_map_height,
......@@ -250,9 +271,9 @@ class GlobalPathPlannerNode:
def shutdown(self):
rospy.loginfo("Stopping the BehaviorGenerator")
rospy.loginfo("Stopping the GlobalPathPlannerNode")
self.close()
rospy.loginfo("Killing the BehaviorGenerator node")
rospy.loginfo("Killing the GlobalPathPlannerNode node")
def close(self):
if self._subscribers:
......
# This file, part of Social MPC in the WP6 of the Spring project,
# is part of a project that has received funding from the
# European Union’s Horizon 2020 research and innovation programme
# under grant agreement No 871245.
#
# Copyright (C) 2020-2023 by Inria
# Authors : Victor Sanchez
# victor.sanchez@inria.fr
#
# Code inspired from : https://gitlab.inria.fr/spring/wp6_robot_behavior/Motor_Controller/-/blob/master/social_mpc/path.py?ref_type=heads
# From : Alex Auternaud, Timothée Wintz
# alex.auternaud@inria.fr
# timothee.wintz@inria.fr
from social_mpc.ssn_model.social_spaces import SocialSpaces, Group
from social_mpc import utils
import numpy as np
import time
import rospy
import exputils as eu
class GoalFinder():
def __init__(self, controller_config):
self.state_config = None
self.ssn = None
self.controller_config = controller_config
def set_state_config(self,
global_map_size=[[-11.26118716597557, 12.588813189417124], [-10.207198709249496, 14.492801658809185]],
global_map_height=494,
global_map_width=477,
global_map_scale=1/0.05
):
self.state_config=eu.AttrDict(global_map_size=global_map_size,
global_map_height=global_map_height,
global_map_width=global_map_width,
global_map_scale=global_map_scale
)
self.ssn = SocialSpaces(
bbox=self.state_config.global_map_size,
map_shape=(self.state_config.global_map_height,
self.state_config.global_map_width)
)
def run(self,
shared_humans,
shared_groups,
shared_robot_pose,
shared_goto_target_human_id,
shared_goto_target_group_id,
shared_global_map,
):
last_time = time.time()
if not self.ssn:
return None
global_map = (np.array([*shared_global_map])).squeeze()
if self.controller_config.goal_loop_time:
new_time = time.time()
if (new_time - last_time) < self.controller_config.goal_loop_time:
time.sleep(self.controller_config.goal_loop_time - (new_time - last_time))
last_time = time.time()
robot_pose = (shared_robot_pose[0], shared_robot_pose[1])
sh_goto_target_human_id = np.array([*shared_goto_target_human_id])
sh_goto_target_group_id = np.array([*shared_goto_target_group_id])
if shared_humans is not None:
sh_humans = np.array([*shared_humans])
if shared_groups is not None:
sh_groups = np.array([*shared_groups])
if shared_groups is None and shared_humans is None:
return None
if not sh_goto_target_human_id[0] and not sh_goto_target_group_id[0]:
return None
n_human = np.sum(sh_humans[:, -1] >= 0)
if n_human < 1:
return None
humans = np.zeros((n_human, sh_humans.shape[1]))
for j in range(n_human):
humans[j, :] = sh_humans[j, :]
human_ids = np.array(sh_humans[sh_humans[:, -1] >= 0, -1], dtype=int).tolist()
n_group = np.sum(sh_groups[:, -1] >= 0)
sh_groups = sh_groups[:n_group, :]
group_ids = np.array(sh_groups[sh_groups[:, -1] >= 0, -1], dtype=int).tolist()
target = None
if sh_goto_target_human_id[0]:
if sh_goto_target_human_id[1] in human_ids:
idx = human_ids.index(sh_goto_target_human_id[1])
if humans[idx, -2] != -1:
gr_idx = group_ids.index(humans[idx, -2])
target = ('human_in_group', idx, gr_idx, humans[idx, -2]) # type, human id, group index, group id
else:
target = ('isolated_human', idx) # type, human id
else:
rospy.logwarn("Target human not in sight")
return None
if sh_goto_target_group_id[0]:
if sh_goto_target_group_id[1] in group_ids:
gr_idx = group_ids.index(sh_goto_target_group_id[1])
target = ('group', gr_idx, sh_goto_target_group_id[1]) # type, group index, group id
else:
rospy.logwarn("Target group not in sight")
return None
groups = []
for idx, group_id in enumerate(group_ids):
groups.append(Group(center=sh_groups[idx, :2], person_ids=np.where(humans[:, -2] == group_id)[0]))
humans_fdz = np.zeros((n_human, 5))
humans_fdz[:, :3] = humans[:, :3]
f_dsz = self.ssn.calc_dsz(humans_fdz, groups=groups)
if target:
if target[0] == 'human_in_group':
center = [sh_groups[target[2], 0], sh_groups[target[2], 1]]
persons = humans[humans[:, -2] == target[3], :]
goal = self.ssn.calc_goal_pos(
f_dsz=f_dsz,
map=global_map,
persons=persons,
group_center=center,
robot=robot_pose)
goal = list(goal)
goal[2] = np.arctan2(humans[target[1]][1] - goal[1], humans[target[1]][0] - goal[0]) # to face the human
elif target[0] == 'isolated_human':
person = humans[target[1], :]
ts_x = person[0] + np.cos(person[2]) * self.controller_config.group_detection_stride
ts_y = person[1] + np.sin(person[2]) * self.controller_config.group_detection_stride
center = [ts_x, ts_y]
person = person[np.newaxis, :]
goal = self.ssn.calc_goal_pos(
f_dsz=f_dsz,
map=global_map,
persons=person,
group_center=center,
robot=robot_pose)
goal = list(goal)
goal[2] = np.arctan2(humans[target[1]][1] - goal[1],humans[target[1]][0] - goal[0]) # to face the human
elif target[0] == 'group':
center = [sh_groups[target[1], 0], sh_groups[target[1], 1]]
persons = humans[humans[:, -2] == target[2], :]
goal = self.ssn.calc_goal_pos(
f_dsz=f_dsz,
map=global_map,
persons=persons,
group_center=center,
robot=robot_pose)
goal = list(goal)
else:
return None
else:
return None
rospy.loginfo("goal : {}".format(goal))
return goal
\ No newline at end of file
This diff is collapsed.
......@@ -21,6 +21,23 @@ def constraint_angle(angle, min_value=-np.pi, max_value=np.pi):
return new_angle
def rotmat_2d(angle):
return np.matrix([[np.cos(angle), -np.sin(angle)],
[np.sin(angle), np.cos(angle)]])
def local_to_global(robot_position, x):
angle = robot_position[-1]
y = rotmat_2d(angle).dot(x) + robot_position[:2]
return np.array(y).reshape(x.shape)
def global_to_local(robot_position, x):
angle = robot_position[-1]
y = rotmat_2d(-angle).dot(x - robot_position[:2])
return np.array(y).reshape(x.shape)
def pose_stamped_2_mat(p):
q = p.pose.orientation
pos = p.pose.position
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment