Add desktop tray widget + installer wizard
- Desktop widget (Python/pystray): system tray icon showing 5h usage as circular progress bar with Claude starburst logo, 10-step green-to-red color scale, right-click menu with usage stats and configuration - Shared cache: both widget and CLI statusline read/write the same /tmp/claude_usage.json — only one fetcher needs to run - Installer wizard (install_wizard.py): interactive cross-platform setup with component selection, session key prompt, cron/autostart config - OS wrappers: install.sh (Linux/macOS) and install.ps1 (Windows) find Python 3.9+ and launch the wizard - README with topology diagram, usage docs, and configuration reference Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
240
claude_usage_widget/fetcher.py
Normal file
240
claude_usage_widget/fetcher.py
Normal file
@@ -0,0 +1,240 @@
|
||||
"""Usage fetcher — Python port of claude-statusline/fetch-usage.js (urllib only).
|
||||
|
||||
Writes to the same shared cache file as fetch-usage.js so both the CLI
|
||||
statusline and the desktop widget see the same data from a single fetch.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import urllib.request
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from . import config
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_HEADERS = {
|
||||
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0",
|
||||
"Accept": "application/json",
|
||||
"Referer": "https://claude.ai/",
|
||||
"Origin": "https://claude.ai",
|
||||
}
|
||||
|
||||
# Shared cache path — identical to fetch-usage.js default
|
||||
if sys.platform == "win32":
|
||||
_DEFAULT_CACHE = os.path.join(os.environ.get("TEMP", os.path.expanduser("~")), "claude_usage.json")
|
||||
else:
|
||||
_DEFAULT_CACHE = os.path.join(os.environ.get("TMPDIR", "/tmp"), "claude_usage.json")
|
||||
|
||||
CACHE_PATH = os.environ.get("CLAUDE_USAGE_CACHE", _DEFAULT_CACHE)
|
||||
|
||||
|
||||
# -- Cache I/O (compatible with fetch-usage.js format) -----------------------
|
||||
|
||||
|
||||
def write_cache(data):
|
||||
"""Write raw API response (or error dict) to the shared cache file."""
|
||||
cache_dir = os.path.dirname(CACHE_PATH)
|
||||
if cache_dir:
|
||||
os.makedirs(cache_dir, exist_ok=True)
|
||||
with open(CACHE_PATH, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
|
||||
|
||||
def read_cache():
|
||||
"""Read the shared cache file. Returns (data_dict, age_seconds) or (None, inf)."""
|
||||
try:
|
||||
mtime = os.path.getmtime(CACHE_PATH)
|
||||
age = time.time() - mtime
|
||||
with open(CACHE_PATH, "r") as f:
|
||||
data = json.load(f)
|
||||
return data, age
|
||||
except (FileNotFoundError, json.JSONDecodeError, OSError):
|
||||
return None, float("inf")
|
||||
|
||||
|
||||
# -- API requests ------------------------------------------------------------
|
||||
|
||||
|
||||
def _request(url, session_key):
|
||||
req = urllib.request.Request(url, headers={
|
||||
**_HEADERS,
|
||||
"Cookie": f"sessionKey={session_key}",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def _discover_org_id(session_key):
|
||||
orgs = _request("https://claude.ai/api/organizations", session_key)
|
||||
if not orgs:
|
||||
raise RuntimeError("No organizations found")
|
||||
return orgs[0]["uuid"]
|
||||
|
||||
|
||||
def fetch_usage(session_key, org_id=""):
|
||||
"""Fetch usage data once. Returns (data_dict, org_id) or raises."""
|
||||
if not org_id:
|
||||
org_id = _discover_org_id(session_key)
|
||||
data = _request(f"https://claude.ai/api/organizations/{org_id}/usage", session_key)
|
||||
return data, org_id
|
||||
|
||||
|
||||
# -- Parsing -----------------------------------------------------------------
|
||||
|
||||
|
||||
def parse_usage(data):
|
||||
"""Extract display-friendly usage info from raw API response.
|
||||
|
||||
Returns dict with keys:
|
||||
five_hour_pct, five_hour_resets_at, five_hour_resets_in,
|
||||
seven_day_pct, seven_day_resets_at, seven_day_resets_in,
|
||||
error
|
||||
"""
|
||||
if data is None:
|
||||
return {"error": "no data"}
|
||||
|
||||
# Error states written by either fetcher
|
||||
if isinstance(data.get("_error"), str):
|
||||
err = data["_error"]
|
||||
if err == "auth_expired":
|
||||
return {"error": "session expired"}
|
||||
return {"error": err}
|
||||
|
||||
result = {}
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
for window, prefix in [("five_hour", "five_hour"), ("seven_day", "seven_day")]:
|
||||
block = data.get(window, {})
|
||||
pct = round(block.get("utilization", 0))
|
||||
resets_at = block.get("resets_at", "")
|
||||
resets_in = ""
|
||||
if resets_at:
|
||||
try:
|
||||
dt = datetime.fromisoformat(resets_at)
|
||||
delta = dt - now
|
||||
total_seconds = max(0, int(delta.total_seconds()))
|
||||
days = total_seconds // 86400
|
||||
hours = (total_seconds % 86400) // 3600
|
||||
minutes = (total_seconds % 3600) // 60
|
||||
if days > 0:
|
||||
resets_in = f"{days}d {hours}h"
|
||||
elif hours > 0:
|
||||
resets_in = f"{hours}h {minutes}m"
|
||||
else:
|
||||
resets_in = f"{minutes}m"
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
result[f"{prefix}_pct"] = pct
|
||||
result[f"{prefix}_resets_at"] = resets_at
|
||||
result[f"{prefix}_resets_in"] = resets_in
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# -- Background fetcher thread -----------------------------------------------
|
||||
|
||||
|
||||
class UsageFetcher:
|
||||
"""Background daemon thread that periodically fetches usage data.
|
||||
|
||||
Writes every fetch result to the shared cache file so the CLI statusline
|
||||
(and any other consumer) sees the same data. Before fetching, checks if
|
||||
the cache is already fresh enough (e.g. populated by the Node.js cron
|
||||
fetcher) and skips the API call if so.
|
||||
"""
|
||||
|
||||
def __init__(self, on_update):
|
||||
"""on_update(parsed_dict) called on each successful or failed fetch."""
|
||||
self._on_update = on_update
|
||||
self._stop = threading.Event()
|
||||
self._refresh_now = threading.Event()
|
||||
self._thread = threading.Thread(target=self._loop, daemon=True)
|
||||
self._cfg = config.load()
|
||||
self._org_id = self._cfg.get("org_id", "")
|
||||
|
||||
def start(self):
|
||||
self._thread.start()
|
||||
|
||||
def stop(self):
|
||||
self._stop.set()
|
||||
self._refresh_now.set() # unblock wait
|
||||
|
||||
def refresh(self):
|
||||
"""Trigger an immediate fetch (bypasses cache freshness check)."""
|
||||
self._refresh_now.set()
|
||||
|
||||
def set_interval(self, seconds):
|
||||
"""Change refresh interval and trigger immediate fetch."""
|
||||
self._cfg["refresh_interval"] = seconds
|
||||
config.save(self._cfg)
|
||||
self._refresh_now.set()
|
||||
|
||||
@property
|
||||
def interval(self):
|
||||
return self._cfg.get("refresh_interval", 300)
|
||||
|
||||
def _loop(self):
|
||||
# On startup, try to show cached data immediately
|
||||
self._load_from_cache()
|
||||
|
||||
while not self._stop.is_set():
|
||||
forced = self._refresh_now.is_set()
|
||||
self._refresh_now.clear()
|
||||
self._do_fetch(force=forced)
|
||||
self._refresh_now.wait(timeout=self._cfg.get("refresh_interval", 300))
|
||||
|
||||
def _load_from_cache(self):
|
||||
"""Read existing cache for instant display on startup."""
|
||||
data, age = read_cache()
|
||||
if data and age < self._cfg.get("refresh_interval", 300):
|
||||
parsed = parse_usage(data)
|
||||
self._on_update(parsed)
|
||||
|
||||
def _do_fetch(self, force=False):
|
||||
"""Fetch from API, or use cache if fresh enough.
|
||||
|
||||
If force=True (manual refresh), always hit the API.
|
||||
Otherwise, skip if cache is younger than half the refresh interval
|
||||
(meaning another fetcher like the Node.js cron job already updated it).
|
||||
"""
|
||||
if not force:
|
||||
data, age = read_cache()
|
||||
freshness_threshold = self._cfg.get("refresh_interval", 300) / 2
|
||||
if data and not data.get("_error") and age < freshness_threshold:
|
||||
parsed = parse_usage(data)
|
||||
self._on_update(parsed)
|
||||
log.debug("Cache is fresh (%.0fs old), skipping API call", age)
|
||||
return
|
||||
|
||||
session_key = config.get_session_key()
|
||||
if not session_key:
|
||||
error_data = {"_error": "no_session_key"}
|
||||
write_cache(error_data)
|
||||
self._on_update({"error": "no session key"})
|
||||
return
|
||||
|
||||
try:
|
||||
data, org_id = fetch_usage(session_key, self._org_id)
|
||||
if org_id and org_id != self._org_id:
|
||||
self._org_id = org_id
|
||||
self._cfg["org_id"] = org_id
|
||||
config.save(self._cfg)
|
||||
write_cache(data)
|
||||
parsed = parse_usage(data)
|
||||
self._on_update(parsed)
|
||||
except urllib.error.HTTPError as e:
|
||||
error_key = "auth_expired" if e.code in (401, 403) else "api_error"
|
||||
error_data = {"_error": error_key, "_status": e.code}
|
||||
write_cache(error_data)
|
||||
self._on_update({"error": "session expired" if error_key == "auth_expired" else f"HTTP {e.code}"})
|
||||
log.warning("Fetch failed: HTTP %d", e.code)
|
||||
except Exception as e:
|
||||
error_data = {"_error": "fetch_failed", "_message": str(e)}
|
||||
write_cache(error_data)
|
||||
self._on_update({"error": str(e)})
|
||||
log.warning("Fetch failed: %s", e)
|
||||
Reference in New Issue
Block a user