Source code for ros_node


import rospy
import rospkg
import subprocess
import time 

Functions related to the handling of ROS nodes.

[docs]def ros_node_from_pkg(pkg_name, node_name, launch_master=False, launch_master_term=True, launch_new_term=True, name=None, ns="/", output="log") -> bool: """ Function to launch a ROS node from a package. :param pkg_name: Name of the package to launch the node from. :type pkg_name: str :param node_name: Name of the node to launch. :type node_name: str :param launch_master: If ROSMASTER is not running launch it. :type launch_master: bool :param launch_master_term: If launch ROSMASTER do it in an external terminal. :type launch_master_term: bool :param launch_new_term: Launch the process in a new terminal (Xterm). :type launch_new_term: bool :param name: Name to give the node to be launched. :type name: str :param ns: Namespace to give the node to be launched. :type ns: str :param output: log, screen, or None. :type output: str :return: True if the node was launched, False otherwise. :rtype: bool """ rospack = rospkg.RosPack() try: rospack.get_path(pkg_name) rospy.logdebug("Package FOUND...") except rospkg.common.ResourceNotFound: rospy.logerr("Package NOT FOUND") return False if launch_master: print("Launching Master") try: rospy.get_master().getPid() except ConnectionRefusedError: print("Master not running") if launch_master_term: subprocess.Popen("xterm -e 'roscore' ", shell=True) else: subprocess.Popen("roscore", shell=True) time.sleep(5.0) else: print("Master is running") try: rospy.get_master().getPid() except ConnectionRefusedError: print("Master not running") return False else: print("Master is running") term_command = "rosrun " + pkg_name + " " + node_name if name is not None: term_command += " __name:=" + str(name) term_command += " __ns:=" + str(ns) term_command += " __log:=" + str(output) if launch_new_term: term_command = "xterm -e ' " + term_command + "'" subprocess.Popen(term_command, shell=True) time.sleep(5.0) return True
[docs]def ros_kill_node(node_name) -> bool: """ Function to kill a ROS node. :param node_name: Name of the node to kill. :type node_name: str :return: True if the node was killed, False otherwise. :rtype: bool """ term_command = "rosnode kill " + node_name subprocess.Popen("xterm -e ' " + term_command + "'", shell=True).wait() return True
[docs]def ros_kill_all_nodes() -> bool: """ Function to kill all running ROS nodes. :return: True if all nodes were killed, False otherwise. :rtype: bool """ term_command = "rosnode kill -a" subprocess.Popen("xterm -e ' " + term_command + "'", shell=True).wait() return True
[docs]def ros_kill_master() -> bool: """ Function to kill the ROS master. :return: True if the master was killed, False otherwise. :rtype: bool """ try: rospy.get_master().getPid() except ConnectionRefusedError: print("Master not running") return True else: print("Master is running") term_command = "rosnode kill -a" subprocess.Popen("xterm -e ' " + term_command + "'", shell=True).wait() time.sleep(0.5) term_command = "killall -9 rosout roslaunch rosmaster nodelet" subprocess.Popen("xterm -e ' " + term_command + "'", shell=True).wait() return True
[docs]def ros_kill_all_processes() -> bool: """ Function to kill all running ROS related processes. :return: True if all processes were killed, False otherwise. :rtype: bool """ term_command = "killall -9 rosout roslaunch rosmaster gzserver nodelet robot_state_publisher gzclient" subprocess.Popen("xterm -e ' " + term_command + "'", shell=True).wait() return True