From f7fea09f60892b937d1496b5df73748671f4e380 Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Wed, 7 Aug 2024 01:08:10 +0530 Subject: [PATCH] MB-61985: Add a Scorch event for Index() (#2054) - Add a scorch event for notifying listeners, if available, that a document is about to get added to the index, using the Index() API. - This event can be useful for halting indexing before the Index() operation is carried out - since Index() converts an interface into a Document object using the IndexMapping's MapDocument, which consumes some extra memory. - Shifted the EventKindBatchIntroductionStart event to before the documents were analyzed as document analysis consumes extra memory. --- index.go | 3 +++ index/scorch/event.go | 4 ++++ index/scorch/scorch.go | 12 ++++++++---- index_impl.go | 15 +++++++++++++++ 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/index.go b/index.go index 7d4c9be9b..acbefc695 100644 --- a/index.go +++ b/index.go @@ -46,6 +46,9 @@ func (b *Batch) Index(id string, data interface{}) error { if id == "" { return ErrorEmptyID } + if eventIndex, ok := b.index.(index.EventIndex); ok { + eventIndex.FireIndexEvent() + } doc := document.NewDocument(id) err := b.index.Mapping().MapDocument(doc, data) if err != nil { diff --git a/index/scorch/event.go b/index/scorch/event.go index 0f653ccf4..e0bc3b60a 100644 --- a/index/scorch/event.go +++ b/index/scorch/event.go @@ -67,3 +67,7 @@ var EventKindMergeTaskIntroduction = EventKind(8) // EventKindPreMergeCheck is fired before the merge begins to check if // the caller should proceed with the merge. var EventKindPreMergeCheck = EventKind(9) + +// EventKindIndexStart is fired when Index() is invoked which +// creates a new Document object from an interface using the index mapping. +var EventKindIndexStart = EventKind(10) diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 7966d844d..429d1daa9 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -49,7 +49,7 @@ type Scorch struct { unsafeBatch bool - rootLock sync.RWMutex + rootLock sync.RWMutex root *IndexSnapshot // holds 1 ref-count on the root rootPersisted []chan error // closed when root is persisted @@ -376,6 +376,8 @@ func (s *Scorch) Delete(id string) error { func (s *Scorch) Batch(batch *index.Batch) (err error) { start := time.Now() + // notify handlers that we're about to index a batch of data + s.fireEvent(EventKindBatchIntroductionStart, 0) defer func() { s.fireEvent(EventKindBatchIntroduction, time.Since(start)) }() @@ -434,9 +436,6 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) { indexStart := time.Now() - // notify handlers that we're about to introduce a segment - s.fireEvent(EventKindBatchIntroductionStart, 0) - var newSegment segment.Segment var bufBytes uint64 stats := newFieldStats() @@ -878,3 +877,8 @@ func (s *Scorch) CopyReader() index.CopyReader { s.rootLock.Unlock() return rv } + +// external API to fire a scorch event (EventKindIndexStart) externally from bleve +func (s *Scorch) FireIndexEvent() { + s.fireEvent(EventKindIndexStart, 0) +} diff --git a/index_impl.go b/index_impl.go index 55212e3e6..e6debf17a 100644 --- a/index_impl.go +++ b/index_impl.go @@ -256,6 +256,8 @@ func (i *indexImpl) Index(id string, data interface{}) (err error) { return ErrorIndexClosed } + i.FireIndexEvent() + doc := document.NewDocument(id) err = i.m.MapDocument(doc, data) if err != nil { @@ -1112,3 +1114,16 @@ func (f FileSystemDirectory) GetWriter(filePath string) (io.WriteCloser, return os.OpenFile(filepath.Join(string(f), dir, file), os.O_RDWR|os.O_CREATE, 0600) } + +func (i *indexImpl) FireIndexEvent() { + // get the internal index implementation + internalIndex, err := i.Advanced() + if err != nil { + return + } + // check if the internal index implementation supports events + if internalEventIndex, ok := internalIndex.(index.EventIndex); ok { + // fire the Index() event + internalEventIndex.FireIndexEvent() + } +}