FoPra Beluga Challenge - Reinforcement Learning v1.0
Deep Reinforcement Learning solution for the Beluga Challenge shipping container optimization problem using PPO and MCTS
rl.utils.problem_filter Namespace Reference

Functions

 filter_problem (input_file, output_file, max_jigs, max_belugas, max_prod_lines, max_racks)
 Filter a problem instance to reduce its complexity.
 
 generate_problems (num_problems=20, input_folder="problems", output_folder="problemset1", jig_range=(5, 30), beluga_range=(2, 8), prod_line_range=(2, 6), rack_range=(2, 10))
 

Variables

dict JIG_TYPES
 

Function Documentation

◆ filter_problem()

rl.utils.problem_filter.filter_problem ( input_file,
output_file,
max_jigs,
max_belugas,
max_prod_lines,
max_racks )

Filter a problem instance to reduce its complexity.

This function takes a problem JSON file and creates a simplified version by limiting the number of jigs, belugas, production lines, and racks.

Parameters
input_file: Path to the input problem JSON file
output_file: Path for the output filtered problem JSON file
max_jigs: Maximum number of jigs to keep
max_belugas: Maximum number of belugas to keep
max_prod_lines: Maximum number of production lines to keep
max_racks: Maximum number of racks to keep
def filter_problem(input_file, output_file, max_jigs, max_belugas, max_prod_lines, max_racks):
    """!
    @brief Filter a problem instance to reduce its complexity

    This function takes a problem JSON file and creates a simplified version
    by limiting the number of jigs, belugas, production lines, and racks.
    The contents of the kept racks, production lines and flights are
    discarded and re-generated randomly from the kept jigs.

    All randomness comes from the global ``random`` module. Placement order
    never depends on set iteration, so seeding ``random`` beforehand makes the
    output reproducible.

    @param input_file Path to the input problem JSON file
    @param output_file Path for the output filtered problem JSON file
                       (missing parent directories are created)
    @param max_jigs Maximum number of jigs to keep
    @param max_belugas Maximum number of belugas to keep
    @param max_prod_lines Maximum number of production lines to keep
    @param max_racks Maximum number of racks to keep
    """
    with open(input_file, encoding="utf-8") as f:
        data = json.load(f)

    # Step 1: keep only the first max_jigs jigs, in file order.
    kept_ids = list(data["jigs"])[:max_jigs]
    data["jigs"] = {jig_id: data["jigs"][jig_id] for jig_id in kept_ids}

    # Step 2: trim belugas, racks and production lines, then wipe their
    # contents so that nothing refers to a jig that was just removed.
    flights = data["flights"][:max_belugas]
    racks = data["racks"][:max_racks]
    production_lines = data["production_lines"][:max_prod_lines]
    data["flights"] = flights
    data["racks"] = racks
    data["production_lines"] = production_lines

    for rack in racks:
        rack["jigs"] = []
    for line in production_lines:
        line["schedule"] = []
    for flight in flights:
        flight["incoming"] = []
        flight["outgoing"] = []

    # Step 3: hand a random number of loaded jigs to each production line.
    loaded_ids = [k for k, v in data["jigs"].items() if not v.get("empty", False)]
    random.shuffle(loaded_ids)
    for line in production_lines:
        if not loaded_ids:
            break
        # At most 15 jigs per line, or fewer if not enough are left.
        count = random.randint(1, min(15, len(loaded_ids)))
        line["schedule"] = loaded_ids[:count]
        loaded_ids = loaded_ids[count:]

    # Step 4: every scheduled jig has to be somewhere: a rack or a beluga.
    scheduled = [jig_id for line in production_lines for jig_id in line["schedule"]]
    scheduled_set = set(scheduled)

    # Roughly 10% of the scheduled jigs (at least one) start out in a rack.
    rack_quota = max(1, int(0.1 * len(scheduled)))
    candidates = list(scheduled)
    random.shuffle(candidates)

    # Track the free capacity of every rack while it is being filled.
    slots = [{"rack": rack, "free": rack["size"]} for rack in racks]

    in_racks = set()
    for jig_id in candidates:
        if len(in_racks) >= rack_quota:
            break
        if _place_in_first_fit(slots, jig_id, _jig_size(data, jig_id)):
            in_racks.add(jig_id)

    # The rest arrives by beluga, spread round-robin over the flights.
    # Without any flight there is nowhere to put them; such jigs are removed
    # from the schedules further down instead of crashing here.
    if flights:
        arriving = [jig_id for jig_id in scheduled if jig_id not in in_racks]
        for i, jig_id in enumerate(arriving):
            flights[i % len(flights)]["incoming"].append(jig_id)

    # Step 5: scatter a few jigs that no production line asked for.
    spare_ids = [jig_id for jig_id in data["jigs"] if jig_id not in scheduled_set]
    random.shuffle(spare_ids)
    # (3, 10) suits small problems; larger problems could use more.
    extra_count = min(random.randint(3, 10), len(spare_ids))

    for jig_id in spare_ids[:extra_count]:
        is_empty = data["jigs"][jig_id].get("empty", False)
        target = random.choice(["rack", "beluga"])

        if target == "rack":
            random.shuffle(slots)
            if not _place_in_first_fit(slots, jig_id, _jig_size(data, jig_id)):
                # No rack has room: fall back to a beluga.
                target = "beluga"

        # Empty jigs are never put on an incoming flight; if one cannot be
        # racked it is simply left out of the initial state.
        if target == "beluga" and not is_empty and flights:
            random.choice(flights)["incoming"].append(jig_id)

    # Keep schedules consistent: drop jigs that could not be placed anywhere
    # (only possible when there are no flights).
    rack_contents = [jig_id for rack in racks for jig_id in rack["jigs"]]
    flight_contents = [jig_id for flight in flights for jig_id in flight["incoming"]]
    placed = set(rack_contents) | set(flight_contents)
    for line in production_lines:
        line["schedule"] = [jig_id for jig_id in line["schedule"] if jig_id in placed]

    # Step 6: request a random sample of the present jig types as outgoing
    # cargo on the belugas.
    present_types = [data["jigs"][jig_id]["type"] for jig_id in rack_contents + flight_contents]
    random.shuffle(present_types)
    if flights:
        # Up to 8 types for small problems; harder problems could use more.
        count_to_use = random.randint(0, min(8, len(present_types)))
        for jig_type in present_types[:count_to_use]:
            random.choice(flights)["outgoing"].append(jig_type)

    # Step 7: drop production lines and belugas that ended up empty.
    data["production_lines"] = [line for line in production_lines if line.get("schedule")]
    data["flights"] = [
        flight for flight in flights
        if flight.get("incoming") or flight.get("outgoing")
    ]

    # Save the result.
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)


def _jig_size(data, jig_id):
    """!
    @brief Return the space a jig occupies, using its empty or loaded size
    """
    jig = data["jigs"][jig_id]
    size_key = "size_empty" if jig.get("empty", False) else "size_loaded"
    return data["jig_types"][jig["type"]][size_key]


def _place_in_first_fit(slots, jig_id, size):
    """!
    @brief Put a jig into the first rack slot with enough free capacity

    @return True if the jig was placed, False if no rack had room
    """
    for slot in slots:
        if size <= slot["free"]:
            slot["rack"]["jigs"].append(jig_id)
            slot["free"] -= size
            return True
    return False

◆ generate_problems()

rl.utils.problem_filter.generate_problems ( num_problems = 20,
input_folder = "problems",
output_folder = "problemset1",
jig_range = (5, 30),
beluga_range = (2, 8),
prod_line_range = (2, 6),
rack_range = (2, 10) )
def generate_problems(
    num_problems=20,
    input_folder="problems",
    output_folder="problemset1",
    jig_range=(5, 30),
    beluga_range=(2, 8),
    prod_line_range=(2, 6),
    rack_range=(2, 10),
):
    """!
    @brief Generate a set of randomly reduced problem instances

    For each problem a random ``.json`` file from input_folder is filtered
    with filter_problem() using randomly drawn limits. The result is stored
    in output_folder under a name that encodes the actual number of jigs,
    racks, belugas and production lines it contains
    (``problem<i>_j<jigs>_r<racks>_b<belugas>_pl<lines>.json``).

    @param num_problems Number of problem files to generate
    @param input_folder Folder containing the source problem JSON files
    @param output_folder Folder the generated problems are written to
                         (created if missing)
    @param jig_range Inclusive (min, max) range for the jig limit
    @param beluga_range Inclusive (min, max) range for the beluga limit
    @param prod_line_range Inclusive (min, max) range for the production line limit
    @param rack_range Inclusive (min, max) range for the rack limit
    """
    os.makedirs(output_folder, exist_ok=True)
    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # random choice below reproducible for a given seed.
    problem_files = sorted(f for f in os.listdir(input_folder) if f.endswith(".json"))

    for i in range(1, num_problems + 1):
        random_input_file = random.choice(problem_files)
        input_path = os.path.join(input_folder, random_input_file)

        # Draw the random limits for this problem.
        max_jigs_val = random.randint(*jig_range)
        max_belugas_val = random.randint(*beluga_range)
        max_prod_lines_val = random.randint(*prod_line_range)
        max_racks_val = random.randint(*rack_range)

        # Write to a temporary name first: the final name depends on what
        # the filtered problem actually contains.
        temp_path = os.path.join(output_folder, f"problem{i}_tmp.json")

        filter_problem(
            input_file=input_path,
            output_file=temp_path,
            max_jigs=max_jigs_val,
            max_belugas=max_belugas_val,
            max_prod_lines=max_prod_lines_val,
            max_racks=max_racks_val,
        )

        # Read the result back to count what survived the filtering.
        with open(temp_path, encoding="utf-8") as f:
            data = json.load(f)

        num_jigs = len(data.get("jigs", []))
        num_racks = len(data.get("racks", []))
        num_belugas = len(data.get("flights", []))
        num_prod_lines = len(data.get("production_lines", []))

        new_filename = f"problem{i}_j{num_jigs}_r{num_racks}_b{num_belugas}_pl{num_prod_lines}.json"
        new_path = os.path.join(output_folder, new_filename)

        # os.replace overwrites an existing target on every platform, whereas
        # os.rename fails on Windows if the file is already there.
        os.replace(temp_path, new_path)

Variable Documentation

◆ JIG_TYPES

dict rl.utils.problem_filter.JIG_TYPES
Initial value:
# Catalogue of jig types, keyed by type name. Each entry repeats its name and
# gives two sizes; presumably the space the jig takes up when empty and when
# loaded (TODO confirm against the problem format).
JIG_TYPES = {
    "typeA": {"name": "typeA", "size_empty": 4, "size_loaded": 4},
    "typeB": {"name": "typeB", "size_empty": 8, "size_loaded": 11},
    "typeC": {"name": "typeC", "size_empty": 9, "size_loaded": 18},
    "typeD": {"name": "typeD", "size_empty": 18, "size_loaded": 25},
    "typeE": {"name": "typeE", "size_empty": 32, "size_loaded": 32},
}