diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java index 742d1809..319bd3c9 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java @@ -81,6 +81,11 @@ public class RowBatch { private static final Logger logger = LoggerFactory.getLogger(RowBatch.class); private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault(); + private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd HH:mm:ss") + .appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true) + .toFormatter(); + private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS"; private final DateTimeFormatter dateTimeFormatter = @@ -374,9 +379,7 @@ public void convertArrowToRowBatch() throws DorisException { continue; } String value = new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8); - value = completeMilliseconds(value); - LocalDateTime parse = LocalDateTime.parse(value, dateTimeV2Formatter); - addValueToRow(rowIndex, parse); + addValueToRow(rowIndex, value); } } else if (curFieldVector instanceof TimeStampVector) { TimeStampVector timeStampVector = (TimeStampVector) curFieldVector; @@ -388,7 +391,8 @@ public void convertArrowToRowBatch() throws DorisException { continue; } LocalDateTime dateTime = getDateTime(rowIndex, timeStampVector); - addValueToRow(rowIndex, dateTime); + String formatted = DATE_TIME_FORMATTER.format(dateTime); + addValueToRow(rowIndex, formatted); } } diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java index 0320cd86..f12014f6 100644 --- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java +++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java @@ -273,7 +273,7 @@ public void testRowBatch() throws Exception { (float) 1.1, (double) 1.1, Date.valueOf("2008-08-08"), - LocalDateTime.of(2008, 8, 8, 0, 0, 0), + "2008-08-08 00:00:00", Decimal.apply(1234L, 4, 2), "char1" ); @@ -287,7 +287,7 @@ public void testRowBatch() throws Exception { (float) 2.2, (double) 2.2, Date.valueOf("1900-08-08"), - LocalDateTime.of(1900, 8, 8, 0, 0, 0), + "1900-08-08 00:00:00", Decimal.apply(8888L, 4, 2), "char2" ); @@ -301,7 +301,7 @@ public void testRowBatch() throws Exception { (float) 3.3, (double) 3.3, Date.valueOf("2100-08-08"), - LocalDateTime.of(2100, 8, 8, 0, 0, 0), + "2100-08-08 00:00:00", Decimal.apply(10L, 2, 0), "char3" ); @@ -832,16 +832,16 @@ public void testDateTime() throws IOException, DorisException { Assert.assertTrue(rowBatch.hasNext()); List actualRow0 = rowBatch.next(); - Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0), actualRow0.get(0)); - Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0), actualRow0.get(1)); + Assert.assertEquals("2024-03-20 00:00:00", actualRow0.get(0)); + Assert.assertEquals("2024-03-20 00:00:00", actualRow0.get(1)); List actualRow1 = rowBatch.next(); - Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 1), actualRow1.get(0)); - Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123000000), actualRow1.get(1)); + Assert.assertEquals("2024-03-20 00:00:01", actualRow1.get(0)); + Assert.assertEquals("2024-03-20 00:00:00.123", actualRow1.get(1)); List actualRow2 = rowBatch.next(); - Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 2), actualRow2.get(0)); - Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123456000), actualRow2.get(1)); + Assert.assertEquals("2024-03-20 00:00:02", actualRow2.get(0)); + Assert.assertEquals("2024-03-20 00:00:00.123456", actualRow2.get(1)); Assert.assertFalse(rowBatch.hasNext());