Skip to content

Commit

Permalink
Fixing watch change filter
Browse files Browse the repository at this point in the history
  • Loading branch information
Zohaib Sibte Hassan committed Dec 28, 2023
1 parent 11a10d9 commit cdfa495
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,40 +253,55 @@ func (conn *SqliteStreamDB) initTriggers(tableName string) error {
return nil
}

func (conn *SqliteStreamDB) filterChangesTo(changed chan fsnotify.Event, watcher *fsnotify.Watcher) {
for {
select {
case ev, ok := <-watcher.Events:
if !ok {
close(changed)
return
}

if ev.Op == fsnotify.Chmod {
continue
}

changed <- ev
}
}
}

func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string) {
shmPath := path + "-shm"
walPath := path + "-wal"

errDB := watcher.Add(path)
errShm := watcher.Add(shmPath)
errWal := watcher.Add(walPath)
dbChanged := make(chan fsnotify.Event)

tickerDur := time.Duration(cfg.Config.PollingInterval) * time.Millisecond
changeLogTicker := utils.NewTimeoutPublisher(tickerDur)

// Publish change logs for any residual change logs before starting watcher
conn.publishChangeLog()
conn.filterChangesTo(dbChanged, watcher)

for {
changeLogTicker.Reset()

changesPublished := false
err := conn.WithReadTx(func(_tx *sql.Tx) error {
select {
case ev, ok := <-watcher.Events:
case ev, ok := <-dbChanged:
if !ok {
return ErrEndOfWatch
}

log.Debug().Int("change", int(ev.Op)).Msg("Change detected")
if ev.Op != fsnotify.Chmod {
conn.publishChangeLog()
changesPublished = true
}
conn.publishChangeLog()
case <-changeLogTicker.Channel():
log.Debug().Dur("timeout", tickerDur).Msg("Change polling timeout")
conn.publishChangeLog()
changesPublished = true
}

return nil
Expand All @@ -310,8 +325,6 @@ func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string)
if errWal != nil {
errWal = watcher.Add(walPath)
}

log.Debug().Bool("changes", changesPublished).Msg("Changes published")
}
}

Expand Down

0 comments on commit cdfa495

Please sign in to comment.