diff --git a/README.md b/README.md index 5a48651..33ec2e9 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ It consists of a single binary that seamlessly connects to a Postgres database, - [Configuration](#configuration) - [Local disk storage](#local-disk-storage) - [S3 block storage](#s3-block-storage) + - [Periodic data sync](#periodic-data-sync) - [Architecture](#architecture) - [Benchmark](#benchmark) - [Data type mapping](#data-type-mapping) @@ -53,21 +54,10 @@ Sync data from a Postgres database: ./bemidb --pg-database-url postgres://postgres:postgres@localhost:5432/dbname sync ``` -Sync data periodically from a Postgres database: -```sh -./bemidb --pg-database-url postgres://postgres:postgres@localhost:5432/dbname --interval 1h sync -``` -_This will sync the data every hour._ - -Alternatively, you can set the interval using an environment variable. Add the following line to your `.env` file: -```env -PG_SYNC_INTERVAL=1h -``` - Run BemiDB database: ```sh -bemidb start +./bemidb start ``` Run Postgres queries on top of the BemiDB database: @@ -88,7 +78,7 @@ By default, BemiDB stores data on the local disk. Here is an example of running BemiDB with default settings and storing data in a local `iceberg` directory: ```sh -bemidb \ +./bemidb \ --port 54321 \ --database bemidb \ --storage-type LOCAL \ @@ -98,12 +88,26 @@ bemidb \ start ``` +To run BemiDB with environment variables: + +```sh +# Default settings +export BEMIDB_PORT=54321 +export BEMIDB_DATABASE=bemidb +export BEMIDB_STORAGE_TYPE=LOCAL +export BEMIDB_ICEBERG_PATH=./iceberg +export BEMIDB_INIT_SQL=./init.sql +export BEMIDB_LOG_LEVEL=INFO + +./bemidb start +``` + ### S3 block storage BemiDB natively supports S3 storage. You can specify the S3 settings using the following flags: ```sh -bemidb \ +./bemidb \ --port 54321 \ --database bemidb \ --storage-type AWS_S3 \ @@ -115,6 +119,22 @@ bemidb \ start ``` +To run BemiDB with environment variables: + +```sh +export BEMIDB_PORT=54321 +export BEMIDB_DATABASE=bemidb +export BEMIDB_STORAGE_TYPE=AWS_S3 +export BEMIDB_ICEBERG_PATH=iceberg +export BEMIDB_AWS_REGION=[AWS_REGION] +export BEMIDB_AWS_S3_BUCKET=[AWS_S3_BUCKET] +export BEMIDB_AWS_ACCESS_KEY_ID=[AWS_ACCESS_KEY_ID] +export BEMIDB_AWS_SECRET_ACCESS_KEY=[AWS_SECRET_ACCESS_KEY] + +./bemidb start +``` + + Here is the minimal IAM policy required for BemiDB to work with S3: ```json @@ -139,6 +159,25 @@ Here is the minimal IAM policy required for BemiDB to work with S3: } ``` +### Periodic data sync + +Sync data periodically from a Postgres database: + +```sh +./bemidb --pg-database-url postgres://postgres:postgres@localhost:5432/dbname --interval 1h sync +``` + +Alternatively, you can set the interval using environment variables: + +```sh +export PG_DATABASE_URL=postgres://postgres:postgres@localhost:5432/dbname +export PG_SYNC_INTERVAL=1h + +./bemidb sync +``` + +Note that incremental real-time replication is not supported yet (WIP). Please see the [Future roadmap](#future-roadmap). + ## Architecture BemiDB consists of the following main components: @@ -171,31 +210,32 @@ See the [benchmark](/benchmark) directory for more details. Primitive data types are mapped as follows: -| PostgreSQL | Parquet | Iceberg | -|---------------|---------------------------------------------------|----------------------------------| -| `char` | `BYTE_ARRAY` (`UTF8`) | `string` | -| `varchar` | `BYTE_ARRAY` (`UTF8`) | `string` | -| `text` | `BYTE_ARRAY` (`UTF8`) | `string` | -| `bpchar` | `BYTE_ARRAY` (`UTF8`) | `string` | -| `int2` | `INT32` | `int` | -| `int4` | `INT32` | `int` | -| `int8` | `INT64` | `long` | -| `float4` | `FLOAT` | `float` | -| `float8` | `FLOAT` | `float` | -| `numeric` | `FIXED_LEN_BYTE_ARRAY` (`DECIMAL`) | `decimal(P, S)` | -| `bool` | `BOOLEAN` | `boolean` | -| `date` | `INT32` (`DATE`) | `date` | -| `time` | `INT64` (`TIME_MICROS` / `TIME_MILLIS`) | `time` | -| `timetz` | `INT64` (`TIME_MICROS` / `TIME_MILLIS`) | `time` | -| `timestamp` | `INT64` (`TIMESTAMP_MICROS` / `TIMESTAMP_MILLIS`) | `timestamp` / `timestamp_ns` | -| `timestamptz` | `INT64` (`TIMESTAMP_MICROS` / `TIMESTAMP_MILLIS`) | `timestamptz` / `timestamptz_ns` | -| `uuid` | `FIXED_LEN_BYTE_ARRAY` | `uuid` | -| `bytea` | `BYTE_ARRAY` (`UTF8`) | `binary` | -| `interval` | `BYTE_ARRAY` (`INTERVAL`) | `string` | -| `json` | `BYTE_ARRAY` (`UTF8`) | `string` | -| `jsonb` | `BYTE_ARRAY` (`UTF8`) | `string` | -| `tsvector` | `BYTE_ARRAY` (`UTF8`) | `string` | -| `_*` (array) | `REPEATED` `*` | `list` | +| PostgreSQL | Parquet | Iceberg | +|-------------------------|---------------------------------------------------|----------------------------------| +| `bool` | `BOOLEAN` | `boolean` | +| `char` | `BYTE_ARRAY` (`UTF8`) | `string` | +| `varchar` | `BYTE_ARRAY` (`UTF8`) | `string` | +| `text` | `BYTE_ARRAY` (`UTF8`) | `string` | +| `bpchar` | `BYTE_ARRAY` (`UTF8`) | `string` | +| `int2` | `INT32` | `int` | +| `int4` | `INT32` | `int` | +| `int8` | `INT64` | `long` | +| `float4` | `FLOAT` | `float` | +| `float8` | `FLOAT` | `float` | +| `numeric` | `FIXED_LEN_BYTE_ARRAY` (`DECIMAL`) | `decimal(P, S)` | +| `date` | `INT32` (`DATE`) | `date` | +| `time` | `INT64` (`TIME_MICROS` / `TIME_MILLIS`) | `time` | +| `timetz` | `INT64` (`TIME_MICROS` / `TIME_MILLIS`) | `time` | +| `timestamp` | `INT64` (`TIMESTAMP_MICROS` / `TIMESTAMP_MILLIS`) | `timestamp` / `timestamp_ns` | +| `timestamptz` | `INT64` (`TIMESTAMP_MICROS` / `TIMESTAMP_MILLIS`) | `timestamptz` / `timestamptz_ns` | +| `uuid` | `FIXED_LEN_BYTE_ARRAY` | `uuid` | +| `bytea` | `BYTE_ARRAY` (`UTF8`) | `binary` | +| `interval` | `BYTE_ARRAY` (`INTERVAL`) | `string` | +| `json` | `BYTE_ARRAY` (`UTF8`) | `string` | +| `jsonb` | `BYTE_ARRAY` (`UTF8`) | `string` | +| `tsvector` | `BYTE_ARRAY` (`UTF8`) | `string` | +| `_*` (array) | `REPEATED` `*` | `list` | +| `*` (user-defined type) | `BYTE_ARRAY` (`UTF8`) | `string` | ## Future roadmap diff --git a/src/config.go b/src/config.go index 8f81e95..4c089e2 100644 --- a/src/config.go +++ b/src/config.go @@ -40,7 +40,7 @@ type Config struct { StorageType string PgDatabaseUrl string Aws AwsConfig - Interval string + SyncInterval string } type AwsConfig struct { @@ -91,7 +91,7 @@ func registerFlags() { panic("Invalid storage type " + _config.StorageType + ". Must be one of " + strings.Join(STORAGE_TYPES, ", ")) } - flag.StringVar(&_config.Interval, "interval", os.Getenv(ENV_PG_SYNC_INTERVAL), "Interval between syncs (e.g., 1h, 30m). Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'.") + flag.StringVar(&_config.SyncInterval, "interval", os.Getenv(ENV_PG_SYNC_INTERVAL), "Interval between syncs (e.g., 1h, 30m). Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'.") flag.StringVar(&_config.PgDatabaseUrl, "pg-database-url", os.Getenv(ENV_PG_DATABASE_URL), "PostgreSQL database URL") diff --git a/src/config_test.go b/src/config_test.go index e24d7eb..46f43c9 100644 --- a/src/config_test.go +++ b/src/config_test.go @@ -27,8 +27,8 @@ func TestLoadConfig(t *testing.T) { t.Errorf("Expected storageType to be LOCAL, got %s", config.StorageType) } - if config.Interval != "" { - t.Errorf("Expected interval to be empty, got %s", config.Interval) + if config.SyncInterval != "" { + t.Errorf("Expected interval to be empty, got %s", config.SyncInterval) } }) @@ -62,8 +62,8 @@ func TestLoadConfig(t *testing.T) { t.Errorf("Expected storageType to be local, got %s", config.StorageType) } - if config.Interval != "30m" { - t.Errorf("Expected interval to be 30m, got %s", config.Interval) + if config.SyncInterval != "30m" { + t.Errorf("Expected interval to be 30m, got %s", config.SyncInterval) } }) @@ -123,8 +123,8 @@ func TestLoadConfig(t *testing.T) { t.Errorf("Expected pgDatabaseUrl to be postgres://user:password@localhost:5432/template1, got %s", config.PgDatabaseUrl) } - if config.Interval != "1h" { - t.Errorf("Expected interval to be 1h, got %s", config.Interval) + if config.SyncInterval != "1h" { + t.Errorf("Expected interval to be 1h, got %s", config.SyncInterval) } }) @@ -151,8 +151,8 @@ func TestLoadConfig(t *testing.T) { if config.StorageType != "local" { t.Errorf("Expected storageType to be local, got %s", config.StorageType) } - if config.Interval != "2h30m" { - t.Errorf("Expected interval to be 2h30m, got %s", config.Interval) + if config.SyncInterval != "2h30m" { + t.Errorf("Expected interval to be 2h30m, got %s", config.SyncInterval) } }) } diff --git a/src/main.go b/src/main.go index 87d6df8..57e3d97 100644 --- a/src/main.go +++ b/src/main.go @@ -23,15 +23,15 @@ func main() { case "start": start(config) case "sync": - if config.Interval != "" { - duration, err := time.ParseDuration(config.Interval) + if config.SyncInterval != "" { + duration, err := time.ParseDuration(config.SyncInterval) if err != nil { - panic("Invalid interval format: " + config.Interval) + panic("Invalid interval format: " + config.SyncInterval) } - LogInfo(config, "Starting sync loop with interval:", config.Interval) + LogInfo(config, "Starting sync loop with interval:", config.SyncInterval) for { syncFromPg(config) - LogInfo(config, "Sleeping for", config.Interval) + LogInfo(config, "Sleeping for", config.SyncInterval) time.Sleep(duration) } } else {