Skip to content

Commit

Permalink
Merge pull request #72 from trocco-io/19356-drop-snowflake-stage
Browse files Browse the repository at this point in the history
Delete tmp stage even on error and add the delete_stage_on_error option
  • Loading branch information
d-hrs authored Mar 11, 2024
2 parents 60f66fa + dd8d24a commit 9cfaa02
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 29 deletions.
64 changes: 36 additions & 28 deletions src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import net.snowflake.client.jdbc.internal.org.bouncycastle.pkcs.PKCSException;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskSource;
import org.embulk.output.jdbc.*;
import org.embulk.output.snowflake.PrivateKeyReader;
Expand All @@ -24,8 +25,6 @@
import org.embulk.util.config.ConfigDefault;

public class SnowflakeOutputPlugin extends AbstractJdbcOutputPlugin {
private StageIdentifier stageIdentifier;

public interface SnowflakePluginTask extends PluginTask {
@Config("driver_path")
@ConfigDefault("null")
Expand Down Expand Up @@ -75,6 +74,10 @@ public interface SnowflakePluginTask extends PluginTask {
@Config("empty_field_as_null")
@ConfigDefault("true")
public boolean getEmtpyFieldAsNull();

@Config("delete_stage_on_error")
@ConfigDefault("false")
public boolean getDeleteStageOnError();
}

@Override
Expand Down Expand Up @@ -144,25 +147,39 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet
}

@Override
public ConfigDiff resume(
TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) {
throw new UnsupportedOperationException("snowflake output plugin does not support resuming");
}

@Override
protected void doCommit(JdbcOutputConnection con, PluginTask task, int taskCount)
throws SQLException {
super.doCommit(con, task, taskCount);
SnowflakeOutputConnection snowflakeCon = (SnowflakeOutputConnection) con;

public ConfigDiff transaction(
ConfigSource config, Schema schema, int taskCount, OutputPlugin.Control control) {
PluginTask task = CONFIG_MAPPER.map(config, this.getTaskClass());
SnowflakePluginTask t = (SnowflakePluginTask) task;
if (this.stageIdentifier == null) {
this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t);
StageIdentifier stageIdentifier = StageIdentifierHolder.getStageIdentifier(t);
ConfigDiff configDiff;
SnowflakeOutputConnection snowflakeCon = null;

try {
snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true);
snowflakeCon.runCreateStage(stageIdentifier);
configDiff = super.transaction(config, schema, taskCount, control);
if (t.getDeleteStage()) {
snowflakeCon.runDropStage(stageIdentifier);
}
} catch (Exception e) {
if (t.getDeleteStage() && t.getDeleteStageOnError()) {
try {
snowflakeCon.runDropStage(stageIdentifier);
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
throw new RuntimeException(e);
}

if (t.getDeleteStage()) {
snowflakeCon.runDropStage(this.stageIdentifier);
}
return configDiff;
}

@Override
/**
 * Resuming a previously interrupted transaction is not supported by this plugin.
 *
 * @throws UnsupportedOperationException always
 */
public ConfigDiff resume(
    TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) {
  // Unconditionally reject: the Snowflake stage lifecycle is managed per-transaction,
  // so a half-finished run cannot be picked up again.
  throw new UnsupportedOperationException("snowflake output plugin does not support resuming");
}

@Override
Expand All @@ -179,20 +196,11 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
throw new UnsupportedOperationException(
"Snowflake output plugin doesn't support 'merge_direct' mode.");
}

SnowflakePluginTask t = (SnowflakePluginTask) task;
// TODO: put some where executes once
if (this.stageIdentifier == null) {
SnowflakeOutputConnection snowflakeCon =
(SnowflakeOutputConnection) getConnector(task, true).connect(true);
this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t);
snowflakeCon.runCreateStage(this.stageIdentifier);
}
SnowflakePluginTask pluginTask = (SnowflakePluginTask) task;

return new SnowflakeCopyBatchInsert(
getConnector(task, true),
this.stageIdentifier,
StageIdentifierHolder.getStageIdentifier(pluginTask),
false,
pluginTask.getMaxUploadRetries(),
pluginTask.getEmtpyFieldAsNull());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public SnowflakeCopyBatchInsert(
@Override
// Opens the Snowflake connection used by this batch insert and records the target table.
// NOTE(review): statement order matters — the connection must exist before any stage use.
public void prepare(TableIdentifier loadTable, JdbcSchema insertSchema) throws SQLException {
this.connection = (SnowflakeOutputConnection) connector.connect(true);
// Creates the temporary stage on this connection; the surrounding commit appears to move
// stage creation elsewhere — confirm which version of this line is current.
this.connection.runCreateStage(stageIdentifier);
this.tableIdentifier = loadTable;
// insertSchema is accepted to satisfy the interface but is not read here.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@
import org.embulk.output.jdbc.JdbcSchema;
import org.embulk.output.jdbc.MergeConfig;
import org.embulk.output.jdbc.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeOutputConnection extends JdbcOutputConnection {
private final Logger logger = LoggerFactory.getLogger(SnowflakeOutputConnection.class);

/**
 * Wraps a raw JDBC {@link Connection} as a Snowflake-specific output connection.
 *
 * @param connection an open JDBC connection to Snowflake
 * @throws SQLException if the superclass initialization fails
 */
public SnowflakeOutputConnection(Connection connection) throws SQLException {
// null second argument — presumably "no default schema"; confirm against the
// JdbcOutputConnection(Connection, String) constructor contract.
super(connection, null);
}
Expand All @@ -32,11 +36,13 @@ public void runCopy(
/**
 * Builds and executes the CREATE STAGE statement for the given stage identifier.
 *
 * @param stageIdentifier identifies the Snowflake stage to create
 * @throws SQLException if executing the statement fails
 */
public void runCreateStage(StageIdentifier stageIdentifier) throws SQLException {
String sql = buildCreateStageSQL(stageIdentifier);
// Log before executing so the statement is still visible in the log when runUpdate throws.
logger.info("SQL: {}", sql);
runUpdate(sql);
}

/**
 * Builds and executes the DROP STAGE statement for the given stage identifier.
 *
 * @param stageIdentifier identifies the Snowflake stage to drop
 * @throws SQLException if executing the statement fails
 */
public void runDropStage(StageIdentifier stageIdentifier) throws SQLException {
String sql = buildDropStageSQL(stageIdentifier);
// Log before executing so the statement is still visible in the log when runUpdate throws.
logger.info("SQL: {}", sql);
runUpdate(sql);
}

public void runUploadFile(
Expand Down

0 comments on commit 9cfaa02

Please sign in to comment.