From 7ad3363748e2aa8889e220d2d6f820201b82cc96 Mon Sep 17 00:00:00 2001 From: gdliu3 Date: Mon, 29 Jul 2024 14:44:39 +0800 Subject: [PATCH 1/3] 1 --- .../exec/memory_scratch_sink_operator.cpp | 2 +- be/src/pipeline/exec/result_sink_operator.cpp | 5 ++- be/src/util/arrow/row_batch.cpp | 37 +++++++++++-------- be/src/util/arrow/row_batch.h | 11 ++++-- be/src/vec/runtime/vparquet_transformer.cpp | 3 +- .../serde/data_type_serde_arrow_test.cpp | 4 +- .../data/arrow_flight_sql_p0/test_select.out | 4 ++ .../arrow_flight_sql_p0/test_select.groovy | 12 ++++++ 8 files changed, 53 insertions(+), 25 deletions(-) diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp index 69e30791c139af..8f721bb864515f 100644 --- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp +++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp @@ -96,7 +96,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._output_vexpr_ctxs, *input_block, &block)); std::shared_ptr block_arrow_schema; // After expr executed, use recaculated schema as final schema - RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema)); + RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema, state->timezone())); RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(), &result, _timezone_obj)); local_state._queue->blocking_put(result); diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 0608beaf522290..f04ace2e292595 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -17,6 +17,8 @@ #include "result_sink_operator.h" +#include + #include #include @@ -81,7 +83,8 @@ Status ResultSinkLocalState::open(RuntimeState* state) { } case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { std::shared_ptr arrow_schema; - RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema)); + RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema, + state->timezone())); if (state->query_options().enable_parallel_result_sink) { state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema); } else { diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index cb4ccf15ba2d52..ba6f4adf6c655d 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -46,7 +46,8 @@ namespace doris { using strings::Substitute; -Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result) { +Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result, + const std::string& timezone) { switch (type.type) { case TYPE_NULL: *result = arrow::null(); @@ -96,11 +97,11 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr 3) { - *result = std::make_shared(arrow::TimeUnit::MICRO); + *result = std::make_shared(arrow::TimeUnit::MICRO, timezone); } else if (type.scale > 0) { - *result = std::make_shared(arrow::TimeUnit::MILLI); + *result = std::make_shared(arrow::TimeUnit::MILLI, timezone); } else { - *result = std::make_shared(arrow::TimeUnit::SECOND); + *result = std::make_shared(arrow::TimeUnit::SECOND, timezone); } break; case TYPE_DECIMALV2: @@ -120,7 +121,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr item_type; - static_cast(convert_to_arrow_type(type.children[0], &item_type)); + static_cast(convert_to_arrow_type(type.children[0], &item_type, timezone)); *result = std::make_shared(item_type); break; } @@ -128,8 +129,8 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr key_type; std::shared_ptr val_type; - static_cast(convert_to_arrow_type(type.children[0], &key_type)); - static_cast(convert_to_arrow_type(type.children[1], &val_type)); + static_cast(convert_to_arrow_type(type.children[0], &key_type, timezone)); + static_cast(convert_to_arrow_type(type.children[1], &val_type, timezone)); *result = std::make_shared(key_type, val_type); break; } @@ -138,7 +139,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr> fields; for (size_t i = 0; i < type.children.size(); i++) { std::shared_ptr field_type; - static_cast(convert_to_arrow_type(type.children[i], &field_type)); + static_cast(convert_to_arrow_type(type.children[i], &field_type, timezone)); fields.push_back(std::make_shared(type.field_names[i], field_type, type.contains_nulls[i])); } @@ -156,20 +157,22 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* field) { +Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr* field, + const std::string& timezone) { std::shared_ptr type; - RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type)); + RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type, timezone)); *field = arrow::field(desc->col_name(), type, desc->is_nullable()); return Status::OK(); } Status convert_block_arrow_schema(const vectorized::Block& block, - std::shared_ptr* result) { + std::shared_ptr* result, + const std::string& timezone) { std::vector> fields; for (const auto& type_and_name : block) { std::shared_ptr arrow_type; RETURN_IF_ERROR(convert_to_arrow_type(type_and_name.type->get_type_as_type_descriptor(), - &arrow_type)); + &arrow_type, timezone)); fields.push_back(std::make_shared(type_and_name.name, arrow_type, type_and_name.type->is_nullable())); } @@ -178,12 +181,13 @@ Status convert_block_arrow_schema(const vectorized::Block& block, } Status convert_to_arrow_schema(const RowDescriptor& row_desc, - std::shared_ptr* result) { + std::shared_ptr* result, + const std::string& timezone) { std::vector> fields; for (auto tuple_desc : row_desc.tuple_descriptors()) { for (auto desc : tuple_desc->slots()) { std::shared_ptr field; - RETURN_IF_ERROR(convert_to_arrow_field(desc, &field)); + RETURN_IF_ERROR(convert_to_arrow_field(desc, &field, timezone)); fields.push_back(field); } } @@ -192,12 +196,13 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc, } Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, - std::shared_ptr* result) { + std::shared_ptr* result, + const std::string& timezone) { std::vector> fields; for (int i = 0; i < output_vexpr_ctxs.size(); i++) { std::shared_ptr arrow_type; auto root_expr = output_vexpr_ctxs.at(i)->root(); - RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type)); + RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type, timezone)); auto field_name = root_expr->is_slot_ref() && !root_expr->expr_label().empty() ? root_expr->expr_label() : fmt::format("{}_{}", root_expr->data_type()->get_name(), i); diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h index ddffc3324d3451..9a33719a1cfbcc 100644 --- a/be/src/util/arrow/row_batch.h +++ b/be/src/util/arrow/row_batch.h @@ -41,17 +41,20 @@ namespace doris { class RowDescriptor; -Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result); +Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result, + const std::string& timezone); // Convert Doris RowDescriptor to Arrow Schema. Status convert_to_arrow_schema(const RowDescriptor& row_desc, - std::shared_ptr* result); + std::shared_ptr* result, const std::string& timezone); Status convert_block_arrow_schema(const vectorized::Block& block, - std::shared_ptr* result); + std::shared_ptr* result, + const std::string& timezone); Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, - std::shared_ptr* result); + std::shared_ptr* result, + const std::string& timezone); Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result); diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp index 1969858349f0e9..f0810d6c7ceead 100644 --- a/be/src/vec/runtime/vparquet_transformer.cpp +++ b/be/src/vec/runtime/vparquet_transformer.cpp @@ -266,7 +266,8 @@ Status VParquetTransformer::_parse_schema() { std::vector> fields; for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { std::shared_ptr type; - RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type)); + RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type, + _state->timezone())); if (_parquet_schemas != nullptr) { std::shared_ptr field = arrow::field(_parquet_schemas->operator[](i).schema_column_name, type, diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp index 4793ae8128aa9f..fc692b8f67569e 100644 --- a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp @@ -489,7 +489,7 @@ void serialize_and_deserialize_arrow_test() { RowDescriptor row_desc(&tuple_desc, true); // arrow schema std::shared_ptr _arrow_schema; - EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK()); + EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK()); // serialize std::shared_ptr result; @@ -623,7 +623,7 @@ TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) { RowDescriptor row_desc(&tuple_desc, true); // arrow schema std::shared_ptr _arrow_schema; - EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK()); + EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK()); // serialize std::shared_ptr result; diff --git a/regression-test/data/arrow_flight_sql_p0/test_select.out b/regression-test/data/arrow_flight_sql_p0/test_select.out index d643597bbafcb5..1d34f6e7dbd673 100644 --- a/regression-test/data/arrow_flight_sql_p0/test_select.out +++ b/regression-test/data/arrow_flight_sql_p0/test_select.out @@ -2,3 +2,7 @@ -- !arrow_flight_sql -- 777 4 +-- !arrow_flight_sql_datetime -- +333 plsql333 2024-07-21 04:00:00.123456 2024-07-21 04:00:00.0 +222 plsql222 2024-07-20 04:00:00.123456 2024-07-20 04:00:00.0 +111 plsql111 2024-07-19 04:00:00.123456 2024-07-19 04:00:00.0 \ No newline at end of file diff --git a/regression-test/suites/arrow_flight_sql_p0/test_select.groovy b/regression-test/suites/arrow_flight_sql_p0/test_select.groovy index 55b3c301e244f4..950fb4af7e9034 100644 --- a/regression-test/suites/arrow_flight_sql_p0/test_select.groovy +++ b/regression-test/suites/arrow_flight_sql_p0/test_select.groovy @@ -28,4 +28,16 @@ suite("test_select", "arrow_flight_sql") { sql """INSERT INTO ${tableName} VALUES(111, "plsql333")""" qt_arrow_flight_sql "select sum(id) as a, count(1) as b from ${tableName}" + + tableName = "test_select_datetime" + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + create table ${tableName} (id int, name varchar(20), f_datetime_p datetime(6), f_datetime datetime) DUPLICATE key(`id`) distributed by hash (`id`) buckets 4 + properties ("replication_num"="1"); + """ + sql """INSERT INTO ${tableName} VALUES(111, "plsql111","2024-07-19 12:00:00.123456","2024-07-19 12:00:00")""" + sql """INSERT INTO ${tableName} VALUES(222, "plsql222","2024-07-20 12:00:00.123456","2024-07-20 12:00:00")""" + sql """INSERT INTO ${tableName} VALUES(333, "plsql333","2024-07-21 12:00:00.123456","2024-07-21 12:00:00")""" + + qt_arrow_flight_sql_datetime "select * from ${tableName} order by id desc" } From 876f019e8b2297b8e186b12d23c89641454cd1a8 Mon Sep 17 00:00:00 2001 From: gdliu3 Date: Mon, 29 Jul 2024 17:31:12 +0800 Subject: [PATCH 2/3] 1 --- regression-test/data/arrow_flight_sql_p0/test_select.out | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/regression-test/data/arrow_flight_sql_p0/test_select.out b/regression-test/data/arrow_flight_sql_p0/test_select.out index 1d34f6e7dbd673..f2f4b86bbf5ceb 100644 --- a/regression-test/data/arrow_flight_sql_p0/test_select.out +++ b/regression-test/data/arrow_flight_sql_p0/test_select.out @@ -3,6 +3,6 @@ 777 4 -- !arrow_flight_sql_datetime -- -333 plsql333 2024-07-21 04:00:00.123456 2024-07-21 04:00:00.0 -222 plsql222 2024-07-20 04:00:00.123456 2024-07-20 04:00:00.0 -111 plsql111 2024-07-19 04:00:00.123456 2024-07-19 04:00:00.0 \ No newline at end of file +333 plsql333 2024-07-21 12:00:00.123456 2024-07-21 12:00:00.0 +222 plsql222 2024-07-20 12:00:00.123456 2024-07-20 12:00:00.0 +111 plsql111 2024-07-19 12:00:00.123456 2024-07-19 12:00:00.0 \ No newline at end of file From 53b29bdb163a3b6dcc8131338c1927b9f7f985a3 Mon Sep 17 00:00:00 2001 From: liugddx Date: Tue, 24 Sep 2024 18:48:28 +0800 Subject: [PATCH 3/3] 1 --- regression-test/framework/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/framework/pom.xml b/regression-test/framework/pom.xml index aded781c08bb71..6b749bf0fd1dae 100644 --- a/regression-test/framework/pom.xml +++ b/regression-test/framework/pom.xml @@ -379,7 +379,7 @@ under the License. org.apache.doris flink-doris-connector-1.16 - 1.6.1 + 24.0.0