Skip to content

Instantly share code, notes, and snippets.

@btmxh
Created August 15, 2024 06:18
Show Gist options
  • Save btmxh/903d12e07a904fdbe0dac3843b9c9347 to your computer and use it in GitHub Desktop.
Save btmxh/903d12e07a904fdbe0dac3843b9c9347 to your computer and use it in GitHub Desktop.
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable
from typing import cast, override, Protocol, Self
from random import choice, choices, random
from heapq import nsmallest, heapify, heappush, heappop
from math import sqrt
import csv
class Comparable(Protocol):
def __lt__(self, value: Self, /) -> bool: ...
class PriorityQueue[T, K: Comparable]:
heap: list[tuple[K, int, T]]
counter: int
key_fn: Callable[[T], K]
def __init__(
self,
lst: Iterable[T],
key_fn: Callable[[T], K] = lambda value: cast(K, value),
) -> None:
super().__init__()
self.counter = 0
self.key_fn = key_fn
self.heap = [self.make_element(value) for value in lst]
heapify(self.heap)
def make_element(self, value: T) -> tuple[K, int, T]:
self.counter += 1
return (self.key_fn(value), self.counter, value)
def push(self, value: T):
heappush(self.heap, self.make_element(value))
def pop(self) -> T | None:
try:
return heappop(self.heap)[2]
except IndexError:
return None
def __len__(self) -> int:
return len(self.heap)
class Request:
x: float
y: float
demand: float
open: float
close: float
service_time: float
time: float
def __init__(
self,
x: float,
y: float,
demand: float,
open: float,
close: float,
service_time: float,
time: float,
):
self.x = x
self.y = y
self.demand = demand
self.open = open
self.close = close
self.service_time = service_time
self.time = time
@staticmethod
def load_csv(path: str):
reqs: list[Request] = []
with open(path) as f:
r = csv.reader(f)
_headers = next(r)
for row in r:
x, y, demand, time_open, close, service_time, _drone_serve, time = [
float(t) for t in row
]
reqs.append(Request(x, y, demand, time_open, close, service_time, time))
return reqs
class Problem:
requests: list[Request]
truck_speed: float
truck_capacity: float
num_trucks: int
def start_x(self) -> float:
assert self.requests[0].demand == 0
return self.requests[0].x
def start_y(self) -> float:
assert self.requests[0].demand == 0
return self.requests[0].y
def __init__(
self, csv: str, truck_speed: float, truck_capacity: float, num_trucks: int
):
self.truck_speed = truck_speed
self.truck_capacity = truck_capacity
self.num_trucks = num_trucks
self.requests = Request.load_csv(csv)
RoutingRule = Callable[["Simulator", Request], int]
SequencingRule = Callable[["Simulator", int, list["QueueElem"]], int]
class Event(ABC):
@abstractmethod
def time(self) -> float: ...
class RequestEvent(Event):
request: Request
def __init__(self, request: Request) -> None:
super().__init__()
self.request = request
@override
def time(self) -> float:
return self.request.time
class MachineFinishEvent(Event):
machine: int
finish_time: float
def __init__(self, machine: int, finish_time: float):
self.machine = machine
self.finish_time = finish_time
@override
def time(self):
return self.finish_time
class QueueElem:
request: Request
ready_time: float
def __init__(self, request: Request, ready_time: float):
self.request = request
self.ready_time = ready_time
class MachineState:
queue: list[QueueElem]
machine: int
busy_until: float
x: float
y: float
def __init__(self, machine: int, x: float, y: float) -> None:
self.queue = []
self.busy_until = 0.0
self.machine = machine
self.x = x
self.y = y
def enqueue(self, s: "Simulator", r: Request) -> bool:
total_demand = sum(e.request.demand for e in self.queue)
if total_demand + r.demand > s.problem.truck_capacity:
return False
self.queue.append(QueueElem(r, s.time))
s.update_queue(self.machine)
return True
def dequeue(self, s: "Simulator") -> int:
if s.time < self.busy_until:
return 0
num_failed = 0
while len(self.queue) > 0:
index = s.sequencing_rule(s, self.machine, self.queue)
value = self.queue.pop(index)
busy_until = s.time + self.cost_of(value.request, s)
if busy_until > value.request.close:
num_failed += 1
continue
self.x = value.request.x
self.y = value.request.y
self.busy_until = busy_until
s.events.push(MachineFinishEvent(self.machine, self.busy_until))
break
return num_failed
def cost_of(self, r: Request, s: "Simulator"):
dist = sqrt((self.x - r.x) ** 2 + (self.y - r.y) ** 2)
return max(r.open - s.time, 0) + r.service_time + dist / s.problem.truck_speed
class Simulator:
problem: Problem
routing_rule: RoutingRule
sequencing_rule: SequencingRule
time: float
states: list[MachineState]
events: PriorityQueue[Event, float]
num_failed_requests: int
def __init__(
self,
problem: Problem,
routing_rule: RoutingRule,
sequencing_rule: SequencingRule,
):
self.problem = problem
self.routing_rule = routing_rule
self.sequencing_rule = sequencing_rule
self.time = 0.0
self.states = [
MachineState(i, problem.start_x(), problem.start_y())
for i in range(problem.num_trucks)
]
self.events = PriorityQueue([], key_fn=lambda e: e.time())
self.num_failed_requests = 0
def simulate(self) -> tuple[float, int]:
for request in self.problem.requests[1:]:
self.events.push(RequestEvent(request))
self.time = 0.0
while True:
event = self.events.pop()
if event is None:
break
assert self.time <= event.time()
self.time = event.time()
if isinstance(event, RequestEvent):
self.handle_request(event)
elif isinstance(event, MachineFinishEvent):
self.handle_machine_finish(event)
return self.time, self.num_failed_requests
def handle_request(self, e: RequestEvent):
machine = self.routing_rule(self, e.request)
if not self.states[machine].enqueue(self, e.request):
self.num_failed_requests += 1
def handle_machine_finish(self, e: MachineFinishEvent):
self.update_queue(e.machine)
def update_queue(self, machine: int):
num_failed = self.states[machine].dequeue(self)
self.num_failed_requests += num_failed
for i in range(1, 4):
def CR(s: Simulator, r: Request) -> int:
return min(
range(s.problem.num_trucks),
key=lambda machine: s.states[machine].cost_of(r, s),
)
def CS(s: Simulator, machine: int, r: list[QueueElem]) -> int:
return min(
range(len(r)), key=lambda i: s.states[machine].cost_of(r[i].request, s)
)
def W(s: Simulator, machine: int, r: list[QueueElem]) -> int:
return min(range(len(r)), key=lambda i: r[i].request.demand)
def WIQ(s: Simulator, _: Request) -> int:
return min(
range(s.problem.num_trucks),
key=lambda machine: sum(
s.states[machine].cost_of(r.request, s) for r in s.states[machine].queue
),
)
problem = Problem("/home/torani/100/h100c103.csv", 50 / 60, 1300, i)
solver = Simulator(problem, CR, CS)
time, num_fail = solver.simulate()
print("C+C", i, time, num_fail, (1 - num_fail / (len(problem.requests) - 1)))
solver = Simulator(problem, CR, W)
time, num_fail = solver.simulate()
print("C+W", i, time, num_fail, (1 - num_fail / (len(problem.requests) - 1)))
solver = Simulator(problem, WIQ, CS)
time, num_fail = solver.simulate()
print("WIQ+C", i, time, num_fail, (1 - num_fail / (len(problem.requests) - 1)))
class NodeType[C](ABC):
name: str
def __init__(self, name: str):
self.name = name
@abstractmethod
def num_children(self) -> int: ...
@abstractmethod
def calc(self, ctx: C, children: list[float]) -> float: ...
class Node[C]:
typ: NodeType[C]
children: list["Node[C]"]
def __init__(self, typ: NodeType[C], children: list["Node[C]"]) -> None:
self.typ = typ
self.children = children
def eval(self, c: C) -> float:
return self.typ.calc(c, [child.eval(c) for child in self.children])
def all_nodes(self) -> list["Node[C]"]:
return [self] + [c for child in self.children for c in child.all_nodes()]
def depth(self) -> int:
return max((child.depth() + 1 for child in self.children), default=0)
def assign(self, other: "Node[C]"):
self.typ = other.typ
self.children = other.children
def copy(self) -> "Node[C]":
return Node(self.typ, [c.copy() for c in self.children])
@override
def __str__(self) -> str:
if len(self.children) == 0:
return self.typ.name
else:
return (
self.typ.name
+ "("
+ ", ".join(str(child) for child in self.children)
+ ")"
)
class RoutingContext:
s: Simulator
r: Request
machine: int
def __init__(self, s: Simulator, r: Request, machine: int) -> None:
self.s = s
self.r = r
self.machine = machine
class SequencingContext:
s: Simulator
machine: int
elem: QueueElem
def __init__(self, s: Simulator, machine: int, elem: QueueElem) -> None:
self.s = s
self.machine = machine
self.elem = elem
class Individual:
routing: Node[RoutingContext]
sequencing: Node[SequencingContext]
simulate_result: tuple[float, int] | None
fitness: float
def __init__(
self, routing: Node[RoutingContext], sequencing: Node[SequencingContext]
):
self.routing = routing
self.sequencing = sequencing
self.simulate_result = None
self.fitness = 0.0
def as_routing_rule(self) -> RoutingRule:
def rr(s: Simulator, r: Request) -> int:
return min(
range(s.problem.num_trucks),
key=lambda machine: self.routing.eval(RoutingContext(s, r, machine)),
)
return rr
def as_sequencing_rule(self) -> SequencingRule:
def sr(s: Simulator, machine: int, queue: list[QueueElem]) -> int:
return min(
range(len(queue)),
key=lambda i: self.sequencing.eval(
SequencingContext(s, machine, queue[i])
),
)
return sr
class TerminalNodeType[C](NodeType[C]):
fn: Callable[[C], float]
def __init__(self, name: str, fn: Callable[[C], float]):
super().__init__(name)
self.fn = fn
@override
def num_children(self) -> int:
return 0
@override
def calc(self, ctx: C, children: list[float]) -> float:
return self.fn(ctx)
R_CIQ = TerminalNodeType[RoutingContext](
"CIQ", lambda c: len(c.s.states[c.machine].queue)
)
R_WIQ = TerminalNodeType[RoutingContext](
"WIQ",
lambda c: sum(
c.s.states[c.machine].cost_of(elem.request, c.s)
for elem in c.s.states[c.machine].queue
),
)
R_C = TerminalNodeType[RoutingContext](
"C", lambda c: c.s.states[c.machine].cost_of(c.r, c.s)
)
S_W = TerminalNodeType[SequencingContext]("W", lambda c: c.elem.request.demand)
S_C = TerminalNodeType[SequencingContext](
"C", lambda c: c.s.states[c.machine].cost_of(c.elem.request, c.s)
)
S_WT = TerminalNodeType[SequencingContext]("WT", lambda c: c.s.time - c.elem.ready_time)
class BinaryInternalNodeType(NodeType[object]):
fn: Callable[[float, float], float]
def __init__(self, name: str, fn: Callable[[float, float], float]):
super().__init__(name)
self.fn = fn
@override
def num_children(self) -> int:
return 2
@override
def calc(self, ctx: object, children: list[float]) -> float:
return self.fn(children[0], children[1])
ADD = BinaryInternalNodeType("ADD", lambda x, y: x + y)
SUB = BinaryInternalNodeType("SUB", lambda x, y: x - y)
MUL = BinaryInternalNodeType("MUL", lambda x, y: x * y)
DIV = BinaryInternalNodeType("DIV", lambda x, y: x / y if abs(y) > 1e-4 else 1.0)
MIN = BinaryInternalNodeType("MIN", lambda x, y: min(x, y))
MAX = BinaryInternalNodeType("MAX", lambda x, y: max(x, y))
class GP:
problem: Problem
pop_size: int
max_depth: int
reproduction_rate: tuple[float, float]
rnt: list[NodeType[RoutingContext]]
snt: list[NodeType[SequencingContext]]
def __init__(
self,
problem: Problem,
pop_size: int,
max_depth: int,
reproduction_rate: tuple[float, float],
rnt: list[NodeType[RoutingContext]],
snt: list[NodeType[SequencingContext]],
):
self.problem = problem
self.pop_size = pop_size
self.max_depth = max_depth
self.reproduction_rate = reproduction_rate
self.rnt = rnt
self.snt = snt
def get_node_types[C](self, cls: type[C]) -> list[NodeType[C]]:
if cls == RoutingContext:
return cast(list[NodeType[C]], self.rnt)
elif cls == SequencingContext:
return cast(list[NodeType[C]], self.snt)
else:
return []
def gen_grow[C](self, cls: type[C], depth: int) -> Node[C]:
types = [
t for t in self.get_node_types(cls) if t.num_children() == 0 or depth > 0
]
typ = choice(types)
children = [self.gen_grow(cls, depth - 1) for _ in range(typ.num_children())]
return Node(typ, children)
def gen_full[C](self, cls: type[C], depth: int) -> Node[C]:
types = [
t for t in self.get_node_types(cls) if (t.num_children() > 0) == (depth > 0)
]
typ = choice(types)
children = [self.gen_grow(cls, depth - 1) for _ in range(typ.num_children())]
return Node(typ, children)
def ramp_half_and_half[C](self, cls: type[C]) -> list[Node[C]]:
half_size = self.pop_size // 2
pop: list[Node[C]] = []
for depth in range(1, self.max_depth + 1):
for _ in range(half_size // self.max_depth):
pop.append(self.gen_grow(cls, depth))
pop.append(self.gen_full(cls, depth))
while len(pop) < self.pop_size:
pop.append(self.gen_grow(cls, self.max_depth))
return pop
def reproduce(self, pop: list[Individual]):
match choices([1, 2], weights=list(self.reproduction_rate), k=1)[0]:
case 1:
p1 = max(choices(pop, k=8), key=lambda i: i.fitness)
p2 = max(choices(pop, k=8), key=lambda i: i.fitness)
r = self.crossover(p1.routing, p2.routing, RoutingContext)
s = self.crossover(p1.sequencing, p2.sequencing, SequencingContext)
return Individual(r, s)
case _:
p = max(choices(pop, k=8), key=lambda i: i.fitness)
r = self.mutate(p.routing, RoutingContext)
s = self.mutate(p.sequencing, SequencingContext)
return Individual(r, s)
def mutate[C](self, p: Node[C], cls: type[C]) -> Node[C]:
p = p.copy()
pos = choice(p.all_nodes())
pos.assign(self.gen_grow(cls, self.max_depth - p.depth() + pos.depth()))
return p
def crossover[C](self, p1: Node[C], p2: Node[C], cls: type[C]) -> Node[C]:
if random() < 0.5:
p1, p2 = p2, p1
p1 = p1.copy()
pos1 = choice(p1.all_nodes())
pos2 = choice([
n
for n in p2.all_nodes()
if n.depth() <= self.max_depth - p1.depth() + pos1.depth()
])
pos1.assign(pos2)
return p1
def fitness(self, values: tuple[float, int]) -> float:
time, num_fail = values
return time / 100 + num_fail
def solve(self):
pop = list(
map(
lambda t: Individual(t[0], t[1]),
zip(
self.ramp_half_and_half(RoutingContext),
self.ramp_half_and_half(SequencingContext),
),
)
)
while True:
for i in pop:
i.simulate_result = Simulator(
problem, i.as_routing_rule(), i.as_sequencing_rule()
).simulate()
i.fitness = self.fitness(i.simulate_result)
pop = nsmallest(self.pop_size, pop, key=lambda i: i.fitness)
yield pop[0]
pop.extend([self.reproduce(pop) for _ in range(self.pop_size)])
for i in range(1, 4):
problem = Problem("100/h100c103.csv", 50 / 60, 1300, i)
gp = GP(
problem,
100,
6,
(0.8, 0.2),
[R_CIQ, R_WIQ, R_C, ADD, SUB, MUL, DIV, MIN, MAX],
[S_W, S_C, S_WT, ADD, SUB, MUL, DIV, MIN, MAX],
)
for gen, best in zip(range(1, 101), gp.solve()):
print(gen, best, best.simulate_result, best.fitness)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment