将棋漫画には独特の演出がある。
対局者ふたりが盤を挟んで黙って座っているだけなのに,地の文や心の声によって,まるで盤上で殴り合いをしているかのような濃密なやりとりが描かれる。固有結界のような心象風景の中でお喋りしたり。あれが昔から好きだ。もちろんあれらは漫画的な誇張で,本物のプロ棋士の対局室は,もっと張りつめた静けさがあるんだろうと思う(行ったことはないが)。それでもああいう,盤の上だけで会話が成立しているように見える状況には憧れがある。
ちなみに僕は将棋がほとんど分からない。中学生の頃にちょっとだけやったが,金と銀の動きの違いを,たぶん毎回間違えていた。当時,近所に穴熊の組み方を知っている同級生がいて,そいつがいつも無双していた。組まれきってしまうと,こちらの駒はろくに動けない。何回かやって,いじけて自陣で飛車を反復横跳びさせながら時間を潰すなんてことがたびたびあった。
まあ,そんなんでも漫画の演出は楽しめる。実際,ルールが完璧に分からなくても『ハチワンダイバー』も『3月のライオン』も『龍と苺』も,どれもめちゃめちゃおもしろい。
話を戻すと,僕はああいう「盤上で会話している」感じに憧れる。とはいえ,棋力はゼロに等しいので,別のアプローチを思いついた。
棋譜のなかにテキストの会話そのものを物理的に埋め込んでしまえばいいのではないか。
外見上は普通の対局譜にしか見えない。でも,鍵を持っている人間が読み解くと,対局の裏でAさんとBさんが普通に雑談しているというような。
調べたら,こういうのはステガノグラフィーという分野らしい。古典的には画像のピクセルの最下位ビットにメッセージを忍ばせたり,文章中の特定の文字を変えて情報を埋めたりする技術が知られている。今回は媒体が画像でも英文でもなく棋譜になる,という話だ。
さいしょ
将棋の盤は9×9の81マスある。日本語のひらがなは清音だけなら46字,濁音や半濁音,拗音や促音を入れても80字には収まる。じゃあ盤面の各マスにひらがなを割り当てて,駒の行き先で文字を表現すればいけるのでは?
たとえば5五は「あ」,5六は「い」,みたいに対応表を決めておく。「Aさんが飛車を5五に動かす」=「あ」を送信,という感じ。これなら理屈はシンプルで,暗号鍵もいらない。秘伝の対応表だけ共有しておけば誰でも復号できる。
数秒後に無理だなと気づいた。
まず,指したいマスに駒を動かせるとは限らない。「あ」を送りたい局面で,5五に動かせる駒がそもそも存在しなかったらどうするのか。将棋は囲碁と違って,各駒が動ける範囲がきっちり決まっている。
次に,仮に動かせたとしても,その手が将棋として自然である保証がない。たとえば自陣の金を意味もなく後ろに引いたら,観戦者から見て明らかに不審な手になる。棋譜が将棋として自然に見えることは,この遊びの大前提だ。
しかし対応表でやるのが無理なら,どうすればいいのか。
しばらく考えて,別の角度に気づいた。文字とマスを直接対応付けようとするから無理がある。そうではなく,何か順番に並んだものから1つ選ぶという構造と対応付ければいいのだ。「N個の選択肢から何番目を選んだか」という事実だけが,純粋な数字として伝わる。各局面で『指せる手』はちゃんとリストとして列挙できるのだから,そのリストから何番目を選んだかで情報を表現すればよかろう。
合法手と文字表記
調べていくと,いくつかのことが分かってきた。
ある局面で指せる手のことを「合法手」と呼ぶらしい。中盤の局面で合法手は80〜100手くらいだそうで,案外たくさんある。一方,自分の玉の王手を避けないとか,二歩などが「指せない手」にあたる。
で,指し手は文字列で表現される。よく見かける「3四歩」とかの他にも,USI(Universal Shogi Interface)形式という表記方法がある。たとえば 7g7f は「7七から7六に駒を動かす」,P*5e は「持ち駒の歩を5五に打つ」を意味する。これはこれで格好いい。
最初の実装は簡単だった。合法手をぜんぶ列挙してUSI文字列の辞書順に並べ,その中から何番目を指したかで情報を表す。AさんとBさんで同じ並べ方をすれば,何番目を指したかは双方で一致するので,エンコードもデコードも機械的に成立する。
動かしてみたら,たしかに送受信できた。ただし,指される手があからさまに変だった。USIの辞書順は局面の意味とは何の関係もないので,序盤からいきなり香車を1マス上げたり,自陣の金を斜めに動かしたりするなど,僕のような素人が見ても明らかにおかしい棋譜ができあがった。まだまだ自然な棋譜とはいえない。
ここで将棋エンジンの登場となる。
チェスの世界で長年最強クラスを争ってきたオープンソースの探索エンジンに Stockfish というのがある。チェス界では常識みたいな存在らしい。これに将棋やその他の変種ルール対応を加えた fairy-stockfish という派生があって,局面を渡すと,各合法手にスコア(評価値)を付けて返してくれる。
7g7f → +120 (悪くない)
2g2f → +110
6i7h → +95
...
9g9f → -10 (微妙)
1g1f → -50
評価値の高い順に並べ替えて,上位 K 手だけを選択肢にすることにした。並べ方の基準を『辞書順』から『強い順』に取り替えたわけだ。これなら選ばれた手は必ず強めの手なので,棋譜全体としては自然に見えるはず。
たとえば K=8 にすれば,ある局面で「上位8手のなかのどれか」を選べる。つまり,この場合だと,1手で log₂(8) = 3 ビット分の情報を載せられる,ということになる。
1手で3ビット送れる。これでひらがなを送りたい。
文字をどう表すか
ひらがなは78字ある。これに,あとで導入する事情で1枠だけ余分に持っておきたいので,合計79のシンボルを使うとする。すると,1文字あたり log₂(79) ≈ 6.3 ビットが必要になる。
1手で3ビットだから,1文字を送るのに2手ちょっと必要になる。シンプルに割り切れない。
ここで,文字と手を直接対応付けるのは諦めることにした。
メッセージ → 巨大な整数 → 手のrank列 → 棋譜
メッセージはまず79進数として整数化する。普段使う10進数で「123」と書いたら 1×100 + 2×10 + 3 を意味するように,79進数で「あい」と書いたら,それぞれの文字IDに 79⁰,79¹ を掛けて足す。日常では10進数しか使わないが,コンピュータの世界では2進数や16進数を見るので,まあ79進でもなんでもいいわけだ。
できあがった巨大な整数を,今度は K(=8)進数として下から1桁ずつ取り出していく。10進数で「1234」を分解するときに,1234 mod 10 = 4(一の位),1234 ÷ 10 = 123,123 mod 10 = 3(十の位),と繰り返すのと同じ。modとは「割った余り」のことで,要するにいちばん下の桁を取り出す操作である。
取り出した値は 0 から K-1 の範囲に収まるので,これをそのまま「Top-K手のうち何番目を指すか」として使えば,整数を指し手の列に変換できる。
復号する側は逆操作をおこなう。同じ鍵(エンジンの探索の深さと K)で同じ局面の Top-K を再計算し,棋譜に登場する手が何番目だったかを取り出して,K進数として上から積み上げ直す。鍵を共有していない人間は,局面ごとの Top-K を再現できないので,何が書いてあるのか分からない。つまり,「局面の評価のやり方(=将棋観のようなもの)」が秘密鍵っぽくなるわけだ。
終わりをどう告げるか
ここまでで,ひとまず動くものができた。
動かしてみたところ,Aさんが永遠に喋り続ける問題が発生した。
考えてみれば当たり前である。「ここで送信を終えますよ」という約束事を一切作っていなかった。Aさんが指した手の rank を順に積み上げていくと整数はどんどん大きくなっていくが,デコーダ側からすると「いまの累積値こそが完成したメッセージ」なのか「まだ続きがあって,あと数手したらメッセージになる」のかを区別する手段がない。区別できないので,なんとなく一番もっともらしい長さで切る,みたいな判定もできない。
なので,メッセージの末尾に「終わりのしるし(EOT)」を付けることにした。普通の文字としては使われない記号を1つ用意して,メッセージ末尾にこれを付加してから整数化する。デコーダはその記号を見つけたら「ここで送信完了」と判定する。アルファベットを78字(ひらがな)+ 1字(EOT)= 79字にしたのはこのためだった。
これで送信完了が伝わるようになった。が,まだ問題があった。
メッセージを整数化する都合で,79進展開した最高位の桁が「0」だとそこの情報が消える(123 を 0123 と書いても同じ整数なのと同様)。EOTの直後に「最高位の桁を1にする」という追加のしるしも入れておく必要がある。これで終端パターンは [EOT, 1] の2要素になった。
ところがこのパターン,デコード時にそこそこの確率でメッセージ本体に偶然出現してしまうことが分かった。終わるつもりがなかった位置で『終わったと誤認』され,1つの発話が途中でぶった切られて,続きが次の発話に押し出される,みたいな現象が起きる。
しかたがないのでもう1個増やす。終端パターンを [EOT, EOT, 1] の3要素に拡張すると,本体に偶然出現する確率はぐっと下がって,実用上ぶつからない。こういう「配列の終わりを示す目印」のことを番兵(sentinel)と呼ぶらしいことをここで知った。これも格好いいですね。
できたもの
というわけで,ようやく安定して動くプロトコルができあがった。
Aさんが先手,Bさんが後手で,おたがいに2発話ずつ会話してから,適当に詰みまで指し続ける,という設定。なお受信側は Top-K のうち,ほどほどの手をランダムで選ぶようにしている。以下が生成された棋譜である。
先手:Aさん
後手:Bさん
手数----指手---------消費時間--
1 1八飛(28)
2 7二飛(82)
3 7八金(69)
4 8二銀(71)
5 5八飛(18)
6 2四歩(23)
7 6九玉(59)
8 5四歩(53)
9 2八銀(39)
10 7一飛(72)
11 3八飛(58)
12 9四歩(93)
... (中略) ...
140 5七角成(24)
141 5四銀(45)
142 5二玉(63)
143 5一金打
144 詰み
序盤の1八飛とか5八飛のあたりからすでに不自然な気がする。各手はエンジンが評価した Top-8 の中から選ばれてはいるが,エンジンはその局面だけしか考慮していない。将棋の「筋」や「構想」といった長期的な一連の流れをガン無視しているのだ。このあたりは改修の余地があるが,まぁ最初の方よりはまだマシになっていると思いたい。ルールも覚束ないような者同士の対局ならこういう感じになるんじゃないですかね。
さて,この棋譜の裏で,AさんとBさんはどういった会話をしているのだろうか。最近の調子を訊いているかもしれないし,勝負ごとゆえ憎まれ口を叩いているのかもしれない。
答え:
[1] 先手: 「こんきのあにめどう」
[2] 後手: 「かみいなぼたんいい」
[3] 先手: 「なるほどね」
[4] 後手: 「いいよ」
...え,『上伊那ぼたん、酔へる姿は百合の花』の話!!?!?
郡上かなで先輩!???!!!
ウー
復号できる相手のこと
ところで,これを実世界で人間ふたりがやるには,けっこう厳しい前提が要求される。
ひとつ目は,各局面で全合法手を瞬時に列挙できること。これはたぶん,将棋がばか強い人ならいけそうだ。たぶん。問題はふたつ目で,列挙したリストを完全に同じ基準で順位付けできること。
今回のプロトコルではエンジンが評価値を付けてくれるから,双方が同じエンジンを同じ設定で使う限り,Top-K の並びは一致する。
しかし,生身の人間ふたりが盤を挟んで本気でこれをやろうとすると,ふたりの将棋観が完全に同じであるか,もしくはお互いの将棋観を完全に把握していることが必要になる。つまり「あなたならこの局面でこの順に手を評価するはず」という予測が双方でばっっちり正確に成立しないと,復号できない。このプロトコルで,盤上の自然な対局を装ったまま誰にも気づかれずに雑談を成立させるには,相手の頭の中をかなり深いところまで知っている必要がある。
...それはそれで良いのでは?
盤上は将棋,盤下は雑談。その雑談の存在は,将棋観の共有によって担保される。漫画の演出で見ていた「盤の上だけで会話が成立しているように見える」ものの正体は,案外こういう種類の親密さだったのかもしれない,と思った。
おまけ
最終的なコード(長いので折りたたみ)
"""shogi_stego.py — 将棋・指し手ステガノグラフィー.
棋譜の中に会話を埋め込むステガノグラフィーツール.各局面で fairy-stockfish が
評価した上位 K 手の中から ``i = total mod K`` 番目を選ぶことで,対局譜として
自然な手順にひらがな会話を符号化する.
依存:
- Python 3.8+
- ``pip install python-shogi``
- fairy-stockfish の実行ファイル(largeboard 対応版が必要)
- Linux: ``apt install fairy-stockfish``
- Windows: https://github.com/fairy-stockfish/Fairy-Stockfish/releases から
``fairy-stockfish-largeboard_x86-64.exe`` をダウンロード
使い方:
符号化(送信側と同じ depth/top-k で復号する必要がある)::
python shogi_stego.py encode --message あ い う --depth 4 --top-k 8
復号(鍵パラメータが必須)::
python shogi_stego.py decode game.csa --depth 4 --top-k 8
プロトコル:
- ひらがな 78 字 + EOT 記号 = 79 シンボルで会話を符号化
- 各局面でエンジンの全合法手評価(MultiPV)を取り,スコア降順・USI 昇順
タイブレークでソート → 上位 K 手を採用
- N < K の局面は両者がスキップ判定(情報を載せない)
- 送信者は ``i = total mod K`` で手を選ぶ
- 送信者が EOT を送ると次のターンから相手が送信者になる
- 受信者は Top-K 内の指定範囲からランダムに rank を選ぶ(デコーダは CSA から
手を読むだけで rank を知る必要がない)
- オプションでメッセージ送信完了後も対局終了まで継続可能(``--play-to-end``).
継続部分はメッセージ送信中と同じ統計分布で手を選ぶので観察者から見て
不自然にならない.
決定論性:
Stockfish は ``Threads=1`` + ``ucinewgame`` でも完全な再現性は保証されない
(official-stockfish/Stockfish#859).本ツールはエンコーダとデコーダの両方が
``evaluate_all_moves`` のみを同じ順序で呼び出す設計にすることで,state リーク
があっても両者の経路が同一になり結果一致を保証する.受信者のランダム選択は
Python 側だけで完結し,エンジンに対するコマンド列には影響しない.
秘密鍵:
- ``StegoKey(depth, k)`` を両者で共有することが復号の前提
- エンジンバイナリ(バージョン)も実質的に鍵の一部
- 鍵が一致しないと番兵パターン ``[EOT, EOT, 1]`` が復元されないため,
プロトコルそのものが既知でも復号は事実上不可能
- 受信者の rank 範囲とシードはエンコーダ側だけの設定で,鍵には含まれない
"""
from __future__ import annotations
import argparse
import os
import platform
import random
import shutil
import subprocess
import sys
import time
from typing import Any, Dict, List, Optional, Tuple
import shogi
# ============================================================
# プロトコル定数(アルファベット側)
# ============================================================
HIRAGANA_TABLE = list(
"あいうえおかきくけこさしすせそたちつてとなにぬねの"
"はひふへほまみむめもやゆよらりるれろわをんがぎぐげご"
"ざじずぜぞだぢづでどばびぶべぼぱぴぷぺぽゃゅょっー、。"
)
EOT_SYMBOL = "<EOT>"
ALPHABET = HIRAGANA_TABLE + [EOT_SYMBOL]
ALPHABET_SIZE = len(ALPHABET) # = 79
EOT_INDEX = ALPHABET_SIZE - 1
CHAR_TO_IDX = {c: i for i, c in enumerate(ALPHABET)}
def message_to_int(text: str) -> int:
"""テキストを 79 進整数に変換する.
末尾に ``[EOT, EOT]`` を付け,さらに最高位に ``1`` の番兵を追加した上で
79 進数として整数化する.EOT 2 連と最高位 1 がメッセージ終端の検出マーカー
として機能する.アルファベット側は 79 進で固定(手の選択側の K 進とは独立).
Args:
text: ひらがな等のメッセージ文字列.EOT 記号自体は含めない.
Returns:
変換された非負整数.
Raises:
ValueError: ``text`` にアルファベット外の文字が含まれる場合.
"""
bad = [c for c in text if c not in CHAR_TO_IDX or c == EOT_SYMBOL]
if bad:
raise ValueError(f"アルファベット外: {set(bad)}")
digits = [CHAR_TO_IDX[c] for c in text] + [EOT_INDEX, EOT_INDEX]
total = 0
for i, d in enumerate(digits):
total += d * (ALPHABET_SIZE**i)
total += ALPHABET_SIZE ** len(digits) # 番兵
return total
def int_to_one_message(total: int) -> str:
"""整数をメッセージ文字列に逆変換する.
``message_to_int`` の逆操作.末尾に ``[chars, EOT, EOT, 1]`` 構造を期待する.
構造が一致しない場合は不完全とみなし,可能な範囲で文字列化したものを返す
(EOT 位置には ``?`` を入れる).
Args:
total: ``message_to_int`` で生成された整数.0 以下なら空文字列.
Returns:
復元されたメッセージ文字列.構造異常時は部分復元結果.
"""
if total <= 0:
return ""
digits = []
while total > 0:
digits.append(total % ALPHABET_SIZE)
total //= ALPHABET_SIZE
if (
len(digits) < 3
or digits[-1] != 1
or digits[-2] != EOT_INDEX
or digits[-3] != EOT_INDEX
):
return "".join(ALPHABET[d] if d < ALPHABET_SIZE - 1 else "?" for d in digits)
return "".join(ALPHABET[d] for d in digits[:-3])
def is_message_complete(total: int) -> bool:
"""整数が完成したメッセージの終端パターンを持つか判定する.
79 進展開した最高位 3 桁が ``[番兵=1, EOT, EOT]`` ならメッセージ完成.
Args:
total: 累積整数値.
Returns:
終端パターンを満たすなら ``True``.
"""
if total <= 0:
return False
digits = []
t = total
while t > 0:
digits.append(t % ALPHABET_SIZE)
t //= ALPHABET_SIZE
return (
len(digits) >= 3
and digits[-1] == 1
and digits[-2] == EOT_INDEX
and digits[-3] == EOT_INDEX
)
# ============================================================
# 秘密鍵: Top-K強さ順ソートの設定
# ============================================================
class StegoKey:
"""Top-K ソート関数を決定する秘密鍵.
両者がこれを共有していれば復号できる.エンジンバイナリ(バージョン)も
実質的に鍵の一部.異なるバージョンの fairy-stockfish は評価値が完全には
一致しないので,送受信者で同じバイナリを使う必要がある.
Attributes:
depth: エンジン探索深さ.
k: 上位何手を選択肢とするか.
engine_id: エンジンバイナリ識別用のメモ(情報用,鍵の動作には影響しない).
"""
def __init__(
self, depth: int = 4, k: int = 8, engine_id: Optional[str] = None
) -> None:
"""StegoKey を初期化する.
Args:
depth: エンジン探索深さ.1 以上でなければならない.
k: 上位手数.2 以上でなければならない.
engine_id: エンジンバイナリ識別用のメモ.
Raises:
ValueError: ``depth`` または ``k`` が範囲外の場合.
"""
if k < 2:
raise ValueError("k must be >= 2")
if depth < 1:
raise ValueError("depth must be >= 1")
self.depth = depth
self.k = k
self.engine_id = engine_id
def __repr__(self) -> str:
return f"StegoKey(depth={self.depth}, k={self.k})"
# ============================================================
# fairy-stockfish ラッパー
# ============================================================
def find_fairy_stockfish(explicit_path: Optional[str] = None) -> Optional[str]:
"""fairy-stockfish の実行ファイルを探す.
優先順位: 引数 → 環境変数 ``FAIRY_STOCKFISH`` → プラットフォーム別の
既知のパス → ``PATH`` 内の検索.
Args:
explicit_path: 明示的なパス指定.存在すれば最優先.
Returns:
実行ファイルへのパス.見つからなければ ``None``.
"""
candidates: List[str] = []
if explicit_path:
candidates.append(explicit_path)
if os.environ.get("FAIRY_STOCKFISH"):
candidates.append(os.environ["FAIRY_STOCKFISH"])
if platform.system() == "Windows":
candidates += [
"fairy-stockfish.exe",
"fairy-stockfish-largeboard_x86-64.exe",
r"C:\tools\fairy-stockfish.exe",
r"C:\tools\fairy-stockfish-largeboard_x86-64.exe",
]
else:
candidates += [
"/usr/games/fairy-stockfish",
"/usr/local/bin/fairy-stockfish",
"fairy-stockfish",
]
for c in candidates:
if os.path.isfile(c):
return c
which = shutil.which(c)
if which:
return which
return None
class FairyStockfishEngine:
"""fairy-stockfish UCI ラッパー.
プロトコルの決定性を保つため,以下を厳格に守る:
- ``Threads=1`` 固定(並列探索は順序非決定的)
- ``Hash`` サイズ固定
- ``Skill Level=20`` 固定(弱化用の乱数を排除)
- 各 ``evaluate_all_moves`` の前に ``usinewgame`` で全探索状態をリセット
- エンジンに発行するコマンドは ``evaluate_all_moves`` のみ
(``best_move`` 等の余計なコマンドを混ぜると state リークによって
エンコーダ/デコーダの評価結果が乖離する原因になる)
"""
def __init__(self, path: str, hash_mb: int = 256) -> None:
"""エンジンを起動して初期化する.
Args:
path: fairy-stockfish 実行ファイルへのパス.
hash_mb: 置換表のサイズ(MiB).MultiPV=全合法手 + 深い探索で使うので
デフォルトは大きめの 256 MB.
"""
self.proc = subprocess.Popen(
[path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0,
)
self._send("usi")
self._wait_for("usiok")
self._send("setoption name UCI_Variant value shogi")
self._send("setoption name Protocol value usi")
self._send("setoption name Threads value 1")
self._send(f"setoption name Hash value {hash_mb}")
self._send("setoption name Skill Level value 20")
self._send("isready")
self._wait_for("readyok")
self._send("usinewgame")
def _send(self, cmd: str) -> None:
"""UCI コマンドを 1 行送信する.
Args:
cmd: 改行を含まないコマンド文字列.
"""
assert self.proc.stdin is not None
self.proc.stdin.write((cmd + "\n").encode("utf-8"))
self.proc.stdin.flush()
def _wait_for(self, prefix: str) -> str:
"""指定プレフィックスで始まる行が来るまで stdout を読む.
Args:
prefix: 待ち受けるプレフィックス(例 ``"readyok"``,``"bestmove"``).
Returns:
合致した行(前後の空白除去済).
Raises:
RuntimeError: エンジンが応答せずに閉じた場合.
"""
assert self.proc.stdout is not None
while True:
raw = self.proc.stdout.readline()
if not raw:
raise RuntimeError("engine closed")
line = raw.decode("utf-8", errors="replace").strip()
if line.startswith(prefix):
return line
def _set_position(self, board: shogi.Board) -> None:
"""``board`` の現在局面をエンジンに伝える.
``startpos moves ...`` 形式で送る(FEN は使わない.エンジンに局面の
移動履歴を見せる方がスキップ判定や評価で安定するため).
Args:
board: 現在の盤面.
"""
moves_str = " ".join(m.usi() for m in board.move_stack)
if moves_str:
self._send(f"position startpos moves {moves_str}")
else:
self._send("position startpos")
def evaluate_all_moves(self, board: shogi.Board, depth: int) -> Dict[str, int]:
"""``board`` の全合法手を MultiPV で評価する.
評価前に ``usinewgame`` で状態リセットを試みる.Stockfish issue #859 の
とおり ``ucinewgame`` でも完全リセットされない state が残る可能性は
あるが,エンコーダ/デコーダで同一コマンド列を流す限り両者は同じ state
履歴を経るので決定性自体は保たれる.
Args:
board: 評価対象の局面.
depth: 探索深さ(``go depth D``).
Returns:
``{usi文字列: スコア}`` の辞書(手番側から見た符号付きスコア).
合法手が無い場合は空辞書.MultiPV で報告されなかった手は最弱値
(-10^9) で埋められる.
Raises:
RuntimeError: 評価中にエンジンが閉じた場合.
"""
legal = list(board.legal_moves)
if not legal:
return {}
self._send("usinewgame")
self._send("isready")
self._wait_for("readyok")
self._send("setoption name Clear Hash")
self._send(f"setoption name MultiPV value {len(legal)}")
self._set_position(board)
self._send(f"go depth {depth}")
# MultiPV は各ランクごとに反復深化中の info を出すので,同じ usi に
# ついて何度も上書きされる.最後(最深)の値が残る.
scores: Dict[str, int] = {}
assert self.proc.stdout is not None
while True:
raw = self.proc.stdout.readline()
if not raw:
raise RuntimeError("engine closed during eval")
line = raw.decode("utf-8", errors="replace").strip()
if line.startswith("bestmove"):
break
if not line.startswith("info"):
continue
t = line.split()
if "multipv" not in t or "pv" not in t or "score" not in t:
continue
try:
si = t.index("score")
kind = t[si + 1]
val = int(t[si + 2])
if kind == "mate":
# 詰みは大きな絶対値に正規化(手数が短い詰みほど高評価)
score = 1_000_000 - val if val > 0 else -1_000_000 + val
else:
score = val
usi = t[t.index("pv") + 1]
scores[usi] = score
except (ValueError, IndexError):
continue
# MultiPV で報告されなかった手は最弱扱い(タイブレークでも USI 昇順で末尾)
for m in legal:
scores.setdefault(m.usi(), -(10**9))
return scores
def close(self) -> None:
"""エンジンを終了させる.例外は握りつぶす(cleanup 用途)."""
try:
self._send("quit")
self.proc.wait(timeout=2)
except Exception:
self.proc.kill()
# ============================================================
# Top-K強さ順ソート(プロトコルの中核)
# ============================================================
def keyed_top_k_moves(
board: shogi.Board, engine: FairyStockfishEngine, key: StegoKey
) -> Optional[List[shogi.Move]]:
"""秘密鍵 ``key`` に従ってソートした上位 K 手を返す.
エンジン評価値を降順,同点なら USI 文字列昇順でタイブレークしてソートする.
送信者・受信者・復号者すべてが同じ判定基準を使うので一貫性が保たれる.
Args:
board: 評価対象の局面.
engine: fairy-stockfish ラッパー.
key: 探索深さと K を含む秘密鍵.
Returns:
上位 K 手の ``shogi.Move`` のリスト.合法手数が K 未満なら ``None``
(情報を載せられないスキップ局面のシグナル).
"""
legal = list(board.legal_moves)
if len(legal) < key.k:
return None
scores = engine.evaluate_all_moves(board, key.depth)
legal_by_usi = {m.usi(): m for m in legal}
# スコア降順, USI 昇順でタイブレーク
sorted_usis = sorted(
legal_by_usi.keys(),
key=lambda u: (-scores[u], u),
)
return [legal_by_usi[u] for u in sorted_usis[: key.k]]
def fallback_move(board: shogi.Board) -> shogi.Move:
"""スキップ局面用の決定論的フォールバック手を返す.
USI 辞書順で最初の合法手を選ぶ.エンコーダ・デコーダ双方が同一の
フォールバックを使うことで一貫性を保つ.
Args:
board: 現在の盤面(合法手が 1 手以上ある必要がある).
Returns:
USI 昇順で最初の合法手.
"""
return sorted(board.legal_moves, key=lambda m: m.usi())[0]
# ============================================================
# 符号化・復号
# ============================================================
def encode_with_trace(
messages: List[str],
engine: FairyStockfishEngine,
key: StegoKey,
rng: random.Random,
receiver_rank_min: int,
receiver_rank_max: int,
max_plies: int = 400,
play_to_end: bool = False,
) -> Tuple[List[str], List[Dict[str, Any]]]:
"""メッセージのリストを棋譜化し,処理トレースも返す.
各局面でのロール選択ルール:
- 送信者ターン: ``Top-K[i]``(``i = total mod K``)
- 受信者ターン: ``Top-K[r]``(``r`` は ``[receiver_rank_min,
receiver_rank_max]`` から ``rng`` で一様ランダム選択)
- スキップ局面(合法手数 < K): ``fallback_move`` で決定論的に進める
``rng`` はエンコーダ側のみ参照される.受信者の手は CSA から読み戻されるだけ
なので,デコーダはランダム選択結果を知る必要がない.エンジンに発行される
コマンド列は ``evaluate_all_moves`` のみで,デコーダと完全に同一になる.
``play_to_end=True`` の場合,全メッセージ送信完了後も対局終了または
``max_plies`` まで指し続ける.継続部分の手はメッセージ送信中と同じ
統計分布に従う:
- 「送信者」相当ターン: rank uniform ``[0, K-1]``(``i = total mod K``
の分布と一致)
- 「受信者」相当ターン: rank uniform ``[receiver_rank_min,
receiver_rank_max]``
デコーダは継続部分も普通に読むので,ランダム rank が累積して意味不明な
「メッセージ」が出力される(番兵パターンに偶然当たると 1 つの「メッセージ」
が完結する).
Args:
messages: 符号化するメッセージのリスト.先頭から先手・後手と交互に
割り当てられる.
engine: fairy-stockfish ラッパー.
key: ``depth`` と ``k`` を含む秘密鍵.
rng: 受信者・継続フェーズで使う乱数生成器.
receiver_rank_min: 受信者 rank の下限(包含).
receiver_rank_max: 受信者 rank の上限(包含).
max_plies: 棋譜の最大手数.これを超えると失敗 or 継続停止.
play_to_end: メッセージ送信完了後も対局終了まで指し続けるか.
Returns:
``(moves, trace)`` のタプル.
- ``moves``: USI 文字列のリスト(CSA 化用).
- ``trace``: 各 ply の役割情報を含む辞書のリスト.キーは ``"ply"``,
``"usi"``, ``"role"`` 必須.``role`` は ``"sender"`` ``"receiver"``
``"skip"`` ``"filler_sender"`` ``"filler_receiver"`` ``"filler_skip"``
のいずれか.
Raises:
ValueError: ``receiver_rank_min/max`` が ``[0, K)`` の範囲外,または
``min > max`` の場合.
RuntimeError: メッセージ送信中に対局終了 or ``max_plies`` 超過した場合.
"""
if not (0 <= receiver_rank_min <= receiver_rank_max < key.k):
raise ValueError(
f"receiver_rank_range invalid: [{receiver_rank_min}, {receiver_rank_max}]"
)
msg_ints = [message_to_int(m) for m in messages]
msg_idx = 0
board = shogi.Board()
moves: List[str] = []
trace: List[Dict[str, Any]] = []
sender_color = shogi.BLACK
# ===== Phase 1: 実メッセージの符号化 =====
while msg_idx < len(msg_ints):
if board.is_game_over():
raise RuntimeError(f"対局終了: 残メッセージ #{msg_idx}, {len(moves)}手")
if len(moves) >= max_plies:
raise RuntimeError(f"max_plies超過: 残メッセージ #{msg_idx}")
top_k = keyed_top_k_moves(board, engine, key)
if top_k is None:
# N < K のスキップ局面.情報を載せず決定論的フォールバック手を指す.
move = fallback_move(board)
n_legal = sum(1 for _ in board.legal_moves)
trace.append(
{
"ply": len(moves) + 1,
"usi": move.usi(),
"role": "skip",
"reason": f"N={n_legal} < K={key.k}",
}
)
elif board.turn == sender_color:
# 送信者: Top-K の中から i = total mod K 番目を選ぶ
total = msg_ints[msg_idx]
i = total % key.k
total //= key.k
move = top_k[i]
msg_ints[msg_idx] = total
completed: Optional[str] = None
if total == 0:
completed = messages[msg_idx]
msg_idx += 1
sender_color = (
shogi.WHITE if sender_color == shogi.BLACK else shogi.BLACK
)
trace.append(
{
"ply": len(moves) + 1,
"usi": move.usi(),
"role": "sender",
"sender": "A" if board.turn == shogi.BLACK else "B",
"rank": i,
"completed_msg": completed,
}
)
else:
# 受信者: 指定範囲からランダムに rank を選ぶ
chosen_rank = rng.randint(receiver_rank_min, receiver_rank_max)
move = top_k[chosen_rank]
trace.append(
{
"ply": len(moves) + 1,
"usi": move.usi(),
"role": "receiver",
"receiver": "A" if board.turn == shogi.BLACK else "B",
"rank": chosen_rank,
}
)
moves.append(move.usi())
board.push(move)
# ===== Phase 2: 対局終了まで埋める(オプション) =====
if play_to_end:
while not board.is_game_over() and len(moves) < max_plies:
top_k = keyed_top_k_moves(board, engine, key)
if top_k is None:
move = fallback_move(board)
n_legal = sum(1 for _ in board.legal_moves)
trace.append(
{
"ply": len(moves) + 1,
"usi": move.usi(),
"role": "filler_skip",
"reason": f"N={n_legal} < K={key.k}",
}
)
elif board.turn == sender_color:
# 「送信者」相当: rank uniform [0, K-1]
# (メッセージ送信時の i = total mod K の分布と一致させる)
chosen_rank = rng.randint(0, key.k - 1)
move = top_k[chosen_rank]
trace.append(
{
"ply": len(moves) + 1,
"usi": move.usi(),
"role": "filler_sender",
"rank": chosen_rank,
}
)
else:
# 「受信者」相当: メッセージ送信時と同じ分布
chosen_rank = rng.randint(receiver_rank_min, receiver_rank_max)
move = top_k[chosen_rank]
trace.append(
{
"ply": len(moves) + 1,
"usi": move.usi(),
"role": "filler_receiver",
"rank": chosen_rank,
}
)
moves.append(move.usi())
board.push(move)
return moves, trace
def decode_conversation(
usi_moves: List[str], engine: FairyStockfishEngine, key: StegoKey
) -> List[Tuple[str, str]]:
"""USI 棋譜から会話メッセージを復号する.
エンコード時と同じ ``key`` を使う必要がある.デコーダは各局面で Top-K を
再計算して指された手のランクを取り出し,K 進数として累積する.番兵パターン
が現れた時点で 1 つのメッセージを確定し,送信者を交代する.
Args:
usi_moves: 復号対象の USI 文字列のリスト.
engine: fairy-stockfish ラッパー.
key: エンコード時と同じ秘密鍵.
Returns:
``(label, message)`` のリスト.``label`` は ``"sente"`` または ``"gote"``.
最終局面で累積が残っている場合は末尾に ``" <INCOMPLETE>"`` を付した
断片が含まれる.
"""
board = shogi.Board()
turns: List[Tuple[str, str]] = []
sender_color = shogi.BLACK
accumulator = 0
factor = 1
for usi in usi_moves:
top_k = keyed_top_k_moves(board, engine, key)
move = shogi.Move.from_usi(usi)
if top_k is None:
# スキップ局面.accumulator/factor を更新せず先へ.
board.push(move)
continue
if board.turn == sender_color:
top_k_usis = [m.usi() for m in top_k]
if usi in top_k_usis:
i = top_k_usis.index(usi)
accumulator += i * factor
factor *= key.k
if is_message_complete(accumulator):
msg = int_to_one_message(accumulator)
label = "sente" if sender_color == shogi.BLACK else "gote"
turns.append((label, msg))
sender_color = (
shogi.WHITE if sender_color == shogi.BLACK else shogi.BLACK
)
accumulator = 0
factor = 1
# else: top_k に無い手=プロトコル違反(普通の対局譜の可能性).
# ここでは無視して先へ.accumulator は途中状態のまま.
# 受信者の手番では何もしない(プロトコル外).
board.push(move)
if accumulator > 0:
partial = int_to_one_message(accumulator)
label = "sente" if sender_color == shogi.BLACK else "gote"
turns.append((label, partial + " <INCOMPLETE>"))
return turns
# ============================================================
# CSA出力 + 表示用ユーティリティ
# ============================================================
_PIECE_TO_CSA = {
shogi.PAWN: "FU",
shogi.LANCE: "KY",
shogi.KNIGHT: "KE",
shogi.SILVER: "GI",
shogi.GOLD: "KI",
shogi.BISHOP: "KA",
shogi.ROOK: "HI",
shogi.KING: "OU",
shogi.PROM_PAWN: "TO",
shogi.PROM_LANCE: "NY",
shogi.PROM_KNIGHT: "NK",
shogi.PROM_SILVER: "NG",
shogi.PROM_BISHOP: "UM",
shogi.PROM_ROOK: "RY",
}
_PROMOTE_MAP = {
shogi.PAWN: shogi.PROM_PAWN,
shogi.LANCE: shogi.PROM_LANCE,
shogi.KNIGHT: shogi.PROM_KNIGHT,
shogi.SILVER: shogi.PROM_SILVER,
shogi.BISHOP: shogi.PROM_BISHOP,
shogi.ROOK: shogi.PROM_ROOK,
}
_PIECE_JP = {
shogi.PAWN: "歩",
shogi.LANCE: "香",
shogi.KNIGHT: "桂",
shogi.SILVER: "銀",
shogi.GOLD: "金",
shogi.BISHOP: "角",
shogi.ROOK: "飛",
shogi.KING: "玉",
shogi.PROM_PAWN: "と",
shogi.PROM_LANCE: "成香",
shogi.PROM_KNIGHT: "成桂",
shogi.PROM_SILVER: "成銀",
shogi.PROM_BISHOP: "馬",
shogi.PROM_ROOK: "龍",
}
_NUM_JP = "一二三四五六七八九"
def usi_to_csa_move(usi: str, board: shogi.Board, color: str) -> str:
"""USI 1 手を CSA 形式の手筋表記に変換する.
Args:
usi: USI 形式の手(例 ``"7g7f"``,``"P*5e"``).
board: その手を指す直前の盤面.駒種推定に使う.
color: ``"+"`` または ``"-"``.
Returns:
CSA 形式の 1 手文字列(例 ``"+7776FU"``).
"""
move = shogi.Move.from_usi(usi)
to_sq = move.to_square
to_file = 9 - (to_sq % 9)
to_rank = (to_sq // 9) + 1
if move.from_square is None:
return f"{color}00{to_file}{to_rank}{_PIECE_TO_CSA[move.drop_piece_type]}"
fs = move.from_square
from_file = 9 - (fs % 9)
from_rank = (fs // 9) + 1
ptype = board.piece_at(fs).piece_type
if move.promotion:
ptype = _PROMOTE_MAP.get(ptype, ptype)
return f"{color}{from_file}{from_rank}{to_file}{to_rank}{_PIECE_TO_CSA[ptype]}"
def emit_csa(
usi_moves: List[str],
sente: str = "Sender",
gote: str = "Receiver",
event: str = "Stego",
) -> str:
"""USI 棋譜から CSA フォーマット文字列を生成する.
Args:
usi_moves: USI 文字列のリスト.
sente: 先手の名前.
gote: 後手の名前.
event: イベント名(``$EVENT:`` ヘッダ).
Returns:
CSA フォーマットの文字列(末尾改行付き).対局結果は ``%CHUDAN``
(途中で終了)として書き出される.
"""
lines = ["V2.2", f"N+{sente}", f"N-{gote}", f"$EVENT:{event}", "PI", "+"]
board = shogi.Board()
for usi in usi_moves:
color = "+" if board.turn == shogi.BLACK else "-"
lines.append(usi_to_csa_move(usi, board, color))
lines.append("T1")
board.push(shogi.Move.from_usi(usi))
lines.append("%CHUDAN")
return "\n".join(lines) + "\n"
def parse_csa(path: str) -> Dict[str, Any]:
"""CSA ファイルを読んで主要情報を辞書で返す.
エンコーディングは UTF-8 → Shift_JIS → CP932 の順で試す.
Args:
path: 読み込む CSA ファイルのパス.
Returns:
``{"moves": List[str], "sfen": str, "names": List[Optional[str]]}``
の辞書.``moves`` は USI 文字列のリスト,``names`` は ``[先手名, 後手名]``.
Raises:
UnicodeDecodeError: いずれのエンコーディングでも読めなかった場合.
"""
import shogi.CSA
text: Optional[str] = None
last_err: Optional[UnicodeDecodeError] = None
for enc in ("utf-8", "shift_jis", "cp932"):
try:
with open(path, encoding=enc) as f:
text = f.read()
break
except UnicodeDecodeError as e:
last_err = e
if text is None:
raise UnicodeDecodeError(
f"CSAファイルのエンコーディング判定失敗: {path} ({last_err})"
)
parsed = shogi.CSA.Parser.parse_str(text)[0]
return {
"moves": parsed["moves"],
"sfen": parsed["sfen"],
"names": parsed.get("names", [None, None]),
}
def usi_to_jp(usi: str, board: shogi.Board) -> str:
"""USI 1 手を日本語の指し手表記に変換する(表示専用).
例: ``"7g7f"`` → ``"7六歩"``,``"P*5e"`` → ``"5五歩打"``.
Args:
usi: USI 形式の手.
board: その手を指す直前の盤面.
Returns:
日本語表記の文字列.
"""
move = shogi.Move.from_usi(usi)
to_sq = move.to_square
to_file = 9 - (to_sq % 9)
to_rank = (to_sq // 9) + 1
to_str = f"{to_file}{_NUM_JP[to_rank - 1]}"
if move.from_square is None:
return f"{to_str}{_PIECE_JP[move.drop_piece_type]}打"
ptype = board.piece_at(move.from_square).piece_type
suffix = "成" if move.promotion else ""
return f"{to_str}{_PIECE_JP[ptype]}{suffix}"
# ============================================================
# メイン: サブコマンド方式
# ============================================================
def _make_engine(
args: argparse.Namespace,
) -> Tuple[FairyStockfishEngine, str]:
"""CLI 引数からエンジンを起動して返す.
エンジンが見つからない場合はヘルプメッセージを出して ``sys.exit(1)`` する.
Args:
args: ``argparse`` で解析した引数.``args.engine`` を参照する.
Returns:
``(engine, engine_path)`` のタプル.
"""
engine_path = find_fairy_stockfish(args.engine)
if engine_path is None:
_print_engine_help()
sys.exit(1)
print(f"engine: {engine_path}")
return FairyStockfishEngine(engine_path), engine_path
def cmd_decode(args: argparse.Namespace) -> None:
"""``decode`` サブコマンドのハンドラ.
既存の CSA ファイルを読み込み,鍵パラメータに従って復号する.
Args:
args: ``argparse`` で解析した引数.
"""
print(f"input: {args.input}")
parsed = parse_csa(args.input)
print(f"先手: {parsed['names'][0]}, 後手: {parsed['names'][1]}")
print(f"手数: {len(parsed['moves'])}")
key = StegoKey(depth=args.depth, k=args.top_k)
print(f"key: {key}")
engine, _ = _make_engine(args)
try:
t0 = time.time()
turns = decode_conversation(parsed["moves"], engine, key)
print(f" ({time.time() - t0:.1f}秒)")
finally:
engine.close()
print(f"\n復号結果 ({len(turns)}発話):")
print("-" * 60)
for i, (label, msg) in enumerate(turns):
speaker = "先手" if label == "sente" else "後手"
marker = " ← 不完全" if "<INCOMPLETE>" in msg else ""
clean_msg = msg.replace(" <INCOMPLETE>", "")
print(f" [{i + 1}] {speaker}: 「{clean_msg}」{marker}")
if any("<INCOMPLETE>" in m for _, m in turns):
print("\n注: 末尾にEOTが見つからない不完全メッセージがあります.")
print(" 鍵 (depth, top-k) が一致しない,プロトコル外の通常棋譜,")
print(" あるいは --play-to-end で生成された継続部分の可能性があります.")
def cmd_encode(args: argparse.Namespace) -> None:
"""``encode`` サブコマンドのハンドラ.
会話文字列リストを CSA ファイルに符号化する.受信者の手は
``[receiver_rank_min, receiver_rank_max]`` からランダムに rank を選ぶので
同じメッセージでも毎回異なる棋譜が生成される.対局が早く終わって
メッセージが収まらない場合は別シードで自動リトライする.
Args:
args: ``argparse`` で解析した引数.
"""
if args.message:
conversation = args.message
elif args.file:
with open(args.file, encoding="utf-8") as f:
conversation = [line.rstrip("\n\r") for line in f if line.strip()]
else:
conversation = [
"もうかりまっか",
"ぼちぼちでんな",
]
print("符号化する会話:")
for i, m in enumerate(conversation):
print(f" {'A' if i % 2 == 0 else 'B'}さん: 「{m}」")
key = StegoKey(depth=args.depth, k=args.top_k)
print(f"\nkey: {key}")
# 受信者 rank の範囲を決定(None 指定なら K//2 〜 K-1)
rank_min = (
args.receiver_rank_min
if args.receiver_rank_min is not None
else args.top_k // 2
)
rank_max = (
args.receiver_rank_max if args.receiver_rank_max is not None else args.top_k - 1
)
print(f"receiver rank range: [{rank_min}, {rank_max}]")
# シード(明示指定なければランダムに決める)
base_seed = args.seed if args.seed is not None else random.randrange(2**31)
print(f"seed: {base_seed}")
expected: List[Tuple[str, str]] = [
("sente" if i % 2 == 0 else "gote", m) for i, m in enumerate(conversation)
]
moves: Optional[List[str]] = None
trace: Optional[List[Dict[str, Any]]] = None
used_seed: Optional[int] = None
last_error: Any = None
for attempt in range(args.retry + 1):
attempt_seed = base_seed + attempt
if args.retry > 0:
print(
f"\n--- 試行 {attempt + 1}/{args.retry + 1} (seed={attempt_seed}) ---"
)
else:
print()
rng = random.Random(attempt_seed)
engine, engine_path = _make_engine(args)
if attempt == 0:
key.engine_id = os.path.basename(engine_path)
try:
t0 = time.time()
try:
moves_attempt, trace_attempt = encode_with_trace(
conversation,
engine,
key,
rng,
receiver_rank_min=rank_min,
receiver_rank_max=rank_max,
play_to_end=args.play_to_end,
)
n_msg = sum(
1
for t in trace_attempt
if t["role"] in ("sender", "receiver", "skip")
)
n_filler = len(trace_attempt) - n_msg
if n_filler > 0:
print(
f" 符号化: {n_msg}手でメッセージ完成,"
f"+{n_filler}手で対局終了まで継続 "
f"(計{len(moves_attempt)}手, {time.time() - t0:.1f}秒)"
)
else:
print(
f" 符号化: {len(moves_attempt)}手で完成 "
f"({time.time() - t0:.1f}秒)"
)
n_skip = sum(
1 for t in trace_attempt if t["role"] in ("skip", "filler_skip")
)
if n_skip:
print(f" (スキップ局面: {n_skip}手)")
except RuntimeError as e:
print(f" 符号化失敗: {e}")
last_error = e
continue
finally:
engine.close()
# ラウンドトリップ確認(新規エンジンインスタンスで復号).
# 設計上必ず一致するはずだが念のため検証する.
print(" ラウンドトリップ確認...", end="", flush=True)
decode_engine, _ = _make_engine(args)
try:
decoded = decode_conversation(moves_attempt, decode_engine, key)
finally:
decode_engine.close()
# play-to-end の場合,継続部分から偶発的に「ノイズメッセージ」が
# 復号されるので,先頭 N 個(実メッセージ数)のみ一致を確認する.
n_real = len(expected)
if decoded[:n_real] == expected:
print(" ✓ 完全一致")
extra = decoded[n_real:]
if extra:
print(f" (継続部分から {len(extra)} 個のノイズが復号された.例:)")
for label, msg in extra[:3]:
speaker = "先手" if label == "sente" else "後手"
clean = msg.replace(" <INCOMPLETE>", "")
marker = " ← 不完全" if "<INCOMPLETE>" in msg else ""
print(f" {speaker}: 「{clean}」{marker}")
if len(extra) > 3:
print(f" ... 他 {len(extra) - 3} 個")
moves, trace = moves_attempt, trace_attempt
used_seed = attempt_seed
break
else:
print(" ✗ 不一致 (想定外)")
for i in range(max(n_real, len(decoded))):
e_i = expected[i] if i < n_real else None
d_i = decoded[i] if i < len(decoded) else None
if e_i != d_i:
print(f" [{i}] expected={e_i}")
print(f" decoded ={d_i}")
break
last_error = "ラウンドトリップ不一致"
if moves is None or trace is None:
print(f"\n全{args.retry + 1}試行で失敗.直前のエラー: {last_error}")
print("対策:")
print(" - --retry を増やす")
print(" - --top-k を上げる(情報密度↑で短い手数で収まる)")
print(" - --receiver-rank-min を K-1 に近づける(受信者が弱化→対局延長)")
print(" - メッセージを短くする")
sys.exit(1)
# 棋譜表示
if not args.quiet:
print("\n" + "=" * 70)
board = shogi.Board()
speaker: Optional[str] = None
in_filler = False
for t in trace:
jp = usi_to_jp(t["usi"], board)
mark = "▲" if board.turn == shogi.BLACK else "△"
role = t["role"]
if role.startswith("filler_") and not in_filler:
print(
" ─── メッセージ送信完了.以降は対局終了まで継続"
"(復号するとノイズになる) ───"
)
in_filler = True
speaker = None
if role == "sender":
if t["sender"] != speaker:
print(f" ┌─ {t['sender']}さんが話し始めた")
speaker = t["sender"]
note = f"[{t['sender']}さん 送信中… rank={t['rank']}]"
if t.get("completed_msg"):
note = f"[{t['sender']}さん 「{t['completed_msg']}」発話完了]"
speaker = None
elif role == "skip":
note = f"({t['reason']} スキップ)"
elif role == "receiver":
note = f"({t['receiver']}さん 応手 rank={t['rank']})"
elif role == "filler_sender":
note = f"(ノイズ送信 rank={t['rank']})"
elif role == "filler_receiver":
note = f"(ノイズ応手 rank={t['rank']})"
elif role == "filler_skip":
note = f"(ノイズ {t['reason']} スキップ)"
else:
note = f"(role={role})"
print(f"{t['ply']:>3} {mark}{jp:<8} {note}")
board.push(shogi.Move.from_usi(t["usi"]))
# CSA保存
csa = emit_csa(moves, sente="Aさん", gote="Bさん", event="将棋")
with open(args.out, "w", encoding="utf-8") as f:
f.write(csa)
print(f"\n→ CSA: {args.out}")
print(f" (この棋譜の seed: {used_seed})")
def _print_engine_help() -> None:
"""エンジンが見つからないときのヘルプを stderr に出す."""
print("ERROR: fairy-stockfish が見つかりません.", file=sys.stderr)
print(" Linux: apt install fairy-stockfish", file=sys.stderr)
print(
" Windows: https://github.com/fairy-stockfish/Fairy-Stockfish/releases から",
file=sys.stderr,
)
print(
" fairy-stockfish-largeboard_x86-64.exe をダウンロード",
file=sys.stderr,
)
print(
" --engine PATH または環境変数 FAIRY_STOCKFISH を設定してください",
file=sys.stderr,
)
def _add_key_args(parser: argparse.ArgumentParser) -> None:
"""encode/decode 共通の鍵パラメータをパーサに追加する.
Args:
parser: 引数を追加する対象のサブパーサ.
"""
parser.add_argument(
"--depth",
type=int,
default=4,
help="エンジン探索深さ(鍵パラメータ,両者で一致必須.default: 4)",
)
parser.add_argument(
"--top-k",
type=int,
default=8,
help="使用する上位手数(鍵パラメータ,両者で一致必須.default: 8)",
)
parser.add_argument("--engine", default=None, help="fairy-stockfish実行ファイル")
def main() -> None:
"""CLI エントリポイント.サブコマンドをディスパッチする."""
parser = argparse.ArgumentParser(
description="将棋・指し手ステガノグラフィー",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""使用例:
# 符号化(depth=4, K=8 をデフォルトに)
python shogi_stego.py encode --message こんにちは
# 復号は同じ鍵で
python shogi_stego.py decode chitchat.csa --depth 4 --top-k 8
# より深い探索 + より大きい K(情報密度↑,速度↓)
python shogi_stego.py encode --message ありがとう --depth 6 --top-k 16
# 対局終了まで指し続けて尻尾をノイズで埋める
python shogi_stego.py encode --message やあ --play-to-end
""",
)
sub = parser.add_subparsers(dest="cmd", required=True)
# decode サブコマンド
p_dec = sub.add_parser("decode", help="既存のCSAをプロトコルで復号")
p_dec.add_argument("input", help="読み込むCSAファイル")
_add_key_args(p_dec)
p_dec.set_defaults(func=cmd_decode)
# encode サブコマンド
p_enc = sub.add_parser("encode", help="会話を符号化してCSAを生成")
p_enc.add_argument(
"--message",
"-m",
nargs="+",
help="会話メッセージ(スペース区切り,先手から交互)",
)
p_enc.add_argument("--file", "-f", help="会話ファイル(1行1メッセージ)")
p_enc.add_argument(
"--out", "-o", default="chitchat.csa", help="CSA出力先 (default: chitchat.csa)"
)
_add_key_args(p_enc)
p_enc.add_argument(
"--receiver-rank-min",
type=int,
default=None,
help="受信者 rank の下限.default: K//2."
"鍵には含まれない(デコーダは受信者の rank を知る必要がない)",
)
p_enc.add_argument(
"--receiver-rank-max",
type=int,
default=None,
help="受信者 rank の上限.default: K-1."
"min と max が等しいと毎回同じ棋譜になる",
)
p_enc.add_argument(
"--seed",
type=int,
default=None,
help="乱数シード.省略時はランダム.同じ seed なら同じ棋譜が再現される",
)
p_enc.add_argument(
"--retry",
type=int,
default=5,
help="符号化失敗時のリトライ回数."
"受信者がランダムなので対局が早く終わると失敗することがある.default: 5",
)
p_enc.add_argument(
"--play-to-end",
action="store_true",
help="メッセージ送信完了後も対局終了まで指し続ける."
"継続部分はメッセージ送信中と同じ統計分布で手を選ぶので"
"観察者から見て不自然にならない."
"復号すると後半は意味不明な「メッセージ」として現れる.",
)
p_enc.add_argument("--quiet", "-q", action="store_true", help="棋譜表示を省略")
p_enc.set_defaults(func=cmd_encode)
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main()
