[bugfix](arrow) Fix time zone issues and accuracy issues #38215

Merged 4 commits on Oct 9, 2024
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/memory_scratch_sink_operator.cpp
@@ -96,7 +96,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._output_vexpr_ctxs, *input_block, &block));
std::shared_ptr<arrow::Schema> block_arrow_schema;
// After expr is executed, use the recalculated schema as the final schema
-RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema));
+RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema, state->timezone()));
RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(),
&result, _timezone_obj));
local_state._queue->blocking_put(result);
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/result_sink_operator.cpp
@@ -17,6 +17,8 @@

#include "result_sink_operator.h"

+#include <sys/select.h>

#include <memory>
#include <utility>

@@ -81,7 +83,8 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
-RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
+RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema,
+state->timezone()));
if (state->query_options().enable_parallel_result_sink) {
state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema);
} else {
37 changes: 21 additions & 16 deletions be/src/util/arrow/row_batch.cpp
@@ -46,7 +46,8 @@ namespace doris {

using strings::Substitute;

Review comment from a Contributor (clang-tidy) on the new function signature below:
warning: function 'convert_to_arrow_type' exceeds recommended size/complexity thresholds [readability-function-size]
Additional context: be/src/util/arrow/row_batch.cpp:48: 108 lines including whitespace and comments (threshold 80)

-Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result) {
+Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result,
+const std::string& timezone) {
switch (type.type) {
case TYPE_NULL:
*result = arrow::null();
@@ -96,11 +97,11 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
break;
case TYPE_DATETIMEV2:
if (type.scale > 3) {
-*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
+*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, timezone);
} else if (type.scale > 0) {
-*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MILLI);
+*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MILLI, timezone);
} else {
-*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND);
+*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND, timezone);
}
break;
case TYPE_DECIMALV2:
@@ -120,16 +121,16 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
case TYPE_ARRAY: {
DCHECK_EQ(type.children.size(), 1);
std::shared_ptr<arrow::DataType> item_type;
-static_cast<void>(convert_to_arrow_type(type.children[0], &item_type));
+static_cast<void>(convert_to_arrow_type(type.children[0], &item_type, timezone));
*result = std::make_shared<arrow::ListType>(item_type);
break;
}
case TYPE_MAP: {
DCHECK_EQ(type.children.size(), 2);
std::shared_ptr<arrow::DataType> key_type;
std::shared_ptr<arrow::DataType> val_type;
-static_cast<void>(convert_to_arrow_type(type.children[0], &key_type));
-static_cast<void>(convert_to_arrow_type(type.children[1], &val_type));
+static_cast<void>(convert_to_arrow_type(type.children[0], &key_type, timezone));
+static_cast<void>(convert_to_arrow_type(type.children[1], &val_type, timezone));
*result = std::make_shared<arrow::MapType>(key_type, val_type);
break;
}
@@ -138,7 +139,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
std::vector<std::shared_ptr<arrow::Field>> fields;
for (size_t i = 0; i < type.children.size(); i++) {
std::shared_ptr<arrow::DataType> field_type;
-static_cast<void>(convert_to_arrow_type(type.children[i], &field_type));
+static_cast<void>(convert_to_arrow_type(type.children[i], &field_type, timezone));
fields.push_back(std::make_shared<arrow::Field>(type.field_names[i], field_type,
type.contains_nulls[i]));
}
@@ -156,20 +157,22 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
return Status::OK();
}

-Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr<arrow::Field>* field) {
+Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr<arrow::Field>* field,
+const std::string& timezone) {
std::shared_ptr<arrow::DataType> type;
-RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type));
+RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type, timezone));
*field = arrow::field(desc->col_name(), type, desc->is_nullable());
return Status::OK();
}

Status convert_block_arrow_schema(const vectorized::Block& block,
-std::shared_ptr<arrow::Schema>* result) {
+std::shared_ptr<arrow::Schema>* result,
+const std::string& timezone) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (const auto& type_and_name : block) {
std::shared_ptr<arrow::DataType> arrow_type;
RETURN_IF_ERROR(convert_to_arrow_type(type_and_name.type->get_type_as_type_descriptor(),
-&arrow_type));
+&arrow_type, timezone));
fields.push_back(std::make_shared<arrow::Field>(type_and_name.name, arrow_type,
type_and_name.type->is_nullable()));
}
@@ -178,12 +181,13 @@ Status convert_block_arrow_schema(const vectorized::Block& block,
}

Status convert_to_arrow_schema(const RowDescriptor& row_desc,
-std::shared_ptr<arrow::Schema>* result) {
+std::shared_ptr<arrow::Schema>* result,
+const std::string& timezone) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto tuple_desc : row_desc.tuple_descriptors()) {
for (auto desc : tuple_desc->slots()) {
std::shared_ptr<arrow::Field> field;
-RETURN_IF_ERROR(convert_to_arrow_field(desc, &field));
+RETURN_IF_ERROR(convert_to_arrow_field(desc, &field, timezone));
fields.push_back(field);
}
}
@@ -192,12 +196,13 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc,
}

Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
-std::shared_ptr<arrow::Schema>* result) {
+std::shared_ptr<arrow::Schema>* result,
+const std::string& timezone) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (int i = 0; i < output_vexpr_ctxs.size(); i++) {
std::shared_ptr<arrow::DataType> arrow_type;
auto root_expr = output_vexpr_ctxs.at(i)->root();
-RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type));
+RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type, timezone));
auto field_name = root_expr->is_slot_ref() && !root_expr->expr_label().empty()
? root_expr->expr_label()
: fmt::format("{}_{}", root_expr->data_type()->get_name(), i);
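Background on the change above (not part of the diff): Arrow carries the time zone inside the timestamp type itself. A TimestampType built without a zone is read as zone-less wall-clock time, while one carrying a zone name marks the stored values as UTC-normalized instants to be rendered in that zone, which appears to be why this PR threads the session timezone from RuntimeState into schema construction. The snippet below is a minimal standalone sketch against the Arrow C++ API; the zone string "Asia/Shanghai" is only a placeholder for whatever the session timezone happens to be.

// Minimal sketch (not from this PR): the timezone string on
// arrow::TimestampType changes the type that clients see in the schema.
#include <arrow/api.h>
#include <iostream>

int main() {
    // No timezone: consumers interpret the values as local, zone-less times.
    std::shared_ptr<arrow::DataType> naive = arrow::timestamp(arrow::TimeUnit::MICRO);
    // With a timezone (placeholder zone): values are UTC-normalized instants
    // that readers localize to the given zone.
    std::shared_ptr<arrow::DataType> zoned =
            arrow::timestamp(arrow::TimeUnit::MICRO, "Asia/Shanghai");
    std::cout << naive->ToString() << std::endl;  // e.g. timestamp[us]
    std::cout << zoned->ToString() << std::endl;  // e.g. timestamp[us, tz=Asia/Shanghai]
    return 0;
}

Attaching the zone at schema-build time keeps the serialized values in UTC while still letting Arrow Flight and Parquet consumers localize them, instead of shifting the values themselves.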
11 changes: 7 additions & 4 deletions be/src/util/arrow/row_batch.h
@@ -41,17 +41,20 @@ namespace doris {

class RowDescriptor;

-Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result);
+Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result,
+const std::string& timezone);

// Convert Doris RowDescriptor to Arrow Schema.
Status convert_to_arrow_schema(const RowDescriptor& row_desc,
-std::shared_ptr<arrow::Schema>* result);
+std::shared_ptr<arrow::Schema>* result, const std::string& timezone);

Status convert_block_arrow_schema(const vectorized::Block& block,
-std::shared_ptr<arrow::Schema>* result);
+std::shared_ptr<arrow::Schema>* result,
+const std::string& timezone);

Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
-std::shared_ptr<arrow::Schema>* result);
+std::shared_ptr<arrow::Schema>* result,
+const std::string& timezone);

Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result);

3 changes: 2 additions & 1 deletion be/src/vec/runtime/vparquet_transformer.cpp
@@ -266,7 +266,8 @@ Status VParquetTransformer::_parse_schema() {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
std::shared_ptr<arrow::DataType> type;
-RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type));
+RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type,
+_state->timezone()));
if (_parquet_schemas != nullptr) {
std::shared_ptr<arrow::Field> field =
arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
4 changes: 2 additions & 2 deletions be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp
@@ -489,7 +489,7 @@ void serialize_and_deserialize_arrow_test() {
RowDescriptor row_desc(&tuple_desc, true);
// arrow schema
std::shared_ptr<arrow::Schema> _arrow_schema;
-EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK());
+EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK());

// serialize
std::shared_ptr<arrow::RecordBatch> result;
@@ -623,7 +623,7 @@ TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) {
RowDescriptor row_desc(&tuple_desc, true);
// arrow schema
std::shared_ptr<arrow::Schema> _arrow_schema;
-EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK());
+EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK());

// serialize
std::shared_ptr<arrow::RecordBatch> result;
4 changes: 4 additions & 0 deletions regression-test/data/arrow_flight_sql_p0/test_select.out
@@ -2,3 +2,7 @@
-- !arrow_flight_sql --
777 4

+-- !arrow_flight_sql_datetime --
+333 plsql333 2024-07-21 12:00:00.123456 2024-07-21 12:00:00.0
+222 plsql222 2024-07-20 12:00:00.123456 2024-07-20 12:00:00.0
+111 plsql111 2024-07-19 12:00:00.123456 2024-07-19 12:00:00.0
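The expected output above keeps the full .123456 fraction for the datetime(6) column, which relies on DATETIMEV2 with scale > 3 mapping to a microsecond Arrow timestamp as done in row_batch.cpp. The sketch below is a standalone illustration against the Arrow C++ API; the epoch constant and the "UTC" zone are illustrative values, not taken from the PR. It shows a microsecond-precision value carried through an Arrow array with the fractional digits intact.

// Standalone sketch: microsecond resolution is what lets a datetime(6) value
// such as 2024-07-19 12:00:00.123456 survive conversion to Arrow unchanged.
#include <arrow/api.h>
#include <cstdint>
#include <iostream>

int main() {
    arrow::TimestampBuilder builder(arrow::timestamp(arrow::TimeUnit::MICRO, "UTC"),
                                    arrow::default_memory_pool());
    // 2024-07-19 12:00:00.123456 UTC expressed as microseconds since the Unix epoch.
    const int64_t micros = 1721390400123456LL;
    if (!builder.Append(micros).ok()) return 1;

    std::shared_ptr<arrow::Array> array;
    if (!builder.Finish(&array).ok()) return 1;
    // Prints the value with its microsecond fraction preserved.
    std::cout << array->ToString() << std::endl;
    return 0;
}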
2 changes: 1 addition & 1 deletion regression-test/framework/pom.xml
@@ -379,7 +379,7 @@ under the License.
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.16</artifactId>
-<version>1.6.1</version>
+<version>24.0.0</version>
</dependency>
<!-- flink end -->
<dependency>
12 changes: 12 additions & 0 deletions regression-test/suites/arrow_flight_sql_p0/test_select.groovy
@@ -28,4 +28,16 @@ suite("test_select", "arrow_flight_sql") {
sql """INSERT INTO ${tableName} VALUES(111, "plsql333")"""

qt_arrow_flight_sql "select sum(id) as a, count(1) as b from ${tableName}"

+tableName = "test_select_datetime"
+sql "DROP TABLE IF EXISTS ${tableName}"
+sql """
+create table ${tableName} (id int, name varchar(20), f_datetime_p datetime(6), f_datetime datetime) DUPLICATE key(`id`) distributed by hash (`id`) buckets 4
+properties ("replication_num"="1");
+"""
+sql """INSERT INTO ${tableName} VALUES(111, "plsql111","2024-07-19 12:00:00.123456","2024-07-19 12:00:00")"""
+sql """INSERT INTO ${tableName} VALUES(222, "plsql222","2024-07-20 12:00:00.123456","2024-07-20 12:00:00")"""
+sql """INSERT INTO ${tableName} VALUES(333, "plsql333","2024-07-21 12:00:00.123456","2024-07-21 12:00:00")"""
+
+qt_arrow_flight_sql_datetime "select * from ${tableName} order by id desc"
}