diff --git a/db/change_log.go b/db/change_log.go index 011e0d0..b7c61aa 100644 --- a/db/change_log.go +++ b/db/change_log.go @@ -28,7 +28,11 @@ var ErrEndOfWatch = errors.New("watching event finished") //go:embed table_change_log_script.tmpl var tableChangeLogScriptTemplate string + +//go:embed global_change_log_script.tmpl +var globalChangeLogScriptTemplate string var tableChangeLogTpl *template.Template +var globalChangeLogTpl *template.Template var spaceStripper = regexp.MustCompile(`\n\s+`) @@ -42,6 +46,10 @@ const ( const changeLogName = "change_log" const upsertQuery = `INSERT OR REPLACE INTO %s(%s) VALUES (%s)` +type globalChangeLogTemplateData struct { + Prefix string +} + type triggerTemplateData struct { Prefix string TableName string @@ -65,6 +73,10 @@ func init() { tableChangeLogTpl = template.Must( template.New("tableChangeLogScriptTemplate").Parse(tableChangeLogScriptTemplate), ) + + globalChangeLogTpl = template.Must( + template.New("globalChangeLogScriptTemplate").Parse(globalChangeLogScriptTemplate), + ) } func (conn *SqliteStreamDB) Replicate(event *ChangeLogEvent) error { @@ -116,6 +128,19 @@ func (conn *SqliteStreamDB) globalMetaTable() string { return conn.prefix + "_change_log_global" } +func (conn *SqliteStreamDB) globalCDCScript() (string, error) { + buf := new(bytes.Buffer) + err := globalChangeLogTpl.Execute(buf, &globalChangeLogTemplateData{ + Prefix: conn.prefix, + }) + + if err != nil { + return "", err + } + + return spaceStripper.ReplaceAllString(buf.String(), "\n "), nil +} + func (conn *SqliteStreamDB) tableCDCScriptFor(tableName string) (string, error) { columns, ok := conn.watchTablesSchema[tableName] if !ok { @@ -180,6 +205,27 @@ func (conn *SqliteStreamDB) getPrimaryKeyMap(event *ChangeLogEvent) map[string]a return ret } +func (conn *SqliteStreamDB) initGlobalChangeLog() error { + sqlConn, err := conn.pool.Borrow() + if err != nil { + return err + } + defer sqlConn.Return() + + script, err := conn.globalCDCScript() + if err != nil { + return err + } + + log.Info().Msg("Creating global change log table") + _, err = sqlConn.DB().Exec(script) + if err != nil { + return err + } + + return nil +} + func (conn *SqliteStreamDB) initTriggers(tableName string) error { sqlConn, err := conn.pool.Borrow() if err != nil { diff --git a/db/global_change_log_script.tmpl b/db/global_change_log_script.tmpl new file mode 100644 index 0000000..17945e6 --- /dev/null +++ b/db/global_change_log_script.tmpl @@ -0,0 +1,7 @@ +{{$GlobalChangeLogTableName := (printf "%s_change_log_global" .Prefix)}} + +CREATE TABLE IF NOT EXISTS {{$GlobalChangeLogTableName}} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + change_table_id INTEGER, + table_name TEXT +); diff --git a/db/sqlite.go b/db/sqlite.go index 262e717..fc43b9d 100644 --- a/db/sqlite.go +++ b/db/sqlite.go @@ -184,16 +184,6 @@ func (conn *SqliteStreamDB) InstallCDC(tables []string) error { return nil } -func (conn *SqliteStreamDB) installChangeLogTriggers() error { - for tableName := range conn.watchTablesSchema { - err := conn.initTriggers(tableName) - if err != nil { - return err - } - } - return nil -} - func (conn *SqliteStreamDB) RemoveCDC(tables bool) error { sqlConn, err := conn.pool.Borrow() if err != nil { @@ -214,6 +204,20 @@ func (conn *SqliteStreamDB) RemoveCDC(tables bool) error { return nil } +func (conn *SqliteStreamDB) installChangeLogTriggers() error { + if err := conn.initGlobalChangeLog(); err != nil { + return err + } + + for tableName := range conn.watchTablesSchema { + err := conn.initTriggers(tableName) + if err != nil { + return err + } + } + return nil +} + func getTableInfo(tx *goqu.TxDatabase, table string) ([]*ColumnInfo, error) { query := "SELECT name, type, `notnull`, dflt_value, pk FROM pragma_table_info(?)" stmt, err := tx.Prepare(query) diff --git a/db/table_change_log_script.tmpl b/db/table_change_log_script.tmpl index 2abb535..c14ccfc 100644 --- a/db/table_change_log_script.tmpl +++ b/db/table_change_log_script.tmpl @@ -1,12 +1,6 @@ {{$ChangeLogTableName := (printf "%s%s_change_log" .Prefix .TableName)}} {{$GlobalChangeLogTableName := (printf "%s_change_log_global" .Prefix)}} -CREATE TABLE IF NOT EXISTS {{$GlobalChangeLogTableName}} ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - change_table_id INTEGER, - table_name TEXT -); - CREATE TABLE IF NOT EXISTS {{$ChangeLogTableName}} ( id INTEGER PRIMARY KEY AUTOINCREMENT, {{range $index, $col := .Columns}} diff --git a/go.mod b/go.mod index 6b7b9e9..ddfe729 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,18 @@ module github.com/maxpert/marmot go 1.18 require ( - github.com/BurntSushi/toml v1.3.1 + github.com/BurntSushi/toml v1.3.2 github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef github.com/denisbrodbeck/machineid v1.0.1 github.com/doug-martin/goqu/v9 v9.18.0 github.com/fsnotify/fsnotify v1.6.0 github.com/fxamacker/cbor/v2 v2.4.0 - github.com/klauspost/compress v1.16.5 + github.com/google/uuid v1.3.0 + github.com/klauspost/compress v1.16.7 github.com/mattn/go-sqlite3 v1.14.17 - github.com/minio/minio-go/v7 v7.0.58 - github.com/nats-io/nats-server/v2 v2.9.19 - github.com/nats-io/nats.go v1.27.1 + github.com/minio/minio-go/v7 v7.0.61 + github.com/nats-io/nats-server/v2 v2.9.20 + github.com/nats-io/nats.go v1.28.0 github.com/rs/zerolog v1.29.1 github.com/samber/lo v1.38.1 ) @@ -21,7 +22,6 @@ require ( require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/lib/pq v1.10.4 // indirect @@ -39,11 +39,11 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/stretchr/objx v0.1.1 // indirect github.com/x448/float16 v0.8.4 // indirect - golang.org/x/crypto v0.9.0 // indirect - golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect - golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.8.0 // indirect - golang.org/x/text v0.9.0 // indirect + golang.org/x/crypto v0.11.0 // indirect + golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect + golang.org/x/net v0.12.0 // indirect + golang.org/x/sys v0.10.0 // indirect + golang.org/x/text v0.11.0 // indirect golang.org/x/time v0.3.0 // indirect google.golang.org/protobuf v1.26.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index 526eb21..befd7e7 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/BurntSushi/toml v1.3.1 h1:rHnDkSK+/g6DlREUK73PkmIs60pqrnuduK+JmP++JmU= github.com/BurntSushi/toml v1.3.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= +github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= @@ -33,6 +35,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= @@ -55,6 +59,8 @@ github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.58 h1:B9/8Az8Om/2kX8Ys2ai2PZbBTokRE5W6P5OaqnAs6po= github.com/minio/minio-go/v7 v7.0.58/go.mod h1:NUDy4A4oXPq1l2yK6LTSvCEzAMeIcoz9lcj5dbzSrRE= +github.com/minio/minio-go/v7 v7.0.61 h1:87c+x8J3jxQ5VUGimV9oHdpjsAvy3fhneEBKuoKEVUI= +github.com/minio/minio-go/v7 v7.0.61/go.mod h1:BTu8FcrEw+HidY0zd/0eny43QnVNkXRPXrLXFuQBHXg= github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -66,8 +72,12 @@ github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= github.com/nats-io/nats-server/v2 v2.9.19 h1:OF9jSKZGo425C/FcVVIvNgpd36CUe7aVTTXEZRJk6kA= github.com/nats-io/nats-server/v2 v2.9.19/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw= +github.com/nats-io/nats-server/v2 v2.9.20 h1:bt1dW6xsL1hWWwv7Hovm+EJt5L6iplyqlgEFkoEUk0k= +github.com/nats-io/nats-server/v2 v2.9.20/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw= github.com/nats-io/nats.go v1.27.1 h1:OuYnal9aKVSnOzLQIzf7554OXMCG7KbaTkCSBHRcSoo= github.com/nats-io/nats.go v1.27.1/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= +github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -97,11 +107,17 @@ golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc= golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= +golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw= +golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= +golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -114,9 +130,13 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= +golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=