From e93beda199db687b4720ccffe04294f2e429c645 Mon Sep 17 00:00:00 2001 From: Andy Nogueira Date: Tue, 1 Aug 2023 07:54:30 -0400 Subject: [PATCH] Add gRPC block service (#1142) * initial logic for the gRPC block service (#1094) * initial logic for gRPC block client and test (#1094) * block response and request (#1094) * add configuration for block service (#1094) * use pointer for request parameter (#1094) * change block service GetBlock response (#1094) * return block information (#1094) * convert core type to proto type (#1094) * hookup server (#1094) * hooking directly to the BlockStore instead of Environment (#1094) * changing client return type, use core type (#1094) * convert from proto to core type (#1094) * implemented proper logic to test block service (#1094) * return the latest height if height is 0 (#1094) * adding proper grpc error handling and return (#1094) * rename rpc and message in proto to match ADR-101 spec (#1094) * renaming service and client methods structs to match ADR (#1094) * additional error handling (#1094) * rename get block request and response (#1094) * update method name in the client (#1094) * proto entries for GetLatestHeight logic (#1094) * added logic for streaming new blocks as part of GetLatestHeight (#1094) * ensure subcribers have their own unique id (#1094) * client logic to use a channel parameter (#1094) * added test for GetLatestHeight (#1094) * better streaming test to prevent error (#1094) * remove empty line Co-authored-by: Thane Thomson * removing UNARY_RPC to prevent lint on server streaming (#1094) * remove server reflection, doesn't work well with gogoproto (#1094) * handle negative height parameter (#1094) * remove `Block` from `GetBlockLatestHeight` Co-authored-by: Thane Thomson * changed method name and fixes to test (#1094) * fixes from PR feedback (#1094) * remove vars Co-authored-by: Thane Thomson * fix declaration Co-authored-by: Thane Thomson * simplify error return Co-authored-by: Thane Thomson * simplest error return Co-authored-by: Thane Thomson * remove line Co-authored-by: Thane Thomson * remove var declaration, not needed Co-authored-by: Thane Thomson * use simplest error return Co-authored-by: Thane Thomson * simplest error return Co-authored-by: Thane Thomson * simplest error return Co-authored-by: Thane Thomson * simplify error handling Co-authored-by: Thane Thomson * simplified the logic for subscriber id (#1094) * more fixs based on PR feedback (#1094) * generated new protos (#1094) * remove else logic to allow compilation (#1094) * added logging capabilities to the block service and added log messages (#1094) * ensure node is a valid testing node (#1094) * added more conditions to handle subscription cancel (#1094) * use break in the for loop (#1094) * renamed ResultBlock to Block (#1094) * refactored the client logic use a channel for errors (#1094) * simplify subscriber name Co-authored-by: Thane Thomson * adding error to log Co-authored-by: Thane Thomson * update logging Co-authored-by: Thane Thomson * Simplified and improved logging (#1094) * added logic to drop the height publish if channel full (#0194) * improved test to ensure is not light or seed node (#1094) * refactored block service to use one channel with result type (#1094) * blockservice: Apply service name to all logs Signed-off-by: Thane Thomson * blockservice: Capitalize start of all log messages Signed-off-by: Thane Thomson * blockservice: Remove unnecessary logs Signed-off-by: Thane Thomson * blockservice: Add trivial RPC error response tracing mechanism for easier debugging Signed-off-by: Thane Thomson * blockservice: Simplify constructor Signed-off-by: Thane Thomson * blockservice: Extract type assertion Signed-off-by: Thane Thomson * Add changelog entries Signed-off-by: Thane Thomson * docs: Update configuration-related content Signed-off-by: Thane Thomson * grpc/client: Format Signed-off-by: Thane Thomson * grpc: Extract function to validate or update block height from request Signed-off-by: Thane Thomson --------- Signed-off-by: Thane Thomson Co-authored-by: Thane Thomson --- .../features/1094-grpc-block-service-cfg.md | 2 + .../features/1094-grpc-block-service.md | 3 + config/config.go | 21 + config/toml.go | 4 + docs/core/configuration.md | 49 +- internal/rpctrace/rpctrace.go | 13 + node/node.go | 3 + proto/buf.yaml | 1 - .../tendermint/services/block/v1/block.pb.go | 826 ++++++++++++++++++ .../tendermint/services/block/v1/block.proto | 28 + .../services/block/v1/block_service.pb.go | 205 +++++ .../services/block/v1/block_service.proto | 19 + rpc/grpc/client/block_service.go | 122 +++ rpc/grpc/client/client.go | 20 + rpc/grpc/server/server.go | 16 + .../server/services/blockservice/service.go | 129 +++ test/e2e/tests/grpc_test.go | 102 +++ 17 files changed, 1555 insertions(+), 8 deletions(-) create mode 100644 .changelog/unreleased/features/1094-grpc-block-service-cfg.md create mode 100644 .changelog/unreleased/features/1094-grpc-block-service.md create mode 100644 internal/rpctrace/rpctrace.go create mode 100644 proto/tendermint/services/block/v1/block.pb.go create mode 100644 proto/tendermint/services/block/v1/block.proto create mode 100644 proto/tendermint/services/block/v1/block_service.pb.go create mode 100644 proto/tendermint/services/block/v1/block_service.proto create mode 100644 rpc/grpc/client/block_service.go create mode 100644 rpc/grpc/server/services/blockservice/service.go diff --git a/.changelog/unreleased/features/1094-grpc-block-service-cfg.md b/.changelog/unreleased/features/1094-grpc-block-service-cfg.md new file mode 100644 index 0000000000..f0892b3a14 --- /dev/null +++ b/.changelog/unreleased/features/1094-grpc-block-service-cfg.md @@ -0,0 +1,2 @@ +- `[config]` Add `[grpc.block_service]` section to configure gRPC `BlockService` + ([\#1094](https://github.com/cometbft/cometbft/issues/1094)) \ No newline at end of file diff --git a/.changelog/unreleased/features/1094-grpc-block-service.md b/.changelog/unreleased/features/1094-grpc-block-service.md new file mode 100644 index 0000000000..543e1d6d1e --- /dev/null +++ b/.changelog/unreleased/features/1094-grpc-block-service.md @@ -0,0 +1,3 @@ +- `[grpc]` Add `BlockService` with client to facilitate fetching of blocks and + streaming of the latest committed block height + ([\#1094](https://github.com/cometbft/cometbft/issues/1094)) diff --git a/config/config.go b/config/config.go index c70d14e8eb..cdb67dacb2 100644 --- a/config/config.go +++ b/config/config.go @@ -561,12 +561,16 @@ type GRPCConfig struct { // The gRPC version service provides version information about the node and // the protocols it uses. VersionService *GRPCVersionServiceConfig `mapstructure:"version_service"` + + // The gRPC block service provides block information + BlockService *GRPCBlockServiceConfig `mapstructure:"block_service"` } func DefaultGRPCConfig() *GRPCConfig { return &GRPCConfig{ ListenAddress: "", VersionService: DefaultGRPCVersionServiceConfig(), + BlockService: DefaultGRPCBlockServiceConfig(), } } @@ -574,6 +578,7 @@ func TestGRPCConfig() *GRPCConfig { return &GRPCConfig{ ListenAddress: "tcp://127.0.0.1:36670", VersionService: TestGRPCVersionServiceConfig(), + BlockService: TestGRPCBlockServiceConfig(), } } @@ -606,6 +611,22 @@ func TestGRPCVersionServiceConfig() *GRPCVersionServiceConfig { } } +type GRPCBlockServiceConfig struct { + Enabled bool `mapstructure:"enabled"` +} + +func DefaultGRPCBlockServiceConfig() *GRPCBlockServiceConfig { + return &GRPCBlockServiceConfig{ + Enabled: true, + } +} + +func TestGRPCBlockServiceConfig() *GRPCBlockServiceConfig { + return &GRPCBlockServiceConfig{ + Enabled: true, + } +} + //----------------------------------------------------------------------------- // P2PConfig diff --git a/config/toml.go b/config/toml.go index af2441ba08..ec65926b58 100644 --- a/config/toml.go +++ b/config/toml.go @@ -296,6 +296,10 @@ laddr = "{{ .GRPC.ListenAddress }}" [grpc.version_service] enabled = {{ .GRPC.VersionService.Enabled }} +# The gRPC block service returns block information +[grpc.block_service] +enabled = {{ .GRPC.BlockService.Enabled }} + ####################################################### ### P2P Configuration Options ### ####################################################### diff --git a/docs/core/configuration.md b/docs/core/configuration.md index 0fc008e5c3..ccc0e036aa 100644 --- a/docs/core/configuration.md +++ b/docs/core/configuration.md @@ -34,13 +34,13 @@ like the file below, however, double check by inspecting the proxy_app = "tcp://127.0.0.1:26658" # A custom human readable name for this node -moniker = "anonymous" +moniker = "thinkpad" # If this node is many blocks behind the tip of the chain, BlockSync # allows them to catchup quickly by downloading blocks in parallel # and verifying their commits # -# Deprecated: this key will be removed and BlockSync will be enabled +# Deprecated: this key will be removed and BlockSync will be enabled # unconditionally in the next major release. block_sync = true @@ -148,14 +148,14 @@ unsafe = false # 1024 - 40 - 10 - 50 = 924 = ~900 max_open_connections = 900 -# Maximum number of unique clientIDs that can /subscribe +# Maximum number of unique clientIDs that can /subscribe. # If you're using /broadcast_tx_commit, set to the estimated maximum number # of broadcast_tx_commit calls per block. max_subscription_clients = 100 -# Maximum number of unique queries a given client can /subscribe to -# If you're using GRPC (or Local RPC client) and /broadcast_tx_commit, set to -# the estimated # maximum number of broadcast_tx_commit calls per block. +# Maximum number of unique queries a given client can /subscribe to. +# If you're using /broadcast_tx_commit, set to the estimated maximum number +# of broadcast_tx_commit calls per block. max_subscriptions_per_client = 5 # Experimental parameter to specify the maximum number of events a node will @@ -215,6 +215,40 @@ tls_key_file = "" # pprof listen address (https://golang.org/pkg/net/http/pprof) pprof_laddr = "" +####################################################### +### gRPC Server Configuration Options ### +####################################################### + +# +# Note that the gRPC server is exposed unauthenticated. It is critical that +# this server not be exposed directly to the public internet. If this service +# must be accessed via the public internet, please ensure that appropriate +# precautions are taken (e.g. fronting with a reverse proxy like nginx with TLS +# termination and authentication, using DDoS protection services like +# CloudFlare, etc.). +# + +[grpc] + +# TCP or UNIX socket address for the RPC server to listen on. If not specified, +# the gRPC server will be disabled. +laddr = "" + +# +# Each gRPC service can be turned on/off, and in some cases configured, +# individually. If the gRPC server is not enabled, all individual services' +# configurations are ignored. +# + +# The gRPC version service provides version information about the node and the +# protocols it uses. +[grpc.version_service] +enabled = true + +# The gRPC block service returns block information +[grpc.block_service] +enabled = true + ####################################################### ### P2P Configuration Options ### ####################################################### @@ -386,7 +420,7 @@ chunk_fetchers = "4" [blocksync] # Block Sync version to use: -# +# # In v0.37, v1 and v2 of the block sync protocols were deprecated. # Please use v0 instead. # @@ -432,6 +466,7 @@ create_empty_blocks_interval = "0s" # Reactor sleep duration parameters peer_gossip_sleep_duration = "100ms" +peer_gossip_intraloop_sleep_duration = "0s" peer_query_maj23_sleep_duration = "2s" ####################################################### diff --git a/internal/rpctrace/rpctrace.go b/internal/rpctrace/rpctrace.go new file mode 100644 index 0000000000..b24c91f488 --- /dev/null +++ b/internal/rpctrace/rpctrace.go @@ -0,0 +1,13 @@ +package rpctrace + +import "github.com/gofrs/uuid" + +// New returns a randomly generated string which can be used to assist in +// tracing RPC errors. +func New() (string, error) { + id, err := uuid.NewV4() + if err != nil { + return "", err + } + return id.String(), nil +} diff --git a/node/node.go b/node/node.go index 4c119fec7d..9a8cc347b1 100644 --- a/node/node.go +++ b/node/node.go @@ -1194,6 +1194,9 @@ func (n *Node) startRPC() ([]net.Listener, error) { if n.config.GRPC.VersionService.Enabled { opts = append(opts, grpcserver.WithVersionService()) } + if n.config.GRPC.BlockService.Enabled { + opts = append(opts, grpcserver.WithBlockService(n.blockStore, n.eventBus, n.Logger)) + } go func() { if err := grpcserver.Serve(listener, opts...); err != nil { n.Logger.Error("Error starting gRPC server", "err", err) diff --git a/proto/buf.yaml b/proto/buf.yaml index c6e0660f14..6b8fa247af 100644 --- a/proto/buf.yaml +++ b/proto/buf.yaml @@ -8,4 +8,3 @@ lint: use: - BASIC - FILE_LOWER_SNAKE_CASE - - UNARY_RPC diff --git a/proto/tendermint/services/block/v1/block.pb.go b/proto/tendermint/services/block/v1/block.pb.go new file mode 100644 index 0000000000..b000f21716 --- /dev/null +++ b/proto/tendermint/services/block/v1/block.pb.go @@ -0,0 +1,826 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: tendermint/services/block/v1/block.proto + +package v1 + +import ( + fmt "fmt" + types "github.com/cometbft/cometbft/proto/tendermint/types" + proto "github.com/cosmos/gogoproto/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type GetByHeightRequest struct { + // The height of the block requested. If set to 0, the latest height will be returned. + Height int64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"` +} + +func (m *GetByHeightRequest) Reset() { *m = GetByHeightRequest{} } +func (m *GetByHeightRequest) String() string { return proto.CompactTextString(m) } +func (*GetByHeightRequest) ProtoMessage() {} +func (*GetByHeightRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_d48acf20d1015667, []int{0} +} +func (m *GetByHeightRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetByHeightRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetByHeightRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetByHeightRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetByHeightRequest.Merge(m, src) +} +func (m *GetByHeightRequest) XXX_Size() int { + return m.Size() +} +func (m *GetByHeightRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetByHeightRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetByHeightRequest proto.InternalMessageInfo + +func (m *GetByHeightRequest) GetHeight() int64 { + if m != nil { + return m.Height + } + return 0 +} + +type GetByHeightResponse struct { + BlockId *types.BlockID `protobuf:"bytes,1,opt,name=block_id,json=blockId,proto3" json:"block_id,omitempty"` + Block *types.Block `protobuf:"bytes,2,opt,name=block,proto3" json:"block,omitempty"` +} + +func (m *GetByHeightResponse) Reset() { *m = GetByHeightResponse{} } +func (m *GetByHeightResponse) String() string { return proto.CompactTextString(m) } +func (*GetByHeightResponse) ProtoMessage() {} +func (*GetByHeightResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_d48acf20d1015667, []int{1} +} +func (m *GetByHeightResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetByHeightResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetByHeightResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetByHeightResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetByHeightResponse.Merge(m, src) +} +func (m *GetByHeightResponse) XXX_Size() int { + return m.Size() +} +func (m *GetByHeightResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetByHeightResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetByHeightResponse proto.InternalMessageInfo + +func (m *GetByHeightResponse) GetBlockId() *types.BlockID { + if m != nil { + return m.BlockId + } + return nil +} + +func (m *GetByHeightResponse) GetBlock() *types.Block { + if m != nil { + return m.Block + } + return nil +} + +// GetLatestHeightRequest - empty message since no parameter is required +type GetLatestHeightRequest struct { +} + +func (m *GetLatestHeightRequest) Reset() { *m = GetLatestHeightRequest{} } +func (m *GetLatestHeightRequest) String() string { return proto.CompactTextString(m) } +func (*GetLatestHeightRequest) ProtoMessage() {} +func (*GetLatestHeightRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_d48acf20d1015667, []int{2} +} +func (m *GetLatestHeightRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetLatestHeightRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetLatestHeightRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetLatestHeightRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetLatestHeightRequest.Merge(m, src) +} +func (m *GetLatestHeightRequest) XXX_Size() int { + return m.Size() +} +func (m *GetLatestHeightRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetLatestHeightRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetLatestHeightRequest proto.InternalMessageInfo + +// GetLatestHeightResponse provides the height of the latest committed block. +type GetLatestHeightResponse struct { + // The height of the latest committed block. Will be 0 if no data has been + // committed yet. + Height int64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"` +} + +func (m *GetLatestHeightResponse) Reset() { *m = GetLatestHeightResponse{} } +func (m *GetLatestHeightResponse) String() string { return proto.CompactTextString(m) } +func (*GetLatestHeightResponse) ProtoMessage() {} +func (*GetLatestHeightResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_d48acf20d1015667, []int{3} +} +func (m *GetLatestHeightResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetLatestHeightResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetLatestHeightResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetLatestHeightResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetLatestHeightResponse.Merge(m, src) +} +func (m *GetLatestHeightResponse) XXX_Size() int { + return m.Size() +} +func (m *GetLatestHeightResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetLatestHeightResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetLatestHeightResponse proto.InternalMessageInfo + +func (m *GetLatestHeightResponse) GetHeight() int64 { + if m != nil { + return m.Height + } + return 0 +} + +func init() { + proto.RegisterType((*GetByHeightRequest)(nil), "tendermint.services.block.v1.GetByHeightRequest") + proto.RegisterType((*GetByHeightResponse)(nil), "tendermint.services.block.v1.GetByHeightResponse") + proto.RegisterType((*GetLatestHeightRequest)(nil), "tendermint.services.block.v1.GetLatestHeightRequest") + proto.RegisterType((*GetLatestHeightResponse)(nil), "tendermint.services.block.v1.GetLatestHeightResponse") +} + +func init() { + proto.RegisterFile("tendermint/services/block/v1/block.proto", fileDescriptor_d48acf20d1015667) +} + +var fileDescriptor_d48acf20d1015667 = []byte{ + // 274 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x28, 0x49, 0xcd, 0x4b, + 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0x2d, 0xd6, + 0x4f, 0xca, 0xc9, 0x4f, 0xce, 0xd6, 0x2f, 0x33, 0x84, 0x30, 0xf4, 0x0a, 0x8a, 0xf2, 0x4b, 0xf2, + 0x85, 0x64, 0x10, 0x2a, 0xf5, 0x60, 0x2a, 0xf5, 0x20, 0x0a, 0xca, 0x0c, 0xa5, 0x90, 0x64, 0xf5, + 0x4b, 0x2a, 0x0b, 0x60, 0x86, 0x40, 0xf4, 0x62, 0x91, 0x05, 0x93, 0x10, 0x59, 0x25, 0x1d, 0x2e, + 0x21, 0xf7, 0xd4, 0x12, 0xa7, 0x4a, 0x8f, 0xd4, 0xcc, 0xf4, 0x8c, 0x92, 0xa0, 0xd4, 0xc2, 0xd2, + 0xd4, 0xe2, 0x12, 0x21, 0x31, 0x2e, 0xb6, 0x0c, 0xb0, 0x80, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x73, + 0x10, 0x94, 0xa7, 0x54, 0xc5, 0x25, 0x8c, 0xa2, 0xba, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x55, 0xc8, + 0x84, 0x8b, 0x03, 0x6c, 0x63, 0x7c, 0x66, 0x0a, 0x58, 0x03, 0xb7, 0x91, 0xa4, 0x1e, 0x92, 0x8b, + 0x21, 0xf6, 0x39, 0x81, 0x54, 0x78, 0xba, 0x04, 0xb1, 0x83, 0x95, 0x7a, 0xa6, 0x08, 0xe9, 0x72, + 0xb1, 0x82, 0x99, 0x12, 0x4c, 0x60, 0x2d, 0xe2, 0x38, 0xb4, 0x04, 0x41, 0x54, 0x29, 0x49, 0x70, + 0x89, 0xb9, 0xa7, 0x96, 0xf8, 0x24, 0x96, 0xa4, 0x16, 0x97, 0xa0, 0xb8, 0x56, 0xc9, 0x90, 0x4b, + 0x1c, 0x43, 0x06, 0xea, 0x32, 0x1c, 0x1e, 0x71, 0x8a, 0x3c, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, + 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, 0x8e, 0xe1, 0xc6, + 0x63, 0x39, 0x86, 0x28, 0xfb, 0xf4, 0xcc, 0x92, 0x8c, 0xd2, 0x24, 0xbd, 0xe4, 0xfc, 0x5c, 0xfd, + 0xe4, 0xfc, 0xdc, 0xd4, 0x92, 0xa4, 0xb4, 0x12, 0x04, 0x03, 0x1c, 0x68, 0xfa, 0xf8, 0xe2, 0x2d, + 0x89, 0x0d, 0xac, 0xc6, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xb2, 0xf1, 0x4b, 0x36, 0xde, 0x01, + 0x00, 0x00, +} + +func (m *GetByHeightRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetByHeightRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetByHeightRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Height != 0 { + i = encodeVarintBlock(dAtA, i, uint64(m.Height)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *GetByHeightResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetByHeightResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetByHeightResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Block != nil { + { + size, err := m.Block.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintBlock(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.BlockId != nil { + { + size, err := m.BlockId.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintBlock(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *GetLatestHeightRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetLatestHeightRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetLatestHeightRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *GetLatestHeightResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetLatestHeightResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetLatestHeightResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Height != 0 { + i = encodeVarintBlock(dAtA, i, uint64(m.Height)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func encodeVarintBlock(dAtA []byte, offset int, v uint64) int { + offset -= sovBlock(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *GetByHeightRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Height != 0 { + n += 1 + sovBlock(uint64(m.Height)) + } + return n +} + +func (m *GetByHeightResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.BlockId != nil { + l = m.BlockId.Size() + n += 1 + l + sovBlock(uint64(l)) + } + if m.Block != nil { + l = m.Block.Size() + n += 1 + l + sovBlock(uint64(l)) + } + return n +} + +func (m *GetLatestHeightRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *GetLatestHeightResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Height != 0 { + n += 1 + sovBlock(uint64(m.Height)) + } + return n +} + +func sovBlock(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozBlock(x uint64) (n int) { + return sovBlock(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *GetByHeightRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlock + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetByHeightRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetByHeightRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType) + } + m.Height = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlock + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Height |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipBlock(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthBlock + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetByHeightResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlock + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetByHeightResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetByHeightResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field BlockId", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlock + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthBlock + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthBlock + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.BlockId == nil { + m.BlockId = &types.BlockID{} + } + if err := m.BlockId.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Block", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlock + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthBlock + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthBlock + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Block == nil { + m.Block = &types.Block{} + } + if err := m.Block.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipBlock(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthBlock + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetLatestHeightRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlock + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetLatestHeightRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetLatestHeightRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipBlock(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthBlock + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetLatestHeightResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlock + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetLatestHeightResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetLatestHeightResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType) + } + m.Height = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBlock + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Height |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipBlock(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthBlock + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipBlock(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBlock + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBlock + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBlock + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthBlock + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupBlock + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthBlock + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthBlock = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowBlock = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupBlock = fmt.Errorf("proto: unexpected end of group") +) diff --git a/proto/tendermint/services/block/v1/block.proto b/proto/tendermint/services/block/v1/block.proto new file mode 100644 index 0000000000..d193a0c0d7 --- /dev/null +++ b/proto/tendermint/services/block/v1/block.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; +package tendermint.services.block.v1; + +import "tendermint/types/block.proto"; +import "tendermint/types/types.proto"; + +option go_package = "github.com/cometbft/cometbft/proto/tendermint/services/block/v1"; + +message GetByHeightRequest { + // The height of the block requested. If set to 0, the latest height will be returned. + int64 height = 1; +} + +message GetByHeightResponse { + tendermint.types.BlockID block_id = 1; + tendermint.types.Block block = 2; +} + +// GetLatestHeightRequest - empty message since no parameter is required +message GetLatestHeightRequest { +} + +// GetLatestHeightResponse provides the height of the latest committed block. +message GetLatestHeightResponse { + // The height of the latest committed block. Will be 0 if no data has been + // committed yet. + int64 height = 1; +} diff --git a/proto/tendermint/services/block/v1/block_service.pb.go b/proto/tendermint/services/block/v1/block_service.pb.go new file mode 100644 index 0000000000..d9fa29f977 --- /dev/null +++ b/proto/tendermint/services/block/v1/block_service.pb.go @@ -0,0 +1,205 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: tendermint/services/block/v1/block_service.proto + +package v1 + +import ( + context "context" + fmt "fmt" + grpc1 "github.com/cosmos/gogoproto/grpc" + proto "github.com/cosmos/gogoproto/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +func init() { + proto.RegisterFile("tendermint/services/block/v1/block_service.proto", fileDescriptor_1488dadaa3ae41e3) +} + +var fileDescriptor_1488dadaa3ae41e3 = []byte{ + // 228 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x28, 0x49, 0xcd, 0x4b, + 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0x2d, 0xd6, + 0x4f, 0xca, 0xc9, 0x4f, 0xce, 0xd6, 0x2f, 0x33, 0x84, 0x30, 0xe2, 0xa1, 0xe2, 0x7a, 0x05, 0x45, + 0xf9, 0x25, 0xf9, 0x42, 0x32, 0x08, 0x1d, 0x7a, 0x30, 0x1d, 0x7a, 0x60, 0x85, 0x7a, 0x65, 0x86, + 0x52, 0x1a, 0x84, 0xcd, 0x83, 0x98, 0x63, 0xd4, 0xca, 0xc4, 0xc5, 0xe3, 0x04, 0xe2, 0x07, 0x43, + 0x94, 0x09, 0x15, 0x71, 0x71, 0xbb, 0xa7, 0x96, 0x38, 0x55, 0x7a, 0xa4, 0x66, 0xa6, 0x67, 0x94, + 0x08, 0x19, 0xe8, 0xe1, 0xb3, 0x48, 0x0f, 0x49, 0x69, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, + 0x94, 0x21, 0x09, 0x3a, 0x8a, 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0x85, 0x1a, 0x18, 0xb9, 0xf8, 0xdd, + 0x53, 0x4b, 0x7c, 0x12, 0x4b, 0x52, 0x8b, 0x4b, 0xa0, 0x16, 0x9b, 0x10, 0x34, 0x06, 0x59, 0x39, + 0xcc, 0x72, 0x53, 0x12, 0x75, 0x41, 0x1c, 0x60, 0xc0, 0xe8, 0x14, 0x79, 0xe2, 0x91, 0x1c, 0xe3, + 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x70, 0xe1, 0xb1, 0x1c, + 0xc3, 0x8d, 0xc7, 0x72, 0x0c, 0x51, 0xf6, 0xe9, 0x99, 0x25, 0x19, 0xa5, 0x49, 0x7a, 0xc9, 0xf9, + 0xb9, 0xfa, 0xc9, 0xf9, 0xb9, 0xa9, 0x25, 0x49, 0x69, 0x25, 0x08, 0x06, 0x38, 0x14, 0xf5, 0xf1, + 0x05, 0x77, 0x12, 0x1b, 0x58, 0x8d, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xc3, 0xcd, 0x9b, 0x9a, + 0xe5, 0x01, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// BlockServiceClient is the client API for BlockService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type BlockServiceClient interface { + // GetBlock retrieves the block information at a particular height, + // if height '0' (zero) is specified it returns the latest height + GetByHeight(ctx context.Context, in *GetByHeightRequest, opts ...grpc.CallOption) (*GetByHeightResponse, error) + // GetLatestHeight returns a stream of the latest block heights committed by + // the network. This is a long-lived stream that is only terminated by the + // server if an error occurs. The caller is expected to handle such + // disconnections and automatically reconnect. + GetLatestHeight(ctx context.Context, in *GetLatestHeightRequest, opts ...grpc.CallOption) (BlockService_GetLatestHeightClient, error) +} + +type blockServiceClient struct { + cc grpc1.ClientConn +} + +func NewBlockServiceClient(cc grpc1.ClientConn) BlockServiceClient { + return &blockServiceClient{cc} +} + +func (c *blockServiceClient) GetByHeight(ctx context.Context, in *GetByHeightRequest, opts ...grpc.CallOption) (*GetByHeightResponse, error) { + out := new(GetByHeightResponse) + err := c.cc.Invoke(ctx, "/tendermint.services.block.v1.BlockService/GetByHeight", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *blockServiceClient) GetLatestHeight(ctx context.Context, in *GetLatestHeightRequest, opts ...grpc.CallOption) (BlockService_GetLatestHeightClient, error) { + stream, err := c.cc.NewStream(ctx, &_BlockService_serviceDesc.Streams[0], "/tendermint.services.block.v1.BlockService/GetLatestHeight", opts...) + if err != nil { + return nil, err + } + x := &blockServiceGetLatestHeightClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type BlockService_GetLatestHeightClient interface { + Recv() (*GetLatestHeightResponse, error) + grpc.ClientStream +} + +type blockServiceGetLatestHeightClient struct { + grpc.ClientStream +} + +func (x *blockServiceGetLatestHeightClient) Recv() (*GetLatestHeightResponse, error) { + m := new(GetLatestHeightResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// BlockServiceServer is the server API for BlockService service. +type BlockServiceServer interface { + // GetBlock retrieves the block information at a particular height, + // if height '0' (zero) is specified it returns the latest height + GetByHeight(context.Context, *GetByHeightRequest) (*GetByHeightResponse, error) + // GetLatestHeight returns a stream of the latest block heights committed by + // the network. This is a long-lived stream that is only terminated by the + // server if an error occurs. The caller is expected to handle such + // disconnections and automatically reconnect. + GetLatestHeight(*GetLatestHeightRequest, BlockService_GetLatestHeightServer) error +} + +// UnimplementedBlockServiceServer can be embedded to have forward compatible implementations. +type UnimplementedBlockServiceServer struct { +} + +func (*UnimplementedBlockServiceServer) GetByHeight(ctx context.Context, req *GetByHeightRequest) (*GetByHeightResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetByHeight not implemented") +} +func (*UnimplementedBlockServiceServer) GetLatestHeight(req *GetLatestHeightRequest, srv BlockService_GetLatestHeightServer) error { + return status.Errorf(codes.Unimplemented, "method GetLatestHeight not implemented") +} + +func RegisterBlockServiceServer(s grpc1.Server, srv BlockServiceServer) { + s.RegisterService(&_BlockService_serviceDesc, srv) +} + +func _BlockService_GetByHeight_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetByHeightRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BlockServiceServer).GetByHeight(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/tendermint.services.block.v1.BlockService/GetByHeight", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BlockServiceServer).GetByHeight(ctx, req.(*GetByHeightRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _BlockService_GetLatestHeight_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(GetLatestHeightRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(BlockServiceServer).GetLatestHeight(m, &blockServiceGetLatestHeightServer{stream}) +} + +type BlockService_GetLatestHeightServer interface { + Send(*GetLatestHeightResponse) error + grpc.ServerStream +} + +type blockServiceGetLatestHeightServer struct { + grpc.ServerStream +} + +func (x *blockServiceGetLatestHeightServer) Send(m *GetLatestHeightResponse) error { + return x.ServerStream.SendMsg(m) +} + +var _BlockService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "tendermint.services.block.v1.BlockService", + HandlerType: (*BlockServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetByHeight", + Handler: _BlockService_GetByHeight_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "GetLatestHeight", + Handler: _BlockService_GetLatestHeight_Handler, + ServerStreams: true, + }, + }, + Metadata: "tendermint/services/block/v1/block_service.proto", +} diff --git a/proto/tendermint/services/block/v1/block_service.proto b/proto/tendermint/services/block/v1/block_service.proto new file mode 100644 index 0000000000..d52cc83393 --- /dev/null +++ b/proto/tendermint/services/block/v1/block_service.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; +package tendermint.services.block.v1; + +option go_package = "github.com/cometbft/cometbft/proto/tendermint/services/block/v1"; + +import "tendermint/services/block/v1/block.proto"; + +// BlockService provides information about blocks +service BlockService { + // GetBlock retrieves the block information at a particular height, + // if height '0' (zero) is specified it returns the latest height + rpc GetByHeight(GetByHeightRequest) returns (GetByHeightResponse); + + // GetLatestHeight returns a stream of the latest block heights committed by + // the network. This is a long-lived stream that is only terminated by the + // server if an error occurs. The caller is expected to handle such + // disconnections and automatically reconnect. + rpc GetLatestHeight(GetLatestHeightRequest) returns (stream GetLatestHeightResponse); +} diff --git a/rpc/grpc/client/block_service.go b/rpc/grpc/client/block_service.go new file mode 100644 index 0000000000..96543d6feb --- /dev/null +++ b/rpc/grpc/client/block_service.go @@ -0,0 +1,122 @@ +package client + +import ( + "context" + "fmt" + + blocksvc "github.com/cometbft/cometbft/proto/tendermint/services/block/v1" + "github.com/cometbft/cometbft/types" + "github.com/cosmos/gogoproto/grpc" +) + +// Block data returned by the CometBFT BlockService gRPC API. +type Block struct { + BlockID types.BlockID `json:"block_id"` + Block *types.Block `json:"block"` +} + +// LatestHeightResult type used in GetLatestResult and send to the client +// via a channel +type LatestHeightResult struct { + Height int64 + Error error +} + +// BlockServiceClient provides block information +type BlockServiceClient interface { + GetBlockByHeight(ctx context.Context, height int64) (*Block, error) + // GetLatestHeight provides sends the latest committed block height to the given output + // channel as blocks are committed. + GetLatestHeight(ctx context.Context, resultCh chan<- LatestHeightResult) +} + +type blockServiceClient struct { + client blocksvc.BlockServiceClient +} + +func newBlockServiceClient(conn grpc.ClientConn) BlockServiceClient { + return &blockServiceClient{ + client: blocksvc.NewBlockServiceClient(conn), + } +} + +// GetBlockByHeight implements BlockServiceClient GetBlockByHeight +func (c *blockServiceClient) GetBlockByHeight(ctx context.Context, height int64) (*Block, error) { + req := blocksvc.GetByHeightRequest{ + Height: height, + } + res, err := c.client.GetByHeight(ctx, &req) + if err != nil { + return nil, err + } + + // convert Block from proto to core type + block, err := types.BlockFromProto(res.Block) + if err != nil { + return nil, err + } + + // convert BlockID from proto to core type + blockID, err := types.BlockIDFromProto(res.BlockId) + if err != nil { + return nil, err + } + + response := Block{ + BlockID: *blockID, + Block: block, + } + return &response, nil +} + +// GetLatestHeight implements BlockServiceClient GetLatestHeight +// This method provides an out channel (int64) that streams the latest height. +// The out channel might return non-contiguous heights if the channel becomes full, +func (c *blockServiceClient) GetLatestHeight(ctx context.Context, resultCh chan<- LatestHeightResult) { + req := blocksvc.GetLatestHeightRequest{} + + latestHeightClient, err := c.client.GetLatestHeight(ctx, &req) + if err != nil { + resultCh <- LatestHeightResult{ + Height: 0, + Error: fmt.Errorf("error getting a stream for the latest height"), + } + } + + go func(client blocksvc.BlockService_GetLatestHeightClient) { + for { + response, err := client.Recv() + if err != nil { + resultCh <- LatestHeightResult{ + Height: 0, + Error: fmt.Errorf("error receiving the latest height from a stream"), + } + break + } + select { + case resultCh <- LatestHeightResult{ + Height: response.Height, + Error: fmt.Errorf("error receiving the latest height from a stream"), + }: + default: + } + + } + }(latestHeightClient) +} + +type disabledBlockServiceClient struct{} + +func newDisabledBlockServiceClient() BlockServiceClient { + return &disabledBlockServiceClient{} +} + +// GetBlockByHeight implements BlockServiceClient GetBlockByHeight - disabled client +func (*disabledBlockServiceClient) GetBlockByHeight(context.Context, int64) (*Block, error) { + panic("block service client is disabled") +} + +// GetLatestHeight implements BlockServiceClient GetLatestHeight - disabled client +func (*disabledBlockServiceClient) GetLatestHeight(context.Context, chan<- LatestHeightResult) { + panic("block service client is disabled") +} diff --git a/rpc/grpc/client/client.go b/rpc/grpc/client/client.go index cbc165a64d..b5fac5896b 100644 --- a/rpc/grpc/client/client.go +++ b/rpc/grpc/client/client.go @@ -21,6 +21,7 @@ type Option func(*clientBuilder) // node via gRPC. type Client interface { VersionServiceClient + BlockServiceClient } type clientBuilder struct { @@ -28,6 +29,7 @@ type clientBuilder struct { grpcOpts []ggrpc.DialOption versionServiceEnabled bool + blockServiceEnabled bool } func newClientBuilder() *clientBuilder { @@ -35,6 +37,7 @@ func newClientBuilder() *clientBuilder { dialerFunc: defaultDialerFunc, grpcOpts: make([]ggrpc.DialOption, 0), versionServiceEnabled: true, + blockServiceEnabled: true, } } @@ -46,6 +49,7 @@ type client struct { conn grpc.ClientConn VersionServiceClient + BlockServiceClient } // WithInsecure disables transport security for the underlying client @@ -68,6 +72,17 @@ func WithVersionServiceEnabled(enabled bool) Option { } } +// WithBlockServiceEnabled allows control of whether or not to create a +// client for interacting with the block service of a CometBFT node. +// +// If disabled and the client attempts to access the block service API, the +// client will panic. +func WithBlockServiceEnabled(enabled bool) Option { + return func(b *clientBuilder) { + b.blockServiceEnabled = enabled + } +} + // WithGRPCDialOption allows passing lower-level gRPC dial options through to // the gRPC dialer when creating the client. func WithGRPCDialOption(opt ggrpc.DialOption) Option { @@ -98,9 +113,14 @@ func New(ctx context.Context, addr string, opts ...Option) (Client, error) { if builder.versionServiceEnabled { versionServiceClient = newVersionServiceClient(conn) } + blockServiceClient := newDisabledBlockServiceClient() + if builder.blockServiceEnabled { + blockServiceClient = newBlockServiceClient(conn) + } client := &client{ conn: conn, VersionServiceClient: versionServiceClient, + BlockServiceClient: blockServiceClient, } return client, nil } diff --git a/rpc/grpc/server/server.go b/rpc/grpc/server/server.go index 620e7a4c75..d296f26ec5 100644 --- a/rpc/grpc/server/server.go +++ b/rpc/grpc/server/server.go @@ -6,8 +6,12 @@ import ( "strings" "github.com/cometbft/cometbft/libs/log" + pbblocksvc "github.com/cometbft/cometbft/proto/tendermint/services/block/v1" pbversionsvc "github.com/cometbft/cometbft/proto/tendermint/services/version/v1" + "github.com/cometbft/cometbft/rpc/grpc/server/services/blockservice" "github.com/cometbft/cometbft/rpc/grpc/server/services/versionservice" + "github.com/cometbft/cometbft/store" + "github.com/cometbft/cometbft/types" "google.golang.org/grpc" ) @@ -18,6 +22,7 @@ type Option func(*serverBuilder) type serverBuilder struct { listener net.Listener versionService pbversionsvc.VersionServiceServer + blockService pbblocksvc.BlockServiceServer logger log.Logger grpcOpts []grpc.ServerOption } @@ -53,6 +58,13 @@ func WithVersionService() Option { } } +// WithBlockService enables the block service on the CometBFT server. +func WithBlockService(store *store.BlockStore, eventBus *types.EventBus, logger log.Logger) Option { + return func(b *serverBuilder) { + b.blockService = blockservice.New(store, eventBus, logger) + } +} + // WithLogger enables logging using the given logger. If not specified, the // gRPC server does not log anything. func WithLogger(logger log.Logger) Option { @@ -84,6 +96,10 @@ func Serve(listener net.Listener, opts ...Option) error { pbversionsvc.RegisterVersionServiceServer(server, b.versionService) b.logger.Debug("Registered version service") } + if b.blockService != nil { + pbblocksvc.RegisterBlockServiceServer(server, b.blockService) + b.logger.Debug("Registered block service") + } b.logger.Info("serve", "msg", fmt.Sprintf("Starting gRPC server on %s", listener.Addr())) return server.Serve(b.listener) } diff --git a/rpc/grpc/server/services/blockservice/service.go b/rpc/grpc/server/services/blockservice/service.go new file mode 100644 index 0000000000..39e68e0116 --- /dev/null +++ b/rpc/grpc/server/services/blockservice/service.go @@ -0,0 +1,129 @@ +package blockservice + +import ( + context "context" + "fmt" + + "github.com/cometbft/cometbft/internal/rpctrace" + "github.com/cometbft/cometbft/libs/log" + cmtpubsub "github.com/cometbft/cometbft/libs/pubsub" + blocksvc "github.com/cometbft/cometbft/proto/tendermint/services/block/v1" + "github.com/cometbft/cometbft/store" + "github.com/cometbft/cometbft/types" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type blockServiceServer struct { + store *store.BlockStore + eventBus *types.EventBus + logger log.Logger +} + +// New creates a new CometBFT version service server. +func New(store *store.BlockStore, eventBus *types.EventBus, logger log.Logger) blocksvc.BlockServiceServer { + return &blockServiceServer{ + store: store, + eventBus: eventBus, + logger: logger.With("service", "BlockService"), + } +} + +// GetByHeight implements v1.BlockServiceServer GetByHeight method +func (s *blockServiceServer) GetByHeight(_ context.Context, req *blocksvc.GetByHeightRequest) (*blocksvc.GetByHeightResponse, error) { + height, err := validateOrUpdateBlockHeight( + req.Height, + s.store.Base(), + s.store.Height(), + ) + if err != nil { + return nil, err + } + + block := s.store.LoadBlock(height) + blockProto, err := block.ToProto() + if err != nil { + return nil, status.Errorf(codes.NotFound, fmt.Sprintf("Block not found for height %d", height)) + } + + blockMeta := s.store.LoadBlockMeta(height) + + blockIDProto := blockMeta.BlockID.ToProto() + + return &blocksvc.GetByHeightResponse{ + BlockId: &blockIDProto, + Block: blockProto, + }, nil +} + +// GetLatestHeight implements v1.BlockServiceServer GetLatestHeight method +func (s *blockServiceServer) GetLatestHeight(_ *blocksvc.GetLatestHeightRequest, stream blocksvc.BlockService_GetLatestHeightServer) error { + logger := s.logger.With("endpoint", "GetLatestHeight") + + traceID, err := rpctrace.New() + if err != nil { + logger.Error("Error generating RPC trace ID", "err", err) + return status.Error(codes.Internal, "Internal server error") + } + + // The trace ID is reused as a unique subscriber ID + sub, err := s.eventBus.Subscribe(context.Background(), traceID, types.QueryForEvent(types.EventNewBlock), 1) + if err != nil { + logger.Error("Cannot subscribe to new block events", "err", err, "traceID", traceID) + return status.Errorf(codes.Internal, "Cannot subscribe to new block events (see logs for trace ID: %s)", traceID) + } + + for { + select { + case msg := <-sub.Out(): + height, err := getHeightFromMsg(msg) + if err != nil { + logger.Error("Failed to extract height from subscription message", "err", err, "traceID", traceID) + return status.Errorf(codes.Internal, "Internal server error (see logs for trace ID: %s)", traceID) + } + if err := stream.Send(&blocksvc.GetLatestHeightResponse{Height: height}); err != nil { + logger.Error("Failed to stream new block", "err", err, "height", height, "traceID", traceID) + return status.Errorf(codes.Unavailable, "Cannot send stream response (see logs for trace ID: %s)", traceID) + } + case <-sub.Cancelled(): + switch sub.Err() { + case cmtpubsub.ErrUnsubscribed: + return status.Error(codes.Canceled, "Subscription terminated") + case nil: + return status.Error(codes.Canceled, "Subscription canceled without errors") + default: + logger.Info("Subscription canceled with errors", "err", sub.Err(), "traceID", traceID) + return status.Errorf(codes.Canceled, "Subscription canceled with errors (see logs for trace ID: %s)", traceID) + } + default: + continue + } + if sub.Err() != nil { + logger.Error("New block subscription error", "err", sub.Err(), "traceID", traceID) + return status.Errorf(codes.Internal, "New block subscription error (see logs for trace ID: %s)", traceID) + } + } +} + +func validateOrUpdateBlockHeight(height, baseHeight, latestHeight int64) (int64, error) { + switch { + case height == 0: + return latestHeight, nil + case height < 0: + return -1, status.Error(codes.InvalidArgument, "Height cannot be negative") + case height < baseHeight: + return -1, status.Errorf(codes.InvalidArgument, "Requested height %d is below base height %d", height, baseHeight) + case height > latestHeight: + return -1, status.Errorf(codes.InvalidArgument, "Requested height %d is higher than latest height %d", height, latestHeight) + } + return height, nil +} + +func getHeightFromMsg(msg cmtpubsub.Message) (int64, error) { + switch eventType := msg.Data().(type) { + case types.EventDataNewBlock: + return eventType.Block.Height, nil + default: + return -1, fmt.Errorf("unexpected event type: %v", eventType) + } +} diff --git a/test/e2e/tests/grpc_test.go b/test/e2e/tests/grpc_test.go index 395aef24f4..fe533203e6 100644 --- a/test/e2e/tests/grpc_test.go +++ b/test/e2e/tests/grpc_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + client2 "github.com/cometbft/cometbft/rpc/grpc/client" e2e "github.com/cometbft/cometbft/test/e2e/pkg" "github.com/cometbft/cometbft/version" "github.com/stretchr/testify/require" @@ -30,3 +31,104 @@ func TestGRPC_Version(t *testing.T) { require.Equal(t, version.BlockProtocol, res.Block) }) } + +func TestGRPC_Block_GetByHeight(t *testing.T) { + testNode(t, func(t *testing.T, node e2e.Node) { + if node.Mode != e2e.ModeFull && node.Mode != e2e.ModeValidator { + return + } + + blocks := fetchBlockChain(t) + + client, err := node.Client() + require.NoError(t, err) + status, err := client.Status(ctx) + require.NoError(t, err) + + first := status.SyncInfo.EarliestBlockHeight + last := status.SyncInfo.LatestBlockHeight + if node.RetainBlocks > 0 { + first++ // avoid race conditions with block pruning + } + + ctx, ctxCancel := context.WithTimeout(context.Background(), time.Minute) + defer ctxCancel() + gRPCClient, err := node.GRPCClient(ctx) + require.NoError(t, err) + + for _, block := range blocks { + if block.Header.Height < first { + continue + } + if block.Header.Height > last { + break + } + + // Get first block + firstBlock, err := gRPCClient.GetBlockByHeight(ctx, first) + + // First block tests + require.NoError(t, err) + require.NotNil(t, firstBlock.BlockID) + require.Equal(t, firstBlock.Block.Height, first) + + // Get last block + lastBlock, err := gRPCClient.GetBlockByHeight(ctx, last) + + // Last block tests + require.NoError(t, err) + require.NotNil(t, lastBlock.BlockID) + require.Equal(t, lastBlock.Block.Height, last) + } + }) +} + +func TestGRPC_Block_GetLatestHeight(t *testing.T) { + + var node *e2e.Node + testnet := loadTestnet(t) + // this assumes at least one node is valid for testing + // not a light or seed node + for _, n := range testnet.ArchiveNodes() { + if !n.Stateless() { + node = n + break + } + } + require.NotNil(t, node) + + client, err := node.Client() + require.NoError(t, err) + status, err := client.Status(ctx) + require.NoError(t, err) + + gCtx, cancel := context.WithTimeout(context.Background(), time.Minute) + gRPCClient, err := node.GRPCClient(ctx) + require.NoError(t, err) + + resultCh := make(chan client2.LatestHeightResult) + gRPCClient.GetLatestHeight(gCtx, resultCh) + + count := 0 + for { + select { + case <-gCtx.Done(): + require.NoError(t, gCtx.Err()) + case result := <-resultCh: + if err != nil { + require.Error(t, err) + } else { + require.NoError(t, err) + require.GreaterOrEqual(t, result.Height, status.SyncInfo.EarliestBlockHeight) + count++ + } + } + if count == 10 { + cancel() + return + } + if gCtx.Err() != nil { + require.Error(t, gCtx.Err()) + } + } +}