Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Prevent Subscriber.seq data race, and slow down replay emits to keep them from filling the subscriber outbox #45

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/jetstream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ func main() {
Value: 5_000,
EnvVars: []string{"JETSTREAM_MAX_SUB_RATE"},
},
&cli.BoolFlag{
Name: "per-ip-limit",
Usage: "share max-sub-rate limit by client IP",
Value: true,
EnvVars: []string{"JETSTREAM_PER_IP_LIMIT"},
},
&cli.Int64Flag{
Name: "override-relay-cursor",
Usage: "override cursor to start from, if not set will start from the last cursor in the database, if no cursor in the database will start from live",
Expand Down Expand Up @@ -124,7 +130,7 @@ func Jetstream(cctx *cli.Context) error {
return fmt.Errorf("failed to parse ws-url: %w", err)
}

s, err := server.NewServer(cctx.Float64("max-sub-rate"))
s, err := server.NewServer(cctx.Float64("max-sub-rate"), cctx.Bool("per-ip-limit"))
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Server struct {
Consumer *consumer.Consumer
maxSubRate float64
seq int64
limitPerIP bool
perIPLimiters map[string]*rate.Limiter
}

Expand All @@ -40,9 +41,10 @@ var maxConcurrentEmits = int64(100)
var cutoverThresholdUS = int64(1_000_000)
var tracer = otel.Tracer("jetstream-server")

func NewServer(maxSubRate float64) (*Server, error) {
func NewServer(maxSubRate float64, perIPLimit bool) (*Server, error) {
s := Server{
Subscribers: make(map[int64]*Subscriber),
limitPerIP: perIPLimit,
maxSubRate: maxSubRate,
perIPLimiters: make(map[string]*rate.Limiter),
}
Expand Down Expand Up @@ -224,14 +226,15 @@ func (s *Server) HandleSubscribe(c echo.Context) error {
return
}
serverLastSeq := s.GetSeq()
log.Info("finished replaying events", "replay_last_time", time.UnixMicro(lastSeq), "server_last_time", time.UnixMicro(serverLastSeq))
log.Info("finished replaying events", "replay_last_time", time.UnixMicro(lastSeq), "server_last_time", time.UnixMicro(serverLastSeq), "diff", (serverLastSeq - lastSeq) / 1_000)

// If last event replayed is close enough to the last live event, start live tailing
if lastSeq > serverLastSeq-(cutoverThresholdUS/2) {
break
}

// Otherwise, update the cursor and replay again
log.Info("restarting replay", "cursor diff", (lastSeq - *sub.cursor) / 1000)
lastSeq++
sub.SetCursor(&lastSeq)
}
Expand Down Expand Up @@ -310,7 +313,7 @@ func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes []
defer sub.lk.Unlock()

// Don't emit events to subscribers that are replaying and are too far behind
if sub.cursor != nil && sub.seq < e.TimeUS-cutoverThresholdUS || sub.tearingDown {
if sub.cursor != nil && sub.seq.Load() < e.TimeUS-cutoverThresholdUS || sub.tearingDown {
return
}

Expand Down
37 changes: 31 additions & 6 deletions pkg/server/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)

const OUTBOX_SIZE = 10_000

type WantedCollections struct {
Prefixes []string
FullPaths map[string]struct{}
Expand All @@ -25,7 +29,7 @@ type Subscriber struct {
conLk sync.Mutex
realIP string
lk sync.Mutex
seq int64
seq atomic.Int64
outbox chan *[]byte
hello chan struct{}
id int64
Expand Down Expand Up @@ -60,8 +64,15 @@ func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, ti
}
}

// slow down replay emits if the outbox is getting filled up
// blocking here is ONLY allowable during replay/playback, never live tailing!
if playback && len(sub.outbox) > (OUTBOX_SIZE/3) {
time.Sleep(100 * time.Millisecond) // sketch but 🤷🏻‍♀️
}

// Skip events that are older than the subscriber's last seen event
if timeUS <= sub.seq {
currentSeq := sub.seq.Load()
if timeUS <= currentSeq {
return nil
}

Expand All @@ -84,7 +95,13 @@ func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, ti
}
return ctx.Err()
case sub.outbox <- &evtBytes:
sub.seq = timeUS
moreCurrentSeq := sub.seq.Swap(timeUS)
if moreCurrentSeq != currentSeq {
log.Warn("subscriber sequence updated while we were using it", "warning", "emit collision", "subscriber", sub.id, "playback", playback)
if moreCurrentSeq > timeUS { // it's possible that it didn't lead to out-of-order events this time
log.Error("emitted to subscriber out of order", "error", "emitted out of order", "subscriber", sub.id, "playback", playback)
}
}
sub.deliveredCounter.Inc()
sub.bytesCounter.Add(float64(len(evtBytes)))
}
Expand All @@ -100,7 +117,13 @@ func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, ti
}
return ctx.Err()
case sub.outbox <- &evtBytes:
sub.seq = timeUS
moreCurrentSeq := sub.seq.Swap(timeUS)
if moreCurrentSeq != currentSeq {
log.Warn("subscriber sequence updated while we were using it", "warning", "emit collision", "subscriber", sub.id, "playback", playback)
if moreCurrentSeq > timeUS { // it's possible that it didn't lead to out-of-order events this time
log.Error("emitted to subscriber out of order", "error", "emitted out of order", "subscriber", sub.id, "playback", playback)
}
}
sub.deliveredCounter.Inc()
sub.bytesCounter.Add(float64(len(evtBytes)))
default:
Expand Down Expand Up @@ -207,13 +230,15 @@ func (s *Server) AddSubscriber(ws *websocket.Conn, realIP string, opts *Subscrib
lim := s.perIPLimiters[realIP]
if lim == nil {
lim = rate.NewLimiter(rate.Limit(s.maxSubRate), int(s.maxSubRate))
s.perIPLimiters[realIP] = lim
if s.limitPerIP {
s.perIPLimiters[realIP] = lim
}
}

sub := Subscriber{
ws: ws,
realIP: realIP,
outbox: make(chan *[]byte, 10_000),
outbox: make(chan *[]byte, OUTBOX_SIZE),
hello: make(chan struct{}),
id: s.nextSub,
wantedCollections: opts.WantedCollections,
Expand Down