"""
mmflow Open API — Python SDK (single-file, zero deps).

Drop this file into your project and instantiate the client:

    from mmflow import MmflowClient
    mm = MmflowClient()
    resp = mm.perps_oi(coins=["BTC", "ETH"])
    for row in resp.data:
        print(row["coin"], row["oiUsd"])

Streaming endpoints are sync generators:

    for ev in mm.stream_whales(min_usd=250_000):
        if ev.type == "whale":
            print(ev.data["coin"], ev.data["notionalUsd"])
        if ev.type == "bye":
            break

Standard library only. Tested on CPython 3.10+.

MIT license. mmflow.ai
"""
from __future__ import annotations

import json
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any, Generator, Iterable, Literal, Optional


DEFAULT_BASE = "https://mmflow.ai/api/v1"
USER_AGENT = "mmflow-python-sdk/1.0"


# ── Response wrapper ──────────────────────────────────────────


@dataclass
class ApiResponse:
    """Wraps the standard mmflow envelope: { data, meta }."""

    data: Any
    meta: dict

    @classmethod
    def from_json(cls, payload: dict) -> "ApiResponse":
        return cls(data=payload.get("data"), meta=payload.get("meta", {}))


# ── Stream event ──────────────────────────────────────────────


@dataclass
class StreamEvent:
    """One SSE frame. `.type` is the event name (whale|trade|funding|
    heartbeat|bye); `.data` is the JSON-decoded payload."""

    type: str
    data: Any
    id: Optional[str] = None


# ── Exceptions ────────────────────────────────────────────────


class MmflowApiError(Exception):
    """Raised on non-2xx HTTP responses."""

    def __init__(self, status: int, body: Any):
        super().__init__(f"mmflow API {status}")
        self.status = status
        self.body = body


# ── Client ────────────────────────────────────────────────────


class MmflowClient:
    """Sync client for the mmflow Open API.

    Standard library only — no requests / httpx dependency.
    """

    def __init__(
        self,
        base_url: str = DEFAULT_BASE,
        timeout: float = 15.0,
    ):
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    # ── internal helpers ─────────────────────────────────

    def _url(self, path: str, params: Optional[dict] = None) -> str:
        url = self.base_url + path
        clean = {
            k: v
            for k, v in (params or {}).items()
            if v is not None and v != ""
        }
        if clean:
            url += "?" + urllib.parse.urlencode(clean)
        return url

    def _get(self, path: str, params: Optional[dict] = None) -> ApiResponse:
        return self._get_url(self._url(path, params))

    def _get_url(self, url: str) -> ApiResponse:
        req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
        try:
            with urllib.request.urlopen(req, timeout=self.timeout) as resp:
                body = json.loads(resp.read())
                return ApiResponse.from_json(body)
        except urllib.error.HTTPError as e:
            try:
                body = json.loads(e.read())
            except Exception:
                body = None
            raise MmflowApiError(e.code, body) from e

    def _stream(
        self,
        path: str,
        params: Optional[dict] = None,
    ) -> Generator[StreamEvent, None, None]:
        url = self._url(path, params)
        req = urllib.request.Request(
            url,
            headers={"User-Agent": USER_AGENT, "Accept": "text/event-stream"},
        )
        # NB: timeout here is per-read, not total. SSE streams idle
        # between events; the server emits a heartbeat every 15-30s
        # so a 60s read-timeout is comfortable.
        resp = urllib.request.urlopen(req, timeout=60.0)
        try:
            buf = b""
            current_event = "message"
            current_id: Optional[str] = None
            while True:
                chunk = resp.read(1024)
                if not chunk:
                    break
                buf += chunk
                while b"\n" in buf:
                    line, _, buf = buf.partition(b"\n")
                    line = line.rstrip(b"\r")
                    if line == b"":
                        # SSE frame terminator. Reset event-name accumulator
                        # — data lines below emit immediately when seen.
                        current_event = "message"
                        current_id = None
                    elif line.startswith(b":"):
                        # SSE comment — ignore
                        pass
                    elif line.startswith(b"event:"):
                        current_event = line[6:].strip().decode("utf-8", "replace")
                    elif line.startswith(b"id:"):
                        current_id = line[3:].strip().decode("utf-8", "replace")
                    elif line.startswith(b"data:"):
                        payload = line[5:].strip().decode("utf-8", "replace")
                        try:
                            parsed = json.loads(payload)
                        except json.JSONDecodeError:
                            continue
                        yield StreamEvent(
                            type=current_event, data=parsed, id=current_id
                        )
        finally:
            try:
                resp.close()
            except Exception:
                pass

    # ── perps ────────────────────────────────────────────

    def perps_oi(
        self,
        coins: Optional[Iterable[str]] = None,
        venues: Optional[
            Iterable[
                Literal[
                    "hl", "bybit", "okx", "binance", "bitmex", "deribit", "gateio"
                ]
            ]
        ] = None,
    ) -> ApiResponse:
        """Cross-venue OI. Default venues=['hl']. Pass venues=['hl','bybit',
        'okx','binance','bitmex','deribit','gateio'] to fan out (adds ~500ms
        latency). The deriv venues (bitmex/deribit/gateio) report USD OI only
        (oiContracts=0) and default to BTC,ETH when no coins are given."""
        return self._get(
            "/perps/oi",
            {
                "coins": ",".join(coins) if coins else None,
                "venues": ",".join(venues) if venues else None,
            },
        )

    def perps_funding(
        self,
        coins: Optional[Iterable[str]] = None,
        venues: Optional[
            Iterable[
                Literal["hl", "binance", "bybit", "bitmex", "deribit", "gateio"]
            ]
        ] = None,
    ) -> ApiResponse:
        """Cross-venue funding. Default venues=['hl']. Each row carries its
        own periodHours (HL=1h; Binance/Bybit/BitMEX/Deribit/Gate.io=8h)."""
        return self._get(
            "/perps/funding",
            {
                "coins": ",".join(coins) if coins else None,
                "venues": ",".join(venues) if venues else None,
            },
        )

    def perps_liquidations(
        self, coins: Optional[Iterable[str]] = None
    ) -> ApiResponse:
        return self._get(
            "/perps/liquidations",
            {"coins": ",".join(coins) if coins else None},
        )

    def perps_positioning(
        self,
        coins: Optional[Iterable[str]] = None,
        period: Optional[Literal["15m", "1h", "4h"]] = None,
    ) -> ApiResponse:
        return self._get(
            "/perps/positioning",
            {
                "coins": ",".join(coins) if coins else None,
                "period": period,
            },
        )

    def perps_whales(
        self,
        min_usd: Optional[float] = None,
        coin: Optional[str] = None,
        since_ms: Optional[int] = None,
        limit: Optional[int] = None,
    ) -> ApiResponse:
        return self._get(
            "/perps/whales",
            {
                "minUsd": min_usd,
                "coin": coin,
                "sinceMs": since_ms,
                "limit": limit,
            },
        )

    def perps_oi_history(
        self,
        coin: str,
        since_ms: Optional[int] = None,
    ) -> ApiResponse:
        """HL OI historical time series. Data cron-snapshots every ~10min,
        ~7 day retention."""
        return self._get(
            "/perps/oi/history", {"coin": coin, "sinceMs": since_ms}
        )

    def perps_funding_history(
        self,
        coin: str,
        since_ms: Optional[int] = None,
    ) -> ApiResponse:
        """Cross-venue funding rate history per coin. Each point has a
        byVenue map of rates for HL, Binance, Bybit, OKX."""
        return self._get(
            "/perps/funding/history", {"coin": coin, "sinceMs": since_ms}
        )

    # ── options ──────────────────────────────────────────

    def options_chain(
        self, underlying: Literal["BTC", "ETH", "SOL", "XRP"]
    ) -> ApiResponse:
        return self._get("/options/chain", {"underlying": underlying})

    def options_gex(
        self, underlying: Literal["BTC", "ETH", "SOL", "XRP"]
    ) -> ApiResponse:
        return self._get("/options/gex", {"underlying": underlying})

    def options_skew(
        self, underlying: Literal["BTC", "ETH", "SOL", "XRP"]
    ) -> ApiResponse:
        return self._get("/options/skew", {"underlying": underlying})

    # ── on-chain ─────────────────────────────────────────

    def onchain_etf_flows(
        self, asset: Literal["bitcoin", "ethereum"] = "bitcoin"
    ) -> ApiResponse:
        return self._get("/onchain/etf-flows", {"asset": asset})

    def onchain_treasuries(
        self, asset: Literal["bitcoin", "ethereum"] = "bitcoin"
    ) -> ApiResponse:
        return self._get("/onchain/treasuries", {"asset": asset})

    def onchain_valuation(self) -> ApiResponse:
        return self._get("/onchain/valuation")

    # ── spot ─────────────────────────────────────────────

    def markets_spot(
        self, coins: Optional[Iterable[str]] = None
    ) -> ApiResponse:
        return self._get(
            "/markets/spot", {"coins": ",".join(coins) if coins else None}
        )

    # ── flows ───────────────────────────────────────────

    def flows_venue_spreads(
        self, coins: Optional[Iterable[str]] = None
    ) -> ApiResponse:
        return self._get(
            "/flows/venue-spreads", {"coins": ",".join(coins) if coins else None}
        )

    def flows_kimchi(self, coin: Optional[str] = None) -> ApiResponse:
        return self._get("/flows/kimchi", {"coin": coin})

    def flows_etf(
        self, asset: Literal["bitcoin", "ethereum"] = "bitcoin"
    ) -> ApiResponse:
        return self._get("/flows/etf", {"asset": asset})

    def flows_signals(
        self, coins: Optional[Iterable[str]] = None
    ) -> ApiResponse:
        return self._get(
            "/flows/signals", {"coins": ",".join(coins) if coins else None}
        )

    def flows_summary(
        self, coins: Optional[Iterable[str]] = None
    ) -> ApiResponse:
        return self._get(
            "/flows/summary", {"coins": ",".join(coins) if coins else None}
        )

    def flows_history(
        self,
        coin: str,
        metric: str,
        interval: str = "1m",
        since_ms: Optional[int] = None,
    ) -> ApiResponse:
        return self._get(
            "/flows/history",
            {
                "coin": coin,
                "metric": metric,
                "interval": interval,
                "sinceMs": since_ms,
            },
        )

    # ── discovery + generic aggregation ──────────────────

    def markets(
        self,
        exchange: Optional[str] = None,
        page_size: Optional[int] = None,
        page: Optional[int] = None,
    ) -> ApiResponse:
        """Venue catalog (omit exchange) or a venue's markets (pass exchange,
        e.g. 'polymarket', 'hyperliquid', 'gateio')."""
        return self._get(
            "/markets",
            {"exchange": exchange, "pageSize": page_size, "page": page},
        )

    def points(
        self,
        type: Literal["OI_AGG", "FUNDING_AGG", "SPOT"],
        exchanges: Iterable[str],
        coins: Optional[Iterable[str]] = None,
        group_by: Optional[
            Literal["GROUP_BY_TYPE_NONE", "GROUP_BY_TYPE_SUM"]
        ] = None,
    ) -> ApiResponse:
        """Generic cross-venue aggregation. type=OI_AGG|FUNDING_AGG|SPOT are
        live; advanced types return 501. group_by='GROUP_BY_TYPE_SUM' folds
        across exchanges per coin."""
        params = [("type", type)]
        for ex in exchanges:
            params.append(("exchange", ex))
        if coins:
            params.append(("coins", ",".join(coins)))
        if group_by:
            params.append(("transform.groupBy.type", group_by))
        url = self.base_url + "/points?" + urllib.parse.urlencode(params)
        return self._get_url(url)

    # ── polymarket ───────────────────────────────────────

    def polymarket_events(self, limit: Optional[int] = None) -> ApiResponse:
        return self._get("/polymarket/events", {"limit": limit})

    def polymarket_orderbook(self, token_id: str) -> ApiResponse:
        return self._get("/polymarket/orderbook", {"tokenId": token_id})

    # ── streams ──────────────────────────────────────────

    def stream_whales(
        self, min_usd: Optional[float] = None, coin: Optional[str] = None
    ) -> Generator[StreamEvent, None, None]:
        return self._stream(
            "/streams/whales", {"minUsd": min_usd, "coin": coin}
        )

    def stream_trades(
        self, coin: str, min_usd: Optional[float] = None
    ) -> Generator[StreamEvent, None, None]:
        return self._stream(
            "/streams/trades", {"coin": coin, "minUsd": min_usd}
        )

    def stream_funding(
        self,
        coins: Optional[Iterable[str]] = None,
        min_delta_bps: Optional[float] = None,
    ) -> Generator[StreamEvent, None, None]:
        return self._stream(
            "/streams/funding",
            {
                "coins": ",".join(coins) if coins else None,
                "minDeltaBps": min_delta_bps,
            },
        )

    # ── openapi ──────────────────────────────────────────

    def openapi(self) -> dict:
        url = self._url("/openapi")
        req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
        with urllib.request.urlopen(req, timeout=self.timeout) as resp:
            return json.loads(resp.read())


__all__ = [
    "MmflowClient",
    "MmflowApiError",
    "ApiResponse",
    "StreamEvent",
    "DEFAULT_BASE",
]
