Mentions légales du service

Skip to content
Snippets Groups Projects
Commit 87772986 authored by Severin Lemaignan's avatar Severin Lemaignan
Browse files

ROS-events unittest now called and passing

parent 21bea364
No related branches found
No related tags found
No related merge requests found
...@@ -41,5 +41,6 @@ catkin_install_python(PROGRAMS bin/knowledge_core ...@@ -41,5 +41,6 @@ catkin_install_python(PROGRAMS bin/knowledge_core
if(CATKIN_ENABLE_TESTING) if(CATKIN_ENABLE_TESTING)
find_package(rostest REQUIRED) find_package(rostest REQUIRED)
add_rostest(test/test_ros_events.test)
add_rostest(test/test_ros.test) add_rostest(test/test_ros.test)
endif() endif()
...@@ -308,120 +308,9 @@ class TestKB(unittest.TestCase): ...@@ -308,120 +308,9 @@ class TestKB(unittest.TestCase):
) )
class TestKBEvents(unittest.TestCase):
@classmethod
def setUpClass(cls):
rospy.wait_for_service(MANAGE_SRV)
rospy.wait_for_service(REVISE_SRV)
cls.manage = rospy.ServiceProxy(MANAGE_SRV, Manage)
cls.revise = rospy.ServiceProxy(REVISE_SRV, Revise)
cls.events = actionlib.SimpleActionClient(EVENTS_ACTION, EventAction)
cls.events.wait_for_server()
def setUp(self):
# clean-up the knowledge base before starting each test
self.manage(action=ManageRequest.CLEAR)
def on_evt(self, evt):
# evt = json.loads(evt.json)
self.evt_semaphore = True
def check_event(self, should_trigger):
MAX_WAIT = 200 # ms
elapsed = 0
while elapsed < MAX_WAIT:
if self.evt_semaphore:
break
rospy.sleep(0.01)
elapsed += 10
self.assertTrue(
not (self.evt_semaphore ^ should_trigger),
"The event did not trigger after %sms" % MAX_WAIT,
)
self.evt_semaphore = False
def test_event(self):
self.evt_semaphore = False
self.events.send_goal(
EventGoal(patterns=["?human rdf:type Human"], one_shot=False),
feedback_cb=self.on_evt,
)
self.check_event(should_trigger=False)
self.revise(
method=ReviseRequest.ADD,
statements=[
"joe rdf:type Human",
],
)
self.check_event(should_trigger=True)
self.revise(
method=ReviseRequest.ADD,
statements=[
"joe rdf:type Human",
],
)
self.check_event(should_trigger=False)
self.revise(
method=ReviseRequest.ADD,
statements=[
"ari rdf:type Robot",
],
)
self.check_event(should_trigger=False)
self.revise(
method=ReviseRequest.ADD,
statements=[
"bill rdf:type Human",
],
)
self.check_event(should_trigger=True)
self.revise(
method=ReviseRequest.REMOVE,
statements=[
"bill rdf:type Human",
],
)
self.revise(
method=ReviseRequest.ADD,
statements=[
"bill rdf:type Human",
],
)
self.check_event(should_trigger=True)
class KBTestSuite(unittest.TestSuite):
def __init__(self):
super(KBTestSuite, self).__init__()
self.addTest(TestKB())
self.addTest(TestKBEvents())
if __name__ == "__main__": if __name__ == "__main__":
import rostest import rostest
rospy.init_node("knowledge_core_ros_test") rospy.init_node("knowledge_core_ros_test")
rostest.rosrun(PKG, "test_kb", TestKB) rostest.rosrun(PKG, "test_kb_ros", TestKB)
#!/usr/bin/env python
PKG = "test_knowledge_core"
import rospy
import unittest
import json
import actionlib
from knowledge_core.srv import Manage, ManageRequest
from knowledge_core.srv import Revise, ReviseRequest
from knowledge_core.srv import Query
from knowledge_core.srv import Sparql
from knowledge_core.msg import EventAction, EventGoal
MANAGE_SRV = "/kb/manage"
REVISE_SRV = "/kb/revise"
QUERY_SRV = "/kb/query"
SPARQL_SRV = "/kb/sparql"
EVENTS_ACTION = "/kb/events"
class TestKBEvents(unittest.TestCase):
@classmethod
def setUpClass(cls):
rospy.wait_for_service(MANAGE_SRV)
rospy.wait_for_service(REVISE_SRV)
cls.manage = rospy.ServiceProxy(MANAGE_SRV, Manage)
cls.revise = rospy.ServiceProxy(REVISE_SRV, Revise)
cls.events = actionlib.SimpleActionClient(EVENTS_ACTION, EventAction)
cls.events.wait_for_server()
def setUp(self):
# clean-up the knowledge base before starting each test
self.manage(action=ManageRequest.CLEAR)
self.evt_active = False
self.evt_semaphore = False
def on_evt(self, evt):
# evt = json.loads(evt.json)
self.evt_semaphore = True
def check_event(self, should_trigger):
MAX_WAIT = 200 # ms
elapsed = 0
while elapsed < MAX_WAIT:
if self.evt_semaphore:
break
rospy.sleep(0.01)
elapsed += 10
self.assertTrue(
not (self.evt_semaphore ^ should_trigger),
"The event did not trigger after %sms" % MAX_WAIT,
)
self.evt_semaphore = False
def on_active(self):
self.evt_active = True
def test_basic_event_triggering(self):
self.evt_semaphore = False
self.evt_active = False
self.events.send_goal(
EventGoal(patterns=["?human rdf:type Human"], one_shot=False),
active_cb=self.on_active,
feedback_cb=self.on_evt,
)
while not self.evt_active:
rospy.logwarn("waiting for the event to be registered...")
rospy.sleep(0.1)
self.check_event(should_trigger=False)
self.revise(
method=ReviseRequest.ADD,
statements=[
"joe rdf:type Human",
],
)
self.check_event(should_trigger=True)
self.revise(
method=ReviseRequest.ADD,
statements=[
"joe rdf:type Human",
],
)
self.check_event(should_trigger=False)
self.revise(
method=ReviseRequest.ADD,
statements=[
"ari rdf:type Robot",
],
)
self.check_event(should_trigger=False)
self.revise(
method=ReviseRequest.ADD,
statements=[
"bill rdf:type Human",
],
)
self.check_event(should_trigger=True)
self.revise(
method=ReviseRequest.REMOVE,
statements=[
"bill rdf:type Human",
],
)
self.revise(
method=ReviseRequest.ADD,
statements=[
"bill rdf:type Human",
],
)
self.check_event(should_trigger=True)
if __name__ == "__main__":
import rostest
rospy.init_node("knowledge_core_ros_test")
rostest.rosrun(PKG, "test_kb_ros_events", TestKBEvents)
<launch>
<node pkg="knowledge_core" type="knowledge_core" name="knowledge_core" args="--debug" output="screen"/>
<!-- Run ROS test -->
<test test-name="ros_test_events" pkg="knowledge_core" type="test_ros_events.py"/>
</launch>
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment