Initial public release of birdwatch-relay
This commit is contained in:
commit
c176f2ad24
17 changed files with 2025 additions and 0 deletions
227
internal/frigate/client.go
Normal file
227
internal/frigate/client.go
Normal file
|
|
@ -0,0 +1,227 @@
|
|||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
//
|
||||
// Client subscribes to Frigate's /ws WebSocket as a long-lived event consumer.
|
||||
// The wire format is taken verbatim from BirdWatch's existing
|
||||
// EventWebSocketService: login via POST /api/login (Gson body
|
||||
// `{"user":"...","password":"..."}`), then dial /ws with the resulting cookies,
|
||||
// and parse incoming JSON frames whose `topic` field is "events". The frame's
|
||||
// `payload` field is itself a JSON-encoded string; we forward it as-is to the
|
||||
// handler. Reconnect is exponential backoff capped at 5 minutes — same as the
|
||||
// Android client, with the same heartbeat (20s ping) so connection drops are
|
||||
// detected promptly.
|
||||
|
||||
package frigate
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/http/cookiejar"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
type EventHandler func(payload []byte)
|
||||
|
||||
type Client struct {
|
||||
baseURL string
|
||||
username string
|
||||
password string
|
||||
handler EventHandler
|
||||
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
// frame is the outer envelope Frigate sends on /ws.
|
||||
type frame struct {
|
||||
Topic string `json:"topic"`
|
||||
Payload string `json:"payload"`
|
||||
}
|
||||
|
||||
func NewClient(baseURL, username, password string, handler EventHandler) (*Client, error) {
|
||||
jar, err := cookiejar.New(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Client{
|
||||
baseURL: strings.TrimRight(baseURL, "/"),
|
||||
username: username,
|
||||
password: password,
|
||||
handler: handler,
|
||||
httpClient: &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
Jar: jar,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run blocks until ctx is canceled, reconnecting on every failure with
|
||||
// exponential backoff (1s → 5min cap).
|
||||
func (c *Client) Run(ctx context.Context) {
|
||||
const (
|
||||
initialBackoff = time.Second
|
||||
maxBackoff = 5 * time.Minute
|
||||
)
|
||||
backoff := initialBackoff
|
||||
for {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return
|
||||
}
|
||||
err := c.runOnce(ctx)
|
||||
if err == nil || ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
slog.Warn("frigate connection ended, will retry",
|
||||
"err", err, "next_retry_in", backoff)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(backoff):
|
||||
}
|
||||
backoff *= 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) runOnce(ctx context.Context) error {
|
||||
if err := c.login(ctx); err != nil {
|
||||
return fmt.Errorf("login: %w", err)
|
||||
}
|
||||
return c.subscribe(ctx)
|
||||
}
|
||||
|
||||
func (c *Client) login(ctx context.Context) error {
|
||||
if c.username == "" {
|
||||
// Auth-disabled Frigate. Skip login.
|
||||
return nil
|
||||
}
|
||||
body, _ := json.Marshal(map[string]string{
|
||||
"user": c.username,
|
||||
"password": c.password,
|
||||
})
|
||||
loginURL := c.baseURL + "/api/login"
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, loginURL, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode/100 != 2 {
|
||||
b, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("frigate login returned %d: %s", resp.StatusCode, truncate(string(b), 200))
|
||||
}
|
||||
slog.Info("frigate login ok", "url", loginURL)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) subscribe(ctx context.Context) error {
|
||||
wsURL, err := wsURLFromBase(c.baseURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
headers := http.Header{}
|
||||
if cookies := c.httpClient.Jar.Cookies(wsURL); len(cookies) > 0 {
|
||||
var sb strings.Builder
|
||||
for i, ck := range cookies {
|
||||
if i > 0 {
|
||||
sb.WriteString("; ")
|
||||
}
|
||||
sb.WriteString(ck.Name + "=" + ck.Value)
|
||||
}
|
||||
headers.Set("Cookie", sb.String())
|
||||
}
|
||||
|
||||
dialer := websocket.Dialer{HandshakeTimeout: 15 * time.Second}
|
||||
conn, _, err := dialer.DialContext(ctx, wsURL.String(), headers)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ws dial: %w", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
slog.Info("frigate ws connected", "url", wsURL.String())
|
||||
|
||||
// Read deadline refreshed by every pong; gorilla auto-replies to server pings.
|
||||
_ = conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
conn.SetPongHandler(func(string) error {
|
||||
return conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
})
|
||||
|
||||
// Local pinger to keep NAT/proxy paths alive.
|
||||
pingerCtx, cancelPinger := context.WithCancel(ctx)
|
||||
defer cancelPinger()
|
||||
go func() {
|
||||
ticker := time.NewTicker(20 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-pingerCtx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
_ = conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(5*time.Second))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
_, msg, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
return fmt.Errorf("ws read: %w", err)
|
||||
}
|
||||
c.dispatch(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) dispatch(raw []byte) {
|
||||
var f frame
|
||||
if err := json.Unmarshal(raw, &f); err != nil {
|
||||
slog.Debug("ws frame unparseable, ignoring", "err", err)
|
||||
return
|
||||
}
|
||||
if f.Topic != "events" {
|
||||
// Frigate sends other topics (stats, etc.) — we only forward "events".
|
||||
return
|
||||
}
|
||||
// payload is itself a JSON-encoded string per Frigate's wire format.
|
||||
c.handler([]byte(f.Payload))
|
||||
}
|
||||
|
||||
func wsURLFromBase(base string) (*url.URL, error) {
|
||||
u, err := url.Parse(base)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch u.Scheme {
|
||||
case "http":
|
||||
u.Scheme = "ws"
|
||||
case "https":
|
||||
u.Scheme = "wss"
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported scheme %q (want http or https)", u.Scheme)
|
||||
}
|
||||
u.Path = strings.TrimRight(u.Path, "/") + "/ws"
|
||||
return u, nil
|
||||
}
|
||||
|
||||
func truncate(s string, n int) string {
|
||||
if len(s) <= n {
|
||||
return s
|
||||
}
|
||||
return s[:n] + "…"
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue