Skip to content

Commit

Permalink
MB-61985: Add a Scorch event for Index() (#2054)
Browse files Browse the repository at this point in the history
- Add a scorch event for notifying listeners, if available, that a
document is about to get added to the index, using the Index() API.
- This event can be useful for halting indexing before the Index()
operation is carried out - since Index() converts an interface into a
Document object using the IndexMapping's MapDocument, which consumes
some extra memory.
- Shifted the EventKindBatchIntroductionStart event to before the
documents were analyzed as document analysis consumes extra memory.
  • Loading branch information
CascadingRadium authored Aug 6, 2024
1 parent 807d89b commit f7fea09
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 4 deletions.
3 changes: 3 additions & 0 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (b *Batch) Index(id string, data interface{}) error {
if id == "" {
return ErrorEmptyID
}
if eventIndex, ok := b.index.(index.EventIndex); ok {
eventIndex.FireIndexEvent()
}
doc := document.NewDocument(id)
err := b.index.Mapping().MapDocument(doc, data)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions index/scorch/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@ var EventKindMergeTaskIntroduction = EventKind(8)
// EventKindPreMergeCheck is fired before the merge begins to check if
// the caller should proceed with the merge.
var EventKindPreMergeCheck = EventKind(9)

// EventKindIndexStart is fired when Index() is invoked which
// creates a new Document object from an interface using the index mapping.
var EventKindIndexStart = EventKind(10)
12 changes: 8 additions & 4 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Scorch struct {

unsafeBatch bool

rootLock sync.RWMutex
rootLock sync.RWMutex

root *IndexSnapshot // holds 1 ref-count on the root
rootPersisted []chan error // closed when root is persisted
Expand Down Expand Up @@ -376,6 +376,8 @@ func (s *Scorch) Delete(id string) error {
func (s *Scorch) Batch(batch *index.Batch) (err error) {
start := time.Now()

// notify handlers that we're about to index a batch of data
s.fireEvent(EventKindBatchIntroductionStart, 0)
defer func() {
s.fireEvent(EventKindBatchIntroduction, time.Since(start))
}()
Expand Down Expand Up @@ -434,9 +436,6 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {

indexStart := time.Now()

// notify handlers that we're about to introduce a segment
s.fireEvent(EventKindBatchIntroductionStart, 0)

var newSegment segment.Segment
var bufBytes uint64
stats := newFieldStats()
Expand Down Expand Up @@ -878,3 +877,8 @@ func (s *Scorch) CopyReader() index.CopyReader {
s.rootLock.Unlock()
return rv
}

// external API to fire a scorch event (EventKindIndexStart) externally from bleve
func (s *Scorch) FireIndexEvent() {
s.fireEvent(EventKindIndexStart, 0)
}
15 changes: 15 additions & 0 deletions index_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ func (i *indexImpl) Index(id string, data interface{}) (err error) {
return ErrorIndexClosed
}

i.FireIndexEvent()

doc := document.NewDocument(id)
err = i.m.MapDocument(doc, data)
if err != nil {
Expand Down Expand Up @@ -1112,3 +1114,16 @@ func (f FileSystemDirectory) GetWriter(filePath string) (io.WriteCloser,
return os.OpenFile(filepath.Join(string(f), dir, file),
os.O_RDWR|os.O_CREATE, 0600)
}

func (i *indexImpl) FireIndexEvent() {
// get the internal index implementation
internalIndex, err := i.Advanced()
if err != nil {
return
}
// check if the internal index implementation supports events
if internalEventIndex, ok := internalIndex.(index.EventIndex); ok {
// fire the Index() event
internalEventIndex.FireIndexEvent()
}
}

0 comments on commit f7fea09

Please sign in to comment.