Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolaydubina committed Feb 22, 2024
1 parent 024a993 commit 7499456
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 1 deletion.
29 changes: 29 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Test

on:
push:
branches: [master]

jobs:
build:
name: Build
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.22
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Test
run: go test -v -coverprofile=coverage.txt -covermode=atomic ./...

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v2
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.txt
verbose: true
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
# aws-s3-reader
#### aws-s3-reader

Efficient reader for large S3 files.

* zero-memory copy
* `Seek()` via `Byte-Range` offsets
* early HTTP Body termination
166 changes: 166 additions & 0 deletions aws_s3_reader_seeker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package awss3reader

import (
"errors"
"fmt"
"io"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
)

// ChunkSizePolicy is something that can tell how much data to fetch in single request for given S3 Object.
// With more advanced policies, Visit methods will be integrated.
type ChunkSizePolicy interface {
	// ChunkSize returns the number of bytes the next ranged GET should request.
	ChunkSize() int
}

// FixedChunkSizePolicy is a ChunkSizePolicy that reports the same chunk size
// on every call, regardless of read position.
type FixedChunkSizePolicy struct {
	// Size is the chunk size, in bytes, returned by every ChunkSize call.
	Size int
}

// ChunkSize returns the configured fixed size.
func (s FixedChunkSizePolicy) ChunkSize() int {
	return s.Size
}

// S3ReadSeeker is a reader of given S3 Object.
// It utilizes HTTP Byte Ranges to read chunks of data from S3 Object.
// It uses zero-memory copy from underlying HTTP Body response.
// It uses early HTTP Body termination, if seeks are beyond current HTTP Body.
// It uses adaptive policy for chunk size fetching.
// This is useful for iterating over very large S3 Objects.
// NOTE(review): not safe for concurrent use — no synchronization around r/offset.
type S3ReadSeeker struct {
	s3client        *s3.S3
	bucket          string
	key             string
	offset          int64 // in s3 object; position the next Read will return data from
	size            int64 // in s3 object; lazily populated by getSize via HeadObject (0 = unknown)
	lastByte        int64 // in s3 object that we expect to have in current HTTP Body (inclusive)
	chunkSizePolicy ChunkSizePolicy
	r               io.ReadCloser // temporary holder for current reader; nil when no HTTP Body is open
	sink            []byte        // where to read bytes discarding data from readers during in-body seek
}

// NewS3ReadSeeker creates a reader for the S3 object identified by bucket and key.
// minChunkSize sets the initial capacity of the internal discard buffer used for
// in-body seeks; chunkSizePolicy decides how many bytes each ranged GET requests.
func NewS3ReadSeeker(
	s3client *s3.S3,
	bucket string,
	key string,
	minChunkSize int,
	chunkSizePolicy ChunkSizePolicy,
) *S3ReadSeeker {
	rs := S3ReadSeeker{
		s3client:        s3client,
		bucket:          bucket,
		key:             key,
		chunkSizePolicy: chunkSizePolicy,
		sink:            make([]byte, minChunkSize),
	}
	return &rs
}

// Seek sets the offset for the next Read.
// Only io.SeekStart and io.SeekCurrent are supported.
// Seek assumes it can always reach the requested position; seeking beyond the
// S3 object size is not detected here and will surface as failures in Read.
func (s *S3ReadSeeker) Seek(offset int64, whence int) (int64, error) {
	discardBytes := 0

	switch whence {
	case io.SeekCurrent:
		// Seeking backwards cannot be served by the forward-only HTTP body:
		// drop it so the next Read issues a fresh ranged GET at the new offset.
		// (Previously the stale body was kept, so later reads returned bytes
		// from the old stream position while offset claimed otherwise.)
		if offset < 0 {
			s.reset()
		}
		discardBytes = int(offset)
		s.offset += offset
	case io.SeekStart:
		// seeking backwards results in dropping current http body.
		// since http body reader can read only forwards.
		if offset < s.offset {
			s.reset()
		}
		discardBytes = int(offset - s.offset)
		s.offset = offset
	default:
		return 0, errors.New("unsupported whence")
	}

	// Target lies beyond what the current HTTP body covers; a new request is
	// needed anyway, so there is nothing to discard.
	if s.offset > s.lastByte {
		s.reset()
		discardBytes = 0
	}

	if discardBytes > 0 && s.r != nil {
		// Still within the current body: consume and throw away the bytes
		// between the old and new offsets instead of issuing a new request.
		if discardBytes > len(s.sink) {
			s.sink = make([]byte, discardBytes)
		}
		// io.ReadFull retries short reads; a bare Read may legally return
		// fewer bytes than requested without an error, which would leave the
		// stream misaligned with s.offset.
		if _, err := io.ReadFull(s.r, s.sink[:discardBytes]); err != nil {
			s.reset()
		}
	}

	return s.offset, nil
}

// Close releases the current HTTP body, if any.
// It is safe to call Close multiple times; subsequent calls return nil.
func (s *S3ReadSeeker) Close() error {
	if s.r == nil {
		return nil
	}
	err := s.r.Close()
	// Nil out the reader so a repeated Close (or a later reset) does not
	// close the same body twice.
	s.r = nil
	return err
}

// Read reads from the current chunk, lazily fetching the first chunk and
// transparently starting the next one when the current HTTP body is exhausted.
// It returns io.EOF (via fetch) once the end of the S3 object is reached.
func (s *S3ReadSeeker) Read(b []byte) (int, error) {
	if s.r == nil {
		if err := s.fetch(s.chunkSizePolicy.ChunkSize()); err != nil {
			return 0, err
		}
	}

	n, err := s.r.Read(b)
	s.offset += int64(n)

	// errors.Is already handles nil err, so no separate nil check is needed.
	// Current chunk drained: pre-fetch the next chunk and surface its error
	// (io.EOF at end of object) alongside the bytes already read.
	if errors.Is(err, io.EOF) {
		return n, s.fetch(s.chunkSizePolicy.ChunkSize())
	}

	return n, err
}

// reset terminates the current HTTP body early (if one is open) and clears
// the chunk-tracking state, forcing the next Read to issue a new ranged GET.
func (s *S3ReadSeeker) reset() {
	if s.r != nil {
		// Close error is deliberately ignored: the body is being discarded.
		_ = s.r.Close()
	}
	s.r = nil
	s.lastByte = 0
}

// getSize returns the total size of the S3 object in bytes, caching the value
// after the first successful HeadObject call.
// On failure it returns 0, which fetch treats as "object empty/unknown" and
// maps to io.EOF; the underlying HeadObject error is not surfaced.
func (s *S3ReadSeeker) getSize() int {
	if s.size > 0 {
		return int(s.size)
	}
	resp, err := s.s3client.HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(s.key),
	})
	// ContentLength is a *int64 in the SDK response and may be nil; guard
	// before dereferencing to avoid a panic.
	if err != nil || resp.ContentLength == nil {
		return 0
	}
	s.size = *resp.ContentLength
	return int(s.size)
}

// fetch drops the current HTTP body and issues a new ranged GET for up to n
// bytes starting at the current offset. It returns io.EOF when the offset is
// at or beyond the end of the object.
func (s *S3ReadSeeker) fetch(n int) error {
	s.reset()

	if remaining := s.getSize() - int(s.offset); n > remaining {
		n = remaining
	}
	if n <= 0 {
		return io.EOF
	}

	// HTTP byte ranges are inclusive of both the start and end byte.
	s.lastByte = s.offset + int64(n) - 1
	resp, err := s.s3client.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(s.key),
		Range:  aws.String(fmt.Sprintf("bytes=%d-%d", s.offset, s.lastByte)),
	})
	if err != nil {
		return fmt.Errorf("cannot fetch bytes=%d-%d: %w", s.offset, s.lastByte, err)
	}
	s.r = resp.Body
	return nil
}
64 changes: 64 additions & 0 deletions aws_s3_reader_seeker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package awss3reader_test

import (
"bytes"
"io"
"os"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
awss3reader "github.com/nikolaydubina/aws-s3-reader"
)

// TestS3ReadSeeker reads a real public S3 object through S3ReadSeeker and
// compares the bytes against a reference download via s3manager.Downloader.
// NOTE: this is an integration test — it requires network access and AWS
// credentials capable of reading the public bucket.
func TestS3ReadSeeker(t *testing.T) {
	mySession := session.Must(session.NewSession(
		aws.NewConfig().WithRegion("ap-southeast-1"),
	))
	s3client := s3.New(mySession)

	bucket := "nikolaydubina-blog-public"
	key := "photos/2021-12-20-4.jpeg"

	r := awss3reader.NewS3ReadSeeker(
		s3client,
		bucket,
		key,
		1<<10*100,
		awss3reader.FixedChunkSizePolicy{Size: 1 << 10 * 100}, // 100 KB
	)
	// Close the reader so the last HTTP body does not leak.
	defer r.Close()

	got, err := io.ReadAll(r)
	if err != nil {
		t.Fatal(err)
	}

	downloader := s3manager.NewDownloader(mySession)
	f, err := os.CreateTemp("", "s3reader")
	if err != nil {
		t.Fatal(err)
	}
	defer f.Close()
	// Remove the temp file after the test; CreateTemp does not clean up.
	defer os.Remove(f.Name())

	n, err := downloader.Download(f, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		t.Fatal(err)
	}
	exp, err := os.ReadFile(f.Name())
	if err != nil {
		t.Fatal(err)
	}
	if n != int64(len(exp)) {
		t.Errorf("expected %d bytes, got %d", len(exp), n)
	}

	if !bytes.Equal(exp, got) {
		// Best-effort debugging artifact; failure to write it is not fatal.
		_ = os.WriteFile("got", got, 0644)
		t.Errorf("expected %d bytes, got %d", len(exp), len(got))
	}
}
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/nikolaydubina/aws-s3-reader

go 1.22.0

require github.com/aws/aws-sdk-go v1.50.23

require github.com/jmespath/go-jmespath v0.4.0 // indirect
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
github.com/aws/aws-sdk-go v1.50.23 h1:BB99ohyCmq6O7m5RvjN2yqTt57snL8OhDvfxEvM6ihs=
github.com/aws/aws-sdk-go v1.50.23/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

0 comments on commit 7499456

Please sign in to comment.