Skip to content

Commit

Permalink
Merge pull request #295 from rickardoberg/batchingreactivestream
Browse files Browse the repository at this point in the history
Support back-pressure on reads and subscriptions.
  • Loading branch information
YoEight authored Jan 23, 2025
2 parents cdd1ca2 + 693a35e commit 12021e6
Show file tree
Hide file tree
Showing 22 changed files with 539 additions and 372 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
import com.eventstore.dbclient.proto.shared.Shared;
import com.eventstore.dbclient.proto.streams.StreamsGrpc;
import com.eventstore.dbclient.proto.streams.StreamsOuterClass;
import io.grpc.Metadata;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

Expand All @@ -16,9 +12,9 @@ abstract class AbstractRead implements Publisher<ReadMessage> {
protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions;

private final GrpcClient client;
private final OptionsBase<?> options;
private final OptionsWithBackPressure<?> options;

protected AbstractRead(GrpcClient client, OptionsBase<?> options) {
protected AbstractRead(GrpcClient client, OptionsWithBackPressure<?> options) {
this.client = client;
this.options = options;
}
Expand All @@ -32,86 +28,23 @@ protected AbstractRead(GrpcClient client, OptionsBase<?> options) {
public abstract StreamsOuterClass.ReadReq.Options.Builder createOptions();

@Override
@SuppressWarnings("unchecked")
public void subscribe(Subscriber<? super ReadMessage> subscriber) {
ReadSubscription readSubscription = new ReadSubscription(subscriber);
subscriber.onSubscribe(readSubscription);
ReadResponseObserver observer = new ReadResponseObserver(options, new ReadStreamConsumer(subscriber));

this.client.getWorkItemArgs().whenComplete((args, error) -> {
if (error != null) {
observer.onError(error);
return;
}

CompletableFuture<ReadSubscription> result = new CompletableFuture<>();
this.client.run(channel -> {
StreamsOuterClass.ReadReq request = StreamsOuterClass.ReadReq.newBuilder()
.setOptions(createOptions())
.build();

StreamsGrpc.StreamsStub client = GrpcUtils.configureStub(StreamsGrpc.newStub(channel), this.client.getSettings(), this.options);

client.read(request, new ClientResponseObserver<StreamsOuterClass.ReadReq, StreamsOuterClass.ReadResp>() {
@Override
public void beforeStart(ClientCallStreamObserver<StreamsOuterClass.ReadReq> requestStream) {
readSubscription.setStreamObserver(requestStream);
}

private boolean completed = false;

@Override
public void onNext(StreamsOuterClass.ReadResp value) {
if (this.completed) {
return;
}
if (value.hasStreamNotFound()) {
StreamNotFoundException streamNotFoundException = new StreamNotFoundException();
handleError(streamNotFoundException);
return;
}

try {
readSubscription.onNext(new ReadMessage(value));
} catch (Throwable t) {
handleError(t);
}
}

@Override
public void onCompleted() {
if (this.completed) {
return;
}
this.completed = true;
result.complete(readSubscription);
readSubscription.onCompleted();
}

@Override
public void onError(Throwable t) {
if (this.completed) {
return;
}

if (t instanceof StatusRuntimeException) {
StatusRuntimeException e = (StatusRuntimeException) t;
String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER));
String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));

if (leaderHost != null && leaderPort != null) {
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
handleError(reason);
return;
}
}

handleError(t);
}

private void handleError(Throwable t) {
this.completed = true;
result.completeExceptionally(t);
readSubscription.onError(t);
}
});
return result;
}).exceptionally(t -> {
readSubscription.onError(t);
return readSubscription;
StreamsGrpc.StreamsStub client = GrpcUtils.configureStub(StreamsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options);
observer.onConnected(args);
subscriber.onSubscribe(observer.getSubscription());
client.read(request, observer);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,19 @@
import com.eventstore.dbclient.proto.shared.Shared;
import com.eventstore.dbclient.proto.streams.StreamsGrpc;
import com.eventstore.dbclient.proto.streams.StreamsOuterClass;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.validation.constraints.NotNull;
import java.util.concurrent.CompletableFuture;

abstract class AbstractRegularSubscription {
private static Logger logger = LoggerFactory.getLogger(AbstractRegularSubscription.class);
protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions;
protected static final StreamsOuterClass.ReadReq.Options.Builder defaultSubscribeOptions;

protected SubscriptionListener listener;
protected Checkpointer checkpointer = null;
private final GrpcClient client;
private final OptionsBase<?> options;
private final OptionsWithBackPressure<?> options;

protected AbstractRegularSubscription(GrpcClient client, OptionsBase<?> options) {
protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure<?> options) {
this.client = client;
this.options = options;
}
Expand All @@ -40,120 +31,40 @@ protected AbstractRegularSubscription(GrpcClient client, OptionsBase<?> options)

protected abstract StreamsOuterClass.ReadReq.Options.Builder createOptions();

@SuppressWarnings("unchecked")
public CompletableFuture<Subscription> execute() {
return this.client.run(channel -> {
StreamsOuterClass.ReadReq readReq = StreamsOuterClass.ReadReq.newBuilder()
.setOptions(createOptions())
.build();

StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub(StreamsGrpc.newStub(channel), this.client.getSettings(), this.options);

CompletableFuture<Subscription> future = new CompletableFuture<>();
ClientResponseObserver<StreamsOuterClass.ReadReq, StreamsOuterClass.ReadResp> observer = new ClientResponseObserver<StreamsOuterClass.ReadReq, StreamsOuterClass.ReadResp>() {
private boolean _confirmed;
private Subscription _subscription;
private ClientCallStreamObserver<StreamsOuterClass.ReadReq> _requestStream;

@Override
public void beforeStart(ClientCallStreamObserver<StreamsOuterClass.ReadReq> requestStream) {
this._requestStream = requestStream;
}

@Override
public void onNext(@NotNull StreamsOuterClass.ReadResp readResp) {
if (!_confirmed && readResp.hasConfirmation()) {
this._confirmed = true;
this._subscription = new Subscription(this._requestStream,
readResp.getConfirmation().getSubscriptionId(), checkpointer);
future.complete(this._subscription);
listener.onConfirmation(this._subscription);
return;
}

if (!_confirmed && readResp.hasEvent()) {
onError(new IllegalStateException("Unconfirmed subscription received event"));
return;
}

if (_confirmed && readResp.hasCheckpoint()) {
Checkpointer checkpointer = this._subscription.getCheckpointer();
if (checkpointer == null) {
return;
}

StreamsOuterClass.ReadResp.Checkpoint checkpoint = readResp.getCheckpoint();
Position checkpointPos = new Position(checkpoint.getCommitPosition(), checkpoint.getPreparePosition());
checkpointer.onCheckpoint(this._subscription, checkpointPos);
return;
}

if (_confirmed && readResp.hasCaughtUp()) {
listener.onCaughtUp(_subscription);
return;
}

if (_confirmed && readResp.hasFellBehind()) {
listener.onFellBehind(_subscription);
return;
}
CompletableFuture<Subscription> future = new CompletableFuture<>();

if (_confirmed && !readResp.hasEvent()) {
logger.warn(
String.format("Confirmed subscription %s received non-{event,checkpoint} variant",
_subscription.getSubscriptionId()));
return;
}
this.client.getWorkItemArgs().whenComplete((args, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}

try {
ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent());
ClientTelemetry.traceSubscribe(
() -> listener.onEvent(this._subscription, resolvedEvent),
_subscription.getSubscriptionId(),
channel,
client.getSettings(),
options.getCredentials(),
resolvedEvent.getEvent());
} catch (Exception e) {
onError(e);
}
}
ReadResponseObserver observer = createObserver(args, future);
observer.onConnected(args);

@Override
public void onError(Throwable throwable) {
if (!_confirmed) {
future.completeExceptionally(throwable);
}

Throwable error = throwable;
if (error instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) error;
String desc = sre.getStatus().getDescription();
if (sre.getStatus().getCode() == Status.Code.CANCELLED && desc != null && desc.equals("user-initiated")) {
listener.onCancelled(this._subscription, null);
return;
}

String leaderHost = sre.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER));
String leaderPort = sre.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));

if (leaderHost != null && leaderPort != null) {
error = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
}
}

listener.onCancelled(this._subscription, error);
}

@Override
public void onCompleted() {
// Subscriptions should only complete on error.
}
};
StreamsOuterClass.ReadReq readReq = StreamsOuterClass.ReadReq.newBuilder()
.setOptions(createOptions())
.build();

StreamsGrpc.StreamsStub streamsClient = GrpcUtils.configureStub(StreamsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options);
streamsClient.read(readReq, observer);
});

return future;
return future;
}

private ReadResponseObserver createObserver(WorkItemArgs args, CompletableFuture<Subscription> future) {
StreamConsumer consumer = new SubscriptionStreamConsumer(this.listener, this.checkpointer, future, (subscriptionId, event, action) -> {
ClientTelemetry.traceSubscribe(
action,
subscriptionId,
args.getChannel(),
client.getSettings(),
options.getCredentials(),
event);
});

return new ReadResponseObserver(this.options, consumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ public void process(RunWorkItem args) {
this.channelId,
this.connection.getCurrentChannel(),
this.connection.getLastConnectedEndpoint(),
this.serverInfo);
this.serverInfo,
this.queue);

args.getItem().accept(workArgs, null);
}
Expand Down
Loading

0 comments on commit 12021e6

Please sign in to comment.