"""
ExeWatch Python SDK v0.1.0

Lightweight APM / observability SDK for Python applications.
Zero external dependencies — uses only the Python 3.7+ standard library.

Usage:
    from exewatch import initialize_exewatch, ew, finalize_exewatch

    initialize_exewatch("ew_win_xxx", "my-customer")
    ew().info("Application started", "startup")
    ew().start_timing("db_query")
    # ... do work ...
    ew().end_timing("db_query")
    finalize_exewatch()

Copyright (c) 2026 - bit Time Professionals
"""

from __future__ import annotations

import atexit
import ctypes
import datetime
import getpass
import hashlib
import json
import logging
import os
import platform
import queue
import random
import re
import socket
import struct
import sys
import threading
import time
import uuid
from enum import IntEnum
from pathlib import Path
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Tuple,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
SDK_VERSION = "0.1.0"
API_VERSION = "v1"
DEFAULT_ENDPOINT = "https://exewatch.com"

DEFAULT_BUFFER_SIZE = 100
DEFAULT_FLUSH_INTERVAL_MS = 5000
DEFAULT_RETRY_INTERVAL_MS = 30000
DEFAULT_SAMPLE_RATE = 1.0
DEFAULT_GAUGE_SAMPLING_INTERVAL_SEC = 30
MIN_GAUGE_SAMPLING_INTERVAL_SEC = 10
DEFAULT_MAX_PENDING_AGE_DAYS = 7

MAX_BREADCRUMBS = 20
MAX_PENDING_TIMINGS = 100
MAX_REGISTERED_GAUGES = 20

LOG_FILE_EXT = ".ewlog"
DEVICE_FILE_EXT = ".ewdevice"
METRIC_FILE_EXT = ".ewmetric"
SENDING_EXT = ".sending"

METRIC_FLUSH_INTERVAL_MS = 30000

INTERNAL_LOG_TAG = "exewatch"

# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------

class LogLevel(IntEnum):
    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    FATAL = 4

    def to_str(self) -> str:
        return self.name.lower()

    @staticmethod
    def from_str(s: str) -> "LogLevel":
        return _LOG_LEVEL_MAP.get(s.lower(), LogLevel.DEBUG)


_LOG_LEVEL_MAP = {
    "debug": LogLevel.DEBUG,
    "info": LogLevel.INFO,
    "warning": LogLevel.WARNING,
    "error": LogLevel.ERROR,
    "fatal": LogLevel.FATAL,
}


class BreadcrumbType(IntEnum):
    CLICK = 0
    NAVIGATION = 1
    HTTP = 2
    CONSOLE = 3
    CUSTOM = 4
    ERROR = 5
    QUERY = 6
    TRANSACTION = 7
    USER = 8
    SYSTEM = 9
    FILE = 10
    STATE = 11
    FORM = 12
    CONFIG = 13
    MESSAGE = 14
    DEBUG = 15

    def to_str(self) -> str:
        return self.name.lower()


# ---------------------------------------------------------------------------
# Data classes (plain dicts/named tuples compatible with 3.7)
# ---------------------------------------------------------------------------

class ExeWatchConfig:
    """SDK initialization configuration."""

    __slots__ = (
        "api_key", "customer_id", "app_version", "buffer_size",
        "flush_interval_ms", "retry_interval_ms", "storage_path",
        "sample_rate", "gauge_sampling_interval_sec",
        "max_pending_age_days", "anonymize_device_id", "endpoint",
    )

    def __init__(
        self,
        api_key: str,
        customer_id: str,
        app_version: str = "",
        buffer_size: int = DEFAULT_BUFFER_SIZE,
        flush_interval_ms: int = DEFAULT_FLUSH_INTERVAL_MS,
        retry_interval_ms: int = DEFAULT_RETRY_INTERVAL_MS,
        storage_path: str = "",
        sample_rate: float = DEFAULT_SAMPLE_RATE,
        gauge_sampling_interval_sec: int = DEFAULT_GAUGE_SAMPLING_INTERVAL_SEC,
        max_pending_age_days: int = DEFAULT_MAX_PENDING_AGE_DAYS,
        anonymize_device_id: bool = False,
        endpoint: str = DEFAULT_ENDPOINT,
    ):
        self.api_key = api_key
        self.customer_id = customer_id
        self.app_version = app_version
        self.buffer_size = buffer_size
        self.flush_interval_ms = flush_interval_ms
        self.retry_interval_ms = retry_interval_ms
        self.storage_path = storage_path
        self.sample_rate = max(0.0, min(1.0, sample_rate))
        self.gauge_sampling_interval_sec = max(
            MIN_GAUGE_SAMPLING_INTERVAL_SEC, gauge_sampling_interval_sec
        )
        self.max_pending_age_days = max_pending_age_days
        self.anonymize_device_id = anonymize_device_id
        self.endpoint = endpoint.rstrip("/")


class UserIdentity:
    __slots__ = ("id", "email", "name")

    def __init__(self, id: str = "", email: str = "", name: str = ""):
        self.id = id
        self.email = email
        self.name = name

    def is_empty(self) -> bool:
        return not self.id and not self.email and not self.name

    def to_dict(self) -> Dict[str, str]:
        d = {}  # type: Dict[str, str]
        if self.id:
            d["user_id"] = self.id
        if self.email:
            d["user_email"] = self.email
        if self.name:
            d["user_name"] = self.name
        return d


# ---------------------------------------------------------------------------
# Server config (dynamic, received from backend)
# ---------------------------------------------------------------------------

class _ServerConfig:
    __slots__ = (
        "version", "flush_interval_ms", "batch_size", "sampling_rate",
        "max_message_length", "min_level", "enabled", "disabled_until",
    )

    def __init__(self) -> None:
        self.version = 0
        self.flush_interval_ms = DEFAULT_FLUSH_INTERVAL_MS
        self.batch_size = DEFAULT_BUFFER_SIZE
        self.sampling_rate = DEFAULT_SAMPLE_RATE
        self.max_message_length = 50000
        self.min_level = LogLevel.DEBUG
        self.enabled = True
        self.disabled_until = ""  # type: str  # ISO8601 or empty

    def update_from_dict(self, d: Dict[str, Any]) -> None:
        if not d:
            return
        if "version" in d:
            self.version = int(d["version"])
        if "flush_interval_ms" in d:
            self.flush_interval_ms = int(d["flush_interval_ms"])
        if "batch_size" in d:
            self.batch_size = int(d["batch_size"])
        if "sampling_rate" in d:
            self.sampling_rate = float(d["sampling_rate"])
        if "max_message_length" in d:
            self.max_message_length = int(d["max_message_length"])
        if "min_level" in d:
            self.min_level = LogLevel.from_str(str(d["min_level"]))
        if "enabled" in d:
            self.enabled = bool(d["enabled"])
        if "disabled_until" in d:
            self.disabled_until = str(d["disabled_until"])


# ---------------------------------------------------------------------------
# Platform helpers
# ---------------------------------------------------------------------------

def _get_os_type() -> str:
    s = sys.platform
    if s.startswith("win"):
        return "windows"
    elif s.startswith("linux"):
        return "linux"
    elif s == "darwin":
        return "macos"
    return s


def _get_os_version() -> str:
    return platform.platform()


def _get_hostname() -> str:
    return socket.gethostname()


def _get_username() -> str:
    try:
        return getpass.getuser()
    except Exception:
        return "unknown"


def _get_timezone_offset() -> str:
    utc_offset = -(time.timezone if time.daylight == 0 else time.altzone)
    sign = "+" if utc_offset >= 0 else "-"
    hours, remainder = divmod(abs(utc_offset), 3600)
    minutes = remainder // 60
    return "{}{:02d}:{:02d}".format(sign, hours, minutes)


def _get_timezone_name() -> str:
    try:
        return time.tzname[0] or ""
    except Exception:
        return ""


def _get_local_ips() -> List[str]:
    ips = []  # type: List[str]
    try:
        for info in socket.getaddrinfo(socket.gethostname(), None, socket.AF_INET):
            addr = info[4][0]
            if addr not in ips and addr != "127.0.0.1":
                ips.append(addr)
    except Exception:
        pass
    return ips


def _fnv1a_hash(data: str) -> str:
    """FNV-1a hash for GDPR anonymization (matches Delphi SDK)."""
    h = 0x811C9DC5
    for b in data.encode("utf-8"):
        h ^= b
        h = (h * 0x01000193) & 0xFFFFFFFF
    return "{:08x}".format(h)


def _now_utc_iso() -> str:
    return (
        datetime.datetime.utcnow()
        .strftime("%Y-%m-%dT%H:%M:%S.") +
        "{:03d}Z".format(
            datetime.datetime.utcnow().microsecond // 1000
        )
    )


def _now_utc() -> datetime.datetime:
    return datetime.datetime.utcnow()


def _generate_session_id() -> str:
    return uuid.uuid4().hex[:16]


def _file_timestamp() -> str:
    now = datetime.datetime.utcnow()
    return now.strftime("%Y%m%d_%H%M%S") + "{:03d}".format(
        now.microsecond // 1000
    )


# ---------------------------------------------------------------------------
# Device / Hardware info collectors
# ---------------------------------------------------------------------------

def _collect_device_info(
    app_version: str = "",
    anonymize: bool = False,
) -> Dict[str, Any]:
    username = _get_username()
    hostname = _get_hostname()
    device_id = "{}@{}".format(
        _fnv1a_hash(username) if anonymize else username, hostname
    )
    return {
        "device_id": device_id,
        "hostname": hostname,
        "username": _fnv1a_hash(username) if anonymize else username,
        "os_type": _get_os_type(),
        "os_version": _get_os_version(),
        "app_binary_version": _get_app_binary_version(),
        "app_version": app_version,
        "sdk_version": SDK_VERSION,
        "timezone_offset": _get_timezone_offset(),
    }


def _get_app_binary_version() -> str:
    """Best-effort version from the running script/package."""
    main = sys.modules.get("__main__")
    if main and hasattr(main, "__version__"):
        return str(main.__version__)
    return ""


def _collect_hardware_info() -> Dict[str, Any]:
    info = {}  # type: Dict[str, Any]
    os_type = _get_os_type()
    # Memory
    mem = _get_memory_info(os_type)
    info["total_physical_memory"] = mem[0]
    info["available_physical_memory"] = mem[1]
    # CPU
    cpu = _get_cpu_info(os_type)
    info["cpu_name"] = cpu.get("name", "")
    info["cpu_cores"] = cpu.get("cores", 0)
    info["cpu_logical_processors"] = cpu.get("logical", 0)
    info["cpu_architecture"] = platform.machine()
    # Disks
    info["disks"] = _get_disk_info(os_type)
    # Monitors
    info["monitors"] = _get_monitor_info(os_type)
    # Paths
    info["executable_path"] = sys.executable
    info["working_directory"] = os.getcwd()
    info["command_line"] = " ".join(sys.argv)
    # Boot time
    info["system_boot_time"] = _get_boot_time(os_type)
    # Network
    info["local_ip_addresses"] = _get_local_ips()
    # Locale
    info["timezone"] = _get_timezone_name()
    info["system_language"] = _get_system_language()
    info["system_locale"] = _get_system_locale()
    return info


def _get_memory_info(os_type: str) -> Tuple[int, int]:
    if os_type == "windows":
        return _get_memory_windows()
    elif os_type == "linux":
        return _get_memory_linux()
    return (0, 0)


def _get_memory_windows() -> Tuple[int, int]:
    try:
        class MEMORYSTATUSEX(ctypes.Structure):
            _fields_ = [
                ("dwLength", ctypes.c_ulong),
                ("dwMemoryLoad", ctypes.c_ulong),
                ("ullTotalPhys", ctypes.c_ulonglong),
                ("ullAvailPhys", ctypes.c_ulonglong),
                ("ullTotalPageFile", ctypes.c_ulonglong),
                ("ullAvailPageFile", ctypes.c_ulonglong),
                ("ullTotalVirtual", ctypes.c_ulonglong),
                ("ullAvailVirtual", ctypes.c_ulonglong),
                ("ullAvailExtendedVirtual", ctypes.c_ulonglong),
            ]
        ms = MEMORYSTATUSEX()
        ms.dwLength = ctypes.sizeof(MEMORYSTATUSEX)
        ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(ms))  # type: ignore[attr-defined]
        return (ms.ullTotalPhys, ms.ullAvailPhys)
    except Exception:
        return (0, 0)


def _get_memory_linux() -> Tuple[int, int]:
    total = avail = 0
    try:
        with open("/proc/meminfo", "r") as f:
            for line in f:
                if line.startswith("MemTotal:"):
                    total = int(line.split()[1]) * 1024
                elif line.startswith("MemAvailable:"):
                    avail = int(line.split()[1]) * 1024
    except Exception:
        pass
    return (total, avail)


def _get_cpu_info(os_type: str) -> Dict[str, Any]:
    info = {"name": "", "cores": 0, "logical": 0}  # type: Dict[str, Any]
    info["logical"] = os.cpu_count() or 0
    if os_type == "windows":
        try:
            import winreg
            key = winreg.OpenKey(
                winreg.HKEY_LOCAL_MACHINE,
                r"HARDWARE\DESCRIPTION\System\CentralProcessor\0",
            )
            info["name"] = str(winreg.QueryValueEx(key, "ProcessorNameString")[0]).strip()
            winreg.CloseKey(key)
        except Exception:
            pass
        # Physical cores via environment
        try:
            info["cores"] = int(os.environ.get("NUMBER_OF_PROCESSORS", "0"))
        except (ValueError, TypeError):
            info["cores"] = info["logical"]
    elif os_type == "linux":
        try:
            with open("/proc/cpuinfo", "r") as f:
                physical_ids = set()  # type: set
                for line in f:
                    if line.startswith("model name") and not info["name"]:
                        info["name"] = line.split(":", 1)[1].strip()
                    if line.startswith("physical id"):
                        physical_ids.add(line.split(":", 1)[1].strip())
                    if line.startswith("cpu cores") and info["cores"] == 0:
                        info["cores"] = int(line.split(":", 1)[1].strip())
        except Exception:
            pass
    if info["cores"] == 0:
        info["cores"] = info["logical"]
    return info


def _get_disk_info(os_type: str) -> List[Dict[str, Any]]:
    disks = []  # type: List[Dict[str, Any]]
    if os_type == "windows":
        try:
            GetDiskFreeSpaceExW = ctypes.windll.kernel32.GetDiskFreeSpaceExW  # type: ignore[attr-defined]
            GetVolumeInformationW = ctypes.windll.kernel32.GetVolumeInformationW  # type: ignore[attr-defined]
            GetDriveTypeW = ctypes.windll.kernel32.GetDriveTypeW  # type: ignore[attr-defined]
            GetLogicalDriveStringsW = ctypes.windll.kernel32.GetLogicalDriveStringsW  # type: ignore[attr-defined]
            buf = ctypes.create_unicode_buffer(256)
            GetLogicalDriveStringsW(256, buf)
            drives = buf.value.split("\x00")
            drive_type_map = {
                0: "Unknown", 1: "NoRootDir", 2: "Removable",
                3: "Fixed", 4: "Network", 5: "CDRom", 6: "RamDisk",
            }
            for drv in drives:
                if not drv:
                    continue
                dtype = GetDriveTypeW(drv)
                if dtype not in (2, 3, 4, 6):
                    continue
                free = ctypes.c_ulonglong(0)
                total = ctypes.c_ulonglong(0)
                total_free = ctypes.c_ulonglong(0)
                ok = GetDiskFreeSpaceExW(
                    drv,
                    ctypes.byref(free),
                    ctypes.byref(total),
                    ctypes.byref(total_free),
                )
                vol_name = ctypes.create_unicode_buffer(256)
                fs_name = ctypes.create_unicode_buffer(256)
                GetVolumeInformationW(
                    drv, vol_name, 256, None, None, None, fs_name, 256
                )
                if ok:
                    disks.append({
                        "drive": drv.rstrip("\\"),
                        "volume_name": vol_name.value,
                        "file_system": fs_name.value,
                        "total_bytes": total.value,
                        "free_bytes": total_free.value,
                        "drive_type": drive_type_map.get(dtype, "Unknown"),
                    })
        except Exception:
            pass
    elif os_type == "linux":
        seen = set()  # type: set
        try:
            with open("/proc/mounts", "r") as f:
                for line in f:
                    parts = line.split()
                    if len(parts) < 3:
                        continue
                    mount_point = parts[1]
                    fs_type = parts[2]
                    if fs_type in ("proc", "sysfs", "devtmpfs", "tmpfs",
                                   "devpts", "cgroup", "cgroup2",
                                   "pstore", "securityfs", "debugfs",
                                   "hugetlbfs", "mqueue", "fusectl",
                                   "binfmt_misc", "autofs", "tracefs",
                                   "configfs", "efivarfs", "bpf",
                                   "nsfs", "fuse.snapfuse"):
                        continue
                    if mount_point in seen:
                        continue
                    seen.add(mount_point)
                    try:
                        st = os.statvfs(mount_point)
                        total_bytes = st.f_blocks * st.f_frsize
                        free_bytes = st.f_bfree * st.f_frsize
                        if total_bytes == 0:
                            continue
                        disks.append({
                            "drive": mount_point,
                            "volume_name": parts[0],
                            "file_system": fs_type,
                            "total_bytes": total_bytes,
                            "free_bytes": free_bytes,
                            "drive_type": "Fixed",
                        })
                    except Exception:
                        continue
        except Exception:
            pass
    return disks


def _get_monitor_info(os_type: str) -> List[Dict[str, Any]]:
    monitors = []  # type: List[Dict[str, Any]]
    if os_type == "windows":
        try:
            user32 = ctypes.windll.user32  # type: ignore[attr-defined]
            MONITORINFOF_PRIMARY = 0x00000001

            MONITORENUMPROC = ctypes.WINFUNCTYPE(
                ctypes.c_int,
                ctypes.c_void_p,
                ctypes.c_void_p,
                ctypes.POINTER(ctypes.c_long * 4),
                ctypes.c_void_p,
            )

            class MONITORINFOEXW(ctypes.Structure):
                _fields_ = [
                    ("cbSize", ctypes.c_ulong),
                    ("rcMonitor", ctypes.c_long * 4),
                    ("rcWork", ctypes.c_long * 4),
                    ("dwFlags", ctypes.c_ulong),
                    ("szDevice", ctypes.c_wchar * 32),
                ]

            idx = [0]

            def callback(hMonitor, hdcMonitor, lprcMonitor, dwData):
                mi = MONITORINFOEXW()
                mi.cbSize = ctypes.sizeof(MONITORINFOEXW)
                user32.GetMonitorInfoW(hMonitor, ctypes.byref(mi))
                rc = mi.rcMonitor
                monitors.append({
                    "index": idx[0],
                    "name": mi.szDevice,
                    "width": rc[2] - rc[0],
                    "height": rc[3] - rc[1],
                    "bits_per_pixel": 32,
                    "primary": bool(mi.dwFlags & MONITORINFOF_PRIMARY),
                })
                idx[0] += 1
                return 1

            user32.EnumDisplayMonitors(
                None, None, MONITORENUMPROC(callback), 0
            )
        except Exception:
            pass
    return monitors


def _get_boot_time(os_type: str) -> str:
    if os_type == "windows":
        try:
            tick = ctypes.windll.kernel32.GetTickCount64()  # type: ignore[attr-defined]
            boot = datetime.datetime.utcnow() - datetime.timedelta(
                milliseconds=tick
            )
            return boot.strftime("%Y-%m-%dT%H:%M:%SZ")
        except Exception:
            pass
    elif os_type == "linux":
        try:
            with open("/proc/stat", "r") as f:
                for line in f:
                    if line.startswith("btime"):
                        ts = int(line.split()[1])
                        return datetime.datetime.utcfromtimestamp(ts).strftime(
                            "%Y-%m-%dT%H:%M:%SZ"
                        )
        except Exception:
            pass
    return ""


def _get_system_language() -> str:
    import locale
    try:
        lang, _ = locale.getdefaultlocale()
        return lang or ""
    except Exception:
        return ""


def _get_system_locale() -> str:
    import locale
    try:
        lang, encoding = locale.getdefaultlocale()
        if lang and encoding:
            return "{}.{}".format(lang, encoding)
        return lang or ""
    except Exception:
        return ""


# ---------------------------------------------------------------------------
# Internal logger
# ---------------------------------------------------------------------------

_internal_logger = logging.getLogger("exewatch.sdk")


def _log_internal(msg: str, *args: Any) -> None:
    try:
        _internal_logger.debug(msg, *args)
    except Exception:
        pass


# ---------------------------------------------------------------------------
# Breadcrumb storage (thread-local)
# ---------------------------------------------------------------------------

_breadcrumb_local = threading.local()


def _get_thread_breadcrumbs() -> List[Dict[str, Any]]:
    if not hasattr(_breadcrumb_local, "crumbs"):
        _breadcrumb_local.crumbs = []
    return _breadcrumb_local.crumbs


def _add_breadcrumb(
    breadcrumb_type: BreadcrumbType,
    category: str,
    message: str,
    data: Optional[Dict[str, Any]] = None,
) -> None:
    crumbs = _get_thread_breadcrumbs()
    crumbs.append({
        "timestamp": _now_utc_iso(),
        "type": breadcrumb_type.to_str(),
        "category": category,
        "message": message,
        "data": data,
    })
    while len(crumbs) > MAX_BREADCRUMBS:
        crumbs.pop(0)


def _take_breadcrumbs() -> List[Dict[str, Any]]:
    crumbs = _get_thread_breadcrumbs()
    taken = list(crumbs)
    crumbs.clear()
    return taken


def _clear_breadcrumbs() -> None:
    _get_thread_breadcrumbs().clear()


# ---------------------------------------------------------------------------
# Timing entry
# ---------------------------------------------------------------------------

class _TimingEntry:
    __slots__ = ("id", "tag", "start_time", "metadata")

    def __init__(
        self, id: str, tag: str, metadata: Optional[Dict[str, Any]] = None
    ):
        self.id = id
        self.tag = tag
        self.start_time = time.perf_counter()
        self.metadata = metadata


# ---------------------------------------------------------------------------
# Metric accumulator
# ---------------------------------------------------------------------------

class _MetricAccumulator:
    __slots__ = (
        "name", "metric_type", "tag", "value", "count",
        "min_val", "max_val", "sum_val", "period_start",
    )

    def __init__(self, name: str, metric_type: str, tag: str):
        self.name = name
        self.metric_type = metric_type
        self.tag = tag
        self.value = 0.0
        self.count = 0
        self.min_val = float("inf")
        self.max_val = float("-inf")
        self.sum_val = 0.0
        self.period_start = _now_utc_iso()

    def record(self, value: float) -> None:
        self.value = value
        self.count += 1
        self.sum_val += value
        if value < self.min_val:
            self.min_val = value
        if value > self.max_val:
            self.max_val = value

    def increment(self, delta: float) -> None:
        self.value += delta
        self.count += 1
        self.sum_val += delta
        if self.value < self.min_val:
            self.min_val = self.value
        if self.value > self.max_val:
            self.max_val = self.value

    def to_dict(self) -> Dict[str, Any]:
        d = {
            "name": self.name,
            "type": self.metric_type,
            "value": self.value,
            "tag": self.tag,
            "count": self.count,
            "period_start": self.period_start,
            "period_end": _now_utc_iso(),
        }  # type: Dict[str, Any]
        if self.metric_type == "gauge" and self.count > 0:
            d["min"] = self.min_val
            d["max"] = self.max_val
            d["avg"] = self.sum_val / self.count
        return d

    def reset(self) -> None:
        self.value = 0.0 if self.metric_type == "counter" else self.value
        self.count = 0
        self.min_val = float("inf")
        self.max_val = float("-inf")
        self.sum_val = 0.0
        self.period_start = _now_utc_iso()


# ---------------------------------------------------------------------------
# ExeWatch Client
# ---------------------------------------------------------------------------

class ExeWatchClient:
    """Main ExeWatch SDK client. Use the module-level functions to access."""

    def __init__(self, config: ExeWatchConfig) -> None:
        self._config = config
        self._session_id = _generate_session_id()
        self._enabled = True
        self._shutdown = False
        self._file_counter = 0
        self._file_counter_lock = threading.Lock()

        # Server config
        self._server_config = _ServerConfig()
        self._server_config.flush_interval_ms = config.flush_interval_ms
        self._server_config.batch_size = config.buffer_size
        self._server_config.sampling_rate = config.sample_rate
        self._server_config_lock = threading.Lock()

        # Buffer
        self._buffer = []  # type: List[Dict[str, Any]]
        self._buffer_lock = threading.Lock()

        # User identity
        self._user = UserIdentity()
        self._user_lock = threading.Lock()

        # Global tags
        self._tags = {}  # type: Dict[str, str]
        self._tags_lock = threading.Lock()

        # Customer ID (mutable at runtime)
        self._customer_id = config.customer_id
        self._customer_id_lock = threading.Lock()

        # Timings -- stored per-thread so two threads using the same timing id
        # don't collide (start_timing auto-closes duplicates on same thread).
        # Outer key = threading.get_ident(); inner dicts/lists are per-thread.
        self._pending_timings = {}  # type: Dict[int, Dict[str, _TimingEntry]]
        self._timing_stacks = {}    # type: Dict[int, List[str]]
        self._timings_lock = threading.Lock()

        # Metrics
        self._metric_accumulators = {}  # type: Dict[str, _MetricAccumulator]
        self._metrics_lock = threading.Lock()
        self._registered_gauges = {}  # type: Dict[str, Tuple[Callable[[], float], str]]
        self._gauges_lock = threading.Lock()

        # Custom device info
        self._custom_device_info = {}  # type: Dict[str, str]
        self._custom_device_info_lock = threading.Lock()
        self._device_info_sent = False

        # Device info (cached)
        self._device_dict = _collect_device_info(
            app_version=config.app_version,
            anonymize=config.anonymize_device_id,
        )

        # Storage path
        self._storage_path = self._resolve_storage_path()
        os.makedirs(self._storage_path, exist_ok=True)

        # Recover stale .sending files
        self._recover_sending_files()

        # Callbacks
        self.on_error = None  # type: Optional[Callable[[str], None]]
        self.on_logs_sent = None  # type: Optional[Callable[[int, int], None]]
        self.on_device_info_sent = None  # type: Optional[Callable[[bool, str], None]]

        # Shipper thread
        self._shipper_event = threading.Event()
        self._shipper_thread = threading.Thread(
            target=self._shipper_loop, name="exewatch-shipper", daemon=True
        )
        self._shipper_thread.start()

        # Sampler thread (periodic gauges)
        self._sampler_event = threading.Event()
        self._sampler_thread = threading.Thread(
            target=self._sampler_loop, name="exewatch-sampler", daemon=True
        )
        self._sampler_thread.start()

        # Last flush / metrics persist times
        self._last_flush_time = time.monotonic()
        self._last_metrics_persist_time = time.monotonic()
        self._last_cleanup_time = time.monotonic()

        _log_internal(
            "ExeWatch SDK initialized (session=%s, storage=%s)",
            self._session_id, self._storage_path,
        )

    # -- Properties ---------------------------------------------------------

    @property
    def enabled(self) -> bool:
        return self._enabled and self._server_config.enabled

    @enabled.setter
    def enabled(self, value: bool) -> None:
        self._enabled = value

    @property
    def session_id(self) -> str:
        return self._session_id

    @property
    def config(self) -> ExeWatchConfig:
        return self._config

    @property
    def device_info_sent(self) -> bool:
        return self._device_info_sent

    # -- Storage path -------------------------------------------------------

    def _resolve_storage_path(self) -> str:
        if self._config.storage_path:
            return self._config.storage_path
        os_type = _get_os_type()
        if os_type == "windows":
            base = os.environ.get("LOCALAPPDATA", "")
            if not base:
                base = os.path.join(
                    os.environ.get("USERPROFILE", os.path.expanduser("~")),
                    "AppData", "Local",
                )
            return os.path.join(base, "ExeWatch", "pending")
        else:
            base = os.environ.get("TMPDIR", "/tmp")
            return os.path.join(base, "ExeWatch", "pending")

    # -- Recover stale .sending files ---------------------------------------

    def _recover_sending_files(self) -> None:
        try:
            for fname in os.listdir(self._storage_path):
                if fname.endswith(SENDING_EXT):
                    full = os.path.join(self._storage_path, fname)
                    base = fname[: -len(SENDING_EXT)]
                    # Detect original extension
                    for ext in (LOG_FILE_EXT, DEVICE_FILE_EXT, METRIC_FILE_EXT):
                        if base.endswith(ext):
                            orig = os.path.join(
                                self._storage_path, base
                            )
                            try:
                                os.rename(full, orig)
                                _log_internal(
                                    "Recovered stale sending file: %s", fname
                                )
                            except OSError:
                                pass
                            break
                    else:
                        # Unknown, try as .ewlog
                        try:
                            os.rename(
                                full,
                                full[: -len(SENDING_EXT)],
                            )
                        except OSError:
                            pass
        except FileNotFoundError:
            pass
        except Exception as e:
            _log_internal("Error recovering sending files: %s", e)

    # -- File naming --------------------------------------------------------

    def _next_file_path(self, ext: str) -> str:
        with self._file_counter_lock:
            self._file_counter += 1
            counter = self._file_counter
        ts = _file_timestamp()
        tid = threading.current_thread().ident or 0
        name = "{}_{}_{}{}".format(ts, counter, tid, ext)
        return os.path.join(self._storage_path, name)

    # -- Logging ------------------------------------------------------------

    def log(
        self,
        level: LogLevel,
        message: str,
        tag: str = "main",
        extra_data: Optional[Dict[str, Any]] = None,
    ) -> None:
        if not self.enabled:
            # Check disabled_until for quota re-enable
            self._check_quota_reenable()
            if not self.enabled:
                return
        if not message:
            return

        # Min level filter (errors/fatals always pass)
        if level < LogLevel.ERROR:
            with self._server_config_lock:
                min_level = self._server_config.min_level
                sample_rate = self._server_config.sampling_rate
                max_msg_len = self._server_config.max_message_length
            if level < min_level:
                return
            # Sampling (errors/fatals bypass)
            if sample_rate < 1.0 and random.random() > sample_rate:
                return
        else:
            with self._server_config_lock:
                max_msg_len = self._server_config.max_message_length

        # Truncate message
        if len(message) > max_msg_len:
            message = message[:max_msg_len]

        # Build event
        event = {
            "level": level.to_str(),
            "message": message,
            "tag": tag,
            "timestamp": _now_utc_iso(),
            "thread_id": threading.current_thread().ident or 0,
            "process_id": os.getpid(),
            "session_id": self._session_id,
        }  # type: Dict[str, Any]

        # Extra data (user, tags, custom)
        merged_extra = {}  # type: Dict[str, Any]
        if extra_data:
            merged_extra.update(extra_data)

        # User identity
        with self._user_lock:
            if not self._user.is_empty():
                merged_extra.update(self._user.to_dict())

        # Global tags
        with self._tags_lock:
            for k, v in self._tags.items():
                merged_extra["tag_{}".format(k)] = v

        # Breadcrumbs for errors/fatals
        if level >= LogLevel.ERROR:
            crumbs = _take_breadcrumbs()
            if crumbs:
                merged_extra["breadcrumbs"] = crumbs
            # Stack trace
            stack = self._capture_stack_trace()
            if stack:
                merged_extra["stack_trace"] = stack

        if merged_extra:
            event["extra_data"] = merged_extra

        # Add to buffer
        with self._buffer_lock:
            self._buffer.append(event)
            buf_len = len(self._buffer)

        # Auto-persist if buffer full
        with self._server_config_lock:
            batch_size = self._server_config.batch_size
        if buf_len >= batch_size:
            self._persist_buffer()

        # Wake shipper on error/fatal
        if level >= LogLevel.ERROR:
            self._persist_buffer()
            self._shipper_event.set()

    def debug(self, message: str, tag: str = "main", **extra: Any) -> None:
        self.log(LogLevel.DEBUG, message, tag, extra or None)

    def info(self, message: str, tag: str = "main", **extra: Any) -> None:
        self.log(LogLevel.INFO, message, tag, extra or None)

    def warning(self, message: str, tag: str = "main", **extra: Any) -> None:
        self.log(LogLevel.WARNING, message, tag, extra or None)

    def error(self, message: str, tag: str = "main", **extra: Any) -> None:
        self.log(LogLevel.ERROR, message, tag, extra or None)

    def fatal(self, message: str, tag: str = "main", **extra: Any) -> None:
        self.log(LogLevel.FATAL, message, tag, extra or None)

    def error_with_exception(
        self,
        exc: BaseException,
        tag: str = "main",
        additional_message: str = "",
    ) -> None:
        import traceback
        msg = "{}: {}".format(type(exc).__name__, exc)
        if additional_message:
            msg = "{} - {}".format(additional_message, msg)
        tb = "".join(traceback.format_exception(
            type(exc), exc, exc.__traceback__
        ))
        self.log(
            LogLevel.ERROR, msg, tag,
            {"stack_trace": tb, "exception_class": type(exc).__name__},
        )

    def _capture_stack_trace(self) -> str:
        import traceback
        try:
            frames = traceback.extract_stack()
            # Skip internal SDK frames
            filtered = []
            for f in frames:
                if "exewatch.py" in (f.filename or ""):
                    continue
                filtered.append(f)
            if filtered:
                return "".join(traceback.format_list(filtered))
        except Exception:
            pass
        return ""

    def flush(self) -> None:
        """Force persist buffer and wake shipper."""
        self._persist_buffer()
        self._persist_metrics()
        self._shipper_event.set()

    def get_pending_count(self) -> int:
        """Return number of events waiting to be shipped (buffer + disk queue)."""
        with self._buffer_lock:
            buf_count = len(self._buffer)
        file_count = 0
        try:
            for fname in os.listdir(self._storage_path):
                for ext in (LOG_FILE_EXT, DEVICE_FILE_EXT, METRIC_FILE_EXT):
                    if fname.endswith(ext):
                        file_count += 1
                        break
        except FileNotFoundError:
            pass
        return buf_count + file_count

    def wait_for_sending(self, timeout_sec: int) -> int:
        """
        Flush the in-memory buffer and wait (up to ``timeout_sec`` seconds)
        for every pending event to reach the server.

        Returns the number of events still pending when the wait ends
        (0 = fully drained, >0 = timeout).

        Useful for short-lived scripts / console samples where you must
        ensure events are uploaded before the process exits.
        """
        self.flush()
        if timeout_sec is None or timeout_sec < 0:
            timeout_sec = 0
        deadline = time.monotonic() + float(timeout_sec)
        while self.get_pending_count() > 0:
            if time.monotonic() >= deadline:
                break
            time.sleep(0.1)
        return self.get_pending_count()

    # -- Breadcrumbs --------------------------------------------------------

    def add_breadcrumb(
        self,
        breadcrumb_type: BreadcrumbType,
        category: str,
        message: str,
        data: Optional[Dict[str, Any]] = None,
    ) -> None:
        _add_breadcrumb(breadcrumb_type, category, message, data)

    def get_breadcrumbs(self) -> List[Dict[str, Any]]:
        return list(_get_thread_breadcrumbs())

    def clear_breadcrumbs(self) -> None:
        _clear_breadcrumbs()

    # -- Timing -------------------------------------------------------------

    # Caller holds self._timings_lock.
    def _thread_timing_slots(
        self, tid: int
    ) -> Tuple[Dict[str, "_TimingEntry"], List[str]]:
        timings = self._pending_timings.get(tid)
        if timings is None:
            timings = {}
            self._pending_timings[tid] = timings
        stack = self._timing_stacks.get(tid)
        if stack is None:
            stack = []
            self._timing_stacks[tid] = stack
        return timings, stack

    def start_timing(
        self,
        id: str,
        tag: str = "",
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        tid = threading.get_ident()
        with self._timings_lock:
            timings, stack = self._thread_timing_slots(tid)

            # Auto-close duplicate (same thread only)
            if id in timings:
                old = timings.pop(id)
                elapsed = (time.perf_counter() - old.start_time) * 1000.0
                self._log_timing(
                    id, old.tag, elapsed, True,
                    {"auto_close_reason": "duplicate_start"},
                )
                if id in stack:
                    stack.remove(id)

            # Evict oldest if at per-thread max
            if len(timings) >= MAX_PENDING_TIMINGS and stack:
                oldest_id = stack.pop(0)
                old = timings.pop(oldest_id, None)
                if old:
                    elapsed = (time.perf_counter() - old.start_time) * 1000.0
                    self._log_timing(
                        oldest_id, old.tag, elapsed, True,
                        {"auto_close_reason": "max_pending_reached"},
                    )

            entry = _TimingEntry(id, tag, metadata)
            timings[id] = entry
            stack.append(id)

    def end_timing(
        self,
        id: Optional[str] = None,
        success: bool = True,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> float:
        tid = threading.get_ident()
        with self._timings_lock:
            timings = self._pending_timings.get(tid)
            stack = self._timing_stacks.get(tid)
            if not timings or stack is None:
                return -1.0

            if id is None:
                # Stack-based: end last
                if not stack:
                    return -1.0
                id = stack.pop()
            else:
                if id in stack:
                    stack.remove(id)

            entry = timings.pop(id, None)
            if entry is None:
                return -1.0

            elapsed = (time.perf_counter() - entry.start_time) * 1000.0

        # Merge metadata
        merged = {}  # type: Dict[str, Any]
        if entry.metadata:
            merged.update(entry.metadata)
        if metadata:
            merged.update(metadata)

        self._log_timing(id, entry.tag, elapsed, success, merged or None)
        return elapsed

    def cancel_timing(self, id: Optional[str] = None) -> None:
        tid = threading.get_ident()
        with self._timings_lock:
            timings = self._pending_timings.get(tid)
            stack = self._timing_stacks.get(tid)
            if not timings or stack is None:
                return

            if id is None:
                if not stack:
                    return
                id = stack.pop()
            else:
                if id in stack:
                    stack.remove(id)
            timings.pop(id, None)

    def is_timing_active(self, id: str) -> bool:
        tid = threading.get_ident()
        with self._timings_lock:
            timings = self._pending_timings.get(tid)
            return bool(timings and id in timings)

    def get_active_timings(self) -> List[Dict[str, Any]]:
        tid = threading.get_ident()
        with self._timings_lock:
            result = []
            timings = self._pending_timings.get(tid) or {}
            now = time.perf_counter()
            for tname, entry in timings.items():
                result.append({
                    "id": tname,
                    "tag": entry.tag,
                    "elapsed_ms": (now - entry.start_time) * 1000.0,
                })
            return result

    def _log_timing(
        self,
        timing_id: str,
        tag: str,
        elapsed_ms: float,
        success: bool,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        extra = {
            "timing_id": timing_id,
            "timing_type": "duration",
            "elapsed_ms": round(elapsed_ms, 3),
            "success": success,
        }  # type: Dict[str, Any]
        if metadata:
            extra.update(metadata)
        self.log(LogLevel.INFO, timing_id, tag or "timing", extra)

    # -- Metrics ------------------------------------------------------------

    def increment_counter(
        self, name: str, value: float = 1.0, tag: str = ""
    ) -> None:
        key = "counter:{}:{}".format(name, tag)
        with self._metrics_lock:
            acc = self._metric_accumulators.get(key)
            if acc is None:
                acc = _MetricAccumulator(name, "counter", tag)
                self._metric_accumulators[key] = acc
            acc.increment(value)

    def record_gauge(
        self, name: str, value: float, tag: str = ""
    ) -> None:
        key = "gauge:{}:{}".format(name, tag)
        with self._metrics_lock:
            acc = self._metric_accumulators.get(key)
            if acc is None:
                acc = _MetricAccumulator(name, "gauge", tag)
                self._metric_accumulators[key] = acc
            acc.record(value)

    def register_periodic_gauge(
        self,
        name: str,
        callback: Callable[[], float],
        tag: str = "",
    ) -> None:
        with self._gauges_lock:
            if len(self._registered_gauges) >= MAX_REGISTERED_GAUGES:
                _log_internal(
                    "Max registered gauges reached (%d), ignoring: %s",
                    MAX_REGISTERED_GAUGES, name,
                )
                return
            self._registered_gauges[name] = (callback, tag)

    def unregister_periodic_gauge(self, name: str) -> None:
        with self._gauges_lock:
            self._registered_gauges.pop(name, None)

    # -- User Identity ------------------------------------------------------

    def set_user(
        self, id: str, email: str = "", name: str = ""
    ) -> None:
        with self._user_lock:
            self._user = UserIdentity(id, email, name)

    def clear_user(self) -> None:
        with self._user_lock:
            self._user = UserIdentity()

    def get_user(self) -> UserIdentity:
        with self._user_lock:
            return UserIdentity(
                self._user.id, self._user.email, self._user.name
            )

    # -- Tags ---------------------------------------------------------------

    def set_tag(self, key: str, value: str) -> None:
        with self._tags_lock:
            self._tags[key] = value

    def set_tags(self, tags: Dict[str, str]) -> None:
        with self._tags_lock:
            self._tags.update(tags)

    def remove_tag(self, key: str) -> None:
        with self._tags_lock:
            self._tags.pop(key, None)

    def clear_tags(self) -> None:
        with self._tags_lock:
            self._tags.clear()

    def get_tags(self) -> Dict[str, str]:
        with self._tags_lock:
            return dict(self._tags)

    # -- Customer ID --------------------------------------------------------

    def set_customer_id(self, customer_id: str) -> None:
        with self._customer_id_lock:
            self._customer_id = customer_id

    def get_customer_id(self) -> str:
        with self._customer_id_lock:
            return self._customer_id

    # -- Device Info --------------------------------------------------------

    def send_device_info(self) -> None:
        """Queue device hardware info for sending."""
        try:
            hw = _collect_hardware_info()
            with self._custom_device_info_lock:
                custom = dict(self._custom_device_info)

            payload = {
                "customer_id": self.get_customer_id(),
                "device": dict(self._device_dict),
                "hardware_info": hw,
            }  # type: Dict[str, Any]
            if custom:
                payload["custom_device_info"] = custom

            path = self._next_file_path(DEVICE_FILE_EXT)
            with open(path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False)
            self._shipper_event.set()
            _log_internal("Device info queued: %s", path)
        except Exception as e:
            _log_internal("Error queueing device info: %s", e)

    def set_custom_device_info(self, key: str, value: str) -> None:
        with self._custom_device_info_lock:
            self._custom_device_info[key] = value

    def clear_custom_device_info(self, key: str = "") -> None:
        with self._custom_device_info_lock:
            if key:
                self._custom_device_info.pop(key, None)
            else:
                self._custom_device_info.clear()

    def send_custom_device_info(self) -> None:
        """Queue custom device info for sending (uses device-info endpoint)."""
        self.send_device_info()

    # -- Buffer persistence -------------------------------------------------

    def _persist_buffer(self) -> None:
        with self._buffer_lock:
            if not self._buffer:
                return
            events = list(self._buffer)
            self._buffer.clear()

        payload = {
            "customer_id": self.get_customer_id(),
            "device": dict(self._device_dict),
            "events": events,
        }
        path = self._next_file_path(LOG_FILE_EXT)
        try:
            with open(path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False)
        except Exception as e:
            _log_internal("Error persisting buffer: %s", e)
            # Put events back
            with self._buffer_lock:
                self._buffer = events + self._buffer

    def _persist_metrics(self) -> None:
        with self._metrics_lock:
            if not self._metric_accumulators:
                return
            metrics_list = []
            keys_to_reset = []
            for key, acc in self._metric_accumulators.items():
                if acc.count > 0:
                    metrics_list.append(acc.to_dict())
                    keys_to_reset.append(key)
            for key in keys_to_reset:
                self._metric_accumulators[key].reset()

        if not metrics_list:
            return

        payload = {
            "customer_id": self.get_customer_id(),
            "device": dict(self._device_dict),
            "session_id": self._session_id,
            "metrics": metrics_list,
        }
        path = self._next_file_path(METRIC_FILE_EXT)
        try:
            with open(path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False)
        except Exception as e:
            _log_internal("Error persisting metrics: %s", e)

    # -- Quota re-enable check ----------------------------------------------

    def _check_quota_reenable(self) -> None:
        with self._server_config_lock:
            if not self._server_config.disabled_until:
                return
            try:
                dt = datetime.datetime.strptime(
                    self._server_config.disabled_until.replace("Z", "+00:00").split("+")[0],
                    "%Y-%m-%dT%H:%M:%S",
                )
                if datetime.datetime.utcnow() >= dt:
                    self._server_config.enabled = True
                    self._server_config.disabled_until = ""
                    _log_internal("Quota re-enabled after disabled_until expired")
            except (ValueError, TypeError):
                pass

    # -- Shipper thread -----------------------------------------------------

    def _shipper_loop(self) -> None:
        while not self._shutdown:
            try:
                self._shipper_tick()
            except Exception as e:
                _log_internal("Shipper error: %s", e)
                if self.on_error:
                    try:
                        self.on_error(str(e))
                    except Exception:
                        pass

            # Check if pending files exist
            has_pending = self._has_pending_files()
            if has_pending:
                self._shipper_event.wait(timeout=0.1)
            else:
                with self._server_config_lock:
                    interval = self._server_config.flush_interval_ms / 1000.0
                self._shipper_event.wait(timeout=interval)
            self._shipper_event.clear()

    def _shipper_tick(self) -> None:
        now = time.monotonic()

        # Auto-persist buffer if flush interval exceeded
        with self._server_config_lock:
            flush_interval = self._server_config.flush_interval_ms / 1000.0
        if now - self._last_flush_time >= flush_interval:
            self._persist_buffer()
            self._last_flush_time = now

        # Auto-persist metrics
        if now - self._last_metrics_persist_time >= METRIC_FLUSH_INTERVAL_MS / 1000.0:
            self._persist_metrics()
            self._last_metrics_persist_time = now

        # Auto-cleanup old files (hourly)
        if now - self._last_cleanup_time >= 3600:
            self._cleanup_old_files()
            self._last_cleanup_time = now

        # Send pending files
        self._send_pending_files()

    def _has_pending_files(self) -> bool:
        try:
            for fname in os.listdir(self._storage_path):
                for ext in (LOG_FILE_EXT, DEVICE_FILE_EXT, METRIC_FILE_EXT):
                    if fname.endswith(ext):
                        return True
        except FileNotFoundError:
            pass
        return False

    def _send_pending_files(self) -> None:
        try:
            files = sorted(os.listdir(self._storage_path))
        except FileNotFoundError:
            return

        for fname in files:
            if self._shutdown:
                break
            full_path = os.path.join(self._storage_path, fname)
            if fname.endswith(LOG_FILE_EXT):
                self._send_file(full_path, "ingest/logs")
            elif fname.endswith(DEVICE_FILE_EXT):
                self._send_file(full_path, "ingest/device-info")
            elif fname.endswith(METRIC_FILE_EXT):
                self._send_file(full_path, "ingest/metrics")

    def _send_file(self, file_path: str, endpoint_path: str) -> bool:
        sending_path = file_path + SENDING_EXT
        try:
            os.rename(file_path, sending_path)
        except OSError:
            return False

        try:
            with open(sending_path, "r", encoding="utf-8") as f:
                data = f.read()
        except Exception as e:
            _log_internal("Error reading file %s: %s", sending_path, e)
            self._restore_file(sending_path, file_path)
            return False

        # Validate JSON
        try:
            json.loads(data)
        except json.JSONDecodeError:
            _log_internal("Invalid JSON, deleting: %s", sending_path)
            try:
                os.remove(sending_path)
            except OSError:
                pass
            return False

        url = "{}/api/{}/{}".format(
            self._config.endpoint, API_VERSION, endpoint_path
        )

        with self._server_config_lock:
            config_version = self._server_config.version

        headers = {
            "Content-Type": "application/json",
            "X-API-Key": self._config.api_key,
            "X-Config-Version": str(config_version),
        }

        try:
            req = Request(
                url, data=data.encode("utf-8"), headers=headers, method="POST"
            )
            with urlopen(req, timeout=30) as resp:
                status = resp.status
                body = resp.read().decode("utf-8")

            if status == 200:
                try:
                    resp_data = json.loads(body)
                    cfg = resp_data.get("config")
                    if cfg:
                        with self._server_config_lock:
                            self._server_config.update_from_dict(cfg)

                    # Callbacks
                    if "accepted" in resp_data and self.on_logs_sent:
                        try:
                            self.on_logs_sent(
                                resp_data.get("accepted", 0),
                                resp_data.get("rejected", 0),
                            )
                        except Exception:
                            pass

                    if "device_id" in resp_data:
                        self._device_info_sent = True
                        if self.on_device_info_sent:
                            try:
                                self.on_device_info_sent(True, "")
                            except Exception:
                                pass
                except json.JSONDecodeError:
                    pass

                # Success — delete file
                try:
                    os.remove(sending_path)
                except OSError:
                    pass
                return True

        except HTTPError as e:
            status = e.code
            try:
                body = e.read().decode("utf-8")
            except Exception:
                body = ""

            if status == 422:
                # Validation error — try to salvage valid events
                _log_internal("422 validation error for %s", file_path)
                try:
                    os.remove(sending_path)
                except OSError:
                    pass
                return False

            elif status == 429:
                # Quota exceeded — drop file, disable SDK
                _log_internal("429 quota exceeded, disabling SDK")
                with self._server_config_lock:
                    self._server_config.enabled = False
                    # Disable for 6 hours
                    dt = datetime.datetime.utcnow() + datetime.timedelta(hours=6)
                    self._server_config.disabled_until = dt.strftime(
                        "%Y-%m-%dT%H:%M:%SZ"
                    )
                try:
                    os.remove(sending_path)
                except OSError:
                    pass
                return False

            else:
                # Other error — restore for retry
                _log_internal("HTTP %d error, will retry: %s", status, file_path)
                self._restore_file(sending_path, file_path)
                return False

        except (URLError, OSError, Exception) as e:
            _log_internal("Network error sending %s: %s", file_path, e)
            self._restore_file(sending_path, file_path)
            return False

    def _restore_file(self, sending_path: str, original_path: str) -> None:
        try:
            os.rename(sending_path, original_path)
        except OSError:
            pass

    # -- Cleanup old files --------------------------------------------------

    def _cleanup_old_files(self) -> None:
        if self._config.max_pending_age_days <= 0:
            return
        cutoff = time.time() - (
            self._config.max_pending_age_days * 86400
        )
        try:
            for fname in os.listdir(self._storage_path):
                full = os.path.join(self._storage_path, fname)
                try:
                    if os.path.getmtime(full) < cutoff:
                        os.remove(full)
                        _log_internal("Cleaned up old file: %s", fname)
                except OSError:
                    pass
        except FileNotFoundError:
            pass

    # -- Sampler thread (periodic gauges) -----------------------------------

    def _sampler_loop(self) -> None:
        while not self._shutdown:
            interval = self._config.gauge_sampling_interval_sec
            self._sampler_event.wait(timeout=interval)
            if self._shutdown:
                break
            self._sampler_event.clear()
            self._sample_gauges()

    def _sample_gauges(self) -> None:
        with self._gauges_lock:
            gauges = list(self._registered_gauges.items())
        for name, (callback, tag) in gauges:
            try:
                value = callback()
                self.record_gauge(name, value, tag)
            except Exception as e:
                _log_internal("Error sampling gauge %s: %s", name, e)

    # -- Shutdown -----------------------------------------------------------

    def shutdown(self) -> None:
        if self._shutdown:
            return
        _log_internal("ExeWatch SDK shutting down...")
        self._shutdown = True

        # Wake threads
        self._shipper_event.set()
        self._sampler_event.set()

        # Persist remaining data
        self._persist_buffer()
        self._persist_metrics()

        # Send remaining files
        self._send_pending_files()

        # Wait for threads
        self._shipper_thread.join(timeout=5)
        self._sampler_thread.join(timeout=2)

        _log_internal("ExeWatch SDK shutdown complete")


# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------

_instance = None  # type: Optional[ExeWatchClient]
_instance_lock = threading.Lock()


def initialize_exewatch(
    api_key: str,
    customer_id: str,
    app_version: str = "",
    endpoint: str = DEFAULT_ENDPOINT,
    storage_path: str = "",
    buffer_size: int = DEFAULT_BUFFER_SIZE,
    flush_interval_ms: int = DEFAULT_FLUSH_INTERVAL_MS,
    retry_interval_ms: int = DEFAULT_RETRY_INTERVAL_MS,
    sample_rate: float = DEFAULT_SAMPLE_RATE,
    gauge_sampling_interval_sec: int = DEFAULT_GAUGE_SAMPLING_INTERVAL_SEC,
    max_pending_age_days: int = DEFAULT_MAX_PENDING_AGE_DAYS,
    anonymize_device_id: bool = False,
) -> ExeWatchClient:
    """Initialize the ExeWatch SDK singleton."""
    global _instance
    with _instance_lock:
        if _instance is not None:
            raise RuntimeError("ExeWatch is already initialized. Call finalize_exewatch() first.")
        config = ExeWatchConfig(
            api_key=api_key,
            customer_id=customer_id,
            app_version=app_version,
            buffer_size=buffer_size,
            flush_interval_ms=flush_interval_ms,
            retry_interval_ms=retry_interval_ms,
            storage_path=storage_path,
            sample_rate=sample_rate,
            gauge_sampling_interval_sec=gauge_sampling_interval_sec,
            max_pending_age_days=max_pending_age_days,
            anonymize_device_id=anonymize_device_id,
            endpoint=endpoint,
        )
        _instance = ExeWatchClient(config)
        return _instance


def initialize_exewatch_with_config(config: ExeWatchConfig) -> ExeWatchClient:
    """Initialize the ExeWatch SDK singleton with a config object."""
    global _instance
    with _instance_lock:
        if _instance is not None:
            raise RuntimeError("ExeWatch is already initialized. Call finalize_exewatch() first.")
        _instance = ExeWatchClient(config)
        return _instance


def finalize_exewatch() -> None:
    """Shutdown the ExeWatch SDK and release resources."""
    global _instance
    with _instance_lock:
        if _instance is not None:
            _instance.shutdown()
            _instance = None


def exewatch() -> ExeWatchClient:
    """Get the ExeWatch SDK singleton instance."""
    if _instance is None:
        raise RuntimeError("ExeWatch is not initialized. Call initialize_exewatch() first.")
    return _instance


class _ExeWatchProxy:
    """Lazy proxy that delegates attribute access to the singleton.

    Allows writing ``ew.info(...)`` instead of ``ew().info(...)``.
    """

    def __getattr__(self, name: str) -> Any:
        return getattr(exewatch(), name)

    def __setattr__(self, name: str, value: Any) -> None:
        setattr(exewatch(), name, value)

    def __repr__(self) -> str:
        if _instance is None:
            return "<ExeWatch: not initialized>"
        return "<ExeWatch: session={}>".format(_instance.session_id)


ew = _ExeWatchProxy()


def exewatch_is_initialized() -> bool:
    """Check if the ExeWatch SDK is initialized."""
    return _instance is not None


# Auto-shutdown on exit
atexit.register(finalize_exewatch)
