Skip to content

Commit

Permalink
Revert "SNOW-811265 Do not close channel on rebalance (snowflakedb#651)…
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-japatel authored and khsoneji committed Oct 12, 2023
1 parent c055821 commit a6380e3
Showing 1 changed file with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,15 +304,14 @@ public void closeAll() {
/**
* This function is called during rebalance.
*
* <p>We don't close the channels. Upon rebalance, (inside {@link
* com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we will reopen the channel
* anyways. [Check c'tor of {@link TopicPartitionChannel}]
* <p>All the channels are closed. The client is still active. Upon rebalance, (inside {@link
* com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we will reopen the channel.
*
* <p>We will wipe the cache partitionsToChannel so that in {@link
* com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we reinstantiate and fetch
* offsetToken
*
* @param partitions a list of topic partitions to close/shutdown
* @param partitions a list of topic partition
*/
@Override
public void close(Collection<TopicPartition> partitions) {
Expand All @@ -324,16 +323,15 @@ public void close(Collection<TopicPartition> partitions) {
partitionsToChannel.get(partitionChannelKey);
// Check for null since it's possible that the something goes wrong even before the
// channels are created
if (topicPartitionChannel != null) {
topicPartitionChannel.closeChannel();
}
LOGGER.info(
"Removing partitionChannel:{}, partition:{}, topic:{} from map(partitionsToChannel)",
"Closing partitionChannel:{}, partition:{}, topic:{}",
topicPartitionChannel == null ? null : topicPartitionChannel.getChannelName(),
topicPartition.topic(),
topicPartition.partition());
});
LOGGER.info(
"Closing {} partitions and Clearing partitionsToChannel Map of size:{}",
partitions.size(),
partitionsToChannel.size());
partitionsToChannel.clear();
}

Expand Down

0 comments on commit a6380e3

Please sign in to comment.