From 49c93ee0b87a528b13474a4e6d6f8e58a9b175c2 Mon Sep 17 00:00:00 2001
From: chikamura
Date: Mon, 5 Feb 2024 23:33:23 +0900
Subject: [PATCH 1/6] add match_by_column_name option

---
 README.md                                     |   1 +
 .../embulk/output/SnowflakeOutputPlugin.java  |  65 ++++++-
 ...flakeMatchByColumnNameCopyBatchInsert.java | 160 ++++++++++++++++++
 3 files changed, 224 insertions(+), 2 deletions(-)
 create mode 100644 src/main/java/org/embulk/output/snowflake/SnowflakeMatchByColumnNameCopyBatchInsert.java

diff --git a/README.md b/README.md
index 6f4065c..9903d7a 100644
--- a/README.md
+++ b/README.md
@@ -25,6 +25,7 @@ Snowflake output plugin for Embulk loads records to Snowflake.
 - **merge_keys**: key column names for merging records in merge mode (string array, required in merge mode if table doesn't have primary key)
 - **merge_rule**: list of column assignments for updating existing records used in merge mode, for example `"foo" = T."foo" + S."foo"` (`T` means target table and `S` means source table). (string array, default: always overwrites with new values)
 - **batch_size**: size of a single batch insert (integer, default: 16777216)
+- **match_by_column_name**: specify whether to load data into columns in the target table that match the corresponding columns in the input data ("case_insensitive", "none", default: "none")
 - **default_timezone**: If input column type (embulk type) is timestamp, this plugin needs to format the timestamp into a SQL string. This default_timezone option is used to control the timezone. You can overwrite the timezone for each column using the column_options option. (string, default: `UTC`)
 - **column_options**: advanced: key-value pairs where the key is a column name and the value is options for the column.
   - **type**: type of a column when this plugin creates new tables (e.g. `VARCHAR(255)`, `INTEGER NOT NULL UNIQUE`). This is used when this plugin creates intermediate tables (insert, truncate_insert and merge modes), when it creates the target table (insert_direct and replace modes), and when it creates a nonexistent target table automatically. (string, default: depends on input column type. `BIGINT` if input column type is long, `BOOLEAN` if boolean, `DOUBLE PRECISION` if double, `CLOB` if string, `TIMESTAMP` if timestamp)
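For readers trying the option out, a minimal Embulk config sketch is below. The connection values and table name are placeholders; only the `match_by_column_name` line is introduced by this patch, the other keys are assumed to follow the plugin's usual output options:

```yaml
out:
  type: snowflake
  host: example.snowflakecomputing.com    # placeholder account host
  user: embulk_user                       # placeholder credentials
  database: MY_DB
  schema: PUBLIC
  table: my_table
  mode: insert
  match_by_column_name: case_insensitive  # load input columns into same-named table columns
```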
diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
index a7e201e..d5655c0 100644
--- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
+++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
@@ -1,5 +1,7 @@
 package org.embulk.output;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.sql.Types;
@@ -10,6 +12,7 @@
 import org.embulk.output.jdbc.*;
 import org.embulk.output.snowflake.PrivateKeyReader;
 import org.embulk.output.snowflake.SnowflakeCopyBatchInsert;
+import org.embulk.output.snowflake.SnowflakeMatchByColumnNameCopyBatchInsert;
 import org.embulk.output.snowflake.SnowflakeOutputConnection;
 import org.embulk.output.snowflake.SnowflakeOutputConnector;
 import org.embulk.output.snowflake.StageIdentifier;
@@ -65,6 +68,36 @@ public interface SnowflakePluginTask extends PluginTask {
     @Config("empty_field_as_null")
     @ConfigDefault("true")
     public boolean getEmtpyFieldAsNull();
+
+    @Config("match_by_column_name")
+    @ConfigDefault("\"none\"")
+    public MatchByColumnName getMatchByColumnName();
+
+    public enum MatchByColumnName {
+      // TODO support case_sensitive
+      CASE_INSENSITIVE,
+      NONE;
+
+      @JsonValue
+      @Override
+      public String toString() {
+        return name().toLowerCase(Locale.ENGLISH);
+      }
+
+      @JsonCreator
+      public static MatchByColumnName fromString(String value) {
+        switch (value) {
+          case "case_insensitive":
+            return CASE_INSENSITIVE;
+          case "none":
+            return NONE;
+          default:
+            throw new ConfigException(
+                String.format(
+                    "Unknown value '%s'. Supported values are case_insensitive, none", value));
+        }
+      }
+    }
   }
 
   @Override
@@ -176,12 +209,40 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> mergeConfig)
     }
     SnowflakePluginTask pluginTask = (SnowflakePluginTask) task;
-    return new SnowflakeCopyBatchInsert(
+    SnowflakePluginTask.MatchByColumnName matchByColumnName = pluginTask.getMatchByColumnName();
+    if (matchByColumnName == SnowflakePluginTask.MatchByColumnName.NONE) {
+      return new SnowflakeCopyBatchInsert(
+          getConnector(task, true),
+          this.stageIdentifier,
+          false,
+          pluginTask.getMaxUploadRetries(),
+          pluginTask.getEmtpyFieldAsNull());
+    }
+
+    JdbcSchema existingJdbcSchema;
+    JdbcOutputConnection con = getConnector(task, true).connect(true);
+    Mode mode = task.getMode();
+
+    Optional<JdbcSchema> initialTargetTableSchema =
+        mode.ignoreTargetTableSchema()
+            ? Optional.empty()
+            : newJdbcSchemaFromTableIfExists(con, task.getActualTable());
+    if (initialTargetTableSchema.isPresent()) {
+      existingJdbcSchema = initialTargetTableSchema.get();
+    } else if (task.getIntermediateTables().isPresent()
+        && !task.getIntermediateTables().get().isEmpty()) {
+      TableIdentifier firstItermTable = task.getIntermediateTables().get().get(0);
+      existingJdbcSchema = newJdbcSchemaFromTableIfExists(con, firstItermTable).get();
+    } else {
+      existingJdbcSchema = newJdbcSchemaFromTableIfExists(con, task.getActualTable()).get();
+    }
+    return new SnowflakeMatchByColumnNameCopyBatchInsert(
         getConnector(task, true),
         this.stageIdentifier,
         false,
         pluginTask.getMaxUploadRetries(),
-        pluginTask.getEmtpyFieldAsNull());
+        pluginTask.getEmtpyFieldAsNull(),
+        existingJdbcSchema);
   }
 
   @Override
diff --git a/src/main/java/org/embulk/output/snowflake/SnowflakeMatchByColumnNameCopyBatchInsert.java b/src/main/java/org/embulk/output/snowflake/SnowflakeMatchByColumnNameCopyBatchInsert.java
new file mode 100644
index 0000000..fc0b827
--- /dev/null
+++ b/src/main/java/org/embulk/output/snowflake/SnowflakeMatchByColumnNameCopyBatchInsert.java
@@ -0,0 +1,160 @@
+package org.embulk.output.snowflake;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import org.embulk.output.jdbc.JdbcColumn;
+import org.embulk.output.jdbc.JdbcOutputConnector;
+import org.embulk.output.jdbc.JdbcSchema;
+import org.embulk.output.jdbc.TableIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SnowflakeMatchByColumnNameCopyBatchInsert extends SnowflakeCopyBatchInsert {
+  private final Logger logger =
+      LoggerFactory.getLogger(SnowflakeMatchByColumnNameCopyBatchInsert.class);
+
+  private final JdbcSchema existingJdbcSchema;
+
+  private final List<Integer> indexes = new ArrayList<>();
+
+  private final List<CheckedRunnable<IOException>> setColumnRunnables = new ArrayList<>();
+
+  public SnowflakeMatchByColumnNameCopyBatchInsert(
+      JdbcOutputConnector connector,
+      StageIdentifier stageIdentifier,
+      boolean deleteStageFile,
+      int maxUploadRetries,
+      boolean emptyFieldAsNull,
+      JdbcSchema existingJdbcSchema)
+      throws IOException {
+    super(connector, stageIdentifier, deleteStageFile, maxUploadRetries, emptyFieldAsNull);
+    this.existingJdbcSchema = existingJdbcSchema;
+  }
+
+  @Override
+  public void prepare(TableIdentifier loadTable, JdbcSchema insertSchema) throws SQLException {
+    super.prepare(loadTable, insertSchema);
+    prepareIndexes(insertSchema);
+  }
+
+  private void prepareIndexes(JdbcSchema insertSchema) {
+    for (int i = 0; i < existingJdbcSchema.getCount(); i++) {
+      JdbcColumn existingColumn = existingJdbcSchema.getColumn(i);
+      String existingColumnName = existingColumn.getName();
+      for (int j = 0; j < insertSchema.getCount(); j++) {
+        JdbcColumn insertColumn = insertSchema.getColumn(j);
+        String insertColumnName = insertColumn.getName();
+        if (insertColumnName.equalsIgnoreCase(existingColumnName)) {
+          indexes.add(j);
+          break;
+        }
+      }
+    }
+    if (indexes.size() != insertSchema.getCount()) {
+      throw new UnsupportedOperationException(
+          "Input column names do not match output column names.");
+    }
+  }
+
+  @Override
+  public void add() throws IOException {
+    indexes.forEach(i -> setColumnRunnables.get(i).run());
+    setColumnRunnables.clear();
+    super.add();
+  }
+
+  @Override
+  public void setNull(int sqlType) throws IOException {
+    setColumnRunnables.add(() -> super.setNull(sqlType));
+  }
+
+  @Override
+  public void setBoolean(boolean v) throws IOException {
+    setColumnRunnables.add(() -> super.setBoolean(v));
+  }
+
+  @Override
+  public void setByte(byte v) throws IOException {
+    setColumnRunnables.add(() -> super.setByte(v));
+  }
+
+  @Override
+  public void setShort(short v) throws IOException {
+    setColumnRunnables.add(() -> super.setShort(v));
+  }
+
+  @Override
+  public void setInt(int v) throws IOException {
+    setColumnRunnables.add(() -> super.setInt(v));
+  }
+
+  @Override
+  public void setLong(long v) throws IOException {
+    setColumnRunnables.add(() -> super.setLong(v));
+  }
+
+  @Override
+  public void setFloat(float v) throws IOException {
+    setColumnRunnables.add(() -> super.setFloat(v));
+  }
+
+  @Override
+  public void setDouble(double v) throws IOException {
+    setColumnRunnables.add(() -> super.setDouble(v));
+  }
+
+  @Override
+  public void setBigDecimal(BigDecimal v) throws IOException {
+    setColumnRunnables.add(() -> super.setBigDecimal(v));
+  }
+
+  @Override
+  public void setString(String v) throws IOException {
+    setColumnRunnables.add(() -> super.setString(v));
+  }
+
+  @Override
+  public void setNString(String v) throws IOException {
+    setColumnRunnables.add(() -> super.setNString(v));
+  }
+
+  @Override
+  public void setBytes(byte[] v) throws IOException {
+    setColumnRunnables.add(() -> super.setBytes(v));
+  }
+
+  @Override
+  public void setSqlDate(Instant v, Calendar cal) throws IOException {
+    setColumnRunnables.add(() -> super.setSqlDate(v, cal));
+  }
+
+  @Override
+  public void setSqlTime(Instant v, Calendar cal) throws IOException {
+    setColumnRunnables.add(() -> super.setSqlTime(v, cal));
+  }
+
+  @Override
+  public void setSqlTimestamp(Instant v, Calendar cal) throws IOException {
+    setColumnRunnables.add(() -> super.setSqlTimestamp(v, cal));
+  }
+
+  @FunctionalInterface
+  private interface CheckedRunnable<E extends Exception> extends Runnable {
+
+    @Override
+    default void run() throws RuntimeException {
+      try {
+        runThrows();
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    void runThrows() throws E;
+  }
+}
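The class added above works by deferring every typed `setXxx()` call into `setColumnRunnables` and replaying the queue in target-table column order when `add()` is called. A self-contained sketch of that reordering idea, with illustrative column names and values (the real class queues typed setters and writes a CSV record rather than printing):

```java
import java.util.ArrayList;
import java.util.List;

public class ReorderSketch {
  public static void main(String[] args) {
    // Existing (target) table column order vs. input schema column order.
    List<String> targetOrder = List.of("ID", "NAME", "CREATED_AT");
    List<String> insertOrder = List.of("name", "created_at", "id");

    // indexes[k] = input position that feeds the k-th target column,
    // matched case-insensitively as in prepareIndexes().
    List<Integer> indexes = new ArrayList<>();
    for (String target : targetOrder) {
      for (int j = 0; j < insertOrder.size(); j++) {
        if (insertOrder.get(j).equalsIgnoreCase(target)) {
          indexes.add(j);
          break;
        }
      }
    }

    // setXxx() calls arrive in input order and are queued as deferred actions...
    List<Runnable> deferred = new ArrayList<>();
    String[] row = {"alice", "2024-02-05", "42"};
    for (String value : row) {
      deferred.add(() -> System.out.print(value + ","));
    }

    // ...and add() replays them in target order: prints "42,alice,2024-02-05,"
    indexes.forEach(i -> deferred.get(i).run());
  }
}
```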
From 0dbf85636c05c44998a9ba30cba839d0e01f0bff Mon Sep 17 00:00:00 2001
From: chikamura
Date: Sun, 11 Feb 2024 23:27:23 +0900
Subject: [PATCH 2/6] add match_by_column_name option using data load with
 transformation

---
 .../embulk/output/SnowflakeOutputPlugin.java  |  92 ++++++----
 .../snowflake/SnowflakeCopyBatchInsert.java   |  10 ++
 ...flakeMatchByColumnNameCopyBatchInsert.java | 160 ------------------
 .../snowflake/SnowflakeOutputConnection.java  |  55 +++++-
 4 files changed, 123 insertions(+), 194 deletions(-)
 delete mode 100644 src/main/java/org/embulk/output/snowflake/SnowflakeMatchByColumnNameCopyBatchInsert.java

diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
index d5655c0..33f52ad 100644
--- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
+++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
@@ -6,13 +6,13 @@
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.*;
+import java.util.function.BiFunction;
 import org.embulk.config.ConfigDiff;
 import org.embulk.config.ConfigException;
 import org.embulk.config.TaskSource;
 import org.embulk.output.jdbc.*;
 import org.embulk.output.snowflake.PrivateKeyReader;
 import org.embulk.output.snowflake.SnowflakeCopyBatchInsert;
-import org.embulk.output.snowflake.SnowflakeMatchByColumnNameCopyBatchInsert;
 import org.embulk.output.snowflake.SnowflakeOutputConnection;
 import org.embulk.output.snowflake.SnowflakeOutputConnector;
 import org.embulk.output.snowflake.StageIdentifier;
@@ -73,8 +73,16 @@ public interface SnowflakePluginTask extends PluginTask {
     @ConfigDefault("\"none\"")
     public MatchByColumnName getMatchByColumnName();
 
+    public void setInsertColumnNames(String[] insertColumnNames);
+
+    public String[] getInsertColumnNames();
+
+    public void setInsertCSVColumnNums(int[] insertCSVColumnNums);
+
+    public int[] getInsertCSVColumnNums();
+
     public enum MatchByColumnName {
-      // TODO support case_sensitive
+      CASE_SENSITIVE,
       CASE_INSENSITIVE,
       NONE;
 
@@ -87,6 +95,8 @@ public String toString() {
       @JsonCreator
       public static MatchByColumnName fromString(String value) {
         switch (value) {
+          case "case_sensitive":
+            return CASE_SENSITIVE;
           case "case_insensitive":
             return CASE_INSENSITIVE;
           case "none":
@@ -94,7 +104,8 @@ public static MatchByColumnName fromString(String value) {
           default:
             throw new ConfigException(
                 String.format(
-                    "Unknown value '%s'. Supported values are case_insensitive, none", value));
+                    "Unknown match_by_column_name '%s'. Supported values are case_sensitive, case_insensitive, none",
+                    value));
         }
       }
     }
@@ -189,6 +200,47 @@ protected void doBegin(
       JdbcOutputConnection con, PluginTask task, final Schema schema, int taskCount)
       throws SQLException {
     super.doBegin(con, task, schema, taskCount);
+
+    SnowflakePluginTask pluginTask = (SnowflakePluginTask) task;
+    SnowflakePluginTask.MatchByColumnName matchByColumnName = pluginTask.getMatchByColumnName();
+    if (matchByColumnName == SnowflakePluginTask.MatchByColumnName.NONE) {
+      pluginTask.setInsertCSVColumnNums(new int[0]);
+      pluginTask.setInsertColumnNames(new String[0]);
+      return;
+    }
+
+    List<String> insertColumnNames = new ArrayList<>();
+    List<Integer> insertCSVColumnNums = new ArrayList<>();
+    JdbcSchema targetTableSchema = pluginTask.getTargetTableSchema();
+    BiFunction<String, String, Boolean> compare =
+        matchByColumnName == SnowflakePluginTask.MatchByColumnName.CASE_SENSITIVE
+            ? String::equals
+            : String::equalsIgnoreCase;
+    int columnNum = 1;
+    for (int i = 0; i < targetTableSchema.getCount(); i++) {
+      JdbcColumn targetColumn = targetTableSchema.getColumn(i);
+      if (targetColumn.isSkipColumn()) {
+        continue;
+      }
+      Column schemaColumn = schema.getColumn(i);
+      if (compare.apply(schemaColumn.getName(), targetColumn.getName())) {
+        insertColumnNames.add(targetColumn.getName());
+        insertCSVColumnNums.add(columnNum);
+      }
+      columnNum += 1;
+    }
+    pluginTask.setInsertColumnNames(insertColumnNames.toArray(new String[0]));
+    pluginTask.setInsertCSVColumnNums(insertCSVColumnNums.stream().mapToInt(i -> i).toArray());
+
+    if (task.getMergeKeys().isPresent()
+        && matchByColumnName == SnowflakePluginTask.MatchByColumnName.CASE_INSENSITIVE) {
+      List<String> mergeKeys = new ArrayList<>();
+      for (String mergeKey : task.getMergeKeys().get()) {
+        JdbcColumn targetColumn = targetTableSchema.findColumn(mergeKey).get();
+        mergeKeys.add(targetColumn.getName());
+      }
+      task.setMergeKeys(Optional.of(mergeKeys));
+    }
   }
 
   @Override
@@ -209,40 +261,14 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> mergeConfig)
     }
     SnowflakePluginTask pluginTask = (SnowflakePluginTask) task;
-    SnowflakePluginTask.MatchByColumnName matchByColumnName = pluginTask.getMatchByColumnName();
-    if (matchByColumnName == SnowflakePluginTask.MatchByColumnName.NONE) {
-      return new SnowflakeCopyBatchInsert(
-          getConnector(task, true),
-          this.stageIdentifier,
-          false,
-          pluginTask.getMaxUploadRetries(),
-          pluginTask.getEmtpyFieldAsNull());
-    }
-
-    JdbcSchema existingJdbcSchema;
-    JdbcOutputConnection con = getConnector(task, true).connect(true);
-    Mode mode = task.getMode();
-
-    Optional<JdbcSchema> initialTargetTableSchema =
-        mode.ignoreTargetTableSchema()
-            ? Optional.empty()
-            : newJdbcSchemaFromTableIfExists(con, task.getActualTable());
-    if (initialTargetTableSchema.isPresent()) {
-      existingJdbcSchema = initialTargetTableSchema.get();
-    } else if (task.getIntermediateTables().isPresent()
-        && !task.getIntermediateTables().get().isEmpty()) {
-      TableIdentifier firstItermTable = task.getIntermediateTables().get().get(0);
-      existingJdbcSchema = newJdbcSchemaFromTableIfExists(con, firstItermTable).get();
-    } else {
-      existingJdbcSchema = newJdbcSchemaFromTableIfExists(con, task.getActualTable()).get();
-    }
-    return new SnowflakeMatchByColumnNameCopyBatchInsert(
+    return new SnowflakeCopyBatchInsert(
         getConnector(task, true),
         this.stageIdentifier,
+        pluginTask.getInsertColumnNames(),
+        pluginTask.getInsertCSVColumnNums(),
         false,
         pluginTask.getMaxUploadRetries(),
-        pluginTask.getEmtpyFieldAsNull(),
-        existingJdbcSchema);
+        pluginTask.getEmtpyFieldAsNull());
   }
 
   @Override
diff --git a/src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java b/src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java
index 18f8155..8bf90a1 100644
--- a/src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java
+++ b/src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java
@@ -40,9 +40,15 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
   private List<Future<Void>> uploadAndCopyFutures;
   private boolean emptyFieldAsNull;
 
+  private String[] insertColumnNames;
+
+  private int[] insertCSVColumnNums;
+
   public SnowflakeCopyBatchInsert(
       JdbcOutputConnector connector,
       StageIdentifier stageIdentifier,
+      String[] insertColumnNames,
+      int[] insertCSVColumnNums,
       boolean deleteStageFile,
       int maxUploadRetries,
       boolean emptyFieldAsNull)
@@ -51,6 +57,8 @@ public SnowflakeCopyBatchInsert(
     openNewFile();
     this.connector = connector;
     this.stageIdentifier = stageIdentifier;
+    this.insertColumnNames = insertColumnNames;
+    this.insertCSVColumnNums = insertCSVColumnNums;
     this.executorService = Executors.newCachedThreadPool();
     this.deleteStageFile = deleteStageFile;
     this.uploadAndCopyFutures = new ArrayList();
@@ -418,6 +426,8 @@ public Void call() throws SQLException, InterruptedException, ExecutionException
             tableIdentifier,
             stageIdentifier,
             snowflakeStageFileName,
+            insertColumnNames,
+            insertCSVColumnNums,
             delimiterString,
             emptyFieldAsNull);
diff --git a/src/main/java/org/embulk/output/snowflake/SnowflakeMatchByColumnNameCopyBatchInsert.java b/src/main/java/org/embulk/output/snowflake/SnowflakeMatchByColumnNameCopyBatchInsert.java
deleted file mode 100644
index fc0b827..0000000
--- a/src/main/java/org/embulk/output/snowflake/SnowflakeMatchByColumnNameCopyBatchInsert.java
+++ /dev/null
@@ -1,160 +0,0 @@
-package org.embulk.output.snowflake;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.SQLException;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.List;
-import org.embulk.output.jdbc.JdbcColumn;
-import org.embulk.output.jdbc.JdbcOutputConnector;
-import org.embulk.output.jdbc.JdbcSchema;
-import org.embulk.output.jdbc.TableIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SnowflakeMatchByColumnNameCopyBatchInsert extends SnowflakeCopyBatchInsert {
-  private final Logger logger =
-      LoggerFactory.getLogger(SnowflakeMatchByColumnNameCopyBatchInsert.class);
-
-  private final JdbcSchema existingJdbcSchema;
-
-  private final List<Integer> indexes = new ArrayList<>();
-
-  private final List<CheckedRunnable<IOException>> setColumnRunnables = new ArrayList<>();
-
-  public SnowflakeMatchByColumnNameCopyBatchInsert(
-      JdbcOutputConnector connector,
-      StageIdentifier stageIdentifier,
-      boolean deleteStageFile,
-      int maxUploadRetries,
-      boolean emptyFieldAsNull,
-      JdbcSchema existingJdbcSchema)
-      throws IOException {
-    super(connector, stageIdentifier, deleteStageFile, maxUploadRetries, emptyFieldAsNull);
-    this.existingJdbcSchema = existingJdbcSchema;
-  }
-
-  @Override
-  public void prepare(TableIdentifier loadTable, JdbcSchema insertSchema) throws SQLException {
-    super.prepare(loadTable, insertSchema);
-    prepareIndexes(insertSchema);
-  }
-
-  private void prepareIndexes(JdbcSchema insertSchema) {
-    for (int i = 0; i < existingJdbcSchema.getCount(); i++) {
-      JdbcColumn existingColumn = existingJdbcSchema.getColumn(i);
-      String existingColumnName = existingColumn.getName();
-      for (int j = 0; j < insertSchema.getCount(); j++) {
-        JdbcColumn insertColumn = insertSchema.getColumn(j);
-        String insertColumnName = insertColumn.getName();
-        if (insertColumnName.equalsIgnoreCase(existingColumnName)) {
-          indexes.add(j);
-          break;
-        }
-      }
-    }
-    if (indexes.size() != insertSchema.getCount()) {
-      throw new UnsupportedOperationException(
-          "Input column names do not match output column names.");
-    }
-  }
-
-  @Override
-  public void add() throws IOException {
-    indexes.forEach(i -> setColumnRunnables.get(i).run());
-    setColumnRunnables.clear();
-    super.add();
-  }
-
-  @Override
-  public void setNull(int sqlType) throws IOException {
-    setColumnRunnables.add(() -> super.setNull(sqlType));
-  }
-
-  @Override
-  public void setBoolean(boolean v) throws IOException {
-    setColumnRunnables.add(() -> super.setBoolean(v));
-  }
-
-  @Override
-  public void setByte(byte v) throws IOException {
-    setColumnRunnables.add(() -> super.setByte(v));
-  }
-
-  @Override
-  public void setShort(short v) throws IOException {
-    setColumnRunnables.add(() -> super.setShort(v));
-  }
-
-  @Override
-  public void setInt(int v) throws IOException {
-    setColumnRunnables.add(() -> super.setInt(v));
-  }
-
-  @Override
-  public void setLong(long v) throws IOException {
-    setColumnRunnables.add(() -> super.setLong(v));
-  }
-
-  @Override
-  public void setFloat(float v) throws IOException {
-    setColumnRunnables.add(() -> super.setFloat(v));
-  }
-
-  @Override
-  public void setDouble(double v) throws IOException {
-    setColumnRunnables.add(() -> super.setDouble(v));
-  }
-
-  @Override
-  public void setBigDecimal(BigDecimal v) throws IOException {
-    setColumnRunnables.add(() -> super.setBigDecimal(v));
-  }
-
-  @Override
-  public void setString(String v) throws IOException {
-    setColumnRunnables.add(() -> super.setString(v));
-  }
-
-  @Override
-  public void setNString(String v) throws IOException {
-    setColumnRunnables.add(() -> super.setNString(v));
-  }
-
-  @Override
-  public void setBytes(byte[] v) throws IOException {
-    setColumnRunnables.add(() -> super.setBytes(v));
-  }
-
-  @Override
-  public void setSqlDate(Instant v, Calendar cal) throws IOException {
-    setColumnRunnables.add(() -> super.setSqlDate(v, cal));
-  }
-
-  @Override
-  public void setSqlTime(Instant v, Calendar cal) throws IOException {
-    setColumnRunnables.add(() -> super.setSqlTime(v, cal));
-  }
-
-  @Override
-  public void setSqlTimestamp(Instant v, Calendar cal) throws IOException {
-    setColumnRunnables.add(() -> super.setSqlTimestamp(v, cal));
-  }
-
-  @FunctionalInterface
-  private interface CheckedRunnable<E extends Exception> extends Runnable {
-
-    @Override
-    default void run() throws RuntimeException {
-      try {
-        runThrows();
-      } catch (Exception ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-
-    void runThrows() throws E;
-  }
-}
diff --git a/src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java b/src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java
index 5bc161b..a0caeb0 100644
--- a/src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java
+++ b/src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java
@@ -21,11 +21,24 @@ public void runCopy(
       TableIdentifier tableIdentifier,
       StageIdentifier stageIdentifier,
       String filename,
+      String[] insertColumnNames,
+      int[] csvSelectedColumnNums,
       String delimiterString,
       boolean emptyFieldAsNull)
       throws SQLException {
     String sql =
-        buildCopySQL(tableIdentifier, stageIdentifier, filename, delimiterString, emptyFieldAsNull);
+        insertColumnNames != null && insertColumnNames.length > 0
+            ? buildCopySQL(
+                tableIdentifier,
+                stageIdentifier,
+                filename,
+                insertColumnNames,
+                csvSelectedColumnNums,
+                delimiterString,
+                emptyFieldAsNull)
+            : buildCopySQL(
+                tableIdentifier, stageIdentifier, filename, delimiterString, emptyFieldAsNull);
+
     runUpdate(sql);
   }
 
@@ -190,6 +203,46 @@ protected String buildCopySQL(
     return sb.toString();
   }
 
+  protected String buildCopySQL(
+      TableIdentifier tableIdentifier,
+      StageIdentifier stageIdentifier,
+      String snowflakeStageFileName,
+      String[] insertColumnNames,
+      int[] csvSelectedColumnNums,
+      String delimiterString,
+      boolean emptyFieldAsNull) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("COPY INTO ");
+    quoteTableIdentifier(sb, tableIdentifier);
+    sb.append(" (");
+    for (int i = 0; i < insertColumnNames.length; i++) {
+      if (i != 0) {
+        sb.append(", ");
+      }
+      String column = quoteIdentifierString(insertColumnNames[i]);
+      sb.append(column);
+    }
+    sb.append(" ) FROM ( SELECT ");
+    for (int i = 0; i < csvSelectedColumnNums.length; i++) {
+      if (i != 0) {
+        sb.append(", ");
+      }
+      sb.append("t.$");
+      sb.append(csvSelectedColumnNums[i]);
+    }
+    sb.append(" from ");
+    quoteInternalStoragePath(sb, stageIdentifier, snowflakeStageFileName);
+    sb.append(" t ) ");
+    sb.append(" FILE_FORMAT = ( TYPE = CSV FIELD_DELIMITER = '");
+    sb.append(delimiterString);
+    sb.append("'");
+    if (!emptyFieldAsNull) {
+      sb.append(" EMPTY_FIELD_AS_NULL = FALSE");
+    }
+    sb.append(" );");
+    return sb.toString();
+  }
+
   protected String buildDeleteStageFileSQL(
       StageIdentifier stageIdentifier, String snowflakeStageFileName) {
     StringBuilder sb = new StringBuilder();
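To make the transformation concrete: if the staged CSV columns arrive as (name, created_at, id) and the target table columns (ID, NAME) were matched in doBegin(), the overload above emits a statement of roughly the following shape. The table, stage, and file names are placeholders, the delimiter is illustrative, and the exact quoting of the stage path comes from quoteInternalStoragePath():

```sql
COPY INTO "MY_TABLE" ("ID", "NAME")
FROM ( SELECT t.$3, t.$1 from @embulk_stage/embulk_file.csv.gz t )
FILE_FORMAT = ( TYPE = CSV FIELD_DELIMITER = ',' );
```

Note that `EMPTY_FIELD_AS_NULL = FALSE` is appended only when the `empty_field_as_null` option is disabled.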
From a0a2ab5dfd7c0688b1d9578ff9e00bca164406c1 Mon Sep 17 00:00:00 2001
From: chikamura
Date: Mon, 26 Feb 2024 19:42:38 +0900
Subject: [PATCH 3/6] refactoring

---
 README.md                                     |  2 +-
 .../embulk/output/SnowflakeOutputPlugin.java  | 32 +++++++++----------
 .../snowflake/SnowflakeOutputConnection.java  | 26 ++++++++-------
 3 files changed, 32 insertions(+), 28 deletions(-)

diff --git a/README.md b/README.md
index 9903d7a..e9f6f82 100644
--- a/README.md
+++ b/README.md
@@ -25,7 +25,7 @@ Snowflake output plugin for Embulk loads records to Snowflake.
 - **merge_keys**: key column names for merging records in merge mode (string array, required in merge mode if table doesn't have primary key)
 - **merge_rule**: list of column assignments for updating existing records used in merge mode, for example `"foo" = T."foo" + S."foo"` (`T` means target table and `S` means source table). (string array, default: always overwrites with new values)
 - **batch_size**: size of a single batch insert (integer, default: 16777216)
-- **match_by_column_name**: specify whether to load data into columns in the target table that match the corresponding columns in the input data ("case_insensitive", "none", default: "none")
+- **match_by_column_name**: specify whether to load data into columns in the target table that match the corresponding columns in the input data ("case_sensitive", "case_insensitive", "none", default: "none")
 - **default_timezone**: If input column type (embulk type) is timestamp, this plugin needs to format the timestamp into a SQL string. This default_timezone option is used to control the timezone. You can overwrite the timezone for each column using the column_options option. (string, default: `UTC`)
 - **column_options**: advanced: key-value pairs where the key is a column name and the value is options for the column.
   - **type**: type of a column when this plugin creates new tables (e.g. `VARCHAR(255)`, `INTEGER NOT NULL UNIQUE`). This is used when this plugin creates intermediate tables (insert, truncate_insert and merge modes), when it creates the target table (insert_direct and replace modes), and when it creates a nonexistent target table automatically. (string, default: depends on input column type. `BIGINT` if input column type is long, `BOOLEAN` if boolean, `DOUBLE PRECISION` if double, `CLOB` if string, `TIMESTAMP` if timestamp)
diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
index 33f52ad..7dee9a7 100644
--- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
+++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
@@ -73,13 +73,13 @@ public interface SnowflakePluginTask extends PluginTask {
     @ConfigDefault("\"none\"")
     public MatchByColumnName getMatchByColumnName();
 
-    public void setInsertColumnNames(String[] insertColumnNames);
+    public void setCopyIntoTableColumnNames(String[] columnNames);
 
-    public String[] getInsertColumnNames();
+    public String[] getCopyIntoTableColumnNames();
 
-    public void setInsertCSVColumnNums(int[] insertCSVColumnNums);
+    public void setCopyIntoCSVColumnNumbers(int[] columnNumbers);
 
-    public int[] getInsertCSVColumnNums();
+    public int[] getCopyIntoCSVColumnNumbers();
 
     public enum MatchByColumnName {
       CASE_SENSITIVE,
@@ -204,19 +204,19 @@ protected void doBegin(
     SnowflakePluginTask pluginTask = (SnowflakePluginTask) task;
     SnowflakePluginTask.MatchByColumnName matchByColumnName = pluginTask.getMatchByColumnName();
     if (matchByColumnName == SnowflakePluginTask.MatchByColumnName.NONE) {
-      pluginTask.setInsertCSVColumnNums(new int[0]);
-      pluginTask.setInsertColumnNames(new String[0]);
+      pluginTask.setCopyIntoCSVColumnNumbers(new int[0]);
+      pluginTask.setCopyIntoTableColumnNames(new String[0]);
       return;
     }
 
-    List<String> insertColumnNames = new ArrayList<>();
-    List<Integer> insertCSVColumnNums = new ArrayList<>();
+    List<String> copyIntoTableColumnNames = new ArrayList<>();
+    List<Integer> copyIntoCSVColumnNumbers = new ArrayList<>();
     JdbcSchema targetTableSchema = pluginTask.getTargetTableSchema();
     BiFunction<String, String, Boolean> compare =
         matchByColumnName == SnowflakePluginTask.MatchByColumnName.CASE_SENSITIVE
             ? String::equals
             : String::equalsIgnoreCase;
-    int columnNum = 1;
+    int columnNumber = 1;
     for (int i = 0; i < targetTableSchema.getCount(); i++) {
       JdbcColumn targetColumn = targetTableSchema.getColumn(i);
       if (targetColumn.isSkipColumn()) {
         continue;
@@ -224,13 +224,13 @@ protected void doBegin(
       }
       Column schemaColumn = schema.getColumn(i);
       if (compare.apply(schemaColumn.getName(), targetColumn.getName())) {
-        insertColumnNames.add(targetColumn.getName());
-        insertCSVColumnNums.add(columnNum);
+        copyIntoTableColumnNames.add(targetColumn.getName());
+        copyIntoCSVColumnNumbers.add(columnNumber);
       }
-      columnNum += 1;
+      columnNumber += 1;
     }
-    pluginTask.setInsertColumnNames(insertColumnNames.toArray(new String[0]));
-    pluginTask.setInsertCSVColumnNums(insertCSVColumnNums.stream().mapToInt(i -> i).toArray());
+    pluginTask.setCopyIntoTableColumnNames(copyIntoTableColumnNames.toArray(new String[0]));
+    pluginTask.setCopyIntoCSVColumnNumbers(copyIntoCSVColumnNumbers.stream().mapToInt(i -> i).toArray());
 
     if (task.getMergeKeys().isPresent()
         && matchByColumnName == SnowflakePluginTask.MatchByColumnName.CASE_INSENSITIVE) {
@@ -264,8 +264,8 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> mergeConfig)
     return new SnowflakeCopyBatchInsert(
         getConnector(task, true),
         this.stageIdentifier,
-        pluginTask.getInsertColumnNames(),
-        pluginTask.getInsertCSVColumnNums(),
+        pluginTask.getCopyIntoTableColumnNames(),
+        pluginTask.getCopyIntoCSVColumnNumbers(),
         false,
         pluginTask.getMaxUploadRetries(),
         pluginTask.getEmtpyFieldAsNull());
diff --git a/src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java b/src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java
index a0caeb0..259329c 100644
--- a/src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java
+++ b/src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java
@@ -21,19 +21,19 @@ public void runCopy(
       TableIdentifier tableIdentifier,
       StageIdentifier stageIdentifier,
       String filename,
-      String[] insertColumnNames,
-      int[] csvSelectedColumnNums,
+      String[] tableColumnNames,
+      int[] csvColumnNumbers,
       String delimiterString,
       boolean emptyFieldAsNull)
       throws SQLException {
     String sql =
-        insertColumnNames != null && insertColumnNames.length > 0
+        tableColumnNames != null && tableColumnNames.length > 0
             ? buildCopySQL(
                 tableIdentifier,
                 stageIdentifier,
                 filename,
-                insertColumnNames,
-                csvSelectedColumnNums,
+                tableColumnNames,
+                csvColumnNumbers,
                 delimiterString,
                 emptyFieldAsNull)
             : buildCopySQL(
                 tableIdentifier, stageIdentifier, filename, delimiterString, emptyFieldAsNull);
@@ -207,28 +207,32 @@ protected String buildCopySQL(
       TableIdentifier tableIdentifier,
       StageIdentifier stageIdentifier,
       String snowflakeStageFileName,
-      String[] insertColumnNames,
-      int[] csvSelectedColumnNums,
+      String[] tableColumnNames,
+      int[] csvColumnNumbers,
       String delimiterString,
       boolean emptyFieldAsNull) {
+    // Data load with transformation
+    // Correspondence between CSV column numbers and table column names can be specified.
+    // https://docs.snowflake.com/ja/sql-reference/sql/copy-into-table
+
     StringBuilder sb = new StringBuilder();
     sb.append("COPY INTO ");
     quoteTableIdentifier(sb, tableIdentifier);
     sb.append(" (");
-    for (int i = 0; i < insertColumnNames.length; i++) {
+    for (int i = 0; i < tableColumnNames.length; i++) {
       if (i != 0) {
         sb.append(", ");
       }
-      String column = quoteIdentifierString(insertColumnNames[i]);
+      String column = quoteIdentifierString(tableColumnNames[i]);
       sb.append(column);
     }
     sb.append(" ) FROM ( SELECT ");
-    for (int i = 0; i < csvSelectedColumnNums.length; i++) {
+    for (int i = 0; i < csvColumnNumbers.length; i++) {
       if (i != 0) {
         sb.append(", ");
       }
       sb.append("t.$");
-      sb.append(csvSelectedColumnNums[i]);
+      sb.append(csvColumnNumbers[i]);
     }
     sb.append(" from ");
     quoteInternalStoragePath(sb, stageIdentifier, snowflakeStageFileName);

From 1f84297c495e1f42311f74a2e1697e828d39a728 Mon Sep 17 00:00:00 2001
From: chikamura
Date: Mon, 26 Feb 2024 19:43:28 +0900
Subject: [PATCH 4/6] remove mergeKey case_insensitive

---
 .../java/org/embulk/output/SnowflakeOutputPlugin.java | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
index 7dee9a7..8e4a531 100644
--- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
+++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
@@ -231,16 +231,6 @@ protected void doBegin(
     }
     pluginTask.setCopyIntoTableColumnNames(copyIntoTableColumnNames.toArray(new String[0]));
     pluginTask.setCopyIntoCSVColumnNumbers(copyIntoCSVColumnNumbers.stream().mapToInt(i -> i).toArray());
-
-    if (task.getMergeKeys().isPresent()
-        && matchByColumnName == SnowflakePluginTask.MatchByColumnName.CASE_INSENSITIVE) {
-      List<String> mergeKeys = new ArrayList<>();
-      for (String mergeKey : task.getMergeKeys().get()) {
-        JdbcColumn targetColumn = targetTableSchema.findColumn(mergeKey).get();
-        mergeKeys.add(targetColumn.getName());
-      }
-      task.setMergeKeys(Optional.of(mergeKeys));
-    }
   }
 
   @Override

From d552f673de028897a32f2e3988f3c746069fc390 Mon Sep 17 00:00:00 2001
From: chikamura
Date: Mon, 26 Feb 2024 19:45:27 +0900
Subject: [PATCH 5/6] spotlessApply

---
 src/main/java/org/embulk/output/SnowflakeOutputPlugin.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
index 8e4a531..a042826 100644
--- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
+++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
@@ -230,7 +230,8 @@ protected void doBegin(
       columnNumber += 1;
     }
     pluginTask.setCopyIntoTableColumnNames(copyIntoTableColumnNames.toArray(new String[0]));
-    pluginTask.setCopyIntoCSVColumnNumbers(copyIntoCSVColumnNumbers.stream().mapToInt(i -> i).toArray());
+    pluginTask.setCopyIntoCSVColumnNumbers(
+        copyIntoCSVColumnNumbers.stream().mapToInt(i -> i).toArray());
   }
 
   @Override

From 2d4b9bffbb6e42c0b5baf467d364edd50afbbf7c Mon Sep 17 00:00:00 2001
From: chikamura
Date: Tue, 27 Feb 2024 10:22:23 +0900
Subject: [PATCH 6/6] refactoring

---
 .../snowflake/SnowflakeCopyBatchInsert.java   | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java b/src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java
index 8bf90a1..3d0e8a8 100644
--- a/src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java
+++ b/src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java
@@ -40,15 +40,15 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
   private List<Future<Void>> uploadAndCopyFutures;
   private boolean emptyFieldAsNull;
 
-  private String[] insertColumnNames;
+  private String[] copyIntoTableColumnNames;
 
-  private int[] insertCSVColumnNums;
+  private int[] copyIntoCSVColumnNumbers;
 
   public SnowflakeCopyBatchInsert(
       JdbcOutputConnector connector,
       StageIdentifier stageIdentifier,
-      String[] insertColumnNames,
-      int[] insertCSVColumnNums,
+      String[] copyIntoTableColumnNames,
+      int[] copyIntoCSVColumnNumbers,
       boolean deleteStageFile,
       int maxUploadRetries,
       boolean emptyFieldAsNull)
@@ -57,8 +57,8 @@ public SnowflakeCopyBatchInsert(
     openNewFile();
     this.connector = connector;
     this.stageIdentifier = stageIdentifier;
-    this.insertColumnNames = insertColumnNames;
-    this.insertCSVColumnNums = insertCSVColumnNums;
+    this.copyIntoTableColumnNames = copyIntoTableColumnNames;
+    this.copyIntoCSVColumnNumbers = copyIntoCSVColumnNumbers;
     this.executorService = Executors.newCachedThreadPool();
     this.deleteStageFile = deleteStageFile;
     this.uploadAndCopyFutures = new ArrayList();
@@ -426,8 +426,8 @@ public Void call() throws SQLException, InterruptedException, ExecutionException
             tableIdentifier,
             stageIdentifier,
             snowflakeStageFileName,
-            insertColumnNames,
-            insertCSVColumnNums,
+            copyIntoTableColumnNames,
+            copyIntoCSVColumnNumbers,
             delimiterString,
             emptyFieldAsNull);