diff --git a/pom.xml b/pom.xml index 62d341b..3a74975 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 @@ -14,8 +15,12 @@ 4.0.0-SNAPSHOT - 2.13.0 + 4.2.0 + 3.5.6 + 27.0.1-jre 4.12 + 1.7.25 + 1.2.17 ${project.basedir}/src/main/asciidoc ${project.basedir}/src/main/resources/META-INF/MANIFEST.MF @@ -44,7 +49,11 @@ vertx-core provided - + + io.vertx + vertx-web + true + io.vertx vertx-docgen @@ -87,8 +96,16 @@ org.apache.zookeeper zookeeper - 3.4.14 + ${zookeeper.version} + + io.netty + netty-handler + + + io.netty + netty-tranasport-native-epoll + org.slf4j slf4j-api @@ -117,24 +134,24 @@ com.google.guava guava - 25.1-jre + ${guava.version} org.slf4j slf4j-api - 1.7.21 + ${slf4j.version} org.slf4j slf4j-log4j12 - 1.7.21 + ${slf4j.version} log4j log4j - 1.2.16 + ${log4j.version} diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/ZookeeperClusterManager.java b/src/main/java/io/vertx/spi/cluster/zookeeper/ZookeeperClusterManager.java index 3f268fa..cba1905 100644 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/ZookeeperClusterManager.java +++ b/src/main/java/io/vertx/spi/cluster/zookeeper/ZookeeperClusterManager.java @@ -25,7 +25,6 @@ import io.vertx.core.spi.cluster.AsyncMultiMap; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.NodeListener; -import io.vertx.spi.cluster.zookeeper.impl.AsyncMapTTLMonitor; import io.vertx.spi.cluster.zookeeper.impl.ZKAsyncMap; import io.vertx.spi.cluster.zookeeper.impl.ZKAsyncMultiMap; import io.vertx.spi.cluster.zookeeper.impl.ZKSyncMap; @@ -195,9 +194,8 @@ public void getAsyncMultiMap(String name, Handler Future> getAsyncMap(String name) { - AsyncMapTTLMonitor asyncMapTTLMonitor = AsyncMapTTLMonitor.getInstance(vertx, this); return vertx.executeBlocking(event -> { - AsyncMap zkAsyncMap = asyncMapCache.computeIfAbsent(name, key -> new ZKAsyncMap<>(vertx, curator, asyncMapTTLMonitor, name)); + AsyncMap zkAsyncMap = asyncMapCache.computeIfAbsent(name, key -> new ZKAsyncMap<>(vertx, curator, name)); event.complete(zkAsyncMap); }); } @@ -374,7 +372,6 @@ public void leave(Handler> resultHandler) { } } }).forPath(ZK_PATH_CLUSTER_NODE + nodeID); - AsyncMapTTLMonitor.getInstance(vertx, this).stop(); } catch (Exception e) { log.error(e); future.fail(e); diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/AsyncMapTTLMonitor.java b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/AsyncMapTTLMonitor.java deleted file mode 100644 index d005e79..0000000 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/AsyncMapTTLMonitor.java +++ /dev/null @@ -1,112 +0,0 @@ -package io.vertx.spi.cluster.zookeeper.impl; - -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.core.eventbus.MessageConsumer; -import io.vertx.core.json.JsonObject; -import io.vertx.core.impl.logging.Logger; -import io.vertx.core.impl.logging.LoggerFactory; -import io.vertx.core.shareddata.LocalMap; -import io.vertx.core.spi.cluster.ClusterManager; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * As Zookeeper do not support set TTL value to zkNode, we have to handle it by application self. - * 1. Publish a specific message to all consumers that listen ttl action. - * 2. All Consumer could receive this message in same time, so we have to make distribution lock as delete action can only - * be execute one time. - * 3. Check key is exist, and delete if exist, note: we just make a timer to delete if timeout, and we also save timer if we want to cancel - * this action in later. - * 4. Release lock. - *

- * Still if a put without ttl happens after the put with ttl but before the timeout expires, the most recent update will be removed. - * For this case, we should add a field to indicate that if we should stop timer in the cluster. - *

- * Created by stream. - */ -public class AsyncMapTTLMonitor { - private final Vertx vertx; - private final ClusterManager clusterManager; - private final Map> keyPathAndAsyncMap = new ConcurrentHashMap<>(); - - static final String TTL_KEY_HANDLER_ADDRESS = "__VERTX_ZK_TTL_HANDLER_ADDRESS"; - static final String TTL_KEY_BODY_KEY_PATH = "keyPath"; - static final String TTL_KEY_BODY_TIMEOUT = "timeout"; - static final String TTL_KEY_IS_CANCEL = "isCancel"; - - private static final String TTL_KEY_LOCK = "__VERTX_ZK_TTL_LOCK"; - private static final long TTL_KEY_GET_LOCK_TIMEOUT = 1500; - private final LocalMap ttlTimer; - private MessageConsumer consumer; - - private volatile static AsyncMapTTLMonitor instance; - - private static final Logger logger = LoggerFactory.getLogger(AsyncMapTTLMonitor.class); - - @SuppressWarnings("unchecked") - public static AsyncMapTTLMonitor getInstance(Vertx vertx, ClusterManager clusterManager) { - if (instance == null) { - synchronized (AsyncMapTTLMonitor.class) { - if (instance == null) { - instance = new AsyncMapTTLMonitor(vertx, clusterManager); - } - } - } - return instance; - } - - private AsyncMapTTLMonitor(Vertx vertx, ClusterManager clusterManager) { - this.ttlTimer = vertx.sharedData().getLocalMap("__VERTX_ZK_TTL_TIMER"); - this.vertx = vertx; - this.clusterManager = clusterManager; - initConsumer(); - } - - private void initConsumer() { - this.consumer = vertx.eventBus().consumer(TTL_KEY_HANDLER_ADDRESS, event -> { - JsonObject body = event.body(); - String keyPath = body.getString(TTL_KEY_BODY_KEY_PATH); - if (keyPathAndAsyncMap.get(keyPath) == null) return; - if (body.getBoolean(TTL_KEY_IS_CANCEL, false)) { - long timerID = ttlTimer.remove(body.getString(keyPath)); - if (timerID > 0) vertx.cancelTimer(timerID); - } else { - long timerID = vertx.setTimer(body.getLong(TTL_KEY_BODY_TIMEOUT), aLong -> { - clusterManager.getLockWithTimeout(TTL_KEY_LOCK, TTL_KEY_GET_LOCK_TIMEOUT).onComplete(lockAsyncResult -> { - ZKAsyncMap zkAsyncMap = keyPathAndAsyncMap.get(keyPath); - if (lockAsyncResult.succeeded()) { - zkAsyncMap.checkExists(keyPath) - .compose(checkResult -> checkResult ? zkAsyncMap.delete(keyPath, null) : Future.succeededFuture()) - .onComplete(deleteResult -> { - if (deleteResult.succeeded()) { - lockAsyncResult.result().release(); - logger.debug(String.format("The key %s have arrived time, and have been deleted.", keyPath)); - } else { - logger.error(String.format("Delete expire key %s failed.", keyPath), deleteResult.cause()); - } - }); - } else { - logger.error("get TTL lock failed.", lockAsyncResult.cause()); - } - }); - } - ); - ttlTimer.put(keyPath, timerID); - } - } - ); - } - - void addAsyncMapWithPath(String keyPath, ZKAsyncMap asyncMap) { - keyPathAndAsyncMap.putIfAbsent(keyPath, asyncMap); - } - - public void stop() { - consumer.unregister(); - keyPathAndAsyncMap.clear(); - instance = null; - } - -} diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMap.java b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMap.java index 8b96dbe..4325d95 100644 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMap.java +++ b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMap.java @@ -15,13 +15,8 @@ */ package io.vertx.spi.cluster.zookeeper.impl; -import io.vertx.core.CompositeFuture; -import io.vertx.core.Future; -import io.vertx.core.Promise; -import io.vertx.core.Vertx; -import io.vertx.core.VertxException; +import io.vertx.core.*; import io.vertx.core.impl.VertxInternal; -import io.vertx.core.json.JsonObject; import io.vertx.core.shareddata.AsyncMap; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; @@ -30,16 +25,7 @@ import java.io.IOException; import java.time.Instant; -import java.util.ArrayList; -import java.util.Base64; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; - -import static io.vertx.spi.cluster.zookeeper.impl.AsyncMapTTLMonitor.*; +import java.util.*; /** * Created by Stream.Liu @@ -47,13 +33,11 @@ public class ZKAsyncMap extends ZKMap implements AsyncMap { private final PathChildrenCache curatorCache; - private AsyncMapTTLMonitor asyncMapTTLMonitor; - public ZKAsyncMap(Vertx vertx, CuratorFramework curator, AsyncMapTTLMonitor asyncMapTTLMonitor, String mapName) { + public ZKAsyncMap(Vertx vertx, CuratorFramework curator, String mapName) { super(curator, vertx, ZK_PATH_ASYNC_MAP, mapName); this.curatorCache = new PathChildrenCache(curator, mapPath, true); try { - this.asyncMapTTLMonitor = asyncMapTTLMonitor; curatorCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); } catch (Exception e) { throw new VertxException(e); @@ -99,18 +83,8 @@ public Future put(K k, V v, long ttl) { private Future put(K k, V v, Optional timeoutOptional) { return assertKeyAndValueAreNotNull(k, v) .compose(aVoid -> checkExists(k)) - .compose(checkResult -> checkResult ? setData(k, v) : create(k, v)) - .compose(aVoid -> { - JsonObject body = new JsonObject().put(TTL_KEY_BODY_KEY_PATH, keyPath(k)); - if (timeoutOptional.isPresent()) { - asyncMapTTLMonitor.addAsyncMapWithPath(keyPath(k), this); - body.put(TTL_KEY_BODY_TIMEOUT, timeoutOptional.get()); - } else body.put(TTL_KEY_IS_CANCEL, true); - //publish a ttl message to all nodes. - vertx.eventBus().publish(TTL_KEY_HANDLER_ADDRESS, body); - - return Future.succeededFuture(); - }); + .compose(checkResult -> checkResult ? setData(k, v) : create(k, v, timeoutOptional)) + .compose(stat -> Future.succeededFuture()); } @Override @@ -125,37 +99,15 @@ public Future putIfAbsent(K k, V v, long ttl) { private Future putIfAbsent(K k, V v, Optional timeoutOptional) { return assertKeyAndValueAreNotNull(k, v) - .compose(aVoid -> { - Promise innerPromise = Promise.promise(); - vertx.executeBlocking(future -> { - long startTime = Instant.now().toEpochMilli(); - int retries = 0; - - for (; ; ) { - try { - Stat stat = new Stat(); - String path = keyPath(k); - V currentValue = getData(stat, path); - if (compareAndSet(startTime, retries++, stat, path, currentValue, v)) { - future.complete(currentValue); - return; - } - } catch (Exception e) { - future.fail(e); - return; - } - } - }, false, innerPromise); - return innerPromise.future(); - }) + .compose(aVoid -> get(k)) .compose(value -> { - JsonObject body = new JsonObject().put(TTL_KEY_BODY_KEY_PATH, keyPath(k)); - if (timeoutOptional.isPresent()) { - asyncMapTTLMonitor.addAsyncMapWithPath(keyPath(k), this); - body.put(TTL_KEY_BODY_TIMEOUT, timeoutOptional.get()); - } else body.put(TTL_KEY_IS_CANCEL, true); - //publish a ttl message to all nodes. - vertx.eventBus().publish(TTL_KEY_HANDLER_ADDRESS, body); + if (value == null) { + if (timeoutOptional.isPresent()) { + put(k, v, timeoutOptional); + } else { + put(k, v); + } + } return Future.succeededFuture(value); }); } diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMultiMap.java b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMultiMap.java index 2052542..245249e 100644 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMultiMap.java +++ b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMultiMap.java @@ -36,9 +36,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; -import static org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type.INITIALIZED; -import static org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type.NODE_ADDED; -import static org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type.NODE_REMOVED; +import static org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type.*; /** * Created by Stream.Liu @@ -72,7 +70,7 @@ public void add(K k, V v, Handler> completionHandler) { String path = valuePath(k, v); assertKeyAndValueAreNotNull(k, v) .compose(aVoid -> checkExists(path)) - .compose(checkResult -> checkResult ? setData(path, v) : create(path, v)) + .compose(checkResult -> checkResult ? setData(path, v) : create(path, v, Optional.empty())) .compose(stat -> { //add to snapshot cache if path contains eventbus address if (path.contains(EVENTBUS_PATH)) { diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java index 3fc86c3..9e19783 100644 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java +++ b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java @@ -33,6 +33,7 @@ import java.io.*; import java.lang.reflect.Constructor; import java.time.Instant; +import java.util.Optional; import java.util.stream.Stream; /** @@ -216,18 +217,23 @@ Future checkExists(String path) { return future.future(); } - Future create(K k, V v) { - return create(keyPath(k), v); + Future create(K k, V v, Optional timeToLive) { + return create(keyPath(k), v, timeToLive); } - Future create(String path, V v) { + Future create(String path, V v, Optional timeToLive) { Promise future = Promise.promise(); try { //there are two type of node - ephemeral and persistent. //if path is 'asyncMultiMap/subs/' which save the data of eventbus address and serverID we could using ephemeral, //since the lifecycle of this path as long as this verticle. CreateMode nodeMode = path.contains(EVENTBUS_PATH) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT; - curator.create().creatingParentsIfNeeded().withMode(nodeMode).inBackground((cl, el) -> { + //as zk 3.5.x provide ttl node mode, we should consider it. + nodeMode = timeToLive.isPresent() ? CreateMode.PERSISTENT_WITH_TTL : nodeMode; + ProtectACLCreateModeStatPathAndBytesable pathAndBytesable = timeToLive.isPresent() + ? curator.create().withTtl(timeToLive.get()).creatingParentsIfNeeded() + : curator.create().creatingParentsIfNeeded(); + pathAndBytesable.withMode(nodeMode).inBackground((cl, el) -> { if (el.getType() == CuratorEventType.CREATE) { vertx.runOnContext(event -> future.complete(el.getStat())); } diff --git a/src/test/java/io/vertx/core/shareddata/ZKClusteredAsyncMapTest.java b/src/test/java/io/vertx/core/shareddata/ZKClusteredAsyncMapTest.java index ddfc6d7..7966d9c 100644 --- a/src/test/java/io/vertx/core/shareddata/ZKClusteredAsyncMapTest.java +++ b/src/test/java/io/vertx/core/shareddata/ZKClusteredAsyncMapTest.java @@ -19,7 +19,6 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.spi.cluster.zookeeper.MockZKCluster; -import org.junit.Ignore; import org.junit.Test; /** @@ -84,7 +83,6 @@ public void testMapPutIfAbsentTtl() { @Test @Override - @Ignore("This CM removes the binding even if a new entry is added without ttl") public void testMapPutTtlThenPut() { getVertx().sharedData().getAsyncMap("foo", onSuccess(map -> { map.put("pipo", "molo", 150, onSuccess(vd -> {