Skip to content

Commit

Permalink
Merge branch 'stream-iori-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
tsegismont committed May 29, 2020
2 parents b1f918d + 83db463 commit fdd14dc
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 195 deletions.
33 changes: 25 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand All @@ -14,8 +15,12 @@

<properties>
<stack.version>4.0.0-SNAPSHOT</stack.version>
<curator.version>2.13.0</curator.version>
<curator.version>4.2.0</curator.version>
<zookeeper.version>3.5.6</zookeeper.version>
<guava.version>27.0.1-jre</guava.version>
<junit.version>4.12</junit.version>
<slf4j.version>1.7.25</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<asciidoc.dir>${project.basedir}/src/main/asciidoc</asciidoc.dir>
<jar.manifest>${project.basedir}/src/main/resources/META-INF/MANIFEST.MF</jar.manifest>
</properties>
Expand Down Expand Up @@ -44,7 +49,11 @@
<artifactId>vertx-core</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-docgen</artifactId>
Expand Down Expand Up @@ -87,8 +96,16 @@
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
<version>${zookeeper.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-tranasport-native-epoll</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down Expand Up @@ -117,24 +134,24 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>25.1-jre</version>
<version>${guava.version}</version>
</dependency>

<!-- ZK requires these dependencies: declare this dependency to force this module to use this one. These are the versions used by vert.x -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
<version>${log4j.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.spi.cluster.zookeeper.impl.AsyncMapTTLMonitor;
import io.vertx.spi.cluster.zookeeper.impl.ZKAsyncMap;
import io.vertx.spi.cluster.zookeeper.impl.ZKAsyncMultiMap;
import io.vertx.spi.cluster.zookeeper.impl.ZKSyncMap;
Expand Down Expand Up @@ -195,9 +194,8 @@ public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiM

@Override
public <K, V> Future<AsyncMap<K, V>> getAsyncMap(String name) {
AsyncMapTTLMonitor<K, V> asyncMapTTLMonitor = AsyncMapTTLMonitor.getInstance(vertx, this);
return vertx.executeBlocking(event -> {
AsyncMap zkAsyncMap = asyncMapCache.computeIfAbsent(name, key -> new ZKAsyncMap<>(vertx, curator, asyncMapTTLMonitor, name));
AsyncMap zkAsyncMap = asyncMapCache.computeIfAbsent(name, key -> new ZKAsyncMap<>(vertx, curator, name));
event.complete(zkAsyncMap);
});
}
Expand Down Expand Up @@ -374,7 +372,6 @@ public void leave(Handler<AsyncResult<Void>> resultHandler) {
}
}
}).forPath(ZK_PATH_CLUSTER_NODE + nodeID);
AsyncMapTTLMonitor.getInstance(vertx, this).stop();
} catch (Exception e) {
log.error(e);
future.fail(e);
Expand Down

This file was deleted.

74 changes: 13 additions & 61 deletions src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKAsyncMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@
*/
package io.vertx.spi.cluster.zookeeper.impl;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.*;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.AsyncMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
Expand All @@ -30,30 +25,19 @@

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static io.vertx.spi.cluster.zookeeper.impl.AsyncMapTTLMonitor.*;
import java.util.*;

/**
* Created by Stream.Liu
*/
public class ZKAsyncMap<K, V> extends ZKMap<K, V> implements AsyncMap<K, V> {

private final PathChildrenCache curatorCache;
private AsyncMapTTLMonitor<K, V> asyncMapTTLMonitor;

public ZKAsyncMap(Vertx vertx, CuratorFramework curator, AsyncMapTTLMonitor<K,V> asyncMapTTLMonitor, String mapName) {
public ZKAsyncMap(Vertx vertx, CuratorFramework curator, String mapName) {
super(curator, vertx, ZK_PATH_ASYNC_MAP, mapName);
this.curatorCache = new PathChildrenCache(curator, mapPath, true);
try {
this.asyncMapTTLMonitor = asyncMapTTLMonitor;
curatorCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch (Exception e) {
throw new VertxException(e);
Expand Down Expand Up @@ -99,18 +83,8 @@ public Future<Void> put(K k, V v, long ttl) {
private Future<Void> put(K k, V v, Optional<Long> timeoutOptional) {
return assertKeyAndValueAreNotNull(k, v)
.compose(aVoid -> checkExists(k))
.compose(checkResult -> checkResult ? setData(k, v) : create(k, v))
.compose(aVoid -> {
JsonObject body = new JsonObject().put(TTL_KEY_BODY_KEY_PATH, keyPath(k));
if (timeoutOptional.isPresent()) {
asyncMapTTLMonitor.addAsyncMapWithPath(keyPath(k), this);
body.put(TTL_KEY_BODY_TIMEOUT, timeoutOptional.get());
} else body.put(TTL_KEY_IS_CANCEL, true);
//publish a ttl message to all nodes.
vertx.eventBus().publish(TTL_KEY_HANDLER_ADDRESS, body);

return Future.<Void>succeededFuture();
});
.compose(checkResult -> checkResult ? setData(k, v) : create(k, v, timeoutOptional))
.compose(stat -> Future.succeededFuture());
}

@Override
Expand All @@ -125,37 +99,15 @@ public Future<V> putIfAbsent(K k, V v, long ttl) {

private Future<V> putIfAbsent(K k, V v, Optional<Long> timeoutOptional) {
return assertKeyAndValueAreNotNull(k, v)
.compose(aVoid -> {
Promise<V> innerPromise = Promise.promise();
vertx.executeBlocking(future -> {
long startTime = Instant.now().toEpochMilli();
int retries = 0;

for (; ; ) {
try {
Stat stat = new Stat();
String path = keyPath(k);
V currentValue = getData(stat, path);
if (compareAndSet(startTime, retries++, stat, path, currentValue, v)) {
future.complete(currentValue);
return;
}
} catch (Exception e) {
future.fail(e);
return;
}
}
}, false, innerPromise);
return innerPromise.future();
})
.compose(aVoid -> get(k))
.compose(value -> {
JsonObject body = new JsonObject().put(TTL_KEY_BODY_KEY_PATH, keyPath(k));
if (timeoutOptional.isPresent()) {
asyncMapTTLMonitor.addAsyncMapWithPath(keyPath(k), this);
body.put(TTL_KEY_BODY_TIMEOUT, timeoutOptional.get());
} else body.put(TTL_KEY_IS_CANCEL, true);
//publish a ttl message to all nodes.
vertx.eventBus().publish(TTL_KEY_HANDLER_ADDRESS, body);
if (value == null) {
if (timeoutOptional.isPresent()) {
put(k, v, timeoutOptional);
} else {
put(k, v);
}
}
return Future.succeededFuture(value);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type.INITIALIZED;
import static org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type.NODE_ADDED;
import static org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type.NODE_REMOVED;
import static org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type.*;

/**
* Created by Stream.Liu
Expand Down Expand Up @@ -72,7 +70,7 @@ public void add(K k, V v, Handler<AsyncResult<Void>> completionHandler) {
String path = valuePath(k, v);
assertKeyAndValueAreNotNull(k, v)
.compose(aVoid -> checkExists(path))
.compose(checkResult -> checkResult ? setData(path, v) : create(path, v))
.compose(checkResult -> checkResult ? setData(path, v) : create(path, v, Optional.empty()))
.compose(stat -> {
//add to snapshot cache if path contains eventbus address
if (path.contains(EVENTBUS_PATH)) {
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/io/vertx/spi/cluster/zookeeper/impl/ZKMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.io.*;
import java.lang.reflect.Constructor;
import java.time.Instant;
import java.util.Optional;
import java.util.stream.Stream;

/**
Expand Down Expand Up @@ -216,18 +217,23 @@ Future<Boolean> checkExists(String path) {
return future.future();
}

Future<Stat> create(K k, V v) {
return create(keyPath(k), v);
Future<Stat> create(K k, V v, Optional<Long> timeToLive) {
return create(keyPath(k), v, timeToLive);
}

Future<Stat> create(String path, V v) {
Future<Stat> create(String path, V v, Optional<Long> timeToLive) {
Promise<Stat> future = Promise.promise();
try {
//there are two type of node - ephemeral and persistent.
//if path is 'asyncMultiMap/subs/' which save the data of eventbus address and serverID we could using ephemeral,
//since the lifecycle of this path as long as this verticle.
CreateMode nodeMode = path.contains(EVENTBUS_PATH) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
curator.create().creatingParentsIfNeeded().withMode(nodeMode).inBackground((cl, el) -> {
//as zk 3.5.x provide ttl node mode, we should consider it.
nodeMode = timeToLive.isPresent() ? CreateMode.PERSISTENT_WITH_TTL : nodeMode;
ProtectACLCreateModeStatPathAndBytesable<String> pathAndBytesable = timeToLive.isPresent()
? curator.create().withTtl(timeToLive.get()).creatingParentsIfNeeded()
: curator.create().creatingParentsIfNeeded();
pathAndBytesable.withMode(nodeMode).inBackground((cl, el) -> {
if (el.getType() == CuratorEventType.CREATE) {
vertx.runOnContext(event -> future.complete(el.getStat()));
}
Expand Down
Loading

0 comments on commit fdd14dc

Please sign in to comment.