// SPDX-License-Identifier: AGPL-3.0-or-later // // Client subscribes to Frigate's /ws WebSocket as a long-lived event consumer. // The wire format is taken verbatim from BirdWatch's existing // EventWebSocketService: login via POST /api/login (Gson body // `{"user":"...","password":"..."}`), then dial /ws with the resulting cookies, // and parse incoming JSON frames whose `topic` field is "events". The frame's // `payload` field is itself a JSON-encoded string; we forward it as-is to the // handler. Reconnect is exponential backoff capped at 5 minutes — same as the // Android client, with the same heartbeat (20s ping) so connection drops are // detected promptly. package frigate import ( "bytes" "context" "encoding/json" "fmt" "io" "log/slog" "net/http" "net/http/cookiejar" "net/url" "strings" "time" "github.com/gorilla/websocket" ) type EventHandler func(payload []byte) type Client struct { baseURL string username string password string handler EventHandler httpClient *http.Client } // frame is the outer envelope Frigate sends on /ws. type frame struct { Topic string `json:"topic"` Payload string `json:"payload"` } func NewClient(baseURL, username, password string, handler EventHandler) (*Client, error) { jar, err := cookiejar.New(nil) if err != nil { return nil, err } return &Client{ baseURL: strings.TrimRight(baseURL, "/"), username: username, password: password, handler: handler, httpClient: &http.Client{ Timeout: 30 * time.Second, Jar: jar, }, }, nil } // Run blocks until ctx is canceled, reconnecting on every failure with // exponential backoff (1s → 5min cap). func (c *Client) Run(ctx context.Context) { const ( initialBackoff = time.Second maxBackoff = 5 * time.Minute ) backoff := initialBackoff for { if err := ctx.Err(); err != nil { return } err := c.runOnce(ctx) if err == nil || ctx.Err() != nil { return } slog.Warn("frigate connection ended, will retry", "err", err, "next_retry_in", backoff) select { case <-ctx.Done(): return case <-time.After(backoff): } backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } } } func (c *Client) runOnce(ctx context.Context) error { if err := c.login(ctx); err != nil { return fmt.Errorf("login: %w", err) } return c.subscribe(ctx) } func (c *Client) login(ctx context.Context) error { if c.username == "" { // Auth-disabled Frigate. Skip login. return nil } body, _ := json.Marshal(map[string]string{ "user": c.username, "password": c.password, }) loginURL := c.baseURL + "/api/login" req, err := http.NewRequestWithContext(ctx, http.MethodPost, loginURL, bytes.NewReader(body)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") resp, err := c.httpClient.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode/100 != 2 { b, _ := io.ReadAll(resp.Body) return fmt.Errorf("frigate login returned %d: %s", resp.StatusCode, truncate(string(b), 200)) } slog.Info("frigate login ok", "url", loginURL) return nil } func (c *Client) subscribe(ctx context.Context) error { wsURL, err := wsURLFromBase(c.baseURL) if err != nil { return err } headers := http.Header{} if cookies := c.httpClient.Jar.Cookies(wsURL); len(cookies) > 0 { var sb strings.Builder for i, ck := range cookies { if i > 0 { sb.WriteString("; ") } sb.WriteString(ck.Name + "=" + ck.Value) } headers.Set("Cookie", sb.String()) } dialer := websocket.Dialer{HandshakeTimeout: 15 * time.Second} conn, _, err := dialer.DialContext(ctx, wsURL.String(), headers) if err != nil { return fmt.Errorf("ws dial: %w", err) } defer conn.Close() slog.Info("frigate ws connected", "url", wsURL.String()) // Read deadline refreshed by every pong; gorilla auto-replies to server pings. _ = conn.SetReadDeadline(time.Now().Add(60 * time.Second)) conn.SetPongHandler(func(string) error { return conn.SetReadDeadline(time.Now().Add(60 * time.Second)) }) // Local pinger to keep NAT/proxy paths alive. pingerCtx, cancelPinger := context.WithCancel(ctx) defer cancelPinger() go func() { ticker := time.NewTicker(20 * time.Second) defer ticker.Stop() for { select { case <-pingerCtx.Done(): return case <-ticker.C: _ = conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(5*time.Second)) } } }() for { if err := ctx.Err(); err != nil { return err } _, msg, err := conn.ReadMessage() if err != nil { return fmt.Errorf("ws read: %w", err) } c.dispatch(msg) } } func (c *Client) dispatch(raw []byte) { var f frame if err := json.Unmarshal(raw, &f); err != nil { slog.Debug("ws frame unparseable, ignoring", "err", err) return } if f.Topic != "events" { // Frigate sends other topics (stats, etc.) — we only forward "events". return } // payload is itself a JSON-encoded string per Frigate's wire format. c.handler([]byte(f.Payload)) } func wsURLFromBase(base string) (*url.URL, error) { u, err := url.Parse(base) if err != nil { return nil, err } switch u.Scheme { case "http": u.Scheme = "ws" case "https": u.Scheme = "wss" default: return nil, fmt.Errorf("unsupported scheme %q (want http or https)", u.Scheme) } u.Path = strings.TrimRight(u.Path, "/") + "/ws" return u, nil } func truncate(s string, n int) string { if len(s) <= n { return s } return s[:n] + "…" }