wip crdt spec #316
Draft
ripienaar wants to merge 10 commits into main from counter_crdt
Commits:

* de40992 wip crdt spec (ripienaar)
* a742314 handle non counter messages on a counter (ripienaar)
* 2a3f9c9 Clarify mirrors (ripienaar)
* 0bae32c Clarify sources, make float64, handle some more edge cases (ripienaar)
* 7229c23 Move to int64 and value in string (ripienaar)
* 8413875 Remove more floats (ripienaar)
* 441db05 Add some details around topologies, resets, etc (ripienaar)
* 8932953 Better labels for github dark mode (ripienaar)
* 22277ad Better labels for github dark mode (ripienaar)
* 74e0f20 Do not support rollupt, reject some other headers (ripienaar)
@@ -0,0 +1,137 @@
# JetStream Distributed Counter CRDT

| Metadata | Value      |
|----------|------------|
| Date     | 2025-01-09 |
| Author   | @ripienaar |
| Status   | Approved   |
| Tags     | jetstream  |

| Revision | Date       | Author     | Info                    |
|----------|------------|------------|-------------------------|
| 1        | 2025-01-09 | @ripienaar | Document Initial Design |

## Context and Motivation

We wish to provide a distributed counter that works in Clusters, Super Clusters, through Sources, and any other way that data might reach a Stream.

We will start with basic addition and subtraction primitives, which automatically behave as a CRDT and are order independent.

Related:

* [Netflix Distributed Counter](https://netflixtechblog.com/netflixs-distributed-counter-abstraction-8d0c45eb66b2)

## Solution Overview

A Stream can opt in to supporting Counters, which allows any subject to be a counter.

Publishing a message to such a Stream will load the most recent message on the subject, increment its value and save
the new message with the latest value in the body. The header is preserved for downstream processing by Sources and for
visibility and debugging purposes.

```bash
$ nats s get COUNTER --last-for counter.hits
Item: COUNTER#22802062 received 2025-01-09 18:05:07.93747413 +0000 UTC on Subject counter.hits

Headers:
Nats-Incr: +2

{"val":"100"}
$ nats pub counter.hits '' -J -H "Nats-Incr:+1"
$ nats s get COUNTER --last-for counter.hits
Item: COUNTER#22802063 received 2025-01-09 18:06:00 +0000 UTC on Subject counter.hits

Headers:
Nats-Incr: +1

{"val":"101"}
```

## Design and Behavior

The goal is to support addition and subtraction only, so that a Stream that Sources tens of other Streams, all with
counters, will effectively create a big combined counter holding totals contributed to by all sources.

Handling published messages has the following behavior and constraints:

* The header holds values like `+1`, `-1` and `-10`, in other words any valid `int64`. If the value fails to parse,
  the message is rejected with an error. A value of `0` is valid.
* When publishing a message to the subject, the last value is loaded by the server receiving the message (so the leader), the body is parsed, incremented and written
  into the new message body. The headers are all preserved.
* If the addition would overflow an `int64` in either direction, the message is rejected with an error
* When publishing a message and the previous message does not have a `Nats-Incr` header, it means the subject is not a
  counter; an error is returned to the user and the message is rejected
* When a message has a `Nats-Rollup`, `Nats-Expected-Last-Sequence`, `Nats-Expected-Subject-Last-Sequence`, `Nats-Expected-Stream` or `Nats-Expected-Last-Msg-Id` header in addition to a `Nats-Incr` header, it should be rejected
* When a message with the header is received over a Source, and the configuration setting is enabled, processing is
  done as above; otherwise the message is discarded
* When a message with the header is published to a Stream without the option set, the message is rejected with an error
* When a message with the header is received over a Mirror, the message is stored verbatim
* When a message without the header is received and the previous message is a counter, it is rejected since
  the subject is then treated as being a counter only. This check could be very expensive on Streams with many
  non-counter messages being written to them

The value in the body is stored in a struct with the following layout:

```go
type CounterValue struct {
	// Value holds the counter total as a decimal string
	Value string `json:"val"`
}
```

We use a `string` value since many JSON implementations treat numbers as IEEE 754 doubles, which only represent integers exactly up to 2^53. We might also want to support BigInt-style numbers.
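
To illustrate why the value travels as a string, the sketch below compares a round trip through `float64` with a round trip through the string body. The `decode` helper is a hypothetical name introduced for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// CounterValue mirrors the body layout described in this document.
type CounterValue struct {
	Value string `json:"val"`
}

// decode parses a counter body and returns the total as an int64.
func decode(body []byte) (int64, error) {
	var cv CounterValue
	if err := json.Unmarshal(body, &cv); err != nil {
		return 0, err
	}
	return strconv.ParseInt(cv.Value, 10, 64)
}

func main() {
	big := int64(1)<<53 + 1 // 9007199254740993

	// A float64 cannot hold this integer exactly, so the round trip changes it.
	fmt.Println("float64 round trip exact:", int64(float64(big)) == big)

	// The string form keeps every digit.
	body, _ := json.Marshal(CounterValue{Value: strconv.FormatInt(big, 10)})
	got, err := decode(body)
	fmt.Println("string round trip exact:", got == big, err)
}
```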

## Recounts and Audit

It's important in many scenarios that counter values can be audited and that recounts can happen. We preserve the `Nats-Incr` header separate from the body so given streams with limits applied one can manually recount the entire stream.

In the case of sourced streams, the various headers that sourcing adds will provide further provenance for a specific increment.
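
A recount can be sketched as a sum over the preserved `Nats-Incr` header values. The example assumes the header values of all retained messages on the subject have already been collected into a slice; `recount` is a hypothetical helper, and the result only matches the stored total when the full history is still in the Stream.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"strconv"
)

// recount sums the Nats-Incr header values of the retained messages on a
// subject and returns the recomputed total. Addition is commutative, so the
// order of the values does not matter.
func recount(incrs []string) (int64, error) {
	var total int64
	for _, s := range incrs {
		d, err := strconv.ParseInt(s, 10, 64)
		if err != nil {
			return 0, fmt.Errorf("bad Nats-Incr %q: %w", s, err)
		}
		if (d > 0 && total > math.MaxInt64-d) || (d < 0 && total < math.MinInt64-d) {
			return 0, errors.New("recount would overflow int64")
		}
		total += d
	}
	return total, nil
}

func main() {
	total, err := recount([]string{"+2", "+1", "-10", "+8"})
	fmt.Println(total, err)
}
```

An audit would compare this total with the `val` stored in the most recent message on the subject.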

## Counter Resets

It's important that counters can be reset to zero. In a simple standalone counter this is easily done with a subject purge. In a topology where multiple regions contribute to a total, a regional reset might require publishing a negative value equal to the current total.

The preferred method for a reset should be publishing a negative value followed by a purge up to the message holding the negative value, with subject purge being used only for entire counter deletes.

Review comment: Maybe we can split the responsibilities here:
Does solve the problem of resetting in more advanced topologies at least.

## Source-based Replicated Counters

> [!NOTE]
>
> This is half baked and largely here to solicit discussion

We want to be able to build large replicated global counters, but this presents several challenges. The final goal can be seen in the diagram below:

![Replicated Counters](images/0048-sources.png)

When the counter is read at any location in the tree the value shown should be returned.

To achieve this topology we create sources that copy a subject between locations and regions. The problem here would be that in order to get a fully global counter we end up with the same subject in all these Streams and locations and might end up with double writes etc.

We might say the regions listen on subjects like `count.es.hits` and we use rewriting during the sourcing to turn that into `count.eu.hits` and eventually `count.hits`, with the aggregate streams not listening on any subject.

In this scenario we lose the ability to access the `count.es.hits` value anywhere other than in the `es` location, which is not ideal. One could mirror from ES -> EU cluster and then Source in the EU cluster into an aggregate counter, thus retaining the source and total.

Replicated counters further complicate matters with respect to counter resets. In a standalone counter one can just purge the subject to reset it. In the replicated case a purge will not replicate, and a roll-up would replicate and destroy the entire counter everywhere, so the only real option is to publish a negative value.

### Adding Sources

Adding a source that already has values in it and that had limits applied will be problematic since we will not have the history to fully recount the messages in the target Stream.

Tooling might use a direct get to retrieve the value in the source, place that value into the target Stream and then start the sourcing from seq+1, meaning we snapshot the Source value into the target and then only handle new counts. This will scale well and avoid huge recounts but would require the target Stream to have a subject.

## Stream Configuration

Whether or not a Stream supports this behavior should be a configuration opt-in. We want clients to definitely know
when this is supported, which the opt-in approach with a boolean on the configuration would make clear.

```golang
type StreamConfig struct {
	// AllowMsgCounter enables the feature
	AllowMsgCounter bool `json:"allow_msg_counter"`
}
```

Setting this on a Mirror should cause an error.

This feature can be turned off and on using Stream edits.

A Stream with this feature on should require API level 1.
Maybe we can say that a Stream with Counters enabled is always counter only to substantially reduce the cost of those checks?
Yeah, may be worth starting there, we can always relax later