Skip to content

Commit

Permalink
Add PG sync intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunlol committed Nov 12, 2024
1 parent 90e705a commit f6e5532
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 134 deletions.
1 change: 1 addition & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ BEMIDB_ICEBERG_PATH=../iceberg
# BEMIDB_AWS_SECRET_ACCESS_KEY=[REPLACE_ME]

PG_DATABASE_URL=postgres://username:password@localhost:5432/database_name
# PG_SYNC_INTERVAL=1h
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ Sync data from a Postgres database:
bemidb --pg-database-url postgres://postgres:postgres@localhost:5432/dbname sync
```

Sync data periodically from a Postgres database:
```sh
bemidb --pg-database-url postgres://postgres:postgres@localhost:5432/dbname --interval 1h sync
```

Run BemiDB database:

```sh
Expand Down
132 changes: 0 additions & 132 deletions devbox.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,15 @@
"version": "1.23.1",
"systems": {
"aarch64-darwin": {
"outputs": [
{
"name": "out",
"path": "/nix/store/nvaay1c4banbccyvv6ba1gzyqpypjmfq-go-1.23.1",
"default": true
}
],
"store_path": "/nix/store/nvaay1c4banbccyvv6ba1gzyqpypjmfq-go-1.23.1"
},
"aarch64-linux": {
"outputs": [
{
"name": "out",
"path": "/nix/store/9ylsay11jb3p6yarkmlz0fin76cdypwa-go-1.23.1",
"default": true
}
],
"store_path": "/nix/store/9ylsay11jb3p6yarkmlz0fin76cdypwa-go-1.23.1"
},
"x86_64-darwin": {
"outputs": [
{
"name": "out",
"path": "/nix/store/zkg5xhyx2rs03dq0qp14nqlx9ff1y5c5-go-1.23.1",
"default": true
}
],
"store_path": "/nix/store/zkg5xhyx2rs03dq0qp14nqlx9ff1y5c5-go-1.23.1"
},
"x86_64-linux": {
"outputs": [
{
"name": "out",
"path": "/nix/store/mi0ybwsm6pmxzv9hsm6bcbqaq1pkf8wh-go-1.23.1",
"default": true
}
],
"store_path": "/nix/store/mi0ybwsm6pmxzv9hsm6bcbqaq1pkf8wh-go-1.23.1"
}
}
Expand All @@ -57,119 +29,15 @@
"version": "16.4",
"systems": {
"aarch64-darwin": {
"outputs": [
{
"name": "out",
"path": "/nix/store/6dzxj78wph840cpwslh96s4gpm0iwch2-postgresql-16.4",
"default": true
},
{
"name": "man",
"path": "/nix/store/z1n2vh799a5icpaxbrjfqsasagb276bk-postgresql-16.4-man",
"default": true
},
{
"name": "dev",
"path": "/nix/store/afjpl8ilq8s6j6zh4qqyy6mxz3v2xbav-postgresql-16.4-dev"
},
{
"name": "doc",
"path": "/nix/store/ry9d9by692xj92y5b9j6z0aa5y3lh3px-postgresql-16.4-doc"
},
{
"name": "lib",
"path": "/nix/store/d1im42w02x8gl2y380r4hgj8xgkkkbwc-postgresql-16.4-lib"
}
],
"store_path": "/nix/store/6dzxj78wph840cpwslh96s4gpm0iwch2-postgresql-16.4"
},
"aarch64-linux": {
"outputs": [
{
"name": "out",
"path": "/nix/store/37r0vmsb8xd1kv3wjd99kr59q99ja3g0-postgresql-16.4",
"default": true
},
{
"name": "man",
"path": "/nix/store/mawnv85hv5y64csbmpgrnz88j7r8cby5-postgresql-16.4-man",
"default": true
},
{
"name": "debug",
"path": "/nix/store/71hz4hv1n6ivymbzd0jm3a61cyj9fwh5-postgresql-16.4-debug"
},
{
"name": "dev",
"path": "/nix/store/ibhwvhq4gkdibkfrkqg9vmip9mhhrg2q-postgresql-16.4-dev"
},
{
"name": "doc",
"path": "/nix/store/rmvkab0pxjjjznk350syr3gzpa13dz1k-postgresql-16.4-doc"
},
{
"name": "lib",
"path": "/nix/store/39mnmp40qhpq2h6r3cj66s23sb5fkzr6-postgresql-16.4-lib"
}
],
"store_path": "/nix/store/37r0vmsb8xd1kv3wjd99kr59q99ja3g0-postgresql-16.4"
},
"x86_64-darwin": {
"outputs": [
{
"name": "out",
"path": "/nix/store/vlgydd1rakmw9j14i8dgrlhzj4pa82vi-postgresql-16.4",
"default": true
},
{
"name": "man",
"path": "/nix/store/2wm2caki07a557z97228n2zxrd3a8j4b-postgresql-16.4-man",
"default": true
},
{
"name": "doc",
"path": "/nix/store/r03r96a44grl85sflw6hvwwlrzr32rk9-postgresql-16.4-doc"
},
{
"name": "lib",
"path": "/nix/store/cy3q9y20jwk1vkd6jxf3mnq6xzbb9dn8-postgresql-16.4-lib"
},
{
"name": "dev",
"path": "/nix/store/96nxx00m06jl2jmvb16916l2rpwb13hk-postgresql-16.4-dev"
}
],
"store_path": "/nix/store/vlgydd1rakmw9j14i8dgrlhzj4pa82vi-postgresql-16.4"
},
"x86_64-linux": {
"outputs": [
{
"name": "out",
"path": "/nix/store/mjjfx6yyaaba5hmv6bga20m8fxrca93l-postgresql-16.4",
"default": true
},
{
"name": "man",
"path": "/nix/store/b8cvsw47h2487y4j805zi0645x3ajh1i-postgresql-16.4-man",
"default": true
},
{
"name": "doc",
"path": "/nix/store/apbxfs52v8im9725mn2f1jhgbdfggrpd-postgresql-16.4-doc"
},
{
"name": "lib",
"path": "/nix/store/32cprs7xwxvb0rw2imfrgy5vcacc27hc-postgresql-16.4-lib"
},
{
"name": "debug",
"path": "/nix/store/alcnsd7fkkr3iipvcn9gzsyv16kab6m9-postgresql-16.4-debug"
},
{
"name": "dev",
"path": "/nix/store/pqya8lq5jyplfmbmafrrwsrsi07d5ssn-postgresql-16.4-dev"
}
],
"store_path": "/nix/store/mjjfx6yyaaba5hmv6bga20m8fxrca93l-postgresql-16.4"
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
ENV_AWS_SECRET_ACCESS_KEY = "BEMIDB_AWS_SECRET_ACCESS_KEY"

ENV_PG_DATABASE_URL = "PG_DATABASE_URL"
ENV_PG_SYNC_INTERVAL = "PG_SYNC_INTERVAL"

DEFAULT_PORT = "54321"
DEFAULT_DATABASE = "bemidb"
Expand All @@ -39,6 +40,7 @@ type Config struct {
StorageType string
PgDatabaseUrl string
Aws AwsConfig
Interval string
}

type AwsConfig struct {
Expand Down Expand Up @@ -89,6 +91,8 @@ func registerFlags() {
panic("Invalid storage type " + _config.StorageType + ". Must be one of " + strings.Join(STORAGE_TYPES, ", "))
}

flag.StringVar(&_config.Interval, "interval", os.Getenv(ENV_PG_SYNC_INTERVAL), "Interval between syncs (e.g., 1h, 30m). Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'.")

flag.StringVar(&_config.PgDatabaseUrl, "pg-database-url", os.Getenv(ENV_PG_DATABASE_URL), "PostgreSQL database URL")

if _config.StorageType == STORAGE_TYPE_AWS_S3 {
Expand Down
19 changes: 18 additions & 1 deletion src/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func TestLoadConfig(t *testing.T) {
if config.StorageType != "LOCAL" {
t.Errorf("Expected storageType to be LOCAL, got %s", config.StorageType)
}

if config.Interval != "" {
t.Errorf("Expected interval to be empty, got %s", config.Interval)
}
})

t.Run("Uses config values from environment variables", func(t *testing.T) {
Expand All @@ -35,6 +39,7 @@ func TestLoadConfig(t *testing.T) {
t.Setenv("BEMIDB_ICEBERG_PATH", "iceberg-path")
t.Setenv("BEMIDB_LOG_LEVEL", "ERROR")
t.Setenv("BEMIDB_STORAGE_TYPE", "LOCAL")
t.Setenv("PG_SYNC_INTERVAL", "30m")

config := LoadConfig(true)

Expand All @@ -56,6 +61,10 @@ func TestLoadConfig(t *testing.T) {
if config.StorageType != "LOCAL" {
t.Errorf("Expected storageType to be local, got %s", config.StorageType)
}

if config.Interval != "30m" {
t.Errorf("Expected interval to be 30m, got %s", config.Interval)
}
})

t.Run("Uses config values from environment variables with AWS S3 storage", func(t *testing.T) {
Expand Down Expand Up @@ -106,16 +115,21 @@ func TestLoadConfig(t *testing.T) {

t.Run("Uses config values from environment variables for PG", func(t *testing.T) {
t.Setenv("PG_DATABASE_URL", "postgres://user:password@localhost:5432/template1")
t.Setenv("PG_SYNC_INTERVAL", "1h")

config := LoadConfig(true)

if config.PgDatabaseUrl != "postgres://user:password@localhost:5432/template1" {
t.Errorf("Expected pgDatabaseUrl to be postgres://user:password@localhost:5432/template1, got %s", config.PgDatabaseUrl)
}

if config.Interval != "1h" {
t.Errorf("Expected interval to be 1h, got %s", config.Interval)
}
})

t.Run("Uses command line arguments", func(t *testing.T) {
setTestArgs([]string{"--port", "12345", "--database", "mydb", "--init-sql", "./init/duckdb.sql", "--iceberg-path", "iceberg-path", "--log-level", "ERROR", "--storage-type", "local"})
setTestArgs([]string{"--port", "12345", "--database", "mydb", "--init-sql", "./init/duckdb.sql", "--iceberg-path", "iceberg-path", "--log-level", "ERROR", "--storage-type", "local", "--interval", "2h30m"})

config := LoadConfig()

Expand All @@ -137,5 +151,8 @@ func TestLoadConfig(t *testing.T) {
if config.StorageType != "local" {
t.Errorf("Expected storageType to be local, got %s", config.StorageType)
}
if config.Interval != "2h30m" {
t.Errorf("Expected interval to be 2h30m, got %s", config.Interval)
}
})
}
16 changes: 15 additions & 1 deletion src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"time"
)

const VERSION = "0.4.0"
Expand All @@ -22,7 +23,20 @@ func main() {
case "start":
start(config)
case "sync":
syncFromPg(config)
if config.Interval != "" {
duration, err := time.ParseDuration(config.Interval)
if err != nil {
panic("Invalid interval format: " + config.Interval)
}
LogInfo(config, "Starting sync loop with interval:", config.Interval)
for {
syncFromPg(config)
LogInfo(config, "Sleeping for", config.Interval)
time.Sleep(duration)
}
} else {
syncFromPg(config)
}
case "version":
fmt.Println("BemiDB version:", VERSION)
default:
Expand Down

0 comments on commit f6e5532

Please sign in to comment.