From d2aa5e1547cd79eb7892b7ad344096df3dca1423 Mon Sep 17 00:00:00 2001 From: "XiaoXi.Liu" Date: Sun, 26 Jan 2020 17:46:58 +0800 Subject: [PATCH 1/2] upgrade zk version to 3.5.x Signed-off-by: XiaoXi.Liu --- pom.xml | 38 ++++++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index c27d084..87d09ed 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 @@ -14,8 +15,12 @@ 4.0.0-SNAPSHOT - 2.12.0 + 4.2.0 + 3.5.6 + 27.0.1-jre 4.12 + 1.7.25 + 1.2.17 ${project.basedir}/src/main/asciidoc @@ -43,7 +48,11 @@ vertx-core provided - + + io.vertx + vertx-web + true + io.vertx vertx-docgen @@ -60,6 +69,11 @@ curator-framework ${curator.version} + + + org.apache.zookeeper + zookeeper + jline jline @@ -77,8 +91,16 @@ org.apache.zookeeper zookeeper - 3.4.13 + ${zookeeper.version} + + io.netty + netty-handler + + + io.netty + netty-tranasport-native-epoll + org.slf4j slf4j-api @@ -103,24 +125,24 @@ com.google.guava guava - 20.0 + ${guava.version} org.slf4j slf4j-api - 1.7.21 + ${slf4j.version} org.slf4j slf4j-log4j12 - 1.7.21 + ${slf4j.version} log4j log4j - 1.2.16 + ${log4j.version} From d8a293ef2c0e438b80859e2e81dd9918e68d24a1 Mon Sep 17 00:00:00 2001 From: "XiaoXi.Liu" Date: Sun, 26 Jan 2020 23:17:59 +0800 Subject: [PATCH 2/2] fix TTL with zk 3.5.x Signed-off-by: XiaoXi.Liu --- .../zookeeper/ZookeeperClusterManager.java | 5 +- .../zookeeper/impl/AsyncMapTTLMonitor.java | 112 ------------------ .../cluster/zookeeper/impl/ZKAsyncMap.java | 58 ++------- .../zookeeper/impl/ZKAsyncMultiMap.java | 2 +- .../spi/cluster/zookeeper/impl/ZKMap.java | 24 ++-- .../shareddata/ZKClusteredAsyncMapTest.java | 1 - 6 files changed, 28 insertions(+), 174 deletions(-) delete mode 100644 src/main/java/io/vertx/spi/cluster/zookeeper/impl/AsyncMapTTLMonitor.java diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/ZookeeperClusterManager.java b/src/main/java/io/vertx/spi/cluster/zookeeper/ZookeeperClusterManager.java index 6ff9732..3ed37e3 100644 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/ZookeeperClusterManager.java +++ b/src/main/java/io/vertx/spi/cluster/zookeeper/ZookeeperClusterManager.java @@ -29,7 +29,6 @@ import io.vertx.core.spi.cluster.AsyncMultiMap; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.NodeListener; -import io.vertx.spi.cluster.zookeeper.impl.AsyncMapTTLMonitor; import io.vertx.spi.cluster.zookeeper.impl.ZKAsyncMap; import io.vertx.spi.cluster.zookeeper.impl.ZKAsyncMultiMap; import io.vertx.spi.cluster.zookeeper.impl.ZKSyncMap; @@ -201,9 +200,8 @@ public void getAsyncMultiMap(String name, Handler Future> getAsyncMap(String name) { - AsyncMapTTLMonitor asyncMapTTLMonitor = AsyncMapTTLMonitor.getInstance(vertx, this); return vertx.executeBlocking(event -> { - AsyncMap zkAsyncMap = asyncMapCache.computeIfAbsent(name, key -> new ZKAsyncMap<>(vertx, curator, asyncMapTTLMonitor, name)); + AsyncMap zkAsyncMap = asyncMapCache.computeIfAbsent(name, key -> new ZKAsyncMap<>(vertx, curator, name)); event.complete(zkAsyncMap); }); } @@ -376,7 +374,6 @@ public void leave(Handler> resultHandler) { } } }).forPath(ZK_PATH_CLUSTER_NODE + nodeID); - AsyncMapTTLMonitor.getInstance(vertx, this).stop(); } catch (Exception e) { log.error(e); future.fail(e); diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/AsyncMapTTLMonitor.java b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/AsyncMapTTLMonitor.java deleted file mode 100644 index 5a12a94..0000000 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/AsyncMapTTLMonitor.java +++ /dev/null @@ -1,112 +0,0 @@ -package io.vertx.spi.cluster.zookeeper.impl; - -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.core.eventbus.MessageConsumer; -import io.vertx.core.json.JsonObject; -import io.vertx.core.impl.logging.Logger; -import io.vertx.core.impl.logging.LoggerFactory; -import io.vertx.core.shareddata.LocalMap; -import io.vertx.core.spi.cluster.ClusterManager; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * As Zookeeper do not support set TTL value to zkNode, we have to handle it by application self. - * 1. Publish a specific message to all consumers that listen ttl action. - * 2. All Consumer could receive this message in same time, so we have to make distribution lock as delete action can only - * be execute one time. - * 3. Check key is exist, and delete if exist, note: we just make a timer to delete if timeout, and we also save timer if we want to cancel - * this action in later. - * 4. Release lock. - *

- * Still if a put without ttl happens after the put with ttl but before the timeout expires, the most recent update will be removed. - * For this case, we should add a field to indicate that if we should stop timer in the cluster. - *

- * Created by stream. - */ -public class AsyncMapTTLMonitor { - private final Vertx vertx; - private final ClusterManager clusterManager; - private final Map> keyPathAndAsyncMap = new ConcurrentHashMap<>(); - - static final String TTL_KEY_HANDLER_ADDRESS = "__VERTX_ZK_TTL_HANDLER_ADDRESS"; - static final String TTL_KEY_BODY_KEY_PATH = "keyPath"; - static final String TTL_KEY_BODY_TIMEOUT = "timeout"; - static final String TTL_KEY_IS_CANCEL = "isCancel"; - - private static final String TTL_KEY_LOCK = "__VERTX_ZK_TTL_LOCK"; - private static final long TTL_KEY_GET_LOCK_TIMEOUT = 1500; - private final LocalMap ttlTimer; - private MessageConsumer consumer; - - private volatile static AsyncMapTTLMonitor instance; - - private static final Logger logger = LoggerFactory.getLogger(AsyncMapTTLMonitor.class); - - @SuppressWarnings("unchecked") - public static AsyncMapTTLMonitor getInstance(Vertx vertx, ClusterManager clusterManager) { - if (instance == null) { - synchronized (AsyncMapTTLMonitor.class) { - if (instance == null) { - instance = new AsyncMapTTLMonitor(vertx, clusterManager); - } - } - } - return instance; - } - - private AsyncMapTTLMonitor(Vertx vertx, ClusterManager clusterManager) { - this.ttlTimer = vertx.sharedData().getLocalMap("__VERTX_ZK_TTL_TIMER"); - this.vertx = vertx; - this.clusterManager = clusterManager; - initConsumer(); - } - - private void initConsumer() { - this.consumer = vertx.eventBus().consumer(TTL_KEY_HANDLER_ADDRESS, event -> { - JsonObject body = event.body(); - String keyPath = body.getString(TTL_KEY_BODY_KEY_PATH); - if (keyPathAndAsyncMap.get(keyPath) == null) return; - if (body.getBoolean(TTL_KEY_IS_CANCEL, false)) { - long timerID = ttlTimer.remove(body.getString(keyPath)); - if (timerID > 0) vertx.cancelTimer(timerID); - } else { - long timerID = vertx.setTimer(body.getLong(TTL_KEY_BODY_TIMEOUT), aLong -> { - clusterManager.getLockWithTimeout(TTL_KEY_LOCK, TTL_KEY_GET_LOCK_TIMEOUT).setHandler(lockAsyncResult -> { - ZKAsyncMap zkAsyncMap = keyPathAndAsyncMap.get(keyPath); - if (lockAsyncResult.succeeded()) { - zkAsyncMap.checkExists(keyPath) - .compose(checkResult -> checkResult ? zkAsyncMap.delete(keyPath, null) : Future.succeededFuture()) - .setHandler(deleteResult -> { - if (deleteResult.succeeded()) { - lockAsyncResult.result().release(); - logger.debug(String.format("The key %s have arrived time, and have been deleted.", keyPath)); - } else { - logger.error(String.format("Delete expire key %s failed.", keyPath), deleteResult.cause()); - } - }); - } else { - logger.error("get TTL lock failed.", lockAsyncResult.cause()); - } - }); - } - ); - ttlTimer.put(keyPath, timerID); - } - } - ); - } - - void addAsyncMapWithPath(String keyPath, ZKAsyncMap asyncMap) { - keyPathAndAsyncMap.putIfAbsent(keyPath, asyncMap); - } - - public void stop() { - consumer.unregister(); - keyPathAndAsyncMap.clear(); - instance = null; - } - -} diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMap.java b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMap.java index a8d4abe..e119072 100644 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMap.java +++ b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMap.java @@ -39,21 +39,17 @@ import java.util.Optional; import java.util.Set; -import static io.vertx.spi.cluster.zookeeper.impl.AsyncMapTTLMonitor.*; - /** * Created by Stream.Liu */ public class ZKAsyncMap extends ZKMap implements AsyncMap { private final PathChildrenCache curatorCache; - private AsyncMapTTLMonitor asyncMapTTLMonitor; - public ZKAsyncMap(Vertx vertx, CuratorFramework curator, AsyncMapTTLMonitor asyncMapTTLMonitor, String mapName) { + public ZKAsyncMap(Vertx vertx, CuratorFramework curator, String mapName) { super(curator, vertx, ZK_PATH_ASYNC_MAP, mapName); this.curatorCache = new PathChildrenCache(curator, mapPath, true); try { - this.asyncMapTTLMonitor = asyncMapTTLMonitor; curatorCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); } catch (Exception e) { throw new VertxException(e); @@ -99,18 +95,8 @@ public Future put(K k, V v, long ttl) { private Future put(K k, V v, Optional timeoutOptional) { return assertKeyAndValueAreNotNull(k, v) .compose(aVoid -> checkExists(k)) - .compose(checkResult -> checkResult ? setData(k, v) : create(k, v)) - .compose(aVoid -> { - JsonObject body = new JsonObject().put(TTL_KEY_BODY_KEY_PATH, keyPath(k)); - if (timeoutOptional.isPresent()) { - asyncMapTTLMonitor.addAsyncMapWithPath(keyPath(k), this); - body.put(TTL_KEY_BODY_TIMEOUT, timeoutOptional.get()); - } else body.put(TTL_KEY_IS_CANCEL, true); - //publish a ttl message to all nodes. - vertx.eventBus().publish(TTL_KEY_HANDLER_ADDRESS, body); - - return Future.succeededFuture(); - }); + .compose(checkResult -> checkResult ? setData(k, v) : create(k, v, timeoutOptional)) + .compose(stat -> Future.succeededFuture()); } @Override @@ -125,37 +111,15 @@ public Future putIfAbsent(K k, V v, long ttl) { private Future putIfAbsent(K k, V v, Optional timeoutOptional) { return assertKeyAndValueAreNotNull(k, v) - .compose(aVoid -> { - Promise innerPromise = Promise.promise(); - vertx.executeBlocking(future -> { - long startTime = Instant.now().toEpochMilli(); - int retries = 0; - - for (; ; ) { - try { - Stat stat = new Stat(); - String path = keyPath(k); - V currentValue = getData(stat, path); - if (compareAndSet(startTime, retries++, stat, path, currentValue, v)) { - future.complete(currentValue); - return; - } - } catch (Exception e) { - future.fail(e); - return; - } - } - }, false, innerPromise); - return innerPromise.future(); - }) + .compose(aVoid -> get(k)) .compose(value -> { - JsonObject body = new JsonObject().put(TTL_KEY_BODY_KEY_PATH, keyPath(k)); - if (timeoutOptional.isPresent()) { - asyncMapTTLMonitor.addAsyncMapWithPath(keyPath(k), this); - body.put(TTL_KEY_BODY_TIMEOUT, timeoutOptional.get()); - } else body.put(TTL_KEY_IS_CANCEL, true); - //publish a ttl message to all nodes. - vertx.eventBus().publish(TTL_KEY_HANDLER_ADDRESS, body); + if (value == null) { + if (timeoutOptional.isPresent()) { + put(k, v, timeoutOptional); + } else { + put(k, v); + } + } return Future.succeededFuture(value); }); } diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMultiMap.java b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMultiMap.java index eb1ee38..931e374 100644 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMultiMap.java +++ b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMultiMap.java @@ -72,7 +72,7 @@ public void add(K k, V v, Handler> completionHandler) { String path = valuePath(k, v); assertKeyAndValueAreNotNull(k, v) .compose(aVoid -> checkExists(path)) - .compose(checkResult -> checkResult ? setData(path, v) : create(path, v)) + .compose(checkResult -> checkResult ? setData(path, v) : create(path, v, Optional.empty())) .compose(stat -> { //add to snapshot cache if path contains eventbus address if (path.contains(EVENTBUS_PATH)) { diff --git a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java index 3fc86c3..7b50533 100644 --- a/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java +++ b/src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java @@ -24,7 +24,7 @@ import org.apache.curator.RetryLoop; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.*; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -33,6 +33,7 @@ import java.io.*; import java.lang.reflect.Constructor; import java.time.Instant; +import java.util.Optional; import java.util.stream.Stream; /** @@ -216,22 +217,27 @@ Future checkExists(String path) { return future.future(); } - Future create(K k, V v) { - return create(keyPath(k), v); + Future create(K k, V v, Optional timeToLive) { + return create(keyPath(k), v, timeToLive); } - Future create(String path, V v) { + Future create(String path, V v, Optional timeToLive) { Promise future = Promise.promise(); try { //there are two type of node - ephemeral and persistent. //if path is 'asyncMultiMap/subs/' which save the data of eventbus address and serverID we could using ephemeral, //since the lifecycle of this path as long as this verticle. CreateMode nodeMode = path.contains(EVENTBUS_PATH) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT; - curator.create().creatingParentsIfNeeded().withMode(nodeMode).inBackground((cl, el) -> { - if (el.getType() == CuratorEventType.CREATE) { - vertx.runOnContext(event -> future.complete(el.getStat())); - } - }).forPath(path, asByte(v)); + //as zk 3.5.x provide ttl node mode, we should consider it. + nodeMode = timeToLive.isPresent() ? CreateMode.PERSISTENT_WITH_TTL : nodeMode; + ProtectACLCreateModeStatPathAndBytesable pathAndBytesable = timeToLive.isPresent() + ? curator.create().withTtl(timeToLive.get()).creatingParentsIfNeeded() + : curator.create().creatingParentsIfNeeded(); + pathAndBytesable.withMode(nodeMode).inBackground((cl, el) -> { + if (el.getType() == CuratorEventType.CREATE) { + vertx.runOnContext(event -> future.complete(el.getStat())); + } + }).forPath(path, asByte(v)); } catch (Exception ex) { vertx.runOnContext(event -> future.fail(ex)); } diff --git a/src/test/java/io/vertx/core/shareddata/ZKClusteredAsyncMapTest.java b/src/test/java/io/vertx/core/shareddata/ZKClusteredAsyncMapTest.java index ddfc6d7..e2334ab 100644 --- a/src/test/java/io/vertx/core/shareddata/ZKClusteredAsyncMapTest.java +++ b/src/test/java/io/vertx/core/shareddata/ZKClusteredAsyncMapTest.java @@ -84,7 +84,6 @@ public void testMapPutIfAbsentTtl() { @Test @Override - @Ignore("This CM removes the binding even if a new entry is added without ttl") public void testMapPutTtlThenPut() { getVertx().sharedData().getAsyncMap("foo", onSuccess(map -> { map.put("pipo", "molo", 150, onSuccess(vd -> {