Save finished games and add history views
This commit is contained in:
@@ -0,0 +1,290 @@
|
||||
import json
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterator, Optional
|
||||
|
||||
from flask import g, has_app_context
|
||||
|
||||
from app.db import connect_db
|
||||
|
||||
|
||||
PIECE_IMAGE_BY_SYMBOL = {
|
||||
"P": "wp.png",
|
||||
"N": "wn.png",
|
||||
"B": "wb.png",
|
||||
"R": "wr.png",
|
||||
"Q": "wq.png",
|
||||
"K": "wk.png",
|
||||
"p": "bp.png",
|
||||
"n": "bn.png",
|
||||
"b": "bb.png",
|
||||
"r": "br.png",
|
||||
"q": "bq.png",
|
||||
"k": "bk.png",
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class SavedGameSummary:
|
||||
id: int
|
||||
opponent_id: int
|
||||
opponent_username: str
|
||||
my_color: str
|
||||
opponent_color: str
|
||||
winner_color: Optional[str]
|
||||
termination: Optional[str]
|
||||
termination_detail: Optional[str]
|
||||
time_mode: Optional[str]
|
||||
move_count: int
|
||||
started_at: Optional[str]
|
||||
ended_at: Optional[str]
|
||||
|
||||
@property
|
||||
def result(self) -> str:
|
||||
if self.winner_color == "draw" or not self.winner_color:
|
||||
return "draw"
|
||||
return "win" if self.winner_color == self.my_color else "loss"
|
||||
|
||||
|
||||
@dataclass
|
||||
class SavedGameDetail(SavedGameSummary):
|
||||
final_fen: str
|
||||
move_history: list[str]
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _db_session() -> Iterator:
|
||||
if has_app_context() and "db" in g:
|
||||
yield g.db
|
||||
return
|
||||
|
||||
db = connect_db()
|
||||
try:
|
||||
yield db
|
||||
db.commit()
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def save_finished_game(
|
||||
*,
|
||||
white_player_id: int,
|
||||
black_player_id: int,
|
||||
final_fen: str,
|
||||
termination: str,
|
||||
termination_detail: str,
|
||||
winner_color: Optional[str],
|
||||
move_history: list[str],
|
||||
time_mode: Optional[str],
|
||||
started_at: Optional[str],
|
||||
ended_at: Optional[str],
|
||||
) -> int:
|
||||
winner_value = winner_color if winner_color in {"white", "black"} else "draw"
|
||||
|
||||
with _db_session() as db:
|
||||
cursor = db.execute(
|
||||
"""
|
||||
INSERT INTO games (
|
||||
final_fen,
|
||||
termination,
|
||||
termination_detail,
|
||||
winner_color,
|
||||
move_history,
|
||||
time_mode,
|
||||
started_at,
|
||||
ended_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
final_fen,
|
||||
termination,
|
||||
termination_detail,
|
||||
winner_value,
|
||||
json.dumps(move_history),
|
||||
time_mode,
|
||||
started_at,
|
||||
ended_at,
|
||||
),
|
||||
)
|
||||
game_id = int(cursor.lastrowid)
|
||||
|
||||
db.executemany(
|
||||
"""
|
||||
INSERT INTO game_players (game_id, player_id, color)
|
||||
VALUES (?, ?, ?)
|
||||
""",
|
||||
[
|
||||
(game_id, white_player_id, "white"),
|
||||
(game_id, black_player_id, "black"),
|
||||
],
|
||||
)
|
||||
db.commit()
|
||||
|
||||
return game_id
|
||||
|
||||
|
||||
def list_games_for_user(user_id: int) -> list[SavedGameSummary]:
|
||||
with _db_session() as db:
|
||||
rows = db.execute(
|
||||
"""
|
||||
SELECT
|
||||
g.id,
|
||||
g.winner_color,
|
||||
g.termination,
|
||||
g.termination_detail,
|
||||
g.time_mode,
|
||||
g.started_at,
|
||||
g.ended_at,
|
||||
g.move_history,
|
||||
me.color AS my_color,
|
||||
opp.color AS opponent_color,
|
||||
u.id AS opponent_id,
|
||||
u.username AS opponent_username
|
||||
FROM games g
|
||||
JOIN game_players me
|
||||
ON me.game_id = g.id
|
||||
AND me.player_id = ?
|
||||
JOIN game_players opp
|
||||
ON opp.game_id = g.id
|
||||
AND opp.player_id != ?
|
||||
JOIN users u
|
||||
ON u.id = opp.player_id
|
||||
ORDER BY COALESCE(g.ended_at, g.created_at) DESC, g.id DESC
|
||||
""",
|
||||
(user_id, user_id),
|
||||
).fetchall()
|
||||
|
||||
return [_row_to_summary(row) for row in rows]
|
||||
|
||||
|
||||
def get_game_for_user(game_id: int, user_id: int) -> Optional[SavedGameDetail]:
|
||||
with _db_session() as db:
|
||||
row = db.execute(
|
||||
"""
|
||||
SELECT
|
||||
g.id,
|
||||
g.final_fen,
|
||||
g.winner_color,
|
||||
g.termination,
|
||||
g.termination_detail,
|
||||
g.time_mode,
|
||||
g.started_at,
|
||||
g.ended_at,
|
||||
g.move_history,
|
||||
me.color AS my_color,
|
||||
opp.color AS opponent_color,
|
||||
u.id AS opponent_id,
|
||||
u.username AS opponent_username
|
||||
FROM games g
|
||||
JOIN game_players me
|
||||
ON me.game_id = g.id
|
||||
AND me.player_id = ?
|
||||
JOIN game_players opp
|
||||
ON opp.game_id = g.id
|
||||
AND opp.player_id != ?
|
||||
JOIN users u
|
||||
ON u.id = opp.player_id
|
||||
WHERE g.id = ?
|
||||
LIMIT 1
|
||||
""",
|
||||
(user_id, user_id, game_id),
|
||||
).fetchone()
|
||||
|
||||
if not row:
|
||||
return None
|
||||
|
||||
summary = _row_to_summary(row)
|
||||
return SavedGameDetail(
|
||||
**summary.__dict__,
|
||||
final_fen=row["final_fen"] or "",
|
||||
move_history=_parse_move_history(row["move_history"]),
|
||||
)
|
||||
|
||||
|
||||
def build_board_rows(fen: str) -> list[list[dict[str, Optional[str]]]]:
|
||||
board_part = (fen or "").split(" ", 1)[0]
|
||||
raw_rows = board_part.split("/")
|
||||
if len(raw_rows) != 8:
|
||||
raw_rows = ["8"] * 8
|
||||
|
||||
rows: list[list[dict[str, Optional[str]]]] = []
|
||||
for rank_index, raw_row in enumerate(raw_rows):
|
||||
row: list[dict[str, Optional[str]]] = []
|
||||
file_index = 0
|
||||
for char in raw_row:
|
||||
if char.isdigit():
|
||||
for _ in range(int(char)):
|
||||
row.append(
|
||||
{
|
||||
"square": f"{'abcdefgh'[file_index]}{8 - rank_index}",
|
||||
"piece_image": None,
|
||||
"shade": "light" if (file_index + rank_index) % 2 == 0 else "dark",
|
||||
}
|
||||
)
|
||||
file_index += 1
|
||||
continue
|
||||
|
||||
row.append(
|
||||
{
|
||||
"square": f"{'abcdefgh'[file_index]}{8 - rank_index}",
|
||||
"piece_image": PIECE_IMAGE_BY_SYMBOL.get(char),
|
||||
"shade": "light" if (file_index + rank_index) % 2 == 0 else "dark",
|
||||
}
|
||||
)
|
||||
file_index += 1
|
||||
|
||||
while len(row) < 8:
|
||||
row.append(
|
||||
{
|
||||
"square": f"{'abcdefgh'[len(row)]}{8 - rank_index}",
|
||||
"piece_image": None,
|
||||
"shade": "light" if (len(row) + rank_index) % 2 == 0 else "dark",
|
||||
}
|
||||
)
|
||||
rows.append(row)
|
||||
|
||||
return rows
|
||||
|
||||
|
||||
def group_move_pairs(move_history: list[str]) -> list[dict[str, str]]:
|
||||
pairs = []
|
||||
for index in range(0, len(move_history), 2):
|
||||
pairs.append(
|
||||
{
|
||||
"move_no": str((index // 2) + 1),
|
||||
"white": move_history[index],
|
||||
"black": move_history[index + 1] if index + 1 < len(move_history) else "",
|
||||
}
|
||||
)
|
||||
return pairs
|
||||
|
||||
|
||||
def _row_to_summary(row) -> SavedGameSummary:
|
||||
move_history = _parse_move_history(row["move_history"])
|
||||
return SavedGameSummary(
|
||||
id=row["id"],
|
||||
opponent_id=row["opponent_id"],
|
||||
opponent_username=row["opponent_username"],
|
||||
my_color=row["my_color"],
|
||||
opponent_color=row["opponent_color"],
|
||||
winner_color=row["winner_color"],
|
||||
termination=row["termination"],
|
||||
termination_detail=row["termination_detail"],
|
||||
time_mode=row["time_mode"],
|
||||
move_count=len(move_history),
|
||||
started_at=row["started_at"],
|
||||
ended_at=row["ended_at"],
|
||||
)
|
||||
|
||||
|
||||
def _parse_move_history(raw_value: Optional[str]) -> list[str]:
|
||||
if not raw_value:
|
||||
return []
|
||||
try:
|
||||
parsed = json.loads(raw_value)
|
||||
except json.JSONDecodeError:
|
||||
return []
|
||||
if not isinstance(parsed, list):
|
||||
return []
|
||||
return [str(move) for move in parsed]
|
||||
Reference in New Issue
Block a user