diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java index 92e93cd4ab6a4..d717dac38eec8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -30,10 +31,11 @@ import org.apache.hudi.exception.HoodieRemoteException; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.util.HttpRequestClient; -import org.apache.hudi.util.HttpRequestClient.RequestMethod; +import org.apache.hudi.timeline.TimelineServiceClient; +import org.apache.hudi.timeline.TimelineServiceClientBase.RequestMethod; import com.fasterxml.jackson.core.type.TypeReference; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,28 +66,27 @@ public class TimelineServerBasedWriteMarkers extends WriteMarkers { private static final Logger LOG = LoggerFactory.getLogger(TimelineServerBasedWriteMarkers.class); private static final TypeReference BOOLEAN_TYPE_REFERENCE = new TypeReference() {}; private static final TypeReference> SET_TYPE_REFERENCE = new TypeReference>() {}; - - private final HttpRequestClient httpRequestClient; + private final TimelineServiceClient timelineServiceClient; public TimelineServerBasedWriteMarkers(HoodieTable table, String instantTime) { 
this(table.getMetaClient().getBasePath().toString(), table.getMetaClient().getMarkerFolderPath(instantTime), instantTime, - table.getConfig().getViewStorageConfig().getRemoteViewServerHost(), - table.getConfig().getViewStorageConfig().getRemoteViewServerPort(), - table.getConfig().getViewStorageConfig().getRemoteTimelineClientTimeoutSecs()); + table.getConfig().getViewStorageConfig()); } - TimelineServerBasedWriteMarkers(String basePath, String markerFolderPath, String instantTime, - String timelineServerHost, int timelineServerPort, int timeoutSecs) { + TimelineServerBasedWriteMarkers(String basePath, + String markerFolderPath, + String instantTime, + FileSystemViewStorageConfig fileSystemViewStorageConfig) { super(basePath, markerFolderPath, instantTime); - this.httpRequestClient = new HttpRequestClient(timelineServerHost, timelineServerPort, timeoutSecs, 0); + this.timelineServiceClient = new TimelineServiceClient(fileSystemViewStorageConfig); } @Override public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) { Map paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString()); try { - return httpRequestClient.executeRequest( + return executeRequestToTimelineServer( DELETE_MARKER_DIR_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST); } catch (IOException e) { throw new HoodieRemoteException("Failed to delete marker directory " + markerDirPath.toString(), e); @@ -96,7 +97,7 @@ public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) { public boolean doesMarkerDirExist() { Map paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString()); try { - return httpRequestClient.executeRequest( + return executeRequestToTimelineServer( MARKERS_DIR_EXISTS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.GET); } catch (IOException e) { throw new HoodieRemoteException("Failed to check marker directory " + markerDirPath.toString(), e); @@ -107,7 +108,7 @@ public boolean 
doesMarkerDirExist() { public Set createdAndMergedDataPaths(HoodieEngineContext context, int parallelism) throws IOException { Map paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString()); try { - Set markerPaths = httpRequestClient.executeRequest( + Set markerPaths = executeRequestToTimelineServer( CREATE_AND_MERGE_MARKERS_URL, paramsMap, SET_TYPE_REFERENCE, RequestMethod.GET); return markerPaths.stream().map(WriteMarkers::stripMarkerSuffix).collect(Collectors.toSet()); } catch (IOException e) { @@ -120,7 +121,7 @@ public Set createdAndMergedDataPaths(HoodieEngineContext context, int pa public Set allMarkerFilePaths() { Map paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString()); try { - return httpRequestClient.executeRequest( + return executeRequestToTimelineServer( ALL_MARKERS_URL, paramsMap, SET_TYPE_REFERENCE, RequestMethod.GET); } catch (IOException e) { throw new HoodieRemoteException("Failed to get all markers in " + markerDirPath.toString(), e); @@ -174,8 +175,8 @@ public Option createWithEarlyConflictDetection(String partitionPath private boolean executeCreateMarkerRequest(Map paramsMap, String partitionPath, String markerFileName) { boolean success; try { - success = httpRequestClient.executeRequest( - CREATE_MARKER_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, HttpRequestClient.RequestMethod.POST); + success = executeRequestToTimelineServer( + CREATE_MARKER_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST); } catch (IOException e) { throw new HoodieRemoteException("Failed to create marker file " + partitionPath + "/" + markerFileName, e); } @@ -206,4 +207,12 @@ private Map getConfigMap( return paramsMap; } + private T executeRequestToTimelineServer(String requestPath, + Map queryParameters, + TypeReference reference, + RequestMethod method) throws IOException { + return timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(method, 
requestPath).addQueryParams(queryParameters).build()) + .getDecodedContent(reference); + } } diff --git a/hudi-client/hudi-spark-client/pom.xml b/hudi-client/hudi-spark-client/pom.xml index c1f58280ea943..d1d67b2295667 100644 --- a/hudi-client/hudi-spark-client/pom.xml +++ b/hudi-client/hudi-spark-client/pom.xml @@ -132,6 +132,14 @@ ${project.version} test + + org.apache.hudi + hudi-timeline-service + ${project.version} + tests + test-jar + test + diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java index 6f3d9b67ed655..1080e3cf33d47 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java @@ -20,6 +20,8 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.HoodieCommonConfig; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.view.FileSystemViewManager; @@ -28,14 +30,21 @@ import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.MarkerUtils; -import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.exception.HoodieRemoteException; import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.timeline.service.TimelineService; +import org.apache.hudi.timeline.service.TimelineServiceTestHarness; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.fs.FileSystem; import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; @@ -46,10 +55,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertIterableEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestTimelineServerBasedWriteMarkers extends TestWriteMarkersBase { - TimelineService timelineService; + + private static final Logger LOG = LoggerFactory.getLogger(TestTimelineServerBasedWriteMarkers.class); + private static int DEFAULT_READ_TIMEOUT_SECS = 60; + + TimelineService timelineService = null; @BeforeEach public void setup() throws IOException { @@ -61,21 +75,8 @@ public void setup() throws IOException { this.storage = metaClient.getStorage(); this.markerFolderPath = new StoragePath(metaClient.getMarkerFolderPath("000")); - FileSystemViewStorageConfig storageConf = - FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).build(); - HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getStorageConf()); - - try { - timelineService = new TimelineService(localEngineContext, HadoopFSUtils.getStorageConf(), - TimelineService.Config.builder().serverPort(0).enableMarkerRequests(true).build(), - storage, - FileSystemViewManager.createViewManager(localEngineContext, storageConf, HoodieCommonConfig.newBuilder().build())); - timelineService.startService(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - this.writeMarkers = new TimelineServerBasedWriteMarkers( - metaClient.getBasePath().toString(), markerFolderPath.toString(), "000", 
"localhost", timelineService.getServerPort(), 300); + restartServerAndClient(0); + LOG.info("Connecting to Timeline Server :" + timelineService.getServerPort()); } @AfterEach @@ -111,6 +112,73 @@ void verifyMarkersInFileSystem(boolean isTablePartitioned) throws IOException { closeQuietly(inputStream); } + @Test + public void testCreationWithTimelineServiceRetries() throws Exception { + // Validate marker creation/ deletion work without any failures in the timeline service. + createSomeMarkers(true); + assertTrue(storage.exists(markerFolderPath)); + writeMarkers.doesMarkerDirExist(); + + // Simulate only a single failure and ensure the request fails. + restartServerAndClient(1); + // validate that subsequent request fails + validateRequestFailed(writeMarkers::doesMarkerDirExist); + + // Simulate 3 failures, but make sure the request succeeds as retries are enabled + restartServerAndClient(3); + // Configure a new client with retries enabled. + TimelineServerBasedWriteMarkers writeMarkersWithRetries = initWriteMarkers( + metaClient.getBasePath().toString(), + markerFolderPath.toString(), + timelineService.getServerPort(), + true); + writeMarkersWithRetries.doesMarkerDirExist(); + } + + private void restartServerAndClient(int numberOfSimulatedConnectionFailures) { + if (timelineService != null) { + timelineService.close(); + } + try { + HoodieEngineContext hoodieEngineContext = new HoodieLocalEngineContext(metaClient.getStorageConf()); + FileSystemViewStorageConfig storageConf = + FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).build(); + HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build(); + TimelineServiceTestHarness.Builder builder = TimelineServiceTestHarness.newBuilder(); + builder.withNumberOfSimulatedConnectionFailures(numberOfSimulatedConnectionFailures); + timelineService = builder.build( + hoodieEngineContext, + (Configuration) storage.getConf().unwrap(), + 
TimelineService.Config.builder().serverPort(0).enableMarkerRequests(true).build(), + (FileSystem) storage.getFileSystem(), + FileSystemViewManager.createViewManager( + hoodieEngineContext, storageConf, HoodieCommonConfig.newBuilder().build())); + timelineService.startService(); + this.writeMarkers = initWriteMarkers( + metaClient.getBasePath().toString(), + markerFolderPath.toString(), + timelineService.getServerPort(), + false); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + private static TimelineServerBasedWriteMarkers initWriteMarkers(String basePath, + String markerFolderPath, + int serverPort, + boolean enableRetries) { + FileSystemViewStorageConfig.Builder builder = FileSystemViewStorageConfig.newBuilder().withRemoteServerHost("localhost") + .withRemoteServerPort(serverPort) + .withRemoteTimelineClientTimeoutSecs(DEFAULT_READ_TIMEOUT_SECS); + if (enableRetries) { + builder.withRemoteTimelineClientRetry(true) + .withRemoteTimelineClientMaxRetryIntervalMs(30000L) + .withRemoteTimelineClientMaxRetryNumbers(5); + } + return new TimelineServerBasedWriteMarkers(basePath, markerFolderPath, "000", builder.build()); + } + /** * Closes {@code Closeable} quietly. 
* @@ -126,4 +194,12 @@ private void closeQuietly(Closeable closeable) { // Ignore } } + + private static void validateRequestFailed(Executable executable) { + assertThrows( + HoodieRemoteException.class, + executable, + "Should catch a NoHTTPResponseException" + ); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java index 5121a05ba9079..6237641d7c787 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java @@ -51,7 +51,7 @@ public abstract class TestWriteMarkersBase extends HoodieCommonTestHarness { protected JavaSparkContext jsc; protected HoodieSparkEngineContext context; - private void createSomeMarkers(boolean isTablePartitioned) { + protected void createSomeMarkers(boolean isTablePartitioned) { writeMarkers.create(isTablePartitioned ? "2020/06/01" : "", "file1", IOType.MERGE); writeMarkers.create(isTablePartitioned ? "2020/06/02" : "", "file2", IOType.APPEND); writeMarkers.create(isTablePartitioned ? 
"2020/06/03" : "", "file3", IOType.CREATE); diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml index d35cf37efe2d4..2538b939b2606 100644 --- a/hudi-common/pom.xml +++ b/hudi-common/pom.xml @@ -223,6 +223,19 @@ test + + org.eclipse.jetty + jetty-server + ${jetty.version} + test + + + org.eclipse.jetty + jetty-servlet + ${jetty.version} + test + + org.apache.hbase diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index 7ebea7b4f799d..21dc51b675eed 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -36,19 +36,15 @@ import org.apache.hudi.common.table.timeline.dto.InstantDTO; import org.apache.hudi.common.table.timeline.dto.TimelineDTO; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.RetryHelper; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieRemoteException; +import org.apache.hudi.timeline.TimelineServiceClient; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.module.afterburner.AfterburnerModule; -import org.apache.http.Consts; -import org.apache.http.client.fluent.Request; -import org.apache.http.client.fluent.Response; -import org.apache.http.client.utils.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +56,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.timeline.TimelineServiceClient.RequestMethod; + /** * A proxy for table file-system view which translates local View API calls to REST calls 
to remote timeline service. */ @@ -138,21 +136,13 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, private static final TypeReference>> BASE_FILE_MAP_REFERENCE = new TypeReference>>() {}; private static final TypeReference>> FILE_SLICE_MAP_REFERENCE = new TypeReference>>() {}; - private final String serverHost; - private final int serverPort; private final String basePath; private final HoodieTableMetaClient metaClient; private HoodieTimeline timeline; - private final int timeoutMs; + private final TimelineServiceClient timelineServiceClient; private boolean closed = false; - private RetryHelper retryHelper; - - private enum RequestMethod { - GET, POST - } - public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient) { this(metaClient, FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build()); } @@ -161,35 +151,20 @@ public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSys this.basePath = metaClient.getBasePath().toString(); this.metaClient = metaClient; this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); - this.serverHost = viewConf.getRemoteViewServerHost(); - this.serverPort = viewConf.getRemoteViewServerPort(); - this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000; - if (viewConf.isRemoteTimelineClientRetryEnabled()) { - retryHelper = new RetryHelper( - viewConf.getRemoteTimelineClientMaxRetryIntervalMs(), - viewConf.getRemoteTimelineClientMaxRetryNumbers(), - viewConf.getRemoteTimelineInitialRetryIntervalMs(), - viewConf.getRemoteTimelineClientRetryExceptions(), - "Sending request"); - } + this.timelineServiceClient = new TimelineServiceClient(viewConf); } private T executeRequest(String requestPath, Map queryParameters, TypeReference reference, RequestMethod method) throws IOException { ValidationUtils.checkArgument(!closed, "View already closed"); - URIBuilder 
builder = new URIBuilder().setHost(serverHost).setPort(serverPort).setPath(requestPath).setScheme(SCHEME); - queryParameters.forEach(builder::addParameter); - // Adding mandatory parameters - Last instants affecting file-slice - timeline.lastInstant().ifPresent(instant -> builder.addParameter(LAST_INSTANT_TS, instant.requestedTime())); - builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash()); + timeline.lastInstant().ifPresent(instant -> queryParameters.put(LAST_INSTANT_TS, instant.requestedTime())); + queryParameters.put(TIMELINE_HASH, timeline.getTimelineHash()); - String url = builder.toString(); - LOG.info("Sending request : ({})", url); - Response response = retryHelper != null ? retryHelper.start(() -> get(timeoutMs, url, method)) : get(timeoutMs, url, method); - String content = response.returnContent().asString(Consts.UTF_8); - return (T) OBJECT_MAPPER.readValue(content, reference); + return timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(method, requestPath).addQueryParams(queryParameters).build()) + .getDecodedContent(reference); } private Map getParamsWithPartitionPath(String partitionPath) { @@ -545,14 +520,4 @@ public void reset() { public void sync() { refresh(); } - - private Response get(int timeoutMs, String url, RequestMethod method) throws IOException { - switch (method) { - case GET: - return Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute(); - case POST: - default: - return Request.Post(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute(); - } - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/timeline/TimelineServiceClient.java b/hudi-common/src/main/java/org/apache/hudi/timeline/TimelineServiceClient.java new file mode 100644 index 0000000000000..429fcf495c460 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/timeline/TimelineServiceClient.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.timeline; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; + +import org.apache.http.client.utils.URIBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Default implementation of an HTTP network client to trigger HTTP calls (GET or POST) + * to the Timeline Server from the executors. + * This class uses the Fluent HTTP client part of the HTTPComponents. 
+ */ +public class TimelineServiceClient extends TimelineServiceClientBase { + + private static final Logger LOG = LoggerFactory.getLogger(TimelineServiceClient.class); + private static final String DEFAULT_SCHEME = "http"; + + protected final String timelineServerHost; + protected final int timelineServerPort; + protected final int timeoutMs; + + public TimelineServiceClient(HoodieConfig config) { + super(config); + this.timelineServerHost = config.getStringOrDefault(FileSystemViewStorageConfig.REMOTE_HOST_NAME); + this.timelineServerPort = config.getIntOrDefault(FileSystemViewStorageConfig.REMOTE_PORT_NUM); + this.timeoutMs = (int) TimeUnit.SECONDS.toMillis(config.getIntOrDefault(FileSystemViewStorageConfig.REMOTE_TIMEOUT_SECS)); + } + + @Override + protected Response executeRequest(Request request) throws IOException { + URIBuilder builder = + new URIBuilder().setHost(timelineServerHost).setPort(timelineServerPort).setPath(request.getPath()).setScheme(DEFAULT_SCHEME); + + if (request.getQueryParameters().isPresent()) { + request.getQueryParameters().get().forEach(builder::addParameter); + } + + String url = builder.toString(); + LOG.debug("Sending request : (" + url + ")"); + org.apache.http.client.fluent.Response response = get(request.getMethod(), url, timeoutMs); + return new Response(response.returnContent().asString()); + } + + private org.apache.http.client.fluent.Response get(RequestMethod method, String url, int timeoutMs) throws IOException { + switch (method) { + case GET: + return org.apache.http.client.fluent.Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute(); + case POST: + default: + return org.apache.http.client.fluent.Request.Post(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute(); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/timeline/TimelineServiceClientBase.java b/hudi-common/src/main/java/org/apache/hudi/timeline/TimelineServiceClientBase.java new file mode 100644 index 
0000000000000..c06006830f6ac --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/timeline/TimelineServiceClientBase.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.timeline; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.RetryHelper; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.afterburner.AfterburnerModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * Base class for a client to trigger HTTP calls (GET or POST) + * to the Timeline Server from the executors. 
+ */ +public abstract class TimelineServiceClientBase implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(TimelineServiceClientBase.class); + + private RetryHelper retryHelper; + + public TimelineServiceClientBase(HoodieConfig config) { + if (config.getBooleanOrDefault(FileSystemViewStorageConfig.REMOTE_RETRY_ENABLE)) { + retryHelper = new RetryHelper<>( + config.getLongOrDefault(FileSystemViewStorageConfig.REMOTE_MAX_RETRY_INTERVAL_MS), + config.getIntOrDefault(FileSystemViewStorageConfig.REMOTE_MAX_RETRY_NUMBERS), + config.getLongOrDefault(FileSystemViewStorageConfig.REMOTE_INITIAL_RETRY_INTERVAL_MS), + config.getStringOrDefault(FileSystemViewStorageConfig.RETRY_EXCEPTIONS), + "Sending request to timeline server"); + } + } + + protected abstract Response executeRequest(Request request) throws IOException; + + public Response makeRequest(Request request) throws IOException { + return (retryHelper != null) ? retryHelper.start(() -> executeRequest(request)) : executeRequest(request); + } + + public static class Request { + private final TimelineServiceClient.RequestMethod method; + private final String path; + private final Option> queryParameters; + + public Request(TimelineServiceClient.RequestMethod method, String path, Option> queryParameters) { + this.method = method; + this.path = path; + this.queryParameters = queryParameters; + } + + public RequestMethod getMethod() { + return method; + } + + public String getPath() { + return path; + } + + public Option> getQueryParameters() { + return queryParameters; + } + + public static TimelineServiceClient.Request.Builder newBuilder(TimelineServiceClient.RequestMethod method, String path) { + return new TimelineServiceClient.Request.Builder(method, path); + } + + public static class Builder { + private final TimelineServiceClient.RequestMethod method; + private final String path; + private Option> queryParameters; + + public Builder(TimelineServiceClient.RequestMethod method, String 
path) { + this.method = method; + this.path = path; + this.queryParameters = Option.empty(); + } + + public Request.Builder addQueryParam(String key, String value) { + queryParameters = (queryParameters.isPresent()) ? queryParameters : Option.of(new HashMap<>()); + queryParameters.get().put(key, value); + return this; + } + + public Request.Builder addQueryParams(Map parameters) { + queryParameters = (queryParameters.isPresent()) ? queryParameters : Option.of(new HashMap<>()); + queryParameters.get().putAll(parameters); + return this; + } + + public TimelineServiceClient.Request build() { + return new TimelineServiceClient.Request(method, path, queryParameters); + } + } + } + + public static class Response { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule()); + private final String content; + + public Response(String content) { + this.content = content; + } + + public String getContent() { + return content; + } + + public T getDecodedContent(TypeReference reference) throws JsonProcessingException { + return (T) OBJECT_MAPPER.readValue(content, reference); + } + } + + public enum RequestMethod { + GET, POST + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/timeline/TestTimelineServiceClient.java b/hudi-common/src/test/java/org/apache/hudi/timeline/TestTimelineServiceClient.java new file mode 100644 index 0000000000000..e3309dec6be56 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/timeline/TestTimelineServiceClient.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.timeline; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.util.Option; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.NoHttpResponseException; +import org.apache.http.ParseException; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static java.util.Arrays.asList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Tests {@link TimelineServiceClient}.
+ */ +public class TestTimelineServiceClient { + + private static final String TEST_ENDPOINT = "/test-endpoint"; + private static final int DEFAULT_READ_TIMEOUT_SECS = 5; + private static final boolean DEFAULT_HTTP_RESPONSE = true; + + private Server server; + private int serverPort; + + @BeforeEach + public void setUp() throws Exception { + // Create a Jetty server + server = new Server(0); + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + server.setHandler(context); + context.addServlet(new ServletHolder(new TestServlet()), TEST_ENDPOINT); + + // Start the server + server.start(); + serverPort = server.getURI().getPort(); + } + + @AfterEach + public void tearDown() throws Exception { + // Stop the server after each test + if (server != null) { + server.stop(); + } + } + + @Test + public void testSuccessfulGetRequest() throws IOException { + FileSystemViewStorageConfig.Builder builder = FileSystemViewStorageConfig.newBuilder().withRemoteServerHost("localhost") + .withRemoteServerPort(serverPort) + .withRemoteTimelineClientTimeoutSecs(DEFAULT_READ_TIMEOUT_SECS); + MockTimelineServiceNetworkClient client = new MockTimelineServiceNetworkClient(builder.build()); + TimelineServiceClientBase.Request request = + TimelineServiceClientBase.Request.newBuilder(TimelineServiceClientBase.RequestMethod.GET, TEST_ENDPOINT).build(); + TimelineServiceClientBase.Response response = client.makeRequest(request); + assertEquals(DEFAULT_HTTP_RESPONSE, response.getDecodedContent(new TypeReference() {})); + } + + @Test + public void testSuccessfulPostRequest() throws IOException { + FileSystemViewStorageConfig.Builder builder = FileSystemViewStorageConfig.newBuilder().withRemoteServerHost("localhost") + .withRemoteServerPort(serverPort) + .withRemoteTimelineClientTimeoutSecs(DEFAULT_READ_TIMEOUT_SECS); + MockTimelineServiceNetworkClient client = new MockTimelineServiceNetworkClient(builder.build()); + 
TimelineServiceClientBase.Request request = TimelineServiceClientBase.Request.newBuilder(TimelineServiceClientBase.RequestMethod.POST, TEST_ENDPOINT) + .addQueryParam("key1", "val1") + .addQueryParams(Collections.singletonMap("key2", "val2")) + .build(); + TimelineServiceClientBase.Response response = client.makeRequest(request); + assertEquals(DEFAULT_HTTP_RESPONSE, response.getDecodedContent(new TypeReference() {})); + } + + private static List testScenariosForFailures() { + return asList( + // Ensure that the retries can handle both IOExceptions and RuntimeExceptions. + Arguments.of(5, 2, InducedFailuresInfo.ExceptionType.NO_HTTP_RESPONSE_EXCEPTION, true), + Arguments.of(5, 2, InducedFailuresInfo.ExceptionType.PARSE_EXCEPTION, true), + Arguments.of(2, 5, InducedFailuresInfo.ExceptionType.NO_HTTP_RESPONSE_EXCEPTION, false), + Arguments.of(2, 5, InducedFailuresInfo.ExceptionType.PARSE_EXCEPTION, false) + ); + } + + @ParameterizedTest + @MethodSource("testScenariosForFailures") + public void testRetriesForExceptions(int numberOfRetries, + int numberOfInducedFailures, + InducedFailuresInfo.ExceptionType exceptionType, + boolean shouldSucceed) throws IOException { + FileSystemViewStorageConfig.Builder builder = FileSystemViewStorageConfig.newBuilder().withRemoteServerHost("localhost") + .withRemoteServerPort(serverPort) + .withRemoteTimelineClientTimeoutSecs(DEFAULT_READ_TIMEOUT_SECS) + .withRemoteTimelineClientRetry(true) + .withRemoteTimelineClientMaxRetryIntervalMs(2000L) + .withRemoteTimelineClientMaxRetryNumbers(numberOfRetries); + + InducedFailuresInfo inducedFailuresInfo = new InducedFailuresInfo(exceptionType, numberOfInducedFailures); + MockTimelineServiceNetworkClient client = new MockTimelineServiceNetworkClient(builder.build(), Option.of(inducedFailuresInfo)); + + TimelineServiceClientBase.Request request = TimelineServiceClientBase.Request.newBuilder(TimelineServiceClientBase.RequestMethod.GET, TEST_ENDPOINT).build(); + if (shouldSucceed) { + 
TimelineServiceClientBase.Response response = client.makeRequest(request); + assertEquals(DEFAULT_HTTP_RESPONSE, response.getDecodedContent(new TypeReference() {})); + } else { + Class expectedException = exceptionType.getExceptionType(); + assertThrows(expectedException, () -> client.makeRequest(request), "Should throw an Exception."); + } + } + + private static class MockTimelineServiceNetworkClient extends TimelineServiceClient { + + private final Option inducedFailuresInfo; + private int currentInducedFailures; + + public MockTimelineServiceNetworkClient(HoodieConfig config) { + this(config, Option.empty()); + } + + public MockTimelineServiceNetworkClient(HoodieConfig config, Option inducedFailuresInfo) { + super(config); + this.inducedFailuresInfo = inducedFailuresInfo; + currentInducedFailures = 0; + } + + @Override + protected Response executeRequest(Request request) throws IOException { + if (inducedFailuresInfo.isPresent() && ++currentInducedFailures <= inducedFailuresInfo.get().maxInducedFailures) { + switch (inducedFailuresInfo.get().exceptionType) { + case PARSE_EXCEPTION: + throw new ParseException("Parse Exception"); + case NO_HTTP_RESPONSE_EXCEPTION: + default: + throw new NoHttpResponseException("No HTTP Response Exception"); + } + } + return super.executeRequest(request); + } + } + + private static class InducedFailuresInfo { + private final ExceptionType exceptionType; + private final int maxInducedFailures; + + public InducedFailuresInfo(ExceptionType exceptionType, int maxInducedFailures) { + this.exceptionType = exceptionType; + this.maxInducedFailures = maxInducedFailures; + } + + public enum ExceptionType { + NO_HTTP_RESPONSE_EXCEPTION(NoHttpResponseException.class), + PARSE_EXCEPTION(ParseException.class); + + private final Class exceptionType; + + // Constructor to associate the induced exception class with each enum constant + ExceptionType(Class exceptionType) { + this.exceptionType = exceptionType; + } + + // Method to get the induced exception class for an enum
constant + public Class getExceptionType() { + return this.exceptionType; + } + } + } + + // A simple servlet to handle HTTP requests for test purposes. + private static class TestServlet extends HttpServlet { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { + sendResponse(req, resp); + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException { + sendResponse(req, resp); + } + + private void sendResponse(HttpServletRequest req, HttpServletResponse resp) throws IOException { + // Set the content type + resp.setContentType("text/plain"); + resp.setStatus(HttpServletResponse.SC_OK); + ObjectMapper objectMapper = new ObjectMapper(); + resp.getWriter().println(objectMapper.writeValueAsString(DEFAULT_HTTP_RESPONSE)); + } + } +} diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TimelineServiceTestHarness.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TimelineServiceTestHarness.java new file mode 100644 index 0000000000000..9e87ad28f9a4e --- /dev/null +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TimelineServiceTestHarness.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.timeline.service; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.view.FileSystemViewManager; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; +import org.apache.hudi.storage.hadoop.HoodieHadoopStorage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.http.HttpResponse; +import org.apache.http.NoHttpResponseException; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.util.EntityUtils; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import java.io.IOException; +import java.util.Map; + +/** + * Provides a wrapped {@link TimelineService} to emulate HTTP request + * failures for testing. When number of failures N is configured above 0, + * it proxies all HTTP requests to the wrapper timeline service, + * and fails the first N HTTP requests. 
+ */ +public class TimelineServiceTestHarness extends TimelineService { + + private static final String PROXY_ALL_URLS = "/*"; + private int numberOfSimulatedConnectionFailures; + private Option server; + private int serverPort; + + public TimelineServiceTestHarness(HoodieEngineContext context, + Configuration hadoopConf, + Config timelineServerConf, + FileSystem fileSystem, + FileSystemViewManager globalFileSystemViewManager) throws IOException { + super( + context, + new HadoopStorageConfiguration(hadoopConf), + timelineServerConf, + new HoodieHadoopStorage(fileSystem), + globalFileSystemViewManager); + server = Option.empty(); + serverPort = 0; + } + + public void setNumberOfSimulatedConnectionFailures(int numberOfSimulatedConnectionFailures) { + this.numberOfSimulatedConnectionFailures = numberOfSimulatedConnectionFailures; + } + + @Override + public int startService() throws IOException { + if (numberOfSimulatedConnectionFailures > 0) { + try { + int timelineServicePort = super.startService(); + server = Option.of(new Server(serverPort)); + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + context.addServlet(new ServletHolder(new FailureInducingHttpServlet(timelineServicePort, numberOfSimulatedConnectionFailures)), PROXY_ALL_URLS); + server.get().setHandler(context); + server.get().start(); + serverPort = server.get().getURI().getPort(); + // Proxy requests so we can emulate failure. + return serverPort; + } catch (Exception exception) { + throw new IOException(exception); + } + } + // Act as a pass through in the case that failure emulation is not required. 
+ return super.startService(); + } + + @Override + public int getServerPort() { + if (serverPort > 0) { + return serverPort; + } + return super.getServerPort(); + } + + @Override + public void close() { + super.close(); + server.ifPresent(server -> { + try { + server.stop(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private int numberOfSimulatedConnectionFailures = 0; + + public Builder withNumberOfSimulatedConnectionFailures(int numberOfSimulatedConnectionFailures) { + this.numberOfSimulatedConnectionFailures = numberOfSimulatedConnectionFailures; + return this; + } + + public TimelineServiceTestHarness build(HoodieEngineContext context, + Configuration hadoopConf, + Config timelineServerConf, + FileSystem fileSystem, + FileSystemViewManager globalFileSystemViewManager) throws IOException { + TimelineServiceTestHarness timelineServiceTestHarness = new TimelineServiceTestHarness( + context, hadoopConf, timelineServerConf, fileSystem, globalFileSystemViewManager); + timelineServiceTestHarness.setNumberOfSimulatedConnectionFailures(numberOfSimulatedConnectionFailures); + return timelineServiceTestHarness; + } + } + + private static class FailureInducingHttpServlet extends HttpServlet { + + private final int timelineServerPort; + private final int maxSimulatedConnectionFailures; + private int currentNumConnectionSimulatedFailures; + + public FailureInducingHttpServlet(int timelineServerPort, + int maxSimulatedConnectionFailures) { + this.timelineServerPort = timelineServerPort; + this.maxSimulatedConnectionFailures = maxSimulatedConnectionFailures; + currentNumConnectionSimulatedFailures = 0; + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { + // Emulate HTTP request failures for the first maxSimulatedConnectionFailures requests. 
+ if (++currentNumConnectionSimulatedFailures <= maxSimulatedConnectionFailures) { + throw new NoHttpResponseException("Simulated connection failure"); + } + + // After maxSimulatedConnectionFailures requests, proxy the requests to the actual Timeline service. + URIBuilder builder = + new URIBuilder().setHost("localhost") + .setPort(timelineServerPort) + .setPath(req.getPathInfo()) + .setScheme("http"); + + Map parameterMap = req.getParameterMap(); + for (Map.Entry entry : parameterMap.entrySet()) { + String paramName = entry.getKey(); + String[] paramValues = entry.getValue(); + // Store each value in the Result object + for (String value : paramValues) { + builder.addParameter(paramName, value); + } + } + + String url = builder.toString(); + org.apache.http.client.fluent.Response response; + switch (req.getMethod()) { + case "GET": + response = org.apache.http.client.fluent.Request.Get(url).connectTimeout(10000).socketTimeout(10000).execute(); + break; + case "POST": + default: + response = org.apache.http.client.fluent.Request.Post(url).connectTimeout(10000).socketTimeout(10000).execute(); + } + + HttpResponse httpResponse = response.returnResponse(); + resp.setContentType(httpResponse.getEntity().getContentType().getValue()); + resp.setStatus(httpResponse.getStatusLine().getStatusCode()); + resp.getWriter().println(EntityUtils.toString(httpResponse.getEntity())); + } + } +} diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java index 1f532d0827c91..3bb4e9e67486b 100644 --- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java @@ -21,6 +21,7 @@ import 
org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.dto.DTOUtils; import org.apache.hudi.common.table.timeline.dto.FileGroupDTO; @@ -35,10 +36,13 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.timeline.service.TimelineService; +import org.apache.hudi.timeline.service.TimelineServiceTestHarness; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.fs.FileSystem; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +56,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.api.Assertions.assertThrows; /** * Bring up a remote Timeline Server and run all test-cases of TestHoodieTableFileSystemView against it. 
@@ -60,70 +64,55 @@ public class TestRemoteHoodieTableFileSystemView extends TestHoodieTableFileSystemView { private static final Logger LOG = LoggerFactory.getLogger(TestRemoteHoodieTableFileSystemView.class); + private static final int DEFAULT_READ_TIMEOUT_SECS = 60; - private TimelineService server; + private TimelineService server = null; private RemoteHoodieTableFileSystemView view; protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) { + return getFileSystemView(timeline, 0); + } + + protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline, int numberOfSimulatedConnectionFailures) { FileSystemViewStorageConfig sConf = FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).build(); HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build(); HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getStorageConf()); try { - server = new TimelineService(localEngineContext, HadoopFSUtils.getStorageConf(), + if (server != null) { + server.close(); + } + TimelineServiceTestHarness.Builder builder = TimelineServiceTestHarness.newBuilder(); + builder.withNumberOfSimulatedConnectionFailures(numberOfSimulatedConnectionFailures); + server = builder.build( + localEngineContext, + HadoopFSUtils.getStorageConf().unwrap(), TimelineService.Config.builder().serverPort(0).build(), - HoodieStorageUtils.getStorage(getDefaultStorageConf()), + (FileSystem) HoodieStorageUtils.getStorage(getDefaultStorageConf()).getFileSystem(), FileSystemViewManager.createViewManager(localEngineContext, sConf, commonConfig)); server.startService(); } catch (Exception ex) { throw new RuntimeException(ex); } LOG.info("Connecting to Timeline Server :" + server.getServerPort()); - view = new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient); + view = initFsView(metaClient, server.getServerPort(), false); return view; } @Test public void 
testRemoteHoodieTableFileSystemViewWithRetry() { - // Service is available. + // Validate remote FS view without any failures in the timeline service. view.getLatestBaseFiles(); - // Shut down the service. - server.close(); - try { - // Immediately fails and throws a connection refused exception. - view.getLatestBaseFiles(); - fail("Should be catch Exception 'Connection refused (Connection refused)'"); - } catch (HoodieRemoteException e) { - assert e.getMessage().contains("Connection refused (Connection refused)"); - } - // Enable API request retry for remote file system view. - view = new RemoteHoodieTableFileSystemView(metaClient, FileSystemViewStorageConfig - .newBuilder() - .withRemoteServerHost("localhost") - .withRemoteServerPort(server.getServerPort()) - .withRemoteTimelineClientRetry(true) - .withRemoteTimelineClientMaxRetryIntervalMs(2000L) - .withRemoteTimelineClientMaxRetryNumbers(4) - .build()); - try { - view.getLatestBaseFiles(); - fail("Should be catch Exception 'Connection refused (Connection refused)'"); - } catch (HoodieRemoteException e) { - assert e.getMessage().contains("Connection refused (Connection refused)"); - } - // Retry succeed after 2 or 3 tries. - new Thread(() -> { - try { - Thread.sleep(5000L); - LOG.info("Restart server."); - server.startService(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }).run(); - view.getLatestBaseFiles(); - server.close(); + + // Simulate only a single failure and ensure the request fails. 
+ getFileSystemView(metaClient.getActiveTimeline(), 1); + validateRequestFailed(view::getLatestBaseFiles); + + // Simulate 3 failures, but make sure the request succeeds as retries are enabled + getFileSystemView(metaClient.getActiveTimeline(), 3); + RemoteHoodieTableFileSystemView viewWithRetries = initFsView(metaClient, server.getServerPort(), true); + viewWithRetries.getLatestBaseFiles(); + } + + @Test @@ -180,4 +169,27 @@ private HoodieFileGroup createHoodieFileGroup() { return new HoodieFileGroup("", "data", activeTimeline.getCommitsTimeline().filterCompletedInstants()); } + + private static RemoteHoodieTableFileSystemView initFsView(HoodieTableMetaClient metaClient, + int serverPort, + boolean enableRetries) { + FileSystemViewStorageConfig.Builder builder = FileSystemViewStorageConfig.newBuilder().withRemoteServerHost("localhost") + .withRemoteServerPort(serverPort) + .withRemoteTimelineClientTimeoutSecs(DEFAULT_READ_TIMEOUT_SECS); + if (enableRetries) { + builder.withRemoteTimelineClientTimeoutSecs(300) + .withRemoteTimelineClientRetry(true) + .withRemoteTimelineClientMaxRetryIntervalMs(2000L) + .withRemoteTimelineClientMaxRetryNumbers(5); + } + return new RemoteHoodieTableFileSystemView(metaClient, builder.build()); + } + + private static void validateRequestFailed(Executable executable) { + assertThrows( + HoodieRemoteException.class, + executable, + "Should catch a NoHttpResponseException" + ); + } }