Skip to content

Instantly share code, notes, and snippets.

@Gerenuk
Created May 22, 2020 19:39
Show Gist options
  • Save Gerenuk/54fc479b03aca2f0f7fa663f4114b4ca to your computer and use it in GitHub Desktop.
Save Gerenuk/54fc479b03aca2f0f7fa663f4114b4ca to your computer and use it in GitHub Desktop.
Haunted Pirates Bots
#!/usr/bin/env python
import contextlib as __stickytape_contextlib
@__stickytape_contextlib.contextmanager
def __stickytape_temporary_dir():
    """Yield the path of a fresh temporary directory and delete it on exit.

    The directory is created with ``tempfile.mkdtemp`` and removed with
    ``shutil.rmtree`` inside a ``finally`` clause, so it is cleaned up even
    when the body of the ``with`` statement raises.
    """
    import tempfile
    import shutil

    dir_path = tempfile.mkdtemp()
    try:
        yield dir_path
    finally:
        shutil.rmtree(dir_path)
# Bind a temporary directory (removed again when this block exits) as the
# working directory that the bundled modules are written into.
with __stickytape_temporary_dir() as __stickytape_working_dir:
def __stickytape_write_module(path, contents):
    """Write one bundled module into the working directory.

    Args:
        path: Module path relative to the working directory, using ``/`` as
            separator (for example ``"strategies/gatherer.py"``).
        contents: Source text stored in that file.

    Every directory on the way to ``path`` that does not exist yet is created
    and receives an ``__init__.py`` containing a single newline.
    """
    import os
    import os.path

    def make_package(path):
        parts = path.split("/")
        partial_path = __stickytape_working_dir
        for part in parts:
            partial_path = os.path.join(partial_path, part)
            if not os.path.exists(partial_path):
                os.mkdir(partial_path)
                # Close the marker file deterministically instead of leaving
                # the handle to the garbage collector.
                init_path = os.path.join(partial_path, "__init__.py")
                with open(init_path, "w") as init_file:
                    init_file.write("\n")

    make_package(os.path.dirname(path))

    full_path = os.path.join(__stickytape_working_dir, path)
    with open(full_path, "w") as module_file:
        module_file.write(contents)
# Put the working directory first on sys.path so it is searched before any
# other import location.
import sys as __stickytape_sys
__stickytape_sys.path.insert(0, __stickytape_working_dir)
# Bundled module agent.py: StrategyAssigner interface, the per-step Record and
# the Agent callable that turns a raw observation into Halite commands.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('agent.py', r'''from dataclasses import dataclass
from itertools import chain
from typing import Dict, List, Optional

from assign_solver import solve_assign
from config import Config
from halite_amount_solver import solve_halite_amount
from observation import Id, Observation
from plans import Plan
from actions import Action
from strategy import Strategy


class StrategyAssigner:  # pylint: disable=too-few-public-methods
    def updated_strategies(
        self, strategies: Dict[Id, Strategy], config: Config
    ) -> Dict[Id, Strategy]:
        raise NotImplementedError()


@dataclass
class Record:
    obs: Observation
    strategy_names: Dict[Id, str]
    plans: List[Plan]
    best_plans: List[Plan]
    actions: List[Action]
    best_actions: List[Action]
    halite_actions: Dict[Id, str]


class Agent:  # pylint: disable=too-few-public-methods
    def __init__(self, config: Config, strategy_assigner: StrategyAssigner):
        self.config = config
        self.strategy_assigner = strategy_assigner
        self.strategies: Dict[Id, Strategy] = {}
        self.records = []

    def __call__(self, raw_obs) -> Dict[str, str]:
        obs = Observation.from_obs(raw_obs)

        self.config.set_obs(obs)

        self.strategies = self.strategy_assigner.updated_strategies(
            self.strategies, self.config
        )

        print(
            f"\n----- Step {self.config.obs.step}, Halite {self.config.obs.my_halite:.0f} -----"
        )

        print(f"Config: {self.config}")

        print(f"Strategies: {self.strategies}")

        assert (
            obs.my_ships.keys() | obs.my_shipyards.keys() == self.strategies.keys()
        ), (
            obs.my_ships,
            obs.my_shipyards,
            self.strategies,
        )

        bot_plans = list(
            chain.from_iterable(
                bot.make_plans(num=len(self.strategies))
                for bot in self.strategies.values()
            )
        )

        best_plans = solve_assign(bot_plans)  # TODO type
        print("Best plans", best_plans)

        possible_actions = list(
            chain.from_iterable(plan.actions for plan in best_plans)
        )
        print("Possible actions", possible_actions)

        affordable_actions = solve_halite_amount(possible_actions)

        best_actions = solve_assign(affordable_actions)
        print("Best actions", best_actions)

        for action in best_actions:
            self.strategies[action.id].notify_action(action)

        halite_actions: Dict[str, str] = {}

        for action in best_actions:
            halite_command = action.halite_command

            assert (
                not halite_actions.keys() & halite_command.keys()
            ), f"Overwriting commands {halite_actions.keys() & halite_command.keys()}"

            halite_actions.update(halite_command)

        self.records.append(
            Record(
                obs=obs,
                strategy_names={
                    id: str(strategy) for id, strategy in self.strategies.items()
                },
                plans=bot_plans,
                best_plans=best_plans,
                actions=possible_actions,
                best_actions=best_actions,
                halite_actions=halite_actions,
            )
        )

        return halite_actions
''')
# Bundled module assign_solver.py: AssignSolverMixin plus solve_assign, which
# picks one (object, goal) pairing per object via scipy's linear_sum_assignment.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('assign_solver.py', r'''import itertools
from collections import defaultdict
from typing import Any, List, Tuple

import numpy as np
from scipy.optimize import linear_sum_assignment

from helper import is_unique


class AssignSolverMixin:
    """
    You can use goal=None to signify a non-conflicting goal
    """

    @property
    def obj_goal_penalty(self) -> Tuple[Any, Any, float]:
        raise NotImplementedError()


def solve_assign(penalty_objs: List[AssignSolverMixin]):
    if not penalty_objs:
        return []

    obj_goal_penalties = [penalty_obj.obj_goal_penalty for penalty_obj in penalty_objs]

    # Rewrite non-conflict goals
    non_conflict_goals = (("nc", i) for i in itertools.count())

    obj_goal_penalties = [
        (obj, goal if goal is not None else next(non_conflict_goals), score)
        for obj, goal, score in obj_goal_penalties
    ]

    all_to_resolve_objs = defaultdict(list)

    for obj_goal_penalty, penalty_obj in zip(obj_goal_penalties, penalty_objs):
        all_to_resolve_objs[obj_goal_penalty[:2]].append(
            (obj_goal_penalty, penalty_obj)
        )

    # Keep only the lowest-penalty candidate for every (object, goal) pair
    best_to_resolve_objs = list(
        min(objs, key=lambda x: x[1].obj_goal_penalty[2])
        for objs in all_to_resolve_objs.values()
    )

    best_obj_goal_penalties, best_penalty_objs = zip(*best_to_resolve_objs)

    matrix, obj_goal_penalty_map = make_matrix(
        best_obj_goal_penalties, best_penalty_objs
    )

    try:
        x_idxs, y_idxs = linear_sum_assignment(matrix)
    except ValueError as exc:
        # Chain the cause so the solver's own traceback is preserved
        raise ValueError(
            f"Assign solver failed with {exc} for {penalty_objs}. "
            f"You may need to add actions which guarantee resolution, like when bots can stay on the spot."
        ) from exc

    try:
        result = [
            obj_goal_penalty_map[x_idx, y_idx] for x_idx, y_idx in zip(x_idxs, y_idxs)
        ]
    except KeyError as exc:
        raise ValueError(
            f"Assignment solution could not be resolved for {exc}. "
            "You may need to add a stay on the spot move to the bot. Objects were:\n"
            + "\n".join(map(str, penalty_objs))
        ) from exc

    assert is_unique(x_idxs), penalty_objs
    assert is_unique(y_idxs), penalty_objs

    return result


def make_matrix(obj_goal_penalties, penalty_objs: List[AssignSolverMixin]):
    assert is_unique(obj[:2] for obj in obj_goal_penalties)

    xs = list(set(x[0] for x in obj_goal_penalties))
    ys = list(set(x[1] for x in obj_goal_penalties))

    # Pairings that were never proposed stay at infinite penalty
    result = np.full(shape=(len(xs), len(ys)), fill_value=np.inf)

    obj_goal_penalty_map = {}

    for (x, y, penalty), obj in zip(obj_goal_penalties, penalty_objs):
        x_idx = xs.index(x)
        y_idx = ys.index(y)

        obj_goal_penalty_map[x_idx, y_idx] = obj

        result[x_idx, y_idx] = penalty

    return result, obj_goal_penalty_map
''')
# Bundled module helper.py: distance helpers, uniqueness check and the numpy
# utilities used to score candidate shipyard positions.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('helper.py', r'''from collections import Counter
from dataclasses import dataclass
from operator import itemgetter
from typing import Iterable, Dict, Tuple, Optional

import numpy as np
from scipy.signal import convolve2d

from geometry import Pos
from halite_config import dist, _diff_to


@dataclass
class ClosestDist:
    idx: int
    pos: Pos
    dist: int


def find_closest(pos: Pos, dest_poses: Iterable[Pos]) -> ClosestDist:
    dists = [
        (i, dest_pos, dist(pos, dest_pos)) for i, dest_pos in enumerate(dest_poses)
    ]

    # Test the materialised list: a generator argument is always truthy
    if not dists:
        raise ValueError("Empty dest_poses provided")

    closest = min(dists, key=itemgetter(2))

    return ClosestDist(*closest)


def max_xy_dist(pos1, pos2):
    dx, dy = _diff_to(pos1, pos2)
    return max(abs(dx), abs(dy))


def is_unique(elems: Iterable):  # what if empty?
    cnts = Counter(elems)

    if not cnts:
        return True

    return cnts.most_common(1)[0][1] == 1


def shape_from_poses(poses):
    len_x = max(pos.x for pos in poses) + 1
    len_y = max(pos.y for pos in poses) + 1

    return (len_x, len_y)


def pos_dict_to_array(pos_dict: Dict[Pos, float]) -> np.ndarray:
    shape = shape_from_poses(pos_dict.keys())

    result = np.full(shape, fill_value=np.nan)

    for pos, val in pos_dict.items():
        result[pos] = val

    return result


def poses_to_dist_array(poses: Iterable[Pos], shape: Tuple[int, int]):
    result = np.full(shape, fill_value=np.nan)

    for i in range(shape[0]):
        for j in range(shape[1]):
            pos = Pos(i, j)
            result[i, j] = find_closest(pos, poses).dist

    return result


def make_filter(size: int):
    middle = size // 2
    dists_from_middle = np.array(
        [[abs(i - middle) + abs(j - middle) for i in range(size)] for j in range(size)]
    )
    result = np.exp(-0.5 * dists_from_middle)

    return result


def filter_array(array: np.ndarray, filter: np.ndarray):
    return convolve2d(array, filter, boundary="wrap", mode="same")


def score_new_shipyard(
    map_halite: np.ndarray,
    shipyard_dist: Optional[np.ndarray] = None,
    enemy_dist: Optional[np.ndarray] = None,
):
    if shipyard_dist is not None:
        discounted_map_halite = map_halite * (1 - np.exp(-0.5 * shipyard_dist ** 2))
    else:
        discounted_map_halite = map_halite

    if enemy_dist is not None:
        discounted_enemy_map_halite = map_halite * (1 - np.exp(-0.5 * enemy_dist ** 2))
        # Build a new array: an in-place "-=" would modify the caller's
        # map_halite whenever shipyard_dist is None
        discounted_map_halite = discounted_map_halite - discounted_enemy_map_halite

    filter_ = make_filter(size=11)

    filtered_map_halite = filter_array(discounted_map_halite, filter_)

    return filtered_map_halite


def max_pos_val(array: np.ndarray):
    linear_idx_map = array.argmax()

    x, y = np.unravel_index(linear_idx_map, array.shape)
    val = array[x, y]

    return Pos(x, y), val


def array_to_pos_dist(array: np.ndarray) -> Dict[Pos, float]:
    len_x, len_y = array.shape

    result = {}

    for i in range(len_x):
        for j in range(len_y):
            result[Pos(i, j)] = array[i, j]

    return result
''')
# Bundled module geometry.py: Pos and the torus Geometry (wrap-around
# distances, neighbourhoods and next-step selection).
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('geometry.py', r'''from typing import NamedTuple, Tuple, List, Set, FrozenSet, Dict
import itertools
import collections


class Pos(NamedTuple):
    x: int
    y: int

    def __repr__(self):
        return f"[{self.x}:{self.y}]"


DiffType = Tuple[int, int]


def sliding_window(iterable, size):
    iterable = iter(iterable)
    window = collections.deque(itertools.islice(iterable, size - 1), maxlen=size)
    for item in iterable:
        window.append(item)
        yield tuple(window)


diff_order = [(1, 0), (0, 1), (-1, 0), (0, -1)]
select_side_map: Dict[bool, Dict[FrozenSet[DiffType], DiffType]] = {
    True: {},
    False: {},
}  # initialize map to pick left/right diff

for side_flag, elem_idx in [(True, 0), (False, -1)]:
    for length in [2, 3]:
        select_side_map[side_flag].update(
            {
                frozenset(order): order[elem_idx]
                for order in itertools.islice(
                    sliding_window(itertools.cycle(diff_order), length), 4
                )
            }
        )


class Geometry:
    def __init__(self, size_x: int, size_y: int):
        self.size_x = size_x
        self.size_y = size_y

        self.poses = {Pos(x, y) for x in range(size_x) for y in range(size_y)}

    def int_to_pos(self, int_pos: int) -> Pos:
        x = int_pos % self.size_x
        y = int_pos // self.size_x
        assert 0 <= y < self.size_y
        return Pos(x, y)

    def _to_xy(self, pos: Pos) -> DiffType:
        assert isinstance(pos, Pos), f"Invalid position {pos}"
        x = pos.x
        y = pos.y
        # Parenthesised on purpose: "not a and b" only rejected positions
        # whose x was out of range while y was in range
        if not (0 <= x < self.size_x and 0 <= y < self.size_y):
            raise ValueError(
                f"Position {pos} is illegal for geometry of size {self.size_x} x {self.size_y}"
            )

        return (x, y)

    def _to_pos(self, x: int, y: int) -> Pos:
        x %= self.size_x
        y %= self.size_y
        return Pos(x, y)

    def _diff_to(self, pos1: Pos, pos2: Pos) -> DiffType:
        """
        Return diff vector for shortest path (torus)
        External function currently are not supposed to deal with position differences
        """
        x1, y1 = self._to_xy(pos1)
        x2, y2 = self._to_xy(pos2)

        dxy_shifted = [
            (x2 + shift_x - x1, y2 + shift_y - y1)
            for shift_x, shift_y in itertools.product(
                [-self.size_x, 0, self.size_x], [-self.size_y, 0, self.size_y]
            )
        ]

        dx, dy = min(dxy_shifted, key=lambda dxy: self._raw_dist(dxy[0], dxy[1]))

        return (dx, dy)

    @staticmethod
    def _raw_dist(dx: int, dy: int) -> int:
        return abs(dx) + abs(dy)

    def dist(self, pos1: Pos, pos2: Pos) -> int:
        dx, dy = self._diff_to(pos1, pos2)
        return self._raw_dist(dx, dy)

    def pos_towards(self, pos1: Pos, pos2: Pos) -> Set[Pos]:
        if pos1 == pos2:
            return {pos2}

        x1, y1 = self._to_xy(pos1)
        dx, dy = self._diff_to(pos1, pos2)

        result = []

        if dx > 0:
            result.append((x1 + 1, y1))

        if dx < 0:
            result.append((x1 - 1, y1))

        if dy > 0:
            result.append((x1, y1 + 1))

        if dy < 0:
            result.append((x1, y1 - 1))

        return set(itertools.starmap(self._to_pos, result))

    def get_prox(self, pos: Pos, *dists: int) -> Set[Pos]:
        x, y = self._to_xy(pos)

        result = []
        for dist in dists:
            if dist == 0:
                result.append((x, y))
                continue
            for d in range(dist):
                result.append((x + d, y + dist - d))
                result.append((x - d, y - dist + d))
                result.append((x - dist + d, y + d))
                result.append((x + dist - d, y - d))

        return set(itertools.starmap(self._to_pos, result))

    def choose_side(self, pos: Pos, next_poses: List[Pos], left=True) -> Pos:
        if len(next_poses) == 1:
            return next_poses[0]

        diffs = frozenset(self._diff_to(pos, next_pos) for next_pos in next_poses)

        side_diff = select_side_map[left].get(diffs)

        if side_diff is None:
            # Both fragments are f-strings so the positions are interpolated
            raise ValueError(
                f"Cannot determine { {True:'left', False:'right'}[left] } side "
                f"for positions {pos} to {', '.join(map(str, next_poses))}"
            )

        x, y = self._to_xy(pos)
        dx, dy = side_diff

        return self._to_pos(x + dx, y + dy)
''')
# Bundled module halite_config.py: board size and cost constants plus a shared
# Geometry instance whose bound methods are re-exported as module-level names.
__stickytape_write_module('halite_config.py', 'from geometry import Geometry\n\n\nSIZE_X: int = 21\nSIZE_Y: int = 21\n\nSHIP_COST = 500\nSHIPYARD_COST = 500\n\n\ngeometry = Geometry(SIZE_X, SIZE_Y)\ndist = geometry.dist\nget_prox = geometry.get_prox\npos_towards = geometry.pos_towards\nint_to_pos = geometry.int_to_pos\n_diff_to = geometry._diff_to\n')
# Bundled module config.py: base Config dataclass holding the current observation.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('config.py', r'''from dataclasses import dataclass, field
from observation import Observation


@dataclass
class Config:
    obs: Observation = field(default=None, repr=False)

    def set_obs(self, obs):
        self.obs = obs
''')
# Bundled module observation.py: typed view of the raw observation dict, with
# own/enemy ships, shipyards and their positions precomputed in __post_init__.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('observation.py', r'''from dataclasses import dataclass, field
from typing import Dict, List, NamedTuple, Set, Tuple

from geometry import Pos
from halite_config import int_to_pos


class ObservationShip(NamedTuple):
    pos: Pos
    halite: float


class ObservationShipYard(NamedTuple):
    pos: Pos


Id = str
EnemyId = Tuple[int, Id]


@dataclass  # pylint: disable=too-many-instance-attributes
class Observation:
    """
    Holds obs info with types Pos, ObservationShip, ObservationShipYard for clarity
    Precalculates some frequently needed information
    """

    step: int
    player_idx: int
    map_halite: Dict[Pos, float]

    player_halite: List[float]
    ships: List[Dict[Id, ObservationShip]]
    shipyards: List[Dict[Id, ObservationShipYard]]

    my_halite: float = field(init=False)
    my_ships: Dict[Id, ObservationShip] = field(init=False)
    my_shipyards: Dict[Id, ObservationShipYard] = field(init=False)
    ship_poses: Set[Pos] = field(init=False)
    shipyard_poses: Set[Pos] = field(init=False)

    enemy_ships: Dict[EnemyId, ObservationShip] = field(init=False)
    enemy_shipyards: Dict[EnemyId, ObservationShipYard] = field(init=False)
    enemy_ship_poses: Set[Pos] = field(init=False)
    enemy_shipyard_poses: Set[Pos] = field(init=False)

    num_ships: int = field(init=False)

    def __repr__(self):
        return (
            f"Observation(step={self.step}, {self.num_ships} ships, "
            f"{len(self.enemy_ships)} enemy ships)"
        )

    def __post_init__(self):
        self.my_halite = self.player_halite[self.player_idx]

        self.my_ships = self.ships[self.player_idx]

        self.enemy_ships = {
            (idx, id_): ship
            for idx, cur_ships in enumerate(self.ships)
            if idx != self.player_idx
            for id_, ship in cur_ships.items()
        }

        self.my_shipyards = self.shipyards[self.player_idx]

        self.enemy_shipyards = {
            (idx, id_): ship
            for idx, cur_shipyards in enumerate(self.shipyards)
            if idx != self.player_idx
            for id_, ship in cur_shipyards.items()
        }

        self.ship_poses = set(ship.pos for ship in self.my_ships.values())

        self.shipyard_poses = set(
            shipyard.pos for shipyard in self.my_shipyards.values()
        )

        self.enemy_ship_poses = set(ship.pos for ship in self.enemy_ships.values())

        self.enemy_shipyard_poses = set(
            shipyard.pos for shipyard in self.enemy_shipyards.values()
        )

        self.num_ships = len(self.my_ships)

    @classmethod
    def from_obs(cls, obs):
        player_halite = []
        shipyards = []
        ships = []

        for (cur_player_halite, cur_shipyards, cur_ships) in obs["players"]:
            player_halite.append(cur_player_halite)

            ships.append(
                {
                    id_: ObservationShip(pos=int_to_pos(int_pos), halite=halite)
                    for id_, (int_pos, halite) in cur_ships.items()
                }
            )

            shipyards.append(
                {
                    id_: ObservationShipYard(pos=int_to_pos(int_pos))
                    for id_, int_pos in cur_shipyards.items()
                }
            )

        return cls(
            step=obs["step"],
            player_idx=obs["player"],
            map_halite={int_to_pos(pos): val for pos, val in enumerate(obs["halite"])},
            player_halite=player_halite,
            ships=ships,
            shipyards=shipyards,
        )
''')
# Bundled module halite_amount_solver.py: solve_halite_amount currently
# returns its argument unchanged.
__stickytape_write_module('halite_amount_solver.py', 'def solve_halite_amount(objs):\n return objs\n')
# Bundled module plans.py: Plan base class and the concrete plans (move,
# return halite, convert, spawn, scatter) that expand into candidate actions.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('plans.py', r'''from dataclasses import dataclass, field
from typing import Dict, Set

from actions import ConvertAction, MoveAction, SpawnAction, StayAction, NoShipYardAction
from assign_solver import AssignSolverMixin
from geometry import Pos
from halite_config import get_prox, pos_towards


@dataclass
class Plan(AssignSolverMixin):
    """
    Bot plan with all information for resolution

    Every plan probably should also have a StayAction to guarantee resolution
    """

    id: str
    score: float

    @property
    def actions(self):
        raise NotImplementedError()


@dataclass
class MovePlan(Plan):
    start_pos: Pos
    end_pos: Pos
    # Maps a position to its danger score; it is subscripted in danger_adj,
    # so it has to be a mapping rather than a set
    forbidden_pos: Dict[Pos, float] = field(default_factory=dict, repr=False)

    @property
    def obj_goal_penalty(self):
        return (self.id, self.end_pos, -self.score)

    @property
    def actions(self):
        # Danger scores of -10 or lower replace the move score, milder ones are added
        danger_adj = (
            lambda score, pos: score
            if pos not in self.forbidden_pos
            else (
                self.forbidden_pos[pos]
                if self.forbidden_pos[pos] <= -10
                else score + self.forbidden_pos[pos]
            )
        )

        reachable_poses = get_prox(self.start_pos, 0, 1)

        if self.start_pos == self.end_pos:
            return [
                MoveAction(
                    id=self.id,
                    from_pos=self.start_pos,
                    pos=pos,
                    score=danger_adj(5 if pos == self.end_pos else 0.5, pos),
                )
                for pos in reachable_poses
            ]

        next_poses = pos_towards(self.start_pos, self.end_pos)

        adj_dir = (
            lambda pos1, pos2: abs(pos1.x - pos2.x) <= 1 and abs(pos1.y - pos2.y) <= 1
        )

        if len(next_poses) == 1:
            return [
                MoveAction(
                    id=self.id,
                    from_pos=self.start_pos,
                    pos=pos,
                    score=danger_adj(
                        1
                        if pos in next_poses
                        else (0.75 if adj_dir(pos, self.start_pos) else 0.5),
                        pos,
                    ),
                )
                for pos in reachable_poses
            ]

        return [
            MoveAction(
                id=self.id,
                from_pos=self.start_pos,
                pos=pos,
                score=danger_adj(1 if pos in next_poses else 0.5, pos),
            )
            for pos in reachable_poses
        ]


@dataclass
class ReturnHalitePlan(MovePlan):
    """
    This plan's goal is non-conflicting with same destination plans
    """

    @property
    def obj_goal_penalty(self):
        return (self.id, None, -self.score)


@dataclass
class ConvertPlan(Plan):
    pos: Pos

    @property
    def obj_goal_penalty(self):
        return (self.id, None, -self.score)

    @property
    def actions(self):
        return [
            ConvertAction(id=self.id, pos=self.pos, score=self.score),
            StayAction(id=self.id, pos=self.pos, score=0),
        ]


@dataclass
class SpawnPlan(Plan):
    pos: Pos

    @property
    def obj_goal_penalty(self):
        return (self.id, None, -self.score)

    @property
    def actions(self):
        return [
            SpawnAction(id=self.id, pos=self.pos, score=self.score),
            NoShipYardAction(id=self.id, pos=self.pos, score=0),
        ]


@dataclass
class ScatterPlan(Plan):
    pos: Pos

    @property
    def obj_goal_penalty(self):
        return (self.id, None, -self.score)

    @property
    def actions(self):
        return [
            MoveAction(id=self.id, from_pos=self.pos, pos=next_pos, score=1)
            for next_pos in get_prox(self.pos, 1)
        ] + [StayAction(id=self.id, pos=self.pos, score=-10)]
''')
# Bundled module actions.py: Action base class and the concrete actions, each
# exposing the Halite command dict it stands for via halite_command.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('actions.py', r'''from dataclasses import dataclass

from assign_solver import AssignSolverMixin
from geometry import Pos
from halite_config import _diff_to
from observation import Id


@dataclass
class Action(AssignSolverMixin):
    id: Id
    score: float
    pos: Pos

    @property
    def obj_goal_penalty(self):
        return (self.id, self.pos, -self.score)

    @property
    def halite_command(self):
        raise NotImplementedError()


@dataclass
class ConvertAction(Action):
    @property
    def obj_goal_penalty(self):
        return (self.id, ("sy", self.pos), -self.score)

    @property
    def halite_command(self):
        return {self.id: "CONVERT"}


@dataclass
class SpawnAction(Action):
    @property
    def halite_command(self):
        return {self.id: "SPAWN"}


@dataclass
class StayAction(Action):
    @property
    def halite_command(self):
        return {}


@dataclass
class NoShipYardAction(Action):
    @property
    def obj_goal_penalty(self):
        return (self.id, ("sy", self.pos), -self.score)

    @property
    def halite_command(self):
        return {}


@dataclass
class MoveAction(Action):
    from_pos: Pos

    def __repr__(self):
        return f"MoveAction({self.id}: {self.from_pos}->{self.pos}; {self.score})"

    @property
    def halite_command(self):
        if self.from_pos == self.pos:
            return {}

        dx, dy = _diff_to(self.from_pos, self.pos)
        result = {
            (1, 0): "EAST",
            (-1, 0): "WEST",
            (0, 1): "SOUTH",
            (0, -1): "NORTH",
        }.get((dx, dy))

        if result is None:
            raise ValueError(
                f"Cannot move in one step from {self.from_pos} to {self.pos}"
            )

        return {self.id: result}
''')
# Bundled module strategy.py: Strategy base class plus the Ship and ShipYard
# bases that look up their own position in the current observation.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('strategy.py', r'''from typing import List

from config import Config
from actions import Action
from plans import Plan


class Strategy:
    def __init__(self, *, config, id):
        self.config: Config = config
        self.id: str = id

    def make_plans(self, num=None) -> List[Plan]:
        raise NotImplementedError()

    def notify_action(self, action: Action) -> None:
        pass


class Ship(Strategy):  # pylint: disable=abstract-method
    @property
    def pos(self):
        return self.config.obs.my_ships[self.id].pos

    @property
    def halite(self):
        return self.config.obs.my_ships[self.id].halite

    def __repr__(self):
        return f"{self.__class__.__name__}({self.id} @ {self.pos})"


class ShipYard(Strategy):  # pylint: disable=abstract-method
    @property
    def pos(self):
        return self.config.obs.my_shipyards[self.id].pos

    def __repr__(self):
        return f"{self.__class__.__name__}({self.id} @ {self.pos})"
''')
# Bundled module strategies/my_config.py: MyConfig adds tuning constants and
# recomputes danger scores and per-cell mine scores for every observation.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('strategies/my_config.py', r'''from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, NamedTuple, Set

from config import Config
from geometry import Pos
from halite_config import geometry, get_prox
from helper import find_closest, max_xy_dist
from observation import Observation


class MineScore(NamedTuple):
    """
    Score of a mining position independent of ship
    Used only for precalculations
    """

    score: float  # final value
    halite: float
    steps: int


@dataclass
class MyConfig(Config):
    NUM_SHIPS: int = 8
    NUM_GATHERER: int = 1
    WHEN_DESTROYER: int = 4
    # NUM_HUNTER: int = 0
    NUM_SHIPYARDS: int = 2
    MAX_NUM_TURNS_BLOCKED: int = 2  # to avoid deadlocks ships may scatter
    # LAST_TURN_SPAWNING: int = 360

    # HALITE_LOSS_FACTOR: float = 0.99
    HALITE_LOSS_FACTOR: float = 0.8
    HALITE_MINE_FACTOR: float = 0.25
    TIME_FACTOR: float = 0.99

    mine_scores: Dict[Pos, MineScore] = field(default_factory=dict, repr=False)

    danger_pos_score: Dict[Pos, float] = field(default_factory=dict, repr=False)

    def set_obs(self, obs: Observation):
        super().set_obs(obs)

        self.danger_pos_score = defaultdict(float)

        for enemy_pos in obs.enemy_ship_poses:
            for score, prox in [(-100, 0), (-50, 1), (-10, 2), (-1, 3)]:
                for cur_pos in get_prox(enemy_pos, prox):
                    self.danger_pos_score[cur_pos] += score
                    if prox == 1 and max_xy_dist(cur_pos, enemy_pos) == 1:
                        self.danger_pos_score[cur_pos] += -1  # HACK TODO

        for enemy_pos in obs.enemy_shipyard_poses:
            for score, prox in [(-50, 0), (-10, 1), (-1, 2)]:
                for cur_pos in get_prox(enemy_pos, prox):
                    self.danger_pos_score[cur_pos] += score

        if not obs.my_shipyards:
            self.mine_scores = {}
        else:
            mine_scores = {}

            for pos in geometry.poses:
                closest = find_closest(pos, obs.shipyard_poses)
                pos_dist = closest.dist

                discount = self.HALITE_LOSS_FACTOR ** pos_dist

                halite = obs.map_halite[pos]

                mine_scores[pos] = MineScore(
                    score=halite * discount, halite=halite, steps=pos_dist,
                )

            self.mine_scores = mine_scores
''')
# Bundled module strategies/__init__.py: written with empty contents.
__stickytape_write_module('strategies/__init__.py', '')
# Bundled module strategies/my_strategy_assigner.py: MyStrategyAssigner keeps
# existing strategies, assigns one to every new ship/shipyard and may turn a
# Gatherer into a NewShipYard builder.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('strategies/my_strategy_assigner.py', r'''from collections import Counter
from functools import partial
from typing import Dict

import halite_config
from agent import StrategyAssigner
from helper import find_closest, max_pos_val
from observation import Id
from strategy import Strategy

from .destroyer import Destroyer
from .first_ship import FirstShip
from .gatherer import Gatherer
from .hunter import Hunter
from .my_config import MyConfig
from .my_helper import obs_to_new_shipyard_score
from .new_shipyard import NewShipYard
from .plainshipyard import PlainShipYard


class MyStrategyAssigner(StrategyAssigner):
    def updated_strategies(
        self, strategies: Dict[Id, Strategy], config: MyConfig
    ) -> Dict[Id, Strategy]:
        result = {}

        for id_ in config.obs.my_ships.keys():
            if id_ in strategies:
                result[id_] = strategies[id_]
                continue

            strat_class = self._new_ship_class(strategies, config)

            result[id_] = strat_class(config=config, id=id_)

        # Shipyards
        for id_ in config.obs.my_shipyards.keys():
            if id_ in strategies:
                result[id_] = strategies[id_]
                continue

            result[id_] = PlainShipYard(config=config, id=id_)

        result = self._try_new_shipyard(result, config)

        return result

    def _new_ship_class(self, strategies, config):
        if not strategies:
            result = FirstShip
        else:
            ship_type_cnts = Counter(
                strat.__class__.__name__ for strat in strategies.values()
            )

            if (
                ship_type_cnts["Gatherer"] + ship_type_cnts["NewShipYard"]
                < config.NUM_GATHERER
            ):
                result = Gatherer
            elif ship_type_cnts["Destroyer"] == 0:
                result = Destroyer
            else:
                result = Hunter

        return result

    def _change_ship_strategy(self, strategies, id, new_strategy):
        result = {}
        for cur_id, strategy in strategies.items():
            if cur_id != id:
                result[cur_id] = strategy
                continue

            result[cur_id] = new_strategy(config=strategy.config, id=strategy.id)

        return result

    def _try_new_shipyard(self, strategies, config):
        for id, strategy in list(strategies.items()):
            if not isinstance(strategy, NewShipYard):
                continue
            if strategy.steps_to_gather <= 0:
                strategies = self._change_ship_strategy(strategies, id, Gatherer)

        if config.obs.my_halite < 1000:
            return strategies

        virtual_shipyard_poses = {
            strategy.new_shipyard_pos
            for strategy in strategies.values()
            if isinstance(strategy, NewShipYard)
        }

        if (
            len(config.obs.shipyard_poses) + len(virtual_shipyard_poses)
            >= config.NUM_SHIPYARDS
            or config.obs.my_halite < halite_config.SHIPYARD_COST
        ):
            return strategies

        ships = [x for x in strategies.values() if isinstance(x, Gatherer)]

        if not ships:
            return strategies

        shipyard_scores = obs_to_new_shipyard_score(
            config.obs, virtual_shipyard_poses=virtual_shipyard_poses
        )

        new_shipyard_pos, _val = max_pos_val(shipyard_scores)

        ship_poses = [x.pos for x in ships]

        closest_ship_info = find_closest(new_shipyard_pos, ship_poses)
        closest_ship = ships[closest_ship_info.idx]

        result = self._change_ship_strategy(
            strategies,
            closest_ship.id,
            partial(NewShipYard, new_shipyard_pos=new_shipyard_pos),
        )

        return result
''')
# Bundled module strategies/destroyer.py: Destroyer plans moves towards enemy
# shipyards, scoring closer ones higher.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('strategies/destroyer.py', r'''from operator import itemgetter

from halite_config import dist
from plans import MovePlan
from strategy import Ship


class Destroyer(Ship):
    def make_plans(self, num=None):
        enemy_shipyard_poses = self.config.obs.enemy_shipyard_poses

        if not enemy_shipyard_poses:
            return []

        enemy_dists = [(pos, dist(pos, self.pos)) for pos in enemy_shipyard_poses]

        enemy_dists.sort(key=itemgetter(1))

        plans = [
            MovePlan(
                id=self.id,
                start_pos=self.pos,
                end_pos=enemy_pos,
                score=1 / (1 + enemy_dist),
            )
            for enemy_pos, enemy_dist in enemy_dists
        ][:num]

        # num=None means "no limit"; comparing an int with None would raise
        if num is None or len(plans) < num:
            plans.append(
                MovePlan(id=self.id, start_pos=self.pos, end_pos=self.pos, score=0)
            )

        return plans
''')
# Bundled module strategies/first_ship.py: FirstShip converts into a shipyard
# at its chosen position (currently its own starting position).
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('strategies/first_ship.py', r'''from operator import itemgetter

from halite_config import dist
from helper import array_to_pos_dist
from plans import ConvertPlan, MovePlan
from strategy import Ship

from .my_helper import obs_to_new_shipyard_score


class FirstShip(Ship):
    def __init__(self, *, config, id):
        super().__init__(config=config, id=id)
        self.first_shipyard_pos = None

    def make_plans(self, num=None):
        if self.first_shipyard_pos is None:
            # self.first_shipyard_pos = self._find_first_shipyard_pos()
            self.first_shipyard_pos = self.pos  # TODO

        if self.pos == self.first_shipyard_pos:
            return [ConvertPlan(id=self.id, pos=self.pos, score=1)]

        return [
            MovePlan(
                id=self.id, start_pos=self.pos, end_pos=self.first_shipyard_pos, score=1
            )
        ]

    def _find_first_shipyard_pos(self):
        new_shipyard_scores = obs_to_new_shipyard_score(self.config.obs)

        pos_new_shipyard_scores = array_to_pos_dist(new_shipyard_scores)

        close_new_shipyard_scores = {
            pos: score
            for pos, score in pos_new_shipyard_scores.items()
            if dist(pos, self.pos) < 5
        }

        chosen_new_shipyard_pos, _ = max(
            close_new_shipyard_scores.items(), key=itemgetter(1)
        )

        return chosen_new_shipyard_pos
''')
# Bundled module strategies/my_helper.py: obs_to_new_shipyard_score builds the
# halite and distance arrays from an observation and scores shipyard sites.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('strategies/my_helper.py', r'''from typing import Set

from geometry import Pos
from helper import (
    pos_dict_to_array,
    poses_to_dist_array,
    score_new_shipyard,
    shape_from_poses,
)
from observation import Observation
from strategies.my_config import MyConfig


def obs_to_new_shipyard_score(
    obs: Observation, virtual_shipyard_poses: Set[Pos] = None
):
    if virtual_shipyard_poses is None:
        virtual_shipyard_poses = set()

    map_halite = pos_dict_to_array(obs.map_halite)

    shape = shape_from_poses(obs.map_halite.keys())

    shipyard_poses = obs.shipyard_poses | virtual_shipyard_poses

    if shipyard_poses:
        shipyard_dist = poses_to_dist_array(shipyard_poses, shape)
    else:
        shipyard_dist = None

    enemy_poses = obs.enemy_shipyard_poses | obs.enemy_ship_poses
    if enemy_poses:
        enemy_shipyard_dist = poses_to_dist_array(enemy_poses, shape)
    else:
        enemy_shipyard_dist = None

    result = score_new_shipyard(map_halite, shipyard_dist, enemy_shipyard_dist)

    return result
''')
# Bundled module strategies/gatherer.py: Gatherer scores every board cell as a
# mining destination and returns its best `num` plans, or heads for a shipyard
# when its cargo would lift the player's halite past the next multiple of 500.
# The payload is a raw triple-quoted literal holding the module source verbatim.
__stickytape_write_module('strategies/gatherer.py', r'''from operator import attrgetter
from typing import List, cast

from halite_config import dist, geometry
from plans import MovePlan, Plan, ReturnHalitePlan
from strategy import Ship
from helper import find_closest

from .my_config import MyConfig


class Gatherer(Ship):
    def make_plans(self, num=None) -> List[Plan]:
        plans = []

        config = cast(MyConfig, self.config)

        if not config.mine_scores:
            return [
                MovePlan(
                    id=self.id,
                    start_pos=self.pos,
                    end_pos=self.pos,
                    score=1,
                    forbidden_pos=config.danger_pos_score,
                )
            ]

        if (
            self.halite > 0
            and (self.halite + self.config.obs.my_halite) // 500
            > self.config.obs.my_halite // 500
        ):
            closest_shipyard = find_closest(self.pos, self.config.obs.shipyard_poses)
            return [
                ReturnHalitePlan(
                    id=self.id,
                    start_pos=self.pos,
                    end_pos=closest_shipyard.pos,
                    score=1,
                    forbidden_pos=config.danger_pos_score,
                ),
                MovePlan(
                    id=self.id,
                    start_pos=self.pos,
                    end_pos=self.pos,
                    score=0,
                    forbidden_pos=config.danger_pos_score,
                ),
            ]

        for end_pos in geometry.poses:
            ship_pos_dist = dist(self.pos, end_pos)

            mine_score = config.mine_scores[end_pos]

            total_steps = ship_pos_dist + mine_score.steps

            extra_halite = (
                config.HALITE_MINE_FACTOR * mine_score.halite
                if self.pos == end_pos
                else 0
            )

            ##### MAGIC EQUATION #####
            trip_score = (
                (self.halite + extra_halite) * config.HALITE_LOSS_FACTOR ** total_steps
                + config.HALITE_MINE_FACTOR * mine_score.score
            ) * (config.TIME_FACTOR if self.pos == end_pos else 1)

            if end_pos in config.obs.shipyard_poses:
                plan = ReturnHalitePlan(
                    id=self.id,
                    start_pos=self.pos,
                    end_pos=end_pos,
                    score=trip_score,
                    forbidden_pos=config.danger_pos_score,
                )
            else:
                plan = MovePlan(
                    id=self.id,
                    start_pos=self.pos,
                    end_pos=end_pos,
                    score=trip_score,
                    forbidden_pos=config.danger_pos_score,
                )

            plans.append(plan)

        plans.sort(key=attrgetter("score"), reverse=True)

        return plans[:num]
''')
# Bundled module `strategies/hunter.py`: defines `choose_next_pos`,
# `HunterPlan(MovePlan)` and `Hunter(Ship)`.
#
#   choose_next_pos(config, pos, enemy_pos)
#     Returns `enemy_pos`, except that when the enemy is at distance <= 1 and
#     the current cell holds no halite it returns `pos` (stay put) with
#     probability 0.1 (`random.random() < 0.1`).
#
#   HunterPlan
#     MovePlan whose `obj_goal_penalty` is `(id, None, -score)`; per its
#     docstring the goal is non-conflicting with same-destination plans.
#
#   Hunter.make_plans(num=None)
#     - Carrying halite: return a ReturnHalitePlan to the closest shipyard if
#       any exist, otherwise one MovePlan per position in
#       `get_prox(self.pos, 0, 1)`.
#     - No enemy ships: return a single ScatterPlan.
#     - Otherwise: rank enemy ships by `dist(enemy_ship, to_pos)` where
#       `to_pos` ranges over enemy shipyards (or only the hunter's own position
#       when there are none), emit one HunterPlan per (enemy_ship, to_pos) pair
#       with score `1 / (1 + dist)`, keep the first `num`, and append a
#       stay-in-place MovePlan with score -10 if fewer than `num` were produced.
#
# NOTE(review): `len(plans) < num` raises TypeError when `make_plans` is called
# with the default `num=None`; callers apparently always pass `num` — confirm.
# NOTE(review): when enemy shipyards exist, the ranking distance is measured
# from enemy ships to enemy shipyards, not to this hunter.
# NOTE(review): with several enemy shipyards the same enemy ship yields several
# plans with the same destination.
__stickytape_write_module('strategies/hunter.py', 'import random\nfrom dataclasses import dataclass\nfrom operator import itemgetter\n\nfrom halite_config import dist, get_prox\nfrom plans import MovePlan, ScatterPlan, ReturnHalitePlan\nfrom strategy import Ship\nfrom helper import find_closest\n\n\ndef choose_next_pos(config, pos, enemy_pos):\n if dist(pos, enemy_pos) > 1 or config.obs.map_halite[pos] > 0:\n return enemy_pos\n\n if random.random() < 0.1:\n return pos\n\n return enemy_pos\n\n\n@dataclass\nclass HunterPlan(MovePlan):\n """\n This plan\'s goal is non-conflicting with same destination plans\n """\n\n @property\n def obj_goal_penalty(self):\n return (self.id, None, -self.score)\n\n\nclass Hunter(Ship):\n def make_plans(self, num=None):\n if self.halite > 0:\n if self.config.obs.shipyard_poses:\n closest_shipyard = find_closest(\n self.pos, self.config.obs.shipyard_poses\n )\n return [\n ReturnHalitePlan(\n id=self.id,\n start_pos=self.pos,\n end_pos=closest_shipyard.pos,\n score=1,\n forbidden_pos=self.config.danger_pos_score,\n )\n ]\n else:\n return [\n MovePlan(\n id=self.id,\n start_pos=self.pos,\n end_pos=pos,\n score=1,\n forbidden_pos=self.config.danger_pos_score,\n )\n for pos in get_prox(self.pos, 0, 1)\n ]\n\n enemy_ship_poses = self.config.obs.enemy_ship_poses\n if not enemy_ship_poses:\n return [ScatterPlan(id=self.id, pos=self.pos, score=1)]\n\n if self.config.obs.enemy_shipyard_poses:\n to_poses = self.config.obs.enemy_shipyard_poses\n else:\n to_poses = {self.pos}\n\n enemy_dists = [\n (pos, dist(pos, to_pos)) for pos in enemy_ship_poses for to_pos in to_poses\n ]\n\n enemy_dists.sort(key=itemgetter(1))\n\n plans = [\n HunterPlan(\n id=self.id,\n start_pos=self.pos,\n end_pos=choose_next_pos(self.config, self.pos, enemy_pos),\n score=1 / (1 + enemy_dist),\n )\n for enemy_pos, enemy_dist in enemy_dists\n ][:num]\n\n if len(plans) < num:\n plans.append(\n MovePlan(id=self.id, start_pos=self.pos, end_pos=self.pos, score=-10)\n )\n\n return 
plans\n')
# Bundled module `strategies/new_shipyard.py`: defines `NewShipYard(Ship)`, a
# ship strategy that travels to `new_shipyard_pos` and converts there.
#
#   __init__(*, config, id, new_shipyard_pos)
#     Keyword-only. Stores the target and sets
#     `steps_to_gather = int(1.2 * dist(self.pos, new_shipyard_pos))`
#     (the 1.2 factor is marked "hard-coded TODO" in the embedded source).
#
#   make_plans(num=None)
#     Decrements `steps_to_gather` on every call, then:
#       - at the target with `obs.my_halite >= halite_config.SHIPYARD_COST`:
#         a single ConvertPlan (score 1);
#       - at the target without enough halite: a stay-in-place MovePlan (score 0);
#       - elsewhere: a MovePlan towards the target (score 1).
#
# NOTE(review): `steps_to_gather` is decremented but never read inside this
# module; presumably it is consumed elsewhere — verify.
# NOTE(review): `num` is accepted but unused; at most one plan is returned.
__stickytape_write_module('strategies/new_shipyard.py', 'from typing import cast\n\nimport halite_config\nfrom geometry import Pos\nfrom plans import ConvertPlan, MovePlan\nfrom strategy import Ship\nfrom halite_config import dist\n\nfrom .my_config import MyConfig\n\n\nclass NewShipYard(Ship):\n def __init__(self, *, config, id, new_shipyard_pos: Pos):\n super().__init__(config=config, id=id)\n self.new_shipyard_pos = new_shipyard_pos\n self.steps_to_gather = int(\n 1.2 * dist(self.pos, self.new_shipyard_pos)\n ) # hard-coded TODO\n\n def __repr__(self):\n return f"NewShipYard({self.id} @ {self.pos}; shipyard: {self.new_shipyard_pos})"\n\n def make_plans(self, num=None):\n self.steps_to_gather -= 1\n\n config = cast(MyConfig, self.config)\n\n if self.pos == self.new_shipyard_pos:\n if self.config.obs.my_halite >= halite_config.SHIPYARD_COST:\n return [\n ConvertPlan(id=self.id, pos=self.pos, score=1),\n ]\n else:\n return [\n MovePlan(\n id=self.id,\n start_pos=self.pos,\n end_pos=self.pos,\n score=0,\n forbidden_pos=config.danger_pos_score,\n ),\n ]\n\n return [\n MovePlan(\n id=self.id,\n start_pos=self.pos,\n end_pos=self.new_shipyard_pos,\n score=1,\n forbidden_pos=config.danger_pos_score,\n )\n ]\n')
# Bundled module `strategies/plainshipyard.py`: defines `PlainShipYard(ShipYard)`.
#
#   make_plans(num=None)
#     Returns a single SpawnPlan (score 1) when all of the following hold:
#       - `obs.num_ships < config.NUM_SHIPS` (fleet below the configured cap),
#       - `obs.my_halite >= halite_config.SHIP_COST` (a ship is affordable),
#       - the shipyard's position is not in `obs.ship_poses`.
#     Otherwise returns an empty list, i.e. the shipyard proposes nothing.
#
# NOTE(review): `num` is accepted but unused by the embedded implementation.
__stickytape_write_module('strategies/plainshipyard.py', 'from strategy import ShipYard\nfrom plans import SpawnPlan\nimport halite_config\n\n\nclass PlainShipYard(ShipYard):\n def make_plans(self, num=None):\n if (\n self.config.obs.num_ships < self.config.NUM_SHIPS\n and self.config.obs.my_halite >= halite_config.SHIP_COST\n and self.pos not in self.config.obs.ship_poses\n ):\n return [SpawnPlan(id=self.id, pos=self.pos, score=1)]\n\n return []\n')
from agent import Agent
from strategies.my_config import MyConfig
from strategies.my_strategy_assigner import MyStrategyAssigner
# One Agent instance is created at import time and reused for every step, so
# the state it keeps on itself (e.g. its `strategies` mapping) carries over
# between calls.
agent = Agent(MyConfig(), MyStrategyAssigner())
def call_agent(obs):
    """Entry point: forward one raw observation to the module-level agent.

    Args:
        obs: Raw observation for the current step; passed to ``agent``
            unchanged.

    Returns:
        Whatever ``agent(obs)`` returns (``Agent.__call__`` is annotated as
        returning ``Dict[str, str]``).
    """
    # The body must be indented under the ``def``; at column 0 the ``return``
    # sits outside any function and the module fails to compile.
    return agent(obs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment