|
#!/bin/python3 |
|
from cshogi import Board, move_to_usi |
|
from cshogi.dlshogi import FEATURES1_NUM, FEATURES2_NUM, make_input_features, make_move_label |
|
import numpy as np |
|
import onnxruntime |
|
import time |
|
|
|
# License: GPLv3 https://www.gnu.org/licenses/gpl-3.0.html
|
|
|
# ---- Engine identity ----------------------------------------------------
# Engine name reported to the GUI in reply to the "usi" command.
enginename = "OnnxPolicyPlayer"

# ---- Model and sampling settings ----------------------------------------
# Model file name (a stronger model makes the engine play stronger).
modelfile_default: str = "amano.onnx"
modelfile: str = modelfile_default

# Temperature parameter: a low temperature makes the highest-rated move more
# likely to be picked, a high temperature makes all moves closer to equally
# likely.
temperature_default: int = 200
temperature: int = temperature_default

# ---- Execution provider settings ----------------------------------------
# device_id (for machines with more than one CUDA device).
device_id: int = 0
# enable_cuda (whether to use CUDA).
enable_cuda: bool = False
# enable_tensorrt (whether to use TensorRT).
enable_tensorrt: bool = False

# ---- Ponder settings ----------------------------------------------------
# Value of the USI_Ponder option.
usi_ponder: bool = False
# True while the engine is in "go ponder" (look-ahead) mode.
go_ponder: bool = False

# ---- Game state ---------------------------------------------------------
# SFEN string of the standard (even-game) starting position.
SFEN_HIRATE = "lnsgkgsnl/1r5b1/ppppppppp/9/9/9/PPPPPPPPP/1B5R1/LNSGKGSNL b - 1"
# Current position.
board = Board()
# Inference object; stays None until it is created by "isready".
player = None
|
|
|
class OnnxPolicyPlayer:
    """Move chooser that samples from the policy output of an ONNX model.

    The model is run through an onnxruntime ``InferenceSession``. For a given
    position the policy logits of the legal moves are turned into a
    Boltzmann (softmax-with-temperature) distribution and one move is drawn
    at random from it.
    """

    def __init__(self, modelfile: str = "model.onnx", temperature: int = 200):
        """Create the inference session and run one warm-up inference.

        Execution providers are chosen from the module-level settings
        ``enable_tensorrt``, ``enable_cuda`` and ``device_id``; the warm-up
        inference is run on the module-level ``board``.

        Args:
            modelfile: Path of the ONNX model to load.
            temperature: Softmax temperature (scaled by 0.001 when applied).
        """
        self.temperature = temperature
        print("info string load modelfile {}".format(modelfile))

        # Build the provider list in priority order: TensorRT, CUDA, CPU.
        available_providers = onnxruntime.get_available_providers()
        enable_providers = []
        if enable_tensorrt and 'TensorrtExecutionProvider' in available_providers:
            enable_providers.append(('TensorrtExecutionProvider', {
                'device_id': device_id,
                'trt_fp16_enable': True,
                'trt_engine_cache_enable': True,
            }))
            print("info string enable TensorrtExecutionProvider")
        if enable_cuda and 'CUDAExecutionProvider' in available_providers:
            enable_providers.append(('CUDAExecutionProvider', {
                'device_id': device_id,
            }))
            print("info string enable CUDAExecutionProvider")
        if 'CPUExecutionProvider' in available_providers:
            enable_providers.append('CPUExecutionProvider')
            print("info string enable CPUExecutionProvider")
        self.session = onnxruntime.InferenceSession(modelfile, providers=enable_providers)

        # Pre-allocated input feature buffers (batch size is fixed to 1).
        batchsize_max: int = 1
        self.features1 = np.empty((batchsize_max, FEATURES1_NUM, 9, 9), dtype=np.float32)
        self.features2 = np.empty((batchsize_max, FEATURES2_NUM, 9, 9), dtype=np.float32)

        # The first inference can be slow, so run one here as a warm-up.
        self.move_infer_choice(board)

    def move_infer_choice(self, board):
        """Run inference on ``board`` and pick a move at random by policy.

        Args:
            board: Position object providing ``turn`` and ``legal_moves``.

        Returns:
            A tuple ``(move, value, probabilities, legalmoves)``:

            * no legal move: ``(None, None, None, None)`` (caller resigns);
            * exactly one legal move: ``(move, None, None, None)`` without
              running inference;
            * otherwise the sampled move as a Python ``int``, the model's
              value output as ``float``, the selection probability of each
              legal move, and the array of legal moves (same order).
        """
        color: int = board.turn
        legalmoves = np.array(list(board.legal_moves), dtype=np.uint32)

        # No legal move: nothing to choose.
        if legalmoves.size == 0:
            return None, None, None, None
        # A single legal move is forced; skip inference.
        if legalmoves.size == 1:
            return int(legalmoves[0]), None, None, None

        # Fill the input feature buffers for this position.
        make_input_features(board, self.features1[0], self.features2[0])

        # Run the model with explicit input/output bindings.
        batchsize: int = 1
        io_binding = self.session.io_binding()
        io_binding.bind_cpu_input('input1', self.features1[0:batchsize])
        io_binding.bind_cpu_input('input2', self.features2[0:batchsize])
        io_binding.bind_output('output_policy')
        io_binding.bind_output('output_value')
        self.session.run_with_iobinding(io_binding)

        # Outputs come back in binding order: policy first, then value.
        infer_policy_logits, infer_values = io_binding.copy_outputs_to_cpu()
        infer_policy_logit = infer_policy_logits[0]
        # Take the first element explicitly: float() on a 1-element array
        # with ndim > 0 is deprecated in recent NumPy versions.
        infer_value = float(np.ravel(infer_values[0])[0])

        # Policy logit of each legal move, in legalmoves order.
        probabilities_logit = np.array(
            [infer_policy_logit[make_move_label(move, color)] for move in legalmoves],
            dtype=np.float32,
        )

        # Boltzmann distribution over the legal moves.
        probabilities = self.softmax_temperature_with_normalize(probabilities_logit, self.temperature)

        # Draw one move according to those probabilities; convert to a plain
        # int so the return type matches the single-legal-move case.
        cmove: int = int(np.random.choice(legalmoves, p=probabilities))
        return cmove, infer_value, probabilities, legalmoves

    def set_temperature(self, temperature: float):
        """Replace the softmax temperature used for subsequent move choices."""
        self.temperature = temperature

    def softmax_temperature_with_normalize(self, logits, temperature: int):
        """Apply the temperature to ``logits`` and return softmax probabilities.

        The input array is not modified.

        Args:
            logits: 1-D array-like of logits (must not be empty).
            temperature: Temperature; the logits are divided by
                ``0.001 * temperature``.

        Returns:
            Array of probabilities summing to 1, in the same order as
            ``logits``.

        Raises:
            ValueError: If ``temperature`` is not positive.
        """
        if temperature <= 0:
            raise ValueError("temperature must be positive")

        # Apply the temperature on a new array so the caller's data is kept.
        scaled = np.asarray(logits) / (0.001 * float(temperature))

        # Subtract the maximum before exp() to prevent overflow.
        probabilities = np.exp(scaled - np.max(scaled))

        # Normalise so that the probabilities sum to 1.
        probabilities /= np.sum(probabilities)

        return probabilities
|
|
|
# Main USI loop: read one command per line from stdin and answer on stdout.
# Every reply is flushed, because stdout is block-buffered when the engine is
# driven through a pipe and the GUI would otherwise wait forever.
while True:
    # Whole command line; leave the loop cleanly when stdin is closed.
    try:
        cmdline: str = input()
    except EOFError:
        break
    # Start timing this command.
    time_sta = time.perf_counter()
    # Split on whitespace.
    cmds = cmdline.split()
    # Ignore blank lines (popping from an empty list would raise).
    if not cmds:
        continue
    # Leading command word.
    cmd0: str = cmds.pop(0)

    # USI identification.
    if cmd0 == "usi":
        print("id name {}".format(enginename))
        print("option name DNN_Model type string default {}".format(modelfile_default))
        print("option name Device_ID type spin default 0 min 0 max 255")
        print("option name Enable_CUDA type check default false")
        print("option name Enable_TensorRT type check default false")
        print("option name Softmax_Temperature type spin default {:d} min 1 max 5000".format(temperature_default))
        print("option name USI_Ponder type check default false")
        print("usiok", flush=True)
        continue

    # Option setting: "setoption name <id> value <x>".
    if cmd0 == "setoption":
        if len(cmds) < 4:
            continue
        cmd1 = cmds.pop(0)
        cmd2 = cmds.pop(0)
        cmd3 = cmds.pop(0)
        # The value may contain spaces (e.g. a file path).
        cmd4 = ' '.join(cmds)
        if cmd1 != "name" or cmd3 != "value":
            continue
        if cmd2 == "DNN_Model":
            modelfile = cmd4
        elif cmd2 == "Device_ID":
            # Non-numeric values are ignored instead of crashing the engine.
            try:
                valueint = int(cmd4)
            except ValueError:
                continue
            # NOTE(review): the "usi" reply advertises max 255, but only
            # 0..15 is accepted here - confirm which range is intended.
            if 0 <= valueint < 16:
                device_id = valueint
        elif cmd2 == "Enable_CUDA":
            enable_cuda = (cmd4 == "true")
        elif cmd2 == "Enable_TensorRT":
            enable_tensorrt = (cmd4 == "true")
        elif cmd2 == "Softmax_Temperature":
            try:
                valueint = int(cmd4)
            except ValueError:
                continue
            if 0 < valueint <= 5000:
                temperature = valueint
                # Apply immediately when the player already exists.
                if player is not None:
                    player.set_temperature(valueint)
        elif cmd2 == "USI_Ponder":
            usi_ponder = (cmd4 == "true")
        continue

    # Initialisation.
    if cmd0 == "isready":
        board.set_sfen(SFEN_HIRATE)
        go_ponder = False
        # (Re)create the inference object with the current settings.
        player = OnnxPolicyPlayer(modelfile=modelfile, temperature=temperature)
        print("readyok", flush=True)
        continue

    # Position setup: "position (startpos | sfen <4 fields>) [moves ...]".
    if cmd0 == "position":
        # Nothing to do for a bare "position".
        if not cmds:
            continue
        cmd1: str = cmds.pop(0)
        if cmd1 == "startpos":
            # Standard starting position.
            board.set_sfen(SFEN_HIRATE)
        elif cmd1 == "sfen":
            # Arbitrary starting position: an SFEN string has four fields.
            board.set_sfen(' '.join(cmds[0:4]))
            del cmds[0:4]
        # Play out the listed moves, skipping the "moves" keyword itself.
        for move in cmds:
            if move == "moves":
                continue
            board.push_usi(move)
        continue

    # Search.
    # There is no long-running search, so the move is decided and printed on:
    # - a normal "go" (not "go ponder"),
    # - "ponderhit" while in "go ponder" (the predicted reply was played),
    # - "stop" while in "go ponder" (the predicted reply was not played).
    if cmd0 == "go" or (go_ponder and cmd0 in ("ponderhit", "stop")):
        # Report an error if the inference session is not initialised yet.
        if player is None:
            print("info string Error! go cmd before isready cmd.", flush=True)
            continue
        # "go ponder": only remember the mode, do not think ahead.
        if "ponder" in cmds:
            go_ponder = True
            continue
        go_ponder = False

        # Game already over: resign.
        if board.is_game_over():
            time_dur_ms = int(1000 * (time.perf_counter() - time_sta))
            print("info time {:d} pv resign".format(time_dur_ms))
            print("bestmove resign", flush=True)
            continue

        # Entering-king declaration win.
        if board.is_nyugyoku():
            time_dur_ms = int(1000 * (time.perf_counter() - time_sta))
            print("info time {:d} pv win".format(time_dur_ms))
            print("bestmove win", flush=True)
            continue

        # Choose our move.
        bestmove, bestmove_value, bestmove_probs, bestmove_legals = player.move_infer_choice(board)
        if bestmove is None:
            time_dur_ms = int(1000 * (time.perf_counter() - time_sta))
            print("info time {:d} pv resign".format(time_dur_ms))
            print("bestmove resign", flush=True)
            continue
        bestmoveusi: str = move_to_usi(bestmove)

        # Temporarily play our move to pick the expected reply (ponder move).
        board.push_usi(bestmoveusi)
        if usi_ponder:
            pondermove = player.move_infer_choice(board)[0]
        else:
            pondermove = None
        if pondermove is not None:
            pondermoveusi: str = move_to_usi(pondermove)
        else:
            pondermoveusi: str = "resign"
        # Take our move back.
        board.pop()

        # Print the evaluation.
        time_dur_ms = int(1000 * (time.perf_counter() - time_sta))
        if (bestmove_value is not None) and (bestmove_probs is not None) and (bestmove_legals is not None):
            score: int = int(4000 * (bestmove_value - 0.5))
            # List candidates from least to most probable, so the most
            # probable line (multipv 1) is printed last.
            for i, mvidx in reversed(list(enumerate(np.argsort(-bestmove_probs)))):
                prob_percent = 100.0 * bestmove_probs[mvidx]
                # Skip negligible candidates.
                if prob_percent < 0.1:
                    continue
                # mvidx indexes the current position's move list, so the
                # chosen move must be looked up in bestmove_legals (not in
                # the opponent's list, which may be None or shorter).
                if usi_ponder and bestmove == bestmove_legals[mvidx]:
                    print("info time {:d} score cp {:d} multipv {:d} pv {} {} ({:.1f}%)".format(time_dur_ms, score, i + 1, bestmoveusi, pondermoveusi, prob_percent))
                else:
                    print("info time {:d} score cp {:d} multipv {:d} pv {} ({:.1f}%)".format(time_dur_ms, score, i + 1, move_to_usi(bestmove_legals[mvidx]), prob_percent))
        else:
            print("info time {:d} pv {}".format(time_dur_ms, bestmoveusi))

        # Print the move.
        if pondermove is not None:
            print("bestmove {} ponder {}".format(bestmoveusi, pondermoveusi), flush=True)
        else:
            print("bestmove {}".format(bestmoveusi), flush=True)
        continue

    # Game end: only the ponder state is cleared.
    if cmd0 == "gameover":
        go_ponder = False
        continue

    # Engine shutdown: just leave the loop.
    if cmd0 == "quit":
        break

    # Debugging aid: print the current position.
    if cmd0 == "d":
        print(board)
        print(board.sfen(), flush=True)

    # Anything else is ignored and the loop starts over.

# The script ends here once the loop has been left.