Attachment 'my_turtlebot2_wall.py'
Download 1 import rospy
2 import numpy
3 from gym import spaces
4 from openai_ros.robot_envs import turtlebot2_env
5 from gym.envs.registration import register
6 from geometry_msgs.msg import Point
7
8 timestep_limit_per_episode = 10000 # Can be any Value
9
10 register(
11 id='MyTurtleBot2Wall-v0',
12 entry_point='my_turtlebot2_wall:MyTurtleBot2WallEnv',
13 timestep_limit=timestep_limit_per_episode,
14 )
15
16 class MyTurtleBot2WallEnv(turtlebot2_env.TurtleBot2Env):
17 def __init__(self):
18 """
19 This Task Env is designed for having the TurtleBot2 in some kind of maze.
20 It will learn how to move around the maze without crashing.
21 """
22
23 # Only variable needed to be set here
24 number_actions = rospy.get_param('/turtlebot2/n_actions')
25 self.action_space = spaces.Discrete(number_actions)
26
27 # We set the reward range, which is not compulsory but here we do it.
28 self.reward_range = (-numpy.inf, numpy.inf)
29
30
31 #number_observations = rospy.get_param('/turtlebot2/n_observations')
32 """
33 We set the Observation space for the 6 observations
34 cube_observations = [
35 round(current_disk_roll_vel, 0),
36 round(y_distance, 1),
37 round(roll, 1),
38 round(pitch, 1),
39 round(y_linear_speed,1),
40 round(yaw, 1),
41 ]
42 """
43
44 # Actions and Observations
45 self.linear_forward_speed = rospy.get_param('/turtlebot2/linear_forward_speed')
46 self.linear_turn_speed = rospy.get_param('/turtlebot2/linear_turn_speed')
47 self.angular_speed = rospy.get_param('/turtlebot2/angular_speed')
48 self.init_linear_forward_speed = rospy.get_param('/turtlebot2/init_linear_forward_speed')
49 self.init_linear_turn_speed = rospy.get_param('/turtlebot2/init_linear_turn_speed')
50
51 self.new_ranges = rospy.get_param('/turtlebot2/new_ranges')
52 self.min_range = rospy.get_param('/turtlebot2/min_range')
53 self.max_laser_value = rospy.get_param('/turtlebot2/max_laser_value')
54 self.min_laser_value = rospy.get_param('/turtlebot2/min_laser_value')
55
56 # Get Desired Point to Get
57 self.desired_point = Point()
58 self.desired_point.x = rospy.get_param("/turtlebot2/desired_pose/x")
59 self.desired_point.y = rospy.get_param("/turtlebot2/desired_pose/y")
60 self.desired_point.z = rospy.get_param("/turtlebot2/desired_pose/z")
61
62 # We create two arrays based on the binary values that will be assigned
63 # In the discretization method.
64 laser_scan = self._check_laser_scan_ready()
65 num_laser_readings = len(laser_scan.ranges)/self.new_ranges
66 high = numpy.full((num_laser_readings), self.max_laser_value)
67 low = numpy.full((num_laser_readings), self.min_laser_value)
68
69 # We only use two integers
70 self.observation_space = spaces.Box(low, high)
71
72 rospy.logdebug("ACTION SPACES TYPE===>"+str(self.action_space))
73 rospy.logdebug("OBSERVATION SPACES TYPE===>"+str(self.observation_space))
74
75 # Rewards
76 self.forwards_reward = rospy.get_param("/turtlebot2/forwards_reward")
77 self.turn_reward = rospy.get_param("/turtlebot2/turn_reward")
78 self.end_episode_points = rospy.get_param("/turtlebot2/end_episode_points")
79
80 self.cumulated_steps = 0.0
81
82 # Here we will add any init functions prior to starting the MyRobotEnv
83 super(MyTurtleBot2WallEnv, self).__init__()
84
85 def _set_init_pose(self):
86 """Sets the Robot in its init pose
87 """
88 self.move_base( self.init_linear_forward_speed,
89 self.init_linear_turn_speed,
90 epsilon=0.05,
91 update_rate=10)
92
93 return True
94
95
96 def _init_env_variables(self):
97 """
98 Inits variables needed to be initialised each time we reset at the start
99 of an episode.
100 :return:
101 """
102 # For Info Purposes
103 self.cumulated_reward = 0.0
104 # Set to false Done, because its calculated asyncronously
105 self._episode_done = False
106
107 odometry = self.get_odom()
108 self.previous_distance_from_des_point = self.get_distance_from_desired_point(odometry.pose.pose.position)
109
110
111 def _set_action(self, action):
112 """
113 This set action will Set the linear and angular speed of the turtlebot2
114 based on the action number given.
115 :param action: The action integer that set s what movement to do next.
116 """
117
118 rospy.logdebug("Start Set Action ==>"+str(action))
119 # We convert the actions to speed movements to send to the parent class CubeSingleDiskEnv
120 if action == 0: #FORWARD
121 linear_speed = self.linear_forward_speed
122 angular_speed = 0.0
123 self.last_action = "FORWARDS"
124 elif action == 1: #LEFT
125 linear_speed = self.linear_turn_speed
126 angular_speed = self.angular_speed
127 self.last_action = "TURN_LEFT"
128 elif action == 2: #RIGHT
129 linear_speed = self.linear_turn_speed
130 angular_speed = -1*self.angular_speed
131 self.last_action = "TURN_RIGHT"
132
133
134 # We tell TurtleBot2 the linear and angular speed to set to execute
135 self.move_base(linear_speed, angular_speed, epsilon=0.05, update_rate=10)
136
137 rospy.logdebug("END Set Action ==>"+str(action))
138
139 def _get_obs(self):
140 """
141 Here we define what sensor data defines our robots observations
142 To know which Variables we have acces to, we need to read the
143 TurtleBot2Env API DOCS
144 :return:
145 """
146 rospy.logdebug("Start Get Observation ==>")
147 # We get the laser scan data
148 laser_scan = self.get_laser_scan()
149
150 discretized_laser_scan = self.discretize_observation( laser_scan,
151 self.new_ranges
152 )
153
154
155 # We get the odometry so that SumitXL knows where it is.
156 odometry = self.get_odom()
157 x_position = odometry.pose.pose.position.x
158 y_position = odometry.pose.pose.position.y
159
160 # We round to only two decimals to avoid very big Observation space
161 odometry_array = [round(x_position, 2),round(y_position, 2)]
162
163 # We only want the X and Y position and the Yaw
164
165 observations = discretized_laser_scan + odometry_array
166
167 rospy.logdebug("Observations==>"+str(observations))
168 rospy.logdebug("END Get Observation ==>")
169 return observations
170
171
172 def _is_done(self, observations):
173
174 if self._episode_done:
175 rospy.logerr("TurtleBot2 is Too Close to wall==>")
176 else:
177 rospy.logerr("TurtleBot2 didnt crash at least ==>")
178
179
180 current_position = Point()
181 current_position.x = observations[-2]
182 current_position.y = observations[-1]
183 current_position.z = 0.0
184
185 MAX_X = 6.0
186 MIN_X = -1.0
187 MAX_Y = 3.0
188 MIN_Y = -3.0
189
190 # We see if we are outside the Learning Space
191
192 if current_position.x <= MAX_X and current_position.x > MIN_X:
193 if current_position.y <= MAX_Y and current_position.y > MIN_Y:
194 rospy.logdebug("TurtleBot Position is OK ==>["+str(current_position.x)+","+str(current_position.y)+"]")
195
196 # We see if it got to the desired point
197 if self.is_in_desired_position(current_position):
198 self._episode_done = True
199
200
201 else:
202 rospy.logerr("TurtleBot to Far in Y Pos ==>"+str(current_position.x))
203 self._episode_done = True
204 else:
205 rospy.logerr("TurtleBot to Far in X Pos ==>"+str(current_position.x))
206 self._episode_done = True
207
208
209
210
211 return self._episode_done
212
213 def _compute_reward(self, observations, done):
214
215 current_position = Point()
216 current_position.x = observations[-2]
217 current_position.y = observations[-1]
218 current_position.z = 0.0
219
220 distance_from_des_point = self.get_distance_from_desired_point(current_position)
221 distance_difference = distance_from_des_point - self.previous_distance_from_des_point
222
223
224 if not done:
225
226 if self.last_action == "FORWARDS":
227 reward = self.forwards_reward
228 else:
229 reward = self.turn_reward
230
231 # If there has been a decrease in the distance to the desired point, we reward it
232 if distance_difference < 0.0:
233 rospy.logwarn("DECREASE IN DISTANCE GOOD")
234 reward += self.forwards_reward
235 else:
236 rospy.logerr("ENCREASE IN DISTANCE BAD")
237 reward += 0
238
239 else:
240
241 if self.is_in_desired_position(current_position):
242 reward = self.end_episode_points
243 else:
244 reward = -1*self.end_episode_points
245
246
247 self.previous_distance_from_des_point = distance_from_des_point
248
249
250 rospy.logdebug("reward=" + str(reward))
251 self.cumulated_reward += reward
252 rospy.logdebug("Cumulated_reward=" + str(self.cumulated_reward))
253 self.cumulated_steps += 1
254 rospy.logdebug("Cumulated_steps=" + str(self.cumulated_steps))
255
256 return reward
257
258
259 # Internal TaskEnv Methods
260
261 def discretize_observation(self,data,new_ranges):
262 """
263 Discards all the laser readings that are not multiple in index of new_ranges
264 value.
265 """
266 self._episode_done = False
267
268 discretized_ranges = []
269 mod = len(data.ranges)/new_ranges
270
271 rospy.logdebug("data=" + str(data))
272 rospy.logwarn("new_ranges=" + str(new_ranges))
273 rospy.logwarn("mod=" + str(mod))
274
275 for i, item in enumerate(data.ranges):
276 if (i%mod==0):
277 if item == float ('Inf') or numpy.isinf(item):
278 discretized_ranges.append(self.max_laser_value)
279 elif numpy.isnan(item):
280 discretized_ranges.append(self.min_laser_value)
281 else:
282 discretized_ranges.append(int(item))
283
284 if (self.min_range > item > 0):
285 rospy.logerr("done Validation >>> item=" + str(item)+"< "+str(self.min_range))
286 self._episode_done = True
287 else:
288 rospy.logwarn("NOT done Validation >>> item=" + str(item)+"< "+str(self.min_range))
289
290
291 return discretized_ranges
292
293
294 def is_in_desired_position(self,current_position, epsilon=0.05):
295 """
296 It return True if the current position is similar to the desired poistion
297 """
298
299 is_in_desired_pos = False
300
301
302 x_pos_plus = self.desired_point.x + epsilon
303 x_pos_minus = self.desired_point.x - epsilon
304 y_pos_plus = self.desired_point.y + epsilon
305 y_pos_minus = self.desired_point.y - epsilon
306
307 x_current = current_position.x
308 y_current = current_position.y
309
310 x_pos_are_close = (x_current <= x_pos_plus) and (x_current > x_pos_minus)
311 y_pos_are_close = (y_current <= y_pos_plus) and (y_current > y_pos_minus)
312
313 is_in_desired_pos = x_pos_are_close and y_pos_are_close
314
315 return is_in_desired_pos
316
317
318 def get_distance_from_desired_point(self, current_position):
319 """
320 Calculates the distance from the current position to the desired point
321 :param start_point:
322 :return:
323 """
324 distance = self.get_distance_from_point(current_position,
325 self.desired_point)
326
327 return distance
328
329 def get_distance_from_point(self, pstart, p_end):
330 """
331 Given a Vector3 Object, get distance from current position
332 :param p_end:
333 :return:
334 """
335 a = numpy.array((pstart.x, pstart.y, pstart.z))
336 b = numpy.array((p_end.x, p_end.y, p_end.z))
337
338 distance = numpy.linalg.norm(a - b)
339
340 return distance
Attached Files
To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.You are not allowed to attach a file to this page.