ct: don't clone every row when producing ct_times #30123

Open

danhhz wants to merge 1 commit into main
Conversation

@danhhz (Contributor) commented on Oct 21, 2024

In the previous impl, the split in the dataflow graph resulted in every row being cloned, just to be immediately thrown away. This is wasteful and potentially doubles the memory requirements. Instead, use a custom 1-input 2-output operator (passing through the input).

While I'm in here, also resolve a TODO to switch the operators to extension traits on Collection.
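
For readers unfamiliar with the pattern, here is a minimal sketch, not the PR's actual code, of a 1-input 2-output passthrough operator exposed as an extension trait on `Collection`, modeled on the diff reviewed below; the names `PassthroughTimesExt` and `passthrough_times` are illustrative.

```rust
// Sketch only: a 1-input 2-output operator exposed as an extension trait on
// `Collection`, modeled on the diff reviewed below. Trait and method names
// are hypothetical, not the PR's actual identifiers.
use differential_dataflow::difference::Semigroup;
use differential_dataflow::{AsCollection, Collection};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::Scope;

trait PassthroughTimesExt<G: Scope, D, R: Semigroup> {
    /// Pass the input through unchanged on one output while emitting its
    /// times on a second output, without cloning the rows.
    fn passthrough_times(&self) -> (Collection<G, D, R>, Collection<G, (), R>);
}

impl<G, D, R> PassthroughTimesExt<G, D, R> for Collection<G, D, R>
where
    G: Scope,
    D: timely::Data,
    R: Semigroup + timely::Data,
{
    fn passthrough_times(&self) -> (Collection<G, D, R>, Collection<G, (), R>) {
        let mut builder =
            OperatorBuilder::new("passthrough_times".into(), self.scope());
        let mut input = builder.new_input(&self.inner, Pipeline);
        let (mut passthrough, passthrough_stream) = builder.new_output();
        let (mut times, times_stream) = builder.new_output();
        builder.build(move |_caps| {
            let mut buf = Vec::new();
            move |_frontiers| {
                let mut passthrough = passthrough.activate();
                let mut times = times.activate();
                while let Some((cap, data)) = input.next() {
                    data.swap(&mut buf);
                    // One `()` update per row on the times output; the rows
                    // themselves are moved, not cloned, to the passthrough.
                    let mut times_buf: Vec<((), G::Timestamp, R)> = buf
                        .iter()
                        .map(|(_d, t, r)| ((), t.clone(), r.clone()))
                        .collect();
                    times.session(&cap).give_container(&mut times_buf);
                    passthrough.session(&cap).give_container(&mut buf);
                }
            }
        });
        (
            passthrough_stream.as_collection(),
            times_stream.as_collection(),
        )
    }
}
```

The PR's actual operator additionally pre-consolidates the times output per input batch with a hashmap, as the reviewed diff below shows.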

Touches https://github.com/MaterializeInc/database-issues/issues/8427

Motivation

  • This PR fixes a recognized bug.

Tips for reviewer

Definitely review with “hide whitespace changes” on

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

danhhz requested a review from antiguru on October 21, 2024 19:28
danhhz requested a review from a team as a code owner on October 21, 2024 19:28
danhhz mentioned this pull request on Oct 21, 2024
@antiguru (Member) left a comment


Thanks, this looks good. I left a comment about using the ConsolidatingContainerBuilder to opportunistically consolidate the times output, which would simplify your implementation and would permit consolidating across input batches. As a rule of thumb, use the consolidating container builder whenever an operator changes the key/value cardinality and we can expect a reasonable amount of consolidation. The builder is not free, but its cost is low enough that it doesn't matter too much.
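
As a concrete sketch of this suggestion, assuming differential's `ConsolidatingContainerBuilder` and a timely version whose operator outputs are parameterized by a container builder (`new_output::<CB>()` and `session_with_builder`; exact signatures may differ):

```rust
// Sketch only; signatures are approximate and not verified against the PR's
// timely/differential versions.
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;

// Declare the times output with a consolidating builder instead of a plain
// Vec output:
//
//     let (mut times, times_stream) = builder
//         .new_output::<ConsolidatingContainerBuilder<Vec<((), T, R)>>>();
//
// Then give updates individually; the builder consolidates its buffer
// whenever it fills or the time changes, replacing the hand-rolled
// `times_hash`:
//
//     let mut times = times.activate();
//     let mut session = times.session_with_builder(&cap);
//     for (_data, ts, diff) in &passthrough_buf {
//         session.give(((), ts.clone(), diff.clone()));
//     }
```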

Comment on lines +721 to 743
```diff
 move |_frontiers| {
     let mut passthrough = passthrough.activate();
     let mut times = times.activate();
     while let Some((cap, data)) = input.next() {
-        data.swap(&mut buf);
-        for ((), ts, diff) in buf.drain(..) {
-            notificator.notify_at(cap.delayed(&ts));
-            if let Some(sum) = stash.get_mut(&ts) {
-                sum.plus_equals(&diff);
+        data.swap(&mut passthrough_buf);
+        for (_data, ts, diff) in &passthrough_buf {
+            if let Some(d) = times_hash.get_mut(ts) {
+                d.plus_equals(diff);
             } else {
-                stash.insert(ts, diff);
-            }
+                times_hash.insert(*ts, diff.clone());
+            };
         }
+        let times_iter = times_hash
+            .drain()
+            // Silly to emit zero diffs.
+            .filter(|(_, diff)| !diff.is_zero())
+            .map(|(ts, diff)| ((), ts, diff));
+        times_buf.extend(times_iter);
+        passthrough
+            .session(&cap)
+            .give_container(&mut passthrough_buf);
+        times.session(&cap).give_container(&mut times_buf);
     }
```
@antiguru (Member) commented:

I think this could use the ConsolidatingContainerBuilder instead of maintaining its own hashmap, but it might change semantics. For an example, see flat_map. The consolidating container builder sits on a fixed-length buffer, appends to it, and consolidates the buffer whenever it fills up or the time ticks. This only ensures opportunistically consolidated outputs, so duplicate elements might still occur.

In this case, the stream seems to be ((), time, diff), with time being totally ordered. This would seem to be the best-case scenario for the consolidating builder, unless you need potentially zero or more timestamps per input batch. (Note that the implementation emits to times on every input.next() iteration.)
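
To illustrate what consolidation does to such a buffer, here is a small self-contained example using differential's `consolidate_updates`, the same merge the builder applies to its buffer: updates with equal (data, time) have their diffs summed, and zero diffs are dropped.

```rust
use differential_dataflow::consolidation::consolidate_updates;

fn main() {
    // Four ((), time, diff) updates at two timestamps.
    let mut updates = vec![((), 1u64, 1i64), ((), 1, 2), ((), 2, 1), ((), 2, -1)];
    consolidate_updates(&mut updates);
    // The diffs at time 1 sum to 3; the updates at time 2 cancel to zero
    // and are dropped entirely.
    assert_eq!(updates, vec![((), 1, 3)]);
}
```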
