Skip to content

Commit

Permalink
update executeBlocking to latest api
Browse files Browse the repository at this point in the history
  • Loading branch information
zyclonite committed Aug 28, 2024
1 parent 37a64ac commit ca14ea6
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 54 deletions.
65 changes: 27 additions & 38 deletions src/main/java/io/vertx/spi/cluster/ignite/IgniteClusterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void nodeListener(NodeListener nodeListener) {

@Override
public <K, V> void getAsyncMap(String name, Promise<AsyncMap<K, V>> promise) {
vertx.<AsyncMap<K, V>>executeBlocking(prom -> prom.complete(new AsyncMapImpl<>(getCache(name), vertx))).onComplete(promise);
vertx.<AsyncMap<K, V>>executeBlocking(() -> new AsyncMapImpl<>(getCache(name), vertx)).onComplete(promise);
}

@Override
Expand All @@ -206,7 +206,7 @@ public <K, V> Map<K, V> getSyncMap(String name) {

@Override
public void getLockWithTimeout(String name, long timeout, Promise<Lock> promise) {
vertx.<Lock>executeBlocking(prom -> {
vertx.<Lock>executeBlocking(() -> {
IgniteSemaphore semaphore = ignite.semaphore(LOCK_SEMAPHORE_PREFIX + name, 1, true, true);
boolean locked;
long remaining = timeout;
Expand All @@ -216,7 +216,7 @@ public void getLockWithTimeout(String name, long timeout, Promise<Lock> promise)
remaining = remaining - TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS);
} while (!locked && remaining > 0);
if (locked) {
prom.complete(new LockImpl(semaphore, lockReleaseExec));
return new LockImpl(semaphore, lockReleaseExec);
} else {
throw new VertxException("Timed out waiting to get lock " + name);
}
Expand All @@ -225,7 +225,7 @@ public void getLockWithTimeout(String name, long timeout, Promise<Lock> promise)

@Override
public void getCounter(String name, Promise<Counter> promise) {
vertx.<Counter>executeBlocking(prom -> prom.complete(new CounterImpl(ignite.atomicLong(name, 0, true)))).onComplete(promise);
vertx.<Counter>executeBlocking(() -> new CounterImpl(ignite.atomicLong(name, 0, true))).onComplete(promise);
}

@Override
Expand All @@ -239,9 +239,9 @@ public void setNodeInfo(NodeInfo nodeInfo, Promise<Void> promise) {
this.nodeInfo = nodeInfo;
}
IgniteNodeInfo value = new IgniteNodeInfo(nodeInfo);
vertx.<Void>executeBlocking(prom -> {
vertx.<Void>executeBlocking(() -> {
nodeInfoMap.put(nodeId, value);
prom.complete();
return null;
}, false).onComplete(promise);
}

Expand Down Expand Up @@ -286,7 +286,7 @@ public List<String> getNodes() {

@Override
public void join(Promise<Void> promise) {
vertx.<Void>executeBlocking(prom -> {
vertx.<Void>executeBlocking(() -> {
synchronized (monitor) {
if (!active) {
active = true;
Expand All @@ -309,34 +309,31 @@ public void join(Promise<Void> promise) {
return false;
}

vertx.<Void>executeBlocking(f -> {
vertx.<Void>executeBlocking(() -> {
String id = nodeId(((DiscoveryEvent) event).eventNode());
switch (event.type()) {
case EVT_NODE_JOINED:
notifyNodeListener(listener -> listener.nodeAdded(id));
log.debug("node " + id + " joined the cluster");
f.complete();
break;
return null;
case EVT_NODE_LEFT:
case EVT_NODE_FAILED:
if (cleanNodeInfos(id)) {
cleanSubs(id);
}
notifyNodeListener(listener -> listener.nodeLeft(id));
log.debug("node " + id + " left the cluster");
f.complete();
break;
return null;
case EVT_NODE_SEGMENTED:
if (customIgnite || !shutdownOnSegmentation) {
log.warn("node got segmented");
} else {
log.warn("node got segmented and will be shut down");
vertx.close();
}
f.fail(new IllegalStateException("node is stopped"));
break;
throw new IllegalStateException("node is stopped");
default:
f.fail("event not known");
throw new IllegalStateException("event not known");
}
});

Expand All @@ -349,18 +346,18 @@ public void join(Promise<Void> promise) {

try {
MILLISECONDS.sleep(delayAfterStart);
prom.complete();
} catch (InterruptedException e) {
prom.fail(e);
throw new IllegalStateException(e);
}
}
return null;
}
}).onComplete(promise);
}

@Override
public void leave(Promise<Void> promise) {
vertx.<Void>executeBlocking(prom -> {
vertx.<Void>executeBlocking(() -> {
synchronized (monitor) {
if (active) {
active = false;
Expand All @@ -376,12 +373,10 @@ public void leave(Promise<Void> promise) {
} catch (Exception e) {
log.error(e);
}
subsMapHelper = null;
nodeInfoMap = null;
}
}

prom.complete();
return null;
}).onComplete(promise);
}

Expand All @@ -392,24 +387,18 @@ public boolean isActive() {

@Override
public void addRegistration(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
vertx.<Void>executeBlocking(prom -> {
subsMapHelper.put(address, registrationInfo)
.onComplete(prom);
}, false).onComplete(promise);
subsMapHelper.put(address, registrationInfo)
.onComplete(promise);
}

@Override
public void removeRegistration(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
vertx.<Void>executeBlocking(prom -> {
subsMapHelper.remove(address, registrationInfo, prom);
}, false).onComplete(promise);
subsMapHelper.remove(address, registrationInfo, promise);
}

@Override
public void getRegistrations(String address, Promise<List<RegistrationInfo>> promise) {
vertx.<List<RegistrationInfo>>executeBlocking(prom -> {
subsMapHelper.get(address, prom);
}, false).onComplete(promise);
subsMapHelper.get(address, promise);
}

private void cleanSubs(String id) {
Expand Down Expand Up @@ -521,7 +510,7 @@ private CounterImpl(IgniteAtomicLong cnt) {

@Override
public Future<Long> get() {
return vertx.executeBlocking(fut -> fut.complete(cnt.get()));
return vertx.executeBlocking(cnt::get);
}

@Override
Expand All @@ -532,7 +521,7 @@ public void get(Handler<AsyncResult<Long>> handler) {

@Override
public Future<Long> incrementAndGet() {
return vertx.executeBlocking(fut -> fut.complete(cnt.incrementAndGet()));
return vertx.executeBlocking(cnt::incrementAndGet);
}

@Override
Expand All @@ -543,7 +532,7 @@ public void incrementAndGet(Handler<AsyncResult<Long>> handler) {

@Override
public Future<Long> getAndIncrement() {
return vertx.executeBlocking(fut -> fut.complete(cnt.getAndIncrement()));
return vertx.executeBlocking(cnt::getAndIncrement);
}

@Override
Expand All @@ -554,7 +543,7 @@ public void getAndIncrement(Handler<AsyncResult<Long>> handler) {

@Override
public Future<Long> decrementAndGet() {
return vertx.executeBlocking(fut -> fut.complete(cnt.decrementAndGet()));
return vertx.executeBlocking(cnt::decrementAndGet);
}

@Override
Expand All @@ -565,7 +554,7 @@ public void decrementAndGet(Handler<AsyncResult<Long>> handler) {

@Override
public Future<Long> addAndGet(long value) {
return vertx.executeBlocking(fut -> fut.complete(cnt.addAndGet(value)));
return vertx.executeBlocking(() -> cnt.addAndGet(value));
}

@Override
Expand All @@ -576,7 +565,7 @@ public void addAndGet(long value, Handler<AsyncResult<Long>> handler) {

@Override
public Future<Long> getAndAdd(long value) {
return vertx.executeBlocking(fut -> fut.complete(cnt.getAndAdd(value)));
return vertx.executeBlocking(() -> cnt.getAndAdd(value));
}

@Override
Expand All @@ -587,7 +576,7 @@ public void getAndAdd(long value, Handler<AsyncResult<Long>> handler) {

@Override
public Future<Boolean> compareAndSet(long expected, long value) {
return vertx.executeBlocking(fut -> fut.complete(cnt.compareAndSet(expected, value)));
return vertx.executeBlocking(() -> cnt.compareAndSet(expected, value));
}

@Override
Expand Down
23 changes: 9 additions & 14 deletions src/main/java/io/vertx/spi/cluster/ignite/impl/AsyncMapImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,16 @@ public Future<List<V>> values() {
@Override
public Future<Map<K, V>> entries() {
return vertx.executeBlocking(
promise -> {
() -> {
try {
List<Cache.Entry<K, V>> all = cache.query(new ScanQuery<K, V>()).getAll();
Map<K, V> map = new HashMap<>(all.size());
for (Cache.Entry<K, V> entry : all) {
map.put(unmarshal(entry.getKey()), unmarshal(entry.getValue()));
}
promise.complete(map);
return map;
} catch (final RuntimeException cause) {
promise.fail(new VertxException(cause));
throw new VertxException(cause);
}
}
);
Expand All @@ -163,17 +163,12 @@ private <T> Future<T> executeWithTtl(Function<IgniteCache<K, V>, IgniteFuture<T>
: cache;

return vertx.executeBlocking(
promise -> {
IgniteFuture<T> future = cacheOp.apply(cache0);
future.listen(
fut -> {
try {
promise.complete(unmarshal(future.get()));
} catch (final RuntimeException e) {
promise.fail(new VertxException(e));
}
}
);
() -> {
try {
return unmarshal(cacheOp.apply(cache0).get());
} catch (final RuntimeException e) {
throw new VertxException(e);
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@ private Future<List<RegistrationInfo>> getAndUpdate(String address) {
}

private void listen(final Iterable<CacheEntryEvent<? extends IgniteRegistrationInfo, ? extends Boolean>> events, final VertxInternal vertxInternal) {
vertxInternal.<List<RegistrationInfo>>executeBlocking(promise -> {
vertxInternal.<List<RegistrationInfo>>executeBlocking(() -> {
StreamSupport.stream(events.spliterator(), false)
.map(e -> e.getKey().address())
.distinct()
.forEach(this::fireRegistrationUpdateEvent);
promise.complete();
return null;
});
}
}

0 comments on commit ca14ea6

Please sign in to comment.