Skip to content

Commit

Permalink
Reducing verbose logging
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan committed May 20, 2024
1 parent 8d92410 commit 839f5bd
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,4 @@
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
<logger name="io.lenses.streamreactor.connect.gcp.pubsub" level="DEBUG"/>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ private SubscriptionAdminClient createSubscriptionAdminClient(final AuthMode aut
}

public String topicNameFor(final String subscriptionId) {
log.info("Requesting subscription details for projectId: {}, subscriptionId: {}", projectId, subscriptionId);
val subscription = subscriptionAdminClient.getSubscription(createSubscriptionName(subscriptionId));
log.info("Found subscription details for subscriptionId: {}", subscription.getTopic());
log.info("Found topic details {} for subscriptionId {}", subscription.getTopic(), subscriptionId);
return subscription.getTopic();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public PubSubSubscriber(
String projectId,
PubSubSubscription subscription
) {
log.info("Starting PubSubSubscriber");
log.info("Starting PubSubSubscriber for subscription {}", subscription.getSubscriptionId());
targetTopicName = subscription.getTargetKafkaTopic();
batchSize = subscription.getBatchSize();
messageQueue = new ConcurrentLinkedQueue<>();
Expand All @@ -81,7 +81,6 @@ public PubSubSubscriber(
}

public void startAsync() {
log.info("Start Subscriber");
gcpSubscriber.startAsync();
}

Expand All @@ -93,7 +92,6 @@ private MessageReceiver createMessageReceiver() {
}

public List<PubSubMessageData> getMessages() {
log.info("GetMessages");
return IntStream.range(0, batchSize)
.mapToObj(i -> messageQueue.poll())
.filter(Objects::nonNull)
Expand All @@ -107,7 +105,7 @@ public List<PubSubMessageData> getMessages() {
}

public void acknowledge(String messageId) {
log.info("Sending acknowledgement for {}}", messageId);
log.trace("Sending acknowledgement for {}}", messageId);
Optional
.ofNullable(ackCache.getIfPresent(messageId))
.ifPresent(e -> {
Expand All @@ -117,7 +115,6 @@ public void acknowledge(String messageId) {
}

public void stopAsync() {
log.info("Stopping Subscriber");
gcpSubscriber.stopAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public PubSubSubscriberManager(
List<PubSubSubscription> subscriptionConfigs,
SubscriberCreator subscriberCreator
) {
log.info("Starting PubSubSubscriberManager");
log.info("Starting PubSubSubscriberManager for {} subscriptions", subscriptionConfigs.size());
subscribers =
subscriptionConfigs
.parallelStream()
Expand All @@ -51,22 +51,22 @@ public PubSubSubscriberManager(
}

public List<PubSubMessageData> poll() {
log.info("Polling messages from PubSub");
log.trace("Polling messages from all partitions");
val subs =
subscribers
.values()
.parallelStream()
.flatMap(pubSubSubscriber -> pubSubSubscriber.getMessages().stream())
.collect(Collectors.toList());
log.info("Polled {} messages from PubSub", subs.size());
log.debug("Polled {} messages from all partitions", subs.size());
return subs;
}

public void commitRecord(
SourcePartition sourcePartition,
SourceOffset sourceOffset
) {
log.info("Committing record for partition {} with offset {}", sourcePartition, sourceOffset);
log.trace("Committing record for partition {} with offset {}", sourcePartition, sourceOffset);
subscribers
.get(sourcePartition.getSubscriptionId())
.acknowledge(sourceOffset.getMessageId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;

/**
* SourceOffset holds the message id from the PubSub message, to allow kafka connect to track the offset of the message,
* for later use in the GCP offset acknowledgement.
*/
@AllArgsConstructor
@Getter
@ToString
public class SourceOffset {

private static final String KEY_MESSAGE_ID = "message.id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;

/**
* SourcePartition holds the location from which the message was sourced within GCP PubSub, for use reporting back
* partitions to Kafka Connect for later use in the GCP record acknowledgement..
*/
@Getter
@AllArgsConstructor
@ToString
public class SourcePartition {

private static final String KEY_PROJECT_ID = "project.id";
Expand Down

0 comments on commit 839f5bd

Please sign in to comment.