Skip to content

Commit

Permalink
feat(webhook): webhook waiting for persistency. (#20164)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Feb 14, 2025
1 parent 7ab159f commit 4a4e03a
Show file tree
Hide file tree
Showing 26 changed files with 688 additions and 97 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions e2e_test/webhook/check_1.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ select data ->> 'source', data->> 'auth_algo' from segment_hmac_sha1;
segment hmac_sha1

query TT
select data ->> 'source', data->> 'auth_algo' from hubspot_sha256_v2;
select data ->> 'source', data->> 'auth_algo' from test_primary_key;
----
hubspot sha256_v2
github hmac_sha1
7 changes: 6 additions & 1 deletion e2e_test/webhook/check_2.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ query TT
select data ->> 'source', data->> 'auth_algo' from hubspot_sha256_v2;
----
hubspot sha256_v2
hubspot sha256_v2
hubspot sha256_v2

query TT
select data ->> 'source', data->> 'auth_algo' from test_primary_key;
----
github hmac_sha1
7 changes: 6 additions & 1 deletion e2e_test/webhook/check_3.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ select data ->> 'source', data->> 'auth_algo' from hubspot_sha256_v2;
----
hubspot sha256_v2
hubspot sha256_v2
hubspot sha256_v2
hubspot sha256_v2

query TT
select data ->> 'source', data->> 'auth_algo' from test_primary_key;
----
github hmac_sha1
13 changes: 13 additions & 0 deletions e2e_test/webhook/create_table.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,16 @@ create table hubspot_sha256_v2 (
, 'UTF8')
), 'hex')
);

statement ok
create table test_primary_key (
data JSONB PRIMARY KEY
) WITH (
connector = 'webhook',
) VALIDATE SECRET test_secret AS secure_compare(
headers->>'x-hub-signature',
'sha1=' || encode(hmac(test_secret, data, 'sha1'), 'hex')
);

statement error Adding/dropping a column of a table with webhook has not been implemented.
ALTER TABLE github_hmac_sha1 ADD COLUMN new_col int;
3 changes: 3 additions & 0 deletions e2e_test/webhook/drop_table.slt.part
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
DROP TABLE test_primary_key;

statement ok
DROP TABLE hubspot_sha256_v2;

Expand Down
19 changes: 19 additions & 0 deletions e2e_test/webhook/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ def send_github_hmac_sha1(secret):
send_webhook(url, headers, payload_json)


def send_test_primary_key(secret):
    """Send a signed webhook message to the `test_primary_key` endpoint.

    The payload is tagged with source "github" and auth_algo "hmac_sha1",
    serialized to JSON, signed via `generate_signature_hmac` (SHA-1, with the
    "sha1=" prefix) and posted with the signature in the `X-Hub-Signature`
    header.

    Args:
        secret: The secret handed to `generate_signature_hmac` to sign the
            serialized payload.
    """
    # Work on a shallow copy: assigning `message` directly would alias the
    # shared object defined outside this function, so the key updates below
    # would leak into every sender that runs afterwards.
    payload = dict(message)
    payload['source'] = "github"
    payload['auth_algo'] = "hmac_sha1"
    url = SERVER_URL + "test_primary_key"

    # Sign exactly the string that is sent, so the receiver can recompute the
    # signature over the same bytes.
    payload_json = json.dumps(payload)
    signature = generate_signature_hmac(secret, payload_json, 'sha1', "sha1=")
    # Webhook message headers
    headers = {
        "Content-Type": "application/json",
        "X-Hub-Signature": signature  # Custom signature header
    }
    send_webhook(url, headers, payload_json)


def send_github_hmac_sha256(secret):
payload = message
payload['source'] = "github"
Expand Down Expand Up @@ -143,3 +159,6 @@ def send_hubspot_sha256_v2(secret):
send_segment_hmac_sha1(secret)
# hubspot
send_hubspot_sha256_v2(secret)

# ensure the single column can still work as normal
send_test_primary_key(secret)
2 changes: 2 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ message StreamSourceInfo {
// Webhook-specific settings of a source: the secret and signature expression
// (presumably used to validate incoming requests — confirm against the
// frontend), plus whether a request waits for persistence.
message WebhookSourceInfo {
// Reference to the secret; presumably made available to `signature_expr`.
secret.SecretRef secret_ref = 1;
// Expression presumably evaluated to check an incoming request's signature.
expr.ExprNode signature_expr = 2;
// Whether to return only after the data is persisted in the storage layer.
// The intended default is true.
// NOTE(review): an unset proto3 bool decodes as false, so the documented
// default must be applied explicitly by whoever writes this message —
// confirm how entries written before this field existed are handled.
bool wait_for_persistence = 3;
}

message Source {
Expand Down
31 changes: 31 additions & 0 deletions proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,35 @@ message GetDataResponse {
data.DataChunk record_batch = 2;
}

// Request of the `FastInsert` RPC: one chunk of data to insert into a table.
message FastInsertRequest {
// Id of the table to insert into.
uint32 table_id = 1;
// Version of the table.
uint64 table_version_id = 2;
// NOTE(review): presumably the table column index that each column of
// `data_chunk` maps to — confirm against the executor.
repeated uint32 column_indices = 3;
// The data to insert.
data.DataChunk data_chunk = 4;

// Optional; `None` for tables without a user-defined pk.
// The `BatchInsertExecutor` should add a column with NULL value which will
// be filled in streaming.
optional uint32 row_id_index = 5;

// Used to assign the insert request to different worker nodes and dml channels.
uint32 request_id = 6;
// NOTE(review): presumably whether the call returns only after the inserted
// data is persisted in the storage layer — confirm. An unset proto3 bool
// decodes as false.
bool wait_for_persistence = 7;
// TODO(kexiang): add support for default columns. plan_common.ExprContext expr_context is needed for it.
}

// Response of the `FastInsert` RPC.
message FastInsertResponse {
// Outcome of the insert.
// NOTE(review): the values are not prefixed with the enum name; they are left
// as is because renaming would change generated identifiers.
enum Status {
// Zero value; what an unset `status` decodes to.
UNSPECIFIED = 0;
// The insert succeeded.
SUCCEEDED = 1;
// The insert failed; presumably raised by the DML path — confirm.
DML_FAILED = 2;
}
// Result of this request.
Status status = 1;
// Presumably a description of the failure when `status` is not `SUCCEEDED`
// — confirm with the server implementation.
string error_message = 2;
}

message ExecuteRequest {
batch_plan.TaskId task_id = 1;
batch_plan.PlanFragment plan = 2;
Expand All @@ -73,6 +102,8 @@ service TaskService {
// Cancelling a task that has already died (self execution-failure, previously aborted, completed) will still succeed.
rpc CancelTask(CancelTaskRequest) returns (CancelTaskResponse);
rpc Execute(ExecuteRequest) returns (stream GetDataResponse);
// A lightweight insert path, used only for non-pgwire inserts such as those from webhooks and websockets.
rpc FastInsert(FastInsertRequest) returns (FastInsertResponse);
}

message GetDataRequest {
Expand Down
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ repository = { workspace = true }

[dependencies]
anyhow = "1"
assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
either = "1"
Expand Down
2 changes: 1 addition & 1 deletion src/batch/executors/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ mod tests {
assert_eq!(*chunk.columns()[2], array);
});

assert_matches!(reader.next().await.unwrap()?, TxnMsg::End(_));
assert_matches!(reader.next().await.unwrap()?, TxnMsg::End(..));
let epoch = u64::MAX;
let full_range = (Bound::Unbounded, Bound::Unbounded);
let store_content = store
Expand Down
Loading

0 comments on commit 4a4e03a

Please sign in to comment.