Skip to content

Commit

Permalink
Versioned sc structs
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Hannum <[email protected]>
  • Loading branch information
markhannum committed Jan 27, 2025
1 parent 2e09d8f commit 96a32da
Show file tree
Hide file tree
Showing 14 changed files with 199 additions and 35 deletions.
33 changes: 24 additions & 9 deletions bdb/llmeta.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <sys/poll.h>
#include "debug_switches.h"
#include "alias.h"
#include "sc_version.h"

extern int gbl_maxretries;
extern int gbl_disable_access_controls;
extern int get_csc2_version_tran(const char *table, tran_type *tran);
Expand Down Expand Up @@ -118,7 +120,7 @@ typedef enum {
data = no data
does db use authentication? */
,
LLMETA_ACCESSCONTROL_TABLExNODE = 19 /* XXX Deprecated */
LLMETA_ACCESSCONTROL_TABLExNODE = 19 /* XXX Deprecated */

,
LLMETA_SQLITE_STAT1_PREV_DONT_USE = 20 /* store previous sqlite-stat1 records- dont use this. */
Expand Down Expand Up @@ -176,7 +178,8 @@ typedef enum {
LLMETA_LUA_SFUNC_FLAG = 54,
LLMETA_NEWSC_REDO_GENID = 55, /* 55 + TABLENAME + GENID -> MAX-LSN */
LLMETA_SCHEMACHANGE_STATUS_V2 = 56,
LLMETA_SCHEMACHANGE_LIST = 57, /* list of all sc-s in a uuid txh */
LLMETA_SCHEMACHANGE_STATUS_VERSIONED = 57,
LLMETA_SCHEMACHANGE_LIST = 58, /* list of all sc-s in a uuid txh */
} llmetakey_t;

struct llmeta_file_type_key {
Expand Down Expand Up @@ -11336,30 +11339,42 @@ int bdb_del_view(tran_type *t, const char *view_name)
coincide with the first 4 bytes of the rqid (fastseed) stored as the first
member in old (7.0's) LLMETA_SCHEMACHANGE_STATUS payload.
*/
static int buf_get_schemachange_key_type(void *p_buf, void *p_buf_end)
static int buf_get_schemachange_key_type(void *p_buf, void *p_buf_end, int *version)
{
int first = 0;

if (p_buf >= p_buf_end) return -1;
if (p_buf >= p_buf_end)
return -1;

buf_get(&first, sizeof(first), p_buf, p_buf_end);
p_buf = (void *)buf_get(&first, sizeof(first), p_buf, p_buf_end);

if (first == SC_VERSIONED) {
buf_get(version, sizeof(int), p_buf, p_buf_end);
return LLMETA_SCHEMACHANGE_STATUS_VERSIONED;
}
if (first > SC_INVALID && first < SC_LAST) {
return LLMETA_SCHEMACHANGE_STATUS_V2;
}
return LLMETA_SCHEMACHANGE_STATUS;
}

void *buf_get_schemachange(struct schema_change_type *s, void *p_buf,
void *p_buf_end)
void *buf_get_schemachange(struct schema_change_type *s, void *p_buf, void *p_buf_end)
{
int sc_key_type = buf_get_schemachange_key_type(p_buf, p_buf_end);
int version = 0;
int sc_key_type = buf_get_schemachange_key_type(p_buf, p_buf_end, &version);

switch (sc_key_type) {
case LLMETA_SCHEMACHANGE_STATUS_VERSIONED:
if (version < SC_MIN_VERSION || version > SC_VERSION) {
logmsg(LOGMSG_ERROR, "%s: unknown sc-version %d\n", __func__, version);
return NULL;
}
p_buf = p_buf + sizeof(int) + sizeof(int);
return buf_get_schemachange_versioned(s, (void *)p_buf, (void *)p_buf_end, version);
case LLMETA_SCHEMACHANGE_STATUS:
return buf_get_schemachange_v1(s, (void *)p_buf, (void *)p_buf_end);
case LLMETA_SCHEMACHANGE_STATUS_V2:
return buf_get_schemachange_v2(s, (void *)p_buf, (void *)p_buf_end);
return buf_get_schemachange_versioned(s, (void *)p_buf, (void *)p_buf_end, 2);
default:
break;
}
Expand Down
2 changes: 2 additions & 0 deletions db/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ static char *legacy_options[] = {
"setattr max_sql_idle_time 864000",
"retrieve_gen_from_ckp 0",
"recovery_ckp 0",
"sc_versioned 0",
"sc_current_version 2",
};
// clang-format on

Expand Down
2 changes: 2 additions & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ extern int gbl_memp_dump_cache_threshold;
extern int gbl_disable_ckp;
extern int gbl_abort_on_illegal_log_put;
extern int gbl_sc_close_txn;
extern int gbl_sc_versioned;
extern int gbl_sc_current_version;
extern int gbl_master_sends_query_effects;
extern int gbl_create_dba_user;
extern int gbl_lock_dba_user;
Expand Down
4 changes: 4 additions & 0 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -2385,6 +2385,10 @@ REGISTER_TUNABLE("wal_osync", "Open WAL files using the O_SYNC flag (Default: of
NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("sc_headroom", "Percentage threshold for low headroom calculation. (Default: 10)", TUNABLE_DOUBLE,
&gbl_sc_headroom, INTERNAL | SIGNED, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("sc_versioned", "Enable versioned schema changes (Default: on)", TUNABLE_BOOLEAN, &gbl_sc_versioned, 0,
NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("sc_current_version", "Current schema-change version (Default: 2)", TUNABLE_INTEGER,
&gbl_sc_current_version, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("fdb_incoherence_percentage", "Generate random incoherent errors in remsql", TUNABLE_INTEGER,
&gbl_fdb_incoherence_percentage, INTERNAL, NULL, percent_verify, NULL, NULL);
REGISTER_TUNABLE("fdb_socket_timeout_ms", "Timeout ms for fdb communications. (Default: 10000)", TUNABLE_INTEGER,
Expand Down
14 changes: 8 additions & 6 deletions db/osqlcomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -806,19 +806,22 @@ static uint8_t *osqlcomm_schemachange_type_get(struct schema_change_type *sc,
return tmp_buf;
}

extern int gbl_sc_current_version;

static uint8_t *
osqlcomm_schemachange_rpl_type_put(osql_rpl_t *hd,
struct schema_change_type *sc,
uint8_t *p_buf, uint8_t *p_buf_end)
{
size_t sc_len = schemachange_packed_size(sc);
int sc_version = gbl_sc_current_version;
size_t sc_len = schemachange_packed_size(sc, sc_version);

if (p_buf_end < p_buf ||
OSQLCOMM_RPL_TYPE_LEN + sc_len > (p_buf_end - p_buf))
return NULL;

p_buf = osqlcomm_rpl_type_put(hd, p_buf, p_buf_end);
p_buf = buf_put_schemachange(sc, p_buf, p_buf_end);
p_buf = buf_put_schemachange(sc, p_buf, p_buf_end, sc_version);

return p_buf;
}
Expand All @@ -828,14 +831,15 @@ osqlcomm_schemachange_uuid_rpl_type_put(osql_uuid_rpl_t *hd,
struct schema_change_type *sc,
uint8_t *p_buf, uint8_t *p_buf_end)
{
size_t sc_len = schemachange_packed_size(sc);
int sc_version = gbl_sc_current_version;
size_t sc_len = schemachange_packed_size(sc, sc_version);

if (p_buf_end < p_buf ||
OSQLCOMM_UUID_RPL_TYPE_LEN + sc_len > (p_buf_end - p_buf))
return NULL;

p_buf = osqlcomm_uuid_rpl_type_put(hd, p_buf, p_buf_end);
p_buf = buf_put_schemachange(sc, p_buf, p_buf_end);
p_buf = buf_put_schemachange(sc, p_buf, p_buf_end, sc_version);

return p_buf;
}
Expand Down Expand Up @@ -8393,8 +8397,6 @@ netinfo_type *osql_get_netinfo(void)
int osql_send_schemachange(osql_target_t *target, unsigned long long rqid,
uuid_t uuid, struct schema_change_type *sc, int type)
{

schemachange_packed_size(sc);
size_t osql_rpl_size =
((rqid == OSQL_RQID_USE_UUID) ? OSQLCOMM_UUID_RPL_TYPE_LEN
: OSQLCOMM_RPL_TYPE_LEN) +
Expand Down
8 changes: 8 additions & 0 deletions db/process_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -2622,6 +2622,14 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st)
} else if (tokcmp(tok, ltok, "scforceabort") == 0) {
logmsg(LOGMSG_USER, "Forcibly resetting schema change flat\n");
wait_for_sc_to_stop("forceabort", __func__, __LINE__);
} else if (tokcmp(tok, ltok, "sc_version_test") == 0) {
int sc_version_test(void);
int rc = sc_version_test();
if (rc) {
logmsg(LOGMSG_ERROR, "Schema change version test failed\n");
} else {
logmsg(LOGMSG_USER, "Schema change version test passed\n");
}
} else if (tokcmp(tok, ltok, "get_db_dir") == 0) {
logmsg(LOGMSG_USER, "Database Base Directory: %s\n", thedb->basedir);
} else if (tokcmp(tok, ltok, "debug") == 0) {
Expand Down
100 changes: 90 additions & 10 deletions schemachange/sc_struct.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "schemachange.h"
#include "sc_struct.h"
#include "sc_version.h"
#include "logmsg.h"
#include "sc_csc2.h"
#include "sc_schema.h"
Expand Down Expand Up @@ -132,7 +133,10 @@ static size_t _partition_packed_size(struct comdb2_partition *p)
}
}

size_t schemachange_packed_size(struct schema_change_type *s)
int gbl_sc_current_version = SC_VERSION;
int gbl_sc_versioned = 1;

size_t schemachange_packed_size(struct schema_change_type *s, int sc_version)
{
s->tablename_len = strlen(s->tablename) + 1;
s->fname_len = strlen(s->fname) + 1;
Expand Down Expand Up @@ -161,6 +165,14 @@ size_t schemachange_packed_size(struct schema_change_type *s)
sizeof(s->usedbtablevers) + sizeof(s->qdb_file_ver) +
_partition_packed_size(&s->partition);

if (gbl_sc_versioned) {
s->packed_len += 8;

if (sc_version > 2) {
s->packed_len += sizeof(s->version_test);
}
}

return s->packed_len;
}

Expand All @@ -184,10 +196,25 @@ static void *buf_put_dests(struct schema_change_type *s, void *p_buf,
return p_buf;
}

void *buf_put_schemachange(struct schema_change_type *s, void *p_buf, void *p_buf_end)
void *buf_put_schemachange(struct schema_change_type *s, void *p_buf, void *p_buf_end, int version)
{
int versioned = gbl_sc_versioned;
if (p_buf >= p_buf_end)
return NULL;

if (p_buf >= p_buf_end) return NULL;
if (versioned) {
if (version < SC_MIN_VERSION || version > SC_VERSION) {
logmsg(LOGMSG_ERROR, "%s: invalid version, %d\n", __func__, version);
return NULL;
}
int neg1 = -1;
p_buf = buf_put(&neg1, sizeof(neg1), p_buf, p_buf_end);
p_buf = buf_put(&version, sizeof(version), p_buf, p_buf_end);
}

if (versioned && version > 2) {
p_buf = buf_put(&s->version_test, sizeof(s->version_test), p_buf, p_buf_end);
}

p_buf = buf_put(&s->kind, sizeof(s->kind), p_buf, p_buf_end);

Expand Down Expand Up @@ -317,6 +344,50 @@ void *buf_put_schemachange(struct schema_change_type *s, void *p_buf, void *p_bu
return p_buf;
}

int sc_version_test(void)
{
int fail_count = 0;

for (int i = SC_MIN_VERSION - 1; i <= SC_VERSION + 1; i++) {
struct schema_change_type *s = new_schemachange_type();
s->version_test = i;
int packed_len = schemachange_packed_size(s, i);
uint8_t *buf = malloc(packed_len);
uint8_t *p_buf = buf;
uint8_t *p_buf_end = buf + packed_len;
p_buf = buf_put_schemachange(s, p_buf, p_buf_end, i);
if (!p_buf) {
if (i >= SC_MIN_VERSION && i <= SC_VERSION) {
logmsg(LOGMSG_ERROR, "Failed to pack version %d\n", i);
fail_count++;
}
} else {
struct schema_change_type unpacked = {0};
p_buf = buf;
p_buf = buf_get_schemachange(&unpacked, p_buf, p_buf_end);
if (i == 2) {
if (unpacked.version_test != -1) {
logmsg(LOGMSG_ERROR, "Failed to unpack version %d\n", i);
fail_count++;
}
}
if (i == 3) {
if (unpacked.version_test != 3) {
logmsg(LOGMSG_ERROR, "Failed to unpack version %d\n", i);
fail_count++;
}
}
}
free(buf);
free(s);
}
if (fail_count) {
logmsg(LOGMSG_ERROR, "%s: failed %d tests\n", __func__, fail_count);
}

return fail_count ? -1 : 0;
}

static const void *buf_get_dests(struct schema_change_type *s,
const void *p_buf, void *p_buf_end)
{
Expand Down Expand Up @@ -557,11 +628,18 @@ void *buf_get_schemachange_v1(struct schema_change_type *s, void *p_buf,
return p_buf;
}

void *buf_get_schemachange_v2(struct schema_change_type *s,
void *p_buf, void *p_buf_end)
void *buf_get_schemachange_versioned(struct schema_change_type *s, void *p_buf, void *p_buf_end, int version)
{

if (p_buf >= p_buf_end) return NULL;
if (p_buf >= p_buf_end)
return NULL;

/* version_test is just for testing - I will remove when I use it */
if (version > 2) {
buf_get(&s->version_test, sizeof(s->version_test), p_buf, p_buf_end);
} else {
s->version_test = -1;
}

p_buf = (uint8_t *)buf_get(&s->kind, sizeof(s->kind), p_buf, p_buf_end);

Expand Down Expand Up @@ -752,8 +830,9 @@ int pack_schema_change_type(struct schema_change_type *s, void **packed,
size_t *packed_len)
{

int sc_version = gbl_sc_current_version;
/* compute the length of our buffer */
*packed_len = schemachange_packed_size(s);
*packed_len = schemachange_packed_size(s, sc_version);

/* grab memory for our buffer */
*packed = malloc(*packed_len);
Expand All @@ -770,7 +849,7 @@ int pack_schema_change_type(struct schema_change_type *s, void **packed,
uint8_t *p_buf_end = (p_buf + *packed_len);

/* pack all the data */
p_buf = buf_put_schemachange(s, p_buf, p_buf_end);
p_buf = buf_put_schemachange(s, p_buf, p_buf_end, sc_version);

if (p_buf != (uint8_t *)((char *)(*packed)) + *packed_len) {
logmsg(LOGMSG_ERROR,
Expand Down Expand Up @@ -1198,8 +1277,9 @@ int schema_change_headers(struct schema_change_type *s)
struct schema_change_type *
clone_schemachange_type(struct schema_change_type *sc)
{
int sc_version = gbl_sc_current_version;
struct schema_change_type *newsc;
size_t sc_len = schemachange_packed_size(sc);
size_t sc_len = schemachange_packed_size(sc, sc_version);
uint8_t *p_buf, *p_buf_end, *buf;

p_buf = buf = calloc(1, sc_len);
Expand All @@ -1208,7 +1288,7 @@ clone_schemachange_type(struct schema_change_type *sc)

p_buf_end = p_buf + sc_len;

p_buf = buf_put_schemachange(sc, p_buf, p_buf_end);
p_buf = buf_put_schemachange(sc, p_buf, p_buf_end, sc_version);
if (!p_buf) {
free(buf);
return NULL;
Expand Down
23 changes: 23 additions & 0 deletions schemachange/sc_version.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2025 Bloomberg Finance L.P.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef INCLUDE_SC_VERSION_H
#define INCLUDE_SC_VERSION_H

#define SC_MIN_VERSION 2
#define SC_VERSION 3

#endif
Loading

0 comments on commit 96a32da

Please sign in to comment.