FoPra Beluga Challenge - Reinforcement Learning v1.0
Deep Reinforcement Learning solution for the Beluga Challenge shipping container optimization problem using PPO and MCTS
rl.env.state.ProblemState Class Reference

Complete state representation for the Beluga Challenge.

Public Member Functions

 __init__ (self, list[Jig] jigs, list[Beluga] belugas, list[int|None] trailers_beluga, list[int|None] trailers_factory, list[Rack] racks, list[ProductionLine] production_lines, list[int|None] hangars)
 Initialize the complete problem state.
 
 copy (self)
 Create a deep copy of the entire problem state.
 
 clone (self)
 Create a clone of the current state (alias for copy)
 
 is_terminal (self)
 Check if this state represents a terminal (goal) state.
 
float evaluate (self, int depth, mu=0.05)
 Evaluate the current state for MCTS scoring.
 
dict[str, float] get_subgoals (self)
 Calculate subgoal achievements for evaluation.
 
 apply_action (self, action_name, params)
 Apply an action to this state.
 
bool check_action_valid (self, str action_name, params=None)
 Check if an action with given parameters is valid.
 
 enumerate_valid_params (self, action)
 Enumerate all valid parameter combinations for a given action.
 
 get_possible_actions (self)
 Get list of all possible actions in the current state.
 
bool beluga_complete (self)
 Mark current beluga as complete and remove it.
 
 get_observation_high_level (self)
 Get high-level observation array for RL agents.
 
 __str__ (self)
 
 __repr__ (self)
 
 __hash__ (self)
 
 __eq__ (self, other)
 

Public Attributes

 jigs = jigs
 
 belugas = belugas
 
 trailers_beluga = trailers_beluga
 
 trailers_factory = trailers_factory
 
 racks = racks
 
 production_lines = production_lines
 
 hangars = hangars
 
int belugas_unloaded = 0
 
int belugas_finished = 0
 
int production_lines_finished = 0
 
 total_lines = len(self.production_lines)
 
 total_belugas = len(self.belugas)
 
bool problem_solved = False
 

Detailed Description

Complete state representation for the Beluga Challenge.

Contains all components of the problem: jigs, Belugas, Beluga-side and factory-side trailers, racks, production lines, and hangars. Provides the main API for the MCTS and RL algorithms, including state transitions, action validation, and evaluation functions.

Constructor & Destructor Documentation

◆ __init__()

rl.env.state.ProblemState.__init__ ( self,
list[Jig] jigs,
list[Beluga] belugas,
list[int | None] trailers_beluga,
list[int | None] trailers_factory,
list[Rack] racks,
list[ProductionLine] production_lines,
list[int | None] hangars )

Initialize the complete problem state.

Parameters
jigs                List of all jigs in the problem
belugas             List of Beluga ships
trailers_beluga     List of Beluga trailer slots (jig IDs or None)
trailers_factory    List of factory trailer slots (jig IDs or None)
racks               List of storage racks
production_lines    List of production lines
hangars             List of hangar slots (jig IDs or None)

def __init__(self, jigs: list[Jig], belugas: list[Beluga], trailers_beluga: list[int | None], trailers_factory: list[int | None], racks: list[Rack], production_lines: list[ProductionLine], hangars: list[int | None]):
    """!
    @brief Initialize the complete problem state
    @param jigs List of all jigs in the problem
    @param belugas List of Beluga ships
    @param trailers_beluga List of Beluga trailer slots (jig IDs or None)
    @param trailers_factory List of factory trailer slots (jig IDs or None)
    @param racks List of storage racks
    @param production_lines List of production lines
    @param hangars List of hangar slots (jig IDs or None)
    """
    self.jigs = jigs
    self.belugas = belugas
    self.trailers_beluga = trailers_beluga
    self.trailers_factory = trailers_factory
    self.racks = racks
    self.production_lines = production_lines
    self.hangars = hangars

    # Subgoals
    # for reward (High-Level) and evaluation (Low-Level-MCTS)
    self.belugas_unloaded = 0  # counter
    self.belugas_finished = 0  # counter
    self.production_lines_finished = 0  # counter
    self.total_lines = len(self.production_lines)  # total production lines, for evaluation
    self.total_belugas = len(self.belugas)  # total belugas, for evaluation
    self.problem_solved = False

Member Function Documentation

◆ __eq__()

rl.env.state.ProblemState.__eq__ ( self,
other )

def __eq__(self, other):
    return str(self) == str(other)

◆ __hash__()

rl.env.state.ProblemState.__hash__ ( self)

def __hash__(self):
    return hash(str(self))
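
Since both __eq__ and __hash__ are derived from str(self), two distinct objects with the same textual dump compare equal and land in the same bucket of a set or dict, for example a visited set during search. The following is a minimal sketch of that behaviour; TinyState is a hypothetical stand-in, not a class from the project.

```python
class TinyState:
    """Hypothetical stand-in that mimics ProblemState's string-based identity."""

    def __init__(self, hangars):
        self.hangars = hangars

    def __str__(self):
        return "hangars: " + str(self.hangars)

    def __hash__(self):
        # Same approach as ProblemState.__hash__: hash the string dump.
        return hash(str(self))

    def __eq__(self, other):
        # Same approach as ProblemState.__eq__: compare the string dumps.
        return str(self) == str(other)


a = TinyState([None, 3])
b = TinyState([None, 3])  # distinct object, same content
c = TinyState([3, None])  # different content

visited = {a}
print(b in visited)  # True
print(c in visited)  # False
```

Note that anything not printed by __str__ does not take part in the comparison. In ProblemState this applies to the subgoal counters such as belugas_unloaded, so two states that differ only in those counters are treated as equal.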

◆ __repr__()

rl.env.state.ProblemState.__repr__ ( self)

def __repr__(self):
    return self.__str__()

◆ __str__()

rl.env.state.ProblemState.__str__ ( self)

def __str__(self):
    count = 0
    out = "jigs:\n"
    for jig in self.jigs:
        out += "\t" + str(count) + ": " + str(jig) + "\n"
        count += 1
    out += "belugas:\n"
    count = 0
    for beluga in self.belugas:
        out += "\t" + str(count) + ": " + str(beluga) + "\n"
        count += 1
    out += "trailers_beluga: " + str(self.trailers_beluga) + "\n"
    out += "trailers_factory: " + str(self.trailers_factory) + "\n"
    out += "racks:\n"
    count = 0
    for rack in self.racks:
        out += "\t" + str(count) + ": " + str(rack) + "\n"
        count += 1
    out += "production_lines:\n"
    count = 0
    for production_line in self.production_lines:
        out += "\t" + str(count) + ": " + str(production_line) + "\n"
        count += 1
    out += "hangars: " + str(self.hangars)
    return out

◆ apply_action()

rl.env.state.ProblemState.apply_action ( self,
action_name,
params )

Apply an action to this state.

Parameters
action_name    Name of the action to execute
params         Parameters for the action (dict or list)
Returns
True if action was successfully applied, False otherwise

def apply_action(self, action_name, params):
    """!
    @brief Apply an action to this state
    @param action_name Name of the action to execute
    @param params Parameters for the action (dict or list)
    @return True if action was successfully applied, False otherwise
    """
    params = list(params.values()) if isinstance(params, dict) else list(params)  # ensure params is a list
    # action_name, params = candidate
    if action_name == "left_stack_rack":
        return left_stack_rack(self, *params)
    elif action_name == "right_stack_rack":
        return right_stack_rack(self, *params)
    elif action_name == "left_unstack_rack":
        return left_unstack_rack(self, *params)
    elif action_name == "right_unstack_rack":
        return right_unstack_rack(self, *params)
    elif action_name == "load_beluga":
        return load_beluga(self, *params)
    elif action_name == "unload_beluga":
        return unload_beluga(self)
    elif action_name == "get_from_hangar":
        return get_from_hangar(self, *params)
    elif action_name == "deliver_to_hangar":
        return deliver_to_hangar(self, *params)
    else:
        raise NotImplementedError(f"Action name not known: {action_name}")
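
The first statement of apply_action accepts parameters either as a dict or as a list/tuple and turns both into a positional list. A small sketch of just that normalization step, pulled out into a hypothetical helper named normalize_params (the dict keys below are made up for illustration):

```python
def normalize_params(params):
    """Return params as a positional list, mirroring the first line of apply_action."""
    return list(params.values()) if isinstance(params, dict) else list(params)


# Hypothetical key names; only the insertion order of the values matters.
print(normalize_params({"rack": 2, "trailer_id": 0}))  # [2, 0]
print(normalize_params((2, 0)))                        # [2, 0]
print(normalize_params({}))                            # []
```

Because only the dict values are used, the key names are ignored and the values must be inserted in the positional order the action function expects.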

◆ beluga_complete()

bool rl.env.state.ProblemState.beluga_complete ( self)

Mark current beluga as complete and remove it.

Returns
True if the first Beluga was removed; False if no Beluga is left or the first Beluga still has outgoing or current jigs

def beluga_complete(self) -> bool:
    """!
    @brief Mark current beluga as complete and remove it
    @return True if beluga was successfully marked complete, False otherwise
    """
    if not self.belugas:
        return False

    beluga = self.belugas[0]
    if beluga.outgoing or beluga.current_jigs:
        return False

    # Effects
    self.belugas.pop(0)
    return True

◆ check_action_valid()

bool rl.env.state.ProblemState.check_action_valid ( self,
str action_name,
params = None )

Check if an action with given parameters is valid.

Parameters
action_name    Name of the action to validate
params         Parameters for the action (optional)
Returns
True if action is valid, False otherwise

This function validates an action without modifying the current state.


def check_action_valid(self, action_name: str, params=None) -> bool:
    """!
    @brief Check if an action with given parameters is valid
    @param action_name Name of the action to validate
    @param params Parameters for the action (optional)
    @return True if action is valid, False otherwise

    This function validates an action without modifying the current state.
    """
    # The action is tried on a copy, so self is never modified.
    state_copy = self.copy()

    # Note: unlike apply_action, params is unpacked as-is. A dict would unpack
    # its keys rather than its values, and params=None for an action that takes
    # parameters raises a TypeError, which is caught below and reported as invalid.
    try:
        if action_name == "left_stack_rack":
            return left_stack_rack(state_copy, *params)
        elif action_name == "right_stack_rack":
            return right_stack_rack(state_copy, *params)
        elif action_name == "left_unstack_rack":
            return left_unstack_rack(state_copy, *params)
        elif action_name == "right_unstack_rack":
            return right_unstack_rack(state_copy, *params)
        elif action_name == "load_beluga":
            return load_beluga(state_copy, *params)
        elif action_name == "unload_beluga":
            return unload_beluga(state_copy)
        elif action_name == "get_from_hangar":
            return get_from_hangar(state_copy, *params)
        elif action_name == "deliver_to_hangar":
            return deliver_to_hangar(state_copy, *params)
        else:
            return False
    except Exception as e:
        print(f"Error in action {action_name} with params {params}: {e}")
        return False
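
The validation strategy above is "try the action on a copy and report whether it succeeded". A self-contained sketch of that pattern follows; ToyState, toy_unload and is_valid are hypothetical names used only for illustration and are not part of the project.

```python
class ToyState:
    """Hypothetical state holding a list of trailer slots (jig ID or None)."""

    def __init__(self, trailers):
        self.trailers = trailers

    def copy(self):
        return ToyState(self.trailers[:])


def toy_unload(state, trailer_id):
    """Hypothetical action: empty a trailer slot; fails if the slot is already empty."""
    if state.trailers[trailer_id] is None:
        return False
    state.trailers[trailer_id] = None
    return True


def is_valid(state, action, params=()):
    """Run the action on a copy so the original state stays untouched."""
    trial = state.copy()
    try:
        return action(trial, *params)
    except Exception:
        # Any error (e.g. an out-of-range index) counts as "not valid".
        return False


s = ToyState([7, None])
print(is_valid(s, toy_unload, (0,)))  # True
print(s.trailers)                     # [7, None] (original unchanged)
print(is_valid(s, toy_unload, (1,)))  # False
print(is_valid(s, toy_unload, (5,)))  # False (IndexError is caught)
```

The price of this approach is one full state copy per check, which adds up when many parameter combinations are tested.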

◆ clone()

rl.env.state.ProblemState.clone ( self)

Create a clone of the current state (alias for copy)

Returns
Deep copy of this ProblemState

def clone(self):
    """!
    @brief Create a clone of the current state (alias for copy)
    @return Deep copy of this ProblemState
    """
    return self.copy()

◆ copy()

rl.env.state.ProblemState.copy ( self)

Create a deep copy of the entire problem state.

Returns
New ProblemState instance with all components copied

def copy(self):
    """!
    @brief Create a deep copy of the entire problem state
    @return New ProblemState instance with all components copied
    """
    new_state = ProblemState(
        jigs=[jig.copy() for jig in self.jigs],
        belugas=[beluga.copy() for beluga in self.belugas],
        trailers_beluga=self.trailers_beluga[:],
        trailers_factory=self.trailers_factory[:],
        racks=[rack.copy() for rack in self.racks],
        production_lines=[pl.copy() for pl in self.production_lines],
        hangars=self.hangars[:]  # List of ints or None
    )
    new_state.belugas_unloaded = self.belugas_unloaded
    new_state.belugas_finished = self.belugas_finished
    new_state.production_lines_finished = self.production_lines_finished
    new_state.total_lines = self.total_lines
    new_state.total_belugas = self.total_belugas
    new_state.problem_solved = self.problem_solved
    return new_state
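
copy() uses two techniques: a slice for the lists of plain jig IDs (ints or None) and an element-wise .copy() for the lists of objects. The sketch below illustrates why a slice alone would not be enough for the object lists; ToyRack is a hypothetical stand-in for the project's Rack class.

```python
class ToyRack:
    """Hypothetical stand-in for Rack with its own copy() method."""

    def __init__(self, current_jigs):
        self.current_jigs = current_jigs

    def copy(self):
        return ToyRack(self.current_jigs[:])


hangars = [None, 4]
racks = [ToyRack([1, 2])]

hangars_copy = hangars[:]               # enough for ints / None
racks_shallow = racks[:]                # new list, but the same ToyRack objects
racks_deep = [r.copy() for r in racks]  # new list with new ToyRack objects

hangars_copy[0] = 9
racks[0].current_jigs.append(3)

print(hangars)                        # [None, 4]
print(racks_shallow[0].current_jigs)  # [1, 2, 3] (shared with the original)
print(racks_deep[0].current_jigs)     # [1, 2]    (independent)
```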

◆ enumerate_valid_params()

rl.env.state.ProblemState.enumerate_valid_params ( self,
action )

Enumerate all valid parameter combinations for a given action.

Parameters
action    Name of the action to enumerate parameters for
Returns
List of valid parameter tuples for the action

def enumerate_valid_params(self, action):
    """!
    @brief Enumerate all valid parameter combinations for a given action
    @param action Name of the action to enumerate parameters for
    @return List of valid parameter tuples for the action
    """
    action_name = action
    params = []

    if action_name == "left_stack_rack":
        all_param = [(rack_id, trailer_id)
                     for rack_id in range(len(self.racks))
                     for trailer_id in range(len(self.trailers_beluga))]

        for t in all_param:
            if self.check_action_valid(action_name, t):
                params.append(t)

    elif action_name == "right_stack_rack":
        all_param = [(rack_id, trailer_id)
                     for rack_id in range(len(self.racks))
                     for trailer_id in range(len(self.trailers_factory))]

        for t in all_param:
            if self.check_action_valid(action_name, t):
                params.append(t)

    elif action_name == "left_unstack_rack":
        all_param = [(rack_id, trailer_id)
                     for rack_id in range(len(self.racks))
                     for trailer_id in range(len(self.trailers_beluga))]

        for t in all_param:
            if self.check_action_valid(action_name, t):
                params.append(t)

    elif action_name == "right_unstack_rack":
        all_param = [(rack_id, trailer_id)
                     for rack_id in range(len(self.racks))
                     for trailer_id in range(len(self.trailers_factory))]

        for t in all_param:
            if self.check_action_valid(action_name, t):
                params.append(t)

    elif action_name == "load_beluga":
        all_param = [trailer_id for trailer_id in range(len(self.trailers_beluga))]
        for t in all_param:
            if self.check_action_valid(action_name, (t, None)):
                params.append((t, None))

    elif action_name == "deliver_to_hangar":
        all_param = [(hangar_id, trailer_id)
                     for hangar_id in range(len(self.hangars))
                     for trailer_id in range(len(self.trailers_factory))]

        for t in all_param:
            if self.check_action_valid(action_name, t):
                params.append(t)

    elif action_name == "get_from_hangar":
        all_param = [(hangar_id, trailer_id)
                     for hangar_id in range(len(self.hangars))
                     for trailer_id in range(len(self.trailers_factory))]

        for t in all_param:
            if self.check_action_valid(action_name, t):
                params.append(t)

    return params
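
Every branch above follows the same scheme: build the Cartesian product of two index ranges, then keep the pairs that pass the validity check. A compact sketch of that scheme, using itertools.product and a hypothetical helper enumerate_pairs with a made-up predicate (neither is part of the project):

```python
from itertools import product


def enumerate_pairs(n_first, n_second, is_valid):
    """Return all (i, j) index pairs that satisfy the is_valid predicate.

    product() iterates in the same order as the nested comprehensions above:
    the first index is the outer loop, the second the inner loop.
    """
    return [pair for pair in product(range(n_first), range(n_second)) if is_valid(pair)]


# Made-up predicate: only trailer slot 1 currently holds a jig.
occupied_trailers = {1}
valid = enumerate_pairs(2, 3, lambda pair: pair[1] in occupied_trailers)
print(valid)  # [(0, 1), (1, 1)]
```

With such a helper, each branch would reduce to choosing the two ranges; whether that refactoring is worthwhile depends on how often the action set changes.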

◆ evaluate()

float rl.env.state.ProblemState.evaluate ( self,
int depth,
mu = 0.05 )

Evaluate the current state for MCTS scoring.

Parameters
depth    Current depth in the search tree
mu       Penalty factor for depth (default 0.05)
Returns
Floating point score for this state

def evaluate(self, depth: int, mu=0.05) -> float:
    """!
    @brief Evaluate the current state for MCTS scoring
    @param depth Current depth in the search tree
    @param mu Penalty factor for depth (default 0.05)
    @return Floating point score for this state
    """
    score = 0.0
    subgoals = self.get_subgoals()
    score += sum(subgoals.values())
    # Penalty based on path depth
    score -= mu * depth
    return score
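
The score is the sum of the weighted subgoals returned by get_subgoals() minus a depth penalty of mu * depth. As a worked example, the sketch below recomputes that value from plain numbers. The weights 15, 60, 100 and 1000 are taken from the get_subgoals() listing; the function name evaluate_score and its flat argument list are hypothetical.

```python
def evaluate_score(belugas_unloaded, belugas_finished, lines_finished,
                   solved, depth, mu=0.05):
    """Recompute the evaluate() score from raw counters (illustrative helper)."""
    subgoals = {
        "subgoal_1": belugas_unloaded * 15,
        "subgoal_2": belugas_finished * 60,
        "subgoal_3": lines_finished * 100,
        "goal": solved * 1000,  # bool counts as 0 or 1
    }
    return sum(subgoals.values()) - mu * depth


# Two Belugas unloaded, one finished, no production line done, depth 10:
# 2*15 + 1*60 + 0 + 0 - 0.05*10 = 89.5
print(evaluate_score(2, 1, 0, False, depth=10))  # 89.5
```

With the default mu, one finished production line (100) outweighs 2000 levels of depth, so the penalty mainly breaks ties between paths that reach the same subgoals.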

◆ get_observation_high_level()

rl.env.state.ProblemState.get_observation_high_level ( self)

Get high-level observation array for RL agents.

Returns
NumPy array representing the current state for high-level agents

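The observation is an array of 40 values: index 0 describes the first Beluga, indices 1-3 the Beluga trailers, 4-6 the factory trailers, 7-9 the hangars, and 10-39 hold three values for each of up to 10 racks. Entries for slots that do not exist in the current problem are set to -1. High-level agents convert this array into tensors for neural network processing.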


def get_observation_high_level(self):
    """!
    @brief Get high-level observation array for RL agents
    @return NumPy array representing the current state for high-level agents

    The observation includes information about belugas, trailers, hangars, and racks.
    High-level agents convert this array into tensors for neural network processing.
    """
    # Return the current state of the environment for a high-level agent as array
    # High-Level-Agents converts array into tensor

    n_racks = 10

    out = np.zeros(10 + 3*n_racks)

    needed_outgoing_types = []
    needed_in_production_lines = []

    for pl in self.production_lines:
        if len(pl.scheduled_jigs) > 0:
            needed_in_production_lines.append(pl.scheduled_jigs[0])

    # First slot 0 beluga
    if len(self.belugas) > 0:
        out[0] = max(0, min(len(self.belugas[0].current_jigs), 1))
        if out[0] == 0:
            needed_outgoing_types = self.belugas[0].outgoing
    else:
        out[0] = -1

    # Slot 1-3 Beluga Trailer
    slot = 1
    for i in range(3):
        if i < len(self.trailers_beluga):
            if self.trailers_beluga[i] is None:
                out[slot + i] = 0.5
            else:
                if self.jigs[self.trailers_beluga[i]].empty and out[0] == 0:
                    if self.jigs[self.trailers_beluga[i]].jig_type in needed_outgoing_types:
                        out[slot + i] = 0
                    else:
                        out[slot + i] = 0.25
                else:
                    out[slot + i] = 1
        else:
            out[slot + i] = -1

    # Slot 4-6 Factory Trailer
    slot = 4
    for i in range(3):
        if i < len(self.trailers_factory):
            if self.trailers_factory[i] is None:
                out[slot + i] = 0.5
            else:
                if not self.jigs[self.trailers_factory[i]].empty:
                    if self.trailers_factory[i] in needed_in_production_lines:
                        out[slot + i] = 1
                    else:
                        out[slot + i] = 0.75
                else:
                    out[slot + i] = 0
        else:
            out[slot + i] = -1

    # Slot 7-9 Hangars
    slot = 7
    for i in range(3):
        if i < len(self.hangars):
            if self.hangars[i] is None:
                out[slot + i] = 0
            else:
                out[slot + i] = 1
        else:
            out[slot + i] = -1

    # Slot 10-39 Racks
    slot = 10
    for i in range(n_racks):
        if i < len(self.racks):
            rack = self.racks[i]
            items = len(rack.current_jigs)
            if items == 0:
                out[slot + i * 3] = 0
                out[slot + i * 3 + 1] = 0
                out[slot + i * 3 + 2] = 0

            else:
                out[slot + i * 3] = 0
                out[slot + i * 3 + 1] = 0
                out[slot + i * 3 + 2] = rack.get_free_space(self.jigs)/rack.size
                for k in range(items):
                    jig = self.jigs[rack.current_jigs[k]]
                    if jig.empty and jig.jig_type in needed_outgoing_types:
                        out[slot + i * 3] = (items - k) / items
                        continue  # no-op: the loop goes on, so the last match wins
                for k in range(items):
                    if rack.current_jigs[k] in needed_in_production_lines:
                        out[slot + i * 3 + 1] = (k + 1) / items
                        continue  # no-op: the loop goes on, so the last match wins
        else:
            out[slot + i * 3] = -1
            out[slot + i * 3 + 1] = -1
            out[slot + i * 3 + 2] = -1

    return out
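
The slot comments in the listing imply a fixed layout of 10 + 3 * 10 = 40 entries. To make that layout easier to inspect, the following sketch splits an observation into named parts. The helper split_observation and the dict keys are hypothetical; it works on any sequence of length 40 that supports slicing, including the NumPy array returned above.

```python
N_RACKS = 10  # same constant as n_racks in the listing


def split_observation(obs):
    """Split the flat 40-entry observation into named groups (illustrative helper)."""
    if len(obs) != 10 + 3 * N_RACKS:
        raise ValueError("expected an observation with 40 entries")
    return {
        "beluga": obs[0],               # slot 0
        "trailers_beluga": obs[1:4],    # slots 1-3
        "trailers_factory": obs[4:7],   # slots 4-6
        "hangars": obs[7:10],           # slots 7-9
        # slots 10-39: three consecutive values per rack
        "racks": [obs[10 + 3 * i: 13 + 3 * i] for i in range(N_RACKS)],
    }


obs = [0.0] * 40
obs[13:16] = [0.5, 0.0, 0.25]  # the three values of rack 1
parts = split_observation(obs)
print(parts["racks"][1])  # [0.5, 0.0, 0.25]
```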

◆ get_possible_actions()

rl.env.state.ProblemState.get_possible_actions ( self)

Get list of all possible actions in the current state.

Returns
List of (action_name, parameters) tuples for all valid actions

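Each valid parameter combination yields its own (action_name, parameters) entry, so an action without any valid combination does not appear in the list. unload_beluga takes no parameters and is listed with an empty dict when it is valid.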


def get_possible_actions(self):
    """!
    @brief Get list of all possible actions in the current state
    @return List of (action_name, parameters) tuples for all valid actions

    An action is considered possible if at least one valid parameter combination exists.
    """
    # action = ("action_name", "params")
    possible_actions = []

    # Check unload_beluga (no parameters)
    if self.check_action_valid("unload_beluga"):
        possible_actions.append(("unload_beluga", {}))

    # Check actions with parameters
    param_actions = [
        "left_stack_rack",
        "right_stack_rack",
        "left_unstack_rack",
        "right_unstack_rack",
        "load_beluga",
        "get_from_hangar",
        "deliver_to_hangar"
    ]
    for action in param_actions:
        # all actions with parameters, if there are no params, no legal actions
        params = self.enumerate_valid_params(action)
        possible_actions.extend([(action, param) for param in params])

    return possible_actions

◆ get_subgoals()

dict[str, float] rl.env.state.ProblemState.get_subgoals ( self)

Calculate subgoal achievements for evaluation.

Returns
Dictionary mapping subgoal names to their scores

def get_subgoals(self) -> dict[str, float]:
    """!
    @brief Calculate subgoal achievements for evaluation
    @return Dictionary mapping subgoal names to their scores
    """
    self.belugas_finished = self.total_belugas - len(self.belugas)
    self.production_lines_finished = self.total_lines - len(self.production_lines)

    if len(self.belugas) == 0 and len(self.production_lines) == 0:
        self.problem_solved = True
    return {
        "subgoal_1": self.belugas_unloaded * 15,
        "subgoal_2": self.belugas_finished * 60,
        "subgoal_3": self.production_lines_finished * 100,
        "goal": self.problem_solved * 1000
    }

◆ is_terminal()

rl.env.state.ProblemState.is_terminal ( self)

Check if this state represents a terminal (goal) state.

Returns
True if all belugas and production lines are finished

def is_terminal(self):
    """!
    @brief Check if this state represents a terminal (goal) state
    @return True if all belugas and production lines are finished
    """
    return len(self.belugas) == 0 and len(self.production_lines) == 0

Member Data Documentation

◆ belugas

rl.env.state.ProblemState.belugas = belugas

◆ belugas_finished

int rl.env.state.ProblemState.belugas_finished = 0

◆ belugas_unloaded

int rl.env.state.ProblemState.belugas_unloaded = 0

◆ hangars

rl.env.state.ProblemState.hangars = hangars

◆ jigs

rl.env.state.ProblemState.jigs = jigs

◆ problem_solved

bool rl.env.state.ProblemState.problem_solved = False

◆ production_lines

rl.env.state.ProblemState.production_lines = production_lines

◆ production_lines_finished

int rl.env.state.ProblemState.production_lines_finished = 0

◆ racks

rl.env.state.ProblemState.racks = racks

◆ total_belugas

rl.env.state.ProblemState.total_belugas = len(self.belugas)

◆ total_lines

rl.env.state.ProblemState.total_lines = len(self.production_lines)

◆ trailers_beluga

rl.env.state.ProblemState.trailers_beluga = trailers_beluga

◆ trailers_factory

rl.env.state.ProblemState.trailers_factory = trailers_factory

The documentation for this class was generated from the following file: