import json
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator, Optional

from flask import g, has_app_context

from app.db import connect_db

# FEN piece letter -> image file name. Uppercase letters map to the "w*"
# images and lowercase letters to the "b*" images.
PIECE_IMAGE_BY_SYMBOL = {
    "P": "wp.png",
    "N": "wn.png",
    "B": "wb.png",
    "R": "wr.png",
    "Q": "wq.png",
    "K": "wk.png",
    "p": "bp.png",
    "n": "bn.png",
    "b": "bb.png",
    "r": "br.png",
    "q": "bq.png",
    "k": "bk.png",
}

# File letters, indexed left to right as they appear in a FEN rank.
_FILES = "abcdefgh"
_BOARD_SIZE = len(_FILES)


@dataclass
class SavedGameSummary:
    """One stored game as seen from the point of view of a single user."""

    id: int
    opponent_id: int
    opponent_username: str
    # Colors come from the ``game_players.color`` column of each side's row.
    my_color: str
    opponent_color: str
    # ``save_finished_game`` stores "white", "black" or "draw" here.
    winner_color: Optional[str]
    termination: Optional[str]
    termination_detail: Optional[str]
    time_mode: Optional[str]
    # Number of entries in the parsed ``move_history`` column.
    move_count: int
    started_at: Optional[str]
    ended_at: Optional[str]

    @property
    def result(self) -> str:
        """Return "win", "loss" or "draw" relative to ``my_color``.

        A missing/empty ``winner_color`` is reported as a draw.
        """
        if self.winner_color == "draw" or not self.winner_color:
            return "draw"
        return "win" if self.winner_color == self.my_color else "loss"


@dataclass
class SavedGameDetail(SavedGameSummary):
    """A summary extended with the final position and the full move list."""

    final_fen: str
    move_history: list[str]


@contextmanager
def _db_session() -> Iterator:
    """Yield a database connection for one unit of work.

    Inside a Flask app context that already holds ``g.db``, that connection
    is yielded as-is and is neither committed nor closed here. Otherwise a
    new connection is obtained from ``connect_db()``; it is committed when
    the ``with`` body finishes without raising and is always closed.
    """
    if has_app_context() and "db" in g:
        yield g.db
        return

    db = connect_db()
    try:
        yield db
        db.commit()
    finally:
        db.close()


def save_finished_game(
    *,
    white_player_id: int,
    black_player_id: int,
    final_fen: str,
    termination: str,
    termination_detail: str,
    winner_color: Optional[str],
    move_history: list[str],
    time_mode: Optional[str],
    started_at: Optional[str],
    ended_at: Optional[str],
) -> int:
    """Insert a finished game plus its two player rows and return the game id.

    Any ``winner_color`` other than "white" or "black" is stored as "draw".
    ``move_history`` is stored as JSON text.
    """
    winner_value = winner_color if winner_color in {"white", "black"} else "draw"

    with _db_session() as db:
        cursor = db.execute(
            """
            INSERT INTO games (
                final_fen, termination, termination_detail, winner_color,
                move_history, time_mode, started_at, ended_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                final_fen,
                termination,
                termination_detail,
                winner_value,
                json.dumps(move_history),
                time_mode,
                started_at,
                ended_at,
            ),
        )
        game_id = int(cursor.lastrowid)
        db.executemany(
            """
            INSERT INTO game_players (game_id, player_id, color)
            VALUES (?, ?, ?)
            """,
            [
                (game_id, white_player_id, "white"),
                (game_id, black_player_id, "black"),
            ],
        )
        # _db_session only commits connections it opened itself, so commit
        # explicitly to cover the case where the shared g.db was yielded.
        db.commit()

    return game_id


def list_games_for_user(user_id: int) -> list[SavedGameSummary]:
    """Return every stored game ``user_id`` took part in.

    Results are ordered by ``COALESCE(ended_at, created_at)`` descending,
    then by game id descending. The opponent is the ``game_players`` row of
    the same game whose ``player_id`` differs from ``user_id``.
    """
    with _db_session() as db:
        rows = db.execute(
            """
            SELECT
                g.id, g.winner_color, g.termination, g.termination_detail,
                g.time_mode, g.started_at, g.ended_at, g.move_history,
                me.color AS my_color,
                opp.color AS opponent_color,
                u.id AS opponent_id,
                u.username AS opponent_username
            FROM games g
            JOIN game_players me ON me.game_id = g.id AND me.player_id = ?
            JOIN game_players opp ON opp.game_id = g.id AND opp.player_id != ?
            JOIN users u ON u.id = opp.player_id
            ORDER BY COALESCE(g.ended_at, g.created_at) DESC, g.id DESC
            """,
            (user_id, user_id),
        ).fetchall()

    return [_row_to_summary(row) for row in rows]


def get_game_for_user(game_id: int, user_id: int) -> Optional[SavedGameDetail]:
    """Return the detail of ``game_id`` as seen by ``user_id``.

    Returns ``None`` when the query yields no row, i.e. the game does not
    exist or ``user_id`` has no ``game_players`` row for it.
    """
    with _db_session() as db:
        row = db.execute(
            """
            SELECT
                g.id, g.final_fen, g.winner_color, g.termination,
                g.termination_detail, g.time_mode, g.started_at, g.ended_at,
                g.move_history,
                me.color AS my_color,
                opp.color AS opponent_color,
                u.id AS opponent_id,
                u.username AS opponent_username
            FROM games g
            JOIN game_players me ON me.game_id = g.id AND me.player_id = ?
            JOIN game_players opp ON opp.game_id = g.id AND opp.player_id != ?
            JOIN users u ON u.id = opp.player_id
            WHERE g.id = ?
            LIMIT 1
            """,
            (user_id, user_id, game_id),
        ).fetchone()

    if not row:
        return None

    # Parse the stored JSON once and share it between summary and detail.
    move_history = _parse_move_history(row["move_history"])
    summary = _row_to_summary(row, move_history)
    return SavedGameDetail(
        **vars(summary),
        final_fen=row["final_fen"] or "",
        move_history=move_history,
    )


def _board_square(
    file_index: int, rank_index: int, piece_image: Optional[str]
) -> dict[str, Optional[str]]:
    """Build the template dict for one square of the rendered board."""
    return {
        "square": f"{_FILES[file_index]}{_BOARD_SIZE - rank_index}",
        "piece_image": piece_image,
        "shade": "light" if (file_index + rank_index) % 2 == 0 else "dark",
    }


def build_board_rows(fen: str) -> list[list[dict[str, Optional[str]]]]:
    """Convert the board field of a FEN string into 8 rows of 8 square dicts.

    Rows follow FEN order (rank 8 first); each dict has the keys ``square``
    (e.g. "e4"), ``piece_image`` (file name or ``None``) and ``shade``
    ("light" or "dark").

    Malformed input never raises:
    - a board field without exactly 8 ranks renders as an empty board;
    - a short rank is padded with empty squares;
    - a rank describing more than 8 files is truncated to 8;
    - an unknown symbol occupies one square with no image.
    """
    board_part = (fen or "").split(" ", 1)[0]
    raw_rows = board_part.split("/")
    if len(raw_rows) != _BOARD_SIZE:
        raw_rows = ["8"] * _BOARD_SIZE

    rows: list[list[dict[str, Optional[str]]]] = []
    for rank_index, raw_row in enumerate(raw_rows):
        # One entry per file; None marks an empty square.
        images: list[Optional[str]] = []
        for char in raw_row:
            if len(images) >= _BOARD_SIZE:
                break
            # Only ASCII digits are run lengths: str.isdigit() alone also
            # accepts characters such as "²" that int() cannot convert.
            if char.isascii() and char.isdigit():
                images.extend([None] * int(char))
            else:
                images.append(PIECE_IMAGE_BY_SYMBOL.get(char))

        # Normalize to exactly one board width: pad short ranks and clamp
        # overlong ones so the file-letter lookup cannot go out of range.
        images = (images + [None] * _BOARD_SIZE)[:_BOARD_SIZE]
        rows.append(
            [
                _board_square(file_index, rank_index, image)
                for file_index, image in enumerate(images)
            ]
        )

    return rows


def group_move_pairs(move_history: list[str]) -> list[dict[str, str]]:
    """Group a flat move list into numbered white/black pairs.

    Each entry has the keys ``move_no`` (1-based, as a string), ``white``
    and ``black``; ``black`` is "" when the list has an odd length.
    """
    total = len(move_history)
    return [
        {
            "move_no": str((index // 2) + 1),
            "white": move_history[index],
            "black": move_history[index + 1] if index + 1 < total else "",
        }
        for index in range(0, total, 2)
    ]


def _row_to_summary(
    row, move_history: Optional[list[str]] = None
) -> SavedGameSummary:
    """Build a ``SavedGameSummary`` from a row of the game queries above.

    ``move_history`` may be passed when the caller has already parsed the
    row's ``move_history`` column; otherwise it is parsed here. Only its
    length is used.
    """
    if move_history is None:
        move_history = _parse_move_history(row["move_history"])

    return SavedGameSummary(
        id=row["id"],
        opponent_id=row["opponent_id"],
        opponent_username=row["opponent_username"],
        my_color=row["my_color"],
        opponent_color=row["opponent_color"],
        winner_color=row["winner_color"],
        termination=row["termination"],
        termination_detail=row["termination_detail"],
        time_mode=row["time_mode"],
        move_count=len(move_history),
        started_at=row["started_at"],
        ended_at=row["ended_at"],
    )


def _parse_move_history(raw_value: Optional[str]) -> list[str]:
    """Decode the stored JSON move list, returning [] for anything unusable.

    Empty/missing values, invalid JSON, values ``json.loads`` cannot accept,
    and JSON that is not a list all yield an empty list. Every element is
    converted with ``str``.
    """
    if not raw_value:
        return []

    try:
        parsed = json.loads(raw_value)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError and undecodable bytes;
        # TypeError covers a column value that is not str/bytes/bytearray.
        return []

    if not isinstance(parsed, list):
        return []
    return [str(move) for move in parsed]