Skip to content

Commit

Permalink
source-hubspot-native: add automatic backfills for delayed incrementa…
Browse files Browse the repository at this point in the history
…l streams

Delayed incremental streams should be wrapped in a `try/except` block to
trigger automatic backfills if we encounter a `MustBackfillBinding`
error.

I kept the recent incremental streams wrapped in the `try/except` that
serves the same purpose since it's possible for `MustBackfillBinding`
exceptions to be raised if a task is disabled long enough for the
cursor to fall far enough behind the recent changes available from
HubSpot.
  • Loading branch information
Alex-Bair committed Oct 18, 2024
1 parent 2d18e9f commit 9aa980f
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions source-hubspot-native/source_hubspot_native/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,20 +368,24 @@ async def process_changes(
delayed_emitted = 0
if delayed_fetch_next_end - delayed_fetch_next_start > delayed_fetch_minimum_window:
# Poll the delayed stream for documents if we need to.
async for ts, key, obj in fetch_delayed(log, http, delayed_fetch_next_start, delayed_fetch_next_end):
if ts > delayed_fetch_next_end:
# In case the FetchDelayedFn is unable to filter based on
# `delayed_fetch_next_end`.
continue
elif ts > delayed_fetch_next_start:
if cache.has_as_recent_as(object_name, key, ts):
cache_hits += 1
try:
async for ts, key, obj in fetch_delayed(log, http, delayed_fetch_next_start, delayed_fetch_next_end):
if ts > delayed_fetch_next_end:
# In case the FetchDelayedFn is unable to filter based on
# `delayed_fetch_next_end`.
continue

delayed_emitted += 1
yield obj
else:
break
elif ts > delayed_fetch_next_start:
if cache.has_as_recent_as(object_name, key, ts):
cache_hits += 1
continue

delayed_emitted += 1
yield obj
else:
break
except MustBackfillBinding:
log.info("triggering automatic backfill for %s", object_name)
yield Triggers.BACKFILL

last_delayed_fetch_end[object_name] = delayed_fetch_next_end
evicted = cache.cleanup(object_name, delayed_fetch_next_end)
Expand Down

0 comments on commit 9aa980f

Please sign in to comment.