#!/bin/python3
import gym
from frobs_rl.common import ros_gazebo
from frobs_rl.common import ros_controllers
from frobs_rl.common import ros_node
from frobs_rl.common import ros_spawn
import rospy
[docs]class RobotBasicEnv(gym.Env):
"""
Basic enviroment for all the robot environments in the frobs_rl library.
To use a custom world, one can use two options: 1) set the path directly to the world file (``world_path``) or set the pkg name and world filename (``world_pkg`` and ``world_filename``).
:param launch_gazebo: If True, launch Gazebo at the start of the env.
:type launch_gazebo: bool
:param gazebo_init_paused: If True, Gazebo is initialized in a paused state.
:type gazebo_init_paused: bool
:param gazebo_use_gui: If True, Gazebo is launched with a GUI (through gzclient).
:type gazebo_use_gui: bool
:param gazebo_recording: If True, Gazebo is launched with a recording of the GUI (through gzclient).
:type gazebo_recording: bool
:param gazebo_freq: The publish rate of gazebo in Hz.
:type gazebo_freq: int
:param world_path: If using a custom world then the path to the world.
:type world_path: str
:param world_pkg: If using a custom world then the package name of the world.
:type world_pkg: str
:param world_filename: If using a custom world then the filename of the world.
:type world_filename: str
:param gazebo_max_freq: max update rate for gazebo in real time factor: 1 is real time, 10 is 10 times real time.
:type gazebo_max_freq: float
:param gazebo_timestep: The timestep of gazebo in seconds.
:type gazebo_timestep: float
:param spawn_robot: If True, the robot is spawned in the environment.
:type spawn_robot: bool
:param model_name_in_gazebo: The name of the model in gazebo.
:type model_name_in_gazebo: str
:param namespace: The namespace of the robot.
:type namespace: str
:param pkg_name: The package name where the robot model is located.
:type pkg_name: str
:param urdf_file: The path to the urdf file of the robot.
:type urdf_file: str
:param urdf_folder: The path to the folder where the urdf files are located. Default is "/urdf".
:type urdf_folder: str
:param urdf_xacro_args: The arguments to be passed to the xacro parser.
:type urdf_xacro_args: str
:param controller_file: The path to the controllers YAML file of the robot.
:type controller_file: str
:param controller_list: The list of controllers to be launched.
:type controller_list: list of str
:param rob_state_publisher_max_freq: The maximum frequency of the ros state publisher.
:type rob_state_publisher_max_freq: int
:param rob_st_term: If True, the robot state publisher is launched in a new terminal.
:type rob_st_term: bool
:param model_pos_x: The x position of the robot in the world.
:param model_pos_y: The y position of the robot in the world.
:param model_pos_z: The z position of the robot in the world.
:param model_ori_x: The x orientation of the robot in the world.
:param model_ori_y: The y orientation of the robot in the world.
:param model_ori_z: The z orientation of the robot in the world.
:param model_ori_w: The w orientation of the robot in the world.
:param reset_controllers: If True, the controllers are reset at the start of each episode.
:type reset_controllers: bool
:param reset_mode: If 1, reset Gazebo with a "reset_world" (Does not reset time)
If 2, reset Gazebo with a "reset_simulation" (Resets time)
:param step_mode: If 1, step Gazebo using the "pause_physics" and "unpause_physics" services.
If 2, step Gazebo using the "step_simulation" command.
:param num_gazebo_steps: If using step_mode 2, the number of steps to be taken.
"""
def __init__( self, launch_gazebo=False, gazebo_init_paused=True, gazebo_use_gui=True, gazebo_recording=False,
gazebo_freq=100, world_path=None, world_pkg=None, world_filename=None,
gazebo_max_freq=None, gazebo_timestep=None,
spawn_robot=False, model_name_in_gazebo="robot", namespace="/robot", pkg_name=None, urdf_file=None, urdf_folder="/urdf",
controller_file=None, controller_list=None, urdf_xacro_args=None, rob_state_publisher_max_freq= None, rob_st_term=False,
model_pos_x=0.0, model_pos_y=0.0, model_pos_z=0.0,
model_ori_x=0.0, model_ori_y=0.0, model_ori_z=0.0, model_ori_w=0.0,
reset_controllers=False, reset_mode=1, step_mode=1, num_gazebo_steps=1):
"""
Function to initialize the environment.
"""
super(RobotBasicEnv, self).__init__()
self.namespace = namespace
self.num_gazebo_steps = num_gazebo_steps
self.reset_controllers = reset_controllers
self.controllers_list = controller_list
self.reset_mode = reset_mode
self.step_mode = step_mode
# If Launch Gazebo, launch it
if launch_gazebo:
ros_gazebo.launch_Gazebo( paused=gazebo_init_paused, use_sim_time=True, gui=gazebo_use_gui,
recording=gazebo_recording, pub_clock_frequency=gazebo_freq,
custom_world_path=world_path, custom_world_pkg=world_pkg, custom_world_name=world_filename)
# Set the max frequency and timestep of Gazebo
if gazebo_max_freq is not None:
ros_gazebo.gazebo_set_max_update_rate(gazebo_max_freq)
if gazebo_timestep is not None:
ros_gazebo.gazebo_set_time_step(gazebo_timestep)
# If spawn robot, spawn it
if spawn_robot:
ros_spawn.spawn_model_in_gazebo(pkg_name, urdf_file, controller_file, self.controllers_list, model_urdf_folder=urdf_folder,
ns=self.namespace, args_xacro=urdf_xacro_args, max_pub_freq=rob_state_publisher_max_freq, rob_st_term=rob_st_term,
gazebo_name=model_name_in_gazebo, gaz_ref_frame="world",
pos_x=model_pos_x, pos_y=model_pos_y, pos_z=model_pos_z,
ori_x=model_ori_x, ori_y=model_ori_y, ori_z=model_ori_z, ori_w=model_ori_w)
if self.reset_controllers:
ros_gazebo.gazebo_unpause_physics()
ros_controllers.reset_controllers_srv(controller_list, ns=self.namespace)
ros_gazebo.gazebo_pause_physics()
rospy.loginfo("Started RobotBasicEnv")
[docs] def step(self, action):
"""
Function to send an action to the robot and get the observation and reward.
"""
rospy.loginfo("Step Env")
self.info = {}
self.observation = None
self.reward = 0.0
# If using pause and unpause services
if self.step_mode == 1:
ros_gazebo.gazebo_unpause_physics()
self._send_action(action)
ros_gazebo.gazebo_pause_physics()
# If using the gazebo step command
elif self.step_mode == 2:
self._send_action(action)
ros_gazebo.gazebo_step_physics(steps=self.num_gazebo_steps)
self.observation = self._get_observation()
self.reward = self._get_reward()
self.done = self._check_if_done()
return self.observation, self.reward, self.done, self.info
[docs] def reset(self):
"""
Function to reset the enviroment after an episode is done.
"""
rospy.loginfo("Resetting Env")
ros_gazebo.gazebo_pause_physics()
self._reset_gazebo()
self.observation = self._get_observation()
return self.observation
[docs] def close (self):
"""
Function to close the environment when training is done.
"""
rospy.loginfo("Closing Env")
rospy.signal_shutdown("Closing RobotGazeboEnvironment")
ros_gazebo.close_Gazebo()
ros_node.ros_kill_all_nodes()
ros_node.ros_kill_master()
ros_node.ros_kill_all_processes()
print("Closed ROS and Env")
#---------------------------------------------#
# Methods to be overridden in the child Env #
[docs] def _send_action(self, action):
"""
Function to send an action to the robot
"""
raise NotImplementedError()
[docs] def _get_observation(self):
"""
Function to get the observation from the enviroment.
"""
raise NotImplementedError()
[docs] def _get_reward(self):
"""
Function to get the reward from the enviroment.
"""
raise NotImplementedError()
[docs] def _check_if_done(self):
"""
Function to check if the episode is done.
If the episode has a success condition then set done as:
self.info['is_success'] = 1.0
"""
raise NotImplementedError()
#------------------------------------------#
# Custom methods for the RobotBasicEnv #
[docs] def _reset_gazebo(self):
"""
Function to reset the gazebo simulation.
"""
# If resetting world (Does not reset time)
if self.reset_mode == 1:
ros_gazebo.gazebo_reset_world()
# If resetting simulation (Resets time)
elif self.reset_mode == 2:
ros_gazebo.gazebo_reset_sim()
if self.reset_controllers:
ros_gazebo.gazebo_unpause_physics()
ros_controllers.reset_controllers_srv(self.controllers_list, ns=self.namespace)
ros_gazebo.gazebo_pause_physics()
ros_gazebo.gazebo_unpause_physics()
self._check_subs_and_pubs_connection()
self._set_episode_init_params()
ros_gazebo.gazebo_pause_physics()
#------------------------------------------#
# Custom methods for the RobotBasicEnv #
[docs] def _check_subs_and_pubs_connection(self):
"""
Function to check if the gazebo and ros connections are ready
"""
raise NotImplementedError()
[docs] def _set_episode_init_params(self):
"""
Function to set some parameters, like the position of the robot, at the begining of each episode.
"""
raise NotImplementedError()