FoPra Beluga Challenge - Reinforcement Learning v1.0
Deep Reinforcement Learning solution for the Beluga Challenge shipping container optimization problem using PPO and MCTS
rl.env.environment.Env Class Reference

Beluga Challenge environment for reinforcement learning.

Public Member Functions

 __init__ (self, str path, int base_index=-1)
 Initialize the Beluga Challenge environment.
 
 step (self, str action_name, params=None)
 Execute a single environment step with the given action.
 
 reset (self)
 Reset the environment with a new problem instance.
 
 reset_specific_problem (self, problem)
 Reset the environment with a specific problem instance.
 
 get_reward (self, bool could_execute, str action_name, production_line_n_old)
 Calculate the reward for the current action.
 
 get_observation_high_level (self)
 Get the high-level observation of the current state.
 
 get_max_steps (self)
 Get the maximum number of steps to solve the current problem.
 
 check_action_execution (self, str action_name, obs)
 Check if the action can be executed without actually executing it.
 

Public Attributes

ProblemState state = None
 
 path = path
 
int step_count = 0
 
 problem_name = None
 
list sorted_problems = []
 
int problem_count = 0
 
 base_index = base_index
 
int problems_solved = 0
 
int block_size = 6
 
dict check_action_map
 

Detailed Description

Beluga Challenge environment for reinforcement learning.

This class implements the main environment for the Beluga Challenge shipping container optimization problem. It manages problem states, action execution, reward calculation, and episode management.
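A minimal usage sketch of this interface; the problems directory and the trivial "always unload" policy are placeholders (the project's actual agent, PPO/MCTS, selects actions and constructs their parameters):

    from rl.env.environment import Env

    env = Env("problems/", base_index=-1)   # placeholder path to the problem JSON files

    obs = env.reset()
    done = False
    while not done and env.step_count < env.get_max_steps():
        action_name, params = "unload_beluga", []   # placeholder policy; a real agent picks the action and its params
        if not env.check_action_execution(action_name, obs):
            break                                   # preconditions no longer hold, stop this toy loop
        obs, reward, done = env.step(action_name, params)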

Constructor & Destructor Documentation

◆ __init__()

rl.env.environment.Env.__init__ ( self,
str path,
int base_index = -1 )

Initialize the Beluga Challenge environment.

Parameters
    path        Path to the directory containing problem JSON files
    base_index  Base index for problem selection
def __init__(self, path: str, base_index: int = -1):
    """!
    @brief Initialize the Beluga Challenge environment
    @param path Path to the directory containing problem JSON files
    @param base_index Base index for problem selection
    """
    # Initialize environment variables here
    self.state: ProblemState = None  # Not initialized yet, will be set in reset()
    self.path = path
    self.step_count = 0  # Number of steps taken; used as a termination condition in training/evaluation
    self.problem_name = None
    self.sorted_problems = []  # Problems sorted by jig count (ascending)
    self.problem_count = 0  # Total number of available problems
    self.base_index = base_index  # Base index for problem selection, used to pick problems in ascending order of jig count
    self.problems_solved = 0  # Counter for the number of problems solved
    self.block_size = 6  # Number of problems to choose from in each block

    # Map action names to their precondition-check functions
    self.check_action_map = {
        "load_beluga": check_load_beluga,
        "unload_beluga": check_unload_beluga,
        "get_from_hangar": check_get_from_hangar,
        "deliver_to_hangar": check_deliver_to_hangar,
        "left_stack_rack": check_left_stack_rack,
        "right_stack_rack": check_right_stack_rack,
        "left_unstack_rack": check_left_unstack_rack,
        "right_unstack_rack": check_right_unstack_rack
    }

    # Find all JSON files in the problems folder
    if os.path.exists(self.path):
        problem_files = [f for f in os.listdir(self.path) if f.endswith('.json')]

        # Extract the number of jigs from each filename using a regular expression
        jig_counts = []
        for file in problem_files:
            match = re.search(r'_j(\d+)_', file)
            if match:
                jig_count = int(match.group(1))
                jig_counts.append((file, jig_count))

        # Sort by number of jigs (ascending)
        self.sorted_problems = sorted(jig_counts, key=lambda x: x[1])
        self.problem_count = len(self.sorted_problems)  # Problem count based on the sorted problems

Member Function Documentation

◆ check_action_execution()

rl.env.environment.Env.check_action_execution ( self,
str action_name,
obs )

Check if the action can be executed without actually executing it.

Parameters
    action_name  Name of the action to check
    obs          Current observation of the environment
Returns
True if the action can be executed, False otherwise
def check_action_execution(self, action_name: str, obs):
    """!
    @brief Check if the action can be executed without actually executing it
    @param action_name Name of the action to check
    @param obs Current observation of the environment
    @return True if the action can be executed, False otherwise
    """
    if action_name in self.check_action_map:
        return self.check_action_map[action_name](self.state, obs)
    return False  # Unknown action names are treated as not executable
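
A small sketch of how this check can serve for action masking, assuming env is an already-reset Env instance:

    obs = env.get_observation_high_level()
    action_names = list(env.check_action_map.keys())
    mask = [env.check_action_execution(name, obs) for name in action_names]
    valid_actions = [name for name, ok in zip(action_names, mask) if ok]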

◆ get_max_steps()

rl.env.environment.Env.get_max_steps ( self)

Get the maximum number of steps to solve the current problem.

Returns
Maximum steps based on the problem size
def get_max_steps(self):
    """!
    @brief Get the maximum number of steps to solve the current problem
    @return Maximum steps based on the problem size
    """
    return len(self.state.jigs) * 20 + 100
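
For example, an instance with 10 jigs gets a budget of 10 * 20 + 100 = 300 steps.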

◆ get_observation_high_level()

rl.env.environment.Env.get_observation_high_level ( self)

Get the high-level observation of the current state.

Returns
High-level observation array
def get_observation_high_level(self):
    """!
    @brief Get the high-level observation of the current state
    @return High-level observation array
    """
    return self.state.get_observation_high_level()

◆ get_reward()

rl.env.environment.Env.get_reward ( self,
bool could_execute,
str action_name,
production_line_n_old )

Calculate the reward for the current action.

Parameters
    could_execute          Boolean indicating if the action was successfully executed
    action_name            Name of the action taken
    production_line_n_old  Number of production lines before the action
Returns
Reward value based on the action and state
def get_reward(self, could_execute: bool, action_name: str, production_line_n_old):
    """!
    @brief Calculate the reward for the current action
    @param could_execute Boolean indicating if the action was successfully executed
    @param action_name Name of the action taken
    @param production_line_n_old Number of production lines before the action
    @return Reward value based on the action and state
    """
    # Goal completed
    if self.state.is_terminal():
        return 10000

    # Penalty if the action fails; it softens as the episode progresses and is capped at -800
    if not could_execute:
        return -1000 + min(20, self.step_count) * 10

    if action_name == "unload_beluga":
        # Reward if beluga is completely unloaded
        if len(self.state.belugas) > 0:
            if len(self.state.belugas[0].current_jigs) == 1:
                return 2000.0
        return 100.0

    if action_name == "load_beluga":
        if len(self.state.belugas) > 0:
            if len(self.state.belugas[0].outgoing) == 1:
                return 2000.0
            return 100.0
        else:
            return 5000.0

    # Actions that stack or unstack racks
    if action_name in ["right_stack_rack", "left_stack_rack", "right_unstack_rack", "left_unstack_rack"]:
        return 10.0

    if action_name == "deliver_to_hangar":
        # Extra reward if the delivery completed a production line
        if production_line_n_old > len(self.state.production_lines):
            return 2000.0
        return 200.0

    if action_name == "get_from_hangar":
        return 50.0

    return 0
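
As a worked example of the failure penalty: a failed action at step 0 yields -1000 + min(20, 0) * 10 = -1000, and from step 20 onward the penalty is capped at -1000 + 20 * 10 = -800.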

◆ reset()

rl.env.environment.Env.reset ( self)

Reset the environment with a new problem instance.

Resets the environment's state from a JSON file in the problems folder, selecting problems in ascending order of jig count (in blocks of 6 problems).

Returns
Initial observation of the new episode
def reset(self):
    """!
    @brief Reset the environment with a new problem instance

    Resets the environment's state from a JSON file in the problems folder,
    selecting problems in ascending order of jig count (in blocks of 6 problems)

    @return Initial observation of the new episode
    """
    number = randint(1, self.block_size)  # Random offset within the current block (randint is inclusive on both ends)

    # Increase base_index only if problems_solved > 0 (to prevent an increase at initialization)
    # and if it is a multiple of block_size * 3 (every 18 solved problems when block_size = 6)
    if self.problems_solved > 0 and (self.problems_solved % (self.block_size * 3)) == 0:
        self.base_index += 1
        self.problems_solved = 0

    # If we have reached the end of the sorted problems, choose a random problem
    if self.base_index + number >= self.problem_count:
        self.problem_name = os.path.join(self.path, self.sorted_problems[randint(0, self.problem_count - 1)][0])
    else:
        self.problem_name = os.path.join(self.path, self.sorted_problems[self.base_index + number][0])

    self.state = load_from_json(self.problem_name)
    self.step_count = 0  # Start the new episode with a fresh step counter
    return self.get_observation_high_level()
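
In effect this implements a simple curriculum: with the default base_index = -1 and block_size = 6, reset() samples from the six smallest instances in sorted_problems; after every block_size * 3 = 18 solved problems the sampling window shifts one instance towards larger jig counts, and once the window runs past the end of the sorted list a problem is drawn at random.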

◆ reset_specific_problem()

rl.env.environment.Env.reset_specific_problem ( self,
problem )

Reset the environment with a specific problem instance.

Parameters
    problem  Path to the specific problem JSON file
Returns
Initial observation of the new episode
def reset_specific_problem(self, problem):
    """!
    @brief Reset the environment with a specific problem instance
    @param problem Path to the specific problem JSON file
    @return Initial observation of the new episode
    """
    self.problem_name = problem
    self.state = load_from_json(self.problem_name)
    self.step_count = 0

    return self.get_observation_high_level()
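
A short sketch of evaluating on one fixed instance; the file name is a hypothetical placeholder:

    problem_path = os.path.join(env.path, "problem_j10_example.json")   # hypothetical instance name
    obs = env.reset_specific_problem(problem_path)
    budget = env.get_max_steps()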

◆ step()

rl.env.environment.Env.step ( self,
str action_name,
params = None )

Execute a single environment step with the given action.

Parameters
    action_name  Name of the action to execute
    params       Parameters for the action (optional)
Returns
Tuple of (observation, reward, done_flag)
def step(self, action_name: str, params=None):
    """!
    @brief Execute a single environment step with the given action
    @param action_name Name of the action to execute
    @param params Parameters for the action (optional)
    @return Tuple of (observation, reward, done_flag)
    """
    n_production_lines = len(self.state.production_lines)
    could_execute = False

    if params == [] and action_name == "unload_beluga":
        could_execute = self.state.apply_action(action_name, {})
    else:
        # params is expected to be a dictionary containing the arguments
        # to be passed to the action (besides the state).
        if params != []:
            could_execute = self.state.apply_action(action_name, params)
        else:
            could_execute = False

    obs = self.get_observation_high_level()  # Observation after the action has been applied
    reward = self.get_reward(could_execute, action_name, n_production_lines)
    self.step_count += 1  # Increment the step count

    if self.state.is_terminal():
        self.problems_solved += 1

    return obs, reward, self.state.is_terminal()
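
A sketch of the two calling conventions implied by the code above; the dictionary keys are illustrative placeholders, since the expected arguments are defined by ProblemState.apply_action rather than by this class:

    # "unload_beluga" needs no parameters: an empty list is translated into an empty dict.
    obs, reward, done = env.step("unload_beluga", [])

    # Every other action expects a non-empty params dict of action arguments;
    # the keys below are hypothetical and must match what apply_action expects.
    obs, reward, done = env.step("left_stack_rack", {"jig": "jig1", "rack": "rack2"})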

Member Data Documentation

◆ base_index

rl.env.environment.Env.base_index = base_index

◆ block_size

int rl.env.environment.Env.block_size = 6

◆ check_action_map

dict rl.env.environment.Env.check_action_map
Initial value:
= {
"load_beluga": check_load_beluga,
"unload_beluga": check_unload_beluga,
"get_from_hangar": check_get_from_hangar,
"deliver_to_hangar": check_deliver_to_hangar,
"left_stack_rack": check_left_stack_rack,
"right_stack_rack": check_right_stack_rack,
"left_unstack_rack": check_left_unstack_rack,
"right_unstack_rack": check_right_unstack_rack
}

◆ path

rl.env.environment.Env.path = path

◆ problem_count

int rl.env.environment.Env.problem_count = 0

◆ problem_name

rl.env.environment.Env.problem_name = None

◆ problems_solved

int rl.env.environment.Env.problems_solved = 0

◆ sorted_problems

list rl.env.environment.Env.sorted_problems = []

◆ state

rl.env.environment.Env.state = None

◆ step_count

rl.env.environment.Env.step_count = 0

The documentation for this class was generated from the following file: