diff --git a/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/model/ReportHeadersConstants.java b/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/model/ReportHeadersConstants.java index 6e39486fd..528efee66 100644 --- a/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/model/ReportHeadersConstants.java +++ b/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/model/ReportHeadersConstants.java @@ -23,5 +23,6 @@ public class ReportHeadersConstants { public static final String INPUT_TOPIC = "input_topic"; public static final String INPUT_KEY = "input_key"; public static final String INPUT_PAYLOAD = "input_payload"; + public static final String ERROR = "error_message"; } diff --git a/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/model/generic/ProducerRecordConverter.java b/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/model/generic/ProducerRecordConverter.java index 36e9cb7fb..c8d8bb0ce 100644 --- a/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/model/generic/ProducerRecordConverter.java +++ b/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/model/generic/ProducerRecordConverter.java @@ -15,14 +15,8 @@ */ package io.lenses.streamreactor.connect.reporting.model.generic; -import static io.lenses.streamreactor.common.util.ByteConverters.toBytes; - -import cyclops.control.Option; import cyclops.control.Try; import io.lenses.streamreactor.connect.reporting.model.ReportHeadersConstants; -import java.io.IOException; -import java.util.List; -import java.util.Optional; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -30,6 +24,10 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; +import java.io.IOException; +import java.util.List; +import java.util.Optional; + @NoArgsConstructor(access = AccessLevel.PRIVATE) @Slf4j public class ProducerRecordConverter { @@ -48,11 +46,16 @@ private static Optional> createRecord(List
> convertToHeaders(ReportingRecord originalRecord) { return Try.withCatch(() -> List.
of( - new RecordHeader(ReportHeadersConstants.INPUT_TOPIC, toBytes(originalRecord.getTopicPartition().topic())), - new RecordHeader(ReportHeadersConstants.INPUT_OFFSET, toBytes(originalRecord.getOffset())), - new RecordHeader(ReportHeadersConstants.INPUT_TIMESTAMP, toBytes(originalRecord.getTimestamp())), + new RecordHeader(ReportHeadersConstants.INPUT_TOPIC, originalRecord.getTopicPartition().topic().getBytes()), + new RecordHeader(ReportHeadersConstants.INPUT_PARTITION, String.valueOf(originalRecord.getTopicPartition() + .partition()).getBytes()), + new RecordHeader(ReportHeadersConstants.INPUT_OFFSET, String.valueOf(originalRecord.getOffset()).getBytes()), + new RecordHeader(ReportHeadersConstants.INPUT_TIMESTAMP, String.valueOf(originalRecord.getTimestamp()) + .getBytes()), new RecordHeader(ReportHeadersConstants.INPUT_KEY, null), - new RecordHeader(ReportHeadersConstants.INPUT_PAYLOAD, toBytes(originalRecord.getPayload())) + new RecordHeader(ReportHeadersConstants.INPUT_PAYLOAD, originalRecord.getPayload().getBytes()), + new RecordHeader(ReportHeadersConstants.ERROR, originalRecord.getError().map(String::getBytes).orElseGet( + ""::getBytes)) ), IOException.class) .peekFailed(f -> log.warn( String.format("Couldn't transform record to Report. Report won't be sent. Topic=%s, Offset=%s", diff --git a/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/model/generic/ReportingRecord.java b/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/model/generic/ReportingRecord.java index 702352b0c..35892c64c 100644 --- a/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/model/generic/ReportingRecord.java +++ b/java-connectors/kafka-connect-sink-reporting/src/main/java/io/lenses/streamreactor/connect/reporting/model/generic/ReportingRecord.java @@ -15,17 +15,16 @@ */ package io.lenses.streamreactor.connect.reporting.model.generic; -import cyclops.data.Seq; +import cyclops.control.Option; import cyclops.data.tuple.Tuple2; import io.lenses.streamreactor.connect.reporting.model.RecordReport; -import java.util.List; import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; -import lombok.val; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import java.util.List; import java.util.Optional; @AllArgsConstructor @@ -40,6 +39,7 @@ public class ReportingRecord implements RecordReport { private String endpoint; private String payload; private List> headers; + private Option error; @Override public Optional> produceReportRecord(String reportingTopic) { diff --git a/java-connectors/kafka-connect-sink-reporting/src/test/java/io/lenses/streamreactor/connect/reporting/model/generic/ProducerRecordConverterTest.java b/java-connectors/kafka-connect-sink-reporting/src/test/java/io/lenses/streamreactor/connect/reporting/model/generic/ProducerRecordConverterTest.java index 3ec757bdc..12722446f 100644 --- a/java-connectors/kafka-connect-sink-reporting/src/test/java/io/lenses/streamreactor/connect/reporting/model/generic/ProducerRecordConverterTest.java +++ b/java-connectors/kafka-connect-sink-reporting/src/test/java/io/lenses/streamreactor/connect/reporting/model/generic/ProducerRecordConverterTest.java @@ -15,26 +15,25 @@ */ package io.lenses.streamreactor.connect.reporting.model.generic; -import static io.lenses.streamreactor.common.util.ByteConverters.toBytes; -import static org.assertj.core.api.Assertions.from; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import cyclops.data.Seq; -import cyclops.data.tuple.Tuple2; +import cyclops.control.Option; import io.lenses.streamreactor.connect.reporting.model.ReportHeadersConstants; -import java.io.IOException; -import java.util.Collections; -import java.util.Optional; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.junit.jupiter.api.Test; +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.from; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + class ProducerRecordConverterTest { private static final String REPORTING_TOPIC = "reporting"; @@ -44,6 +43,7 @@ class ProducerRecordConverterTest { private static final long TIMESTAMP = 222L; private static final String ENDPOINT = "endpoint.local"; private static final String JSON_PAYLOAD = "{\"payload\": \"somevalue\"}"; + private static final String ERROR = "Bad things happened"; @Test void convertShouldProduceProducerRecord() throws IOException { @@ -60,7 +60,7 @@ void convertShouldProduceProducerRecord() throws IOException { assertNotNull(record.headers()); Header[] headers = record.headers().toArray(); - assertEquals(5, headers.length); + assertEquals(7, headers.length); assertThat(record) .returns(REPORTING_TOPIC, from(ProducerRecord::topic)) @@ -72,17 +72,20 @@ void convertShouldProduceProducerRecord() throws IOException { private Header[] buildExpectedHeaders() throws IOException { return new Header[]{ - new RecordHeader(ReportHeadersConstants.INPUT_TOPIC, toBytes(TOPIC)), - new RecordHeader(ReportHeadersConstants.INPUT_OFFSET, toBytes(OFFSET)), - new RecordHeader(ReportHeadersConstants.INPUT_TIMESTAMP, toBytes(TIMESTAMP)), + new RecordHeader(ReportHeadersConstants.INPUT_TOPIC, TOPIC.getBytes()), + new RecordHeader(ReportHeadersConstants.INPUT_PARTITION, String.valueOf(PARTITION).getBytes()), + new RecordHeader(ReportHeadersConstants.INPUT_OFFSET, String.valueOf(OFFSET).getBytes()), + new RecordHeader(ReportHeadersConstants.INPUT_TIMESTAMP, String.valueOf(TIMESTAMP).getBytes()), new RecordHeader(ReportHeadersConstants.INPUT_KEY, null), - new RecordHeader(ReportHeadersConstants.INPUT_PAYLOAD, toBytes(JSON_PAYLOAD)) + new RecordHeader(ReportHeadersConstants.INPUT_PAYLOAD, JSON_PAYLOAD.getBytes()), + new RecordHeader(ReportHeadersConstants.ERROR, "".getBytes()) }; } private ReportingRecord createReportingRecord() { return new ReportingRecord(new TopicPartition(TOPIC, PARTITION), OFFSET, - TIMESTAMP, ENDPOINT, JSON_PAYLOAD, Collections.emptyList() + TIMESTAMP, ENDPOINT, JSON_PAYLOAD, Collections.emptyList(), + Option.none() ); } } diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala index 0fb222780..5c52a4fd8 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala @@ -18,6 +18,8 @@ package io.lenses.streamreactor.connect.http.sink import cats.effect.IO import cats.effect.Ref import cats.effect.unsafe.implicits.global +import cats.implicits.catsSyntaxOptionId +import cats.implicits.none import com.typesafe.scalalogging.LazyLogging import cyclops.data.tuple import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition @@ -37,6 +39,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata import java.util import scala.collection.immutable.Queue import scala.jdk.CollectionConverters.SeqHasAsJava +import scala.jdk.OptionConverters.RichOption class HttpWriter( sinkName: String, @@ -189,20 +192,21 @@ class HttpWriter( ): IO[Unit] = { val maxRecord = OffsetMergeUtils.maxRecord(renderedRecords) - val reportRecord = (templateContent: String) => + val reportRecord = (error: Option[String]) => new ReportingRecord( maxRecord.topicPartitionOffset.toTopicPartition.toKafka, maxRecord.topicPartitionOffset.offset.value, maxRecord.timestamp, processedTemplate.endpoint, - templateContent, + processedTemplate.content, convertToCyclopsTuples(processedTemplate.headers), + cyclops.control.Option.fromOptional(error.toJava), ) responseIo.flatTap { _ => - IO(successReporter.enqueue(reportRecord(processedTemplate.content))) + IO(successReporter.enqueue(reportRecord(none))) }.handleErrorWith { error => - IO(errorReporter.enqueue(reportRecord(error.getMessage))) *> IO.raiseError(error) + IO(errorReporter.enqueue(reportRecord(error.getMessage.some))) *> IO.raiseError(error) } }