From ca14ea6f4183b44186e69f4a1d8fd1c94944f901 Mon Sep 17 00:00:00 2001 From: Lukas Prettenthaler Date: Tue, 27 Aug 2024 18:26:56 -0600 Subject: [PATCH] update executeBlocking to latest api --- .../cluster/ignite/IgniteClusterManager.java | 65 ++++++++----------- .../spi/cluster/ignite/impl/AsyncMapImpl.java | 23 +++---- .../cluster/ignite/impl/SubsMapHelper.java | 4 +- 3 files changed, 38 insertions(+), 54 deletions(-) diff --git a/src/main/java/io/vertx/spi/cluster/ignite/IgniteClusterManager.java b/src/main/java/io/vertx/spi/cluster/ignite/IgniteClusterManager.java index 7d2afd2..183fb1e 100644 --- a/src/main/java/io/vertx/spi/cluster/ignite/IgniteClusterManager.java +++ b/src/main/java/io/vertx/spi/cluster/ignite/IgniteClusterManager.java @@ -196,7 +196,7 @@ public void nodeListener(NodeListener nodeListener) { @Override public void getAsyncMap(String name, Promise> promise) { - vertx.>executeBlocking(prom -> prom.complete(new AsyncMapImpl<>(getCache(name), vertx))).onComplete(promise); + vertx.>executeBlocking(() -> new AsyncMapImpl<>(getCache(name), vertx)).onComplete(promise); } @Override @@ -206,7 +206,7 @@ public Map getSyncMap(String name) { @Override public void getLockWithTimeout(String name, long timeout, Promise promise) { - vertx.executeBlocking(prom -> { + vertx.executeBlocking(() -> { IgniteSemaphore semaphore = ignite.semaphore(LOCK_SEMAPHORE_PREFIX + name, 1, true, true); boolean locked; long remaining = timeout; @@ -216,7 +216,7 @@ public void getLockWithTimeout(String name, long timeout, Promise promise) remaining = remaining - TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS); } while (!locked && remaining > 0); if (locked) { - prom.complete(new LockImpl(semaphore, lockReleaseExec)); + return new LockImpl(semaphore, lockReleaseExec); } else { throw new VertxException("Timed out waiting to get lock " + name); } @@ -225,7 +225,7 @@ public void getLockWithTimeout(String name, long timeout, Promise promise) @Override public void getCounter(String name, Promise promise) { - vertx.executeBlocking(prom -> prom.complete(new CounterImpl(ignite.atomicLong(name, 0, true)))).onComplete(promise); + vertx.executeBlocking(() -> new CounterImpl(ignite.atomicLong(name, 0, true))).onComplete(promise); } @Override @@ -239,9 +239,9 @@ public void setNodeInfo(NodeInfo nodeInfo, Promise promise) { this.nodeInfo = nodeInfo; } IgniteNodeInfo value = new IgniteNodeInfo(nodeInfo); - vertx.executeBlocking(prom -> { + vertx.executeBlocking(() -> { nodeInfoMap.put(nodeId, value); - prom.complete(); + return null; }, false).onComplete(promise); } @@ -286,7 +286,7 @@ public List getNodes() { @Override public void join(Promise promise) { - vertx.executeBlocking(prom -> { + vertx.executeBlocking(() -> { synchronized (monitor) { if (!active) { active = true; @@ -309,14 +309,13 @@ public void join(Promise promise) { return false; } - vertx.executeBlocking(f -> { + vertx.executeBlocking(() -> { String id = nodeId(((DiscoveryEvent) event).eventNode()); switch (event.type()) { case EVT_NODE_JOINED: notifyNodeListener(listener -> listener.nodeAdded(id)); log.debug("node " + id + " joined the cluster"); - f.complete(); - break; + return null; case EVT_NODE_LEFT: case EVT_NODE_FAILED: if (cleanNodeInfos(id)) { @@ -324,8 +323,7 @@ public void join(Promise promise) { } notifyNodeListener(listener -> listener.nodeLeft(id)); log.debug("node " + id + " left the cluster"); - f.complete(); - break; + return null; case EVT_NODE_SEGMENTED: if (customIgnite || !shutdownOnSegmentation) { log.warn("node got segmented"); @@ -333,10 +331,9 @@ public void join(Promise promise) { log.warn("node got segmented and will be shut down"); vertx.close(); } - f.fail(new IllegalStateException("node is stopped")); - break; + throw new IllegalStateException("node is stopped"); default: - f.fail("event not known"); + throw new IllegalStateException("event not known"); } }); @@ -349,18 +346,18 @@ public void join(Promise promise) { try { MILLISECONDS.sleep(delayAfterStart); - prom.complete(); } catch (InterruptedException e) { - prom.fail(e); + throw new IllegalStateException(e); } } + return null; } }).onComplete(promise); } @Override public void leave(Promise promise) { - vertx.executeBlocking(prom -> { + vertx.executeBlocking(() -> { synchronized (monitor) { if (active) { active = false; @@ -376,12 +373,10 @@ public void leave(Promise promise) { } catch (Exception e) { log.error(e); } - subsMapHelper = null; - nodeInfoMap = null; } } - prom.complete(); + return null; }).onComplete(promise); } @@ -392,24 +387,18 @@ public boolean isActive() { @Override public void addRegistration(String address, RegistrationInfo registrationInfo, Promise promise) { - vertx.executeBlocking(prom -> { - subsMapHelper.put(address, registrationInfo) - .onComplete(prom); - }, false).onComplete(promise); + subsMapHelper.put(address, registrationInfo) + .onComplete(promise); } @Override public void removeRegistration(String address, RegistrationInfo registrationInfo, Promise promise) { - vertx.executeBlocking(prom -> { - subsMapHelper.remove(address, registrationInfo, prom); - }, false).onComplete(promise); + subsMapHelper.remove(address, registrationInfo, promise); } @Override public void getRegistrations(String address, Promise> promise) { - vertx.>executeBlocking(prom -> { - subsMapHelper.get(address, prom); - }, false).onComplete(promise); + subsMapHelper.get(address, promise); } private void cleanSubs(String id) { @@ -521,7 +510,7 @@ private CounterImpl(IgniteAtomicLong cnt) { @Override public Future get() { - return vertx.executeBlocking(fut -> fut.complete(cnt.get())); + return vertx.executeBlocking(cnt::get); } @Override @@ -532,7 +521,7 @@ public void get(Handler> handler) { @Override public Future incrementAndGet() { - return vertx.executeBlocking(fut -> fut.complete(cnt.incrementAndGet())); + return vertx.executeBlocking(cnt::incrementAndGet); } @Override @@ -543,7 +532,7 @@ public void incrementAndGet(Handler> handler) { @Override public Future getAndIncrement() { - return vertx.executeBlocking(fut -> fut.complete(cnt.getAndIncrement())); + return vertx.executeBlocking(cnt::getAndIncrement); } @Override @@ -554,7 +543,7 @@ public void getAndIncrement(Handler> handler) { @Override public Future decrementAndGet() { - return vertx.executeBlocking(fut -> fut.complete(cnt.decrementAndGet())); + return vertx.executeBlocking(cnt::decrementAndGet); } @Override @@ -565,7 +554,7 @@ public void decrementAndGet(Handler> handler) { @Override public Future addAndGet(long value) { - return vertx.executeBlocking(fut -> fut.complete(cnt.addAndGet(value))); + return vertx.executeBlocking(() -> cnt.addAndGet(value)); } @Override @@ -576,7 +565,7 @@ public void addAndGet(long value, Handler> handler) { @Override public Future getAndAdd(long value) { - return vertx.executeBlocking(fut -> fut.complete(cnt.getAndAdd(value))); + return vertx.executeBlocking(() -> cnt.getAndAdd(value)); } @Override @@ -587,7 +576,7 @@ public void getAndAdd(long value, Handler> handler) { @Override public Future compareAndSet(long expected, long value) { - return vertx.executeBlocking(fut -> fut.complete(cnt.compareAndSet(expected, value))); + return vertx.executeBlocking(() -> cnt.compareAndSet(expected, value)); } @Override diff --git a/src/main/java/io/vertx/spi/cluster/ignite/impl/AsyncMapImpl.java b/src/main/java/io/vertx/spi/cluster/ignite/impl/AsyncMapImpl.java index cd7b7d7..3198ec6 100644 --- a/src/main/java/io/vertx/spi/cluster/ignite/impl/AsyncMapImpl.java +++ b/src/main/java/io/vertx/spi/cluster/ignite/impl/AsyncMapImpl.java @@ -135,16 +135,16 @@ public Future> values() { @Override public Future> entries() { return vertx.executeBlocking( - promise -> { + () -> { try { List> all = cache.query(new ScanQuery()).getAll(); Map map = new HashMap<>(all.size()); for (Cache.Entry entry : all) { map.put(unmarshal(entry.getKey()), unmarshal(entry.getValue())); } - promise.complete(map); + return map; } catch (final RuntimeException cause) { - promise.fail(new VertxException(cause)); + throw new VertxException(cause); } } ); @@ -163,17 +163,12 @@ private Future executeWithTtl(Function, IgniteFuture : cache; return vertx.executeBlocking( - promise -> { - IgniteFuture future = cacheOp.apply(cache0); - future.listen( - fut -> { - try { - promise.complete(unmarshal(future.get())); - } catch (final RuntimeException e) { - promise.fail(new VertxException(e)); - } - } - ); + () -> { + try { + return unmarshal(cacheOp.apply(cache0).get()); + } catch (final RuntimeException e) { + throw new VertxException(e); + } } ); } diff --git a/src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java b/src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java index d961f38..6c531d2 100644 --- a/src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java +++ b/src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java @@ -180,12 +180,12 @@ private Future> getAndUpdate(String address) { } private void listen(final Iterable> events, final VertxInternal vertxInternal) { - vertxInternal.>executeBlocking(promise -> { + vertxInternal.>executeBlocking(() -> { StreamSupport.stream(events.spliterator(), false) .map(e -> e.getKey().address()) .distinct() .forEach(this::fireRegistrationUpdateEvent); - promise.complete(); + return null; }); } }