#!/bin/python3
import rospy
from controller_manager_msgs.srv import *
[docs]def load_controller_srv(controller_name, ns=None, max_retries=5) -> bool:
"""
Function to load a controller on the namespace.
:param controller_name: name of the controller to load
:type controller_name: string
:param ns: namespace
:type ns: string
:param max_retries: number of retries to load the controller.
:type max_retries: int
:return: true if the controller is loaded.
:rtype: bool
"""
if ns is not None:
srv_name = ns + '/controller_manager/load_controller'
else:
srv_name = '/controller_manager/load_controller'
rospy.wait_for_service(srv_name)
client_srv = rospy.ServiceProxy(srv_name, LoadController)
try:
for ii in range(max_retries):
srv_request = LoadControllerRequest(name=controller_name)
resp1 = client_srv(srv_request)
if resp1.ok:
return True
if resp1.ok is False:
print("Controller " + controller_name + " could not be loaded.")
return resp1.ok
except rospy.ServiceException as exc:
print("Service did not process request: " + str(exc))
return False
[docs]def load_controller_list_srv(controller_list,ns=None, max_retries=5) -> None:
"""
Function to load a list of controllers on the namespace.
:param controller_list: list of controllers to load
:type controller_list: list of strings
:param ns: namespace
:type ns: string
:param max_retries: number of retries to load the controller.
:type max_retries: int
"""
for controller in controller_list:
load_controller_srv(controller, ns=ns, max_retries=max_retries)
[docs]def unload_controller_srv(controller_name,ns=None, max_retries=5) -> bool:
"""
Function to unload a controller on the namespace.
:param controller_name: name of the controller to unload
:type controller_name: string
:param ns: namespace
:type ns: string
:param max_retries: number of retries to unload the controller.
:type max_retries: int
:return: true if the controller is unloaded.
:rtype: bool
"""
if ns is not None:
srv_name = ns + '/controller_manager/unload_controller'
else:
srv_name = '/controller_manager/unload_controller'
rospy.wait_for_service(srv_name)
client_srv = rospy.ServiceProxy(srv_name, UnloadController)
try:
for ii in range(max_retries):
srv_request = UnloadControllerRequest(name=controller_name)
resp1 = client_srv(srv_request)
if resp1.ok:
return True
if resp1.ok is False:
print("Controller " + controller_name + " could not be unloaded.")
return resp1.ok
except rospy.ServiceException as exc:
print("Service did not process request: " + str(exc))
return False
[docs]def unload_controller_list_srv(controller_list,ns=None, max_retries=5) -> None:
"""
Function to unload a list of controllers on the namespace.
:param controller_list: list of controllers to unload
:type controller_list: list of strings
:param ns: namespace
:type ns: string
:param max_retries: number of retries to unload the controller.
:type max_retries: int
"""
for controller in controller_list:
unload_controller_srv(controller, ns=ns, max_retries=max_retries)
[docs]def switch_controllers_srv( start_controllers, stop_controllers, ns=None,
strictness=1, start_asap=False, timeout=3.0, max_retries=5) -> bool:
"""
Function to switch controllers on the namespace.
:param start_controllers: list of controllers to start
:type start_controllers: list of strings
:param stop_controllers: list of controllers to stop
:type stop_controllers: list of strings
:param ns: namespace
:type ns: string
:param strictness: strictness of the controller manager: BEST_EFFORT or STRICT (1 and 2, respectively).
:type strictness: int
:param start_asap: start the controllers as soon as their hardware dependencies are ready, will wait for all interfaces to be ready otherwise.
:type start_asap: bool
:param timeout: the timeout in seconds before aborting pending controllers. Zero for infinite.
:type timeout: float
:param max_retries: number of retries to switch the controller.
:type max_retries: int
:return: true if the operation is successful.
:rtype: bool
"""
if ns is not None:
srv_name = ns + '/controller_manager/switch_controller'
else:
srv_name = '/controller_manager/switch_controller'
rospy.wait_for_service(srv_name)
client_srv = rospy.ServiceProxy(srv_name, SwitchController)
try:
for ii in range(max_retries):
srv_request = SwitchControllerRequest( start_controllers=start_controllers,stop_controllers=stop_controllers,
strictness=strictness,start_asap=start_asap,timeout=timeout)
resp1 = client_srv(srv_request)
if resp1.ok:
return True
if resp1.ok is False:
print("Controllers could not be switched.")
return resp1.ok
except rospy.ServiceException as exc:
print("Service did not process request: " + str(exc))
return False
[docs]def start_controllers_srv(start_controllers, ns=None, strictness=1, start_asap=False, timeout=3.0) -> bool:
"""
Function to start controllers on the namespace.
:param start_controllers: list of controllers to start
:type start_controllers: list of strings
:param ns: namespace
:type ns: string
:param strictness: strictness of the controller manager: BEST_EFFORT or STRICT (1 and 2, respectively).
:type strictness: int
:param start_asap: start the controllers as soon as their hardware dependencies are ready, will wait for all interfaces to be ready otherwise.
:type start_asap: bool
:param timeout: the timeout in seconds before aborting pending controllers. Zero for infinite.
:type timeout: float
:return: true if the operation is successful.
:rtype: bool
"""
return switch_controllers_srv(start_controllers, [], ns=ns, strictness=strictness, start_asap=start_asap, timeout=timeout)
[docs]def stop_controllers_srv(stop_controllers, ns=None, strictness=1, start_asap=False, timeout=3.0) -> bool:
"""
Function to start controllers on the namespace.
:param stop_controllers: list of controllers to stop
:type stop_controllers: list of strings
:param ns: namespace
:type ns: string
:param strictness: strictness of the controller manager: BEST_EFFORT or STRICT (1 and 2, respectively).
:type strictness: int
:param start_asap: start the controllers as soon as their hardware dependencies are ready, will wait for all interfaces to be ready otherwise.
:type start_asap: bool
:param timeout: the timeout in seconds before aborting pending controllers. Zero for infinite.
:type timeout: float
:return: true if the operation is successful.
:rtype: bool
"""
return switch_controllers_srv([], stop_controllers, ns=ns, strictness=strictness, start_asap=start_asap, timeout=timeout)
[docs]def reset_controllers_srv(reset_controllers, max_retries=10, ns=None, strictness=1, start_asap=False, timeout=3.0) -> bool:
"""
Function to reset controllers on the namespace.
:param reset_controllers: list of controllers to reset
:type reset_controllers: list of strings
:param max_retries: number of times to retry resetting a controller before giving up.
:type max_retries: int
:param ns: namespace
:type ns: string
:param strictness: strictness of the controller manager: BEST_EFFORT or STRICT (1 and 2, respectively).
:type strictness: int
:param start_asap: start the controllers as soon as their hardware dependencies are ready, will wait for all interfaces to be ready otherwise.
:type start_asap: bool
:param timeout: the timeout in seconds before aborting pending controllers. Zero for infinite.
:type timeout: float
:return: true if the operation is successful.
:rtype: bool
"""
done_switch_off = False
for ii in range(max_retries):
done_switch_off = stop_controllers_srv(reset_controllers, ns=ns, strictness=strictness, start_asap=start_asap, timeout=timeout)
if done_switch_off:
break
if not done_switch_off:
return False
done_switch_on = False
for ii in range(max_retries):
done_switch_on = start_controllers_srv(reset_controllers, ns=ns, strictness=strictness, start_asap=start_asap, timeout=timeout)
if done_switch_on:
break
if not done_switch_on:
return False
return True
[docs]def spawn_controllers_srv(spawn_controllers, ns=None, strictness=1, start_asap=False, timeout=3.0) -> bool:
"""
Function to spawn controllers on the namespace.
:param spawn_controllers: list of controllers to spawn
:type spawn_controllers: list of strings
:param ns: namespace
:type ns: string
:param strictness: strictness of the controller manager: BEST_EFFORT or STRICT (1 and 2, respectively).
:type strictness: int
:param start_asap: start the controllers as soon as their hardware dependencies are ready, will wait for all interfaces to be ready otherwise.
:type start_asap: bool
:param timeout: the timeout in seconds before aborting pending controllers. Zero for infinite.
:type timeout: float
:return: true if the operation is successful.
:rtype: bool
"""
load_controller_list_srv(spawn_controllers, ns=ns)
return start_controllers_srv(spawn_controllers, ns=ns, strictness=strictness, start_asap=start_asap, timeout=timeout)
[docs]def kill_controllers_srv(kill_controllers, ns=None, strictness=1, start_asap=False, timeout=3.0) -> bool:
"""
Function to kill controllers on the namespace.
:param kill_controllers: list of controllers to kill
:type kill_controllers: list of strings
:param ns: namespace
:type ns: string
:param strictness: strictness of the controller manager: BEST_EFFORT or STRICT (1 and 2, respectively).
:type strictness: int
:param start_asap: start the controllers as soon as their hardware dependencies are ready, will wait for all interfaces to be ready otherwise.
:type start_asap: bool
:param timeout: the timeout in seconds before aborting pending controllers. Zero for infinite.
:type timeout: float
:return: true if the operation is successful.
:rtype: bool
"""
res = stop_controllers_srv(kill_controllers, ns=ns, strictness=strictness, start_asap=start_asap, timeout=timeout)
if res:
unload_controller_list_srv(kill_controllers, ns=ns)
return True
else:
return False