import random
import string
import time
from dataclasses import dataclass, field
from datetime import datetime
from threading import Lock
from typing import Optional

from app.chess_sim.game_board import (
    ChessBoard,
    Outcome,
    Color,
    BoardMove,
    PieceType,
)
from flask_login import current_user
from flask import request
from flask_socketio import emit, join_room, leave_room
from app import sIO
from app.models.game import save_finished_game
from .types import validate_client_event


@dataclass
class GameRoom:
    """In-memory state of one code-based game room (creator plus one joiner)."""

    code: str
    # Player 1 is the room creator; player 2 is whoever joined with the code.
    p1_sid: str
    p1_user_id: int
    p1_name: str
    p1_pref: str  # "w" / "b" pick that colour; anything else means random
    time_mode: str  # e.g. "10+0", parsed by _parse_time_mode
    p2_sid: Optional[str] = None
    p2_user_id: Optional[int] = None
    p2_name: Optional[str] = None
    ready: dict[str, bool] = field(default_factory=dict)  # sid -> ready flag
    board: ChessBoard = field(default_factory=ChessBoard.init_default)
    color_by_sid: dict[str, str] = field(default_factory=dict)  # sid -> "w"/"b"
    initial_ms: int = 600000
    increment_ms: int = 0
    white_ms: int = 600000
    black_ms: int = 600000
    # time.monotonic() reading from which the side to move is being charged.
    active_since: Optional[float] = None
    game_active: bool = False
    completed: bool = False
    move_history: list[str] = field(default_factory=list)
    started_at: Optional[str] = None
    ended_at: Optional[str] = None
    saved_game_id: Optional[int] = None


# Shared registries; handlers access them while holding rooms_lock.
games_by_code: dict[str, GameRoom] = {}
code_by_sid: dict[str, str] = {}
rooms_lock = Lock()
clock_task_started = False


def _new_code(length: int = 6) -> str:
    """Return a random uppercase/digit code not currently in games_by_code."""
    chars = string.ascii_uppercase + string.digits
    while True:
        code = "".join(random.choices(chars, k=length))
        if code not in games_by_code:
            return code


def _current_sid() -> Optional[str]:
    """Return the Socket.IO sid of the current request, or None if absent."""
    sid = getattr(request, "sid", None)
    return sid if isinstance(sid, str) and sid else None


def _parse_time_mode(time_mode: str) -> tuple[int, int]:
    """Parse "M+S" into (initial_ms, increment_ms).

    Minutes are clamped to at least 1 and the increment to at least 0.
    Anything unparsable falls back to 10 minutes with no increment.
    """
    # "10+0" (minutes + increment seconds).
    try:
        left, right = time_mode.split("+", 1)
        minutes = max(1, int(left))
        increment_seconds = max(0, int(right))
    except (ValueError, TypeError, AttributeError):
        # AttributeError covers a non-string time_mode (no .split).
        minutes = 10
        increment_seconds = 0
    return minutes * 60 * 1000, increment_seconds * 1000


def _room_for_sid(sid: str) -> Optional[GameRoom]:
    """Return the room the sid is registered in, or None."""
    code = code_by_sid.get(sid)
    if not code:
        return None
    return games_by_code.get(code)


def _timestamp_now() -> str:
    """Return the current local time as "YYYY-MM-DD HH:MM:SS"."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def _sid_for_color(room: GameRoom, color: str) -> Optional[str]:
    """Return the sid playing the given colour ("w"/"b"), or None."""
    for sid, c in room.color_by_sid.items():
        if c == color:
            return sid
    return None


def _turn_sid(room: GameRoom) -> Optional[str]:
    """Return the sid of the player whose turn it is on the room's board."""
    return _sid_for_color(room, "w" if room.board.turn == Color.WHITE else "b")


def _clock_payload(room: GameRoom) -> dict:
    """Build the clock fields shared by game_started, move and tick payloads."""
    return {
        "white_time_left_ms": max(0, room.white_ms),
        "black_time_left_ms": max(0, room.black_ms),
        "turn": "w" if room.board.turn == Color.WHITE else "b",
    }


def _apply_elapsed(room: GameRoom) -> Optional[str]:
    """Charge time elapsed since active_since to the side to move.

    Returns "w" or "b" if that side's clock has reached zero, else None.
    Does nothing (and returns None) when the game is not running or no
    whole millisecond has elapsed.
    """
    if not room.game_active or room.completed or room.active_since is None:
        return None
    now = time.monotonic()
    elapsed_ms = int((now - room.active_since) * 1000)
    if elapsed_ms <= 0:
        return None
    active_color = "w" if room.board.turn == Color.WHITE else "b"
    if active_color == "w":
        room.white_ms = max(0, room.white_ms - elapsed_ms)
    else:
        room.black_ms = max(0, room.black_ms - elapsed_ms)
    room.active_since = now
    if room.white_ms <= 0:
        return "w"
    if room.black_ms <= 0:
        return "b"
    return None


def _cleanup_room(code: str) -> None:
    """Remove the room and both players' sid mappings from the registries."""
    room = games_by_code.pop(code, None)
    if not room:
        return
    code_by_sid.pop(room.p1_sid, None)
    if room.p2_sid:
        code_by_sid.pop(room.p2_sid, None)


def _game_over_reason(board: ChessBoard) -> str:
    """Map the board's terminal state to a human-readable reason string."""
    if board.is_checkmate():
        return "checkmate"
    if board.is_stalemate():
        return "stalemate"
    # todo introduce game over reason insufficient material
    # if board.is_insufficient_material():
    #     return "insufficient material"
    if board.is_seventyfive_moves():
        return "75-move rule"
    if board.is_fivefold_repetition():
        return "fivefold repetition"
    return "game over"


def _emit_game_over(room: GameRoom, reason: str) -> None:
    """Mark the room finished and send each player their result.

    Must be called from within a Socket.IO event context (uses ``emit``).
    """
    room.completed = True
    room.game_active = False
    room.ended_at = _timestamp_now()

    # Only Outcome.WHITE_WIN and Outcome.NOT_FINISHED are referenced in this
    # module, so a black win is recognised as "checkmate that is not a white
    # win"; every other finished state (stalemate, 75-move rule, fivefold
    # repetition, ...) is a draw rather than a black win.
    outcome = room.board.outcome()
    winner_color: Optional[str]
    if outcome == Outcome.WHITE_WIN:
        winner_color = "w"
    elif room.board.is_checkmate():
        winner_color = "b"
    else:
        winner_color = None

    if winner_color is None:
        p1_result = p2_result = "draw"
    elif room.color_by_sid.get(room.p1_sid) == winner_color:
        p1_result, p2_result = "win", "loss"
    else:
        p1_result, p2_result = "loss", "win"

    emit("game_over", {"result": p1_result, "reason": reason}, to=room.p1_sid)
    if room.p2_sid:
        emit("game_over", {"result": p2_result, "reason": reason}, to=room.p2_sid)


def _emit_timeout(room: GameRoom, timed_out_color: str) -> None:
    """Mark the room finished on time and notify both players.

    Uses ``sIO.emit`` (not the context-bound ``emit``) because the clock
    worker calls this from a background task.
    """
    room.completed = True
    room.game_active = False
    room.ended_at = _timestamp_now()
    reason = "timeout"
    p1_color = room.color_by_sid.get(room.p1_sid)
    p1_result = "loss" if p1_color == timed_out_color else "win"
    p2_result = "win" if p1_result == "loss" else "loss"
    sIO.emit("game_over", {"result": p1_result, "reason": reason}, to=room.p1_sid)
    if room.p2_sid:
        sIO.emit("game_over", {"result": p2_result, "reason": reason}, to=room.p2_sid)


def _start_game_if_ready(room: GameRoom) -> None:
    """Start a fresh game once both seats are filled and both are ready.

    No-op while a game is already in progress, so a stray ``user_ready``
    cannot reset the board and clocks mid-game.
    """
    if not room.p2_sid:
        return
    if room.game_active and not room.completed:
        return
    if not room.ready.get(room.p1_sid) or not room.ready.get(room.p2_sid):
        return

    room.board = ChessBoard.init_default()
    if room.p1_pref == "w":
        p1_color = "w"
    elif room.p1_pref == "b":
        p1_color = "b"
    else:
        p1_color = random.choice(["w", "b"])
    p2_color = "b" if p1_color == "w" else "w"
    room.color_by_sid = {room.p1_sid: p1_color, room.p2_sid: p2_color}

    room.initial_ms, room.increment_ms = _parse_time_mode(room.time_mode)
    room.white_ms = room.initial_ms
    room.black_ms = room.initial_ms
    room.active_since = time.monotonic()
    room.game_active = True
    room.completed = False

    # Per-game bookkeeping must not leak from a previous game in this room.
    room.move_history = []
    room.started_at = _timestamp_now()
    room.ended_at = None
    # Consume the ready flags so a rematch needs both players to opt in again.
    room.ready[room.p1_sid] = False
    room.ready[room.p2_sid] = False

    fen = room.board.to_fen()
    clock = _clock_payload(room)
    p1_payload = {
        "play_as": p1_color,
        "time_left_ms": room.white_ms if p1_color == "w" else room.black_ms,
        "fen": fen,
        "opponent": room.p2_name,
        **clock,
    }
    p2_payload = {
        "play_as": p2_color,
        "time_left_ms": room.white_ms if p2_color == "w" else room.black_ms,
        "fen": fen,
        "opponent": room.p1_name,
        **clock,
    }
    emit("game_started", p1_payload, to=room.p1_sid)
    emit("game_started", p2_payload, to=room.p2_sid)


def _clock_worker() -> None:
    """Background loop: once a second, tick every running game's clock.

    Emits ``clock_tick`` to the room, or ends the game on timeout.
    """
    while True:
        sIO.sleep(1)
        with rooms_lock:
            rooms = list(games_by_code.values())
        for room in rooms:
            with rooms_lock:
                if not room.game_active or room.completed:
                    continue
                timed_out = _apply_elapsed(room)
                payload = _clock_payload(room)
                if timed_out:
                    # Finish the game while still holding the lock so a move
                    # handler cannot run between detecting the timeout and
                    # marking the room completed.
                    _emit_timeout(room, timed_out)
                    continue
            sIO.emit("clock_tick", payload, to=room.code)


@sIO.on("connect")
def on_connect():
    """Reject unauthenticated sockets; lazily start the clock worker once."""
    global clock_task_started
    if not current_user.is_authenticated:
        return False
    with rooms_lock:
        if not clock_task_started:
            sIO.start_background_task(_clock_worker)
            clock_task_started = True
    emit("server_ready", {"ok": True})


@sIO.on("disconnect")
def on_disconnect():
    """Detach the disconnecting socket from its room.

    If the creator (p1) leaves, the room is destroyed and p2 (if any) is told
    they won. If p2 leaves, the room is reset to a waiting state for p1.
    """
    sid = _current_sid()
    if not sid:
        return
    with rooms_lock:
        room = _room_for_sid(sid)
        if not room:
            return
        code = room.code
        other_sid = room.p2_sid if sid == room.p1_sid else room.p1_sid
        leave_room(code)
        code_by_sid.pop(sid, None)

        if sid == room.p1_sid:
            if room.p2_sid:
                emit(
                    "game_over",
                    {"result": "win", "reason": "opponent disconnected"},
                    to=room.p2_sid,
                )
            _cleanup_room(code)
            return

        # p2 left: clear the second seat and put the room back in the lobby.
        # NOTE(review): unlike the p1 case, p1 receives no "game_over" here
        # even if a game was in progress - confirm this asymmetry is intended.
        room.p2_sid = None
        room.p2_user_id = None
        room.p2_name = None
        room.ready.pop(sid, None)
        room.color_by_sid.pop(sid, None)
        room.board = ChessBoard.init_default()
        room.ready[room.p1_sid] = False
        room.game_active = False
        room.completed = False
        room.active_since = None
        if other_sid:
            emit("p2_connected", {"p2_name": None, "ready": False}, to=other_sid)


def _make_room(sid: str, play_as: str, time_mode: str) -> GameRoom:
    """Build a room owned by the current user; the caller assigns ``code``."""
    return GameRoom(
        code="",  # caller will fill in the actual code
        p1_sid=sid,
        p1_user_id=current_user.id,
        p1_name=current_user.username,
        p1_pref=play_as,
        time_mode=time_mode,
        ready={sid: False},
    )


@sIO.on("create_code_game")
def on_create_code_game(payload):
    """Create a new room for the caller and reply with its join code."""
    ok, err = validate_client_event("create_code_game", payload)
    if not ok:
        emit("move_reject", {"reason": err})
        return
    sid = _current_sid()
    if not sid:
        return
    with rooms_lock:
        existing = _room_for_sid(sid)
        if existing:
            # Also leave the old Socket.IO room so this socket stops
            # receiving broadcasts addressed to that code.
            leave_room(existing.code)
            _cleanup_room(existing.code)
        code = _new_code()
        room = _make_room(sid, payload["play_as"], payload["time_mode"])
        room.code = code  # assign after generation so constructor stays pure
        games_by_code[code] = room
        code_by_sid[sid] = code
        join_room(code)
        emit("code_game_created", {"code": code}, to=sid)


@sIO.on("join_code_game")
def on_join_code_game(payload):
    """Seat the caller as player 2 of the room identified by the code."""
    ok, err = validate_client_event("join_code_game", payload)
    if not ok:
        emit("code_game_join_failed", {"reason": err})
        return
    sid = _current_sid()
    if not sid:
        return
    code = payload["code"].upper().strip()
    with rooms_lock:
        room = games_by_code.get(code)
        if not room:
            emit("code_game_join_failed", {"reason": "game code not found"})
            return
        if room.p2_sid and room.p2_sid != sid:
            emit("code_game_join_failed", {"reason": "game already full"})
            return
        if sid == room.p1_sid:
            emit("code_game_join_failed", {"reason": "cannot join your own code"})
            return

        previous = _room_for_sid(sid)
        # A re-join of the same room must not tear down the room being joined.
        if previous and previous is not room:
            leave_room(previous.code)
            _cleanup_room(previous.code)

        room.p2_sid = sid
        room.p2_user_id = current_user.id
        room.p2_name = current_user.username
        room.ready.setdefault(room.p1_sid, False)
        room.ready[sid] = False
        code_by_sid[sid] = code
        join_room(code)
        emit("code_game_joined", {"p1_name": room.p1_name}, to=sid)
        emit(
            "p2_connected",
            {"p2_name": room.p2_name, "ready": False},
            to=room.p1_sid,
        )


@sIO.on("user_ready")
def on_user_ready(payload):
    """Record the caller's ready flag, tell the opponent, maybe start the game."""
    ok, err = validate_client_event("user_ready", payload)
    if not ok:
        emit("move_reject", {"reason": err})
        return
    sid = _current_sid()
    if not sid:
        return
    with rooms_lock:
        room = _room_for_sid(sid)
        if not room:
            emit("move_reject", {"reason": "not in a game room"})
            return
        room.ready[sid] = bool(payload["ready"])
        other_sid = room.p2_sid if sid == room.p1_sid else room.p1_sid
        if other_sid:
            emit(
                "p2_connected",
                {"p2_name": current_user.username, "ready": room.ready[sid]},
                to=other_sid,
            )
        _start_game_if_ready(room)


# helpers for converting client payloads to engine moves and back
def _payload_to_move(payload: dict) -> BoardMove:
    """Convert a validated ``move_request`` payload into an engine move.

    Raises:
        ValueError: if ``promotion`` is present but is not one of
            "q", "r", "b", "n" (case-insensitive).
    """
    from_sq = ChessBoard.algebraic_to_pos(payload["from_square"])
    to_sq = ChessBoard.algebraic_to_pos(payload["to_square"])
    promotion = payload.get("promotion")
    prom_piece: Optional[PieceType] = None
    if promotion:
        # A non-string value would blow up on .lower(); report it the same
        # way as an unknown letter so the caller's ValueError handler fires.
        if not isinstance(promotion, str):
            raise ValueError("invalid promotion piece")
        mapping = {
            "q": PieceType.QUEEN,
            "r": PieceType.ROOK,
            "b": PieceType.BISHOP,
            "n": PieceType.KNIGHT,
        }
        prom_piece = mapping.get(promotion.lower())
        if prom_piece is None:
            raise ValueError("invalid promotion piece")
    return BoardMove(from_sq, to_sq, promotion_piece=prom_piece)


def _move_to_san(move: BoardMove) -> str:
    """Return the move as "<from><to>" coordinates (not true SAN yet)."""
    # todo can be improved, very simple
    return f"{ChessBoard.pos_to_algebraic(move.m_from)}{ChessBoard.pos_to_algebraic(move.m_to)}"


# Pending draw offers are stored on the room under this attribute name as
# (offering_sid, board). GameRoom declares no field for it, so it is set
# dynamically and read with getattr. Keeping the board object ties an offer
# to the game it was made in: a new game installs a new board.
_DRAW_OFFER_ATTR = "_pending_draw_offer"


def _draw_offer_sid(room: GameRoom) -> Optional[str]:
    """Return the sid with a live draw offer in the current game, if any."""
    offer = getattr(room, _DRAW_OFFER_ATTR, None)
    if not offer:
        return None
    offer_sid, offer_board = offer
    if offer_board is not room.board:
        return None  # offer belongs to an earlier game in this room
    return offer_sid


def _set_draw_offer(room: GameRoom, sid: Optional[str]) -> None:
    """Record (or, with ``sid=None``, clear) the room's pending draw offer."""
    setattr(room, _DRAW_OFFER_ATTR, (sid, room.board) if sid else None)


@sIO.on("move_request")
def on_move_request(payload):
    """Validate and apply a move, update clocks, and broadcast the result."""
    ok, err = validate_client_event("move_request", payload)
    if not ok:
        emit("move_reject", {"reason": err})
        return
    sid = _current_sid()
    if not sid:
        return
    with rooms_lock:
        room = _room_for_sid(sid)
        if not room:
            emit("move_reject", {"reason": "not in a game room"})
            return
        if not room.color_by_sid:
            emit("move_reject", {"reason": "game not started"})
            return
        # A finished (resigned / timed-out / drawn) or abandoned game keeps
        # its colour map, so the running state must be checked explicitly.
        if room.completed or not room.game_active:
            emit("move_reject", {"reason": "game not in progress"})
            return

        timed_out = _apply_elapsed(room)
        if timed_out:
            _emit_timeout(room, timed_out)
            return
        if _turn_sid(room) != sid:
            emit("move_reject", {"reason": "not your turn"})
            return

        try:
            move = _payload_to_move(payload)
        except ValueError:
            print("rejecting move due to invalid move format")
            emit("move_reject", {"reason": "invalid move format"}, to=sid)
            return

        current_color = room.board.turn
        legal = room.board.generate_moves(current_color)
        if move not in legal:
            print(f"current_color: {current_color}")
            print("rejecting move due to illegal move")
            print(f"move: {move}")
            room.board.print_legal_moves(current_color)
            emit("move_reject", {"reason": "illegal move"}, to=sid)
            return

        mover_color = room.color_by_sid.get(sid)
        san = _move_to_san(move)
        # perform the move on our own engine
        room.board.make_move(move, current_color)
        room.move_history.append(san)
        # Playing on supersedes any draw offer that was still on the table.
        _set_draw_offer(room, None)

        if mover_color == "w":
            room.white_ms += room.increment_ms
        elif mover_color == "b":
            room.black_ms += room.increment_ms
        room.active_since = time.monotonic()

        move_payload = {
            "from_square": payload["from_square"],
            "to_square": payload["to_square"],
            "promotion": payload.get("promotion"),
            "san": san,
            "time_left_ms": room.white_ms if mover_color == "w" else room.black_ms,
            "fen": room.board.to_fen(),
            **_clock_payload(room),
        }
        emit("move_accept", move_payload, to=sid)
        other_sid = room.p2_sid if sid == room.p1_sid else room.p1_sid
        if other_sid:
            emit("user_move", move_payload, to=other_sid)

        outcome = room.board.outcome()
        if outcome != Outcome.NOT_FINISHED:
            print(f"ending a game due to outcome: {outcome}")
            _emit_game_over(room, _game_over_reason(room.board))


@sIO.on("request_resign")
def on_request_resign(payload):
    """End the running game with a loss for the caller and a win for the opponent."""
    ok, err = validate_client_event("request_resign", payload)
    if not ok:
        emit("move_reject", {"reason": err})
        return
    sid = _current_sid()
    if not sid:
        return
    with rooms_lock:
        room = _room_for_sid(sid)
        if not room:
            return
        # Resigning only makes sense while a game is actually being played;
        # otherwise a stray request would send a bogus game_over.
        if room.completed or not room.game_active:
            emit("move_reject", {"reason": "game not in progress"}, to=sid)
            return

        room.completed = True
        room.game_active = False
        room.ended_at = _timestamp_now()
        _set_draw_offer(room, None)
        reason = "resignation"
        other_sid = room.p2_sid if sid == room.p1_sid else room.p1_sid
        emit("game_over", {"result": "loss", "reason": reason}, to=sid)
        if other_sid:
            emit("game_over", {"result": "win", "reason": reason}, to=other_sid)


@sIO.on("request_draw")
def on_request_draw(payload):
    """Handle a draw offer, acceptance, or refusal.

    ``payload["accepted"]`` selects the action: ``True`` accepts the
    opponent's pending offer, ``False`` declines it, and anything else
    (including absent) makes a new offer to the opponent.
    """
    ok, err = validate_client_event("request_draw", payload)
    if not ok:
        emit("move_reject", {"reason": err})
        return
    sid = _current_sid()
    if not sid:
        return
    with rooms_lock:
        room = _room_for_sid(sid)
        if not room:
            return
        if room.completed or not room.game_active:
            emit("move_reject", {"reason": "game not in progress"}, to=sid)
            return

        other_sid = room.p2_sid if sid == room.p1_sid else room.p1_sid
        accepted = payload.get("accepted")
        # True only when the opponent (not the caller) has a live offer.
        opponent_offered = bool(other_sid) and _draw_offer_sid(room) == other_sid

        if accepted is True:
            # Without this check either player could end the game as a draw
            # unilaterally just by sending {"accepted": true}.
            if not opponent_offered:
                emit("move_reject", {"reason": "no draw offer to accept"}, to=sid)
                return
            _set_draw_offer(room, None)
            room.completed = True
            room.game_active = False
            room.ended_at = _timestamp_now()
            emit(
                "game_over",
                {"result": "draw", "reason": "draw agreed"},
                to=room.code,
            )
            return

        if accepted is False:
            if opponent_offered:
                _set_draw_offer(room, None)
                emit("move_reject", {"reason": "draw offer declined"}, to=other_sid)
            return

        if other_sid:
            _set_draw_offer(room, sid)
            emit("draw_offer", {"from": current_user.username}, to=other_sid)