Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiwen-up authored Jun 12, 2024
2 parents 880bee4 + 78b05a1 commit 6347cc0
Show file tree
Hide file tree
Showing 234 changed files with 6,614 additions and 1,339 deletions.
4 changes: 2 additions & 2 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ github:
- COMPILE (DORIS_COMPILE)
- External Regression (Doris External Regression)
- FE UT (Doris FE UT)
#- BE UT (Doris BE UT)
#- P0 Regression (Doris Regression)
- BE UT (Doris BE UT)
- P0 Regression (Doris Regression)

required_pull_request_reviews:
dismiss_stale_reviews: true
Expand Down
5 changes: 5 additions & 0 deletions be/cmake/thirdparty.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ if (NOT OS_MACOSX)
add_thirdparty(aws-s2n LIBNAME "lib/libs2n.a")
endif()

add_thirdparty(azure-core)
add_thirdparty(azure-identity)
add_thirdparty(azure-storage-blobs)
add_thirdparty(azure-storage-common)

add_thirdparty(minizip LIB64)
add_thirdparty(simdjson LIB64)
add_thirdparty(idn LIB64)
Expand Down
28 changes: 8 additions & 20 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/obj_storage_client.h"
#include "io/fs/path.h"
#include "io/fs/remote_file_system.h"
#include "io/fs/s3_file_system.h"
Expand Down Expand Up @@ -1378,23 +1379,8 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr

if (!existed_fs) {
// No such FS instance on BE
S3Conf s3_conf {
.bucket = param.s3_storage_param.bucket,
.prefix = param.s3_storage_param.root_path,
.client_conf = {
.endpoint = param.s3_storage_param.endpoint,
.region = param.s3_storage_param.region,
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
.token = param.s3_storage_param.token,
.max_connections = param.s3_storage_param.max_conn,
.request_timeout_ms = param.s3_storage_param.request_timeout_ms,
.connect_timeout_ms = param.s3_storage_param.conn_timeout_ms,
// When using cold heat separation in minio, user might use ip address directly,
// which needs enable use_virtual_addressing to true
.use_virtual_addressing = !param.s3_storage_param.use_path_style,
}};
auto res = io::S3FileSystem::create(std::move(s3_conf), std::to_string(param.id));
auto res = io::S3FileSystem::create(S3Conf::get_s3_conf(param.s3_storage_param),
std::to_string(param.id));
if (!res.has_value()) {
st = std::move(res).error();
} else {
Expand All @@ -1403,10 +1389,12 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
} else {
DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name;
auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
S3ClientConf conf {
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
.token = param.s3_storage_param.token,
.ak = std::move(new_s3_conf.client_conf.ak),
.sk = std::move(new_s3_conf.client_conf.sk),
.token = std::move(new_s3_conf.client_conf.token),
.provider = new_s3_conf.client_conf.provider,
};
st = client->reset(conf);
fs = std::move(existed_fs);
Expand Down
15 changes: 3 additions & 12 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "gen_cpp/Types_types.h"
#include "gen_cpp/cloud.pb.h"
#include "gen_cpp/olap_file.pb.h"
#include "io/fs/obj_storage_client.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
Expand Down Expand Up @@ -825,17 +826,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
}

auto add_obj_store = [&vault_infos](const auto& obj_store) {
vault_infos->emplace_back(obj_store.id(),
S3Conf {
.bucket = obj_store.bucket(),
.prefix = obj_store.prefix(),
.client_conf {.endpoint = obj_store.endpoint(),
.region = obj_store.region(),
.ak = obj_store.ak(),
.sk = obj_store.sk()},
.sse_enabled = obj_store.sse_enabled(),
.provider = obj_store.provider(),
},
vault_infos->emplace_back(obj_store.id(), S3Conf::get_s3_conf(obj_store),
StorageVaultPB_PathFormat {});
};

Expand All @@ -853,7 +844,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
resp.mutable_obj_info(i)->set_sk(resp.obj_info(i).sk().substr(0, 2) + "xxx");
}
for (int i = 0; i < resp.storage_vault_size(); ++i) {
auto j = resp.mutable_storage_vault(i);
auto* j = resp.mutable_storage_vault(i);
if (!j->has_obj_info()) continue;
j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx");
}
Expand Down
13 changes: 6 additions & 7 deletions be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
}

// update rowset meta tablet schema if tablet schema updated
if (_context.tablet_schema->num_variant_columns() > 0) {
_rowset_meta->set_tablet_schema(_context.tablet_schema);
}
auto rowset_schema = _context.merged_tablet_schema != nullptr ? _context.merged_tablet_schema
: _context.tablet_schema;
_rowset_meta->set_tablet_schema(rowset_schema);

if (_rowset_meta->newest_write_timestamp() == -1) {
_rowset_meta->set_newest_write_timestamp(UnixSeconds());
Expand All @@ -115,10 +115,9 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
_rowset_meta->add_segments_file_size(seg_file_size.value());
}

RETURN_NOT_OK_STATUS_WITH_WARN(
RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta,
&rowset),
"rowset init failed when build new rowset");
RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path,
_rowset_meta, &rowset),
"rowset init failed when build new rowset");
_already_built = true;
return Status::OK();
}
Expand Down
17 changes: 17 additions & 0 deletions be/src/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,20 @@ inline const std::string& Exception::to_string() const {
return Status::Error<false>(e.code(), e.to_string()); \
} \
} while (0)

#define ASSIGN_STATUS_IF_CATCH_EXCEPTION(stmt, status_) \
do { \
try { \
doris::enable_thread_catch_bad_alloc++; \
Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }}; \
{ stmt; } \
} catch (const doris::Exception& e) { \
if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { \
status_ = Status::MemoryLimitExceeded(fmt::format( \
"PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", \
e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__)); \
} else { \
status_ = e.to_status(); \
} \
} \
} while (0);
30 changes: 26 additions & 4 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <memory>
#include <mutex>
#include <ostream>
#include <utility>

#include "agent/be_exec_version_manager.h"
#include "common/logging.h"
Expand Down Expand Up @@ -1029,6 +1030,30 @@ Status IRuntimeFilter::publish(bool publish_local) {
return Status::OK();
}

class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
DummyBrpcCallback<PSendFilterSizeResponse>> {
std::shared_ptr<pipeline::Dependency> _dependency;
using Base =
AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>;
ENABLE_FACTORY_CREATOR(SyncSizeClosure);

void _process_if_rpc_failed() override {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
Base::_process_if_rpc_failed();
}

void _process_if_meet_error_status(const Status& status) override {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
Base::_process_if_meet_error_status(status);
}

public:
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
std::shared_ptr<pipeline::Dependency> dependency)
: Base(req, callback), _dependency(std::move(dependency)) {}
};

Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) {
DCHECK(is_producer());

Expand Down Expand Up @@ -1069,10 +1094,7 @@ Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) {

auto request = std::make_shared<PSendFilterSizeRequest>();
auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
auto closure =
AutoReleaseClosure<PSendFilterSizeRequest,
DummyBrpcCallback<PSendFilterSizeResponse>>::create_unique(request,
callback);
auto closure = SyncSizeClosure::create_unique(request, callback, _dependency);
auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
Expand Down
Loading

0 comments on commit 6347cc0

Please sign in to comment.