Merge pull request #32 from NilFoundation/feature/blob-reader
sync committee blob reader
oclaw authored Feb 3, 2025
2 parents 254fb91 + 3cb6dfc commit 20dd971
Showing 9 changed files with 416 additions and 1 deletion.
87 changes: 87 additions & 0 deletions nil/cmd/sync_committee_cli/internal/commands/batch_decode.go
@@ -0,0 +1,87 @@
package commands

import (
"context"
"errors"
"io"
"os"
"sync"

"github.com/NilFoundation/nil/nil/services/synccommittee/core/batches/encode"
v1 "github.com/NilFoundation/nil/nil/services/synccommittee/core/batches/encode/v1"
"github.com/NilFoundation/nil/nil/services/synccommittee/public"
"github.com/rs/zerolog"
)

type DecodeBatchParams struct {
// one of the following batch sources must be provided
BatchId public.BatchId
BatchFile string

OutputFile string
}

type batchIntermediateDecoder interface {
DecodeIntermediate(from io.Reader, to io.Writer) error
}

var (
knownDecoders []batchIntermediateDecoder
decoderLoader sync.Once
)

func initDecoders(logger zerolog.Logger) {
decoderLoader.Do(func() {
knownDecoders = append(knownDecoders,
v1.NewDecoder(logger),
// each newly implemented decoder must be registered here
)
})
}

// TODO embed this call into commands.Executor?
func DecodeBatch(_ context.Context, params *DecodeBatchParams, logger zerolog.Logger) error {
initDecoders(logger)

var batchSource io.ReadSeeker

var emptyBatchId public.BatchId
if params.BatchId != emptyBatchId {
return errors.New("fetching batch directly from the L1 is not supported yet") // TODO
}

if len(params.BatchFile) > 0 {
inFile, err := os.OpenFile(params.BatchFile, os.O_RDONLY, 0o644)
if err != nil {
return err
}
defer inFile.Close()
batchSource = inFile
}

if batchSource == nil {
return errors.New("batch input is not specified")
}

outFile, err := os.OpenFile(params.OutputFile, os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return err
}
defer outFile.Close()

decoded := false
for _, decoder := range knownDecoders {
err := decoder.DecodeIntermediate(batchSource, outFile)
if err == nil {
decoded = true
break
}
if !errors.Is(err, encode.ErrInvalidVersion) {
return err
}

// on version mismatch, reset the input stream offset and try the next available decoder
if _, err := batchSource.Seek(0, io.SeekStart); err != nil {
return err
}
}

if !decoded {
return errors.New("batch content does not match any known encoding version")
}
return nil
}
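
For orientation, here is a minimal driver sketch for the new decode path (not part of this change): the file names and logger setup are illustrative, and since the commands package is internal, such a caller could only live inside the sync_committee_cli tree.

package main

import (
    "context"
    "os"

    "github.com/NilFoundation/nil/nil/cmd/sync_committee_cli/internal/commands"
    "github.com/rs/zerolog"
)

func main() {
    logger := zerolog.New(os.Stderr).With().Timestamp().Logger()

    params := &commands.DecodeBatchParams{
        BatchFile:  "blobs.bin",     // concatenated blob contents, fetched beforehand (illustrative path)
        OutputFile: "batch.decoded", // destination for the human-readable form (illustrative path)
    }

    if err := commands.DecodeBatch(context.Background(), params, logger); err != nil {
        logger.Fatal().Err(err).Msg("failed to decode batch")
    }
}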
22 changes: 22 additions & 0 deletions nil/cmd/sync_committee_cli/main.go
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"os"

@@ -37,6 +38,9 @@ func execute() error {
}
rootCmd.AddCommand(getTaskTreeCmd)

decodeBatchCmd := buildDecodeBatchCmd(executorParams, logger)
rootCmd.AddCommand(decodeBatchCmd)

return rootCmd.Execute()
}

@@ -109,6 +113,24 @@ func buildGetTaskTreeCmd(commonParam *commands.ExecutorParams, logger zerolog.Lo
return cmd, nil
}

func buildDecodeBatchCmd(_ *commands.ExecutorParams, logger zerolog.Logger) *cobra.Command {
params := &commands.DecodeBatchParams{}

cmd := &cobra.Command{
Use: "decode-batch",
Short: "Deserialize an L1-stored batch with nil transactions into a human-readable format",
RunE: func(cmd *cobra.Command, args []string) error {
return commands.DecodeBatch(context.Background(), params, logger)
},
}

cmd.Flags().Var(&params.BatchId, "batch-id", "unique ID of L1-stored batch")
cmd.Flags().StringVar(&params.BatchFile, "batch-file", "", "file with binary content of concatenated blobs of the batch")
cmd.Flags().StringVar(&params.OutputFile, "output-file", "", "target file to store the decoded batch data")

return cmd
}
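
With the command registered above, a typical invocation would look roughly like: sync_committee_cli decode-batch --batch-file blobs.bin --output-file batch.decoded (binary name and file paths are illustrative; fetching by --batch-id is not supported yet, as noted in batch_decode.go).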

func addCommonFlags(cmd *cobra.Command, params *commands.ExecutorParams) {
cmd.Flags().StringVar(&params.DebugRpcEndpoint, "endpoint", params.DebugRpcEndpoint, "debug rpc endpoint")
cmd.Flags().BoolVar(&params.AutoRefresh, "refresh", params.AutoRefresh, "should the received data be refreshed")
106 changes: 106 additions & 0 deletions nil/services/synccommittee/core/batches/blob/reader.go
@@ -0,0 +1,106 @@
package blob

import (
"bytes"
"fmt"
"io"

"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/icza/bitio"
)

type reader struct {
blobs []kzg4844.Blob
wordOffset int
blobOffset int
curBlobIdx int
curBitReader *bitio.Reader
}

var _ io.Reader = (*reader)(nil)

func NewReader(blobs []kzg4844.Blob) *reader {
r := &reader{blobs: blobs}
if len(r.blobs) > 0 {
r.curBitReader = bitio.NewReader(bytes.NewReader(r.blobs[0][:]))
}
return r
}

func (r *reader) Read(dst []byte) (int, error) {
dstBits := len(dst) * 8
var buf bytes.Buffer
writer := bitio.NewWriter(&buf)
writtenBits := 0

// read in 64-bit chunks with 254-bit alignment;
// this could be optimized to read from the underlying io.Reader directly,
// but the path is rarely exercised and the optimization would complicate the code
for !r.eof() && writtenBits < dstBits {
r.wordOffset %= 256

left := dstBits - writtenBits
toRead := uint8(min(left, 64, 254-r.wordOffset))
bits, err := r.readBits(toRead)
if err != nil {
return writtenBits / 8, err
}

r.wordOffset += int(toRead)

if err := writer.WriteBits(bits, toRead); err != nil {
return writtenBits / 8, err
}
writtenBits += int(toRead)

if r.wordOffset == 254 {
_, err := r.readBits(2)
if err != nil {
return writtenBits / 8, err // failed to align
}
r.wordOffset += 2
}
}
copy(dst, buf.Bytes()) // the decoded bytes live in buf's backing array, so they must be copied into dst

return writtenBits / 8, nil
}

func (r *reader) readBits(n uint8) (uint64, error) {
const blobBitSize = blobSize * 8

if r.eof() {
return 0, io.EOF
}
newOffset := r.blobOffset + int(n)
if newOffset > blobBitSize {
return 0, fmt.Errorf("unaligned blob read is not permitted (current offset %d, requested %d bits)", r.blobOffset, n)
}

ret, err := r.curBitReader.ReadBits(n)
if err != nil {
return ret, err
}
r.blobOffset = newOffset
if r.blobOffset >= blobBitSize {
r.advance()
}
return ret, err
}

func (r *reader) advance() bool {
if r.curBlobIdx < len(r.blobs) {
r.curBlobIdx++
}
r.curBitReader = nil
if r.eof() {
return false
}
r.curBitReader = bitio.NewReader(bytes.NewReader(r.blobs[r.curBlobIdx][:]))
r.blobOffset = 0
return true
}

func (r *reader) eof() bool {
return r.curBlobIdx >= len(r.blobs)
}
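
The reader treats each 32-byte field element as carrying 254 payload bits and skips the remaining 2 bits, so a blob's usable capacity is slightly below its raw size. A quick capacity sketch, assuming the standard kzg4844 blob length of 131072 bytes (the package's own blobSize constant is defined elsewhere in this package):

package main

import "fmt"

func main() {
    const blobSize = 131072            // bytes per blob (assumption matching kzg4844.Blob)
    const wordsPerBlob = blobSize / 32 // 4096 field elements per blob
    lostBits := wordsPerBlob * 2       // 2 padding bits skipped per 254-bit word
    payload := blobSize - lostBits/8   // usable bytes per blob
    fmt.Println(payload)               // prints 130048
}

This matches the payload figure the tests below compute for two blobs.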
69 changes: 69 additions & 0 deletions nil/services/synccommittee/core/batches/blob/reader_test.go
@@ -0,0 +1,69 @@
package blob

import (
"bytes"
"math/rand"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBlobReader(t *testing.T) {
t.Parallel()
input := make([]byte, blobSize+blobSize/2)
for i := range input {
input[i] = byte(i & 0xFF)
}
rd := bytes.NewReader(input)

builder := NewBuilder()
blobs, err := builder.MakeBlobs(rd, 2)
require.NoError(t, err)
require.Len(t, blobs, 2)

t.Run("FullRead", func(t *testing.T) {
t.Parallel()

blobReader := NewReader(blobs)
output := make([]byte, len(input))
read, err := blobReader.Read(output)
require.NoError(t, err)
require.Equal(t, len(output), read)
assert.Equal(t, input, output)
})

t.Run("RandomizedRead", func(t *testing.T) {
t.Parallel()

output := make([]byte, len(input))
blobReader := NewReader(blobs)
read := 0
for read < len(output) {
left := len(output) - read
minRead := min(1024, left)
maxRead := max(minRead, min(len(output)/4, left))

readReq := rand.Intn(maxRead + 1) //nolint: gosec

readRes, err := blobReader.Read(output[read : read+readReq])
require.NoError(t, err)
require.Equal(t, readReq, readRes)

read += readRes
}
assert.Equal(t, input, output)
})

t.Run("ExcessiveRead", func(t *testing.T) {
t.Parallel()

blobReader := NewReader(blobs)
output := make([]byte, 2*len(input))
read, err := blobReader.Read(output)
require.NoError(t, err)
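// two blobs hold 2*blobSize raw bytes, but each 32-byte word carries only 254 payload bits, so 2 bits per word are lost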
payloadInTwoBlobs := 2*blobSize - (((blobSize/32)*2)/8)*2
require.Equal(t, payloadInTwoBlobs, read)
assert.Equal(t, input, output[:len(input)])
})
}
8 changes: 8 additions & 0 deletions nil/services/synccommittee/core/batches/encode/errors.go
@@ -0,0 +1,8 @@
package encode

import "errors"

var (
ErrInvalidMagic = errors.New("invalid_batch_magic")
ErrInvalidVersion = errors.New("invalid_batch_encoding_version")
)
27 changes: 26 additions & 1 deletion nil/services/synccommittee/core/batches/encode/header.go
@@ -2,6 +2,7 @@ package encode

import (
"encoding/binary"
"fmt"
"io"
)

@@ -19,7 +20,7 @@ func NewBatchHeader(version uint16) BatchHeader {
}
}

func (bh BatchHeader) EncodeTo(out io.Writer) error {
func (bh *BatchHeader) EncodeTo(out io.Writer) error {
if err := binary.Write(out, binary.LittleEndian, bh.Magic); err != nil {
return err
}
@@ -28,3 +29,27 @@ func (bh BatchHeader) EncodeTo(out io.Writer) error {
}
return nil
}

func (bh *BatchHeader) ReadFrom(in io.Reader) error {
if err := binary.Read(in, binary.LittleEndian, &bh.Magic); err != nil {
return err
}
if bh.Magic != BatchMagic {
return fmt.Errorf("%w: read value %04X", ErrInvalidMagic, bh.Magic)
}
if err := binary.Read(in, binary.LittleEndian, &bh.Version); err != nil {
return err
}
return nil
}

func CheckBatchVersion(in io.Reader, desiredVersion uint16) error {
var bh BatchHeader
if err := bh.ReadFrom(in); err != nil {
return err
}
if bh.Version != desiredVersion {
return fmt.Errorf("%w: version is %04X", ErrInvalidVersion, bh.Version)
}
return nil
}
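
To illustrate how EncodeTo, ReadFrom, and CheckBatchVersion compose, here is a small round-trip sketch (the helper name and the version value 1 are illustrative, not part of this change):

package encode

import "bytes"

// headerRoundTripExample is an illustrative sketch, not part of the package.
func headerRoundTripExample() error {
    var buf bytes.Buffer

    hdr := NewBatchHeader(1)
    if err := hdr.EncodeTo(&buf); err != nil {
        return err
    }

    // CheckBatchVersion consumes the header just written and fails with
    // ErrInvalidMagic or ErrInvalidVersion on a mismatch.
    return CheckBatchVersion(&buf, 1)
}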
