feat: fix record size bigger than batch size
digikata authored and fraidev committed Sep 26, 2024
1 parent ef486e0 commit e8479a3
Showing 9 changed files with 211 additions and 69 deletions.
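With this change, the producer's RecordAccumulator no longer rejects a record whose encoded size exceeds the configured batch size: the current batch is marked full and the oversized record is placed into a dedicated batch of its own. A rough usage sketch of the resulting behavior (not part of this commit; client API names such as TopicProducerConfigBuilder and topic_producer_with_config are taken from the fluvio crate and assumed here):

```rust
// Illustrative only: a record far larger than the configured batch size is now
// accepted and shipped in its own batch instead of failing with RecordTooLarge.
use fluvio::{Fluvio, RecordKey, TopicProducerConfigBuilder};

async fn send_oversized() -> anyhow::Result<()> {
    let fluvio = Fluvio::connect().await?;
    let config = TopicProducerConfigBuilder::default()
        .batch_size(16_384) // 16 KiB batches
        .build()?;
    let producer = fluvio
        .topic_producer_with_config("my-topic", config)
        .await?;

    // 1 MiB payload: previously an error, now accumulated into a dedicated batch.
    let big_value = vec![b'x'; 1024 * 1024];
    producer.send(RecordKey::NULL, big_value).await?;
    producer.flush().await?;
    Ok(())
}
```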
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion crates/fluvio-protocol/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "fluvio-protocol"
 edition = "2021"
-version = "0.11.0"
+version = "0.11.1"
 authors = ["Fluvio Contributors <[email protected]>"]
 description = "Fluvio streaming protocol"
 repository = "https://github.com/infinyon/fluvio"
2 changes: 2 additions & 0 deletions crates/fluvio-protocol/src/record/data.rs
@@ -667,6 +667,8 @@ mod test {
         assert_eq!(value.len(), 3);
         assert_eq!(value[0], 0x64);

+        let hdr = record.get_header();
+        assert_eq!(hdr.attributes, 0i8);
         Ok(())
     }

2 changes: 1 addition & 1 deletion crates/fluvio/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "fluvio"
-version = "0.23.2"
+version = "0.23.3"
 edition = "2021"
 license = "Apache-2.0"
 authors = ["Fluvio Contributors <[email protected]>"]
114 changes: 88 additions & 26 deletions crates/fluvio/src/producer/accumulator.rs
@@ -27,7 +27,7 @@ use crate::producer::ProducerError;
 use crate::error::Result;

 use super::event::EventHandler;
-use super::memory_batch::MemoryBatch;
+use super::memory_batch::{BatchRecordStatus, BatchSize, MemoryBatch};

 const RECORD_ENQUEUE_TIMEOUT: Duration = Duration::from_secs(30);

@@ -117,15 +117,27 @@ impl RecordAccumulator {
             batches = guard;
         }
         if let Some(batch) = batches.back_mut() {
-            if let Some(push_record) = batch.push_record(record.clone()) {
-                if batch.is_full() {
-                    batch_events.notify_batch_full().await;
-                }
-                return Ok(PushRecord::new(
-                    push_record.into_future_record_metadata(partition_id),
-                ));
-            } else {
-                batch_events.notify_batch_full().await;
-            }
+            match batch.push_record(record.clone()) {
+                PushRecordStatus::Ok(push_record) => {
+                    if batch.is_full() {
+                        batch_events.notify_batch_full().await;
+                    }
+                    return Ok(PushRecord::new(
+                        push_record.into_future_record_metadata(partition_id),
+                    ));
+                }
+                PushRecordStatus::DontFit(batch_size, record) => {
+                    batch_events.notify_batch_full().await;
+                    return self
+                        .create_batch_single_record(
+                            batch_size,
+                            record,
+                            &mut batches,
+                            batch_events,
+                            partition_id,
+                        )
+                        .await;
+                }
+            }
         }

@@ -137,20 +149,53 @@
         let mut batch = ProducerBatch::new(self.batch_size, self.compression);

         match batch.push_record(record) {
-            Some(push_record) => {
+            PushRecordStatus::Ok(push_record) => {
                 batch_events.notify_new_batch().await;

                 if batch.is_full() {
                     batch_events.notify_batch_full().await;
                 }

                 batches.push_back(batch);
                 Ok(PushRecord::new(
                     push_record.into_future_record_metadata(partition_id),
                 ))
             }
-            None => Err(ProducerError::RecordTooLarge(self.batch_size)),
+            PushRecordStatus::DontFit(size, record) => {
+                // send the single record without batch size limit
+                self.create_batch_single_record(
+                    size,
+                    record,
+                    &mut batches,
+                    batch_events,
+                    partition_id,
+                )
+                .await
+            }
         }
     }
+
+    pub async fn create_batch_single_record(
+        &self,
+        batch_size: usize,
+        record: Record,
+        batches: &mut VecDeque<ProducerBatch>,
+        batch_events: &BatchEvents,
+        partition_id: PartitionId,
+    ) -> Result<PushRecord, ProducerError> {
+        let mut batch = ProducerBatch::new(batch_size, self.compression);
+        match batch.push_record(record) {
+            PushRecordStatus::Ok(push_record) => {
+                batch_events.notify_new_batch().await;
+                batches.push_back(batch);
+                Ok(PushRecord::new(
+                    push_record.into_future_record_metadata(partition_id),
+                ))
+            }
+            PushRecordStatus::DontFit(size, _record) => {
+                // This should never happen, as we are creating a batch with a single record, with
+                // the same size as the record
+                // Only if we add a max_size option to the record (not batch)
+                Err(ProducerError::RecordTooLarge(size))
+            }
+        }
+    }

@@ -170,6 +215,23 @@ where {
     }
 }

+pub enum PushRecordStatus {
+    Ok(PartialFutureRecordMetadata),
+    DontFit(BatchSize, Record),
+}
+
+impl PushRecordStatus {
+    #[allow(dead_code)]
+    pub fn is_ok(&self) -> bool {
+        matches!(self, Self::Ok(_))
+    }
+
+    #[allow(dead_code)]
+    pub fn dont_fit(&self) -> bool {
+        matches!(self, Self::DontFit(_, _))
+    }
+}
+
 pub(crate) struct ProducerBatch {
     pub(crate) notify: Sender<ProducePartitionResponseFuture>,
     batch_metadata: Arc<BatchMetadata>,
@@ -191,13 +253,13 @@ impl ProducerBatch {
     /// Add a record to the batch.
     /// Return ProducerError::BatchFull if record does not fit in the batch, so
     /// the RecordAccumulator can create more batches if needed.
-    fn push_record(&mut self, record: Record) -> Option<PartialFutureRecordMetadata> {
+    fn push_record(&mut self, record: Record) -> PushRecordStatus {
         match self.batch.push_record(record) {
-            None => None,
-            Some(relative_offset) => Some(PartialFutureRecordMetadata::new(
-                relative_offset,
-                self.batch_metadata.clone(),
-            )),
+            BatchRecordStatus::NotFull(relative_offset)
+            | BatchRecordStatus::Full(relative_offset) => PushRecordStatus::Ok(
+                PartialFutureRecordMetadata::new(relative_offset, self.batch_metadata.clone()),
+            ),
+            BatchRecordStatus::DontFit(size, record) => PushRecordStatus::DontFit(size, record),
         }
     }

@@ -334,13 +396,13 @@ mod test {
             Compression::None,
         );

-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
+        assert!(pb.push_record(record.clone()).is_ok());
+        assert!(pb.push_record(record.clone()).is_ok());
+        assert!(pb.push_record(record.clone()).is_ok());

         assert!(!pb.is_full());

-        assert!(pb.push_record(record).is_none());
+        assert!(pb.push_record(record).dont_fit());
     }

     #[test]
@@ -356,13 +418,13 @@
             Compression::None,
         );

-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
+        assert!(pb.push_record(record.clone()).is_ok());
+        assert!(pb.push_record(record.clone()).is_ok());
+        assert!(pb.push_record(record.clone()).is_ok());

         assert!(pb.is_full());

-        assert!(pb.push_record(record).is_none());
+        assert!(pb.push_record(record).dont_fit());
     }

     #[fluvio_future::test]
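The heart of the change in accumulator.rs is that a batch now hands a rejected record back to the caller (PushRecordStatus::DontFit) instead of turning it into an error, and the accumulator retries it in a fresh batch sized to fit. A minimal self-contained sketch of that hand-back pattern (toy types, not the fluvio implementation):

```rust
// Toy model of the hand-back pattern; simplified stand-ins, not fluvio's types.
struct Record(Vec<u8>);

enum PushStatus {
    Ok(usize),              // relative offset inside the batch
    DontFit(usize, Record), // required size plus the record, handed back to the caller
}

struct Batch {
    limit: usize,
    used: usize,
    records: Vec<Record>,
}

impl Batch {
    fn new(limit: usize) -> Self {
        Self { limit, used: 0, records: Vec::new() }
    }

    fn push(&mut self, record: Record) -> PushStatus {
        let needed = self.used + record.0.len();
        if needed > self.limit {
            // Hand the record back instead of returning an error.
            return PushStatus::DontFit(needed, record);
        }
        self.used = needed;
        self.records.push(record);
        PushStatus::Ok(self.records.len() - 1)
    }
}

/// Push into the last batch; if the record does not fit, give it a batch of its own,
/// mirroring create_batch_single_record above.
fn accumulate(batches: &mut Vec<Batch>, batch_size: usize, record: Record) {
    if batches.is_empty() {
        batches.push(Batch::new(batch_size));
    }
    match batches.last_mut().expect("just ensured non-empty").push(record) {
        PushStatus::Ok(_) => {}
        PushStatus::DontFit(needed, record) => {
            // A dedicated batch sized to what the push reported as needed.
            let mut own = Batch::new(needed);
            assert!(matches!(own.push(record), PushStatus::Ok(_)));
            batches.push(own);
        }
    }
}
```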
2 changes: 1 addition & 1 deletion crates/fluvio/src/producer/error.rs
@@ -9,7 +9,7 @@ use crate::producer::PartitionId;
 #[derive(thiserror::Error, Debug, Clone)]
 #[non_exhaustive]
 pub enum ProducerError {
-    #[error("the given record is larger than the buffer max_size ({0} bytes). Try increasing the producer batch size or reducing the record size enabling a compression algorithm")]
+    #[error("the given record is larger than the buffer max_size ({0} bytes)")]
     RecordTooLarge(usize),
     #[error("failed to send record metadata: {0}")]
     SendRecordMetadata(#[from] async_channel::SendError<RecordMetadata>),
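Because an oversized record now gets a batch of its own, RecordTooLarge is only reachable from the defensive branch in create_batch_single_record, so the advice about raising the batch size was dropped from the message. A quick, hypothetical check of the new Display output (the import path is assumed; this test is not part of the commit):

```rust
use fluvio::producer::ProducerError; // path assumed, not part of this diff

#[test]
fn record_too_large_message() {
    let err = ProducerError::RecordTooLarge(16_384);
    assert_eq!(
        err.to_string(),
        "the given record is larger than the buffer max_size (16384 bytes)"
    );
}
```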
97 changes: 60 additions & 37 deletions crates/fluvio/src/producer/memory_batch.rs
@@ -8,6 +8,39 @@ use fluvio_types::Timestamp;

 use super::*;

+pub type BatchSize = usize;
+pub enum BatchRecordStatus {
+    DontFit(BatchSize, Record), // Actual record does not fit in the batch
+    Full(Offset),               // Actual record fits in the batch and the batch is full.
+    NotFull(Offset),            // Actual record fits in the batch and the batch is not full.
+}
+
+impl BatchRecordStatus {
+    pub fn fit(&self) -> bool {
+        match self {
+            BatchRecordStatus::DontFit(_, _) => false,
+            BatchRecordStatus::Full(_) => true,
+            BatchRecordStatus::NotFull(_) => true,
+        }
+    }
+
+    pub fn is_full(&self) -> bool {
+        match self {
+            BatchRecordStatus::DontFit(_, _) => false,
+            BatchRecordStatus::Full(_) => true,
+            BatchRecordStatus::NotFull(_) => false,
+        }
+    }
+
+    pub fn not_full(&self) -> bool {
+        match self {
+            BatchRecordStatus::DontFit(_, _) => false,
+            BatchRecordStatus::Full(_) => false,
+            BatchRecordStatus::NotFull(_) => true,
+        }
+    }
+}
+
 pub struct MemoryBatch {
     compression: Compression,
     write_limit: usize,
@@ -35,7 +68,7 @@ impl MemoryBatch {

     /// Add a record to the batch.
     /// The value of `Offset` is relative to the `MemoryBatch` instance.
-    pub fn push_record(&mut self, mut record: Record) -> Option<Offset> {
+    pub fn push_record(&mut self, mut record: Record) -> BatchRecordStatus {
         let current_offset = self.offset() as i64;
         record
             .get_mut_header()
@@ -45,21 +78,21 @@
         record.get_mut_header().set_timestamp_delta(timestamp_delta);

         let record_size = record.write_size(0);
+        let est_size = self.estimated_size() + record_size;

-        if self.estimated_size() + record_size > self.write_limit {
-            self.is_full = true;
-            return None;
-        }
-
-        if self.estimated_size() + record_size == self.write_limit {
-            self.is_full = true;
+        if est_size > self.write_limit {
+            return BatchRecordStatus::DontFit(est_size, record);
         }

         self.current_size_uncompressed += record_size;

         self.records.push(record);

-        Some(current_offset)
+        if est_size == self.write_limit {
+            self.is_full = true;
+            return BatchRecordStatus::Full(current_offset);
+        }
+
+        BatchRecordStatus::NotFull(current_offset)
     }

     pub fn is_full(&self) -> bool {
Expand All @@ -73,23 +106,7 @@ impl MemoryBatch {
}

fn estimated_size(&self) -> usize {
(self.current_size_uncompressed as f32 * self.compression_coefficient()) as usize
+ Batch::<RawRecords>::default().write_size(0)
}

fn compression_coefficient(&self) -> f32 {
cfg_if::cfg_if! {
if #[cfg(feature = "compress")] {
match self.compression {
Compression::None => 1.0,
Compression::Gzip | Compression::Snappy | Compression::Lz4 | Compression::Zstd => {
0.5
}
}
} else {
1.0
}
}
self.current_size_uncompressed + Batch::<RawRecords>::default().write_size(0)
}

pub fn records_len(&self) -> usize {
@@ -157,13 +174,13 @@ mod test {
             Compression::None,
         );

-        assert!(mb.push_record(record).is_some());
+        assert!(mb.push_record(record).not_full());
         std::thread::sleep(std::time::Duration::from_millis(100));
         let record = Record::from(("key", "value"));
-        assert!(mb.push_record(record).is_some());
+        assert!(mb.push_record(record).not_full());
         std::thread::sleep(std::time::Duration::from_millis(100));
         let record = Record::from(("key", "value"));
-        assert!(mb.push_record(record).is_some());
+        assert!(mb.push_record(record).not_full());

         let batch: Batch<MemoryRecords> = mb.into();
         assert!(
@@ -204,17 +221,23 @@
         let memory_batch_compression = Compression::Gzip;

         // This MemoryBatch write limit is minimal value to pass test
-        let mut memory_batch = MemoryBatch::new(180, memory_batch_compression);
+        let mut memory_batch = MemoryBatch::new(360, memory_batch_compression);

         let mut offset = 0;

         for _ in 0..num_records {
-            offset = memory_batch
-                .push_record(Record {
-                    value: RecordData::from(record_data.clone()),
-                    ..Default::default()
-                })
-                .expect("Offset should exist");
+            let status = memory_batch.push_record(Record {
+                value: RecordData::from(record_data.clone()),
+                ..Default::default()
+            });
+
+            if let BatchRecordStatus::Full(o) = status {
+                offset = o;
+            } else if let BatchRecordStatus::NotFull(o) = status {
+                offset = o;
+            } else {
+                panic!("Offset should exist");
+            }
         }

         let memory_batch_records_len = memory_batch.records_len();
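Two things change in MemoryBatch::push_record: the size check is computed once as est_size = estimated_size() + record_size and compared against write_limit before the record is stored (returning DontFit with the record itself when it exceeds the limit), and estimated_size no longer applies a 0.5 compression discount, so the limit is enforced on uncompressed bytes (presumably why the test's minimal write limit grows from 180 to 360). A simplified sketch of that accounting (stand-in types and an assumed header constant, not the fluvio code):

```rust
// Simplified sketch of the new size accounting. BATCH_HEADER stands in for
// Batch::<RawRecords>::default().write_size(0); its value here is an assumption.
const BATCH_HEADER: usize = 61;

enum Status {
    DontFit(usize), // estimated size that exceeded the limit
    Full(usize),    // relative offset; the batch is now exactly at the limit
    NotFull(usize), // relative offset; the batch still has room
}

struct SizedBatch {
    write_limit: usize,
    used: usize,  // uncompressed bytes of accepted records
    count: usize, // number of accepted records
}

impl SizedBatch {
    fn push(&mut self, record_size: usize) -> Status {
        // Header plus uncompressed payload; no compression discount anymore.
        let est_size = BATCH_HEADER + self.used + record_size;
        if est_size > self.write_limit {
            return Status::DontFit(est_size);
        }
        let offset = self.count;
        self.used += record_size;
        self.count += 1;
        if est_size == self.write_limit {
            return Status::Full(offset);
        }
        Status::NotFull(offset)
    }
}
```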