From b381db24d67f69b84cfb593d403a268b5aea8da6 Mon Sep 17 00:00:00 2001 From: Aroooba Date: Wed, 26 Jul 2023 08:39:29 +0900 Subject: [PATCH] Move blocking send operation to executor thread --- .../producer/impl/KafkaWriteStreamImpl.java | 59 ++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/src/main/java/io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.java b/src/main/java/io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.java index 2214d371..0c23fdb1 100644 --- a/src/main/java/io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.java +++ b/src/main/java/io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.java @@ -35,6 +35,8 @@ import java.time.Duration; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Kafka write stream implementation @@ -50,6 +52,7 @@ public class KafkaWriteStreamImpl implements KafkaWriteStream { private final ProducerTracer tracer; private final TaskQueue taskQueue; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); public KafkaWriteStreamImpl(Vertx vertx, Producer producer, KafkaClientOptions options) { ContextInternal ctxInt = ((ContextInternal) vertx.getOrCreateContext()).unwrap(); this.producer = producer; @@ -77,42 +80,44 @@ public Future send(ProducerRecord record) { return ctx.executeBlocking(() -> { Promise prom = ctx.promise(); try { - this.producer.send(record, (metadata, err) -> { + executor.execute(() -> { + this.producer.send(record, (metadata, err) -> { - // callback from Kafka IO thread - ctx.runOnContext(v1 -> { - synchronized (KafkaWriteStreamImpl.this) { + // callback from Kafka IO thread + ctx.runOnContext(v1 -> { + synchronized (KafkaWriteStreamImpl.this) { - // if exception happens, no record written - if (err != null) { + // if exception happens, no record written + if (err != null) { - if (this.exceptionHandler != null) { - Handler exceptionHandler = this.exceptionHandler; - ctx.runOnContext(v2 -> exceptionHandler.handle(err)); + if (this.exceptionHandler != null) { + Handler exceptionHandler = this.exceptionHandler; + ctx.runOnContext(v2 -> exceptionHandler.handle(err)); + } + } + + long lowWaterMark = this.maxSize / 2; + this.pending -= len; + if (this.pending < lowWaterMark && this.drainHandler != null) { + Handler drainHandler = this.drainHandler; + this.drainHandler = null; + ctx.runOnContext(drainHandler); } } + }); - long lowWaterMark = this.maxSize / 2; - this.pending -= len; - if (this.pending < lowWaterMark && this.drainHandler != null) { - Handler drainHandler = this.drainHandler; - this.drainHandler = null; - ctx.runOnContext(drainHandler); + if (err != null) { + if (startedSpan != null) { + startedSpan.fail(ctx, err); } + prom.fail(err); + } else { + if (startedSpan != null) { + startedSpan.finish(ctx); + } + prom.complete(metadata); } }); - - if (err != null) { - if (startedSpan != null) { - startedSpan.fail(ctx, err); - } - prom.fail(err); - } else { - if (startedSpan != null) { - startedSpan.finish(ctx); - } - prom.complete(metadata); - } }); } catch (Throwable e) { synchronized (KafkaWriteStreamImpl.this) {