From 6ef1ef7a96b1d269c3ab3fc5d6ae6bc09c32af89 Mon Sep 17 00:00:00 2001
From: Wojciech Trefon
Date: Tue, 22 Oct 2024 16:32:01 +0200
Subject: [PATCH] Add mapping for Iceberg

---
 .../IcebergTableStreamingRecordMapper.java | 90 +++++++------
 1 file changed, 32 insertions(+), 58 deletions(-)

diff --git a/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java b/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java
index d78fcbb9e..2b8d263c4 100644
--- a/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java
+++ b/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java
@@ -5,93 +5,67 @@ import static com.snowflake.kafka.connector.records.RecordService.*;
 
 import com.snowflake.kafka.connector.Utils;
-import com.snowflake.kafka.connector.internal.SnowflakeErrors;
+import java.util.AbstractMap;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
-import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException;
+import java.util.stream.Collectors;
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.type.TypeReference;
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
-import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.NumericNode;
 
 class IcebergTableStreamingRecordMapper implements StreamingRecordMapper {
   private final ObjectMapper mapper;
 
+  private static final TypeReference<Map<String, Object>> OBJECTS_MAP_TYPE_REFERENCE =
+      new TypeReference<Map<String, Object>>() {};
+
+  private static final TypeReference<Map<String, String>> HEADERS_MAP_TYPE_REFERENCE =
+      new TypeReference<Map<String, String>>() {};
+
   public IcebergTableStreamingRecordMapper(ObjectMapper objectMapper) {
     this.mapper = objectMapper;
   }
 
   @Override
   public Map<String, Object> processSnowflakeRecord(
-      SnowflakeTableRow row, boolean schematizationEnabled, boolean includeAllMetadata)
-      throws JsonProcessingException {
+      SnowflakeTableRow row, boolean schematizationEnabled, boolean includeAllMetadata) {
     final Map<String, Object> streamingIngestRow = new HashMap<>();
     for (JsonNode node : row.getContent().getData()) {
       if (schematizationEnabled) {
-        streamingIngestRow.putAll(getMapFromJsonNodeForStreamingIngest(node));
+        streamingIngestRow.putAll(getMapForSchematization(node));
       } else {
-        Map<String, Object> result =
-            mapper.convertValue(node, new TypeReference<Map<String, Object>>() {});
-        streamingIngestRow.put(TABLE_COLUMN_CONTENT, result);
+        streamingIngestRow.put(TABLE_COLUMN_CONTENT, getMapForNoSchematization(node));
       }
     }
     if (includeAllMetadata) {
-      streamingIngestRow.put(
-          TABLE_COLUMN_METADATA, getMapFromJsonNodeForIceberg(row.getMetadata()));
+      streamingIngestRow.put(TABLE_COLUMN_METADATA, getMapForMetadata(row.getMetadata()));
     }
     return streamingIngestRow;
   }
 
-  private Map<String, Object> getMapFromJsonNodeForStreamingIngest(JsonNode node)
-      throws JsonProcessingException {
-    return getMapFromJsonNodeForStreamingIngest(node, true);
-  }
-
-  private Map<String, Object> getMapFromJsonNodeForIceberg(JsonNode node)
-      throws JsonProcessingException {
-    return getMapFromJsonNodeForStreamingIngest(node, false);
+  private Map<String, Object> getMapForNoSchematization(JsonNode node) {
+    return mapper.convertValue(node, OBJECTS_MAP_TYPE_REFERENCE);
   }
 
-  private Map<String, Object> getMapFromJsonNodeForStreamingIngest(
-      JsonNode node, boolean quoteColumnName) throws JsonProcessingException {
-    final Map<String, Object> streamingIngestRow = new HashMap<>();
-
-    // return empty if tombstone record
-    if (node.isEmpty()) {
-      return streamingIngestRow;
-    }
-
-    Iterator<String> columnNames = node.fieldNames();
-    while (columnNames.hasNext()) {
-      String columnName = columnNames.next();
-      JsonNode columnNode = node.get(columnName);
-      Object columnValue;
-      if (columnNode.isTextual()) {
-        columnValue = columnNode.textValue();
-      } else if (columnNode.isNull()) {
-        columnValue = null;
-      } else {
-        columnValue = writeValueAsStringOrNan(columnNode);
-      }
-      // while the value is always dumped into a string, the Streaming Ingest SDK
-      // will transform the value according to its type in the table
-      streamingIngestRow.put(
-          quoteColumnName ? Utils.quoteNameIfNeeded(columnName) : columnName, columnValue);
-    }
-    // Thrown an exception if the input JsonNode is not in the expected format
-    if (streamingIngestRow.isEmpty()) {
-      throw SnowflakeErrors.ERROR_0010.getException(
-          "Not able to convert node to Snowpipe Streaming input format");
-    }
-    return streamingIngestRow;
+  private Map<String, Object> getMapForSchematization(JsonNode node) {
+    // We need to quote the keys on the first level of the map, as they are column names in the
+    // table; nested keys must stay as-is, since they are case-sensitive field names rather than
+    // column names.
+    return mapper.convertValue(node, OBJECTS_MAP_TYPE_REFERENCE).entrySet().stream()
+        .map(
+            entry ->
+                new AbstractMap.SimpleEntry<>(
+                    Utils.quoteNameIfNeeded(entry.getKey()), entry.getValue()))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
 
-  private String writeValueAsStringOrNan(JsonNode columnNode) throws JsonProcessingException {
-    if (columnNode instanceof NumericNode && ((NumericNode) columnNode).isNaN()) {
-      return "NaN";
-    } else {
-      return mapper.writeValueAsString(columnNode);
-    }
+  private Map<String, Object> getMapForMetadata(JsonNode metadataNode) {
+    Map<String, Object> values = mapper.convertValue(metadataNode, OBJECTS_MAP_TYPE_REFERENCE);
+    // We don't want headers to be serialized as Map<String, JsonNode>, so we overwrite the
+    // entry with a Map<String, String>.
+    Map<String, String> headers =
+        mapper.convertValue(metadataNode.findValue(HEADERS), HEADERS_MAP_TYPE_REFERENCE);
+    values.put(HEADERS, headers);
+    return values;
+  }
 }
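
Reviewer note: below is a minimal, self-contained sketch of the first-level key quoting that getMapForSchematization performs, for trying the mapping shape outside the connector. It is an illustration under stated assumptions, not the connector's code: it uses the public com.fasterxml Jackson coordinates instead of the shaded net.snowflake ones, the class name is made up, and quoteIfNeeded is a hypothetical stand-in for Utils.quoteNameIfNeeded (whose real quoting rules live in the connector).

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.AbstractMap;
import java.util.Map;
import java.util.stream.Collectors;

public class QuoteFirstLevelKeysDemo {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Reusable constant, mirroring the patch's OBJECTS_MAP_TYPE_REFERENCE.
  private static final TypeReference<Map<String, Object>> OBJECTS_MAP =
      new TypeReference<Map<String, Object>>() {};

  // Hypothetical stand-in for Utils.quoteNameIfNeeded: quote anything that is not
  // already a plain upper-case identifier. The real rules differ; this only shows shape.
  static String quoteIfNeeded(String name) {
    return name.matches("[A-Z_][A-Z0-9_]*") ? name : "\"" + name + "\"";
  }

  public static void main(String[] args) throws Exception {
    String json = "{\"myColumn\":{\"innerField\":1},\"OTHER_COL\":true}";
    Map<String, Object> row =
        MAPPER.convertValue(MAPPER.readTree(json), OBJECTS_MAP).entrySet().stream()
            .map(e -> new AbstractMap.SimpleEntry<>(quoteIfNeeded(e.getKey()), e.getValue()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    // Only top-level keys are treated as column names and quoted; the nested
    // "innerField" key is left untouched, matching the comment in the patch.
    System.out.println(row); // e.g. {OTHER_COL=true, "myColumn"={innerField=1}}
  }
}

Hoisting the TypeReference into a static constant, rather than allocating an anonymous subclass per record as the removed code did inside processSnowflakeRecord, is a small but sensible choice on a per-record hot path.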
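
Similarly, a small sketch of why getMapForMetadata re-converts the headers node with HEADERS_MAP_TYPE_REFERENCE: converting the whole metadata node to Map<String, Object> preserves the original value types, while the targeted Map<String, String> conversion flattens every header value to a string. The payload below is made up, the literal "headers" stands in for the HEADERS constant the patch imports from RecordService, and Jackson's default scalar-to-String coercion is assumed.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class MetadataHeadersDemo {
  private static final ObjectMapper MAPPER = new ObjectMapper();
  private static final TypeReference<Map<String, Object>> OBJECTS_MAP =
      new TypeReference<Map<String, Object>>() {};
  private static final TypeReference<Map<String, String>> HEADERS_MAP =
      new TypeReference<Map<String, String>>() {};

  public static void main(String[] args) throws Exception {
    // Made-up metadata payload; "headers" stands in for the HEADERS constant.
    String json =
        "{\"topic\":\"events\",\"offset\":42,"
            + "\"headers\":{\"retry-count\":7,\"trace-id\":\"abc\"}}";
    JsonNode metadata = MAPPER.readTree(json);

    Map<String, Object> values = MAPPER.convertValue(metadata, OBJECTS_MAP);
    Object numericHeader = ((Map<?, ?>) values.get("headers")).get("retry-count");
    System.out.println(numericHeader.getClass().getSimpleName()); // Integer

    // The targeted re-conversion coerces each header value to a String
    // (assuming Jackson's default scalar-to-String coercion).
    Map<String, String> headers =
        MAPPER.convertValue(metadata.findValue("headers"), HEADERS_MAP);
    values.put("headers", headers);
    System.out.println(headers.get("retry-count").getClass().getSimpleName()); // String
  }
}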