
Commit

Add mapping for iceberg
sfc-gh-wtrefon committed Oct 22, 2024
1 parent d6bec27 commit 6ef1ef7
Showing 1 changed file with 32 additions and 58 deletions.
@@ -5,93 +5,67 @@
import static com.snowflake.kafka.connector.records.RecordService.*;

import com.snowflake.kafka.connector.Utils;
- import com.snowflake.kafka.connector.internal.SnowflakeErrors;
+ import java.util.AbstractMap;
import java.util.HashMap;
- import java.util.Iterator;
import java.util.Map;
- import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException;
+ import java.util.stream.Collectors;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.type.TypeReference;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
- import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.NumericNode;

class IcebergTableStreamingRecordMapper implements StreamingRecordMapper {
private final ObjectMapper mapper;

+ private static final TypeReference<Map<String, Object>> OBJECTS_MAP_TYPE_REFERENCE =
+ new TypeReference<Map<String, Object>>() {};
+
+ private static final TypeReference<Map<String, String>> HEADERS_MAP_TYPE_REFERENCE =
+ new TypeReference<Map<String, String>>() {};
+
public IcebergTableStreamingRecordMapper(ObjectMapper objectMapper) {
this.mapper = objectMapper;
}

@Override
public Map<String, Object> processSnowflakeRecord(
- SnowflakeTableRow row, boolean schematizationEnabled, boolean includeAllMetadata)
- throws JsonProcessingException {
+ SnowflakeTableRow row, boolean schematizationEnabled, boolean includeAllMetadata) {
final Map<String, Object> streamingIngestRow = new HashMap<>();
for (JsonNode node : row.getContent().getData()) {
if (schematizationEnabled) {
- streamingIngestRow.putAll(getMapFromJsonNodeForStreamingIngest(node));
+ streamingIngestRow.putAll(getMapForSchematization(node));
} else {
- Map<String, Object> result =
- mapper.convertValue(node, new TypeReference<Map<String, Object>>() {});
- streamingIngestRow.put(TABLE_COLUMN_CONTENT, result);
+ streamingIngestRow.put(TABLE_COLUMN_CONTENT, getMapForNoSchematization(node));
}
}
if (includeAllMetadata) {
- streamingIngestRow.put(
- TABLE_COLUMN_METADATA, getMapFromJsonNodeForIceberg(row.getMetadata()));
+ streamingIngestRow.put(TABLE_COLUMN_METADATA, getMapForMetadata(row.getMetadata()));
}
return streamingIngestRow;
}

- private Map<String, Object> getMapFromJsonNodeForStreamingIngest(JsonNode node)
- throws JsonProcessingException {
- return getMapFromJsonNodeForStreamingIngest(node, true);
- }
-
- private Map<String, Object> getMapFromJsonNodeForIceberg(JsonNode node)
- throws JsonProcessingException {
- return getMapFromJsonNodeForStreamingIngest(node, false);
+ private Map<String, Object> getMapForNoSchematization(JsonNode node) {
+ return mapper.convertValue(node, OBJECTS_MAP_TYPE_REFERENCE);
}

- private Map<String, Object> getMapFromJsonNodeForStreamingIngest(
- JsonNode node, boolean quoteColumnName) throws JsonProcessingException {
- final Map<String, Object> streamingIngestRow = new HashMap<>();
-
- // return empty if tombstone record
- if (node.isEmpty()) {
- return streamingIngestRow;
- }
-
- Iterator<String> columnNames = node.fieldNames();
- while (columnNames.hasNext()) {
- String columnName = columnNames.next();
- JsonNode columnNode = node.get(columnName);
- Object columnValue;
- if (columnNode.isTextual()) {
- columnValue = columnNode.textValue();
- } else if (columnNode.isNull()) {
- columnValue = null;
- } else {
- columnValue = writeValueAsStringOrNan(columnNode);
- }
- // while the value is always dumped into a string, the Streaming Ingest SDK
- // will transform the value according to its type in the table
- streamingIngestRow.put(
- quoteColumnName ? Utils.quoteNameIfNeeded(columnName) : columnName, columnValue);
- }
- // Throw an exception if the input JsonNode is not in the expected format
- if (streamingIngestRow.isEmpty()) {
- throw SnowflakeErrors.ERROR_0010.getException(
- "Not able to convert node to Snowpipe Streaming input format");
- }
- return streamingIngestRow;
+ private Map<String, Object> getMapForSchematization(JsonNode node) {
+ // we need to quote the keys on the first level of the map as they are column names in the table;
+ // the rest must stay as is, because the nested objects are not column names but field names,
+ // which are case sensitive
+ return mapper.convertValue(node, OBJECTS_MAP_TYPE_REFERENCE).entrySet().stream()
+ .map(
+ entry ->
+ new AbstractMap.SimpleEntry<>(
+ Utils.quoteNameIfNeeded(entry.getKey()), entry.getValue()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

- private String writeValueAsStringOrNan(JsonNode columnNode) throws JsonProcessingException {
- if (columnNode instanceof NumericNode && ((NumericNode) columnNode).isNaN()) {
- return "NaN";
- } else {
- return mapper.writeValueAsString(columnNode);
- }
+ private Map<String, Object> getMapForMetadata(JsonNode metadataNode) {
+ Map<String, Object> values = mapper.convertValue(metadataNode, OBJECTS_MAP_TYPE_REFERENCE);
+ // we don't want the headers to be serialized as Map<String, Object>, so we overwrite them as
+ // Map<String, String>
+ Map<String, String> headers =
+ mapper.convertValue(metadataNode.findValue(HEADERS), HEADERS_MAP_TYPE_REFERENCE);
+ values.put(HEADERS, headers);
+ return values;
}
}
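
For context, here is a minimal standalone sketch of the key-quoting behaviour that the new getMapForSchematization relies on. It is an illustration under stated assumptions, not connector code: it uses plain Jackson instead of the JDBC-shaded Jackson in the diff, and quoteIfNeeded is a hypothetical stand-in for Utils.quoteNameIfNeeded.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.AbstractMap;
import java.util.Map;
import java.util.stream.Collectors;

public class SchematizationMappingSketch {
  private static final ObjectMapper MAPPER = new ObjectMapper();
  private static final TypeReference<Map<String, Object>> MAP_TYPE =
      new TypeReference<Map<String, Object>>() {};

  // Hypothetical stand-in for Utils.quoteNameIfNeeded: leave plain upper-case identifiers
  // alone and wrap everything else in double quotes.
  static String quoteIfNeeded(String name) {
    return name.matches("[A-Z_][A-Z0-9_]*") ? name : "\"" + name + "\"";
  }

  // Quote only the first-level keys (column names); nested keys stay untouched.
  static Map<String, Object> toSchematizedRow(JsonNode node) {
    return MAPPER.convertValue(node, MAP_TYPE).entrySet().stream()
        .map(e -> new AbstractMap.SimpleEntry<>(quoteIfNeeded(e.getKey()), e.getValue()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  public static void main(String[] args) throws Exception {
    JsonNode record = MAPPER.readTree("{\"id\": 1, \"details\": {\"city\": \"Warsaw\"}}");
    // Prints something like {"id"=1, "details"={city=Warsaw}}: only top-level keys are quoted.
    System.out.println(toSchematizedRow(record));
  }
}

Only the first level is quoted because only those keys map to table column names; nested keys are plain field names and keep their original casing.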

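Similarly, a sketch of the metadata handling in the new getMapForMetadata, again assuming plain Jackson and a literal "headers" key standing in for the HEADERS constant imported from RecordService:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class MetadataMappingSketch {
  private static final ObjectMapper MAPPER = new ObjectMapper();
  private static final String HEADERS = "headers"; // assumed key name

  static Map<String, Object> toMetadataMap(JsonNode metadataNode) {
    Map<String, Object> values =
        MAPPER.convertValue(metadataNode, new TypeReference<Map<String, Object>>() {});
    // Re-read the headers subtree as Map<String, String> so header values stay strings
    // instead of being carried along as arbitrary objects.
    Map<String, String> headers =
        MAPPER.convertValue(
            metadataNode.findValue(HEADERS), new TypeReference<Map<String, String>>() {});
    values.put(HEADERS, headers);
    return values;
  }

  public static void main(String[] args) throws Exception {
    JsonNode metadata =
        MAPPER.readTree("{\"offset\": 42, \"topic\": \"events\", \"headers\": {\"trace-id\": \"abc\"}}");
    // Prints something like {offset=42, topic=events, headers={trace-id=abc}}.
    System.out.println(toMetadataMap(metadata));
  }
}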