- Desktop widget (Python/pystray): system tray icon showing 5h usage as circular progress bar with Claude starburst logo, 10-step green-to-red color scale, right-click menu with usage stats and configuration
- Shared cache: both widget and CLI statusline read/write the same /tmp/claude_usage.json — only one fetcher needs to run
- Installer wizard (install_wizard.py): interactive cross-platform setup with component selection, session key prompt, cron/autostart config
- OS wrappers: install.sh (Linux/macOS) and install.ps1 (Windows) find Python 3.9+ and launch the wizard
- README with topology diagram, usage docs, and configuration reference

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
241 lines
8.2 KiB
Python
241 lines
8.2 KiB
Python
"""Usage fetcher — Python port of claude-statusline/fetch-usage.js (urllib only).

Writes to the same shared cache file as fetch-usage.js so both the CLI
statusline and the desktop widget see the same data from a single fetch.
"""
|
|
|
|
import json
import logging
import os
import sys
import threading
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone

from . import config
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
_HEADERS = {
|
|
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0",
|
|
"Accept": "application/json",
|
|
"Referer": "https://claude.ai/",
|
|
"Origin": "https://claude.ai",
|
|
}
|
|
|
|
# Shared cache path — identical to fetch-usage.js default
|
|
if sys.platform == "win32":
|
|
_DEFAULT_CACHE = os.path.join(os.environ.get("TEMP", os.path.expanduser("~")), "claude_usage.json")
|
|
else:
|
|
_DEFAULT_CACHE = os.path.join(os.environ.get("TMPDIR", "/tmp"), "claude_usage.json")
|
|
|
|
CACHE_PATH = os.environ.get("CLAUDE_USAGE_CACHE", _DEFAULT_CACHE)
|
|
|
|
|
|
# -- Cache I/O (compatible with fetch-usage.js format) -----------------------
|
|
|
|
|
|
def write_cache(data):
    """Write raw API response (or error dict) to the shared cache file.

    The write is atomic (temp file + os.replace) so concurrent readers of the
    shared cache (CLI statusline, other fetchers) never observe a truncated
    JSON document mid-write.

    Args:
        data: JSON-serializable dict — either the raw API response or an
            error marker such as {"_error": "auth_expired"}.

    Raises:
        OSError: if the cache directory or file cannot be written.
    """
    cache_dir = os.path.dirname(CACHE_PATH)
    if cache_dir:
        os.makedirs(cache_dir, exist_ok=True)
    # PID suffix keeps concurrent writers from clobbering each other's temp file.
    tmp_path = f"{CACHE_PATH}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, CACHE_PATH)  # atomic on both POSIX and Windows
    except OSError:
        # Best effort: don't leave a stray temp file behind, then re-raise.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
|
|
|
|
|
|
def read_cache():
    """Read the shared cache file. Returns (data_dict, age_seconds) or (None, inf)."""
    try:
        modified = os.path.getmtime(CACHE_PATH)
        age_seconds = time.time() - modified
        with open(CACHE_PATH, "r") as fh:
            payload = json.load(fh)
        return payload, age_seconds
    except (FileNotFoundError, json.JSONDecodeError, OSError):
        # Missing, corrupt, or unreadable cache — report "infinitely stale".
        return None, float("inf")
|
|
|
|
|
|
# -- API requests ------------------------------------------------------------
|
|
|
|
|
|
def _request(url, session_key):
    """GET *url* with browser headers plus the session cookie; return parsed JSON."""
    headers = dict(_HEADERS)
    headers["Cookie"] = f"sessionKey={session_key}"
    req = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(req, timeout=15) as resp:
        body = resp.read()
    return json.loads(body.decode())
|
|
|
|
|
|
def _discover_org_id(session_key):
    """Return the uuid of the first organization visible to this session.

    Raises RuntimeError when the account has no organizations.
    """
    org_list = _request("https://claude.ai/api/organizations", session_key)
    if org_list:
        return org_list[0]["uuid"]
    raise RuntimeError("No organizations found")
|
|
|
|
|
|
def fetch_usage(session_key, org_id=""):
    """Fetch usage data once. Returns (data_dict, org_id) or raises.

    When org_id is empty, discovers it via the organizations endpoint first
    and returns the resolved id so callers can cache it.
    """
    resolved = org_id or _discover_org_id(session_key)
    usage_url = f"https://claude.ai/api/organizations/{resolved}/usage"
    return _request(usage_url, session_key), resolved
|
|
|
|
|
|
# -- Parsing -----------------------------------------------------------------
|
|
|
|
|
|
def parse_usage(data):
    """Extract display-friendly usage info from raw API response.

    Args:
        data: Raw API response dict, an error dict with an "_error" key
            (written by either fetcher), or None when nothing is cached yet.

    Returns dict with keys:
        five_hour_pct, five_hour_resets_at, five_hour_resets_in,
        seven_day_pct, seven_day_resets_at, seven_day_resets_in
    or a single "error" key describing the failure.
    """
    if data is None:
        return {"error": "no data"}

    # Error states written by either fetcher
    if isinstance(data.get("_error"), str):
        err = data["_error"]
        if err == "auth_expired":
            return {"error": "session expired"}
        return {"error": err}

    result = {}
    now = datetime.now(timezone.utc)

    for window in ("five_hour", "seven_day"):
        block = data.get(window, {})
        # "or 0" also guards against an explicit JSON null utilization,
        # which round(None) would otherwise crash on.
        pct = round(block.get("utilization") or 0)
        resets_at = block.get("resets_at", "")
        resets_in = ""
        if resets_at:
            try:
                # Python < 3.11 fromisoformat() rejects a trailing "Z";
                # normalize to an explicit UTC offset so Z-suffixed
                # timestamps parse on 3.9/3.10 too.
                dt = datetime.fromisoformat(resets_at.replace("Z", "+00:00"))
                delta = dt - now
                total_seconds = max(0, int(delta.total_seconds()))
                days = total_seconds // 86400
                hours = (total_seconds % 86400) // 3600
                minutes = (total_seconds % 3600) // 60
                if days > 0:
                    resets_in = f"{days}d {hours}h"
                elif hours > 0:
                    resets_in = f"{hours}h {minutes}m"
                else:
                    resets_in = f"{minutes}m"
            except (ValueError, TypeError):
                # Unparseable or naive timestamp — leave resets_in empty.
                pass
        result[f"{window}_pct"] = pct
        result[f"{window}_resets_at"] = resets_at
        result[f"{window}_resets_in"] = resets_in

    return result
|
|
|
|
|
|
# -- Background fetcher thread -----------------------------------------------
|
|
|
|
|
|
class UsageFetcher:
    """Background daemon thread that periodically fetches usage data.

    Writes every fetch result to the shared cache file so the CLI statusline
    (and any other consumer) sees the same data. Before fetching, checks if
    the cache is already fresh enough (e.g. populated by the Node.js cron
    fetcher) and skips the API call if so.
    """

    def __init__(self, on_update):
        """on_update(parsed_dict) called on each successful or failed fetch."""
        # NOTE(review): on_update is invoked from the worker thread — callers
        # should marshal to their UI thread if needed; confirm at call sites.
        self._on_update = on_update
        # Signals the worker loop to exit.
        self._stop = threading.Event()
        # Doubles as the "fetch immediately" trigger and the loop's sleep timer.
        self._refresh_now = threading.Event()
        # Daemon thread so process exit never blocks on the fetcher.
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._cfg = config.load()
        # Cached org uuid; rediscovered by fetch_usage() when empty.
        self._org_id = self._cfg.get("org_id", "")

    def start(self):
        # Begin the background fetch loop.
        self._thread.start()

    def stop(self):
        # Ask the loop to exit; does not join the thread.
        self._stop.set()
        self._refresh_now.set()  # unblock wait

    def refresh(self):
        """Trigger an immediate fetch (bypasses cache freshness check)."""
        self._refresh_now.set()

    def set_interval(self, seconds):
        """Change refresh interval and trigger immediate fetch."""
        self._cfg["refresh_interval"] = seconds
        config.save(self._cfg)
        self._refresh_now.set()

    @property
    def interval(self):
        # Seconds between automatic fetches (default 300 = 5 minutes).
        return self._cfg.get("refresh_interval", 300)

    def _loop(self):
        # On startup, try to show cached data immediately
        self._load_from_cache()

        while not self._stop.is_set():
            # A set event here means refresh()/set_interval() requested a
            # forced fetch; snapshot and clear it BEFORE fetching so a trigger
            # arriving during the fetch wakes the wait below instead of
            # being lost.
            forced = self._refresh_now.is_set()
            self._refresh_now.clear()
            self._do_fetch(force=forced)
            # Sleep one interval, or wake early on refresh()/stop().
            self._refresh_now.wait(timeout=self._cfg.get("refresh_interval", 300))

    def _load_from_cache(self):
        """Read existing cache for instant display on startup."""
        data, age = read_cache()
        # Only trust cached data younger than one refresh interval.
        if data and age < self._cfg.get("refresh_interval", 300):
            parsed = parse_usage(data)
            self._on_update(parsed)

    def _do_fetch(self, force=False):
        """Fetch from API, or use cache if fresh enough.

        If force=True (manual refresh), always hit the API.
        Otherwise, skip if cache is younger than half the refresh interval
        (meaning another fetcher like the Node.js cron job already updated it).
        """
        if not force:
            data, age = read_cache()
            freshness_threshold = self._cfg.get("refresh_interval", 300) / 2
            # Cached errors never count as fresh — retry the API instead.
            if data and not data.get("_error") and age < freshness_threshold:
                parsed = parse_usage(data)
                self._on_update(parsed)
                log.debug("Cache is fresh (%.0fs old), skipping API call", age)
                return

        session_key = config.get_session_key()
        if not session_key:
            # Record the error in the shared cache so other consumers see it too.
            error_data = {"_error": "no_session_key"}
            write_cache(error_data)
            self._on_update({"error": "no session key"})
            return

        try:
            data, org_id = fetch_usage(session_key, self._org_id)
            # Persist a newly discovered org id so future fetches skip discovery.
            if org_id and org_id != self._org_id:
                self._org_id = org_id
                self._cfg["org_id"] = org_id
                config.save(self._cfg)
            write_cache(data)
            parsed = parse_usage(data)
            self._on_update(parsed)
        except urllib.error.HTTPError as e:
            # 401/403 mean the session cookie is no longer valid.
            error_key = "auth_expired" if e.code in (401, 403) else "api_error"
            error_data = {"_error": error_key, "_status": e.code}
            write_cache(error_data)
            self._on_update({"error": "session expired" if error_key == "auth_expired" else f"HTTP {e.code}"})
            log.warning("Fetch failed: HTTP %d", e.code)
        except Exception as e:
            # Network failures, JSON decode errors, etc. — cache and report.
            error_data = {"_error": "fetch_failed", "_message": str(e)}
            write_cache(error_data)
            self._on_update({"error": str(e)})
            log.warning("Fetch failed: %s", e)
|