Merge pull request #115 from lensesio-dev/bugfix/header-translation
Header translation
andrewstevenson authored Sep 26, 2024
2 parents e0d925d + d8bae26 commit 9287343
Showing 5 changed files with 47 additions and 36 deletions.
@@ -23,5 +23,6 @@ public class ReportHeadersConstants {
public static final String INPUT_TOPIC = "input_topic";
public static final String INPUT_KEY = "input_key";
public static final String INPUT_PAYLOAD = "input_payload";
public static final String ERROR = "error_message";

}
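
The new ERROR constant names the header that the reporter now attaches to every report record (see the converter change below); on success it carries an empty string. As a rough orientation only, not part of this commit, a consumer of the report topic could read it back roughly like this, assuming the byte[]-key/String-value record shape that ProducerRecordConverter produces:

import java.nio.charset.StandardCharsets;

import io.lenses.streamreactor.connect.reporting.model.ReportHeadersConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

// Illustrative helper, not part of this commit: pull the error_message header off a
// consumed report record; the converter writes "" when the delivery succeeded.
final class ReportErrorReader {

  static String errorMessageOf(ConsumerRecord<byte[], String> report) {
    Header header = report.headers().lastHeader(ReportHeadersConstants.ERROR);
    return (header == null || header.value() == null)
        ? ""
        : new String(header.value(), StandardCharsets.UTF_8);
  }
}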
@@ -15,21 +15,19 @@
*/
package io.lenses.streamreactor.connect.reporting.model.generic;

import static io.lenses.streamreactor.common.util.ByteConverters.toBytes;

import cyclops.control.Option;
import cyclops.control.Try;
import io.lenses.streamreactor.connect.reporting.model.ReportHeadersConstants;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public class ProducerRecordConverter {
@@ -48,11 +46,16 @@ private static Optional<ProducerRecord<byte[], String>> createRecord(List<Header

private static Optional<List<Header>> convertToHeaders(ReportingRecord originalRecord) {
return Try.withCatch(() -> List.<Header>of(
new RecordHeader(ReportHeadersConstants.INPUT_TOPIC, toBytes(originalRecord.getTopicPartition().topic())),
new RecordHeader(ReportHeadersConstants.INPUT_OFFSET, toBytes(originalRecord.getOffset())),
new RecordHeader(ReportHeadersConstants.INPUT_TIMESTAMP, toBytes(originalRecord.getTimestamp())),
new RecordHeader(ReportHeadersConstants.INPUT_TOPIC, originalRecord.getTopicPartition().topic().getBytes()),
new RecordHeader(ReportHeadersConstants.INPUT_PARTITION, String.valueOf(originalRecord.getTopicPartition()
.partition()).getBytes()),
new RecordHeader(ReportHeadersConstants.INPUT_OFFSET, String.valueOf(originalRecord.getOffset()).getBytes()),
new RecordHeader(ReportHeadersConstants.INPUT_TIMESTAMP, String.valueOf(originalRecord.getTimestamp())
.getBytes()),
new RecordHeader(ReportHeadersConstants.INPUT_KEY, null),
new RecordHeader(ReportHeadersConstants.INPUT_PAYLOAD, toBytes(originalRecord.getPayload()))
new RecordHeader(ReportHeadersConstants.INPUT_PAYLOAD, originalRecord.getPayload().getBytes()),
new RecordHeader(ReportHeadersConstants.ERROR, originalRecord.getError().map(String::getBytes).orElseGet(
""::getBytes))
), IOException.class)
.peekFailed(f -> log.warn(
String.format("Couldn't transform record to Report. Report won't be sent. Topic=%s, Offset=%s",
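After this hunk, every header value is the UTF-8 byte form of a plain string: topic and payload via getBytes(), partition, offset and timestamp via String.valueOf(...).getBytes(), plus the new input_partition and error_message headers (the latter falling back to the empty string when no error is present). A side effect worth noting is that numeric headers now travel as text rather than through the old ByteConverters.toBytes encoding, so a reader parses them back from strings. A minimal sketch under that assumption; the helper name and fallback handling are illustrative, not from this commit:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

// Illustrative only: numeric report headers (input_offset, input_timestamp, input_partition)
// are now UTF-8 text such as "42", so they can be parsed straight back into longs.
final class NumericHeaderReader {

  static long longHeader(Headers headers, String key, long fallback) {
    Header header = headers.lastHeader(key);
    return (header == null || header.value() == null)
        ? fallback
        : Long.parseLong(new String(header.value(), StandardCharsets.UTF_8));
  }
}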
@@ -15,17 +15,16 @@
*/
package io.lenses.streamreactor.connect.reporting.model.generic;

import cyclops.data.Seq;
import cyclops.control.Option;
import cyclops.data.tuple.Tuple2;
import io.lenses.streamreactor.connect.reporting.model.RecordReport;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.val;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.Optional;

@AllArgsConstructor
@@ -40,6 +39,7 @@ public class ReportingRecord implements RecordReport {
private String endpoint;
private String payload;
private List<Tuple2<String, String>> headers;
private Option<String> error;

@Override
public Optional<ProducerRecord<byte[], String>> produceReportRecord(String reportingTopic) {
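ReportingRecord now carries a cyclops Option<String> error as its last constructor argument: Option.none() for successful deliveries and Option.some(message) for failures (the HttpWriter change below supplies it via Option.fromOptional). A small construction sketch with made-up values; the wrapper class is illustrative only:

import java.util.List;

import cyclops.control.Option;
import cyclops.data.tuple.Tuple2;
import io.lenses.streamreactor.connect.reporting.model.generic.ReportingRecord;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: building success and failure reports with the new error field.
final class ReportingRecordExamples {

  static ReportingRecord success() {
    return new ReportingRecord(new TopicPartition("orders", 0), 42L, 1695700000000L,
        "https://endpoint.local", "{\"payload\":\"ok\"}", List.<Tuple2<String, String>>of(),
        Option.none());
  }

  static ReportingRecord failure(String message) {
    return new ReportingRecord(new TopicPartition("orders", 0), 43L, 1695700000000L,
        "https://endpoint.local", "{\"payload\":\"bad\"}", List.<Tuple2<String, String>>of(),
        Option.some(message));
  }
}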
@@ -15,26 +15,25 @@
*/
package io.lenses.streamreactor.connect.reporting.model.generic;

import static io.lenses.streamreactor.common.util.ByteConverters.toBytes;
import static org.assertj.core.api.Assertions.from;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import cyclops.data.Seq;
import cyclops.data.tuple.Tuple2;
import cyclops.control.Option;
import io.lenses.streamreactor.connect.reporting.model.ReportHeadersConstants;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;

import static org.assertj.core.api.Assertions.from;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ProducerRecordConverterTest {

private static final String REPORTING_TOPIC = "reporting";
@@ -44,6 +43,7 @@ class ProducerRecordConverterTest {
private static final long TIMESTAMP = 222L;
private static final String ENDPOINT = "endpoint.local";
private static final String JSON_PAYLOAD = "{\"payload\": \"somevalue\"}";
private static final String ERROR = "Bad things happened";

@Test
void convertShouldProduceProducerRecord() throws IOException {
@@ -60,7 +60,7 @@ void convertShouldProduceProducerRecord() throws IOException {

assertNotNull(record.headers());
Header[] headers = record.headers().toArray();
assertEquals(5, headers.length);
assertEquals(7, headers.length);

assertThat(record)
.returns(REPORTING_TOPIC, from(ProducerRecord::topic))
@@ -72,17 +72,20 @@

private Header[] buildExpectedHeaders() throws IOException {
return new Header[]{
new RecordHeader(ReportHeadersConstants.INPUT_TOPIC, toBytes(TOPIC)),
new RecordHeader(ReportHeadersConstants.INPUT_OFFSET, toBytes(OFFSET)),
new RecordHeader(ReportHeadersConstants.INPUT_TIMESTAMP, toBytes(TIMESTAMP)),
new RecordHeader(ReportHeadersConstants.INPUT_TOPIC, TOPIC.getBytes()),
new RecordHeader(ReportHeadersConstants.INPUT_PARTITION, String.valueOf(PARTITION).getBytes()),
new RecordHeader(ReportHeadersConstants.INPUT_OFFSET, String.valueOf(OFFSET).getBytes()),
new RecordHeader(ReportHeadersConstants.INPUT_TIMESTAMP, String.valueOf(TIMESTAMP).getBytes()),
new RecordHeader(ReportHeadersConstants.INPUT_KEY, null),
new RecordHeader(ReportHeadersConstants.INPUT_PAYLOAD, toBytes(JSON_PAYLOAD))
new RecordHeader(ReportHeadersConstants.INPUT_PAYLOAD, JSON_PAYLOAD.getBytes()),
new RecordHeader(ReportHeadersConstants.ERROR, "".getBytes())
};
}

private ReportingRecord createReportingRecord() {
return new ReportingRecord(new TopicPartition(TOPIC, PARTITION), OFFSET,
TIMESTAMP, ENDPOINT, JSON_PAYLOAD, Collections.emptyList()
TIMESTAMP, ENDPOINT, JSON_PAYLOAD, Collections.emptyList(),
Option.none()
);
}
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@ package io.lenses.streamreactor.connect.http.sink
import cats.effect.IO
import cats.effect.Ref
import cats.effect.unsafe.implicits.global
import cats.implicits.catsSyntaxOptionId
import cats.implicits.none
import com.typesafe.scalalogging.LazyLogging
import cyclops.data.tuple
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
@@ -37,6 +39,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata
import java.util
import scala.collection.immutable.Queue
import scala.jdk.CollectionConverters.SeqHasAsJava
import scala.jdk.OptionConverters.RichOption

class HttpWriter(
sinkName: String,
@@ -189,20 +192,21 @@
): IO[Unit] = {
val maxRecord = OffsetMergeUtils.maxRecord(renderedRecords)

val reportRecord = (templateContent: String) =>
val reportRecord = (error: Option[String]) =>
new ReportingRecord(
maxRecord.topicPartitionOffset.toTopicPartition.toKafka,
maxRecord.topicPartitionOffset.offset.value,
maxRecord.timestamp,
processedTemplate.endpoint,
templateContent,
processedTemplate.content,
convertToCyclopsTuples(processedTemplate.headers),
cyclops.control.Option.fromOptional(error.toJava),
)

responseIo.flatTap { _ =>
IO(successReporter.enqueue(reportRecord(processedTemplate.content)))
IO(successReporter.enqueue(reportRecord(none)))
}.handleErrorWith { error =>
IO(errorReporter.enqueue(reportRecord(error.getMessage))) *> IO.raiseError(error)
IO(errorReporter.enqueue(reportRecord(error.getMessage.some))) *> IO.raiseError(error)
}
}

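On the Scala side, the report closure now varies only in the error it carries: the payload handed to both reporters is always processedTemplate.content, successes pass none, and failures pass the exception message, bridged to the cyclops Option stored on ReportingRecord via error.toJava and Option.fromOptional. A Java-side sketch of how that value then surfaces as the error_message header (mirroring the converter change above; the helper is illustrative only):

import java.util.Optional;

import cyclops.control.Option;

// Illustrative only: a java.util.Optional error message becomes a cyclops Option on the
// ReportingRecord, and the converter turns it into the error_message header ("" on success).
final class ErrorBridgeExample {

  static byte[] errorHeaderBytes(Optional<String> maybeError) {
    Option<String> error = Option.fromOptional(maybeError);
    return error.map(String::getBytes).orElseGet(""::getBytes);
  }
}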
