Skip to content

Commit

Permalink
Move schematizationEnabled
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-wtrefon committed Oct 24, 2024
1 parent 47f1ddd commit 89b11e6
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ class IcebergTableStreamingRecordMapper extends StreamingRecordMapper {
private static final TypeReference<Map<String, Object>> OBJECTS_MAP_TYPE_REFERENCE =
new TypeReference<Map<String, Object>>() {};

public IcebergTableStreamingRecordMapper(ObjectMapper objectMapper) {
super(objectMapper);
public IcebergTableStreamingRecordMapper(
ObjectMapper objectMapper, boolean schematizationEnabled) {
super(objectMapper, schematizationEnabled);
}

@Override
public Map<String, Object> processSnowflakeRecord(
SnowflakeTableRow row, boolean schematizationEnabled, boolean includeMetadata)
public Map<String, Object> processSnowflakeRecord(SnowflakeTableRow row, boolean includeMetadata)
throws JsonProcessingException {
final Map<String, Object> streamingIngestRow = new HashMap<>();
for (JsonNode node : row.getContent().getData()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ public class RecordService {
private static final String KEY_SCHEMA_ID = "key_schema_id";
static final String HEADERS = "headers";

private final boolean enableSchematization;

private final StreamingRecordMapper streamingRecordMapper;

// For each task, we require a separate instance of SimpleDataFormat, since they are not
Expand All @@ -95,23 +93,15 @@ public class RecordService {
// This class is designed to work with empty metadata config map
private SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig();

RecordService(
Clock clock,
boolean enableSchematization,
StreamingRecordMapper streamingRecordMapper,
ObjectMapper mapper) {
RecordService(Clock clock, StreamingRecordMapper streamingRecordMapper, ObjectMapper mapper) {
this.clock = clock;
this.enableSchematization = enableSchematization;
this.streamingRecordMapper = streamingRecordMapper;
this.mapper = mapper;
}

/** Creates a record service with a UTC {@link Clock}. */
RecordService(
boolean enableSchematization,
StreamingRecordMapper streamingRecordMapper,
ObjectMapper mapper) {
this(Clock.systemUTC(), enableSchematization, streamingRecordMapper, mapper);
RecordService(StreamingRecordMapper streamingRecordMapper, ObjectMapper mapper) {
this(Clock.systemUTC(), streamingRecordMapper, mapper);
}

public void setMetadataConfig(SnowflakeMetadataConfig metadataConfigIn) {
Expand Down Expand Up @@ -219,8 +209,7 @@ public Map<String, Object> getProcessedRecordForStreamingIngest(SinkRecord recor
throws JsonProcessingException {
SnowflakeTableRow row = processRecord(record, clock.instant());

return streamingRecordMapper.processSnowflakeRecord(
row, enableSchematization, metadataConfig.allFlag);
return streamingRecordMapper.processSnowflakeRecord(row, metadataConfig.allFlag);
}

/** For now there are two columns one is content and other is metadata. Both are Json */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ public static RecordService createRecordService(
ObjectMapper objectMapper = new ObjectMapper();
if (isIcebergEnabled) {
return new RecordService(
enableSchematization, new IcebergTableStreamingRecordMapper(objectMapper), objectMapper);
new IcebergTableStreamingRecordMapper(objectMapper, enableSchematization), objectMapper);
} else {
return new RecordService(
enableSchematization,
new SnowflakeTableStreamingRecordMapper(objectMapper),
new SnowflakeTableStreamingRecordMapper(objectMapper, enableSchematization),
objectMapper);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@

class SnowflakeTableStreamingRecordMapper extends StreamingRecordMapper {

public SnowflakeTableStreamingRecordMapper(ObjectMapper mapper) {
super(mapper);
public SnowflakeTableStreamingRecordMapper(ObjectMapper mapper, boolean schematizationEnabled) {
super(mapper, schematizationEnabled);
}

@Override
public Map<String, Object> processSnowflakeRecord(
RecordService.SnowflakeTableRow row,
boolean schematizationEnabled,
boolean includeAllMetadata)
RecordService.SnowflakeTableRow row, boolean includeAllMetadata)
throws JsonProcessingException {
final Map<String, Object> streamingIngestRow = new HashMap<>();
for (JsonNode node : row.getContent().getData()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
abstract class StreamingRecordMapper {

protected final ObjectMapper mapper;
protected final boolean schematizationEnabled;

public StreamingRecordMapper(ObjectMapper mapper) {
public StreamingRecordMapper(ObjectMapper mapper, boolean schematizationEnabled) {
this.mapper = mapper;
this.schematizationEnabled = schematizationEnabled;
}

abstract Map<String, Object> processSnowflakeRecord(
SnowflakeTableRow row, boolean schematizationEnabled, boolean includeAllMetadata)
throws JsonProcessingException;
SnowflakeTableRow row, boolean includeAllMetadata) throws JsonProcessingException;

protected String getTextualValue(JsonNode valueNode) throws JsonProcessingException {
String value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import org.junit.jupiter.params.provider.MethodSource;

class IcebergTableStreamingRecordMapperTest {
private final IcebergTableStreamingRecordMapper mapper =
new IcebergTableStreamingRecordMapper(objectMapper);
private static final ObjectMapper objectMapper = new ObjectMapper();

private static final ImmutableMap<String, Object> primitiveJsonAsMap =
Expand Down Expand Up @@ -95,7 +93,9 @@ void shouldMapRecord_schematizationEnabled(
String description, SnowflakeTableRow row, Map<String, Object> expected)
throws JsonProcessingException {
// When
Map<String, Object> result = mapper.processSnowflakeRecord(row, true, true);
IcebergTableStreamingRecordMapper mapper =
new IcebergTableStreamingRecordMapper(objectMapper, true);
Map<String, Object> result = mapper.processSnowflakeRecord(row, true);

// Then
assertThat(result).isEqualTo(expected);
Expand All @@ -106,8 +106,12 @@ void shouldMapRecord_schematizationEnabled(
void shouldMapMetadata(String description, SnowflakeTableRow row, Map<String, Object> expected)
throws JsonProcessingException {
// When
Map<String, Object> result = mapper.processSnowflakeRecord(row, false, true);
Map<String, Object> resultSchematized = mapper.processSnowflakeRecord(row, true, true);
IcebergTableStreamingRecordMapper mapper =
new IcebergTableStreamingRecordMapper(objectMapper, false);
IcebergTableStreamingRecordMapper mapperSchematization =
new IcebergTableStreamingRecordMapper(objectMapper, true);
Map<String, Object> result = mapper.processSnowflakeRecord(row, true);
Map<String, Object> resultSchematized = mapperSchematization.processSnowflakeRecord(row, true);

// Then
assertThat(result.get(Utils.TABLE_COLUMN_METADATA)).isEqualTo(expected);
Expand All @@ -120,8 +124,12 @@ void shouldSkipMapMetadata() throws JsonProcessingException {
SnowflakeTableRow row = buildRow(primitiveJsonExample);

// When
Map<String, Object> result = mapper.processSnowflakeRecord(row, false, false);
Map<String, Object> resultSchematized = mapper.processSnowflakeRecord(row, true, false);
IcebergTableStreamingRecordMapper mapper =
new IcebergTableStreamingRecordMapper(objectMapper, false);
IcebergTableStreamingRecordMapper mapperSchematization =
new IcebergTableStreamingRecordMapper(objectMapper, true);
Map<String, Object> result = mapper.processSnowflakeRecord(row, false);
Map<String, Object> resultSchematized = mapperSchematization.processSnowflakeRecord(row, false);

// Then
assertThat(result).doesNotContainKey(Utils.TABLE_COLUMN_METADATA);
Expand All @@ -134,7 +142,9 @@ void shouldMapRecord_schematizationDisabled(
String description, SnowflakeTableRow row, Map<String, Object> expected)
throws JsonProcessingException {
// When
Map<String, Object> result = mapper.processSnowflakeRecord(row, false, true);
IcebergTableStreamingRecordMapper mapper =
new IcebergTableStreamingRecordMapper(objectMapper, false);
Map<String, Object> result = mapper.processSnowflakeRecord(row, true);

// Then
assertThat(result).isEqualTo(expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void connectorTimestamp_byDefault_writes() throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
RecordService service =
new RecordService(
fixedClock, false, new SnowflakeTableStreamingRecordMapper(mapper), mapper);
fixedClock, new SnowflakeTableStreamingRecordMapper(mapper, false), mapper);

// when
Map<String, Object> recordData = service.getProcessedRecordForStreamingIngest(record);
Expand Down

0 comments on commit 89b11e6

Please sign in to comment.