Files
syncwarden/internal/monitor/monitor.go
Axel Meyer 59a98843f7
Some checks failed
CI / lint (push) Failing after 27s
CI / test (push) Successful in 30s
Release / build (push) Failing after 2m33s
v0.3.0: fix HTTP client leak, add tests and CI pipeline
Reuse a single long-poll HTTP client instead of creating one per
Events() call (~every 30s). Make TLS skip-verify configurable via
syncthing_insecure_tls. Log previously swallowed config errors.
Add unit tests for all monitor trackers, config, and state logic.
Add CI workflow (vet, golangci-lint, govulncheck, go test -race).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 00:36:52 +01:00

387 lines
7.9 KiB
Go

package monitor
import (
"encoding/json"
"log"
"path/filepath"
"sync"
"time"
"git.davoryn.de/calic/syncwarden/internal/config"
"git.davoryn.de/calic/syncwarden/internal/icons"
st "git.davoryn.de/calic/syncwarden/internal/syncthing"
)
// StatusCallback receives the AggregateStatus snapshot built by emitStatus.
// It is invoked after every successful connection poll, refresh and event
// batch, not only when a field differs from the previous snapshot, and it may
// be called from more than one goroutine (the poll loop and the refresh
// goroutines both emit status).
type StatusCallback func(AggregateStatus)
// EventCallback is called for notable events (for notifications).
// In this file eventType is one of "SyncComplete", "Conflict",
// "DeviceConnected", "DeviceDisconnected" or "NewDevice"; data holds the
// string fields for that event: "folder" for SyncComplete, "file" and
// "folder" for Conflict, and "name" for the device events.
type EventCallback func(eventType string, data map[string]string)
// Monitor coordinates all tracking and polling.
// It owns the trackers, the event listener and the poll loop, and folds their
// state into AggregateStatus snapshots delivered through callback.
type Monitor struct {
mu sync.Mutex // guards the status fields below (connected ... pendingDevs)
client *st.Client // API client used by every poll and refresh
cfg config.Config // configuration copy; LastEventID is updated and saved in Stop
callback StatusCallback // receives each snapshot built by emitStatus
eventCb EventCallback // optional; emitEvent skips it when nil
speed *SpeedTracker // fed with the total in/out byte counters on each connection poll
folders *FolderTracker // folder list from the config plus per-folder state
recent *RecentTracker // recently finished items (base name + folder label)
conflicts *ConflictTracker // incremented for ItemFinished events that report a conflict
events *st.EventListener // created in Start; nil before that
stopCh chan struct{} // closed by Stop to end pollLoop
wg sync.WaitGroup // tracks pollLoop only
connected bool // last health-check result; also set true after a successful config fetch
paused bool // true when at least one folder exists and all folders are paused
lastSync time.Time // time the last non-delete item finished without error
devicesTotal int // number of devices in the fetched config
devicesOnline int // number of connections reporting Connected
pendingDevs int // length of the last pending-devices response
}
// New builds a Monitor around the given client and configuration.
//
// callback receives aggregate status snapshots; eventCb receives notable
// events and may be nil. Fresh trackers and the stop channel are allocated
// here, but nothing runs until Start is called.
func New(client *st.Client, cfg config.Config, callback StatusCallback, eventCb EventCallback) *Monitor {
	m := new(Monitor)

	// Caller-supplied collaborators.
	m.client = client
	m.cfg = cfg
	m.callback = callback
	m.eventCb = eventCb

	// Internal state owned by the monitor.
	m.speed = NewSpeedTracker()
	m.folders = NewFolderTracker()
	m.recent = NewRecentTracker()
	m.conflicts = NewConflictTracker()
	m.stopCh = make(chan struct{})

	return m
}
// Start launches the monitor's background work: the event listener, the
// periodic poll loop and a one-off initial refresh.
func (m *Monitor) Start() {
	// Event stream, seeded with the LastEventID stored in the config.
	listener := st.NewEventListener(m.client, m.cfg.LastEventID, m.onEvents)
	m.events = listener
	listener.Start()

	// Periodic poller; registered with wg so Stop can wait for it.
	m.wg.Add(1)
	go m.pollLoop()

	// Populate folders, devices and pending devices without blocking the caller.
	go m.fullRefresh()
}
// Stop halts all monitoring and persists the last seen event ID.
//
// It closes stopCh, stops the event listener (if Start created one), waits
// for the poll loop to exit, then saves the configuration. A save failure is
// logged rather than returned. Stop must be called at most once, because
// closing stopCh a second time panics.
func (m *Monitor) Stop() {
	close(m.stopCh)
	if m.events != nil {
		m.events.Stop()
	}
	m.wg.Wait()

	// Persist last event ID. The listener only exists after Start, so guard
	// the dereference: Stop on a never-started Monitor must not panic.
	m.mu.Lock()
	if m.events != nil {
		m.cfg.LastEventID = m.events.LastEventID()
	}
	cfg := m.cfg // snapshot under the lock so Save does not read m.cfg unguarded
	m.mu.Unlock()

	if err := config.Save(cfg); err != nil {
		log.Printf("config save error: %v", err)
	}
}
// pollLoop polls connections and health every three seconds until stopCh is
// closed. It marks the WaitGroup done on exit so Stop can wait for it.
func (m *Monitor) pollLoop() {
	defer m.wg.Done()

	const interval = 3 * time.Second
	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			m.pollConnections()
			m.pollHealth()
		case <-m.stopCh:
			return
		}
	}
}
// pollHealth performs one health check and records the result in
// m.connected. A transition from disconnected to connected starts a full
// refresh in a new goroutine; a transition from connected to disconnected
// emits a status update immediately.
func (m *Monitor) pollHealth() {
	_, err := m.client.Health()
	healthy := err == nil

	m.mu.Lock()
	previously := m.connected
	m.connected = healthy
	m.mu.Unlock()

	switch {
	case healthy && !previously:
		// Reconnected: reload everything.
		go m.fullRefresh()
	case !healthy && previously:
		// Connection just dropped: tell the callback.
		m.emitStatus()
	}
}
// pollConnections fetches the connection list, feeds the total byte counters
// to the speed tracker, recounts connected devices and emits a status update.
// When the fetch fails it returns without changing anything.
func (m *Monitor) pollConnections() {
	snapshot, err := m.client.SystemConnections()
	if err != nil {
		return
	}

	totals := snapshot.Total
	m.speed.Update(totals.InBytesTotal, totals.OutBytesTotal)

	var connectedCount int
	for _, conn := range snapshot.Connections {
		if !conn.Connected {
			continue
		}
		connectedCount++
	}

	m.mu.Lock()
	m.devicesOnline = connectedCount
	m.mu.Unlock()

	m.emitStatus()
}
// fullRefresh reloads everything from the API: the folder and device
// configuration, each folder's status, and the pending-device count, then
// emits a status update. A config fetch failure is logged and aborts the
// refresh; per-folder and pending-device failures are skipped silently.
func (m *Monitor) fullRefresh() {
	// Folders and devices come from the config.
	remote, err := m.client.Config()
	if err != nil {
		log.Printf("config fetch error: %v", err)
		return
	}
	m.folders.UpdateFromConfig(remote.Folders)

	m.mu.Lock()
	m.devicesTotal = len(remote.Devices)
	m.connected = true
	m.mu.Unlock()

	// Per-folder status; also note whether any folder is still active.
	anyActive := false
	for _, folder := range remote.Folders {
		if !folder.Paused {
			anyActive = true
		}
		if st, err := m.client.FolderStatus(folder.ID); err == nil {
			m.folders.UpdateStatus(folder.ID, st.State)
		}
	}

	// "Paused" only applies when at least one folder exists and none is active.
	m.mu.Lock()
	m.paused = !anyActive && len(remote.Folders) > 0
	m.mu.Unlock()

	// Pending devices; the previous count is kept on error.
	if pending, err := m.client.PendingDevices(); err == nil {
		m.mu.Lock()
		m.pendingDevs = len(pending)
		m.mu.Unlock()
	}

	m.emitStatus()
}
// onEvents is the event listener's handler. It dispatches each event in the
// batch by type and emits one status update after the whole batch. Event
// types not listed here are ignored.
func (m *Monitor) onEvents(events []st.Event) {
	for i := range events {
		event := events[i]
		switch event.Type {
		case "StateChanged":
			m.handleStateChanged(event)
		case "ItemFinished":
			m.handleItemFinished(event)
		case "DeviceConnected":
			m.handleDeviceEvent(event, true)
		case "DeviceDisconnected":
			m.handleDeviceEvent(event, false)
		case "PendingDevicesChanged":
			// Needs an API call, so run it in its own goroutine.
			go m.refreshPendingDevices()
		case "FolderCompletion", "FolderSummary":
			// Re-query folder statuses in their own goroutine.
			go m.refreshFolderStatuses()
		}
	}
	m.emitStatus()
}
// handleStateChanged applies a StateChanged event to the folder tracker and
// emits "SyncComplete" when a folder moves from "syncing" to "idle". Events
// whose data is not a map, or that lack a folder or target state, are ignored.
func (m *Monitor) handleStateChanged(ev st.Event) {
	payload, ok := ev.Data.(map[string]any)
	if !ok {
		return
	}

	folderID, _ := payload["folder"].(string)
	newState, _ := payload["to"].(string)
	if folderID == "" || newState == "" {
		return
	}
	m.folders.UpdateStatus(folderID, newState)

	// Only the syncing -> idle transition counts as a completed sync.
	oldState, _ := payload["from"].(string)
	if oldState != "syncing" || newState != "idle" {
		return
	}
	m.emitEvent("SyncComplete", map[string]string{
		"folder": folderLabel(m.folders.Folders(), folderID),
	})
}
// handleItemFinished processes an ItemFinished event. An event carrying an
// error is counted and reported as a "Conflict" when the error text matches
// isConflict, and is otherwise dropped. A successful, non-delete item is
// added to the recent list and updates lastSync.
func (m *Monitor) handleItemFinished(ev st.Event) {
	payload, ok := ev.Data.(map[string]any)
	if !ok {
		return
	}

	// field returns the named entry as a string, or "" if absent or not a string.
	field := func(key string) string {
		s, _ := payload[key].(string)
		return s
	}
	path := field("item")
	folderID := field("folder")

	if failure := field("error"); failure != "" {
		if isConflict(failure) {
			m.conflicts.Increment()
			m.emitEvent("Conflict", map[string]string{
				"file":   filepath.Base(path),
				"folder": folderLabel(m.folders.Folders(), folderID),
			})
		}
		return
	}

	if path == "" || folderID == "" || field("action") == "delete" {
		return
	}
	m.recent.Add(filepath.Base(path), folderLabel(m.folders.Folders(), folderID))

	m.mu.Lock()
	m.lastSync = time.Now()
	m.mu.Unlock()
}
// handleDeviceEvent reacts to a device connecting (connected == true) or
// disconnecting. It starts a goroutine that recounts online devices and emits
// status, then emits "DeviceConnected" or "DeviceDisconnected" with the
// device's name, falling back to its id when the name is empty.
func (m *Monitor) handleDeviceEvent(ev st.Event, connected bool) {
	// Recount in the background; the API call must not block event handling.
	go func() {
		snapshot, err := m.client.SystemConnections()
		if err != nil {
			return
		}
		var connectedCount int
		for _, conn := range snapshot.Connections {
			if conn.Connected {
				connectedCount++
			}
		}
		m.mu.Lock()
		m.devicesOnline = connectedCount
		m.mu.Unlock()
		m.emitStatus()
	}()

	payload, ok := ev.Data.(map[string]any)
	if !ok {
		return
	}
	label, _ := payload["name"].(string)
	if label == "" {
		label, _ = payload["id"].(string)
	}

	kind := "DeviceDisconnected"
	if connected {
		kind = "DeviceConnected"
	}
	m.emitEvent(kind, map[string]string{"name": label})
}
// refreshPendingDevices re-fetches the pending-device list, stores its
// length, and emits a "NewDevice" event per pending device when the list has
// grown since the last refresh. A status update is emitted afterwards. A
// fetch failure leaves the stored count untouched.
//
// NOTE(review): when the count grows, every pending device is announced, not
// only the ones that were added — confirm that this is intended.
func (m *Monitor) refreshPendingDevices() {
	pending, err := m.client.PendingDevices()
	if err != nil {
		return
	}

	m.mu.Lock()
	oldCount := m.pendingDevs
	m.pendingDevs = len(pending)
	m.mu.Unlock()

	if len(pending) > oldCount {
		for _, dev := range pending {
			name := dev.Name
			if name == "" {
				// Fall back to a short prefix of the device ID. Guard the
				// slice so an empty or short ID cannot panic.
				name = dev.DeviceID
				if len(name) > 8 {
					name = name[:8]
				}
			}
			m.emitEvent("NewDevice", map[string]string{"name": name})
		}
	}
	m.emitStatus()
}
// emitEvent forwards a notable event to the event callback; it does nothing
// when no event callback was supplied.
func (m *Monitor) emitEvent(eventType string, data map[string]string) {
	notify := m.eventCb
	if notify == nil {
		return
	}
	notify(eventType, data)
}
// refreshFolderStatuses re-queries the status of every tracked folder and
// then emits a status update. Folders whose query fails keep their previous
// state.
func (m *Monitor) refreshFolderStatuses() {
	tracked := m.folders.Folders()
	for i := range tracked {
		id := tracked[i].ID
		if st, err := m.client.FolderStatus(id); err == nil {
			m.folders.UpdateStatus(id, st.State)
		}
	}
	m.emitStatus()
}
// emitStatus builds an AggregateStatus snapshot from the trackers and the
// mutex-guarded fields and hands it to the status callback.
//
// State is icons.StateDisconnected while the monitor is not connected;
// otherwise it is derived from the folder states and the paused flag. The
// callback runs after the mutex is released, so it may call back into the
// Monitor. A nil callback is skipped, matching how emitEvent treats eventCb.
func (m *Monitor) emitStatus() {
	down, up := m.speed.Rates()
	folders := m.folders.Folders()

	m.mu.Lock()
	status := AggregateStatus{
		DevicesTotal:   m.devicesTotal,
		DevicesOnline:  m.devicesOnline,
		DownRate:       down,
		UpRate:         up,
		LastSync:       m.lastSync,
		Paused:         m.paused,
		RecentFiles:    m.recent.Files(),
		ConflictCount:  m.conflicts.Count(),
		Folders:        folders,
		PendingDevices: m.pendingDevs,
	}
	status.State = icons.StateDisconnected
	if m.connected {
		status.State = stateFromFolders(folders, m.paused)
	}
	m.mu.Unlock()

	// Guard against a Monitor built without a status callback.
	if m.callback != nil {
		m.callback(status)
	}
}
// EventData returns the event's data field as a map[string]any.
//
// Data that already is such a map is returned as-is. Anything else is
// round-tripped through JSON; nil is returned when marshalling or
// unmarshalling fails or the result is not a JSON object.
func EventData(ev st.Event) map[string]any {
	switch typed := ev.Data.(type) {
	case map[string]any:
		return typed
	}

	// Try a JSON round-trip for nested or struct-typed payloads.
	raw, err := json.Marshal(ev.Data)
	if err != nil {
		return nil
	}
	var decoded map[string]any
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return nil
	}
	return decoded
}
// isConflict reports whether an ItemFinished error string denotes a sync
// conflict. Only the exact strings "conflict" and "conflicting changes" match.
func isConflict(errStr string) bool {
	switch errStr {
	case "conflict", "conflicting changes":
		return true
	default:
		return false
	}
}
// folderLabel returns the Label of the first folder whose ID equals id. When
// no folder matches, the id itself is returned.
func folderLabel(folders []FolderInfo, id string) string {
	for i := range folders {
		if folders[i].ID != id {
			continue
		}
		return folders[i].Label
	}
	return id
}