Skip to content

Commit

Permalink
Merge pull request #60 from maxpert/isolate-global
Browse files Browse the repository at this point in the history
Isolating global change log creation
  • Loading branch information
maxpert authored Jul 21, 2023
2 parents e65c5d3 + 4ce5e4f commit 1f8ce3d
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 27 deletions.
46 changes: 46 additions & 0 deletions db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ var ErrEndOfWatch = errors.New("watching event finished")

//go:embed table_change_log_script.tmpl
var tableChangeLogScriptTemplate string

//go:embed global_change_log_script.tmpl
var globalChangeLogScriptTemplate string
var tableChangeLogTpl *template.Template
var globalChangeLogTpl *template.Template

var spaceStripper = regexp.MustCompile(`\n\s+`)

Expand All @@ -42,6 +46,10 @@ const (
const changeLogName = "change_log"
const upsertQuery = `INSERT OR REPLACE INTO %s(%s) VALUES (%s)`

type globalChangeLogTemplateData struct {
Prefix string
}

type triggerTemplateData struct {
Prefix string
TableName string
Expand All @@ -65,6 +73,10 @@ func init() {
tableChangeLogTpl = template.Must(
template.New("tableChangeLogScriptTemplate").Parse(tableChangeLogScriptTemplate),
)

globalChangeLogTpl = template.Must(
template.New("globalChangeLogScriptTemplate").Parse(globalChangeLogScriptTemplate),
)
}

func (conn *SqliteStreamDB) Replicate(event *ChangeLogEvent) error {
Expand Down Expand Up @@ -116,6 +128,19 @@ func (conn *SqliteStreamDB) globalMetaTable() string {
return conn.prefix + "_change_log_global"
}

func (conn *SqliteStreamDB) globalCDCScript() (string, error) {
buf := new(bytes.Buffer)
err := globalChangeLogTpl.Execute(buf, &globalChangeLogTemplateData{
Prefix: conn.prefix,
})

if err != nil {
return "", err
}

return spaceStripper.ReplaceAllString(buf.String(), "\n "), nil
}

func (conn *SqliteStreamDB) tableCDCScriptFor(tableName string) (string, error) {
columns, ok := conn.watchTablesSchema[tableName]
if !ok {
Expand Down Expand Up @@ -180,6 +205,27 @@ func (conn *SqliteStreamDB) getPrimaryKeyMap(event *ChangeLogEvent) map[string]a
return ret
}

func (conn *SqliteStreamDB) initGlobalChangeLog() error {
sqlConn, err := conn.pool.Borrow()
if err != nil {
return err
}
defer sqlConn.Return()

script, err := conn.globalCDCScript()
if err != nil {
return err
}

log.Info().Msg("Creating global change log table")
_, err = sqlConn.DB().Exec(script)
if err != nil {
return err
}

return nil
}

func (conn *SqliteStreamDB) initTriggers(tableName string) error {
sqlConn, err := conn.pool.Borrow()
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions db/global_change_log_script.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{{$GlobalChangeLogTableName := (printf "%s_change_log_global" .Prefix)}}

CREATE TABLE IF NOT EXISTS {{$GlobalChangeLogTableName}} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
change_table_id INTEGER,
table_name TEXT
);
24 changes: 14 additions & 10 deletions db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,6 @@ func (conn *SqliteStreamDB) InstallCDC(tables []string) error {
return nil
}

func (conn *SqliteStreamDB) installChangeLogTriggers() error {
for tableName := range conn.watchTablesSchema {
err := conn.initTriggers(tableName)
if err != nil {
return err
}
}
return nil
}

func (conn *SqliteStreamDB) RemoveCDC(tables bool) error {
sqlConn, err := conn.pool.Borrow()
if err != nil {
Expand All @@ -214,6 +204,20 @@ func (conn *SqliteStreamDB) RemoveCDC(tables bool) error {
return nil
}

func (conn *SqliteStreamDB) installChangeLogTriggers() error {
if err := conn.initGlobalChangeLog(); err != nil {
return err
}

for tableName := range conn.watchTablesSchema {
err := conn.initTriggers(tableName)
if err != nil {
return err
}
}
return nil
}

func getTableInfo(tx *goqu.TxDatabase, table string) ([]*ColumnInfo, error) {
query := "SELECT name, type, `notnull`, dflt_value, pk FROM pragma_table_info(?)"
stmt, err := tx.Prepare(query)
Expand Down
6 changes: 0 additions & 6 deletions db/table_change_log_script.tmpl
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
{{$ChangeLogTableName := (printf "%s%s_change_log" .Prefix .TableName)}}
{{$GlobalChangeLogTableName := (printf "%s_change_log_global" .Prefix)}}

CREATE TABLE IF NOT EXISTS {{$GlobalChangeLogTableName}} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
change_table_id INTEGER,
table_name TEXT
);

CREATE TABLE IF NOT EXISTS {{$ChangeLogTableName}} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
{{range $index, $col := .Columns}}
Expand Down
22 changes: 11 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,25 @@ module github.com/maxpert/marmot
go 1.18

require (
github.com/BurntSushi/toml v1.3.1
github.com/BurntSushi/toml v1.3.2
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
github.com/denisbrodbeck/machineid v1.0.1
github.com/doug-martin/goqu/v9 v9.18.0
github.com/fsnotify/fsnotify v1.6.0
github.com/fxamacker/cbor/v2 v2.4.0
github.com/klauspost/compress v1.16.5
github.com/google/uuid v1.3.0
github.com/klauspost/compress v1.16.7
github.com/mattn/go-sqlite3 v1.14.17
github.com/minio/minio-go/v7 v7.0.58
github.com/nats-io/nats-server/v2 v2.9.19
github.com/nats-io/nats.go v1.27.1
github.com/minio/minio-go/v7 v7.0.61
github.com/nats-io/nats-server/v2 v2.9.20
github.com/nats-io/nats.go v1.28.0
github.com/rs/zerolog v1.29.1
github.com/samber/lo v1.38.1
)

require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/lib/pq v1.10.4 // indirect
Expand All @@ -39,11 +39,11 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
20 changes: 20 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/BurntSushi/toml v1.3.1 h1:rHnDkSK+/g6DlREUK73PkmIs60pqrnuduK+JmP++JmU=
github.com/BurntSushi/toml v1.3.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM=
Expand Down Expand Up @@ -33,6 +35,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
Expand All @@ -55,6 +59,8 @@ github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.58 h1:B9/8Az8Om/2kX8Ys2ai2PZbBTokRE5W6P5OaqnAs6po=
github.com/minio/minio-go/v7 v7.0.58/go.mod h1:NUDy4A4oXPq1l2yK6LTSvCEzAMeIcoz9lcj5dbzSrRE=
github.com/minio/minio-go/v7 v7.0.61 h1:87c+x8J3jxQ5VUGimV9oHdpjsAvy3fhneEBKuoKEVUI=
github.com/minio/minio-go/v7 v7.0.61/go.mod h1:BTu8FcrEw+HidY0zd/0eny43QnVNkXRPXrLXFuQBHXg=
github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand All @@ -66,8 +72,12 @@ github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.19 h1:OF9jSKZGo425C/FcVVIvNgpd36CUe7aVTTXEZRJk6kA=
github.com/nats-io/nats-server/v2 v2.9.19/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw=
github.com/nats-io/nats-server/v2 v2.9.20 h1:bt1dW6xsL1hWWwv7Hovm+EJt5L6iplyqlgEFkoEUk0k=
github.com/nats-io/nats-server/v2 v2.9.20/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw=
github.com/nats-io/nats.go v1.27.1 h1:OuYnal9aKVSnOzLQIzf7554OXMCG7KbaTkCSBHRcSoo=
github.com/nats-io/nats.go v1.27.1/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down Expand Up @@ -97,11 +107,17 @@ golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw=
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand All @@ -114,9 +130,13 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down

0 comments on commit 1f8ce3d

Please sign in to comment.