Skip to content

Commit

Permalink
SNOW-811265 Do not close channel on rebalance (#651)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-japatel authored Jun 28, 2023
1 parent cfcf882 commit 52f0a8a
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,15 @@ public void closeAll() {
/**
* This function is called during rebalance.
*
* <p>All the channels are closed. The client is still active. Upon rebalance, (inside {@link
* com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we will reopen the channel.
* <p>We don't close the channels. Upon rebalance, (inside {@link
* com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we will reopen the channel
* anyways. [Check c'tor of {@link TopicPartitionChannel}]
*
* <p>We will wipe the cache partitionsToChannel so that in {@link
* com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we reinstantiate and fetch
* offsetToken
*
* @param partitions a list of topic partition
* @param partitions a list of topic partitions to close/shutdown
*/
@Override
public void close(Collection<TopicPartition> partitions) {
Expand All @@ -321,15 +322,16 @@ public void close(Collection<TopicPartition> partitions) {
partitionsToChannel.get(partitionChannelKey);
// Check for null since it's possible that the something goes wrong even before the
// channels are created
if (topicPartitionChannel != null) {
topicPartitionChannel.closeChannel();
}
LOGGER.info(
"Closing partitionChannel:{}, partition:{}, topic:{}",
"Removing partitionChannel:{}, partition:{}, topic:{} from map(partitionsToChannel)",
topicPartitionChannel == null ? null : topicPartitionChannel.getChannelName(),
topicPartition.topic(),
topicPartition.partition());
});
LOGGER.info(
"Closing {} partitions and Clearing partitionsToChannel Map of size:{}",
partitions.size(),
partitionsToChannel.size());
partitionsToChannel.clear();
}

Expand Down

0 comments on commit 52f0a8a

Please sign in to comment.