Skip to content

Commit

Permalink
Versioned sc structs
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Hannum <[email protected]>
  • Loading branch information
markhannum committed Jan 25, 2025
1 parent 24f6a0f commit 0d9c40a
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 15 deletions.
34 changes: 25 additions & 9 deletions bdb/llmeta.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <sys/poll.h>
#include "debug_switches.h"
#include "alias.h"
#include "sc_version.h"

extern int gbl_maxretries;
extern int gbl_disable_access_controls;
extern int get_csc2_version_tran(const char *table, tran_type *tran);
Expand Down Expand Up @@ -118,7 +120,7 @@ typedef enum {
data = no data
does db use authentication? */
,
LLMETA_ACCESSCONTROL_TABLExNODE = 19 /* XXX Deprecated */
LLMETA_ACCESSCONTROL_TABLExNODE = 19 /* XXX Deprecated */

,
LLMETA_SQLITE_STAT1_PREV_DONT_USE = 20 /* store previous sqlite-stat1 records- dont use this. */
Expand Down Expand Up @@ -176,7 +178,8 @@ typedef enum {
LLMETA_LUA_SFUNC_FLAG = 54,
LLMETA_NEWSC_REDO_GENID = 55, /* 55 + TABLENAME + GENID -> MAX-LSN */
LLMETA_SCHEMACHANGE_STATUS_V2 = 56,
LLMETA_SCHEMACHANGE_LIST = 57, /* list of all sc-s in a uuid txh */
LLMETA_SCHEMACHANGE_STATUS_VERSIONED = 57,
LLMETA_SCHEMACHANGE_LIST = 58, /* list of all sc-s in a uuid txh */
} llmetakey_t;

struct llmeta_file_type_key {
Expand Down Expand Up @@ -11336,30 +11339,43 @@ int bdb_del_view(tran_type *t, const char *view_name)
coincide with the first 4 bytes of the rqid (fastseed) stored as the first
member in old (7.0's) LLMETA_SCHEMACHANGE_STATUS payload.
*/
static int buf_get_schemachange_key_type(void *p_buf, void *p_buf_end)
static int buf_get_schemachange_key_type(void *p_buf, void *p_buf_end, int *version)
{
int first = 0;

if (p_buf >= p_buf_end) return -1;
if (p_buf >= p_buf_end)
return -1;

buf_get(&first, sizeof(first), p_buf, p_buf_end);
p_buf = (void *)buf_get(&first, sizeof(first), p_buf, p_buf_end);

if (first == SC_VERSIONED) {
buf_get(version, sizeof(int), p_buf, p_buf_end);
return LLMETA_SCHEMACHANGE_STATUS_VERSIONED;
}
if (first > SC_INVALID && first < SC_LAST) {
return LLMETA_SCHEMACHANGE_STATUS_V2;
}
return LLMETA_SCHEMACHANGE_STATUS;
}

void *buf_get_schemachange(struct schema_change_type *s, void *p_buf,
void *p_buf_end)
void *buf_get_schemachange(struct schema_change_type *s, void *p_buf, void *p_buf_end)
{
int sc_key_type = buf_get_schemachange_key_type(p_buf, p_buf_end);
int version = 0;
int sc_key_type = buf_get_schemachange_key_type(p_buf, p_buf_end, &version);

switch (sc_key_type) {
case LLMETA_SCHEMACHANGE_STATUS_VERSIONED:
/* Increment & delete code to deprecate old versions */
if (version < 2 || version > SC_VERSION) {
logmsg(LOGMSG_ERROR, "%s: unknown sc-version %d\n", __func__, version);
return NULL;
}
p_buf = p_buf + sizeof(int) + sizeof(int);
return buf_get_schemachange_versioned(s, (void *)p_buf, (void *)p_buf_end, version);
case LLMETA_SCHEMACHANGE_STATUS:
return buf_get_schemachange_v1(s, (void *)p_buf, (void *)p_buf_end);
case LLMETA_SCHEMACHANGE_STATUS_V2:
return buf_get_schemachange_v2(s, (void *)p_buf, (void *)p_buf_end);
return buf_get_schemachange_versioned(s, (void *)p_buf, (void *)p_buf_end, 2);
default:
break;
}
Expand Down
2 changes: 2 additions & 0 deletions db/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ static char *legacy_options[] = {
"setattr max_sql_idle_time 864000",
"retrieve_gen_from_ckp 0",
"recovery_ckp 0",
"sc_versioned 0",
"sc_current_version 2",
};
// clang-format on

Expand Down
2 changes: 2 additions & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ extern int gbl_memp_dump_cache_threshold;
extern int gbl_disable_ckp;
extern int gbl_abort_on_illegal_log_put;
extern int gbl_sc_close_txn;
extern int gbl_sc_versioned;
extern int gbl_sc_current_version;
extern int gbl_master_sends_query_effects;
extern int gbl_create_dba_user;
extern int gbl_lock_dba_user;
Expand Down
4 changes: 4 additions & 0 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -2385,6 +2385,10 @@ REGISTER_TUNABLE("wal_osync", "Open WAL files using the O_SYNC flag (Default: of
NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("sc_headroom", "Percentage threshold for low headroom calculation. (Default: 10)", TUNABLE_DOUBLE,
&gbl_sc_headroom, INTERNAL | SIGNED, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("sc_versioned", "Enable versioned schema changes (Default: on)", TUNABLE_BOOLEAN, &gbl_sc_versioned, 0,
NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("sc_current_version", "Current schema-change version (Default: 2)", TUNABLE_INTEGER,
&gbl_sc_current_version, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("fdb_incoherence_percentage", "Generate random incoherent errors in remsql", TUNABLE_INTEGER,
&gbl_fdb_incoherence_percentage, INTERNAL, NULL, percent_verify, NULL, NULL);
REGISTER_TUNABLE("fdb_socket_timeout_ms", "Timeout ms for fdb communications. (Default: 10000)", TUNABLE_INTEGER,
Expand Down
46 changes: 42 additions & 4 deletions schemachange/sc_struct.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "schemachange.h"
#include "sc_struct.h"
#include "sc_version.h"
#include "logmsg.h"
#include "sc_csc2.h"
#include "sc_schema.h"
Expand Down Expand Up @@ -132,6 +133,10 @@ static size_t _partition_packed_size(struct comdb2_partition *p)
}
}

/* To change, increment SC_VERSION & set legacy-defaults to old version */
int gbl_sc_current_version = SC_VERSION;
int gbl_sc_versioned = 1;

size_t schemachange_packed_size(struct schema_change_type *s)
{
s->tablename_len = strlen(s->tablename) + 1;
Expand Down Expand Up @@ -161,6 +166,20 @@ size_t schemachange_packed_size(struct schema_change_type *s)
sizeof(s->usedbtablevers) + sizeof(s->qdb_file_ver) +
_partition_packed_size(&s->partition);

if (gbl_sc_versioned) {
s->packed_len += 8;
/*
int sc_current_version = gbl_sc_current_version;
if (sc_current_version > 2) {
s->packed_len += XXXX;
}
if (sc_current_version > 3) {
s->packed_len += XXXX;
}
*/
}

return s->packed_len;
}

Expand All @@ -186,8 +205,20 @@ static void *buf_put_dests(struct schema_change_type *s, void *p_buf,

void *buf_put_schemachange(struct schema_change_type *s, void *p_buf, void *p_buf_end)
{
int version = gbl_sc_current_version;
if (p_buf >= p_buf_end)
return NULL;

if (p_buf >= p_buf_end) return NULL;
if (gbl_sc_versioned) {
int neg1 = -1, vers = version;
p_buf = buf_put(&neg1, sizeof(neg1), p_buf, p_buf_end);
p_buf = buf_put(&vers, sizeof(vers), p_buf, p_buf_end);
}
/*
* if (version > 2) {
* p_buf = buf_put(&s->new_field, sizeof(s->new_field), p_buf, p_buf_end);
* }
*/

p_buf = buf_put(&s->kind, sizeof(s->kind), p_buf, p_buf_end);

Expand Down Expand Up @@ -557,11 +588,18 @@ void *buf_get_schemachange_v1(struct schema_change_type *s, void *p_buf,
return p_buf;
}

void *buf_get_schemachange_v2(struct schema_change_type *s,
void *p_buf, void *p_buf_end)
void *buf_get_schemachange_versioned(struct schema_change_type *s, void *p_buf, void *p_buf_end, int version)
{

if (p_buf >= p_buf_end) return NULL;
if (p_buf >= p_buf_end)
return NULL;
/*
* if (version > 2) {
* buf_get(&s->new_field, sizeof(s->new_field), p_buf, p_buf_end);
* } else {
* s->new_field = -1;
* }
*/

p_buf = (uint8_t *)buf_get(&s->kind, sizeof(s->kind), p_buf, p_buf_end);

Expand Down
22 changes: 22 additions & 0 deletions schemachange/sc_version.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
Copyright 2025 Bloomberg Finance L.P.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef INCLUDE_SC_VERSION_H
#define INCLUDE_SC_VERSION_H

#define SC_VERSION 2

#endif
4 changes: 2 additions & 2 deletions schemachange/schemachange.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct timepart_view;

/* in sync with do_schema_change_if */
enum schema_change_kind {
SC_VERSIONED = -1,
SC_INVALID = 0,
SC_LEGACY_QUEUE = 1,
SC_LEGACY_MORESTRIPE = 2,
Expand Down Expand Up @@ -429,8 +430,7 @@ void *buf_get_schemachange(struct schema_change_type *s, void *p_buf,
void *p_buf_end);
void *buf_get_schemachange_v1(struct schema_change_type *s, void *p_buf,
void *p_buf_end);
void *buf_get_schemachange_v2(struct schema_change_type *s, void *p_buf,
void *p_buf_end);
void *buf_get_schemachange_versioned(struct schema_change_type *s, void *p_buf, void *p_buf_end, int version);
/* This belong into sc_util.h */
int check_sc_ok(struct schema_change_type *s);

Expand Down
2 changes: 2 additions & 0 deletions tests/tunables.test/t00_all_tunables.expected
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,7 @@
(name='sc_async', description='Run transactional schema changes asynchronously.', type='BOOLEAN', value='ON', read_only='N')
(name='sc_async_maxthreads', description='Max number of threads for asynchronous schema changes.', type='INTEGER', value='5', read_only='N')
(name='sc_check_lockwaits_sec', description='Frequency of checking lockwaits during schemachange (in seconds).', type='INTEGER', value='1', read_only='N')
(name='sc_current_version', description='Current schema-change version (Default: 2)', type='INTEGER', value='2', read_only='N')
(name='sc_decrease_thrds_on_deadlock', description='Decrease number of schema change threads on deadlock - way to have schema change backoff.', type='BOOLEAN', value='ON', read_only='N')
(name='sc_del_unused_files_threshold', description='', type='INTEGER', value='30000', read_only='Y')
(name='sc_delay_verify_error', description='', type='INTEGER', value='100', read_only='N')
Expand All @@ -876,6 +877,7 @@
(name='sc_resume_watchdog_timer', description='sc_resuming_watchdog timer', type='INTEGER', value='60', read_only='N')
(name='sc_status_max_rows', description='Max number of rows returned in comdb2_sc_status (Default: 1000)', type='INTEGER', value='1000', read_only='N')
(name='sc_use_num_threads', description='Start up to this many threads for parallel rebuilding during schema change. 0 means use one per dtastripe. Setting is capped at dtastripe.', type='INTEGER', value='0', read_only='N')
(name='sc_versioned', description='Enable versioned schema changes (Default: on)', type='BOOLEAN', value='ON', read_only='N')
(name='sc_via_ddl_only', description='If set, we don't do checks needed for comdb2sc.', type='BOOLEAN', value='OFF', read_only='N')
(name='scatterkeys', description='', type='BOOLEAN', value='OFF', read_only='N')
(name='scconvert_finish_delay', description='Delay returning from convert_record when a stripe finishes. This would create a scenario where scgenids are on the right of any new genids to reproduce a vutf8 schema change bug. ', type='BOOLEAN', value='OFF', read_only='N')
Expand Down

0 comments on commit 0d9c40a

Please sign in to comment.