This repository has been archived on 2026-03-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files

659 lines
19 KiB
Python

import random
import string
import time
from dataclasses import dataclass, field
from datetime import datetime
from threading import Lock
from typing import Optional
from app.chess_sim.game_board import (
ChessBoard,
Outcome,
Color,
BoardMove,
PieceType,
)
from flask_login import current_user
from flask import request
from flask_socketio import emit, join_room, leave_room
from app import sIO
from app.models.game import save_finished_game
from .types import validate_client_event
@dataclass
class GameRoom:
code: str
p1_sid: str
p1_user_id: int
p1_name: str
p1_pref: str
time_mode: str
p2_sid: Optional[str] = None
p2_user_id: Optional[int] = None
p2_name: Optional[str] = None
ready: dict[str, bool] = field(default_factory=dict)
board: ChessBoard = field(default_factory=ChessBoard.init_default)
color_by_sid: dict[str, str] = field(default_factory=dict)
initial_ms: int = 600000
increment_ms: int = 0
white_ms: int = 600000
black_ms: int = 600000
active_since: Optional[float] = None
game_active: bool = False
completed: bool = False
move_history: list[str] = field(default_factory=list)
started_at: Optional[str] = None
ended_at: Optional[str] = None
saved_game_id: Optional[int] = None
games_by_code: dict[str, GameRoom] = {}
code_by_sid: dict[str, str] = {}
rooms_lock = Lock()
clock_task_started = False
def _new_code(length: int = 6) -> str:
chars = string.ascii_uppercase + string.digits
while True:
code = "".join(random.choices(chars, k=length))
if code not in games_by_code:
return code
def _current_sid() -> Optional[str]:
sid = getattr(request, "sid", None)
return sid if isinstance(sid, str) and sid else None
def _parse_time_mode(time_mode: str) -> tuple[int, int]:
# "10+0" (minutes + increment seconds).
try:
left, right = time_mode.split("+", 1)
minutes = max(1, int(left))
increment_seconds = max(0, int(right))
except (ValueError, TypeError):
minutes = 10
increment_seconds = 0
return minutes * 60 * 1000, increment_seconds * 1000
def _room_for_sid(sid: str) -> Optional[GameRoom]:
code = code_by_sid.get(sid)
if not code:
return None
return games_by_code.get(code)
def _timestamp_now() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def _sid_for_color(room: GameRoom, color: str) -> Optional[str]:
for sid, c in room.color_by_sid.items():
if c == color:
return sid
return None
def _turn_sid(room: GameRoom) -> Optional[str]:
return _sid_for_color(room, "w" if room.board.turn == Color.WHITE else "b")
def _clock_payload(room: GameRoom) -> dict:
return {
"white_time_left_ms": max(0, room.white_ms),
"black_time_left_ms": max(0, room.black_ms),
"turn": "w" if room.board.turn == Color.WHITE else "b",
}
def _apply_elapsed(room: GameRoom) -> Optional[str]:
if not room.game_active or room.completed or room.active_since is None:
return None
now = time.monotonic()
elapsed_ms = int((now - room.active_since) * 1000)
if elapsed_ms <= 0:
return None
active_color = "w" if room.board.turn == Color.WHITE else "b"
if active_color == "w":
room.white_ms = max(0, room.white_ms - elapsed_ms)
else:
room.black_ms = max(0, room.black_ms - elapsed_ms)
room.active_since = now
if room.white_ms <= 0:
return "w"
if room.black_ms <= 0:
return "b"
return None
def _cleanup_room(code: str) -> None:
room = games_by_code.pop(code, None)
if not room:
return
code_by_sid.pop(room.p1_sid, None)
if room.p2_sid:
code_by_sid.pop(room.p2_sid, None)
def _game_over_reason(board: ChessBoard) -> str:
if board.is_checkmate():
return "checkmate"
if board.is_stalemate():
return "stalemate"
# todo introduce game over reason insufficient material
# if board.is_insufficient_material():
# return "insufficient material"
if board.is_seventyfive_moves():
return "75-move rule"
if board.is_fivefold_repetition():
return "fivefold repetition"
return "game over"
_TERMINATION_MAP = {
"checkmate": "checkmate",
"resignation": "resignation",
"timeout": "timeout",
"stalemate": "draw",
"draw agreed": "draw",
"75-move rule": "draw",
"fivefold repetition": "draw",
"opponent disconnected": "other",
"game over": "other",
}
def _save_game(room: GameRoom, winner_color: Optional[str], reason: str = "other") -> Optional[int]:
"""Persist the finished game and return the saved game_id (or None on error)."""
if room.saved_game_id is not None:
return room.saved_game_id
try:
white_id = next(
(uid for sid, uid in [(room.p1_sid, room.p1_user_id), (room.p2_sid, room.p2_user_id)]
if room.color_by_sid.get(sid) == "w"),
room.p1_user_id,
)
black_id = next(
(uid for sid, uid in [(room.p1_sid, room.p1_user_id), (room.p2_sid, room.p2_user_id)]
if room.color_by_sid.get(sid) == "b"),
room.p2_user_id,
)
if black_id is None:
return None
termination = _TERMINATION_MAP.get(reason, "other")
room.ended_at = _timestamp_now()
game_id = save_finished_game(
white_player_id=white_id,
black_player_id=black_id,
final_fen=room.board.to_fen(),
termination=termination,
termination_detail=reason,
winner_color=winner_color,
move_history=room.move_history,
time_mode=room.time_mode,
started_at=room.started_at,
ended_at=room.ended_at,
)
room.saved_game_id = game_id
return game_id
except Exception as exc:
print(f"[save_game] failed: {exc}")
return None
def _emit_game_over(room: GameRoom, reason: str) -> None:
room.completed = True
room.game_active = False
outcome = room.board.outcome()
p1_result = "draw"
p2_result = "draw"
winner_color_str: Optional[str] = None
if outcome == Outcome.WHITE_WIN:
winner_color = "w"
winner_color_str = "white"
elif outcome == Outcome.BLACK_WIN:
winner_color = "b"
winner_color_str = "black"
else:
winner_color = None
if winner_color is not None:
if room.color_by_sid.get(room.p1_sid) == winner_color:
p1_result = "win"
p2_result = "loss"
else:
p1_result = "loss"
p2_result = "win"
game_id = _save_game(room, winner_color_str, reason)
emit("game_over", {"result": p1_result, "reason": reason, "game_id": game_id}, to=room.p1_sid)
if room.p2_sid:
emit("game_over", {"result": p2_result, "reason": reason, "game_id": game_id}, to=room.p2_sid)
def _emit_timeout(room: GameRoom, timed_out_color: str) -> None:
room.completed = True
room.game_active = False
reason = "timeout"
p1_color = room.color_by_sid.get(room.p1_sid)
p1_result = "loss" if p1_color == timed_out_color else "win"
p2_result = "win" if p1_result == "loss" else "loss"
winner_color_str = "white" if timed_out_color == "b" else "black"
game_id = _save_game(room, winner_color_str, reason)
sIO.emit("game_over", {"result": p1_result, "reason": reason, "game_id": game_id}, to=room.p1_sid)
if room.p2_sid:
sIO.emit("game_over", {"result": p2_result, "reason": reason, "game_id": game_id}, to=room.p2_sid)
def _start_game_if_ready(room: GameRoom) -> None:
if not room.p2_sid:
return
if not room.ready.get(room.p1_sid) or not room.ready.get(room.p2_sid):
return
room.board = ChessBoard.init_default()
if room.p1_pref == "w":
p1_color = "w"
elif room.p1_pref == "b":
p1_color = "b"
else:
p1_color = random.choice(["w", "b"])
p2_color = "b" if p1_color == "w" else "w"
room.color_by_sid = {room.p1_sid: p1_color, room.p2_sid: p2_color}
room.initial_ms, room.increment_ms = _parse_time_mode(room.time_mode)
room.white_ms = room.initial_ms
room.black_ms = room.initial_ms
room.active_since = time.monotonic()
room.game_active = True
room.completed = False
room.started_at = _timestamp_now()
p1_payload = {
"play_as": p1_color,
"time_left_ms": room.white_ms if p1_color == "w" else room.black_ms,
"fen": room.board.to_fen(),
"opponent": room.p2_name,
**_clock_payload(room),
}
p2_payload = {
"play_as": p2_color,
"time_left_ms": room.white_ms if p2_color == "w" else room.black_ms,
"fen": room.board.to_fen(),
"opponent": room.p1_name,
**_clock_payload(room),
}
emit("game_started", p1_payload, to=room.p1_sid)
emit("game_started", p2_payload, to=room.p2_sid)
def _clock_worker() -> None:
while True:
sIO.sleep(1)
with rooms_lock:
rooms = list(games_by_code.values())
for room in rooms:
if not room.game_active or room.completed:
continue
with rooms_lock:
timed_out = _apply_elapsed(room)
payload = _clock_payload(room)
if timed_out:
_emit_timeout(room, timed_out)
continue
sIO.emit("clock_tick", payload, to=room.code)
@sIO.on("connect")
def on_connect():
global clock_task_started
if not current_user.is_authenticated:
return False
with rooms_lock:
if not clock_task_started:
sIO.start_background_task(_clock_worker)
clock_task_started = True
emit("server_ready", {"ok": True})
@sIO.on("disconnect")
def on_disconnect():
sid = _current_sid()
if not sid:
return
with rooms_lock:
room = _room_for_sid(sid)
if not room:
return
code = room.code
other_sid = room.p2_sid if sid == room.p1_sid else room.p1_sid
leave_room(code)
code_by_sid.pop(sid, None)
if sid == room.p1_sid:
if room.p2_sid:
emit("game_over", {"result": "win", "reason": "opponent disconnected"}, to=room.p2_sid)
_cleanup_room(code)
return
room.p2_sid = None
room.p2_name = None
room.ready.pop(sid, None)
room.color_by_sid.pop(sid, None)
room.board = ChessBoard.init_default()
room.ready[room.p1_sid] = False
room.game_active = False
room.completed = False
room.active_since = None
if other_sid:
emit("p2_connected", {"p2_name": None, "ready": False}, to=other_sid)
def _make_room(sid: str, play_as: str, time_mode: str) -> GameRoom:
return GameRoom(
code="", # caller will fill in the actual code
p1_sid=sid,
p1_user_id=current_user.id,
p1_name=current_user.username,
p1_pref=play_as,
time_mode=time_mode,
ready={sid: False},
)
@sIO.on("create_code_game")
def on_create_code_game(payload):
ok, err = validate_client_event("create_code_game", payload)
if not ok:
emit("move_reject", {"reason": err})
return
sid = _current_sid()
if not sid:
return
with rooms_lock:
existing = _room_for_sid(sid)
if existing:
_cleanup_room(existing.code)
code = _new_code()
room = _make_room(sid, payload["play_as"], payload["time_mode"])
room.code = code # assign after generation so constructor stays pure
games_by_code[code] = room
code_by_sid[sid] = code
join_room(code)
emit("code_game_created", {"code": code}, to=sid)
@sIO.on("join_code_game")
def on_join_code_game(payload):
ok, err = validate_client_event("join_code_game", payload)
if not ok:
emit("code_game_join_failed", {"reason": err})
return
sid = _current_sid()
if not sid:
return
code = payload["code"].upper().strip()
with rooms_lock:
room = games_by_code.get(code)
if not room:
emit("code_game_join_failed", {"reason": "game code not found"})
return
if room.p2_sid and room.p2_sid != sid:
emit("code_game_join_failed", {"reason": "game already full"})
return
if sid == room.p1_sid:
emit("code_game_join_failed", {"reason": "cannot join your own code"})
return
previous = _room_for_sid(sid)
if previous:
_cleanup_room(previous.code)
room.p2_sid = sid
room.p2_user_id = current_user.id
room.p2_name = current_user.username
room.ready.setdefault(room.p1_sid, False)
room.ready[sid] = False
code_by_sid[sid] = code
join_room(code)
emit("code_game_joined", {"p1_name": room.p1_name}, to=sid)
emit("p2_connected", {"p2_name": room.p2_name, "ready": False}, to=room.p1_sid)
@sIO.on("user_ready")
def on_user_ready(payload):
ok, err = validate_client_event("user_ready", payload)
if not ok:
emit("move_reject", {"reason": err})
return
sid = _current_sid()
if not sid:
return
with rooms_lock:
room = _room_for_sid(sid)
if not room:
emit("move_reject", {"reason": "not in a game room"})
return
room.ready[sid] = bool(payload["ready"])
other_sid = room.p2_sid if sid == room.p1_sid else room.p1_sid
if other_sid:
emit(
"p2_connected",
{"p2_name": current_user.username, "ready": room.ready[sid]},
to=other_sid,
)
_start_game_if_ready(room)
# helpers for converting client payloads to engine moves and back
def _payload_to_move(payload: dict) -> BoardMove:
from_sq = ChessBoard.algebraic_to_pos(payload["from_square"])
to_sq = ChessBoard.algebraic_to_pos(payload["to_square"])
promotion = payload.get("promotion")
prom_piece: Optional[PieceType] = None
if promotion:
mapping = {
"q": PieceType.QUEEN,
"r": PieceType.ROOK,
"b": PieceType.BISHOP,
"n": PieceType.KNIGHT,
}
prom_piece = mapping.get(promotion.lower())
if prom_piece is None:
raise ValueError("invalid promotion piece")
return BoardMove(from_sq, to_sq, promotion_piece=prom_piece)
def _move_to_san(move: BoardMove) -> str:
# todo can be improved, very simple
return f"{ChessBoard.pos_to_algebraic(move.m_from)}{ChessBoard.pos_to_algebraic(move.m_to)}"
@sIO.on("move_request")
def on_move_request(payload):
ok, err = validate_client_event("move_request", payload)
if not ok:
emit("move_reject", {"reason": err})
return
sid = _current_sid()
if not sid:
return
with rooms_lock:
room = _room_for_sid(sid)
if not room:
emit("move_reject", {"reason": "not in a game room"})
return
if not room.color_by_sid:
emit("move_reject", {"reason": "game not started"})
return
timed_out = _apply_elapsed(room)
if timed_out:
_emit_timeout(room, timed_out)
return
if _turn_sid(room) != sid:
emit("move_reject", {"reason": "not your turn"})
return
try:
move = _payload_to_move(payload)
except ValueError:
print("rejecting move due to invalid move format")
emit("move_reject", {"reason": "invalid move format"}, to=sid)
return
current_color = room.board.turn
legal = room.board.generate_moves(current_color)
if move not in legal:
# todo: finish debugging castling
# print(f"current_color: {current_color}")
# print("rejecting move due to illegal move")
# print(f"move: {move}")
# room.board.print_legal_moves(current_color)
emit("move_reject", {"reason": "illegal move"}, to=sid)
return
mover_color = room.color_by_sid.get(sid)
san = _move_to_san(move)
# perform the move on our own engine
room.board.make_move(move, current_color)
room.move_history.append(san)
if mover_color == "w":
room.white_ms += room.increment_ms
elif mover_color == "b":
room.black_ms += room.increment_ms
room.active_since = time.monotonic()
move_payload = {
"from_square": payload["from_square"],
"to_square": payload["to_square"],
"promotion": payload.get("promotion"),
"san": san,
"time_left_ms": room.white_ms if mover_color == "w" else room.black_ms,
"fen": room.board.to_fen(),
**_clock_payload(room),
}
emit("move_accept", move_payload, to=sid)
other_sid = room.p2_sid if sid == room.p1_sid else room.p1_sid
if other_sid:
emit("user_move", move_payload, to=other_sid)
if room.board.outcome() != Outcome.NOT_FINISHED:
print(f"ending a game due to outcome: { room.board.outcome()}")
_emit_game_over(room, _game_over_reason(room.board))
@sIO.on("request_resign")
def on_request_resign(payload):
ok, err = validate_client_event("request_resign", payload)
if not ok:
emit("move_reject", {"reason": err})
return
sid = _current_sid()
if not sid:
return
with rooms_lock:
room = _room_for_sid(sid)
if not room:
return
room.completed = True
room.game_active = False
reason = "resignation"
other_sid = room.p2_sid if sid == room.p1_sid else room.p1_sid
resigning_color = room.color_by_sid.get(sid)
winner_color_str = "black" if resigning_color == "w" else "white"
game_id = _save_game(room, winner_color_str, reason)
emit("game_over", {"result": "loss", "reason": reason, "game_id": game_id}, to=sid)
if other_sid:
emit("game_over", {"result": "win", "reason": reason, "game_id": game_id}, to=other_sid)
@sIO.on("request_draw")
def on_request_draw(payload):
ok, err = validate_client_event("request_draw", payload)
if not ok:
emit("move_reject", {"reason": err})
return
sid = _current_sid()
if not sid:
return
with rooms_lock:
room = _room_for_sid(sid)
if not room:
return
if payload.get("accepted") is True:
room.completed = True
room.game_active = False
game_id = _save_game(room, None, "draw agreed")
sIO.emit("game_over", {"result": "draw", "reason": "draw agreed", "game_id": game_id}, to=room.code)
return
other_sid = room.p2_sid if sid == room.p1_sid else room.p1_sid
if payload.get("accepted") is False:
if other_sid:
emit("move_reject", {"reason": "draw offer declined"}, to=other_sid)
return
if other_sid:
emit("draw_offer", {"from": current_user.username}, to=other_sid)