Skip to content

Commit

Permalink
feat: pikiwidb support multi-raft cluster (#442)
Browse files Browse the repository at this point in the history
* refactor: remove the singleton of PRaft
  • Loading branch information
longfar-ncy authored Oct 26, 2024
1 parent d08c598 commit b5045c7
Show file tree
Hide file tree
Showing 35 changed files with 1,397 additions and 307 deletions.
4 changes: 2 additions & 2 deletions etc/conf/pikiwidb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ logfile stdout
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 16
databases 2

################################ SNAPSHOTTING #################################
#
Expand Down Expand Up @@ -343,6 +343,6 @@ rocksdb-ttl-second 604800
rocksdb-periodic-second 259200;

############################### RAFT ###############################
use-raft no
use-raft yes
# Braft relies on brpc to communicate via the default port number plus the port offset
raft-port-offset 10
41 changes: 41 additions & 0 deletions ppd/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "brpc/server.h"
#include "butil/errno.h"
#include "gflags/gflags.h"
#include "spdlog/spdlog.h"

#include "pd_service.h"

DEFINE_int32(port, 8080, "Port of rpc server");
DEFINE_int32(idle_timeout_s, 60,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s`");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");

int main(int argc, char* argv[]) {
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
brpc::Server server;
PlacementDriverServiceImpl service;
if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) {
spdlog::error("Failed to add service for: {}", berror());
return -1;
}

brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency;

// 启动服务
if (server.Start(FLAGS_port, &options) != 0) {
spdlog::error("Failed to start server for: {}", berror());
return -1;
}

server.RunUntilAskedToQuit();
}
93 changes: 93 additions & 0 deletions ppd/pd.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
syntax = "proto3";
package pikiwidb;
option cc_generic_services = true;

message Peer {
string group_id = 1;
int32 cluster_idx = 2;
};

message GetClusterInfoRequest {
};

message GetClusterInfoResponse {
bool success = 1;
repeated Store store = 2;
};

message Store {
int64 store_id = 1;
string ip = 2;
int32 port = 3;
StoreState state = 4;
repeated Region region = 5;
};

message Region {
int64 region_id = 1;
string start_key = 2;
string end_key = 3;
repeated RegionEpoch region_epoch = 4;
repeated Peer peers = 5;
};

message RegionEpoch {
int64 conf_change_ver = 1; // conf change version
int64 region_ver = 2; // region version (split or merge)
};

enum StoreState {
UP = 0;
OFFLINE = 1;
TOMBSTONE = 2;
};

message RegionOptions {
string start_key = 1;
string end_key = 2;
int32 max_data_size = 3;
};

message CreateAllRegionsRequest {
int64 regions_count = 1;
int32 region_peers_count = 2;
repeated RegionOptions regionOptions = 3;
};

message CreateAllRegionsResponse {
bool success = 1;
};

message DeleteAllRegionsRequest {
};

message DeleteAllRegionsResponse {
bool success = 1;
};

message AddStoreRequest {
string ip = 1;
int32 port = 2;
};

message AddStoreResponse {
bool success = 1;
optional int64 store_id = 2;
optional string redirect = 3;
};

message RemoveStoreRequest {
int64 store_id = 1;
};

message RemoveStoreResponse {
bool success = 1;
};

service PlacementDriverService {
rpc CreateAllRegions(CreateAllRegionsRequest) returns (CreateAllRegionsResponse);
rpc DeleteAllRegions(DeleteAllRegionsRequest) returns (DeleteAllRegionsResponse);
rpc AddStore(AddStoreRequest) returns (AddStoreResponse);
rpc RemoveStore(RemoveStoreRequest) returns (RemoveStoreResponse);
rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
};
61 changes: 61 additions & 0 deletions ppd/pd_service.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "pd_service.h"

#include "pd_server.h"
#include "spdlog/spdlog.h"

namespace pikiwidb {
void PlacementDriverServiceImpl::CreateAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::CreateAllRegionsRequest* request,
::pikiwidb::CreateAllRegionsResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::DeleteAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::DeleteAllRegionsRequest* request,
::pikiwidb::DeleteAllRegionsResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::AddStore(::google::protobuf::RpcController* controller,
const ::pikiwidb::AddStoreRequest* request,
::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
auto [success, store_id] = PDSERVER.AddStore(request->ip(), request->port());
if (!success) {
response->set_success(false);
return;
}

response->set_success(true);
response->set_store_id(store_id);
spdlog::info("add store success: {}", store_id);
}

void PlacementDriverServiceImpl::RemoveStore(::google::protobuf::RpcController* controller,
const ::pikiwidb::RemoveStoreRequest* request,
::pikiwidb::RemoveStoreResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::GetClusterInfo(::google::protobuf::RpcController* controller,
const ::pikiwidb::GetClusterInfoRequest* request,
::pikiwidb::GetClusterInfoResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
PDSERVER.GetClusterInfo(response);
}

void PlacementDriverServiceImpl::OpenPDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::OpenPDSchedulingRequest* request,
::pikiwidb::OpenPDSchedulingResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::ClosePDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::ClosePDSchedulingRequest* request,
::pikiwidb::ClosePDSchedulingResponse* response,
::google::protobuf::Closure* done) {}
} // namespace pikiwidb
44 changes: 44 additions & 0 deletions ppd/pd_service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#pragma once

#include "pd.pb.h"

namespace pikiwidb {

class PlacementDriverServiceImpl : public PlacementDriverService {
public:
PlacementDriverServiceImpl() = default;

void CreateAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::CreateAllRegionsRequest* request,
::pikiwidb::CreateAllRegionsResponse* response, ::google::protobuf::Closure* done) override;

void DeleteAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::DeleteAllRegionsRequest* request,
::pikiwidb::DeleteAllRegionsResponse* response, ::google::protobuf::Closure* done) override;

void AddStore(::google::protobuf::RpcController* controller, const ::pikiwidb::AddStoreRequest* request,
::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) override;

void RemoveStore(::google::protobuf::RpcController* controller, const ::pikiwidb::RemoveStoreRequest* request,
::pikiwidb::RemoveStoreResponse* response, ::google::protobuf::Closure* done) override;

void GetClusterInfo(::google::protobuf::RpcController* controller, const ::pikiwidb::GetClusterInfoRequest* request,
::pikiwidb::GetClusterInfoResponse* response, ::google::protobuf::Closure* done) override;

void OpenPDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::OpenPDSchedulingRequest* request,
::pikiwidb::OpenPDSchedulingResponse* response, ::google::protobuf::Closure* done) override;

void ClosePDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::ClosePDSchedulingRequest* request,
::pikiwidb::ClosePDSchedulingResponse* response, ::google::protobuf::Closure* done) override;
};

} // namespace pikiwidb
39 changes: 39 additions & 0 deletions pproxy/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "brpc/server.h"
#include "gflags/gflags.h"
#include "spdlog/spdlog.h"

#include "proxy_service.h"

DEFINE_int32(port, 8080, "Port of rpc server");
DEFINE_int32(idle_timeout_s, 60,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s`");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");

int main(int argc, char* argv[]) {
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
brpc::Server server;
ProxyServiceImpl service;
if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) {
spdlog::error("Failed to add service for: {}", berror());
return -1;
}

brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency;

if (server.Start(FLAGS_port, &options) != 0) {
spdlog::error("Failed to start server for: {}", berror());
return -1;
}

server.RunUntilAskedToQuit();
}
26 changes: 26 additions & 0 deletions pproxy/proxy.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
syntax = "proto3";
package pikiwidb.proxy;
option cc_generic_services = true;

message RunCommandRequest {
string command = 1;
}

message RunCommandResponse {
string output = 1;
}
message GetRouteInfoRequest {
}
message GetRouteInfoResponse {
message RouteInfo {
string group_id = 1;
string endpoint = 2;
int32 role = 3;
}
repeated RouteInfo infos = 1;
}

service ProxyService {
rpc RunCommand(RunCommandRequest) returns (RunCommandResponse);
rpc GetRouteInfo(GetRouteInfoRequest) returns (GetRouteInfoResponse);
}
65 changes: 65 additions & 0 deletions pproxy/proxy_service.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "proxy_service.h"

#include <array>
#include <memory>
#include <string>

namespace pikiwidb::proxy {
void ProxyServiceImpl::RunCommand(::google::protobuf::RpcController* cntl,
const pikiwidb::proxy::RunCommandRequest* request,
pikiwidb::proxy::RunCommandResponse* response, ::google::protobuf::Closure* done) {
std::string command = request->command(); // 检查命令是否在白名单中

if (!IsCommandAllowed(command)) {
response->set_error("Command not allowed");
done->Run();
return;
}

std::string output = ExecuteCommand(command);
if (output.empty()) {
response->set_error("Command execution failed");
} else {
response->set_output(output);
}
done->Run();
}

void ProxyServiceImpl::GetRouteINfo(::google::protobuf::RpcController* cntl,
const pikiwidb::proxy::GetRouteInfoRequest* request,
pikiwidb::proxy::GetRouteInfoResponse* response,
::google::protobuf::Closure* done) {}

std::string ProxyServiceImpl::ExecuteCommand(const std::string& command) {
if (!IsCommandAllowed(command)) {
return "Command not allowed";
}

std::array<char, 128> buffer;
std::string result;
std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(command.c_str(), "r"), pclose);
if (!pipe) {
return "Failed to execute command";
}

while (true) {
if (fgets(buffer.data(), buffer.size(), pipe.get()) == nullptr) {
if (feof(pipe.get())) {
break;
} else {
return "Error reading command output";
}
}
result += buffer.data();
}
return result;
}

} // namespace pikiwidb::proxy
Loading

0 comments on commit b5045c7

Please sign in to comment.