From b6eed7badff22de7dd5c8efe034c86dfc1b6e825 Mon Sep 17 00:00:00 2001 From: DongLiang-0 <1747644936@qq.com> Date: Wed, 1 Feb 2023 20:05:58 +0800 Subject: [PATCH 1/3] [BitSail][Connector]add doris source connector --- bitsail-connectors/connector-doris/pom.xml | 99 +++++ .../doris/backend/BackendClient.java | 214 ++++++++++ .../doris/backend/model/Routing.java | 64 +++ .../doris/backend/model/RowBatch.java | 330 ++++++++++++++++ .../doris/config/DorisExecutionOptions.java | 7 + .../doris/constant/DorisConstants.java | 2 + .../connector/doris/error/DorisErrorCode.java | 14 + .../doris/option/DorisReaderOptions.java | 87 ++++ .../connector/doris/rest/RestService.java | 222 +++++++++++ .../connector/doris/rest/model/Field.java | 36 ++ .../doris/rest/model/PartitionDefinition.java | 113 ++++++ .../connector/doris/rest/model/QueryPlan.java | 38 ++ .../connector/doris/rest/model/Schema.java | 65 +++ .../connector/doris/rest/model/Tablet.java | 34 ++ .../connector/doris/source/DorisSource.java | 120 ++++++ .../source/reader/DorisSourceReader.java | 125 ++++++ .../doris/source/split/DorisSourceSplit.java | 45 +++ .../source/split/DorisSourceSplitReader.java | 222 +++++++++++ .../DorisSourceSplitCoordinator.java | 138 +++++++ .../bitsail-connector-unified-doris.json | 3 +- .../main/resources/doris-type-converter.yaml | 6 +- .../thrift/doris/DorisExternalService.thrift | 121 ++++++ .../src/main/thrift/doris/Status.thrift | 65 +++ .../src/main/thrift/doris/Types.thrift | 374 ++++++++++++++++++ .../doris/source/DorisSourceITCase.java | 53 +++ .../examples/Doris_Print_Example.json | 0 .../src/test/resources/doris_to_print.json | 0 .../connectors/doris/doris-example.md | 35 ++ .../en/documents/connectors/doris/doris.md | 61 +++ website/en/documents/start/env_setup.md | 28 ++ .../connectors/doris/doris-example.md | 35 ++ .../zh/documents/connectors/doris/doris.md | 61 +++ website/zh/documents/start/env_setup.md | 28 ++ 33 files changed, 2841 insertions(+), 4 deletions(-) create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/BackendClient.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/model/Routing.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/model/RowBatch.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/option/DorisReaderOptions.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/Field.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/PartitionDefinition.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/QueryPlan.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/Schema.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/Tablet.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/DorisSource.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/reader/DorisSourceReader.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/DorisSourceSplit.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/DorisSourceSplitReader.java create mode 100644 bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/coordinator/DorisSourceSplitCoordinator.java create mode 100644 bitsail-connectors/connector-doris/src/main/thrift/doris/DorisExternalService.thrift create mode 100644 bitsail-connectors/connector-doris/src/main/thrift/doris/Status.thrift create mode 100644 bitsail-connectors/connector-doris/src/main/thrift/doris/Types.thrift create mode 100644 bitsail-connectors/connector-doris/src/test/java/com/bytedance/bitsail/connector/doris/source/DorisSourceITCase.java create mode 100644 bitsail-dist/src/main/resources/examples/Doris_Print_Example.json create mode 100644 bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/resources/doris_to_print.json diff --git a/bitsail-connectors/connector-doris/pom.xml b/bitsail-connectors/connector-doris/pom.xml index c2849cce4..548b9cc40 100644 --- a/bitsail-connectors/connector-doris/pom.xml +++ b/bitsail-connectors/connector-doris/pom.xml @@ -31,6 +31,8 @@ 8 8 5.1.49 + 5.0.0 + 0.13.0 @@ -41,5 +43,102 @@ ${mysql.version} compile + + org.apache.thrift + libthrift + ${libthrift.version} + + + httpclient + org.apache.httpcomponents + + + httpcore + org.apache.httpcomponents + + + + + org.apache.arrow + arrow-vector + ${arrow.version} + + + io.netty + netty-common + + + + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + runtime + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + io.netty + netty-common + + + io.netty + netty-buffer + + + + + + + com.bytedance.bitsail + bitsail-connector-test + ${revision} + test + + + com.bytedance.bitsail + bitsail-connector-fake + ${revision} + test + + + com.bytedance.bitsail + bitsail-connector-print + ${revision} + test + + + + + + org.apache.thrift.tools + maven-thrift-plugin + 0.1.11 + + ${thrift.binary} + java:fullcamel + + + + thrift-sources + generate-sources + + compile + + + + + + \ No newline at end of file diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/BackendClient.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/BackendClient.java new file mode 100644 index 000000000..bfe6d15bb --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/BackendClient.java @@ -0,0 +1,214 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.backend; + +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.connector.doris.backend.model.Routing; +import com.bytedance.bitsail.connector.doris.config.DorisExecutionOptions; +import com.bytedance.bitsail.connector.doris.error.DorisErrorCode; +import com.bytedance.bitsail.connector.doris.thrift.TDorisExternalService; +import com.bytedance.bitsail.connector.doris.thrift.TScanBatchResult; +import com.bytedance.bitsail.connector.doris.thrift.TScanCloseParams; +import com.bytedance.bitsail.connector.doris.thrift.TScanCloseResult; +import com.bytedance.bitsail.connector.doris.thrift.TScanNextBatchParams; +import com.bytedance.bitsail.connector.doris.thrift.TScanOpenParams; +import com.bytedance.bitsail.connector.doris.thrift.TScanOpenResult; +import com.bytedance.bitsail.connector.doris.thrift.TStatusCode; + +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client to request Doris BE + */ +public class BackendClient { + private static final Logger LOGGER = LoggerFactory.getLogger(BackendClient.class); + private final Routing routing; + private TDorisExternalService.Client client; + private TTransport transport; + private boolean isConnected = false; + private final int retries; + private final int socketTimeout; + private final int connectTimeout; + + public BackendClient(Routing routing, DorisExecutionOptions executionOptions) { + this.routing = routing; + this.connectTimeout = executionOptions.getRequestConnectTimeoutMs(); + this.socketTimeout = executionOptions.getRequestReadTimeoutMs(); + this.retries = executionOptions.getRequestRetries(); + LOGGER.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.", + this.connectTimeout, this.socketTimeout, this.retries); + open(); + } + + private void open() { + LOGGER.debug("Open client to Doris BE '{}'.", routing); + TException ex = null; + for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { + LOGGER.debug("Attempt {} to connect {}.", attempt, routing); + TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory(); + transport = new TSocket(routing.getHost(), routing.getPort(), socketTimeout, connectTimeout); + TProtocol protocol = factory.getProtocol(transport); + client = new TDorisExternalService.Client(protocol); + if (isConnected) { + LOGGER.info("Success connect to {}.", routing); + return; + } + try { + LOGGER.trace("Connect status before open transport to {} is '{}'.", routing, isConnected); + if (!transport.isOpen()) { + transport.open(); + isConnected = true; + } + } catch (TTransportException e) { + LOGGER.warn("Failed to connect message, routing={}", routing, e); + ex = e; + } + } + if (!isConnected) { + String errMsg = String.format("Failed to connect message, routing=%s", routing); + LOGGER.error(errMsg, ex); + throw BitSailException.asBitSailException(DorisErrorCode.CONNECT_FAILED_MESSAGE, errMsg, ex); + } + } + + private void close() { + LOGGER.trace("Connect status before close with '{}' is '{}'.", routing, isConnected); + isConnected = false; + if ((transport != null) && transport.isOpen()) { + transport.close(); + LOGGER.info("Closed a connection to {}.", routing); + } + if (null != client) { + client = null; + } + } + + /** + * Open a scanner for reading Doris data. + * + * @param openParams thrift struct to required by request + * @return scan open result + */ + public TScanOpenResult openScanner(TScanOpenParams openParams) { + LOGGER.debug("OpenScanner to '{}', parameter is '{}'.", routing, openParams); + if (!isConnected) { + open(); + } + TException ex = null; + for (int attempt = 0; attempt < retries; ++attempt) { + LOGGER.debug("Attempt {} to openScanner {}.", attempt, routing); + try { + TScanOpenResult result = client.openScanner(openParams); + if (result == null) { + LOGGER.warn("Open scanner result from {} is null.", routing); + continue; + } + if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) { + LOGGER.warn("The status of open scanner result from {} is '{}', error message is: {}.", + routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs()); + continue; + } + return result; + } catch (TException e) { + LOGGER.warn("Open scanner from {} failed.", routing, e); + ex = e; + } + } + String errMsg = String.format("Failed to connect message, routing=%s", routing); + LOGGER.error(errMsg, ex); + throw BitSailException.asBitSailException(DorisErrorCode.CONNECT_FAILED_MESSAGE, errMsg, ex); + } + + /** + * get next row batch from Doris BE + * + * @param nextBatchParams thrift struct to required by request + * @return scan batch result + */ + public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) { + LOGGER.debug("GetNext to '{}', parameter is '{}'.", routing, nextBatchParams); + if (!isConnected) { + open(); + } + TException ex = null; + TScanBatchResult result = null; + for (int attempt = 0; attempt < retries; ++attempt) { + LOGGER.debug("Attempt {} to getNext {}.", attempt, routing); + try { + result = client.getNext(nextBatchParams); + if (result == null) { + LOGGER.warn("GetNext result from {} is null.", routing); + continue; + } + if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) { + LOGGER.warn("The status of get next result from {} is '{}', error message is: {}.", + routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs()); + continue; + } + return result; + } catch (TException e) { + LOGGER.warn("Get next from {} failed.", routing, e); + ex = e; + } + } + if (result != null && (TStatusCode.OK != (result.getStatus().getStatusCode()))) { + String errMsg = String.format("Doris Internal error, routing=%s, status=%s, errorMsgs=%s", + routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs()); + LOGGER.error(errMsg); + throw BitSailException.asBitSailException(DorisErrorCode.INTERNAL_FAIL_MESSAGE, errMsg); + } + String errMsg = String.format("Failed to connect message, routing=%s", routing); + LOGGER.error(errMsg); + throw BitSailException.asBitSailException(DorisErrorCode.CONNECT_FAILED_MESSAGE, errMsg, ex); + } + + /** + * close an scanner. + * + * @param closeParams thrift struct to required by request + */ + public void closeScanner(TScanCloseParams closeParams) { + LOGGER.debug("CloseScanner to '{}', parameter is '{}'.", routing, closeParams); + for (int attempt = 0; attempt < retries; ++attempt) { + LOGGER.debug("Attempt {} to closeScanner {}.", attempt, routing); + try { + TScanCloseResult result = client.closeScanner(closeParams); + if (result == null) { + LOGGER.warn("CloseScanner result from {} is null.", routing); + continue; + } + if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) { + LOGGER.warn("The status of get next result from {} is '{}', error message is: {}.", + routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs()); + continue; + } + break; + } catch (TException e) { + LOGGER.warn("Close scanner from {} failed.", routing, e); + } + } + LOGGER.info("CloseScanner to Doris BE '{}' success.", routing); + close(); + } +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/model/Routing.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/model/Routing.java new file mode 100644 index 000000000..cfb8ca478 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/model/Routing.java @@ -0,0 +1,64 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.backend.model; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * present an Doris BE address. + */ +public class Routing { + private static final Logger LOGGER = LoggerFactory.getLogger(Routing.class); + private String host; + private int port; + + public Routing(String routing) throws IllegalArgumentException { + parseRouting(routing); + } + + private void parseRouting(String routing) throws IllegalArgumentException { + LOGGER.debug("Parse Doris BE address: '{}'.", routing); + String[] hostPort = routing.split(":"); + if (hostPort.length != 2) { + String errMsg = String.format("Format of Doris BE address is illegal, routing=%s", routing); + LOGGER.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + this.host = hostPort[0]; + try { + this.port = Integer.parseInt(hostPort[1]); + } catch (NumberFormatException e) { + String errMsg = String.format("Failed to parse Doris BE's hostPort, host=%s, Port=%s", hostPort[0], hostPort[1]); + LOGGER.error(errMsg); + throw new IllegalArgumentException(errMsg, e); + } + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + @Override + public String toString() { + return "Doris BE{host='" + host + '\'' + ", port=" + port + '}'; + } +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/model/RowBatch.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/model/RowBatch.java new file mode 100644 index 000000000..6a2c2ecdb --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/model/RowBatch.java @@ -0,0 +1,330 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.backend.model; + +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.connector.doris.error.DorisErrorCode; +import com.bytedance.bitsail.connector.doris.rest.model.Schema; +import com.bytedance.bitsail.connector.doris.thrift.TScanBatchResult; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.types.Types; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * row batch data container. + */ +public class RowBatch { + private static final Logger LOGGER = LoggerFactory.getLogger(RowBatch.class); + // offset for iterate the rowBatch + private int offsetInRowBatch = 0; + private int rowCountInOneBatch = 0; + private int readRowCount = 0; + private final List rowBatch = new ArrayList<>(); + private final ArrowStreamReader arrowStreamReader; + private VectorSchemaRoot root; + private List fieldVectors; + private final RootAllocator rootAllocator; + private final Schema schema; + private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + public RowBatch(TScanBatchResult nextResult, Schema schema) { + this.schema = schema; + this.rootAllocator = new RootAllocator(Integer.MAX_VALUE); + this.arrowStreamReader = new ArrowStreamReader( + new ByteArrayInputStream(nextResult.getRows()), + rootAllocator + ); + this.offsetInRowBatch = 0; + } + + public RowBatch readArrow() { + try { + this.root = arrowStreamReader.getVectorSchemaRoot(); + while (arrowStreamReader.loadNextBatch()) { + fieldVectors = root.getFieldVectors(); + if (fieldVectors.size() != schema.size()) { + LOGGER.error("Schema size '{}' is not equal to arrow field size '{}'.", + fieldVectors.size(), schema.size()); + throw new BitSailException(DorisErrorCode.FAILED_LOAD_DATA, + "Load Doris data failed, schema size of fetch data is wrong."); + } + if (fieldVectors.size() == 0 || root.getRowCount() == 0) { + LOGGER.debug("One batch in arrow has no data."); + continue; + } + rowCountInOneBatch = root.getRowCount(); + // init the rowBatch + for (int i = 0; i < rowCountInOneBatch; ++i) { + rowBatch.add(new Row(fieldVectors.size())); + } + convertArrowToRowBatch(); + readRowCount += root.getRowCount(); + } + return this; + } catch (Exception e) { + LOGGER.error("Read Doris Data failed because: ", e); + throw new RuntimeException("Read Doris Data failed", e); + } finally { + close(); + } + } + + public boolean hasNext() { + return offsetInRowBatch < readRowCount; + } + + private void addValueToRow(int rowIndex, Object obj) { + if (rowIndex > rowCountInOneBatch) { + String errMsg = "Get row offset: " + rowIndex + " larger than row size: " + + rowCountInOneBatch; + LOGGER.error(errMsg); + throw new NoSuchElementException(errMsg); + } + rowBatch.get(readRowCount + rowIndex).put(obj); + } + + public void convertArrowToRowBatch() { + try { + for (int col = 0; col < fieldVectors.size(); col++) { + FieldVector curFieldVector = fieldVectors.get(col); + Types.MinorType mt = curFieldVector.getMinorType(); + + final String currentType = schema.get(col).getType(); + switch (currentType) { + case "NULL_TYPE": + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + addValueToRow(rowIndex, null); + } + break; + case "BOOLEAN": + Preconditions.checkArgument(mt.equals(Types.MinorType.BIT), + typeMismatchMessage(currentType, mt)); + BitVector bitVector = (BitVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + Object fieldValue = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0; + addValueToRow(rowIndex, fieldValue); + } + break; + case "TINYINT": + Preconditions.checkArgument(mt.equals(Types.MinorType.TINYINT), + typeMismatchMessage(currentType, mt)); + TinyIntVector tinyIntVector = (TinyIntVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + Object fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex); + addValueToRow(rowIndex, fieldValue); + } + break; + case "SMALLINT": + Preconditions.checkArgument(mt.equals(Types.MinorType.SMALLINT), + typeMismatchMessage(currentType, mt)); + SmallIntVector smallIntVector = (SmallIntVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + Object fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex); + addValueToRow(rowIndex, fieldValue); + } + break; + case "INT": + Preconditions.checkArgument(mt.equals(Types.MinorType.INT), + typeMismatchMessage(currentType, mt)); + IntVector intVector = (IntVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + Object fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex); + addValueToRow(rowIndex, fieldValue); + } + break; + case "BIGINT": + + Preconditions.checkArgument(mt.equals(Types.MinorType.BIGINT), + typeMismatchMessage(currentType, mt)); + BigIntVector bigIntVector = (BigIntVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + Object fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex); + addValueToRow(rowIndex, fieldValue); + } + break; + case "FLOAT": + Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT4), + typeMismatchMessage(currentType, mt)); + Float4Vector float4Vector = (Float4Vector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + Object fieldValue = float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex); + addValueToRow(rowIndex, fieldValue); + } + break; + case "TIME": + case "DOUBLE": + Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT8), + typeMismatchMessage(currentType, mt)); + Float8Vector float8Vector = (Float8Vector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + Object fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex); + addValueToRow(rowIndex, fieldValue); + } + break; + case "BINARY": + Preconditions.checkArgument(mt.equals(Types.MinorType.VARBINARY), + typeMismatchMessage(currentType, mt)); + VarBinaryVector varBinaryVector = (VarBinaryVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + Object fieldValue = varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex); + addValueToRow(rowIndex, fieldValue); + } + break; + case "DECIMAL": + case "DECIMALV2": + Preconditions.checkArgument(mt.equals(Types.MinorType.DECIMAL), + typeMismatchMessage(currentType, mt)); + DecimalVector decimalVector = (DecimalVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + if (decimalVector.isNull(rowIndex)) { + addValueToRow(rowIndex, null); + continue; + } + BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros(); + addValueToRow(rowIndex, value); + } + break; + case "DATE": + Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR), + typeMismatchMessage(currentType, mt)); + VarCharVector date = (VarCharVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + if (date.isNull(rowIndex)) { + addValueToRow(rowIndex, null); + continue; + } + String value = new String(date.get(rowIndex)); + LocalDate localDate = LocalDate.parse(value, dateFormatter); + addValueToRow(rowIndex, localDate); + } + break; + case "DATETIME": + Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR), + typeMismatchMessage(currentType, mt)); + VarCharVector timeStampSecVector = (VarCharVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + if (timeStampSecVector.isNull(rowIndex)) { + addValueToRow(rowIndex, null); + continue; + } + String value = new String(timeStampSecVector.get(rowIndex)); + LocalDateTime parse = LocalDateTime.parse(value, dateTimeFormatter); + addValueToRow(rowIndex, parse); + } + break; + case "LARGEINT": + case "CHAR": + case "VARCHAR": + case "STRING": + Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR), + typeMismatchMessage(currentType, mt)); + VarCharVector varCharVector = (VarCharVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + if (varCharVector.isNull(rowIndex)) { + addValueToRow(rowIndex, null); + continue; + } + String value = new String(varCharVector.get(rowIndex)); + addValueToRow(rowIndex, value); + } + break; + default: + String errMsg = "Unsupported type " + schema.get(col).getType(); + LOGGER.error(errMsg); + throw new IllegalArgumentException(errMsg); + } + } + } catch (Exception e) { + close(); + throw e; + } + } + + public List next() { + if (!hasNext()) { + String errMsg = "Get row offset:" + offsetInRowBatch + " larger than row size: " + readRowCount; + LOGGER.error(errMsg); + throw new NoSuchElementException(errMsg); + } + return rowBatch.get(offsetInRowBatch++).getCols(); + } + + private String typeMismatchMessage(final String sparkType, final Types.MinorType arrowType) { + final String messageTemplate = "FLINK type is %1$s, but arrow type is %2$s."; + return String.format(messageTemplate, sparkType, arrowType.name()); + } + + public int getReadRowCount() { + return readRowCount; + } + + public void close() { + try { + if (arrowStreamReader != null) { + arrowStreamReader.close(); + } + if (rootAllocator != null) { + rootAllocator.close(); + } + } catch (IOException ioe) { + // do nothing + } + } + + public static class Row { + private final List cols; + + Row(int colCount) { + this.cols = new ArrayList<>(colCount); + } + + public List getCols() { + return cols; + } + + public void put(Object o) { + cols.add(o); + } + } +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/config/DorisExecutionOptions.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/config/DorisExecutionOptions.java index 352a2e9e7..5a1781e40 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/config/DorisExecutionOptions.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/config/DorisExecutionOptions.java @@ -43,9 +43,16 @@ public class DorisExecutionOptions implements Serializable { private final boolean isBatch; private final boolean enable2PC; private final int checkInterval; + private final int requestConnectTimeoutMs; private final int requestReadTimeoutMs; private final int requestRetries; + private final Integer requestTabletSize; + private final Integer requestQueryTimeoutS; + private final Integer requestBatchSize; + + private final String sqlFilter; + private final Long execMemLimit; /** * Properties for the StreamLoad. diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/constant/DorisConstants.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/constant/DorisConstants.java index ce4339079..04a141984 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/constant/DorisConstants.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/constant/DorisConstants.java @@ -17,6 +17,8 @@ package com.bytedance.bitsail.connector.doris.constant; public class DorisConstants { + + public static String DORIS_CONNECTOR_NAME = "doris"; public static final String NULL_VALUE = "\\N"; public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__"; } diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/error/DorisErrorCode.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/error/DorisErrorCode.java index 60c519bfa..3b355166b 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/error/DorisErrorCode.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/error/DorisErrorCode.java @@ -19,6 +19,10 @@ import com.bytedance.bitsail.common.exception.ErrorCode; public enum DorisErrorCode implements ErrorCode { + + /** + * Doris Writer + */ // Failed to init write proxy PROXY_INIT_FAILED("DorisWriter-01", "Failed to init write proxy"), REQUIRED_VALUE("DorisWriter-02", "You missed parameter which is required, please check your configuration."), @@ -30,6 +34,16 @@ public enum DorisErrorCode implements ErrorCode { PARSE_FAILED("DorisWriter-09", "Failed parse response"), ABORT_FAILED("DorisWriter-10", "Fail to abort transaction"), LABEL_ALREADY_EXIST("DorisWriter-11", "Label already exist"), + + + /** + * Doris Reader + */ + CONNECT_FAILED_MESSAGE("DorisReader-50", "Failed to connect message"), + INTERNAL_FAIL_MESSAGE("DorisReader-51", "Doris Internal error"), + FAILED_LOAD_DATA("DorisReader-52", "Failed to load doris data"), + FAILED_PARSE("DorisReader-53", "Failed parse"), + READER_REQUIRED_VALUE("DorisReader-53", "You missed parameter which is required, please check your configuration."), ; private final String code; diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/option/DorisReaderOptions.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/option/DorisReaderOptions.java new file mode 100644 index 000000000..bf2a27b25 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/option/DorisReaderOptions.java @@ -0,0 +1,87 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.option; + +import com.bytedance.bitsail.common.annotation.Essential; +import com.bytedance.bitsail.common.option.ConfigOption; +import com.bytedance.bitsail.common.option.ReaderOptions; + +import static com.bytedance.bitsail.common.option.ConfigOptions.key; +import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX; + +public interface DorisReaderOptions extends ReaderOptions.BaseReaderOptions { + + ConfigOption FE_HOSTS = + key(READER_PREFIX + "fe_hosts") + .defaultValue(""); + + ConfigOption MYSQL_HOSTS = + key(READER_PREFIX + "mysql_hosts") + .defaultValue(""); + + ConfigOption USER = + key(READER_PREFIX + "user") + .defaultValue("root"); + + ConfigOption PASSWORD = + key(READER_PREFIX + "password") + .defaultValue(""); + + @Essential + ConfigOption DB_NAME = + key(READER_PREFIX + "db_name") + .noDefaultValue(String.class); + + @Essential + ConfigOption TABLE_NAME = + key(READER_PREFIX + "table_name") + .noDefaultValue(String.class); + + ConfigOption TABLET_SIZE = + key(READER_PREFIX + "tablet_size") + .defaultValue(Integer.MAX_VALUE); + + ConfigOption EXEC_MEM_LIMIT = + key(READER_PREFIX + "exec_mem_limit") + .defaultValue(2147483648L); + + ConfigOption REQUEST_QUERY_TIMEOUT_S = + key(READER_PREFIX + "request_query_timeout_s") + .defaultValue(3600); + + ConfigOption REQUEST_BATCH_SIZE = + key(READER_PREFIX + "request_batch_size") + .defaultValue(1024); + + ConfigOption REQUEST_CONNECT_TIMEOUTS = + key(READER_PREFIX + "request_connect_timeouts") + .defaultValue(30 * 1000); + + ConfigOption REQUEST_RETRIES = + key(READER_PREFIX + "request_retries") + .defaultValue(3); + + ConfigOption REQUEST_READ_TIMEOUTS = + key(READER_PREFIX + "request_read_timeouts") + .defaultValue(30 * 1000); + + // Options for select data. + ConfigOption SQL_FILTER = + key(READER_PREFIX + "sql_filter") + .noDefaultValue(String.class); + +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/RestService.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/RestService.java index 985682571..715f7bde1 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/RestService.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/RestService.java @@ -17,15 +17,20 @@ package com.bytedance.bitsail.connector.doris.rest; import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.model.ColumnInfo; import com.bytedance.bitsail.connector.doris.config.DorisExecutionOptions; import com.bytedance.bitsail.connector.doris.config.DorisOptions; import com.bytedance.bitsail.connector.doris.error.DorisErrorCode; import com.bytedance.bitsail.connector.doris.rest.model.Backend; import com.bytedance.bitsail.connector.doris.rest.model.Backend.BackendRow; +import com.bytedance.bitsail.connector.doris.rest.model.PartitionDefinition; +import com.bytedance.bitsail.connector.doris.rest.model.QueryPlan; +import com.bytedance.bitsail.connector.doris.rest.model.Tablet; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.collections.CollectionUtils; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -34,6 +39,7 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.entity.StringEntity; import org.slf4j.Logger; import java.io.BufferedReader; @@ -44,18 +50,27 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; +import static org.apache.http.HttpStatus.SC_OK; + public class RestService { private static final String BACKENDS = "/api/backends?is_alive=true"; private static List backendRows; private static long pos; + private static final String QUERY_PLAN = "_query_plan"; + private static final String API_PREFIX = "/api"; public static String getBackend(DorisOptions options, DorisExecutionOptions executionOptions, Logger logger) { try { @@ -271,4 +286,211 @@ private static String parseResponse(HttpURLConnection connection, Logger logger) return result; } + /** + * find Doris partitions from Doris FE. + */ + public static List findPartitions(DorisOptions dorisOptions, DorisExecutionOptions executionOptions, Logger logger) { + String database = dorisOptions.getDatabaseName(); + String table = dorisOptions.getTableName(); + String readFields = constructReadFields(dorisOptions.getColumnInfos()); + + String sql = "select " + readFields + " from `" + database + "`.`" + table + "`"; + + if (!StringUtils.isEmpty(executionOptions.getSqlFilter())) { + sql += " where " + executionOptions.getSqlFilter(); + } + logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql); + + HttpPost httpPost = new HttpPost(getUriStr(dorisOptions, logger) + QUERY_PLAN); + String entity = "{\"sql\": \"" + sql + "\"}"; + logger.debug("Post body Sending to Doris FE is: '{}'.", entity); + StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8); + stringEntity.setContentEncoding("UTF-8"); + stringEntity.setContentType("application/json"); + httpPost.setEntity(stringEntity); + + String resStr = send(dorisOptions, executionOptions, httpPost, logger); + logger.debug("Find partition response is '{}'.", resStr); + QueryPlan queryPlan = getQueryPlan(resStr, logger); + Map> be2Tablets = selectBeForTablet(queryPlan, logger); + + return tabletsMapToPartition(executionOptions, be2Tablets, queryPlan.getOpaquedQueryPlan(), database, table, logger); + } + + private static String constructReadFields(List columnInfos) { + StringBuilder readFields = new StringBuilder(); + if (CollectionUtils.isNotEmpty(columnInfos)) { + for (int i = 0; i < columnInfos.size(); i++) { + readFields.append(columnInfos.get(i).getName()); + if (i != columnInfos.size() - 1) { + readFields.append(","); + } + } + } else { + readFields.append("*"); + } + return readFields.toString(); + } + + /** + * translate Doris FE response string to inner {@link QueryPlan} struct. + * + * @param response Doris FE response string + * @param logger {@link Logger} + * @return inner {@link QueryPlan} struct + */ + @VisibleForTesting + static QueryPlan getQueryPlan(String response, Logger logger) { + ObjectMapper mapper = new ObjectMapper(); + QueryPlan queryPlan; + try { + queryPlan = mapper.readValue(response, QueryPlan.class); + } catch (JsonParseException e) { + String errMsg = "Doris FE's response is not a json. res: " + response; + logger.error(errMsg, e); + throw BitSailException.asBitSailException(DorisErrorCode.FAILED_PARSE, errMsg, e); + } catch (JsonMappingException e) { + String errMsg = "Doris FE's response cannot map to schema. res: " + response; + logger.error(errMsg, e); + throw BitSailException.asBitSailException(DorisErrorCode.FAILED_PARSE, errMsg, e); + } catch (IOException e) { + String errMsg = "Parse Doris FE's response to json failed. res: " + response; + logger.error(errMsg, e); + throw BitSailException.asBitSailException(DorisErrorCode.FAILED_PARSE, errMsg, e); + } + + if (Objects.isNull(queryPlan)) { + String errMsg = "can not get query plan"; + logger.error(errMsg); + throw new RuntimeException(errMsg); + } + + if (queryPlan.getStatus() != SC_OK) { + String errMsg = "Doris FE's response is not OK, status is " + queryPlan.getStatus(); + logger.error(errMsg); + throw new RuntimeException(errMsg); + } + logger.debug("Parsing partition result is '{}'.", queryPlan); + return queryPlan; + } + + /** + * translate BE tablets map to Doris RDD partition. + * + * @param executionOptions execution Options + * @param be2Tablets BE to tablets {@link Map} + * @param opaquedQueryPlan Doris BE execute plan getting from Doris FE + * @param database database name of Doris table + * @param table table name of Doris table + * @param logger {@link Logger} + * @return Doris RDD partition {@link List} + * @throws IllegalArgumentException throw when translate failed + */ + @VisibleForTesting + static List tabletsMapToPartition(DorisExecutionOptions executionOptions, Map> be2Tablets, + String opaquedQueryPlan, String database, String table, Logger logger) + throws IllegalArgumentException { + int tabletsSize = executionOptions.getRequestTabletSize(); + List partitions = new ArrayList<>(); + for (Map.Entry> beInfo : be2Tablets.entrySet()) { + logger.debug("Generate partition with beInfo: '{}'.", beInfo); + HashSet tabletSet = new HashSet<>(beInfo.getValue()); + beInfo.getValue().clear(); + beInfo.getValue().addAll(tabletSet); + int first = 0; + while (first < beInfo.getValue().size()) { + Set partitionTablets = new HashSet<>(beInfo.getValue() + .subList(first, Math.min(beInfo.getValue().size(), first + tabletsSize))); + first = first + tabletsSize; + PartitionDefinition partitionDefinition = new PartitionDefinition(database, table, beInfo.getKey(), partitionTablets, opaquedQueryPlan); + logger.debug("Generate one PartitionDefinition '{}'.", partitionDefinition); + partitions.add(partitionDefinition); + } + } + return partitions; + } + + /** + * get a valid URI to connect Doris FE. + * + * @param dorisOptions configuration of request + * @param logger {@link Logger} + * @return uri string + * @throws IllegalArgumentException throw when configuration is illegal + */ + @VisibleForTesting + static String getUriStr(DorisOptions dorisOptions, Logger logger) throws IllegalArgumentException { + return "http://" + randomEndpoint(dorisOptions.getFeNodes(), logger) + API_PREFIX + + "/" + dorisOptions.getDatabaseName() + + "/" + dorisOptions.getTableName() + + "/"; + } + + /** + * choice a Doris FE node to request. + * + * @param feNodes Doris FE node list, separate be comma + * @param logger slf4j logger + * @return the chosen one Doris FE node + * @throws IllegalArgumentException fe nodes is illegal + */ + @VisibleForTesting + static String randomEndpoint(String feNodes, Logger logger) throws IllegalArgumentException { + logger.trace("Parse fenodes '{}'.", feNodes); + List nodes = Arrays.asList(feNodes.split(",")); + Collections.shuffle(nodes); + return nodes.get(0).trim(); + } + + /** + * select which Doris BE to get tablet data. + * + * @param queryPlan {@link QueryPlan} translated from Doris FE response + * @param logger {@link Logger} + * @return BE to tablets {@link Map} + */ + @VisibleForTesting + static Map> selectBeForTablet(QueryPlan queryPlan, Logger logger) { + Map> be2Tablets = new HashMap<>(); + for (Map.Entry part : queryPlan.getPartitions().entrySet()) { + logger.debug("Parse tablet info: '{}'.", part); + long tabletId; + try { + tabletId = Long.parseLong(part.getKey()); + } catch (NumberFormatException e) { + String errMsg = "Parse tablet id '" + part.getKey() + "' to long failed."; + logger.error(errMsg, e); + throw new BitSailException(DorisErrorCode.FAILED_PARSE, errMsg); + } + String target = null; + int tabletCount = Integer.MAX_VALUE; + for (String candidate : part.getValue().getRoutings()) { + logger.trace("Evaluate Doris BE '{}' to tablet '{}'.", candidate, tabletId); + if (!be2Tablets.containsKey(candidate)) { + logger.debug("Choice a new Doris BE '{}' for tablet '{}'.", candidate, tabletId); + List tablets = new ArrayList<>(); + be2Tablets.put(candidate, tablets); + target = candidate; + break; + } else { + if (be2Tablets.get(candidate).size() < tabletCount) { + target = candidate; + tabletCount = be2Tablets.get(candidate).size(); + logger.debug("Current candidate Doris BE to tablet '{}' is '{}' with tablet count {}.", + tabletId, target, tabletCount); + } + } + } + if (Objects.isNull(target)) { + String errMsg = "Cannot choice Doris BE for tablet " + tabletId; + logger.error(errMsg); + throw new RuntimeException(errMsg); + } + + logger.debug("Choice Doris BE '{}' for tablet '{}'.", target, tabletId); + be2Tablets.get(target).add(tabletId); + } + return be2Tablets; + } + } diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/Field.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/Field.java new file mode 100644 index 000000000..7dc575c2d --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/Field.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.rest.model; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@NoArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode +@Getter +public class Field { + private String name; + private String type; + private String comment; + private int precision; + private int scale; + private String aggregationType; + +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/PartitionDefinition.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/PartitionDefinition.java new file mode 100644 index 000000000..7f26b7ac3 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/PartitionDefinition.java @@ -0,0 +1,113 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.rest.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +/** + * Doris partition info. + */ +@Getter +@AllArgsConstructor +public class PartitionDefinition implements Serializable, Comparable { + private final String database; + private final String table; + private final String beAddress; + private final Set tabletIds; + private final String queryPlan; + + @Override + public int compareTo(PartitionDefinition o) { + int cmp = database.compareTo(o.database); + if (cmp != 0) { + return cmp; + } + cmp = table.compareTo(o.table); + if (cmp != 0) { + return cmp; + } + cmp = beAddress.compareTo(o.beAddress); + if (cmp != 0) { + return cmp; + } + cmp = queryPlan.compareTo(o.queryPlan); + if (cmp != 0) { + return cmp; + } + + cmp = tabletIds.size() - o.tabletIds.size(); + if (cmp != 0) { + return cmp; + } + + Set similar = new HashSet<>(tabletIds); + Set diffSelf = new HashSet<>(tabletIds); + Set diffOther = new HashSet<>(o.tabletIds); + similar.retainAll(o.tabletIds); + diffSelf.removeAll(similar); + diffOther.removeAll(similar); + if (diffSelf.size() == 0) { + return 0; + } + long diff = Collections.min(diffSelf) - Collections.min(diffOther); + return diff < 0 ? -1 : 1; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionDefinition that = (PartitionDefinition) o; + return Objects.equals(database, that.database) && + Objects.equals(table, that.table) && + Objects.equals(beAddress, that.beAddress) && + Objects.equals(tabletIds, that.tabletIds) && + Objects.equals(queryPlan, that.queryPlan); + } + + @Override + public int hashCode() { + int result = database.hashCode(); + result = 31 * result + table.hashCode(); + result = 31 * result + beAddress.hashCode(); + result = 31 * result + queryPlan.hashCode(); + result = 31 * result + tabletIds.hashCode(); + return result; + } + + @Override + public String toString() { + return "PartitionDefinition{" + + ", database='" + database + '\'' + + ", table='" + table + '\'' + + ", beAddress='" + beAddress + '\'' + + ", tabletIds=" + tabletIds + + ", queryPlan='" + queryPlan + '\'' + + '}'; + } +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/QueryPlan.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/QueryPlan.java new file mode 100644 index 000000000..a07c67288 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/QueryPlan.java @@ -0,0 +1,38 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.rest.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; + +import java.util.Map; + +@Getter +@Setter +@EqualsAndHashCode +public class QueryPlan { + + @JsonProperty("status") + private int status; + @JsonProperty("opaqued_query_plan") + private String opaquedQueryPlan; + @JsonProperty("partitions") + private Map partitions; + +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/Schema.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/Schema.java new file mode 100644 index 000000000..007d9e03a --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/Schema.java @@ -0,0 +1,65 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.rest.model; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; + +import java.util.ArrayList; +import java.util.List; + +@Getter +@Setter +@EqualsAndHashCode +public class Schema { + private int status = 0; + private String keysType; + private List properties; + + public Schema(int fieldCount) { + properties = new ArrayList<>(fieldCount); + } + + public void put(String name, String type, String comment, int scale, int precision, String aggregationType) { + properties.add(new Field(name, type, comment, scale, precision, aggregationType)); + } + + public void put(Field f) { + properties.add(f); + } + + public Field get(int index) { + if (index >= properties.size()) { + String errMsg = String.format("index=%s, fields size=%s", index, properties.size()); + throw new IndexOutOfBoundsException(errMsg); + } + return properties.get(index); + } + + public int size() { + return properties.size(); + } + + @Override + public String toString() { + return "Schema{" + + "status=" + status + + ", properties=" + properties + + '}'; + } +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/Tablet.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/Tablet.java new file mode 100644 index 000000000..fd45b0df3 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/rest/model/Tablet.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.rest.model; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; + +import java.util.List; + +@Getter +@Setter +@EqualsAndHashCode +public class Tablet { + private List routings; + private int version; + private long versionHash; + private long schemaHash; + +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/DorisSource.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/DorisSource.java new file mode 100644 index 000000000..25f26cb46 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/DorisSource.java @@ -0,0 +1,120 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.source; + +import com.bytedance.bitsail.base.connector.reader.v1.Boundedness; +import com.bytedance.bitsail.base.connector.reader.v1.Source; +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.base.execution.ExecutionEnviron; +import com.bytedance.bitsail.base.extension.ParallelismComputable; +import com.bytedance.bitsail.base.parallelism.ParallelismAdvice; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.type.TypeInfoConverter; +import com.bytedance.bitsail.common.type.filemapping.FileMappingTypeInfoConverter; +import com.bytedance.bitsail.connector.doris.config.DorisExecutionOptions; +import com.bytedance.bitsail.connector.doris.config.DorisOptions; +import com.bytedance.bitsail.connector.doris.constant.DorisConstants; +import com.bytedance.bitsail.connector.doris.error.DorisErrorCode; +import com.bytedance.bitsail.connector.doris.option.DorisReaderOptions; +import com.bytedance.bitsail.connector.doris.source.reader.DorisSourceReader; +import com.bytedance.bitsail.connector.doris.source.split.DorisSourceSplit; +import com.bytedance.bitsail.connector.doris.source.split.coordinator.DorisSourceSplitCoordinator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DorisSource implements Source, ParallelismComputable { + + private static final Logger LOG = LoggerFactory.getLogger(DorisSource.class); + private DorisOptions dorisOptions; + private DorisExecutionOptions dorisExecutionOptions; + private BitSailConfiguration jobConf; + + @Override + public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) { + this.jobConf = readerConfiguration; + initDorisOptions(readerConfiguration); + initDorisExecutionOptions(readerConfiguration); + } + + @Override + public Boundedness getSourceBoundedness() { + return Boundedness.BOUNDEDNESS; + } + + @Override + public SourceReader createReader(SourceReader.Context readerContext) { + return new DorisSourceReader(readerContext, dorisExecutionOptions, dorisOptions); + } + + @Override + public SourceSplitCoordinator createSplitCoordinator( + SourceSplitCoordinator.Context coordinatorContext) { + return new DorisSourceSplitCoordinator(coordinatorContext, dorisExecutionOptions, dorisOptions); + } + + @Override + public String getReaderName() { + return DorisConstants.DORIS_CONNECTOR_NAME; + } + + @Override + public TypeInfoConverter createTypeInfoConverter() { + return new FileMappingTypeInfoConverter(getReaderName()); + } + + @Override + public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) { + int parallelism = 1; + if (selfConf.fieldExists(DorisReaderOptions.READER_PARALLELISM_NUM)) { + parallelism = selfConf.get(DorisReaderOptions.READER_PARALLELISM_NUM); + } + return new ParallelismAdvice(false, parallelism); + } + + private void initDorisOptions(BitSailConfiguration readerConfiguration) { + LOG.info("Start to init DorisOptions!"); + DorisOptions.DorisOptionsBuilder builder = DorisOptions.builder() + .feNodes(readerConfiguration.getNecessaryOption(DorisReaderOptions.FE_HOSTS, DorisErrorCode.READER_REQUIRED_VALUE)) + .mysqlNodes(readerConfiguration.get(DorisReaderOptions.MYSQL_HOSTS)) + .databaseName(readerConfiguration.getNecessaryOption(DorisReaderOptions.DB_NAME, DorisErrorCode.READER_REQUIRED_VALUE)) + .tableName(readerConfiguration.getNecessaryOption(DorisReaderOptions.TABLE_NAME, DorisErrorCode.READER_REQUIRED_VALUE)) + .username(readerConfiguration.getNecessaryOption(DorisReaderOptions.USER, DorisErrorCode.READER_REQUIRED_VALUE)) + .password(readerConfiguration.getNecessaryOption(DorisReaderOptions.PASSWORD, DorisErrorCode.READER_REQUIRED_VALUE)) + .columnInfos(readerConfiguration.get(DorisReaderOptions.COLUMNS)); + + dorisOptions = builder.build(); + } + + private void initDorisExecutionOptions(BitSailConfiguration readerConfiguration) { + LOG.info("Start to init DorisExecutionOptions!"); + final DorisExecutionOptions.DorisExecutionOptionsBuilder builder = DorisExecutionOptions.builder(); + builder.requestConnectTimeoutMs(readerConfiguration.get(DorisReaderOptions.REQUEST_CONNECT_TIMEOUTS)) + .requestRetries(readerConfiguration.get(DorisReaderOptions.REQUEST_RETRIES)) + .requestReadTimeoutMs(readerConfiguration.get(DorisReaderOptions.REQUEST_READ_TIMEOUTS)) + .sqlFilter(readerConfiguration.get(DorisReaderOptions.SQL_FILTER)) + .requestTabletSize(readerConfiguration.get(DorisReaderOptions.TABLET_SIZE)) + .execMemLimit(readerConfiguration.get(DorisReaderOptions.EXEC_MEM_LIMIT)) + .requestQueryTimeoutS(readerConfiguration.get(DorisReaderOptions.REQUEST_QUERY_TIMEOUT_S)) + .requestBatchSize(readerConfiguration.get(DorisReaderOptions.REQUEST_BATCH_SIZE)); + + dorisExecutionOptions = builder.build(); + } +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/reader/DorisSourceReader.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/reader/DorisSourceReader.java new file mode 100644 index 000000000..cd49cca30 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/reader/DorisSourceReader.java @@ -0,0 +1,125 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.source.reader; + +import com.bytedance.bitsail.base.connector.reader.v1.SourcePipeline; +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.connector.doris.config.DorisExecutionOptions; +import com.bytedance.bitsail.connector.doris.config.DorisOptions; +import com.bytedance.bitsail.connector.doris.source.split.DorisSourceSplit; +import com.bytedance.bitsail.connector.doris.source.split.DorisSourceSplitReader; + +import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Set; + +/** + * A {@link SourceReader} that read records from {@link DorisSourceSplit}. + **/ +public class DorisSourceReader implements SourceReader { + private static final Logger LOG = LoggerFactory.getLogger(DorisSourceReader.class); + private final transient Set assignedDorisSplits; + private final transient Set finishedDorisSplits; + private final transient Context context; + private boolean noMoreSplits; + private final DorisExecutionOptions executionOptions; + private final DorisOptions dorisOptions; + + public DorisSourceReader(SourceReader.Context readerContext, + DorisExecutionOptions executionOptions, DorisOptions dorisOptions) { + context = readerContext; + this.assignedDorisSplits = Sets.newHashSet(); + this.finishedDorisSplits = Sets.newHashSet(); + this.noMoreSplits = false; + this.executionOptions = executionOptions; + this.dorisOptions = dorisOptions; + } + + @Override + public void start() { + + } + + @Override + public void pollNext(SourcePipeline pipeline) throws Exception { + for (DorisSourceSplit dorisSplit : assignedDorisSplits) { + DorisSourceSplitReader sourceSplitReader = new DorisSourceSplitReader(dorisSplit, executionOptions, + dorisOptions); + while (sourceSplitReader.hasNext()) { + List next = sourceSplitReader.next(); + + Row row = deserialize(next); + pipeline.output(row); + } + finishSplit(sourceSplitReader); + finishedDorisSplits.add(dorisSplit); + } + assignedDorisSplits.removeAll(finishedDorisSplits); + } + + public Row deserialize(List records) { + Row row = new Row(records.size()); + for (int i = 0; i < records.size(); i++) { + row.setField(i, records.get(i)); + } + return row; + } + + private void finishSplit(DorisSourceSplitReader sourceSplitReader) { + try { + sourceSplitReader.close(); + } catch (Exception e) { + LOG.error("close resource reader failed,", e); + } + } + + @Override + public void addSplits(List splits) { + LOG.info("Subtask {} received {}(s) new splits, splits = {}.", context.getIndexOfSubtask(), + CollectionUtils.size(splits), splits); + assignedDorisSplits.addAll(splits); + } + + @Override + public boolean hasMoreElements() { + if (noMoreSplits) { + return CollectionUtils.size(assignedDorisSplits) != 0; + } + return true; + } + + @Override + public List snapshotState(long checkpointId) { + return null; + } + + @Override + public void close() throws Exception { + + } + + @Override + public void notifyNoMoreSplits() { + LOG.info("Subtask {} received no more split signal.", context.getIndexOfSubtask()); + noMoreSplits = true; + } +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/DorisSourceSplit.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/DorisSourceSplit.java new file mode 100644 index 000000000..5713f9f56 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/DorisSourceSplit.java @@ -0,0 +1,45 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.source.split; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplit; +import com.bytedance.bitsail.connector.doris.rest.model.PartitionDefinition; + +import lombok.Getter; +import lombok.Setter; + +/** + * A {@link SourceSplit} that represents a {@link PartitionDefinition}. + **/ +@Getter +@Setter +public class DorisSourceSplit implements SourceSplit { + + private String splitId; + private PartitionDefinition partitionDefinition; + + public DorisSourceSplit(PartitionDefinition partitionDefinition) { + this.partitionDefinition = partitionDefinition; + this.splitId = partitionDefinition.getBeAddress(); + } + + @Override + public String uniqSplitId() { + return splitId; + } + +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/DorisSourceSplitReader.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/DorisSourceSplitReader.java new file mode 100644 index 000000000..8ab9df72a --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/DorisSourceSplitReader.java @@ -0,0 +1,222 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.source.split; + +import com.bytedance.bitsail.connector.doris.backend.BackendClient; +import com.bytedance.bitsail.connector.doris.backend.model.Routing; +import com.bytedance.bitsail.connector.doris.backend.model.RowBatch; +import com.bytedance.bitsail.connector.doris.config.DorisExecutionOptions; +import com.bytedance.bitsail.connector.doris.config.DorisOptions; +import com.bytedance.bitsail.connector.doris.rest.model.Field; +import com.bytedance.bitsail.connector.doris.rest.model.PartitionDefinition; +import com.bytedance.bitsail.connector.doris.rest.model.Schema; +import com.bytedance.bitsail.connector.doris.thrift.TScanBatchResult; +import com.bytedance.bitsail.connector.doris.thrift.TScanCloseParams; +import com.bytedance.bitsail.connector.doris.thrift.TScanColumnDesc; +import com.bytedance.bitsail.connector.doris.thrift.TScanNextBatchParams; +import com.bytedance.bitsail.connector.doris.thrift.TScanOpenParams; +import com.bytedance.bitsail.connector.doris.thrift.TScanOpenResult; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +public class DorisSourceSplitReader { + + private static final Logger LOG = LoggerFactory.getLogger(DorisSourceSplitReader.class); + private final String defaultCluster = "default_cluster"; + private final BackendClient client; + private final PartitionDefinition partition; + private final DorisExecutionOptions executionOptions; + private final DorisOptions dorisOptions; + private TScanOpenParams openParams; + protected String contextId; + protected Schema schema; + protected boolean asyncThreadStarted; + // flag indicate if support deserialize Arrow to RowBatch asynchronously + protected Boolean deserializeArrowToRowBatchAsync = false; + protected AtomicBoolean eos = new AtomicBoolean(false); + protected int offset = 0; + protected RowBatch rowBatch; + protected BlockingQueue rowBatchBlockingQueue; + + public DorisSourceSplitReader(DorisSourceSplit dorisSplit, DorisExecutionOptions executionOptions, DorisOptions dorisOptions) { + this.partition = dorisSplit.getPartitionDefinition(); + this.executionOptions = executionOptions; + this.dorisOptions = dorisOptions; + this.client = backendClient(); + init(); + } + + private BackendClient backendClient() { + try { + return new BackendClient(new Routing(partition.getBeAddress()), executionOptions); + } catch (IllegalArgumentException e) { + LOG.error("init backend:{} client failed,", partition.getBeAddress(), e); + throw new IllegalArgumentException(e); + } + } + + private void init() { + this.openParams = openParams(); + TScanOpenResult openResult = this.client.openScanner(this.openParams); + this.contextId = openResult.getContextId(); + this.schema = convertToSchema(openResult.getSelectedColumns()); + this.asyncThreadStarted = asyncThreadStarted(); + LOG.debug("Open scan result is, contextId: {}, schema: {}.", contextId, schema); + } + + protected boolean asyncThreadStarted() { + boolean started = false; + if (deserializeArrowToRowBatchAsync) { + asyncThread.start(); + started = true; + } + return started; + } + + protected Thread asyncThread = new Thread(new Runnable() { + @Override + public void run() { + TScanNextBatchParams nextBatchParams = new TScanNextBatchParams(); + nextBatchParams.setContextId(contextId); + while (!eos.get()) { + nextBatchParams.setOffset(offset); + TScanBatchResult nextResult = client.getNext(nextBatchParams); + eos.set(nextResult.isEos()); + if (!eos.get()) { + RowBatch rowBatch = new RowBatch(nextResult, schema).readArrow(); + offset += rowBatch.getReadRowCount(); + rowBatch.close(); + try { + rowBatchBlockingQueue.put(rowBatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + }); + + /** + * convert Doris return schema to inner schema struct. + * + * @param tscanColumnDescs Doris BE return schema + * @return inner schema struct + */ + public static Schema convertToSchema(List tscanColumnDescs) { + Schema schema = new Schema(tscanColumnDescs.size()); + tscanColumnDescs.stream().forEach(desc -> schema.put(new Field(desc.getName(), desc.getType().name(), "", 0, 0, ""))); + return schema; + } + + private TScanOpenParams openParams() { + TScanOpenParams params = new TScanOpenParams(); + params.cluster = defaultCluster; + params.database = partition.getDatabase(); + params.table = partition.getTable(); + + params.tablet_ids = Arrays.asList(partition.getTabletIds().toArray(new Long[] {})); + params.opaqued_query_plan = partition.getQueryPlan(); + // max row number of one read batch + Integer batchSize = executionOptions.getRequestBatchSize(); + Integer queryDorisTimeout = executionOptions.getRequestQueryTimeoutS(); + Long execMemLimit = executionOptions.getExecMemLimit(); + params.setBatchSize(batchSize); + params.setQueryTimeout(queryDorisTimeout); + params.setMemLimit(execMemLimit); + params.setUser(dorisOptions.getUsername()); + params.setPasswd(dorisOptions.getPassword()); + LOG.debug("Open scan params is,cluster:{},database:{},table:{},tabletId:{},batch size:{},query timeout:{},execution memory limit:{},user:{},query plan: {}", + params.getCluster(), params.getDatabase(), params.getTable(), params.getTabletIds(), params.getBatchSize(), params.getQueryTimeout(), params.getMemLimit(), + params.getUser(), params.getOpaquedQueryPlan()); + return params; + } + + /** + * read data and cached in rowBatch. + * + * @return true if hax next value + */ + @SuppressWarnings("checkstyle:MagicNumber") + public boolean hasNext() { + boolean hasNext = false; + if (deserializeArrowToRowBatchAsync && asyncThreadStarted) { + // support deserialize Arrow to RowBatch asynchronously + if (rowBatch == null || !rowBatch.hasNext()) { + while (!eos.get() || !rowBatchBlockingQueue.isEmpty()) { + if (!rowBatchBlockingQueue.isEmpty()) { + try { + rowBatch = rowBatchBlockingQueue.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + hasNext = true; + break; + } else { + // wait for rowBatch put in queue or eos change + try { + Thread.sleep(5); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } else { + hasNext = true; + } + } else { + // Arrow data was acquired synchronously during the iterative process + if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) { + if (rowBatch != null) { + offset += rowBatch.getReadRowCount(); + rowBatch.close(); + } + TScanNextBatchParams nextBatchParams = new TScanNextBatchParams(); + nextBatchParams.setContextId(contextId); + nextBatchParams.setOffset(offset); + TScanBatchResult nextResult = client.getNext(nextBatchParams); + eos.set(nextResult.isEos()); + if (!eos.get()) { + rowBatch = new RowBatch(nextResult, schema).readArrow(); + } + } + hasNext = !eos.get(); + } + return hasNext; + } + + /** + * get next value. + * + * @return next value + */ + public List next() { + return rowBatch.next(); + } + + public void close() throws Exception { + TScanCloseParams closeParams = new TScanCloseParams(); + closeParams.setContextId(contextId); + client.closeScanner(closeParams); + } + +} diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/coordinator/DorisSourceSplitCoordinator.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/coordinator/DorisSourceSplitCoordinator.java new file mode 100644 index 000000000..46da687f7 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/coordinator/DorisSourceSplitCoordinator.java @@ -0,0 +1,138 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.source.split.coordinator; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.connector.doris.config.DorisExecutionOptions; +import com.bytedance.bitsail.connector.doris.config.DorisOptions; +import com.bytedance.bitsail.connector.doris.rest.RestService; +import com.bytedance.bitsail.connector.doris.rest.model.PartitionDefinition; +import com.bytedance.bitsail.connector.doris.source.split.DorisSourceSplit; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.NoArgsConstructor; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class DorisSourceSplitCoordinator implements SourceSplitCoordinator { + private static final Logger LOG = LoggerFactory.getLogger(DorisSourceSplitCoordinator.class); + private final SourceSplitCoordinator.Context context; + private final Map> splitAssignmentPlan; + private final DorisExecutionOptions executionOptions; + private final DorisOptions dorisOptions; + + public DorisSourceSplitCoordinator(SourceSplitCoordinator.Context context, DorisExecutionOptions executionOptions, + DorisOptions dorisOptions) { + this.splitAssignmentPlan = Maps.newConcurrentMap(); + this.context = context; + this.executionOptions = executionOptions; + this.dorisOptions = dorisOptions; + } + + @Override + public void start() { + List splitList = new ArrayList<>(); + List partitions = RestService.findPartitions(dorisOptions, executionOptions, LOG); + partitions.forEach(m -> splitList.add(new DorisSourceSplit(m))); + + int readerNum = context.totalParallelism(); + LOG.info("Found {} readers and {} splits.", readerNum, splitList.size()); + if (readerNum > splitList.size()) { + LOG.error("Reader number {} is larger than split number {}.", readerNum, splitList.size()); + } + for (DorisSourceSplit split : splitList) { + int readerIndex = ReaderSelector.getReaderIndex(readerNum); + splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split); + LOG.info("Will assign split {} to the {}-th reader", split.uniqSplitId(), readerIndex); + } + } + + @Override + public void addReader(int subtaskId) { + LOG.info("Found reader {}", subtaskId); + tryAssignSplitsToReader(); + + } + + private void tryAssignSplitsToReader() { + Map> splitsToAssign = new HashMap<>(); + for (Integer readerIndex : splitAssignmentPlan.keySet()) { + if (CollectionUtils.isNotEmpty(splitAssignmentPlan.get(readerIndex)) + && context.registeredReaders().contains(readerIndex)) { + splitsToAssign.put(readerIndex, Lists.newArrayList(splitAssignmentPlan.get(readerIndex))); + } + } + + for (Integer readerIndex : splitsToAssign.keySet()) { + LOG.info("Try assigning splits reader {}, splits are: [{}]", readerIndex, + splitsToAssign.get(readerIndex).stream().map(DorisSourceSplit::getSplitId).collect(Collectors.toList())); + splitAssignmentPlan.remove(readerIndex); + context.assignSplit(readerIndex, splitsToAssign.get(readerIndex)); + context.signalNoMoreSplits(readerIndex); + LOG.info("Finish assigning splits reader {}", readerIndex); + } + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + LOG.info("Source reader {} return splits {}.", subtaskId, splits); + + int readerNum = context.totalParallelism(); + for (DorisSourceSplit split : splits) { + int readerIndex = ReaderSelector.getReaderIndex(readerNum); + splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split); + LOG.info("Re-assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex); + } + tryAssignSplitsToReader(); + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + } + + @Override + public EmptyState snapshotState() throws Exception { + return new EmptyState(); + } + + @Override + public void close() { + // empty + } + + @NoArgsConstructor + static class ReaderSelector { + private static long readerIndex = 0; + + public static int getReaderIndex(int totalReaderNum) { + return (int) readerIndex++ % totalReaderNum; + } + } +} diff --git a/bitsail-connectors/connector-doris/src/main/resources/bitsail-connector-unified-doris.json b/bitsail-connectors/connector-doris/src/main/resources/bitsail-connector-unified-doris.json index d5a8cf09f..623ac5858 100644 --- a/bitsail-connectors/connector-doris/src/main/resources/bitsail-connector-unified-doris.json +++ b/bitsail-connectors/connector-doris/src/main/resources/bitsail-connector-unified-doris.json @@ -1,7 +1,8 @@ { "name": "bitsail-connector-unified-doris", "classes": [ - "com.bytedance.bitsail.connector.doris.sink.DorisSink" + "com.bytedance.bitsail.connector.doris.sink.DorisSink", + "com.bytedance.bitsail.connector.doris.source.DorisSource" ], "libs": [ "bitsail-connector-doris-${version}.jar" diff --git a/bitsail-connectors/connector-doris/src/main/resources/doris-type-converter.yaml b/bitsail-connectors/connector-doris/src/main/resources/doris-type-converter.yaml index 93ba709cc..d3cd272b7 100644 --- a/bitsail-connectors/connector-doris/src/main/resources/doris-type-converter.yaml +++ b/bitsail-connectors/connector-doris/src/main/resources/doris-type-converter.yaml @@ -36,13 +36,13 @@ engine.type.to.bitsail.type.converter: target.type: short - source.type: int - target.type: long + target.type: int - source.type: smallint - target.type: long + target.type: int - source.type: integer - target.type: long + target.type: int - source.type: bigint target.type: bigint diff --git a/bitsail-connectors/connector-doris/src/main/thrift/doris/DorisExternalService.thrift b/bitsail-connectors/connector-doris/src/main/thrift/doris/DorisExternalService.thrift new file mode 100644 index 000000000..ba4ea6ffd --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/thrift/doris/DorisExternalService.thrift @@ -0,0 +1,121 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace java com.bytedance.bitsail.connector.doris.thrift +namespace cpp doris + +include "Types.thrift" +include "Status.thrift" + + +// Parameters to open(). +struct TScanOpenParams { + + 1: required string cluster + + 2: required string database + + 3: required string table + + // tablets to scan + 4: required list tablet_ids + + // base64 encoded binary plan fragment + 5: required string opaqued_query_plan + + // A string specified for the table that is passed to the external data source. + // Always set, may be an empty string. + 6: optional i32 batch_size + + // reserved params for use + 7: optional map properties + + // The query limit, if specified. + 8: optional i64 limit + + // The authenticated user name. Always set. + // maybe usefullless + 9: optional string user + + 10: optional string passwd + // max keep alive time min + 11: optional i16 keep_alive_min + + 12: optional i32 query_timeout + + // memory limit for a single query + 13: optional i64 mem_limit +} + +struct TScanColumnDesc { + // The column name + 1: optional string name + // The column type. Always set. + 2: optional Types.TPrimitiveType type +} + +// Returned by open(). +struct TScanOpenResult { + 1: required Status.TStatus status + // An opaque context_id used in subsequent getNext()/close() calls. Required. + 2: optional string context_id + // selected fields + 3: optional list selected_columns + +} + +// Parameters to getNext() +struct TScanNextBatchParams { + // The opaque handle returned by the previous open() call. Always set. + 1: optional string context_id // doris olap engine context id + 2: optional i64 offset // doris should check the offset to prevent duplicate rpc calls +} + +// Returned by getNext(). +struct TScanBatchResult { + 1: required Status.TStatus status + + // If true, reached the end of the result stream; subsequent calls to + // getNext() won’t return any more results. Required. + 2: optional bool eos + + // A batch of rows of arrow format to return, if any exist. The number of rows in the batch + // should be less than or equal to the batch_size specified in TOpenParams. + 3: optional binary rows +} + +// Parameters to close() +struct TScanCloseParams { + // The opaque handle returned by the previous open() call. Always set. + 1: optional string context_id +} + +// Returned by close(). +struct TScanCloseResult { + 1: required Status.TStatus status +} + +// scan service expose ability of scanning data ability to other compute system +service TDorisExternalService { + // doris will build a scan context for this session, context_id returned if success + TScanOpenResult open_scanner(1: TScanOpenParams params); + + // return the batch_size of data + TScanBatchResult get_next(1: TScanNextBatchParams params); + + // release the context resource associated with the context_id + TScanCloseResult close_scanner(1: TScanCloseParams params); +} diff --git a/bitsail-connectors/connector-doris/src/main/thrift/doris/Status.thrift b/bitsail-connectors/connector-doris/src/main/thrift/doris/Status.thrift new file mode 100644 index 000000000..5308a6f9f --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/thrift/doris/Status.thrift @@ -0,0 +1,65 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace java com.bytedance.bitsail.connector.doris.thrift +namespace cpp doris + +enum TStatusCode { + OK, + CANCELLED, + ANALYSIS_ERROR, + NOT_IMPLEMENTED_ERROR, + RUNTIME_ERROR, + MEM_LIMIT_EXCEEDED, + INTERNAL_ERROR, + THRIFT_RPC_ERROR, + TIMEOUT, + KUDU_NOT_ENABLED, // Deprecated + KUDU_NOT_SUPPORTED_ON_OS, // Deprecated + MEM_ALLOC_FAILED, + BUFFER_ALLOCATION_FAILED, + MINIMUM_RESERVATION_UNAVAILABLE, + PUBLISH_TIMEOUT, + LABEL_ALREADY_EXISTS, + ES_INTERNAL_ERROR, + ES_INDEX_NOT_FOUND, + ES_SHARD_NOT_FOUND, + ES_INVALID_CONTEXTID, + ES_INVALID_OFFSET, + ES_REQUEST_ERROR, + + // end of file + END_OF_FILE = 30, + NOT_FOUND = 31, + CORRUPTION = 32, + INVALID_ARGUMENT = 33, + IO_ERROR = 34, + ALREADY_EXIST = 35, + NETWORK_ERROR = 36, + ILLEGAL_STATE = 37, + NOT_AUTHORIZED = 38, + ABORTED = 39, + REMOTE_ERROR = 40, + SERVICE_UNAVAILABLE = 41, + UNINITIALIZED = 42, + CONFIGURATION_ERROR = 43, + INCOMPLETE = 44 +} + +struct TStatus { + 1: required TStatusCode status_code + 2: optional list error_msgs +} diff --git a/bitsail-connectors/connector-doris/src/main/thrift/doris/Types.thrift b/bitsail-connectors/connector-doris/src/main/thrift/doris/Types.thrift new file mode 100644 index 000000000..fa48c8e20 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/main/thrift/doris/Types.thrift @@ -0,0 +1,374 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace java com.bytedance.bitsail.connector.doris.thrift +namespace cpp doris + +typedef i64 TTimestamp +typedef i32 TPlanNodeId +typedef i32 TTupleId +typedef i32 TSlotId +typedef i64 TTableId +typedef i64 TTabletId +typedef i64 TVersion +typedef i64 TVersionHash +typedef i32 TSchemaHash +typedef i32 TPort +typedef i64 TCount +typedef i64 TSize +typedef i32 TClusterId +typedef i64 TEpoch + +// add for real time load, partitionid is not defined previously, define it here +typedef i64 TTransactionId +typedef i64 TPartitionId + +enum TStorageType { + ROW, + COLUMN, +} + +enum TStorageMedium { + HDD, + SSD, +} + +enum TVarType { + SESSION, + GLOBAL +} + +enum TPrimitiveType { + INVALID_TYPE, + NULL_TYPE, + BOOLEAN, + TINYINT, + SMALLINT, + INT, + BIGINT, + FLOAT, + DOUBLE, + DATE, + DATETIME, + BINARY, + DECIMAL, + // CHAR(n). Currently only supported in UDAs + CHAR, + LARGEINT, + VARCHAR, + HLL, + DECIMALV2, + TIME, + OBJECT, + ARRAY, + MAP, + STRUCT, + STRING, + ALL +} + +enum TTypeNodeType { + SCALAR, + ARRAY, + MAP, + STRUCT +} + +struct TScalarType { + 1: required TPrimitiveType type + + // Only set if type == CHAR or type == VARCHAR + 2: optional i32 len + + // Only set for DECIMAL + 3: optional i32 precision + 4: optional i32 scale +} + +// Represents a field in a STRUCT type. +// TODO: Model column stats for struct fields. +struct TStructField { + 1: required string name + 2: optional string comment +} + +struct TTypeNode { + 1: required TTypeNodeType type + + // only set for scalar types + 2: optional TScalarType scalar_type + + // only used for structs; has struct_fields.size() corresponding child types + 3: optional list struct_fields +} + +// A flattened representation of a tree of column types obtained by depth-first +// traversal. Complex types such as map, array and struct have child types corresponding +// to the map key/value, array item type, and struct fields, respectively. +// For scalar types the list contains only a single node. +// Note: We cannot rename this to TType because it conflicts with Thrift's internal TType +// and the generated Python thrift files will not work. +// Note: TTypeDesc in impala is TColumnType, but we already use TColumnType, so we name this +// to TTypeDesc. In future, we merge these two to one +struct TTypeDesc { + 1: list types +} + +enum TAggregationType { + SUM, + MAX, + MIN, + REPLACE, + HLL_UNION, + NONE +} + +enum TPushType { + LOAD, + DELETE, + LOAD_DELETE +} + +enum TTaskType { + CREATE, + DROP, + PUSH, + CLONE, + STORAGE_MEDIUM_MIGRATE, + ROLLUP, + SCHEMA_CHANGE, + CANCEL_DELETE, // Deprecated + MAKE_SNAPSHOT, + RELEASE_SNAPSHOT, + CHECK_CONSISTENCY, + UPLOAD, + DOWNLOAD, + CLEAR_REMOTE_FILE, + MOVE + REALTIME_PUSH, + PUBLISH_VERSION, + CLEAR_ALTER_TASK, + CLEAR_TRANSACTION_TASK, + RECOVER_TABLET, + STREAM_LOAD, + UPDATE_TABLET_META_INFO, + ALTER_TASK +} + +enum TStmtType { + QUERY, + DDL, // Data definition, e.g. CREATE TABLE (includes read-only functions e.g. SHOW) + DML, // Data modification e.g. INSERT + EXPLAIN // EXPLAIN +} + +// level of verboseness for "explain" output +// TODO: should this go somewhere else? +enum TExplainLevel { + NORMAL, + VERBOSE +} + +struct TColumnType { + 1: required TPrimitiveType type + // Only set if type == CHAR_ARRAY + 2: optional i32 len + 3: optional i32 index_len + 4: optional i32 precision + 5: optional i32 scale +} + +// A TNetworkAddress is the standard host, port representation of a +// network address. The hostname field must be resolvable to an IPv4 +// address. +struct TNetworkAddress { + 1: required string hostname + 2: required i32 port +} + +// Wire format for UniqueId +struct TUniqueId { + 1: required i64 hi + 2: required i64 lo +} + +enum QueryState { + CREATED, + INITIALIZED, + COMPILED, + RUNNING, + FINISHED, + EXCEPTION +} + +enum TFunctionType { + SCALAR, + AGGREGATE, +} + +enum TFunctionBinaryType { + // Palo builtin. We can either run this interpreted or via codegen + // depending on the query option. + BUILTIN, + + // Hive UDFs, loaded from *.jar + HIVE, + + // Native-interface, precompiled UDFs loaded from *.so + NATIVE, + + // Native-interface, precompiled to IR; loaded from *.ll + IR, +} + +// Represents a fully qualified function name. +struct TFunctionName { + // Name of the function's parent database. Not set if in global + // namespace (e.g. builtins) + 1: optional string db_name + + // Name of the function + 2: required string function_name +} + +struct TScalarFunction { + // Symbol for the function + 1: required string symbol + 2: optional string prepare_fn_symbol + 3: optional string close_fn_symbol +} + +struct TAggregateFunction { + 1: required TTypeDesc intermediate_type + 2: optional string update_fn_symbol + 3: optional string init_fn_symbol + 4: optional string serialize_fn_symbol + 5: optional string merge_fn_symbol + 6: optional string finalize_fn_symbol + 8: optional string get_value_fn_symbol + 9: optional string remove_fn_symbol + 10: optional bool is_analytic_only_fn = false +} + +// Represents a function in the Catalog. +struct TFunction { + // Fully qualified function name. + 1: required TFunctionName name + + // Type of the udf. e.g. hive, native, ir + 2: required TFunctionBinaryType binary_type + + // The types of the arguments to the function + 3: required list arg_types + + // Return type for the function. + 4: required TTypeDesc ret_type + + // If true, this function takes var args. + 5: required bool has_var_args + + // Optional comment to attach to the function + 6: optional string comment + + 7: optional string signature + + // HDFS path for the function binary. This binary must exist at the time the + // function is created. + 8: optional string hdfs_location + + // One of these should be set. + 9: optional TScalarFunction scalar_fn + 10: optional TAggregateFunction aggregate_fn + + 11: optional i64 id + 12: optional string checksum +} + +enum TLoadJobState { + PENDING, + ETL, + LOADING, + FINISHED, + CANCELLED +} + +enum TEtlState { + RUNNING, + FINISHED, + CANCELLED, + UNKNOWN +} + +enum TTableType { + MYSQL_TABLE, + OLAP_TABLE, + SCHEMA_TABLE, + KUDU_TABLE, // Deprecated + BROKER_TABLE, + ES_TABLE +} + +enum TKeysType { + PRIMARY_KEYS, + DUP_KEYS, + UNIQUE_KEYS, + AGG_KEYS +} + +enum TPriority { + NORMAL, + HIGH +} + +struct TBackend { + 1: required string host + 2: required TPort be_port + 3: required TPort http_port +} + +struct TResourceInfo { + 1: required string user + 2: required string group +} + +enum TExportState { + RUNNING, + FINISHED, + CANCELLED, + UNKNOWN +} + +enum TFileType { + FILE_LOCAL, + FILE_BROKER, + FILE_STREAM, // file content is streaming in the buffer +} + +struct TTabletCommitInfo { + 1: required i64 tabletId + 2: required i64 backendId +} + +enum TLoadType { + MANUL_LOAD, + ROUTINE_LOAD, + MINI_LOAD +} + +enum TLoadSourceType { + RAW, + KAFKA, +} diff --git a/bitsail-connectors/connector-doris/src/test/java/com/bytedance/bitsail/connector/doris/source/DorisSourceITCase.java b/bitsail-connectors/connector-doris/src/test/java/com/bytedance/bitsail/connector/doris/source/DorisSourceITCase.java new file mode 100644 index 000000000..2418d8178 --- /dev/null +++ b/bitsail-connectors/connector-doris/src/test/java/com/bytedance/bitsail/connector/doris/source/DorisSourceITCase.java @@ -0,0 +1,53 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.doris.source; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.doris.option.DorisReaderOptions; +import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; +import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; + +import org.junit.Ignore; +import org.junit.Test; + +@Ignore +public class DorisSourceITCase { + + @Test + public void test() throws Exception { + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("doris_to_print.json"); + addDorisInfo(jobConf); + EmbeddedFlinkCluster.submitJob(jobConf); + } + + /** + * Add your doris setting to job configuration. + * Below codes are just example. + *

+ * select id, bigint_type, string_type, double_type from doris_table where id = 1 + */ + public void addDorisInfo(BitSailConfiguration jobConf) { + jobConf.set(DorisReaderOptions.FE_HOSTS, "127.0.0.1:8030"); + jobConf.set(DorisReaderOptions.MYSQL_HOSTS, "127.0.0.1:9030"); + jobConf.set(DorisReaderOptions.USER, "root"); + jobConf.set(DorisReaderOptions.PASSWORD, ""); + jobConf.set(DorisReaderOptions.DB_NAME, "test"); + jobConf.set(DorisReaderOptions.TABLE_NAME, "doris_table"); + jobConf.set(DorisReaderOptions.SQL_FILTER, "id=1"); + } + +} diff --git a/bitsail-dist/src/main/resources/examples/Doris_Print_Example.json b/bitsail-dist/src/main/resources/examples/Doris_Print_Example.json new file mode 100644 index 000000000..e69de29bb diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/resources/doris_to_print.json b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/resources/doris_to_print.json new file mode 100644 index 000000000..e69de29bb diff --git a/website/en/documents/connectors/doris/doris-example.md b/website/en/documents/connectors/doris/doris-example.md index 0f1e42de3..05be8a790 100644 --- a/website/en/documents/connectors/doris/doris-example.md +++ b/website/en/documents/connectors/doris/doris-example.md @@ -42,6 +42,41 @@ PROPERTIES ) ``` +## Doris Reader +Assuming there is a test doris cluster, then we can use the following configuration to read `test_doris_table` table. +```json +{ + "job": { + "reader": { + "class": "com.bytedance.bitsail.connector.doris.source.DorisSource", + "fe_hosts": "127.0.0.1:1234", + "mysql_hosts": "127.0.0.1:4321", + "user": "test_user", + "password": "1234567", + "db_name": "test_db", + "table_name": "test_doris_table", + "columns": [ + { + "name": "id", + "type": "bigint" + }, + { + "name": "bigint_type", + "type": "bigint" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "double_type", + "type": "double" + } + ] + } + } +} +``` ## Doris writer diff --git a/website/en/documents/connectors/doris/doris.md b/website/en/documents/connectors/doris/doris.md index 324a4e511..2da6a795b 100644 --- a/website/en/documents/connectors/doris/doris.md +++ b/website/en/documents/connectors/doris/doris.md @@ -17,6 +17,67 @@ Parent document: [Connectors](../README.md) ``` +## Doris Reader +### Supported data types +Doris read connector parses according to the data segment mapping and supports the following data types: + +- CHAR +- VARCHAR +- BOOLEAN +- BINARY +- VARBINARY +- INT +- TINYINT +- SMALLINT +- INTEGER +- BIGINT +- FLOAT +- DOUBLE + +### Parameters +The following mentioned parameters should be added to `job.reader` block when using, for example: +```json +{ + "job": { + "writer": { + "class": "com.bytedance.bitsail.connector.doris.source.DorisSource", + "fe_hosts": "127.0.0.1:8030", + "mysql_hosts": "127.0.0.1:9030", + "user": "root", + "password": "", + "db_name": "test", + "table_name": "test_doris_table" + } + } +} +``` + +#### Necessary parameters +| Param name | Required | Default Value | Description | +|:----------------|:---------|:--------------|:------------------------------------------------------------------------------------| +| class | yes | -- | Doris writer class name, `com.bytedance.bitsail.connector.doris.source.DorisSource` | +| fe_hosts | yes | -- | Doris FE address, multi addresses separated by comma | +| mysql_hosts | yes | -- | Doris jdbc query address , multi addresses separated by comma | +| user | yes | -- | Doris account user | +| password | yes | -- | Doris account password, can be empty | +| db_name | yes | -- | database to read | +| table_name | yes | -- | table to read | +| columns | yes | -- | The name and type of columns to read | + +#### Optional parameters +| Param name | Required | Default Value | Description | +|:------------------------------|:---------|:------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| reader_parallelism_num | no | 1 | reader parallelism | +| sql_filter | no | -- | Column value conditions that need to be queried and filtered | +| tablet_size | no | Integer.MAX_VALUE | The number of Doris Tablets corresponding to an Partition. The smaller this value is set, the more partitions will be generated. This will increase the parallelism on the bitSail side, but at the same time will cause greater pressure on Doris. | +| exec_mem_limit | no | 2147483648 | Memory limit for a single query. The default is 2GB, in bytes. | +| request_query_timeout_s | no | 3600 | Query the timeout time of doris, the default is 1 hour, -1 means no timeout limit | +| request_batch_size | no | 1024 | The maximum number of rows to read data from BE at one time. Increasing this value can reduce the number of connections between bitSail and Doris. Thereby reducing the extra time overhead caused by network delay. | +| request_connect_timeouts | no | 30 * 1000 | Connection timeout for sending requests to Doris | +| request_read_timeouts | no | 30 * 1000 | Read timeout for sending request to Doris | +| request_retries | no | 3 | Number of retries to send requests to Doris | + + ## Doris Writer ### Supported data type diff --git a/website/en/documents/start/env_setup.md b/website/en/documents/start/env_setup.md index 0d3f51c53..21d1e3463 100644 --- a/website/en/documents/start/env_setup.md +++ b/website/en/documents/start/env_setup.md @@ -15,6 +15,34 @@ English | [简体中文](../../../zh/documents/start/env_setup.md) - JDK1.8 - maven 3.6+ - [Docker desktop](https://www.docker.com/products/docker-desktop/) +- thrift +```bash +install thrift + Windows: + 1. Download: `http://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.exe` + 2. Modify thrift-0.13.0.exe to thrift + + MacOS: + 1. Download: `brew install thrift@0.13.0` + 2. default address: /opt/homebrew/Cellar/thrift@0.13.0/0.13.0/bin/thrift + + Note: Executing `brew install thrift@0.13.0` on MacOS may report an error that the version cannot be found. The solution is as follows, execute it in the terminal: + 1. `brew tap-new $USER/local-tap` + 2. `brew extract --version='0.13.0' thrift $USER/local-tap` + 3. `brew install thrift@0.13.0` + Reference link: `https://gist.github.com/tonydeng/02e571f273d6cce4230dc8d5f394493c` + + Linux: + 1.Download source package:`wget https://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.tar.gz` + 2.Install dependencies:`yum install -y autoconf automake libtool cmake ncurses-devel openssl-devel lzo-devel zlib-devel gcc gcc-c++` + 3.`tar zxvf thrift-0.13.0.tar.gz` + 4.`cd thrift-0.13.0` + 5.`./configure --without-tests` + 6.`make` + 7.`make install` + Check the version after installation is complete:thrift --version + Note: If you have compiled Doris, you do not need to install thrift, you can directly use $DORIS_HOME/thirdparty/installed/bin/thrift +``` After correctly installing the above required components, we are able to run integration tests on your local IDE. diff --git a/website/zh/documents/connectors/doris/doris-example.md b/website/zh/documents/connectors/doris/doris-example.md index b4b091263..6bef28ef1 100644 --- a/website/zh/documents/connectors/doris/doris-example.md +++ b/website/zh/documents/connectors/doris/doris-example.md @@ -42,6 +42,41 @@ PROPERTIES ) ``` +## Doris读连接器 +假设当前有一个测试doris集群,则可以通过如下配置读取`test_doris_table`表。 +```json +{ + "job": { + "reader": { + "class": "com.bytedance.bitsail.connector.doris.source.DorisSource", + "fe_hosts": "127.0.0.1:1234", + "mysql_hosts": "127.0.0.1:4321", + "user": "test_user", + "password": "1234567", + "db_name": "test_db", + "table_name": "test_doris_table", + "columns": [ + { + "name": "id", + "type": "bigint" + }, + { + "name": "bigint_type", + "type": "bigint" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "double_type", + "type": "double" + } + ] + } + } +} +``` ## Doris写连接器 diff --git a/website/zh/documents/connectors/doris/doris.md b/website/zh/documents/connectors/doris/doris.md index 3aa9a1055..b7178a8c2 100644 --- a/website/zh/documents/connectors/doris/doris.md +++ b/website/zh/documents/connectors/doris/doris.md @@ -19,6 +19,67 @@ ``` +## Doris读取 +### 支持的数据类型 +Doris读连接器根据字段映射进行解析,支持以下数据类型: + +- CHAR +- VARCHAR +- BOOLEAN +- BINARY +- VARBINARY +- INT +- TINYINT +- SMALLINT +- INTEGER +- BIGINT +- FLOAT +- DOUBLE + +### 主要参数 +写连接器参数在`job.reader`中配置,实际使用时请注意路径前缀。示例: +```json +{ + "job": { + "writer": { + "class": "com.bytedance.bitsail.connector.doris.source.DorisSource", + "fe_hosts": "127.0.0.1:8030", + "mysql_hosts": "127.0.0.1:9030", + "user": "root", + "password": "", + "db_name": "test", + "table_name": "test_doris_table" + } + } +} +``` +#### 必需参数 +| 参数名称 | 是否必填 | 默认值 | 参数含义 | +|:------------|:-------|:----|:------------------------------------------------------------------------| +| class | 是 | -- | Doris读连接器类型, `com.bytedance.bitsail.connector.doris.source.DorisSource` | +| fe_hosts | 是 | -- | Doris FE地址, 多个地址用逗号分隔 | +| mysql_hosts | 是 | -- | JDBC连接Doris的地址, 多个地址用逗号分隔 | +| user | 是 | -- | Doris账户名 | +| password | 是 | -- | Doris密码,可为空 | +| db_name | 是 | -- | 要读取的Doris库 | +| table_name | 是 | -- | 要读取的Doris表 | +| columns | 是 | -- | 要读取的数据列的列名和类型 | + +#### 可选参数 +| 参数名称 | 是否必填 | 默认值 | 参数含义 | +|:------------------------------|:-----|:------------------|:----------------------------------------------------------------------------------------------------| +| reader_parallelism_num | 否 | 1 | 指定doris读并发 | +| sql_filter | 否 | -- | 需要查询过滤的列值条件 | +| tablet_size | 否 | Integer.MAX_VALUE | 一个 Partition 对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 bitSail 侧的并行度,但同时会对 Doris 造成更大的压力。 | +| exec_mem_limit | 否 | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 | +| request_query_timeout_s | 否 | 3600 | 查询 Doris 的超时时间,默认值为1小时,-1表示无超时限制 | +| request_batch_size | 否 | 1024 | 一次从 Doris BE 读取数据的最大行数。增大此数值可减少 bitSail 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销 | +| request_connect_timeouts | 否 | 30 * 1000 | 向 Doris 发送请求的连接超时时间 | +| request_read_timeouts | 否 | 30 * 1000 | 向 Doris 发送请求的读取超时时间 | +| request_retries | 否 | 3 | 向 Doris 发送请求的重试次数 | + + + ## Doris写入 ### 支持的数据类型 diff --git a/website/zh/documents/start/env_setup.md b/website/zh/documents/start/env_setup.md index c187aabf4..3e95c0503 100644 --- a/website/zh/documents/start/env_setup.md +++ b/website/zh/documents/start/env_setup.md @@ -15,6 +15,34 @@ order: 2 - JDK1.8 - maven 3.6+ - [Docker desktop](https://www.docker.com/products/docker-desktop/) +- thrift +```bash +安装 thrift + Windows: + 1.下载:`http://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.exe`(下载目录自己指定) + 2.修改thrift-0.13.0.exe 为 thrift + + MacOS: + 1. 下载:`brew install thrift@0.13.0` + 2. 默认下载地址:/opt/homebrew/Cellar/thrift@0.13.0/0.13.0/bin/thrift + + 注:MacOS执行 `brew install thrift@0.13.0` 可能会报找不到版本的错误,解决方法如下,在终端执行: + 1. `brew tap-new $USER/local-tap` + 2. `brew extract --version='0.13.0' thrift $USER/local-tap` + 3. `brew install thrift@0.13.0` + 参考链接: `https://gist.github.com/tonydeng/02e571f273d6cce4230dc8d5f394493c` + + Linux: + 1.下载源码包:`wget https://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.tar.gz` + 2.安装依赖:`yum install -y autoconf automake libtool cmake ncurses-devel openssl-devel lzo-devel zlib-devel gcc gcc-c++` + 3.`tar zxvf thrift-0.13.0.tar.gz` + 4.`cd thrift-0.13.0` + 5.`./configure --without-tests` + 6.`make` + 7.`make install` + 安装完成后查看版本:thrift --version + 注:如果编译过Doris,则不需要安装thrift,可以直接使用 $DORIS_HOME/thirdparty/installed/bin/thrift +``` 在安装上述必需组件后,您可以在本地的IDE上直接运行已有的集成测试。 From 98f25a7e6c84dc8879e042f66f1dc517a9350da6 Mon Sep 17 00:00:00 2001 From: DongLiang-0 <1747644936@qq.com> Date: Wed, 1 Feb 2023 20:16:52 +0800 Subject: [PATCH 2/3] fix doc --- README.md | 2 +- README_zh.md | 2 +- website/en/documents/introduce.md | 2 +- website/zh/documents/introduce.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 4697cb9da..df6de81d9 100644 --- a/README.md +++ b/README.md @@ -112,7 +112,7 @@ In the Runtime layer, it supports multiple execution modes, such as yarn, local, Doris - - + ✅ ✅ diff --git a/README_zh.md b/README_zh.md index 99e9f5d95..ce906e1a5 100644 --- a/README_zh.md +++ b/README_zh.md @@ -96,7 +96,7 @@ BitSail目前已被广泛使用,并支持数百万亿的大流量场景。同时 Doris - - + ✅ ✅ diff --git a/website/en/documents/introduce.md b/website/en/documents/introduce.md index c289fafe6..9543cb766 100644 --- a/website/en/documents/introduce.md +++ b/website/en/documents/introduce.md @@ -120,7 +120,7 @@ In the Runtime layer, it supports multiple execution modes, such as yarn, local, Doris - - + ✅ ✅ diff --git a/website/zh/documents/introduce.md b/website/zh/documents/introduce.md index 984ab82c5..887d73476 100644 --- a/website/zh/documents/introduce.md +++ b/website/zh/documents/introduce.md @@ -104,7 +104,7 @@ BitSail目前已被广泛使用,并支持数百万亿的大流量场景。同时 Doris - - + ✅ ✅ From 96c1799835be31522d7a50e73a18ccb2c855ae11 Mon Sep 17 00:00:00 2001 From: DongLiang-0 <1747644936@qq.com> Date: Wed, 10 May 2023 18:04:14 +0800 Subject: [PATCH 3/3] add doris-sdk to replace thrift compile --- bitsail-connectors/connector-doris/pom.xml | 31 +- .../doris/backend/BackendClient.java | 33 +- .../doris/backend/model/RowBatch.java | 4 +- .../source/split/DorisSourceSplitReader.java | 12 +- .../DorisSourceSplitCoordinator.java | 2 +- .../thrift/doris/DorisExternalService.thrift | 121 ------ .../src/main/thrift/doris/Status.thrift | 65 --- .../src/main/thrift/doris/Types.thrift | 374 ------------------ .../doris/{ => sink}/DorisSinkITCase.java | 14 +- .../doris/source/DorisSourceITCase.java | 14 +- .../src/test/resources/doris_to_print.json | 40 ++ website/en/documents/start/env_setup.md | 28 -- website/zh/documents/start/env_setup.md | 29 +- 13 files changed, 88 insertions(+), 679 deletions(-) delete mode 100644 bitsail-connectors/connector-doris/src/main/thrift/doris/DorisExternalService.thrift delete mode 100644 bitsail-connectors/connector-doris/src/main/thrift/doris/Status.thrift delete mode 100644 bitsail-connectors/connector-doris/src/main/thrift/doris/Types.thrift rename bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/{ => sink}/DorisSinkITCase.java (85%) rename {bitsail-connectors/connector-doris/src/test/java/com/bytedance/bitsail/connector => bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration}/doris/source/DorisSourceITCase.java (78%) diff --git a/bitsail-connectors/connector-doris/pom.xml b/bitsail-connectors/connector-doris/pom.xml index 548b9cc40..5e98d7d7a 100644 --- a/bitsail-connectors/connector-doris/pom.xml +++ b/bitsail-connectors/connector-doris/pom.xml @@ -32,10 +32,16 @@ 8 5.1.49 5.0.0 - 0.13.0 + 0.16.0 + 1.0.0 + + org.apache.doris + thrift-service + ${thrift-service.version} + mysql mysql-connector-java @@ -118,27 +124,4 @@ test - - - - - org.apache.thrift.tools - maven-thrift-plugin - 0.1.11 - - ${thrift.binary} - java:fullcamel - - - - thrift-sources - generate-sources - - compile - - - - - - \ No newline at end of file diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/BackendClient.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/BackendClient.java index bfe6d15bb..a77f72239 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/BackendClient.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/BackendClient.java @@ -20,15 +20,16 @@ import com.bytedance.bitsail.connector.doris.backend.model.Routing; import com.bytedance.bitsail.connector.doris.config.DorisExecutionOptions; import com.bytedance.bitsail.connector.doris.error.DorisErrorCode; -import com.bytedance.bitsail.connector.doris.thrift.TDorisExternalService; -import com.bytedance.bitsail.connector.doris.thrift.TScanBatchResult; -import com.bytedance.bitsail.connector.doris.thrift.TScanCloseParams; -import com.bytedance.bitsail.connector.doris.thrift.TScanCloseResult; -import com.bytedance.bitsail.connector.doris.thrift.TScanNextBatchParams; -import com.bytedance.bitsail.connector.doris.thrift.TScanOpenParams; -import com.bytedance.bitsail.connector.doris.thrift.TScanOpenResult; -import com.bytedance.bitsail.connector.doris.thrift.TStatusCode; +import org.apache.doris.sdk.thrift.TDorisExternalService; +import org.apache.doris.sdk.thrift.TScanBatchResult; +import org.apache.doris.sdk.thrift.TScanCloseParams; +import org.apache.doris.sdk.thrift.TScanCloseResult; +import org.apache.doris.sdk.thrift.TScanNextBatchParams; +import org.apache.doris.sdk.thrift.TScanOpenParams; +import org.apache.doris.sdk.thrift.TScanOpenResult; +import org.apache.doris.sdk.thrift.TStatusCode; +import org.apache.thrift.TConfiguration; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; @@ -66,15 +67,15 @@ private void open() { TException ex = null; for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { LOGGER.debug("Attempt {} to connect {}.", attempt, routing); - TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory(); - transport = new TSocket(routing.getHost(), routing.getPort(), socketTimeout, connectTimeout); - TProtocol protocol = factory.getProtocol(transport); - client = new TDorisExternalService.Client(protocol); - if (isConnected) { - LOGGER.info("Success connect to {}.", routing); - return; - } try { + TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory(); + transport = new TSocket(new TConfiguration(), routing.getHost(), routing.getPort(), socketTimeout, connectTimeout); + TProtocol protocol = factory.getProtocol(transport); + client = new TDorisExternalService.Client(protocol); + if (isConnected) { + LOGGER.info("Success connect to {}.", routing); + return; + } LOGGER.trace("Connect status before open transport to {} is '{}'.", routing, isConnected); if (!transport.isOpen()) { transport.open(); diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/model/RowBatch.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/model/RowBatch.java index 6a2c2ecdb..b11af762d 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/model/RowBatch.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/backend/model/RowBatch.java @@ -17,9 +17,9 @@ package com.bytedance.bitsail.connector.doris.backend.model; import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.util.Preconditions; import com.bytedance.bitsail.connector.doris.error.DorisErrorCode; import com.bytedance.bitsail.connector.doris.rest.model.Schema; -import com.bytedance.bitsail.connector.doris.thrift.TScanBatchResult; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; @@ -36,7 +36,7 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.Types; -import org.apache.flink.util.Preconditions; +import org.apache.doris.sdk.thrift.TScanBatchResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/DorisSourceSplitReader.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/DorisSourceSplitReader.java index 8ab9df72a..2e8a05a3a 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/DorisSourceSplitReader.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/DorisSourceSplitReader.java @@ -24,13 +24,13 @@ import com.bytedance.bitsail.connector.doris.rest.model.Field; import com.bytedance.bitsail.connector.doris.rest.model.PartitionDefinition; import com.bytedance.bitsail.connector.doris.rest.model.Schema; -import com.bytedance.bitsail.connector.doris.thrift.TScanBatchResult; -import com.bytedance.bitsail.connector.doris.thrift.TScanCloseParams; -import com.bytedance.bitsail.connector.doris.thrift.TScanColumnDesc; -import com.bytedance.bitsail.connector.doris.thrift.TScanNextBatchParams; -import com.bytedance.bitsail.connector.doris.thrift.TScanOpenParams; -import com.bytedance.bitsail.connector.doris.thrift.TScanOpenResult; +import org.apache.doris.sdk.thrift.TScanBatchResult; +import org.apache.doris.sdk.thrift.TScanCloseParams; +import org.apache.doris.sdk.thrift.TScanColumnDesc; +import org.apache.doris.sdk.thrift.TScanNextBatchParams; +import org.apache.doris.sdk.thrift.TScanOpenParams; +import org.apache.doris.sdk.thrift.TScanOpenResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/coordinator/DorisSourceSplitCoordinator.java b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/coordinator/DorisSourceSplitCoordinator.java index 46da687f7..187d54b1f 100644 --- a/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/coordinator/DorisSourceSplitCoordinator.java +++ b/bitsail-connectors/connector-doris/src/main/java/com/bytedance/bitsail/connector/doris/source/split/coordinator/DorisSourceSplitCoordinator.java @@ -118,7 +118,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname } @Override - public EmptyState snapshotState() throws Exception { + public EmptyState snapshotState(long checkpoint) throws Exception { return new EmptyState(); } diff --git a/bitsail-connectors/connector-doris/src/main/thrift/doris/DorisExternalService.thrift b/bitsail-connectors/connector-doris/src/main/thrift/doris/DorisExternalService.thrift deleted file mode 100644 index ba4ea6ffd..000000000 --- a/bitsail-connectors/connector-doris/src/main/thrift/doris/DorisExternalService.thrift +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright 2022 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace java com.bytedance.bitsail.connector.doris.thrift -namespace cpp doris - -include "Types.thrift" -include "Status.thrift" - - -// Parameters to open(). -struct TScanOpenParams { - - 1: required string cluster - - 2: required string database - - 3: required string table - - // tablets to scan - 4: required list tablet_ids - - // base64 encoded binary plan fragment - 5: required string opaqued_query_plan - - // A string specified for the table that is passed to the external data source. - // Always set, may be an empty string. - 6: optional i32 batch_size - - // reserved params for use - 7: optional map properties - - // The query limit, if specified. - 8: optional i64 limit - - // The authenticated user name. Always set. - // maybe usefullless - 9: optional string user - - 10: optional string passwd - // max keep alive time min - 11: optional i16 keep_alive_min - - 12: optional i32 query_timeout - - // memory limit for a single query - 13: optional i64 mem_limit -} - -struct TScanColumnDesc { - // The column name - 1: optional string name - // The column type. Always set. - 2: optional Types.TPrimitiveType type -} - -// Returned by open(). -struct TScanOpenResult { - 1: required Status.TStatus status - // An opaque context_id used in subsequent getNext()/close() calls. Required. - 2: optional string context_id - // selected fields - 3: optional list selected_columns - -} - -// Parameters to getNext() -struct TScanNextBatchParams { - // The opaque handle returned by the previous open() call. Always set. - 1: optional string context_id // doris olap engine context id - 2: optional i64 offset // doris should check the offset to prevent duplicate rpc calls -} - -// Returned by getNext(). -struct TScanBatchResult { - 1: required Status.TStatus status - - // If true, reached the end of the result stream; subsequent calls to - // getNext() won’t return any more results. Required. - 2: optional bool eos - - // A batch of rows of arrow format to return, if any exist. The number of rows in the batch - // should be less than or equal to the batch_size specified in TOpenParams. - 3: optional binary rows -} - -// Parameters to close() -struct TScanCloseParams { - // The opaque handle returned by the previous open() call. Always set. - 1: optional string context_id -} - -// Returned by close(). -struct TScanCloseResult { - 1: required Status.TStatus status -} - -// scan service expose ability of scanning data ability to other compute system -service TDorisExternalService { - // doris will build a scan context for this session, context_id returned if success - TScanOpenResult open_scanner(1: TScanOpenParams params); - - // return the batch_size of data - TScanBatchResult get_next(1: TScanNextBatchParams params); - - // release the context resource associated with the context_id - TScanCloseResult close_scanner(1: TScanCloseParams params); -} diff --git a/bitsail-connectors/connector-doris/src/main/thrift/doris/Status.thrift b/bitsail-connectors/connector-doris/src/main/thrift/doris/Status.thrift deleted file mode 100644 index 5308a6f9f..000000000 --- a/bitsail-connectors/connector-doris/src/main/thrift/doris/Status.thrift +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2022 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace java com.bytedance.bitsail.connector.doris.thrift -namespace cpp doris - -enum TStatusCode { - OK, - CANCELLED, - ANALYSIS_ERROR, - NOT_IMPLEMENTED_ERROR, - RUNTIME_ERROR, - MEM_LIMIT_EXCEEDED, - INTERNAL_ERROR, - THRIFT_RPC_ERROR, - TIMEOUT, - KUDU_NOT_ENABLED, // Deprecated - KUDU_NOT_SUPPORTED_ON_OS, // Deprecated - MEM_ALLOC_FAILED, - BUFFER_ALLOCATION_FAILED, - MINIMUM_RESERVATION_UNAVAILABLE, - PUBLISH_TIMEOUT, - LABEL_ALREADY_EXISTS, - ES_INTERNAL_ERROR, - ES_INDEX_NOT_FOUND, - ES_SHARD_NOT_FOUND, - ES_INVALID_CONTEXTID, - ES_INVALID_OFFSET, - ES_REQUEST_ERROR, - - // end of file - END_OF_FILE = 30, - NOT_FOUND = 31, - CORRUPTION = 32, - INVALID_ARGUMENT = 33, - IO_ERROR = 34, - ALREADY_EXIST = 35, - NETWORK_ERROR = 36, - ILLEGAL_STATE = 37, - NOT_AUTHORIZED = 38, - ABORTED = 39, - REMOTE_ERROR = 40, - SERVICE_UNAVAILABLE = 41, - UNINITIALIZED = 42, - CONFIGURATION_ERROR = 43, - INCOMPLETE = 44 -} - -struct TStatus { - 1: required TStatusCode status_code - 2: optional list error_msgs -} diff --git a/bitsail-connectors/connector-doris/src/main/thrift/doris/Types.thrift b/bitsail-connectors/connector-doris/src/main/thrift/doris/Types.thrift deleted file mode 100644 index fa48c8e20..000000000 --- a/bitsail-connectors/connector-doris/src/main/thrift/doris/Types.thrift +++ /dev/null @@ -1,374 +0,0 @@ -/* - * Copyright 2022 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace java com.bytedance.bitsail.connector.doris.thrift -namespace cpp doris - -typedef i64 TTimestamp -typedef i32 TPlanNodeId -typedef i32 TTupleId -typedef i32 TSlotId -typedef i64 TTableId -typedef i64 TTabletId -typedef i64 TVersion -typedef i64 TVersionHash -typedef i32 TSchemaHash -typedef i32 TPort -typedef i64 TCount -typedef i64 TSize -typedef i32 TClusterId -typedef i64 TEpoch - -// add for real time load, partitionid is not defined previously, define it here -typedef i64 TTransactionId -typedef i64 TPartitionId - -enum TStorageType { - ROW, - COLUMN, -} - -enum TStorageMedium { - HDD, - SSD, -} - -enum TVarType { - SESSION, - GLOBAL -} - -enum TPrimitiveType { - INVALID_TYPE, - NULL_TYPE, - BOOLEAN, - TINYINT, - SMALLINT, - INT, - BIGINT, - FLOAT, - DOUBLE, - DATE, - DATETIME, - BINARY, - DECIMAL, - // CHAR(n). Currently only supported in UDAs - CHAR, - LARGEINT, - VARCHAR, - HLL, - DECIMALV2, - TIME, - OBJECT, - ARRAY, - MAP, - STRUCT, - STRING, - ALL -} - -enum TTypeNodeType { - SCALAR, - ARRAY, - MAP, - STRUCT -} - -struct TScalarType { - 1: required TPrimitiveType type - - // Only set if type == CHAR or type == VARCHAR - 2: optional i32 len - - // Only set for DECIMAL - 3: optional i32 precision - 4: optional i32 scale -} - -// Represents a field in a STRUCT type. -// TODO: Model column stats for struct fields. -struct TStructField { - 1: required string name - 2: optional string comment -} - -struct TTypeNode { - 1: required TTypeNodeType type - - // only set for scalar types - 2: optional TScalarType scalar_type - - // only used for structs; has struct_fields.size() corresponding child types - 3: optional list struct_fields -} - -// A flattened representation of a tree of column types obtained by depth-first -// traversal. Complex types such as map, array and struct have child types corresponding -// to the map key/value, array item type, and struct fields, respectively. -// For scalar types the list contains only a single node. -// Note: We cannot rename this to TType because it conflicts with Thrift's internal TType -// and the generated Python thrift files will not work. -// Note: TTypeDesc in impala is TColumnType, but we already use TColumnType, so we name this -// to TTypeDesc. In future, we merge these two to one -struct TTypeDesc { - 1: list types -} - -enum TAggregationType { - SUM, - MAX, - MIN, - REPLACE, - HLL_UNION, - NONE -} - -enum TPushType { - LOAD, - DELETE, - LOAD_DELETE -} - -enum TTaskType { - CREATE, - DROP, - PUSH, - CLONE, - STORAGE_MEDIUM_MIGRATE, - ROLLUP, - SCHEMA_CHANGE, - CANCEL_DELETE, // Deprecated - MAKE_SNAPSHOT, - RELEASE_SNAPSHOT, - CHECK_CONSISTENCY, - UPLOAD, - DOWNLOAD, - CLEAR_REMOTE_FILE, - MOVE - REALTIME_PUSH, - PUBLISH_VERSION, - CLEAR_ALTER_TASK, - CLEAR_TRANSACTION_TASK, - RECOVER_TABLET, - STREAM_LOAD, - UPDATE_TABLET_META_INFO, - ALTER_TASK -} - -enum TStmtType { - QUERY, - DDL, // Data definition, e.g. CREATE TABLE (includes read-only functions e.g. SHOW) - DML, // Data modification e.g. INSERT - EXPLAIN // EXPLAIN -} - -// level of verboseness for "explain" output -// TODO: should this go somewhere else? -enum TExplainLevel { - NORMAL, - VERBOSE -} - -struct TColumnType { - 1: required TPrimitiveType type - // Only set if type == CHAR_ARRAY - 2: optional i32 len - 3: optional i32 index_len - 4: optional i32 precision - 5: optional i32 scale -} - -// A TNetworkAddress is the standard host, port representation of a -// network address. The hostname field must be resolvable to an IPv4 -// address. -struct TNetworkAddress { - 1: required string hostname - 2: required i32 port -} - -// Wire format for UniqueId -struct TUniqueId { - 1: required i64 hi - 2: required i64 lo -} - -enum QueryState { - CREATED, - INITIALIZED, - COMPILED, - RUNNING, - FINISHED, - EXCEPTION -} - -enum TFunctionType { - SCALAR, - AGGREGATE, -} - -enum TFunctionBinaryType { - // Palo builtin. We can either run this interpreted or via codegen - // depending on the query option. - BUILTIN, - - // Hive UDFs, loaded from *.jar - HIVE, - - // Native-interface, precompiled UDFs loaded from *.so - NATIVE, - - // Native-interface, precompiled to IR; loaded from *.ll - IR, -} - -// Represents a fully qualified function name. -struct TFunctionName { - // Name of the function's parent database. Not set if in global - // namespace (e.g. builtins) - 1: optional string db_name - - // Name of the function - 2: required string function_name -} - -struct TScalarFunction { - // Symbol for the function - 1: required string symbol - 2: optional string prepare_fn_symbol - 3: optional string close_fn_symbol -} - -struct TAggregateFunction { - 1: required TTypeDesc intermediate_type - 2: optional string update_fn_symbol - 3: optional string init_fn_symbol - 4: optional string serialize_fn_symbol - 5: optional string merge_fn_symbol - 6: optional string finalize_fn_symbol - 8: optional string get_value_fn_symbol - 9: optional string remove_fn_symbol - 10: optional bool is_analytic_only_fn = false -} - -// Represents a function in the Catalog. -struct TFunction { - // Fully qualified function name. - 1: required TFunctionName name - - // Type of the udf. e.g. hive, native, ir - 2: required TFunctionBinaryType binary_type - - // The types of the arguments to the function - 3: required list arg_types - - // Return type for the function. - 4: required TTypeDesc ret_type - - // If true, this function takes var args. - 5: required bool has_var_args - - // Optional comment to attach to the function - 6: optional string comment - - 7: optional string signature - - // HDFS path for the function binary. This binary must exist at the time the - // function is created. - 8: optional string hdfs_location - - // One of these should be set. - 9: optional TScalarFunction scalar_fn - 10: optional TAggregateFunction aggregate_fn - - 11: optional i64 id - 12: optional string checksum -} - -enum TLoadJobState { - PENDING, - ETL, - LOADING, - FINISHED, - CANCELLED -} - -enum TEtlState { - RUNNING, - FINISHED, - CANCELLED, - UNKNOWN -} - -enum TTableType { - MYSQL_TABLE, - OLAP_TABLE, - SCHEMA_TABLE, - KUDU_TABLE, // Deprecated - BROKER_TABLE, - ES_TABLE -} - -enum TKeysType { - PRIMARY_KEYS, - DUP_KEYS, - UNIQUE_KEYS, - AGG_KEYS -} - -enum TPriority { - NORMAL, - HIGH -} - -struct TBackend { - 1: required string host - 2: required TPort be_port - 3: required TPort http_port -} - -struct TResourceInfo { - 1: required string user - 2: required string group -} - -enum TExportState { - RUNNING, - FINISHED, - CANCELLED, - UNKNOWN -} - -enum TFileType { - FILE_LOCAL, - FILE_BROKER, - FILE_STREAM, // file content is streaming in the buffer -} - -struct TTabletCommitInfo { - 1: required i64 tabletId - 2: required i64 backendId -} - -enum TLoadType { - MANUL_LOAD, - ROUTINE_LOAD, - MINI_LOAD -} - -enum TLoadSourceType { - RAW, - KAFKA, -} diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/DorisSinkITCase.java b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/sink/DorisSinkITCase.java similarity index 85% rename from bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/DorisSinkITCase.java rename to bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/sink/DorisSinkITCase.java index 05c841ca0..6ca4b2650 100644 --- a/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/DorisSinkITCase.java +++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/sink/DorisSinkITCase.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.bytedance.bitsail.test.integration.doris; +package com.bytedance.bitsail.test.integration.doris.sink; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.common.option.CommonOptions; @@ -48,12 +48,12 @@ public void test() throws Exception { * Below codes are just example. */ public void addDorisInfo(BitSailConfiguration jobConf) { - jobConf.set(DorisWriterOptions.FE_HOSTS, "127.0.0.1:1234"); - jobConf.set(DorisWriterOptions.MYSQL_HOSTS, "127.0.0.1:4321"); - jobConf.set(DorisWriterOptions.USER, "test_user"); - jobConf.set(DorisWriterOptions.PASSWORD, "password"); - jobConf.set(DorisWriterOptions.DB_NAME, "test_db"); - jobConf.set(DorisWriterOptions.TABLE_NAME, "test_table"); + jobConf.set(DorisWriterOptions.FE_HOSTS, "127.0.0.1:8030"); + jobConf.set(DorisWriterOptions.MYSQL_HOSTS, "127.0.0.1:9030"); + jobConf.set(DorisWriterOptions.USER, "root"); + jobConf.set(DorisWriterOptions.PASSWORD, ""); + jobConf.set(DorisWriterOptions.DB_NAME, "test"); + jobConf.set(DorisWriterOptions.TABLE_NAME, "test_bitsails"); jobConf.set(CommonOptions.CheckPointOptions.CHECKPOINT_ENABLE, true); jobConf.set(CommonOptions.CheckPointOptions.CHECKPOINT_INTERVAL, 5000L); jobConf.set(DorisWriterOptions.SINK_ENABLE_2PC, false); diff --git a/bitsail-connectors/connector-doris/src/test/java/com/bytedance/bitsail/connector/doris/source/DorisSourceITCase.java b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/source/DorisSourceITCase.java similarity index 78% rename from bitsail-connectors/connector-doris/src/test/java/com/bytedance/bitsail/connector/doris/source/DorisSourceITCase.java rename to bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/source/DorisSourceITCase.java index 2418d8178..3f6f0d716 100644 --- a/bitsail-connectors/connector-doris/src/test/java/com/bytedance/bitsail/connector/doris/source/DorisSourceITCase.java +++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/java/com/bytedance/bitsail/test/integration/doris/source/DorisSourceITCase.java @@ -14,24 +14,24 @@ * limitations under the License. */ -package com.bytedance.bitsail.connector.doris.source; +package com.bytedance.bitsail.test.integration.doris.source; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.connector.doris.option.DorisReaderOptions; -import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; -import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; +import com.bytedance.bitsail.test.integration.AbstractIntegrationTest; +import com.bytedance.bitsail.test.integration.utils.JobConfUtils; import org.junit.Ignore; import org.junit.Test; @Ignore -public class DorisSourceITCase { +public class DorisSourceITCase extends AbstractIntegrationTest { @Test public void test() throws Exception { BitSailConfiguration jobConf = JobConfUtils.fromClasspath("doris_to_print.json"); addDorisInfo(jobConf); - EmbeddedFlinkCluster.submitJob(jobConf); + submitJob(jobConf); } /** @@ -46,8 +46,8 @@ public void addDorisInfo(BitSailConfiguration jobConf) { jobConf.set(DorisReaderOptions.USER, "root"); jobConf.set(DorisReaderOptions.PASSWORD, ""); jobConf.set(DorisReaderOptions.DB_NAME, "test"); - jobConf.set(DorisReaderOptions.TABLE_NAME, "doris_table"); - jobConf.set(DorisReaderOptions.SQL_FILTER, "id=1"); + jobConf.set(DorisReaderOptions.TABLE_NAME, "test_bitsail"); +// jobConf.set(DorisReaderOptions.SQL_FILTER, "id=1"); } } diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/resources/doris_to_print.json b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/resources/doris_to_print.json index e69de29bb..113ca2fc1 100644 --- a/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/resources/doris_to_print.json +++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-doris/src/test/resources/doris_to_print.json @@ -0,0 +1,40 @@ +{ + "job": { + "common": { + "job_id": -31198, + "instance_id": -31198, + "job_name": "bitsail_doris_to_print_test", + "user_name": "test" + }, + "reader": { + "class": "com.bytedance.bitsail.connector.doris.source.DorisSource", + "fe_hosts": "127.0.0.1:8030", + "mysql_hosts": "127.0.0.1:9030", + "user": "root", + "password": "", + "db_name": "test", + "table_name": "doris_table", + "columns": [ + { + "name": "id", + "type": "int" + }, + { + "name": "bigint_type", + "type": "bigint" + }, + { + "name": "string_type", + "type": "string" + }, + { + "name": "double_type", + "type": "double" + } + ] + }, + "writer": { + "class": "com.bytedance.bitsail.connector.legacy.print.sink.PrintSink" + } + } +} \ No newline at end of file diff --git a/website/en/documents/start/env_setup.md b/website/en/documents/start/env_setup.md index 21d1e3463..0d3f51c53 100644 --- a/website/en/documents/start/env_setup.md +++ b/website/en/documents/start/env_setup.md @@ -15,34 +15,6 @@ English | [简体中文](../../../zh/documents/start/env_setup.md) - JDK1.8 - maven 3.6+ - [Docker desktop](https://www.docker.com/products/docker-desktop/) -- thrift -```bash -install thrift - Windows: - 1. Download: `http://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.exe` - 2. Modify thrift-0.13.0.exe to thrift - - MacOS: - 1. Download: `brew install thrift@0.13.0` - 2. default address: /opt/homebrew/Cellar/thrift@0.13.0/0.13.0/bin/thrift - - Note: Executing `brew install thrift@0.13.0` on MacOS may report an error that the version cannot be found. The solution is as follows, execute it in the terminal: - 1. `brew tap-new $USER/local-tap` - 2. `brew extract --version='0.13.0' thrift $USER/local-tap` - 3. `brew install thrift@0.13.0` - Reference link: `https://gist.github.com/tonydeng/02e571f273d6cce4230dc8d5f394493c` - - Linux: - 1.Download source package:`wget https://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.tar.gz` - 2.Install dependencies:`yum install -y autoconf automake libtool cmake ncurses-devel openssl-devel lzo-devel zlib-devel gcc gcc-c++` - 3.`tar zxvf thrift-0.13.0.tar.gz` - 4.`cd thrift-0.13.0` - 5.`./configure --without-tests` - 6.`make` - 7.`make install` - Check the version after installation is complete:thrift --version - Note: If you have compiled Doris, you do not need to install thrift, you can directly use $DORIS_HOME/thirdparty/installed/bin/thrift -``` After correctly installing the above required components, we are able to run integration tests on your local IDE. diff --git a/website/zh/documents/start/env_setup.md b/website/zh/documents/start/env_setup.md index 3e95c0503..41651e941 100644 --- a/website/zh/documents/start/env_setup.md +++ b/website/zh/documents/start/env_setup.md @@ -15,34 +15,7 @@ order: 2 - JDK1.8 - maven 3.6+ - [Docker desktop](https://www.docker.com/products/docker-desktop/) -- thrift -```bash -安装 thrift - Windows: - 1.下载:`http://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.exe`(下载目录自己指定) - 2.修改thrift-0.13.0.exe 为 thrift - - MacOS: - 1. 下载:`brew install thrift@0.13.0` - 2. 默认下载地址:/opt/homebrew/Cellar/thrift@0.13.0/0.13.0/bin/thrift - - 注:MacOS执行 `brew install thrift@0.13.0` 可能会报找不到版本的错误,解决方法如下,在终端执行: - 1. `brew tap-new $USER/local-tap` - 2. `brew extract --version='0.13.0' thrift $USER/local-tap` - 3. `brew install thrift@0.13.0` - 参考链接: `https://gist.github.com/tonydeng/02e571f273d6cce4230dc8d5f394493c` - - Linux: - 1.下载源码包:`wget https://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.tar.gz` - 2.安装依赖:`yum install -y autoconf automake libtool cmake ncurses-devel openssl-devel lzo-devel zlib-devel gcc gcc-c++` - 3.`tar zxvf thrift-0.13.0.tar.gz` - 4.`cd thrift-0.13.0` - 5.`./configure --without-tests` - 6.`make` - 7.`make install` - 安装完成后查看版本:thrift --version - 注:如果编译过Doris,则不需要安装thrift,可以直接使用 $DORIS_HOME/thirdparty/installed/bin/thrift -``` + 在安装上述必需组件后,您可以在本地的IDE上直接运行已有的集成测试。