Skip to content

Instantly share code, notes, and snippets.

@snuke
Created December 21, 2019 09:10
Show Gist options
  • Save snuke/492557822a885430ccd3deeb553c1556 to your computer and use it in GitHub Desktop.
Save snuke/492557822a885430ccd3deeb553c1556 to your computer and use it in GitHub Desktop.
天下一GameBattleContest(β)
#!/usr/bin/env python3
"""
辺をランダムに取得し、点数を計算するサンプルプログラムです。
実行には python3 環境が必要です。
TOKEN 変数を書き換えて実行してください。
"""
import os
import random
from typing import NamedTuple, List, Dict
from urllib.request import urlopen
import time
import copy
import heapq
# Base URL of the game server; can be overridden with the GAME_SERVER environment variable.
GAME_SERVER = os.getenv('GAME_SERVER', 'https://obt.tenka1.klab.jp')
# Your API token; can be overridden with the TOKEN environment variable.
# NOTE(review): a literal token is committed as the fallback value — confirm it is not a live secret.
TOKEN = os.getenv('TOKEN', '79c60c45bcc95e16226801122af377975f09d52b92e18db8ac4a09a5d0383562')
# Number of vertices and edges.
NUM_NODES = 400
NUM_EDGES = 760
# Indices of the edges this client has claimed successfully; main() rebinds it to an empty set for every game.
edge_set = set()
def call_api(x):
    """Request ``GAME_SERVER + x`` and return the response body as text.

    Args:
        x: Path part of the URL, e.g. ``'/api/game'``.

    Returns:
        The response body decoded with the default codec of ``bytes.decode``.

    Raises:
        Whatever ``urlopen`` raises for network or HTTP failures; nothing is
        caught here.
    """
    with urlopen(f'{GAME_SERVER}{x}') as res:
        return res.read().decode()
def main():
    """Play games until the server reports a negative game id.

    For every game the loop repeatedly fetches the stage and the current edge
    ownership, asks ``solve`` for the next edge, sends the claim and then
    waits so that claim attempts stay at least ``claim_interval`` seconds
    apart.

    NOTE(review): block nesting was reconstructed from a copy of the source
    whose indentation had been lost; the progress line is assumed to be
    printed once per claim attempt.
    """
    global edge_set
    claim_interval = 0.54  # minimum seconds between two claim attempts (API rate limit)
    while True:
        # GET /api/game
        game = call_api('/api/game')
        game_id, remining_ms = map(int, game.split())
        print(f"ゲームID {game_id}")
        print(f"残り時間 {remining_ms}ms")
        if game_id < 0:
            break
        edge_set = set()
        # Start from "nothing claimed" so the first iteration still works when
        # the edges request does not answer "ok".
        claim_counts = [0] * NUM_EDGES
        my_claims = [0] * NUM_EDGES
        timer = time.monotonic()
        suc_num, tot_num = 0, 0
        # remining_ms is not refreshed inside this loop; it ends when a claim
        # is answered with "game_finished".
        while 0 < remining_ms:
            # GET /api/stage/<game_id>
            stage_resp = call_api(f'/api/stage/{game_id}')
            stage = list(map(int, stage_resp.split()))
            num_source = stage[0]
            num_sink = stage[1]
            sources = stage[2:2 + num_source]
            sinks = stage[2 + num_source:2 + num_source + num_sink]
            caps = stage[2 + num_source + num_sink:]
            assert len(caps) == NUM_EDGES
            # Check which edges are claimed and by how many players.
            # GET /api/edges/<token>/<game_id>
            edges_resp = call_api(f'/api/edges/{TOKEN}/{game_id}').strip().split('\n')
            if edges_resp[0] == "ok":
                claim_counts = list(map(int, edges_resp[1].split()))
                my_claims = list(map(int, edges_resp[2].split()))
            # Otherwise keep the last known state instead of failing.
            # Patch in claims that succeeded locally but are missing from the
            # state used above.
            for ei in edge_set:
                if my_claims[ei] == 0:
                    my_claims[ei] = 1
                    claim_counts[ei] += 1
            # Choose the next edge and claim it.
            edge_index = solve(sources, sinks, caps, claim_counts, my_claims)
            # GET /api/claim/<token>/<game_id>/<edge_index>
            claim_resp = call_api(f'/api/claim/{TOKEN}/{game_id}/{edge_index}').strip()
            # Wait out the rest of the rate-limit window with a single sleep.
            pause = claim_interval - (time.monotonic() - timer)
            if pause > 0:
                time.sleep(pause)
            delta = time.monotonic() - timer
            print(delta)
            tot_num += 1
            if claim_resp == "ok":
                edge_set.add(edge_index)
                suc_num += 1
            elif claim_resp == "game_finished":
                break
            else:
                print(f"辺取得失敗 {edge_index} {claim_resp} !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            timer = time.monotonic()
            print(f'{suc_num} / {tot_num} : {suc_num/max(1,tot_num)}')
# cf. https://github.com/spaghetti-source/algorithm/blob/9cca6b826f19ed7e42dd326a4fbbb9f4d34f04d3/graph/maximum_flow_dinic.cc
# Value used as an effectively unlimited capacity / path width.
INF = 1 << 50
# Largest total of Edge.l values a path considered by Graph.solve may have.
MX = 20


class Edge(NamedTuple):
    """One directed arc stored in a Graph adjacency list."""
    src: int  # tail vertex
    dst: int  # head vertex
    capacity: int  # 0 for the reverse arc created by Graph.add_edge
    rev: int  # position of the paired opposite arc inside adj[dst]
    l: int  # value of Graph.now_l when the arc was added; Graph.solve sums it along a path
    i: int  # position of this arc inside Graph.es


class Graph:
    """Residual graph with Dinic max-flow and a best-ratio path search.

    Attributes:
        n: Number of vertices.
        adj: Outgoing arcs per vertex.
        flow: Current flow on every arc (negative on reverse arcs).
        sources, sinks: Copies of the vertex lists given to the constructor;
            used by ``solve``.
        now_l: Value stored in ``Edge.l`` of arcs added from now on.
        es: Every arc in insertion order; ``Edge.i`` indexes this list.
    """

    def __init__(self, n: int, sources: List[int], sinks: List[int]):
        self.n = n
        self.adj: List[List[Edge]] = [[] for _ in range(n)]
        self.flow: Dict[Edge, int] = {}
        self.sources = sources[:]
        self.sinks = sinks[:]
        self.now_l = 0
        self.es: List[Edge] = []

    def add_edge(self, src: int, dst: int, capacity: int):
        """Add an arc src -> dst plus its zero-capacity reverse arc."""
        e1 = Edge(src, dst, capacity, len(self.adj[dst]), self.now_l, len(self.es))
        self.adj[src].append(e1)
        self.flow[e1] = 0
        self.es.append(e1)
        e2 = Edge(dst, src, 0, len(self.adj[src]) - 1, self.now_l, len(self.es))
        self.adj[dst].append(e2)
        self.flow[e2] = 0
        self.es.append(e2)

    def max_flow(self, s: int, t: int):
        """Push a maximum flow from s to t (Dinic) and return its value.

        The flow found is kept in ``self.flow``.
        """
        level = [-1] * self.n
        next_arc = [0] * self.n  # per-vertex cursor into adj for the current phase

        def levelize():
            # Breadth-first search over arcs with residual capacity.
            for i in range(self.n):
                level[i] = -1
            level[s] = 0
            queue = [s]
            head = 0  # index-based FIFO: avoids copying the list on every pop
            while head < len(queue):
                u = queue[head]
                head += 1
                if u == t:
                    break
                for e in self.adj[u]:
                    if e.capacity > self.flow[e] and level[e.dst] < 0:
                        queue.append(e.dst)
                        level[e.dst] = level[u] + 1
            return level[t]

        def augment(u, cur):
            # Depth-first search that only follows arcs into a deeper level.
            if u == t:
                return cur
            while next_arc[u] < len(self.adj[u]):
                e = self.adj[u][next_arc[u]]
                r = self.adj[e.dst][e.rev]
                if e.capacity > self.flow[e] and level[u] < level[e.dst]:
                    f = augment(e.dst, min(cur, e.capacity - self.flow[e]))
                    if f > 0:
                        self.flow[e] += f
                        self.flow[r] -= f
                        return f
                next_arc[u] += 1
            return 0

        flow = 0
        while levelize() >= 0:
            for i in range(self.n):
                next_arc[i] = 0
            while True:
                f = augment(s, INF)
                if f <= 0:
                    break
                flow += f
        return flow

    def scores(self):
        """Return ``{(src, dst): flow}``; a later parallel arc overwrites an earlier one."""
        res = {}
        for v in range(self.n):
            for e in self.adj[v]:
                res[(e.src, e.dst)] = self.flow[e]
        return res

    def solve(self):
        """Find the residual path maximising ``bottleneck / number of l == 1 arcs``.

        ``dp[v][k]`` holds the widest bottleneck of a path from any vertex in
        ``self.sources`` to ``v`` whose ``Edge.l`` values sum to ``k``,
        together with the index of the last arc of that path. Only paths that
        end in ``self.sinks`` with ``1 <= k <= MX`` and a positive bottleneck
        are candidates.

        Returns:
            The set of ``(src, dst)`` pairs of the ``l == 1`` arcs on the best
            path, or an empty set when no candidate exists. When several
            paths tie, which one is reported is unspecified.
        """
        dp = [[(0, -1)] * (MX + 1) for _ in range(self.n)]
        heaps = [[] for _ in range(MX + 1)]
        # heapq is a min-heap; widths are pushed negated so that the widest
        # state of a layer is expanded first.
        for v in self.sources:
            dp[v][0] = (INF, -1)
            heapq.heappush(heaps[0], (-INF, v))
        # The last layer is expanded too, so that l == 0 arcs may still follow
        # the MX-th l == 1 arc.
        for layer in range(MX + 1):
            heap = heaps[layer]
            while heap:
                neg_width, v = heapq.heappop(heap)
                width = dp[v][layer][0]
                if width != -neg_width:
                    continue  # stale entry: a wider path was recorded later
                for e in self.adj[v]:
                    next_layer = layer + e.l
                    if next_layer > MX:
                        continue
                    new_width = min(width, e.capacity - self.flow[e])
                    if dp[e.dst][next_layer][0] >= new_width:
                        continue
                    dp[e.dst][next_layer] = (new_width, e.i)
                    heapq.heappush(heaps[next_layer], (-new_width, e.dst))
        best = None
        for v in self.sinks:
            for layer in range(1, MX + 1):
                width = dp[v][layer][0]
                if width <= 0:
                    continue  # unreachable state; it has no predecessor to walk back
                candidate = (width / layer, v, layer)
                if best is None or candidate > best:
                    best = candidate
        res = set()
        if best is None:
            return res
        _, v, layer = best
        while layer > 0:
            ei = dp[v][layer][1]
            if ei < 0:
                break
            e = self.es[ei]
            if e.l == 1:
                res.add((e.src, e.dst))
            v = e.src
            layer -= e.l
        return res
# Compute the score by pushing a flow through the edges we own.
def calculate_score(sources, sinks, caps, claim_counts, my_claims):
    """Return the maximum flow from the sources to the sinks over our own edges.

    Args:
        sources: Source vertex indices.
        sinks: Sink vertex indices.
        caps: Capacity of every edge, indexed by edge index.
        claim_counts: Number of claims on every edge.
        my_claims: Truthy for every edge we have claimed.

    Returns:
        The max-flow value as a float. Capacities are scaled by ``ratio``
        before the integer division by the claim count and the result is
        scaled back, which keeps three decimal digits of each share.
    """
    ratio = 1000
    super_source = NUM_NODES
    super_sink = NUM_NODES + 1
    g = Graph(NUM_NODES + 2, sources, sinks)
    # Unlimited arcs from the super source to every source.
    for source_node in sources:
        g.add_edge(super_source, source_node, INF)
    # Unlimited arcs from every sink to the super sink.
    for sink_node in sinks:
        g.add_edge(sink_node, super_sink, INF)
    # Our own edges, usable in both directions.
    for edge_index in range(NUM_EDGES):
        if not my_claims[edge_index]:
            continue
        v1, v2 = getV(edge_index)
        # max(1, ...) avoids a division by zero if the count of an owned edge is 0.
        cap = caps[edge_index] * ratio // max(1, claim_counts[edge_index])
        g.add_edge(v1, v2, cap)
        g.add_edge(v2, v1, cap)
    return g.max_flow(super_source, super_sink) / ratio
# solve
def getV(edge_index):
    """Return the two endpoint vertices ``(v1, v2)`` of an edge.

    Edges are numbered in groups of 39: within a group, the first 19 join a
    vertex to the vertex numbered one higher, and the remaining 20 join a
    vertex to the vertex numbered 20 higher.
    """
    side = 20  # vertices per group
    per_group = 2 * side - 1  # 19 "+1" edges followed by 20 "+side" edges
    q, r = divmod(edge_index, per_group)
    if r < side - 1:
        v1 = side * q + r
        v2 = v1 + 1
    else:
        v1 = side * q + r - (side - 1)
        v2 = v1 + side
    return v1, v2
def solve(sources, sinks, caps, claim_counts, my_claims):
    """Pick the index of the edge to claim next.

    A flow is first pushed through the edges we already own. Every edge we do
    not own is then added with the capacity it would have after one more
    claim, and ``Graph.solve`` is asked for a path through the resulting
    graph. One of the unowned edges on that path is returned.

    Args:
        sources: Source vertex indices.
        sinks: Sink vertex indices.
        caps: Capacity of every edge, indexed by edge index.
        claim_counts: Number of claims on every edge.
        my_claims: Truthy for every edge we have claimed.

    Returns:
        An edge index. If the path yields no unowned edge, a random unowned
        edge is returned; if every edge is already ours, any random index.
    """
    ratio = 1000
    g = Graph(NUM_NODES + 2, sources, sinks)
    # Unlimited arcs from the super source to every source.
    for source_node in sources:
        g.add_edge(NUM_NODES, source_node, INF)
    # Unlimited arcs from every sink to the super sink.
    for sink_node in sinks:
        g.add_edge(sink_node, NUM_NODES + 1, INF)
    # Our own edges, usable in both directions.
    for edge_index in range(NUM_EDGES):
        if not my_claims[edge_index]:
            continue
        v1, v2 = getV(edge_index)
        # max(1, ...) avoids a division by zero if the count of an owned edge is 0.
        cap = caps[edge_index] * ratio // max(1, claim_counts[edge_index])
        g.add_edge(v1, v2, cap)
        g.add_edge(v2, v1, cap)
    g.max_flow(NUM_NODES, NUM_NODES + 1)
    # Arcs added from here on are stored with l == 1.
    g.now_l = 1
    unclaimed = [i for i in range(NUM_EDGES) if not my_claims[i]]
    for edge_index in unclaimed:
        v1, v2 = getV(edge_index)
        # Share we would get if we joined the current claimants.
        cap = caps[edge_index] * ratio // (claim_counts[edge_index] + 1)
        g.add_edge(v1, v2, cap)
        g.add_edge(v2, v1, cap)
    st = g.solve()
    # Shuffle so that the choice among several edges of the path is random.
    random.shuffle(unclaimed)
    for edge_index in unclaimed:
        v1, v2 = getV(edge_index)
        if (v1, v2) in st or (v2, v1) in st:
            return edge_index
    if unclaimed:
        # No usable path: still avoid spending the request on an edge we own.
        return unclaimed[0]
    return random.randint(0, NUM_EDGES - 1)
# Run the client only when the file is executed as a script.
if __name__ == "__main__":
    main()
@snuke
Copy link
Author

snuke commented Dec 21, 2019

間違えて最小ヒープを使っているので計算量が壊れているということに気づきました。
一応正しい答えは出すし、ランダムな小さいグラフなので大丈夫ではありますが。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment