Skip to content

Commit

Permalink
Annotate with Nullable, fields that can become null (#277)
Browse files Browse the repository at this point in the history
Signed-off-by: Violeta Georgieva <[email protected]>
  • Loading branch information
violetagg authored Feb 12, 2025
1 parent 2e56a4b commit 24cae73
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
3 changes: 2 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/PoolBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.function.Function;
import java.util.function.Predicate;

import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;

import reactor.core.Disposable;
Expand Down Expand Up @@ -67,7 +68,7 @@ public static <T> PoolBuilder<T, PoolConfig<T>> from(Publisher<? extends T> allo
final Mono<T> allocator;
final Function<PoolConfig<T>, CONF> configModifier;
int maxPending = -1;
AllocationStrategy allocationStrategy = null;
@Nullable AllocationStrategy allocationStrategy = null;
Function<T, ? extends Publisher<Void>> releaseHandler = noopHandler();
Function<T, ? extends Publisher<Void>> destroyHandler = noopHandler();
BiPredicate<T, PooledRefMetadata> evictionPredicate = neverPredicate();
Expand Down
15 changes: 9 additions & 6 deletions reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class SimpleDequePool<POOLABLE> extends AbstractPool<POOLABLE> {

volatile Deque<QueuePooledRef<POOLABLE>> idleResources;
@SuppressWarnings("rawtypes")
protected static final AtomicReferenceFieldUpdater<SimpleDequePool, Deque> IDLE_RESOURCES =
protected static final AtomicReferenceFieldUpdater<SimpleDequePool, @Nullable Deque> IDLE_RESOURCES =
AtomicReferenceFieldUpdater.newUpdater(SimpleDequePool.class, Deque.class, "idleResources");

volatile int acquired;
Expand Down Expand Up @@ -97,7 +97,7 @@ public class SimpleDequePool<POOLABLE> extends AbstractPool<POOLABLE> {
private static final AtomicIntegerFieldUpdater<SimpleDequePool> IDLE_SIZE =
AtomicIntegerFieldUpdater.newUpdater(SimpleDequePool.class, "idleSize");

Disposable evictionTask;
@Nullable Disposable evictionTask;

SimpleDequePool(PoolConfig<POOLABLE> poolConfig) {
super(poolConfig, Loggers.getLogger(SimpleDequePool.class));
Expand Down Expand Up @@ -187,7 +187,9 @@ public Mono<Void> disposeLater() {
PENDING.getAndSet(this, TERMINATED);
if (q != TERMINATED) {
//stop reaper thread
this.evictionTask.dispose();
if (this.evictionTask != null) {
this.evictionTask.dispose();
}

Borrower<POOLABLE> p;
while ((p = q.pollFirst()) != null) {
Expand Down Expand Up @@ -750,8 +752,8 @@ private static final class QueuePoolRecyclerInner<T>
final SimpleDequePool<T> pool;

//poolable can be checked for null to protect against protocol errors
QueuePooledRef<T> pooledRef;
Subscription upstream;
@Nullable QueuePooledRef<T> pooledRef;
@Nullable Subscription upstream;
long start;

//once protects against multiple requests
Expand Down Expand Up @@ -829,6 +831,7 @@ public void onSubscribe(Subscription s) {

@Override
public void request(long l) {
assert upstream != null;
if (Operators.validate(l)) {
upstream.request(l);
// we decrement ACQUIRED EXACTLY ONCE to indicate that the poolable was released by the user
Expand Down Expand Up @@ -867,7 +870,7 @@ private static final class QueuePoolRecyclerMono<T> extends Mono<Void>
implements Scannable {

final Publisher<Void> source;
final AtomicReference<QueuePooledRef<T>> slotRef;
final AtomicReference<@Nullable QueuePooledRef<T>> slotRef;

QueuePoolRecyclerMono(Publisher<Void> source, QueuePooledRef<T> poolSlot) {
this.source = source;
Expand Down

0 comments on commit 24cae73

Please sign in to comment.