source-mongodb: require sorting on _id for batch captures
The assumption that documents are returned in an ascending insertion order does
not appear to be true for non-standard MongoDB deployments. These kinds of
deployments don't support change streams and must be captured in a batch mode,
so bindings that use batch mode now always require an explicit sort.
williamhbaker committed Oct 15, 2024
1 parent bf96dd7 commit 325751e
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions source-mongodb/backfill.go
@@ -206,12 +206,16 @@ func (c *capture) doBackfill(

cursorField := binding.resource.getCursorField()
var opts *options.FindOptions
-if cursorField == idProperty {
+if cursorField == idProperty && binding.resource.getMode() == captureModeChangeStream {
// By not specifying a sort parameter, MongoDB uses natural sort to order documents. Natural
// sort is approximately insertion order (but not guaranteed). We hint to MongoDB to use the _id
// index (an index that always exists) to speed up the process. Note that if we specify the sort
// explicitly by { $natural: 1 }, then the database will disregard any indices and do a full
// collection scan. See https://www.mongodb.com/docs/manual/reference/method/cursor.hint
+
+// Note: This assumption is only true for "standard" MongoDB instances that support change
+// streams. For other flavors of MongoDB that do not support change streams and/or we are using
+// a batch capture mode, a sort will be required.
opts = options.Find().SetHint(bson.M{idProperty: 1})
} else {
// Other cursor fields require a sort.
@@ -339,7 +343,7 @@ func (c *capture) pullCursor(
var doc bson.M
var err error
if lastCursor, err = cursor.Current.LookupErr(cursorField); err != nil {
-return 0, fmt.Errorf("looking up idProperty: %w", err)
+return 0, fmt.Errorf("looking up cursor field '%s': %w", cursorField, err)
} else if err = cursor.Decode(&doc); err != nil {
return 0, fmt.Errorf("backfill decoding document: %w", err)
}
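
The branch condition changed in the first hunk can be illustrated in isolation. The following is a minimal sketch rather than the connector's code. It avoids the MongoDB driver entirely, the `findPlan` type and `planBackfill` function are hypothetical, and the string values of `idProperty`, `captureModeChangeStream`, and `captureModeBatch` are assumptions. It also assumes the elided `else` branch sorts ascending on the cursor field, as its comment and the commit title suggest.

```go
package main

import "fmt"

// Hypothetical stand-ins. The identifiers idProperty and
// captureModeChangeStream appear in the diff, but their values here are
// assumed; captureModeBatch is an invented name for the batch mode.
const (
	idProperty              = "_id"
	captureModeChangeStream = "change_stream"
	captureModeBatch        = "batch"
)

// findPlan records how a backfill query would be ordered in this sketch.
type findPlan struct {
	Hint string // index to hint, empty when no hint is used
	Sort string // field to sort ascending on, empty when no sort is used
}

// planBackfill mirrors the condition from the diff: only an _id cursor in
// change stream mode relies on natural order with an _id index hint. Every
// other combination, including an _id cursor in batch mode, gets an
// explicit sort on the cursor field (assumed behaviour of the else branch).
func planBackfill(cursorField, mode string) findPlan {
	if cursorField == idProperty && mode == captureModeChangeStream {
		return findPlan{Hint: idProperty}
	}
	return findPlan{Sort: cursorField}
}

func main() {
	cases := []struct{ field, mode string }{
		{idProperty, captureModeChangeStream},
		{idProperty, captureModeBatch},
		{"updatedAt", captureModeBatch},
	}
	for _, c := range cases {
		fmt.Printf("field=%s mode=%s plan=%+v\n", c.field, c.mode, planBackfill(c.field, c.mode))
	}
}
```

Before this commit, the second case (an `_id` cursor in batch mode) would have taken the hint path as well. With the added mode check it falls through to the explicit sort.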