[HUDI-8988] Implement retry logic for all HTTP calls to timeline server including remote file system view and markers #12804

Open · wants to merge 1 commit into master
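
This change routes all marker-related HTTP calls through TimelineServiceClient, which honors the retry settings in FileSystemViewStorageConfig, instead of the retry-less HttpRequestClient. For context, below is a minimal sketch of the capped exponential-backoff retry pattern such a client applies around each call; RetryingCall and its parameters are hypothetical illustrations for this description, not Hudi APIs:

import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical helper: capped exponential backoff around a remote call.
final class RetryingCall {
  static <T> T run(Callable<T> call, int maxRetries, long initialDelayMs, long maxDelayMs)
      throws IOException {
    long delayMs = initialDelayMs;
    IOException lastFailure = null;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return call.call();                          // success: return immediately
      } catch (Exception e) {
        lastFailure = (e instanceof IOException) ? (IOException) e : new IOException(e);
        if (attempt == maxRetries) {
          break;                                     // attempts exhausted
        }
        try {
          Thread.sleep(delayMs);                     // back off before retrying
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw lastFailure;
        }
        delayMs = Math.min(delayMs * 2, maxDelayMs); // double the delay, capped
      }
    }
    throw lastFailure;
  }
}

With maxRetries = 5 and initialDelayMs = 100, a transient connection failure would be retried for up to six total attempts before the IOException is rethrown to the caller.
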
TimelineServerBasedWriteMarkers.java
@@ -22,6 +22,7 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -30,10 +31,11 @@
import org.apache.hudi.exception.HoodieRemoteException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.HttpRequestClient;
import org.apache.hudi.util.HttpRequestClient.RequestMethod;
import org.apache.hudi.timeline.TimelineServiceClient;
import org.apache.hudi.timeline.TimelineServiceClientBase.RequestMethod;

import com.fasterxml.jackson.core.type.TypeReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -64,28 +66,27 @@ public class TimelineServerBasedWriteMarkers extends WriteMarkers {
private static final Logger LOG = LoggerFactory.getLogger(TimelineServerBasedWriteMarkers.class);
private static final TypeReference<Boolean> BOOLEAN_TYPE_REFERENCE = new TypeReference<Boolean>() {};
private static final TypeReference<Set<String>> SET_TYPE_REFERENCE = new TypeReference<Set<String>>() {};

private final HttpRequestClient httpRequestClient;
private final TimelineServiceClient timelineServiceClient;

public TimelineServerBasedWriteMarkers(HoodieTable table, String instantTime) {
this(table.getMetaClient().getBasePath().toString(),
table.getMetaClient().getMarkerFolderPath(instantTime), instantTime,
table.getConfig().getViewStorageConfig().getRemoteViewServerHost(),
table.getConfig().getViewStorageConfig().getRemoteViewServerPort(),
table.getConfig().getViewStorageConfig().getRemoteTimelineClientTimeoutSecs());
table.getConfig().getViewStorageConfig());
}

TimelineServerBasedWriteMarkers(String basePath, String markerFolderPath, String instantTime,
String timelineServerHost, int timelineServerPort, int timeoutSecs) {
TimelineServerBasedWriteMarkers(String basePath,
String markerFolderPath,
String instantTime,
FileSystemViewStorageConfig fileSystemViewStorageConfig) {
super(basePath, markerFolderPath, instantTime);
this.httpRequestClient = new HttpRequestClient(timelineServerHost, timelineServerPort, timeoutSecs, 0);
this.timelineServiceClient = new TimelineServiceClient(fileSystemViewStorageConfig);
}

@Override
public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) {
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
return httpRequestClient.executeRequest(
return executeRequestToTimelineServer(
DELETE_MARKER_DIR_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to delete marker directory " + markerDirPath.toString(), e);
@@ -96,7 +97,7 @@ public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) {
public boolean doesMarkerDirExist() {
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
return httpRequestClient.executeRequest(
return executeRequestToTimelineServer(
MARKERS_DIR_EXISTS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.GET);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to check marker directory " + markerDirPath.toString(), e);
@@ -107,7 +108,7 @@ public boolean doesMarkerDirExist() {
public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int parallelism) throws IOException {
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
Set<String> markerPaths = httpRequestClient.executeRequest(
Set<String> markerPaths = executeRequestToTimelineServer(
CREATE_AND_MERGE_MARKERS_URL, paramsMap, SET_TYPE_REFERENCE, RequestMethod.GET);
return markerPaths.stream().map(WriteMarkers::stripMarkerSuffix).collect(Collectors.toSet());
} catch (IOException e) {
@@ -120,7 +121,7 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int parallelism) throws IOException {
public Set<String> allMarkerFilePaths() {
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
return httpRequestClient.executeRequest(
return executeRequestToTimelineServer(
ALL_MARKERS_URL, paramsMap, SET_TYPE_REFERENCE, RequestMethod.GET);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to get all markers in " + markerDirPath.toString(), e);
@@ -174,8 +175,8 @@ public Option<StoragePath> createWithEarlyConflictDetection(String partitionPath
private boolean executeCreateMarkerRequest(Map<String, String> paramsMap, String partitionPath, String markerFileName) {
boolean success;
try {
success = httpRequestClient.executeRequest(
CREATE_MARKER_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, HttpRequestClient.RequestMethod.POST);
success = executeRequestToTimelineServer(
CREATE_MARKER_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to create marker file " + partitionPath + "/" + markerFileName, e);
}
@@ -206,4 +207,12 @@ private Map<String, String> getConfigMap(
return paramsMap;
}

private <T> T executeRequestToTimelineServer(String requestPath,
Map<String, String> queryParameters,
TypeReference reference,
RequestMethod method) throws IOException {
return timelineServiceClient.makeRequest(
TimelineServiceClient.Request.newBuilder(method, requestPath).addQueryParams(queryParameters).build())
.getDecodedContent(reference);
}
}
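
The builder methods used below all appear in FileSystemViewStorageConfig as exercised by this PR's tests. The host, port, and path values are hypothetical, and the four-argument constructor is package-private, so this sketch assumes a caller in the same package:

import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;

// Hypothetical wiring: enable timeline-client retries for marker operations.
class MarkerClientExample {
  static TimelineServerBasedWriteMarkers markersWithRetries(int serverPort) {
    FileSystemViewStorageConfig viewConfig = FileSystemViewStorageConfig.newBuilder()
        .withRemoteServerHost("localhost")                   // timeline server host (assumed)
        .withRemoteServerPort(serverPort)
        .withRemoteTimelineClientTimeoutSecs(60)
        .withRemoteTimelineClientRetry(true)                 // opt in to retries
        .withRemoteTimelineClientMaxRetryNumbers(5)          // stop after 5 retries
        .withRemoteTimelineClientMaxRetryIntervalMs(30000L)  // cap backoff at 30s
        .build();
    return new TimelineServerBasedWriteMarkers(
        "/tmp/hudi_table",                       // table base path (assumed)
        "/tmp/hudi_table/.hoodie/.temp/000",     // marker folder (assumed)
        "000",                                   // instant time
        viewConfig);
  }
}

A transient failure in deleteMarkerDir, doesMarkerDirExist, allMarkerFilePaths, or marker creation should then be retried inside TimelineServiceClient rather than surfacing immediately as a HoodieRemoteException.
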
8 changes: 8 additions & 0 deletions hudi-client/hudi-spark-client/pom.xml
@@ -132,6 +132,14 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-timeline-service</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- HBase - Tests -->
<dependency>
TestTimelineServerBasedWriteMarkers.java
@@ -20,6 +20,8 @@

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.view.FileSystemViewManager;
@@ -28,14 +30,21 @@
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.exception.HoodieRemoteException;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hudi.timeline.service.TimelineServiceTestHarness;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
@@ -46,10 +55,15 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestTimelineServerBasedWriteMarkers extends TestWriteMarkersBase {
TimelineService timelineService;

private static final Logger LOG = LoggerFactory.getLogger(TestTimelineServerBasedWriteMarkers.class);
private static int DEFAULT_READ_TIMEOUT_SECS = 60;

TimelineService timelineService = null;

@BeforeEach
public void setup() throws IOException {
@@ -61,21 +75,8 @@ public void setup() throws IOException {
this.storage = metaClient.getStorage();
this.markerFolderPath = new StoragePath(metaClient.getMarkerFolderPath("000"));

FileSystemViewStorageConfig storageConf =
FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).build();
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getStorageConf());

try {
timelineService = new TimelineService(localEngineContext, HadoopFSUtils.getStorageConf(),
TimelineService.Config.builder().serverPort(0).enableMarkerRequests(true).build(),
storage,
FileSystemViewManager.createViewManager(localEngineContext, storageConf, HoodieCommonConfig.newBuilder().build()));
timelineService.startService();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
this.writeMarkers = new TimelineServerBasedWriteMarkers(
metaClient.getBasePath().toString(), markerFolderPath.toString(), "000", "localhost", timelineService.getServerPort(), 300);
restartServerAndClient(0);
LOG.info("Connecting to Timeline Server :" + timelineService.getServerPort());
}

@AfterEach
@@ -111,6 +112,73 @@ void verifyMarkersInFileSystem(boolean isTablePartitioned) throws IOException {
closeQuietly(inputStream);
}

@Test
public void testCreationWithTimelineServiceRetries() throws Exception {
// Validate that marker creation/deletion works without any failures in the timeline service.
createSomeMarkers(true);
assertTrue(storage.exists(markerFolderPath));
writeMarkers.doesMarkerDirExist();

// Simulate only a single failure and ensure the request fails.
restartServerAndClient(1);
// validate that subsequent request fails
validateRequestFailed(writeMarkers::doesMarkerDirExist);

// Simulate 3 failures, but make sure the request succeeds as retries are enabled
restartServerAndClient(3);
// Configure a new client with retries enabled.
TimelineServerBasedWriteMarkers writeMarkersWithRetries = initWriteMarkers(
metaClient.getBasePath().toString(),
markerFolderPath.toString(),
timelineService.getServerPort(),
true);
writeMarkersWithRetries.doesMarkerDirExist();
}

private void restartServerAndClient(int numberOfSimulatedConnectionFailures) {
if (timelineService != null) {
timelineService.close();
}
try {
HoodieEngineContext hoodieEngineContext = new HoodieLocalEngineContext(metaClient.getStorageConf());
FileSystemViewStorageConfig storageConf =
FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).build();
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
TimelineServiceTestHarness.Builder builder = TimelineServiceTestHarness.newBuilder();
builder.withNumberOfSimulatedConnectionFailures(numberOfSimulatedConnectionFailures);
timelineService = builder.build(
hoodieEngineContext,
(Configuration) storage.getConf().unwrap(),
TimelineService.Config.builder().serverPort(0).enableMarkerRequests(true).build(),
(FileSystem) storage.getFileSystem(),
FileSystemViewManager.createViewManager(
hoodieEngineContext, storageConf, HoodieCommonConfig.newBuilder().build()));
timelineService.startService();
this.writeMarkers = initWriteMarkers(
metaClient.getBasePath().toString(),
markerFolderPath.toString(),
timelineService.getServerPort(),
false);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

private static TimelineServerBasedWriteMarkers initWriteMarkers(String basePath,
String markerFolderPath,
int serverPort,
boolean enableRetries) {
FileSystemViewStorageConfig.Builder builder = FileSystemViewStorageConfig.newBuilder().withRemoteServerHost("localhost")
.withRemoteServerPort(serverPort)
.withRemoteTimelineClientTimeoutSecs(DEFAULT_READ_TIMEOUT_SECS);
if (enableRetries) {
builder.withRemoteTimelineClientRetry(true)
.withRemoteTimelineClientMaxRetryIntervalMs(30000L)
.withRemoteTimelineClientMaxRetryNumbers(5);
}
return new TimelineServerBasedWriteMarkers(basePath, markerFolderPath, "000", builder.build());
}

/**
* Closes {@code Closeable} quietly.
*
@@ -126,4 +194,12 @@ private void closeQuietly(Closeable closeable) {
// Ignore
}
}

private static void validateRequestFailed(Executable executable) {
assertThrows(
HoodieRemoteException.class,
executable,
"Should catch a NoHTTPResponseException"
);
}
}
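
TimelineServiceTestHarness, added as a test dependency above, simulates connection-level failures so the retry path can be exercised deterministically. Its internals are not shown in this diff; as a rough, hypothetical illustration of the technique (not the harness's actual implementation), a Jetty handler can close the first N connections before any response is written, which an Apache HttpClient caller typically surfaces as a NoHttpResponseException:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;

// Hypothetical failure-injecting handler: aborts the first N requests at the
// connection level, then answers 200 OK for every request after that.
class ConnectionFailureInjectingHandler extends AbstractHandler {
  private final AtomicInteger remainingFailures;

  ConnectionFailureInjectingHandler(int failures) {
    this.remainingFailures = new AtomicInteger(failures);
  }

  @Override
  public void handle(String target, Request baseRequest, HttpServletRequest request,
                     HttpServletResponse response) throws IOException, ServletException {
    if (remainingFailures.getAndDecrement() > 0) {
      // Close the socket without writing a response; the client sees
      // the dropped connection rather than an HTTP error status.
      baseRequest.getHttpChannel().getEndPoint().close();
      baseRequest.setHandled(true);
      return;
    }
    response.setStatus(HttpServletResponse.SC_OK);
    baseRequest.setHandled(true);
  }
}

Once the injected failures are exhausted, subsequent requests succeed, which is what lets writeMarkersWithRetries.doesMarkerDirExist() pass after three simulated failures while the retry-less client fails after one.
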
TestWriteMarkersBase.java
@@ -51,7 +51,7 @@ public abstract class TestWriteMarkersBase extends HoodieCommonTestHarness {
protected JavaSparkContext jsc;
protected HoodieSparkEngineContext context;

private void createSomeMarkers(boolean isTablePartitioned) {
protected void createSomeMarkers(boolean isTablePartitioned) {
writeMarkers.create(isTablePartitioned ? "2020/06/01" : "", "file1", IOType.MERGE);
writeMarkers.create(isTablePartitioned ? "2020/06/02" : "", "file2", IOType.APPEND);
writeMarkers.create(isTablePartitioned ? "2020/06/03" : "", "file3", IOType.CREATE);
13 changes: 13 additions & 0 deletions hudi-common/pom.xml
@@ -223,6 +223,19 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>

<!-- HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>