diff --git a/buildkite/src/Command/Base.dhall b/buildkite/src/Command/Base.dhall index 32ae7f2a820..deefb5a1742 100644 --- a/buildkite/src/Command/Base.dhall +++ b/buildkite/src/Command/Base.dhall @@ -131,6 +131,7 @@ let targetToAgent = , Integration = toMap { size = "integration" } , QA = toMap { size = "qa" } , Hardfork = toMap { size = "hardfork" } + , Perf = toMap { size = "perf" } , Multi = toMap { size = "generic-multi" } } target diff --git a/buildkite/src/Command/Bench/Base.dhall b/buildkite/src/Command/Bench/Base.dhall new file mode 100644 index 00000000000..48427e6738d --- /dev/null +++ b/buildkite/src/Command/Bench/Base.dhall @@ -0,0 +1,99 @@ +let PipelineMode = ../../Pipeline/Mode.dhall + +let PipelineTag = ../../Pipeline/Tag.dhall + +let Pipeline = ../../Pipeline/Dsl.dhall + +let JobSpec = ../../Pipeline/JobSpec.dhall + +let DebianVersions = ../../Constants/DebianVersions.dhall + +let RunInToolchain = ../../Command/RunInToolchain.dhall + +let Network = ../../Constants/Network.dhall + +let Profiles = ../../Constants/Profiles.dhall + +let Command = ../../Command/Base.dhall + +let Docker = ../../Command/Docker/Type.dhall + +let Size = ../Size.dhall + +let Benchmarks = ../../Constants/Benchmarks.dhall + +let SelectFiles = ../../Lib/SelectFiles.dhall + +let Spec = + { Type = + { key : Text + , bench : Text + , label : Text + , size : Size + , name : Text + , path : Text + , mode : PipelineMode.Type + , dependsOn : List Command.TaggedKey.Type + , additionalDirtyWhen : List SelectFiles.Type + , yellowThreshold : Double + , redThreshold : Double + } + , default = + { mode = PipelineMode.Type.PullRequest + , size = Size.Medium + , dependsOn = + DebianVersions.dependsOn + DebianVersions.DebVersion.Bullseye + Network.Type.Devnet + Profiles.Type.Standard + , additionalDirtyWhen = [] : List SelectFiles.Type + , yellowThreshold = 0.1 + , redThreshold = 0.2 + } + } + +let command + : Spec.Type -> Command.Type + = \(spec : Spec.Type) + -> Command.build + Command.Config::{ + , commands = + RunInToolchain.runInToolchain + (Benchmarks.toEnvList Benchmarks.Type::{=}) + "./buildkite/scripts/benchmarks.sh ${spec.bench} --red-threshold ${Double/show + spec.redThreshold} --yellow-threshold ${Double/show + spec.yellowThreshold}" + , label = "Perf: ${spec.label}" + , key = spec.key + , target = spec.size + , docker = None Docker.Type + , depends_on = spec.dependsOn + } + +let pipeline + : Spec.Type -> Pipeline.Config.Type + = \(spec : Spec.Type) + -> Pipeline.Config::{ + , spec = JobSpec::{ + , dirtyWhen = + [ SelectFiles.strictlyStart (SelectFiles.contains "src") + , SelectFiles.exactly + "buildkite/src/Command/Bench/Base" + "dhall" + , SelectFiles.contains "scripts/benchmark" + , SelectFiles.contains "buildkite/scripts/benchmark" + ] + # spec.additionalDirtyWhen + , path = spec.path + , name = spec.name + , mode = spec.mode + , tags = + [ PipelineTag.Type.Long + , PipelineTag.Type.Test + , PipelineTag.Type.Stable + ] + } + , steps = [ command spec ] + } + +in { command = command, pipeline = pipeline, Spec = Spec } diff --git a/buildkite/src/Command/Size.dhall b/buildkite/src/Command/Size.dhall index eda37582dc4..a7cadacc02a 100644 --- a/buildkite/src/Command/Size.dhall +++ b/buildkite/src/Command/Size.dhall @@ -1 +1 @@ -< XLarge | Large | Medium | Small | Integration | QA | Hardfork | Multi > +< XLarge | Large | Medium | Small | Integration | QA | Hardfork | Multi | Perf > diff --git a/buildkite/src/Constants/Benchmarks.dhall b/buildkite/src/Constants/Benchmarks.dhall new file mode 100644 index 00000000000..d303dd42499 --- /dev/null +++ b/buildkite/src/Constants/Benchmarks.dhall @@ -0,0 +1,21 @@ +let Spec = + { Type = { tokenEnvName : Text, bucket : Text, org : Text, host : Text } + , default = + { tokenEnvName = "\\\${INFLUX_TOKEN}" + , bucket = "\\\${INFLUX_BUCKET_NAME}" + , org = "\\\${INFLUX_ORG}" + , host = "\\\${INFLUX_HOST}" + } + } + +let toEnvList = + \(spec : Spec.Type) + -> [ "INFLUX_HOST=${spec.host}" + , "INFLUX_TOKEN=${spec.tokenEnvName}" + , "INFLUX_ORG=${spec.org}" + , "INFLUX_BUCKET_NAME=${spec.bucket}" + ] + +let mainlineBranches = "[develop,compatible,master]" + +in { Type = Spec, toEnvList = toEnvList, mainlineBranches = mainlineBranches } diff --git a/src/app/libp2p_helper/src/libp2p_helper/bitswap.go b/src/app/libp2p_helper/src/libp2p_helper/bitswap.go index f12063ed48b..8fd76b2a7aa 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/bitswap.go +++ b/src/app/libp2p_helper/src/libp2p_helper/bitswap.go @@ -12,7 +12,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" exchange "github.com/ipfs/go-ipfs-exchange-interface" - ipld "github.com/ipfs/go-ipld-format" ) type bitswapDeleteCmd struct { @@ -91,62 +90,14 @@ func announceNewRootBlock(ctx context.Context, engine *bitswap.Bitswap, storage return storage.SetStatus(ctx, root, codanet.Full) } -func (bs *BitswapCtx) deleteRoot(root BitswapBlockLink) error { - if err := bs.storage.SetStatus(bs.ctx, root, codanet.Deleting); err != nil { - return err - } - ClearRootDownloadState(bs, root) - allDescendants := []BitswapBlockLink{root} - viewBlockF := func(b []byte) error { - links, _, err := ReadBitswapBlock(b) - if err == nil { - for _, l := range links { - var l2 BitswapBlockLink - copy(l2[:], l[:]) - allDescendants = append(allDescendants, l2) - } - } - return err - } - for _, block := range allDescendants { - if err := bs.storage.ViewBlock(bs.ctx, block, viewBlockF); err != nil && err != (ipld.ErrNotFound{Cid: codanet.BlockHashToCid(block)}) { - return err - } - } - if err := bs.storage.DeleteBlocks(bs.ctx, allDescendants); err != nil { - return err - } - return bs.storage.DeleteStatus(bs.ctx, root) -} - -func ClearRootDownloadState(bs BitswapState, root root) { - rootStates := bs.RootDownloadStates() - state, has := rootStates[root] - if !has { - return - } - nodeParams := bs.NodeDownloadParams() - delete(rootStates, root) - state.allDescendants.ForEach(func(c cid.Cid) error { - np, hasNp := nodeParams[c] - if hasNp { - delete(np, root) - if len(np) == 0 { - delete(nodeParams, c) - } - } - return nil - }) - state.cancelF() +func (bs *BitswapCtx) SendResourceUpdate(type_ ipc.ResourceUpdateType, tag BitswapDataTag, root root) { + bs.SendResourceUpdates(type_, tag, root) } -func (bs *BitswapCtx) SendResourceUpdate(type_ ipc.ResourceUpdateType, root root) { - bs.SendResourceUpdates(type_, root) -} -func (bs *BitswapCtx) SendResourceUpdates(type_ ipc.ResourceUpdateType, roots ...root) { +func (bs *BitswapCtx) SendResourceUpdates(type_ ipc.ResourceUpdateType, tag BitswapDataTag, roots ...root) { // Non-blocking upcall sending select { - case bs.outMsgChan <- mkResourceUpdatedUpcall(type_, roots): + case bs.outMsgChan <- mkResourceUpdatedUpcall(type_, tag, roots): default: for _, root := range roots { bitswapLogger.Errorf("Failed to send resource update of type %d"+ @@ -242,25 +193,27 @@ func (bs *BitswapCtx) Loop() { ClearRootDownloadState(bs, root) case cmd := <-bs.addCmds: configuredCheck() - blocks, root := SplitDataToBitswapBlocksLengthPrefixedWithTag(bs.maxBlockSize, cmd.data, BlockBodyTag) + blocks, root := SplitDataToBitswapBlocksLengthPrefixedWithTag(bs.maxBlockSize, cmd.data, cmd.tag) err := announceNewRootBlock(bs.ctx, bs.engine, bs.storage, blocks, root) if err == nil { - bs.SendResourceUpdate(ipc.ResourceUpdateType_added, root) + bs.SendResourceUpdate(ipc.ResourceUpdateType_added, cmd.tag, root) } else { bitswapLogger.Errorf("Failed to announce root cid %s (%s)", codanet.BlockHashToCidSuffix(root), err) } case cmd := <-bs.deleteCmds: configuredCheck() - success := []root{} + success := map[BitswapDataTag][]root{} for _, root := range cmd.rootIds { - err := bs.deleteRoot(root) + tag, err := DeleteRoot(bs, root) if err == nil { - success = append(success, root) + success[tag] = append(success[tag], root) } else { bitswapLogger.Errorf("Error processing delete request for %s: %s", codanet.BlockHashToCidSuffix(root), err) } } - bs.SendResourceUpdates(ipc.ResourceUpdateType_removed, success...) + for tag, roots := range success { + bs.SendResourceUpdates(ipc.ResourceUpdateType_removed, tag, roots...) + } case cmd := <-bs.downloadCmds: configuredCheck() // We put all ids to map to avoid diff --git a/src/app/libp2p_helper/src/libp2p_helper/bitswap_delete.go b/src/app/libp2p_helper/src/libp2p_helper/bitswap_delete.go new file mode 100644 index 00000000000..78e82fafa54 --- /dev/null +++ b/src/app/libp2p_helper/src/libp2p_helper/bitswap_delete.go @@ -0,0 +1,104 @@ +package main + +import ( + "codanet" + "errors" + + "github.com/ipfs/go-cid" +) + +func ClearRootDownloadState(bs BitswapState, root root) { + rootStates := bs.RootDownloadStates() + state, has := rootStates[root] + if !has { + return + } + nodeParams := bs.NodeDownloadParams() + delete(rootStates, root) + state.allDescendants.ForEach(func(c cid.Cid) error { + np, hasNp := nodeParams[c] + if hasNp { + delete(np, root) + if len(np) == 0 { + delete(nodeParams, c) + } + } + return nil + }) + state.cancelF() +} + +// getTag retrieves root's tag, whether the root is still being processed +// or its processing was completed +func getTag(bs BitswapState, root BitswapBlockLink) (tag BitswapDataTag, err error) { + state, has := bs.RootDownloadStates()[root] + if has { + tag = state.getTag() + } else { + err = bs.ViewBlock(root, func(b []byte) error { + _, fullBlockData, err := ReadBitswapBlock(b) + if err != nil { + return err + } + if len(fullBlockData) < 5 { + return errors.New("root block is too short") + } + tag = BitswapDataTag(fullBlockData[4]) + return nil + }) + } + return +} + +func DeleteRoot(bs BitswapState, root BitswapBlockLink) (BitswapDataTag, error) { + if err := bs.SetStatus(root, codanet.Deleting); err != nil { + return 255, err + } + tag, err := getTag(bs, root) + if err != nil { + return tag, err + } + ClearRootDownloadState(bs, root) + + // Performing breadth-first search (BFS) + + // descendantMap is a "visited" set, to ensure we do not + // traverse into nodes we once visited + descendantMap := map[[32]byte]struct{}{root: {}} + + // allDescendants is a list of all discovered nodes, + // serving as both "queue" to be iterated over during BFS, + // and as a list of all nodes visited at the end of + // BFS iteration + allDescendants := []BitswapBlockLink{root} + viewBlockF := func(b []byte) error { + links, _, err := ReadBitswapBlock(b) + if err == nil { + for _, l := range links { + var l2 BitswapBlockLink + copy(l2[:], l[:]) + _, has := descendantMap[l2] + // Checking if the nodes was visited before + if !has { + descendantMap[l2] = struct{}{} + // Add an item to BFS queue + allDescendants = append(allDescendants, l2) + } + } + } + return err + } + // Iteration is done via index-based loop, because underlying + // array gets extended during iteration, and regular iterator + // wouldn't see these changes + for i := 0; i < len(allDescendants); i++ { + block := allDescendants[i] + if err := bs.ViewBlock(block, viewBlockF); err != nil && !isBlockNotFound(block, err) { + return tag, err + } + } + if err := bs.DeleteBlocks(allDescendants); err != nil { + return tag, err + } + return tag, bs.DeleteStatus(root) +} diff --git a/src/app/libp2p_helper/src/libp2p_helper/bitswap_downloader.go b/src/app/libp2p_helper/src/libp2p_helper/bitswap_downloader.go index 8273e02a5bc..0a620fc91c5 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/bitswap_downloader.go +++ b/src/app/libp2p_helper/src/libp2p_helper/bitswap_downloader.go @@ -10,7 +10,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" ) @@ -86,7 +85,7 @@ type BitswapState interface { DepthIndices() DepthIndices NewSession(downloadTimeout time.Duration) (BlockRequester, context.CancelFunc) RegisterDeadlineTracker(root, time.Duration) - SendResourceUpdate(type_ ipc.ResourceUpdateType, root root) + SendResourceUpdate(type_ ipc.ResourceUpdateType, tag BitswapDataTag, root root) CheckInvariants() } @@ -109,7 +108,7 @@ func kickStartRootDownload(root_ BitswapBlockLink, tag BitswapDataTag, bs Bitswa bitswapLogger.Debugf("Skipping download request for %s due to status: %s", codanet.BlockHashToCidSuffix(root_), err) status, err := bs.GetStatus(root_) if err == nil && status == codanet.Full { - bs.SendResourceUpdate(ipc.ResourceUpdateType_added, root_) + bs.SendResourceUpdate(ipc.ResourceUpdateType_added, tag, root_) } return } @@ -140,7 +139,7 @@ func kickStartRootDownload(root_ BitswapBlockLink, tag BitswapDataTag, bs Bitswa copy(rootBlock, b) return nil } - if err := bs.ViewBlock(root_, rootBlockViewF); err != nil && err != (ipld.ErrNotFound{Cid: codanet.BlockHashToCid(root_)}) { + if err := bs.ViewBlock(root_, rootBlockViewF); err != nil && !isBlockNotFound(root_, err) { handleError(err) return } @@ -280,8 +279,8 @@ func processDownloadedBlock(block blocks.Block, bs BitswapState) { newParams, malformed := processDownloadedBlockStep(oldPs, block, rps, bs.MaxBlockSize(), depthIndices, bs.DataConfig()) for root, err := range malformed { bitswapLogger.Warnf("Block %s of root %s is malformed: %s", id, codanet.BlockHashToCidSuffix(root), err) - ClearRootDownloadState(bs, root) - bs.SendResourceUpdate(ipc.ResourceUpdateType_broken, root) + DeleteRoot(bs, root) + bs.SendResourceUpdate(ipc.ResourceUpdateType_broken, rps[root].getTag(), root) } blocksToProcess := make([]blocks.Block, 0) @@ -316,7 +315,7 @@ func processDownloadedBlock(block blocks.Block, bs BitswapState) { b, _ := blocks.NewBlockWithCid(blockBytes, childId) blocksToProcess = append(blocksToProcess, b) } else { - if err != (ipld.ErrNotFound{Cid: codanet.BlockHashToCid(link)}) { + if !isBlockNotFound(link, err) { // we still schedule blocks for downloading // this case should rarely happen in practice bitswapLogger.Warnf("Failed to retrieve block %s from storage: %s", childId, err) @@ -338,7 +337,7 @@ func processDownloadedBlock(block blocks.Block, bs BitswapState) { bitswapLogger.Warnf("Failed to update status of fully downloaded root %s: %s", root, err) } ClearRootDownloadState(bs, root) - bs.SendResourceUpdate(ipc.ResourceUpdateType_added, root) + bs.SendResourceUpdate(ipc.ResourceUpdateType_added, rootState.tag, root) } } for _, b := range blocksToProcess { diff --git a/src/app/libp2p_helper/src/libp2p_helper/bitswap_downloader_test.go b/src/app/libp2p_helper/src/libp2p_helper/bitswap_downloader_test.go index af1b2b5378b..5855a13dde2 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/bitswap_downloader_test.go +++ b/src/app/libp2p_helper/src/libp2p_helper/bitswap_downloader_test.go @@ -670,7 +670,7 @@ func (bs *testBitswapState) RegisterDeadlineTracker(root_ root, downloadTimeout downloadTimeout time.Duration }{root: root_, downloadTimeout: downloadTimeout}) } -func (bs *testBitswapState) SendResourceUpdate(type_ ipc.ResourceUpdateType, root root) { +func (bs *testBitswapState) SendResourceUpdate(type_ ipc.ResourceUpdateType, _tag BitswapDataTag, root root) { type1, has := bs.resourceUpdates[root] if has && type1 != type_ { panic("duplicate resource update") diff --git a/src/app/libp2p_helper/src/libp2p_helper/bitswap_msg.go b/src/app/libp2p_helper/src/libp2p_helper/bitswap_msg.go index ab6f18ec140..d41e67a1306 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/bitswap_msg.go +++ b/src/app/libp2p_helper/src/libp2p_helper/bitswap_msg.go @@ -27,12 +27,12 @@ func (m AddResourcePush) handle(app *app) { } } -type DeleteResourcePushT = ipc.Libp2pHelperInterface_DeleteResource -type DeleteResourcePush DeleteResourcePushT +type RemoveResourcePushT = ipc.Libp2pHelperInterface_RemoveResource +type RemoveResourcePush RemoveResourcePushT -func fromDeleteResourcePush(m ipcPushMessage) (pushMessage, error) { - i, err := m.DeleteResource() - return DeleteResourcePush(i), err +func fromRemoveResourcePush(m ipcPushMessage) (pushMessage, error) { + i, err := m.RemoveResource() + return RemoveResourcePush(i), err } func extractRootBlockList(l ipc.RootBlockId_List) ([]root, error) { @@ -52,14 +52,14 @@ func extractRootBlockList(l ipc.RootBlockId_List) ([]root, error) { return ids, nil } -func (m DeleteResourcePush) handle(app *app) { - idsM, err := DeleteResourcePushT(m).Ids() +func (m RemoveResourcePush) handle(app *app) { + idsM, err := RemoveResourcePushT(m).Ids() var links []root if err == nil { links, err = extractRootBlockList(idsM) } if err != nil { - app.P2p.Logger.Errorf("DeleteResourcePush.handle: error %s", err) + app.P2p.Logger.Errorf("RemoveResourcePush.handle: error %s", err) return } app.bitswapCtx.deleteCmds <- bitswapDeleteCmd{links} diff --git a/src/app/libp2p_helper/src/libp2p_helper/bitswap_test.go b/src/app/libp2p_helper/src/libp2p_helper/bitswap_test.go index ec96f2ccd67..93b25d85cbc 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/bitswap_test.go +++ b/src/app/libp2p_helper/src/libp2p_helper/bitswap_test.go @@ -15,7 +15,6 @@ import ( capnp "capnproto.org/go/capnp/v3" "github.com/ipfs/go-cid" - ipld "github.com/ipfs/go-ipld-format" multihash "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" "golang.org/x/crypto/blake2b" @@ -71,12 +70,12 @@ func getRootIds(ids ipc.RootBlockId_List) ([]BitswapBlockLink, error) { return links, nil } -func deleteResource(n testNode, root root) error { +func removeResource(n testNode, root root) error { _, seg, err := capnp.NewMessage(capnp.SingleSegment(nil)) if err != nil { return err } - m, err := ipc.NewRootLibp2pHelperInterface_DeleteResource(seg) + m, err := ipc.NewRootLibp2pHelperInterface_RemoveResource(seg) if err != nil { return err } @@ -88,7 +87,7 @@ func deleteResource(n testNode, root root) error { if err != nil { return err } - DeleteResourcePush(m).handle(n.node) + RemoveResourcePush(m).handle(n.node) return nil } @@ -189,7 +188,7 @@ func confirmBlocksNotInStorage(bs *BitswapCtx, resource []byte) error { }) if err == nil { return fmt.Errorf("block %s wasn't deleted", codanet.BlockHashToCidSuffix(h)) - } else if err != (ipld.ErrNotFound{Cid: codanet.BlockHashToCid(h)}) { + } else if !isBlockNotFound(h, err) { return err } } @@ -393,7 +392,7 @@ func (conf bitswapTestConfig) execute(nodes []testNode, delayBeforeDownload bool if !resourceReplicated[ni] { continue } - err = deleteResource(nodes[ni], roots[ni]) + err = removeResource(nodes[ni], roots[ni]) if err != nil { return fmt.Errorf("Error removing own resources: %v", err) } diff --git a/src/app/libp2p_helper/src/libp2p_helper/error.go b/src/app/libp2p_helper/src/libp2p_helper/error.go index c42db326a81..49ffc3a3ef1 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/error.go +++ b/src/app/libp2p_helper/src/libp2p_helper/error.go @@ -1,9 +1,11 @@ package main import ( + "codanet" "fmt" "github.com/go-errors/errors" + ipld "github.com/ipfs/go-ipld-format" ) // TODO: wrap these in a new type, encode them differently in the rpc mainloop @@ -48,3 +50,7 @@ func needsConfigure() error { func needsDHT() error { return badRPC(errors.New("helper not yet joined to pubsub")) } + +func isBlockNotFound(block BitswapBlockLink, err error) bool { + return err == ipld.ErrNotFound{Cid: codanet.BlockHashToCid(block)} +} diff --git a/src/app/libp2p_helper/src/libp2p_helper/incoming_msg.go b/src/app/libp2p_helper/src/libp2p_helper/incoming_msg.go index a4472c443c4..d7e7c0f88dc 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/incoming_msg.go +++ b/src/app/libp2p_helper/src/libp2p_helper/incoming_msg.go @@ -34,7 +34,7 @@ var rpcRequestExtractors = map[ipc.Libp2pHelperInterface_RpcRequest_Which]extrac var pushMesssageExtractors = map[ipc.Libp2pHelperInterface_PushMessage_Which]extractPushMessage{ ipc.Libp2pHelperInterface_PushMessage_Which_addResource: fromAddResourcePush, - ipc.Libp2pHelperInterface_PushMessage_Which_deleteResource: fromDeleteResourcePush, + ipc.Libp2pHelperInterface_PushMessage_Which_removeResource: fromRemoveResourcePush, ipc.Libp2pHelperInterface_PushMessage_Which_downloadResource: fromDownloadResourcePush, ipc.Libp2pHelperInterface_PushMessage_Which_validation: fromValidationPush, ipc.Libp2pHelperInterface_PushMessage_Which_heartbeatPeer: fromHeartbeatPeerPush, diff --git a/src/app/libp2p_helper/src/libp2p_helper/msg.go b/src/app/libp2p_helper/src/libp2p_helper/msg.go index 053bbd64062..680b76bd487 100644 --- a/src/app/libp2p_helper/src/libp2p_helper/msg.go +++ b/src/app/libp2p_helper/src/libp2p_helper/msg.go @@ -395,7 +395,7 @@ func mkStreamMessageReceivedUpcall(streamIdx uint64, data []byte) *capnp.Message }) } -func mkResourceUpdatedUpcall(type_ ipc.ResourceUpdateType, rootIds []root) *capnp.Message { +func mkResourceUpdatedUpcall(type_ ipc.ResourceUpdateType, tag BitswapDataTag, rootIds []root) *capnp.Message { return mkPushMsg(func(m ipc.DaemonInterface_PushMessage) { im, err := m.NewResourceUpdated() panicOnErr(err) @@ -403,6 +403,7 @@ func mkResourceUpdatedUpcall(type_ ipc.ResourceUpdateType, rootIds []root) *capn panic("too many root ids in a single upcall") } im.SetType(type_) + im.SetTag(uint8(tag)) mIds, err := im.NewIds(int32(len(rootIds))) panicOnErr(err) for i, rootId := range rootIds { diff --git a/src/lib/block_producer/block_producer.ml b/src/lib/block_producer/block_producer.ml index ab54c4cbe31..b66aca0f2f4 100644 --- a/src/lib/block_producer/block_producer.ml +++ b/src/lib/block_producer/block_producer.ml @@ -656,6 +656,559 @@ let validate_genesis_protocol_state_block ~genesis_state_hash (b, v) = |> Result.map ~f:(Fn.flip Validation.with_body (Mina_block.body @@ With_hash.data b)) +let log_bootstrap_mode ~logger () = + [%log info] "Pausing block production while bootstrapping" + +let genesis_breadcrumb_creator ~context:(module Context : CONTEXT) prover = + let open Context in + let started = ref false in + let genesis_breadcrumb_ivar = Ivar.create () in + fun () -> + if !started then Ivar.read genesis_breadcrumb_ivar + else ( + started := true ; + let max_num_retries = 3 in + let rec go retries = + [%log info] + "Generating genesis proof ($attempts_remaining / $max_attempts)" + ~metadata: + [ ("attempts_remaining", `Int retries) + ; ("max_attempts", `Int max_num_retries) + ] ; + match%bind + Prover.create_genesis_block prover + (Genesis_proof.to_inputs precomputed_values) + with + | Ok res -> + Ivar.fill genesis_breadcrumb_ivar (Ok res) ; + return (Ok res) + | Error err -> + [%log error] "Failed to generate genesis breadcrumb: $error" + ~metadata:[ ("error", Error_json.error_to_yojson err) ] ; + if retries > 0 then go (retries - 1) + else ( + Ivar.fill genesis_breadcrumb_ivar (Error err) ; + return (Error err) ) + in + go max_num_retries ) + +let produce ~genesis_breadcrumb ~context:(module Context : CONTEXT) ~prover + ~verifier ~trust_system ~get_completed_work ~transaction_resource_pool + ~frontier_reader ~time_controller ~transition_writer ~log_block_creation + ~block_reward_threshold ~block_produced_bvar ~slot_tx_end ~slot_chain_end + ~net ~zkapp_cmd_limit_hardcap ivar + (scheduled_time, block_data, winner_pubkey) = + let open Context in + let module Breadcrumb = Transition_frontier.Breadcrumb in + let open Interruptible.Let_syntax in + let rejected_blocks_logger = + Logger.create ~id:Logger.Logger_id.rejected_blocks () + in + match Broadcast_pipe.Reader.peek frontier_reader with + | None -> + log_bootstrap_mode ~logger () ; + Interruptible.return () + | Some frontier -> ( + let global_slot = + Consensus.Data.Block_data.global_slot_since_genesis block_data + in + Internal_tracing.with_slot global_slot + @@ fun () -> + [%log internal] "Begin_block_production" ; + let open Transition_frontier.Extensions in + let transition_registry = + get_extension + (Transition_frontier.extensions frontier) + Transition_registry + in + let crumb = Transition_frontier.best_tip frontier in + let crumb = + let crumb_global_slot_since_genesis = + Breadcrumb.protocol_state crumb + |> Protocol_state.consensus_state + |> Consensus.Data.Consensus_state.global_slot_since_genesis + in + let block_global_slot_since_genesis = + Consensus.Proof_of_stake.Data.Block_data.global_slot_since_genesis + block_data + in + if + Mina_numbers.Global_slot_since_genesis.equal + crumb_global_slot_since_genesis block_global_slot_since_genesis + then + (* We received a block for this slot over the network before + attempting to produce our own. Build upon its parent instead + of attempting (and failing) to build upon the block itself. + *) + Transition_frontier.find_exn frontier (Breadcrumb.parent_hash crumb) + else crumb + in + let start = Block_time.now time_controller in + [%log info] + ~metadata: + [ ("parent_hash", Breadcrumb.parent_hash crumb |> State_hash.to_yojson) + ; ( "protocol_state" + , Breadcrumb.protocol_state crumb |> Protocol_state.value_to_yojson + ) + ] + "Producing new block with parent $parent_hash%!" ; + let previous_transition = Breadcrumb.block_with_hash crumb in + let previous_protocol_state = + Header.protocol_state + @@ Mina_block.header (With_hash.data previous_transition) + in + let%bind previous_protocol_state_proof = + if + Consensus.Data.Consensus_state.is_genesis_state + (Protocol_state.consensus_state previous_protocol_state) + && Option.is_none precomputed_values.proof_data + then ( + match%bind Interruptible.uninterruptible (genesis_breadcrumb ()) with + | Ok block -> + let proof = Blockchain_snark.Blockchain.proof block in + Interruptible.lift (Deferred.return proof) (Deferred.never ()) + | Error err -> + [%log error] + "Aborting block production: cannot generate a genesis proof" + ~metadata:[ ("error", Error_json.error_to_yojson err) ] ; + Interruptible.lift (Deferred.never ()) (Deferred.return ()) ) + else + return + ( Header.protocol_state_proof + @@ Mina_block.header (With_hash.data previous_transition) ) + in + [%log internal] "Get_transactions_from_pool" ; + let transactions = + Network_pool.Transaction_pool.Resource_pool.transactions + transaction_resource_pool + |> Sequence.map + ~f:Transaction_hash.User_command_with_valid_signature.data + in + let%bind () = Interruptible.lift (Deferred.return ()) (Ivar.read ivar) in + [%log internal] "Generate_next_state" ; + let%bind next_state_opt = + generate_next_state ~commit_id ~constraint_constants ~scheduled_time + ~block_data ~previous_protocol_state ~time_controller + ~staged_ledger:(Breadcrumb.staged_ledger crumb) + ~transactions ~get_completed_work ~logger ~log_block_creation + ~winner_pk:winner_pubkey ~block_reward_threshold + ~zkapp_cmd_limit:!zkapp_cmd_limit ~zkapp_cmd_limit_hardcap + ~slot_tx_end ~slot_chain_end + in + [%log internal] "Generate_next_state_done" ; + match next_state_opt with + | None -> + Interruptible.return () + | Some (protocol_state, internal_transition, pending_coinbase_witness) -> + let diff = + Internal_transition.staged_ledger_diff internal_transition + in + let commands = Staged_ledger_diff.commands diff in + let transactions_count = List.length commands in + let protocol_state_hashes = Protocol_state.hashes protocol_state in + let consensus_state_with_hashes = + { With_hash.hash = protocol_state_hashes + ; data = Protocol_state.consensus_state protocol_state + } + in + [%log internal] "@produced_block_state_hash" + ~metadata: + [ ( "state_hash" + , `String + (Mina_base.State_hash.to_base58_check + protocol_state_hashes.state_hash ) ) + ] ; + Internal_tracing.with_state_hash protocol_state_hashes.state_hash + @@ fun () -> + Debug_assert.debug_assert (fun () -> + [%test_result: [ `Take | `Keep ]] + (Consensus.Hooks.select + ~context:(module Context) + ~existing: + (With_hash.map ~f:Mina_block.consensus_state + previous_transition ) + ~candidate:consensus_state_with_hashes ) + ~expect:`Take + ~message: + "newly generated consensus states should be selected over \ + their parent" ; + let root_consensus_state_with_hashes = + Transition_frontier.root frontier + |> Breadcrumb.consensus_state_with_hashes + in + [%test_result: [ `Take | `Keep ]] + (Consensus.Hooks.select + ~context:(module Context) + ~existing:root_consensus_state_with_hashes + ~candidate:consensus_state_with_hashes ) + ~expect:`Take + ~message: + "newly generated consensus states should be selected over \ + the tf root" ) ; + Interruptible.uninterruptible + (let open Deferred.Let_syntax in + let emit_breadcrumb () = + let open Deferred.Result.Let_syntax in + [%log internal] + ~metadata:[ ("transactions_count", `Int transactions_count) ] + "Produce_state_transition_proof" ; + let%bind protocol_state_proof = + time ~logger ~time_controller + "Protocol_state_proof proving time(ms)" (fun () -> + O1trace.thread "dispatch_block_proving" (fun () -> + Prover.prove prover ~prev_state:previous_protocol_state + ~prev_state_proof:previous_protocol_state_proof + ~next_state:protocol_state internal_transition + pending_coinbase_witness ) + |> Deferred.Result.map_error ~f:(fun err -> + `Prover_error + ( err + , ( previous_protocol_state_proof + , internal_transition + , pending_coinbase_witness ) ) ) ) + in + let staged_ledger_diff = + Internal_transition.staged_ledger_diff internal_transition + in + let previous_state_hash = + (Protocol_state.hashes previous_protocol_state).state_hash + in + [%log internal] "Produce_chain_transition_proof" ; + let delta_block_chain_proof = + Transition_chain_prover.prove + ~length:(Mina_numbers.Length.to_int consensus_constants.delta) + ~frontier previous_state_hash + |> Option.value_exn + in + [%log internal] "Produce_validated_transition" ; + let%bind transition = + let open Result.Let_syntax in + Validation.wrap + { With_hash.hash = protocol_state_hashes + ; data = + (let body = Body.create staged_ledger_diff in + Mina_block.create ~body + ~header: + (Header.create ~protocol_state ~protocol_state_proof + ~delta_block_chain_proof () ) ) + } + |> Validation.skip_time_received_validation + `This_block_was_not_received_via_gossip + |> Validation.skip_protocol_versions_validation + `This_block_has_valid_protocol_versions + |> validate_genesis_protocol_state_block + ~genesis_state_hash: + (Protocol_state.genesis_state_hash + ~state_hash:(Some previous_state_hash) + previous_protocol_state ) + >>| Validation.skip_proof_validation + `This_block_was_generated_internally + >>| Validation.skip_delta_block_chain_validation + `This_block_was_not_received_via_gossip + >>= Validation.validate_frontier_dependencies + ~to_header:Mina_block.header + ~context:(module Context) + ~root_block: + ( Transition_frontier.root frontier + |> Breadcrumb.block_with_hash ) + ~is_block_in_frontier: + (Fn.compose Option.is_some + (Transition_frontier.find frontier) ) + |> Deferred.return + in + let transition_receipt_time = Some (Time.now ()) in + let%bind breadcrumb = + time ~logger ~time_controller + "Build breadcrumb on produced block" (fun () -> + Breadcrumb.build ~logger ~precomputed_values ~verifier + ~get_completed_work:(Fn.const None) ~trust_system + ~parent:crumb ~transition + ~sender:None (* Consider skipping `All here *) + ~skip_staged_ledger_verification:`Proofs + ~transition_receipt_time () ) + |> Deferred.Result.map_error ~f:(function + | `Invalid_staged_ledger_diff e -> + `Invalid_staged_ledger_diff (e, staged_ledger_diff) + | ( `Fatal_error _ + | `Invalid_genesis_protocol_state + | `Invalid_staged_ledger_hash _ + | `Not_selected_over_frontier_root + | `Parent_missing_from_frontier + | `Prover_error _ ) as err -> + err ) + in + let txs = + Mina_block.transactions ~constraint_constants + (Breadcrumb.block breadcrumb) + |> List.map ~f:Transaction.yojson_summary_with_status + in + [%log internal] "@block_metadata" + ~metadata: + [ ( "blockchain_length" + , Mina_numbers.Length.to_yojson + @@ Mina_block.blockchain_length + @@ Breadcrumb.block breadcrumb ) + ; ("transactions", `List txs) + ] ; + [%str_log info] + ~metadata:[ ("breadcrumb", Breadcrumb.to_yojson breadcrumb) ] + Block_produced ; + (* let uptime service (and any other waiters) know about breadcrumb *) + Bvar.broadcast block_produced_bvar breadcrumb ; + Mina_metrics.(Counter.inc_one Block_producer.blocks_produced) ; + Mina_metrics.Block_producer.( + Block_production_delay_histogram.observe block_production_delay + Time.( + Span.to_ms + @@ diff (now ()) + @@ Block_time.to_time_exn scheduled_time)) ; + [%log internal] "Send_breadcrumb_to_transition_frontier" ; + let%bind.Async.Deferred () = + Strict_pipe.Writer.write transition_writer breadcrumb + in + let metadata = + [ ( "state_hash" + , State_hash.to_yojson protocol_state_hashes.state_hash ) + ] + in + [%log internal] "Wait_for_confirmation" ; + [%log debug] ~metadata + "Waiting for block $state_hash to be inserted into frontier" ; + Deferred.choose + [ Deferred.choice + (Transition_registry.register transition_registry + protocol_state_hashes.state_hash ) + (Fn.const (Ok `Transition_accepted)) + ; Deferred.choice + ( Block_time.Timeout.create time_controller + (* We allow up to 20 seconds for the transition + to make its way from the transition_writer to + the frontier. + This value is chosen to be reasonably + generous. In theory, this should not take + terribly long. But long cycles do happen in + our system, and with medium curves those long + cycles can be substantial. + *) + (Block_time.Span.of_ms 20000L) + ~f:(Fn.const ()) + |> Block_time.Timeout.to_deferred ) + (Fn.const (Ok `Timed_out)) + ] + >>= function + | `Transition_accepted -> + [%log internal] "Transition_accepted" ; + [%log info] ~metadata + "Generated transition $state_hash was accepted into \ + transition frontier" ; + Deferred.map ~f:Result.return + (Mina_networking.broadcast_state net + (Breadcrumb.block_with_hash breadcrumb) ) + | `Timed_out -> + (* FIXME #3167: this should be fatal, and more + importantly, shouldn't happen. + *) + [%log internal] "Transition_accept_timeout" ; + let msg : (_, unit, string, unit) format4 = + "Timed out waiting for generated transition $state_hash to \ + enter transition frontier. Continuing to produce new \ + blocks anyway. This may mean your CPU is overloaded. \ + Consider disabling `-run-snark-worker` if it's \ + configured." + in + let span = + Block_time.diff (Block_time.now time_controller) start + in + let metadata = + [ ( "time" + , `Int (Block_time.Span.to_ms span |> Int64.to_int_exn) ) + ; ( "protocol_state" + , Protocol_state.Value.to_yojson protocol_state ) + ] + @ metadata + in + [%log' debug rejected_blocks_logger] ~metadata msg ; + [%log fatal] ~metadata msg ; + return () + in + let%bind res = emit_breadcrumb () in + let span = Block_time.diff (Block_time.now time_controller) start in + handle_block_production_errors ~logger ~rejected_blocks_logger + ~time_taken:span ~previous_protocol_state ~protocol_state res) ) + +let generate_genesis_proof_if_needed ~genesis_breadcrumb ~frontier_reader () = + match Broadcast_pipe.Reader.peek frontier_reader with + | Some transition_frontier -> + let consensus_state = + Transition_frontier.best_tip transition_frontier + |> Transition_frontier.Breadcrumb.consensus_state + in + if Consensus.Data.Consensus_state.is_genesis_state consensus_state then + genesis_breadcrumb () |> Deferred.ignore_m + else Deferred.return () + | None -> + Deferred.return () + +let iteration ~schedule_next_vrf_check ~produce_block_now + ~schedule_block_production ~next_vrf_check_now ~genesis_breadcrumb + ~context:(module Context : CONTEXT) ~vrf_evaluator ~time_controller + ~coinbase_receiver ~frontier_reader ~set_next_producer_timing + ~transition_frontier ~vrf_evaluation_state ~epoch_data_for_vrf + ~ledger_snapshot i slot = + O1trace.thread "block_producer_iteration" + @@ fun () -> + let consensus_state = + Transition_frontier.( + best_tip transition_frontier |> Breadcrumb.consensus_state) + in + let i' = + Mina_numbers.Length.succ + epoch_data_for_vrf.Consensus.Data.Epoch_data_for_vrf.epoch + in + let new_global_slot = epoch_data_for_vrf.global_slot in + let open Context in + let%bind () = + if Mina_numbers.Length.(i' > i) then + Vrf_evaluation_state.update_epoch_data ~vrf_evaluator ~epoch_data_for_vrf + ~logger vrf_evaluation_state ~vrf_poll_interval + else Deferred.unit + in + let%bind () = + (*Poll once every slot if the evaluation for the epoch is not completed or the evaluation is completed*) + if + Mina_numbers.Global_slot_since_hard_fork.(new_global_slot > slot) + && not (Vrf_evaluation_state.finished vrf_evaluation_state) + then + Vrf_evaluation_state.poll ~vrf_evaluator ~logger vrf_evaluation_state + ~vrf_poll_interval + else Deferred.unit + in + match Core.Queue.dequeue vrf_evaluation_state.queue with + | None -> ( + (*Keep trying until we get some slots*) + let poll () = + let%bind () = Async.after vrf_poll_interval in + let%bind () = + Vrf_evaluation_state.poll ~vrf_evaluator ~logger vrf_evaluation_state + ~vrf_poll_interval + in + schedule_next_vrf_check (Block_time.now time_controller) + in + match Vrf_evaluation_state.evaluator_status vrf_evaluation_state with + | Completed -> + let epoch_end_time = + Consensus.Hooks.epoch_end_time ~constants:consensus_constants + epoch_data_for_vrf.epoch + in + set_next_producer_timing (`Check_again epoch_end_time) consensus_state ; + [%log info] "No more slots won in this epoch" ; + schedule_next_vrf_check epoch_end_time + | At last_slot -> + set_next_producer_timing (`Evaluating_vrf last_slot) consensus_state ; + poll () + | Start -> + set_next_producer_timing (`Evaluating_vrf new_global_slot) + consensus_state ; + poll () ) + | Some slot_won -> ( + let winning_global_slot = slot_won.global_slot in + let slot, epoch = + let t = + Consensus.Data.Consensus_time.of_global_slot winning_global_slot + ~constants:consensus_constants + in + Consensus.Data.Consensus_time.(slot t, epoch t) + in + [%log info] "Block producer won slot $slot in epoch $epoch" + ~metadata: + [ ( "slot" + , Mina_numbers.Global_slot_since_genesis.( + to_yojson @@ of_uint32 slot) ) + ; ("epoch", Mina_numbers.Length.to_yojson epoch) + ] ; + let now = Block_time.now time_controller in + let curr_global_slot = + Consensus.Data.Consensus_time.( + of_time_exn ~constants:consensus_constants now |> to_global_slot) + in + let winner_pk = fst slot_won.delegator in + let data = + Consensus.Hooks.get_block_data ~slot_won ~ledger_snapshot + ~coinbase_receiver:!coinbase_receiver + in + if + Mina_numbers.Global_slot_since_hard_fork.( + curr_global_slot = winning_global_slot) + then ( + (*produce now*) + [%log info] "Producing a block now" ; + set_next_producer_timing + (`Produce_now (data, winner_pk)) + consensus_state ; + Mina_metrics.(Counter.inc_one Block_producer.slots_won) ; + let%bind () = + generate_genesis_proof_if_needed ~genesis_breadcrumb ~frontier_reader + () + in + produce_block_now (now, data, winner_pk) ) + else + match + Mina_numbers.Global_slot_since_hard_fork.diff winning_global_slot + curr_global_slot + with + | None -> + [%log warn] + "Skipping block production for global slot $slot_won because it \ + has passed. Current global slot is $curr_slot" + ~metadata: + [ ( "slot_won" + , Mina_numbers.Global_slot_since_hard_fork.to_yojson + winning_global_slot ) + ; ( "curr_slot" + , Mina_numbers.Global_slot_since_hard_fork.to_yojson + curr_global_slot ) + ] ; + next_vrf_check_now () + | Some slot_diff -> + [%log info] "Producing a block in $slots slots" + ~metadata: + [ ("slots", Mina_numbers.Global_slot_span.to_yojson slot_diff) ] ; + let time = + Consensus.Data.Consensus_time.( + start_time ~constants:consensus_constants + (of_global_slot ~constants:consensus_constants + winning_global_slot )) + |> Block_time.to_span_since_epoch |> Block_time.Span.to_ms + in + set_next_producer_timing + (`Produce (time, data, winner_pk)) + consensus_state ; + Mina_metrics.(Counter.inc_one Block_producer.slots_won) ; + let scheduled_time = time_of_ms time in + don't_wait_for + ((* Attempt to generate a genesis proof in the slot + immediately before we'll actually need it, so that + it isn't limiting our block production time in the + won slot. + This also allows non-genesis blocks to be received + in the meantime and alleviate the need to produce + one at all, if this won't have block height 1. + *) + let scheduled_genesis_time = + time_of_ms + Int64.( + time - of_int constraint_constants.block_window_duration_ms) + in + let span_till_time = + Block_time.diff scheduled_genesis_time + (Block_time.now time_controller) + |> Block_time.Span.to_time_span + in + let%bind () = after span_till_time in + generate_genesis_proof_if_needed ~genesis_breadcrumb + ~frontier_reader () ) ; + schedule_block_production (scheduled_time, data, winner_pk) ) + let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier ~trust_system ~get_completed_work ~transaction_resource_pool ~time_controller ~consensus_local_state ~coinbase_receiver ~frontier_reader @@ -663,46 +1216,9 @@ let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier ~block_reward_threshold ~block_produced_bvar ~vrf_evaluation_state ~net ~zkapp_cmd_limit_hardcap = let open Context in - let constraint_constants = precomputed_values.constraint_constants in - let consensus_constants = precomputed_values.consensus_constants in O1trace.sync_thread "produce_blocks" (fun () -> let genesis_breadcrumb = - let started = ref false in - let genesis_breadcrumb_ivar = Ivar.create () in - fun () -> - if !started then Ivar.read genesis_breadcrumb_ivar - else ( - started := true ; - let max_num_retries = 3 in - let rec go retries = - [%log info] - "Generating genesis proof ($attempts_remaining / $max_attempts)" - ~metadata: - [ ("attempts_remaining", `Int retries) - ; ("max_attempts", `Int max_num_retries) - ] ; - match%bind - Prover.create_genesis_block prover - (Genesis_proof.to_inputs precomputed_values) - with - | Ok res -> - Ivar.fill genesis_breadcrumb_ivar (Ok res) ; - return (Ok res) - | Error err -> - [%log error] "Failed to generate genesis breadcrumb: $error" - ~metadata:[ ("error", Error_json.error_to_yojson err) ] ; - if retries > 0 then go (retries - 1) - else ( - Ivar.fill genesis_breadcrumb_ivar (Error err) ; - return (Error err) ) - in - go max_num_retries ) - in - let rejected_blocks_logger = - Logger.create ~id:Logger.Logger_id.rejected_blocks () - in - let log_bootstrap_mode () = - [%log info] "Pausing block production while bootstrapping" + genesis_breadcrumb_creator ~context:(module Context) prover in let slot_tx_end = Runtime_config.slot_tx_end precomputed_values.runtime_config @@ -710,378 +1226,23 @@ let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier let slot_chain_end = Runtime_config.slot_chain_end precomputed_values.runtime_config in - let module Breadcrumb = Transition_frontier.Breadcrumb in - let produce ivar (scheduled_time, block_data, winner_pubkey) = - let open Interruptible.Let_syntax in - match Broadcast_pipe.Reader.peek frontier_reader with - | None -> - log_bootstrap_mode () ; Interruptible.return () - | Some frontier -> ( - let global_slot = - Consensus.Data.Block_data.global_slot_since_genesis block_data - in - Internal_tracing.with_slot global_slot - @@ fun () -> - [%log internal] "Begin_block_production" ; - let open Transition_frontier.Extensions in - let transition_registry = - get_extension - (Transition_frontier.extensions frontier) - Transition_registry - in - let crumb = Transition_frontier.best_tip frontier in - let crumb = - let crumb_global_slot_since_genesis = - Breadcrumb.protocol_state crumb - |> Protocol_state.consensus_state - |> Consensus.Data.Consensus_state.global_slot_since_genesis - in - let block_global_slot_since_genesis = - Consensus.Proof_of_stake.Data.Block_data - .global_slot_since_genesis block_data - in - if - Mina_numbers.Global_slot_since_genesis.equal - crumb_global_slot_since_genesis - block_global_slot_since_genesis - then - (* We received a block for this slot over the network before - attempting to produce our own. Build upon its parent instead - of attempting (and failing) to build upon the block itself. - *) - Transition_frontier.find_exn frontier - (Breadcrumb.parent_hash crumb) - else crumb - in - let start = Block_time.now time_controller in - [%log info] - ~metadata: - [ ( "parent_hash" - , Breadcrumb.parent_hash crumb |> State_hash.to_yojson ) - ; ( "protocol_state" - , Breadcrumb.protocol_state crumb - |> Protocol_state.value_to_yojson ) - ] - "Producing new block with parent $parent_hash%!" ; - let previous_transition = Breadcrumb.block_with_hash crumb in - let previous_protocol_state = - Header.protocol_state - @@ Mina_block.header (With_hash.data previous_transition) - in - let%bind previous_protocol_state_proof = - if - Consensus.Data.Consensus_state.is_genesis_state - (Protocol_state.consensus_state previous_protocol_state) - && Option.is_none precomputed_values.proof_data - then ( - match%bind - Interruptible.uninterruptible (genesis_breadcrumb ()) - with - | Ok block -> - let proof = Blockchain_snark.Blockchain.proof block in - Interruptible.lift (Deferred.return proof) - (Deferred.never ()) - | Error err -> - [%log error] - "Aborting block production: cannot generate a genesis \ - proof" - ~metadata:[ ("error", Error_json.error_to_yojson err) ] ; - Interruptible.lift (Deferred.never ()) (Deferred.return ()) - ) - else - return - ( Header.protocol_state_proof - @@ Mina_block.header (With_hash.data previous_transition) ) - in - [%log internal] "Get_transactions_from_pool" ; - let transactions = - Network_pool.Transaction_pool.Resource_pool.transactions - transaction_resource_pool - |> Sequence.map - ~f:Transaction_hash.User_command_with_valid_signature.data - in - let%bind () = - Interruptible.lift (Deferred.return ()) (Ivar.read ivar) - in - [%log internal] "Generate_next_state" ; - let%bind next_state_opt = - generate_next_state ~commit_id ~constraint_constants - ~scheduled_time ~block_data ~previous_protocol_state - ~time_controller - ~staged_ledger:(Breadcrumb.staged_ledger crumb) - ~transactions ~get_completed_work ~logger ~log_block_creation - ~winner_pk:winner_pubkey ~block_reward_threshold - ~zkapp_cmd_limit:!zkapp_cmd_limit ~zkapp_cmd_limit_hardcap - ~slot_tx_end ~slot_chain_end - in - [%log internal] "Generate_next_state_done" ; - match next_state_opt with - | None -> - Interruptible.return () - | Some - (protocol_state, internal_transition, pending_coinbase_witness) - -> - let diff = - Internal_transition.staged_ledger_diff internal_transition - in - let commands = Staged_ledger_diff.commands diff in - let transactions_count = List.length commands in - let protocol_state_hashes = - Protocol_state.hashes protocol_state - in - let consensus_state_with_hashes = - { With_hash.hash = protocol_state_hashes - ; data = Protocol_state.consensus_state protocol_state - } - in - [%log internal] "@produced_block_state_hash" - ~metadata: - [ ( "state_hash" - , `String - (Mina_base.State_hash.to_base58_check - protocol_state_hashes.state_hash ) ) - ] ; - Internal_tracing.with_state_hash - protocol_state_hashes.state_hash - @@ fun () -> - Debug_assert.debug_assert (fun () -> - [%test_result: [ `Take | `Keep ]] - (Consensus.Hooks.select - ~context:(module Context) - ~existing: - (With_hash.map ~f:Mina_block.consensus_state - previous_transition ) - ~candidate:consensus_state_with_hashes ) - ~expect:`Take - ~message: - "newly generated consensus states should be selected \ - over their parent" ; - let root_consensus_state_with_hashes = - Transition_frontier.root frontier - |> Breadcrumb.consensus_state_with_hashes - in - [%test_result: [ `Take | `Keep ]] - (Consensus.Hooks.select - ~context:(module Context) - ~existing:root_consensus_state_with_hashes - ~candidate:consensus_state_with_hashes ) - ~expect:`Take - ~message: - "newly generated consensus states should be selected \ - over the tf root" ) ; - Interruptible.uninterruptible - (let open Deferred.Let_syntax in - let emit_breadcrumb () = - let open Deferred.Result.Let_syntax in - [%log internal] - ~metadata: - [ ("transactions_count", `Int transactions_count) ] - "Produce_state_transition_proof" ; - let%bind protocol_state_proof = - time ~logger ~time_controller - "Protocol_state_proof proving time(ms)" (fun () -> - O1trace.thread "dispatch_block_proving" (fun () -> - Prover.prove prover - ~prev_state:previous_protocol_state - ~prev_state_proof:previous_protocol_state_proof - ~next_state:protocol_state internal_transition - pending_coinbase_witness ) - |> Deferred.Result.map_error ~f:(fun err -> - `Prover_error - ( err - , ( previous_protocol_state_proof - , internal_transition - , pending_coinbase_witness ) ) ) ) - in - let staged_ledger_diff = - Internal_transition.staged_ledger_diff internal_transition - in - let previous_state_hash = - (Protocol_state.hashes previous_protocol_state).state_hash - in - [%log internal] "Produce_chain_transition_proof" ; - let delta_block_chain_proof = - Transition_chain_prover.prove - ~length: - (Mina_numbers.Length.to_int consensus_constants.delta) - ~frontier previous_state_hash - |> Option.value_exn - in - [%log internal] "Produce_validated_transition" ; - let%bind transition = - let open Result.Let_syntax in - Validation.wrap - { With_hash.hash = protocol_state_hashes - ; data = - (let body = Body.create staged_ledger_diff in - Mina_block.create ~body - ~header: - (Header.create ~protocol_state - ~protocol_state_proof - ~delta_block_chain_proof () ) ) - } - |> Validation.skip_time_received_validation - `This_block_was_not_received_via_gossip - |> Validation.skip_protocol_versions_validation - `This_block_has_valid_protocol_versions - |> validate_genesis_protocol_state_block - ~genesis_state_hash: - (Protocol_state.genesis_state_hash - ~state_hash:(Some previous_state_hash) - previous_protocol_state ) - >>| Validation.skip_proof_validation - `This_block_was_generated_internally - >>| Validation.skip_delta_block_chain_validation - `This_block_was_not_received_via_gossip - >>= Validation.validate_frontier_dependencies - ~to_header:Mina_block.header - ~context:(module Context) - ~root_block: - ( Transition_frontier.root frontier - |> Breadcrumb.block_with_hash ) - ~is_block_in_frontier: - (Fn.compose Option.is_some - (Transition_frontier.find frontier) ) - |> Deferred.return - in - let transition_receipt_time = Some (Time.now ()) in - let%bind breadcrumb = - time ~logger ~time_controller - "Build breadcrumb on produced block" (fun () -> - Breadcrumb.build ~logger ~precomputed_values ~verifier - ~get_completed_work:(Fn.const None) ~trust_system - ~parent:crumb ~transition - ~sender:None (* Consider skipping `All here *) - ~skip_staged_ledger_verification:`Proofs - ~transition_receipt_time () ) - |> Deferred.Result.map_error ~f:(function - | `Invalid_staged_ledger_diff e -> - `Invalid_staged_ledger_diff - (e, staged_ledger_diff) - | ( `Fatal_error _ - | `Invalid_genesis_protocol_state - | `Invalid_staged_ledger_hash _ - | `Not_selected_over_frontier_root - | `Parent_missing_from_frontier - | `Prover_error _ ) as err -> - err ) - in - let txs = - Mina_block.transactions ~constraint_constants - (Breadcrumb.block breadcrumb) - |> List.map ~f:Transaction.yojson_summary_with_status - in - [%log internal] "@block_metadata" - ~metadata: - [ ( "blockchain_length" - , Mina_numbers.Length.to_yojson - @@ Mina_block.blockchain_length - @@ Breadcrumb.block breadcrumb ) - ; ("transactions", `List txs) - ] ; - [%str_log info] - ~metadata: - [ ("breadcrumb", Breadcrumb.to_yojson breadcrumb) ] - Block_produced ; - (* let uptime service (and any other waiters) know about breadcrumb *) - Bvar.broadcast block_produced_bvar breadcrumb ; - Mina_metrics.( - Counter.inc_one Block_producer.blocks_produced) ; - Mina_metrics.Block_producer.( - Block_production_delay_histogram.observe - block_production_delay - Time.( - Span.to_ms - @@ diff (now ()) - @@ Block_time.to_time_exn scheduled_time)) ; - [%log internal] "Send_breadcrumb_to_transition_frontier" ; - let%bind.Async.Deferred () = - Strict_pipe.Writer.write transition_writer breadcrumb - in - let metadata = - [ ( "state_hash" - , State_hash.to_yojson protocol_state_hashes.state_hash - ) - ] - in - [%log internal] "Wait_for_confirmation" ; - [%log debug] ~metadata - "Waiting for block $state_hash to be inserted into \ - frontier" ; - Deferred.choose - [ Deferred.choice - (Transition_registry.register transition_registry - protocol_state_hashes.state_hash ) - (Fn.const (Ok `Transition_accepted)) - ; Deferred.choice - ( Block_time.Timeout.create time_controller - (* We allow up to 20 seconds for the transition - to make its way from the transition_writer to - the frontier. - This value is chosen to be reasonably - generous. In theory, this should not take - terribly long. But long cycles do happen in - our system, and with medium curves those long - cycles can be substantial. - *) - (Block_time.Span.of_ms 20000L) - ~f:(Fn.const ()) - |> Block_time.Timeout.to_deferred ) - (Fn.const (Ok `Timed_out)) - ] - >>= function - | `Transition_accepted -> - [%log internal] "Transition_accepted" ; - [%log info] ~metadata - "Generated transition $state_hash was accepted into \ - transition frontier" ; - Deferred.map ~f:Result.return - (Mina_networking.broadcast_state net - (Breadcrumb.block_with_hash breadcrumb) ) - | `Timed_out -> - (* FIXME #3167: this should be fatal, and more - importantly, shouldn't happen. - *) - [%log internal] "Transition_accept_timeout" ; - let msg : (_, unit, string, unit) format4 = - "Timed out waiting for generated transition \ - $state_hash to enter transition frontier. \ - Continuing to produce new blocks anyway. This may \ - mean your CPU is overloaded. Consider disabling \ - `-run-snark-worker` if it's configured." - in - let span = - Block_time.diff (Block_time.now time_controller) start - in - let metadata = - [ ( "time" - , `Int - (Block_time.Span.to_ms span |> Int64.to_int_exn) - ) - ; ( "protocol_state" - , Protocol_state.Value.to_yojson protocol_state ) - ] - @ metadata - in - [%log' debug rejected_blocks_logger] ~metadata msg ; - [%log fatal] ~metadata msg ; - return () - in - let%bind res = emit_breadcrumb () in - let span = - Block_time.diff (Block_time.now time_controller) start - in - handle_block_production_errors ~logger ~rejected_blocks_logger - ~time_taken:span ~previous_protocol_state ~protocol_state - res) ) + let produce = + produce ~genesis_breadcrumb + ~context:(module Context : CONTEXT) + ~prover ~verifier ~trust_system ~get_completed_work + ~transaction_resource_pool ~frontier_reader ~time_controller + ~transition_writer ~log_block_creation ~block_reward_threshold + ~block_produced_bvar ~slot_tx_end ~slot_chain_end ~net + ~zkapp_cmd_limit_hardcap in + let module Breadcrumb = Transition_frontier.Breadcrumb in let production_supervisor = Singleton_supervisor.create ~task:produce in let scheduler = Singleton_scheduler.create time_controller in let rec check_next_block_timing slot i () = (* Begin checking for the ability to produce a block *) match Broadcast_pipe.Reader.peek frontier_reader with | None -> - log_bootstrap_mode () ; + log_bootstrap_mode ~logger () ; don't_wait_for (let%map () = Broadcast_pipe.Reader.iter_until frontier_reader @@ -1135,21 +1296,8 @@ let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier "Block producer will begin producing only empty blocks after \ $slot_diff slots" slot_tx_end ; - - let generate_genesis_proof_if_needed () = - match Broadcast_pipe.Reader.peek frontier_reader with - | Some transition_frontier -> - let consensus_state = - Transition_frontier.best_tip transition_frontier - |> Breadcrumb.consensus_state - in - if - Consensus.Data.Consensus_state.is_genesis_state - consensus_state - then genesis_breadcrumb () |> Deferred.ignore_m - else Deferred.return () - | None -> - Deferred.return () + let next_vrf_check_now = + check_next_block_timing new_global_slot i' in (* TODO: Re-enable this assertion when it doesn't fail dev demos * (see #5354) @@ -1158,179 +1306,34 @@ let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier ~constants:consensus_constants ~consensus_state ~local_state:consensus_local_state = None ) ; *) + let produce_block_now triple = + ignore + ( Interruptible.finally + (Singleton_supervisor.dispatch production_supervisor triple) + ~f:next_vrf_check_now + : (_, _) Interruptible.t ) + in don't_wait_for - (let%bind () = - if Mina_numbers.Length.(i' > i) then - Vrf_evaluation_state.update_epoch_data ~vrf_evaluator - ~epoch_data_for_vrf ~logger vrf_evaluation_state - ~vrf_poll_interval - else Deferred.unit - in - let%bind () = - (*Poll once every slot if the evaluation for the epoch is not completed or the evaluation is completed*) - if - Mina_numbers.Global_slot_since_hard_fork.( - new_global_slot > slot) - && not (Vrf_evaluation_state.finished vrf_evaluation_state) - then - Vrf_evaluation_state.poll ~vrf_evaluator ~logger - vrf_evaluation_state ~vrf_poll_interval - else Deferred.unit - in - match Core.Queue.dequeue vrf_evaluation_state.queue with - | None -> ( - (*Keep trying until we get some slots*) - let poll () = - let%bind () = Async.after vrf_poll_interval in - let%map () = - Vrf_evaluation_state.poll ~vrf_evaluator ~logger - vrf_evaluation_state ~vrf_poll_interval - in - Singleton_scheduler.schedule scheduler - (Block_time.now time_controller) - ~f:(check_next_block_timing new_global_slot i') - in - match - Vrf_evaluation_state.evaluator_status vrf_evaluation_state - with - | Completed -> - let epoch_end_time = - Consensus.Hooks.epoch_end_time - ~constants:consensus_constants - epoch_data_for_vrf.epoch - in - set_next_producer_timing (`Check_again epoch_end_time) - consensus_state ; - [%log info] "No more slots won in this epoch" ; - return - (Singleton_scheduler.schedule scheduler epoch_end_time - ~f:(check_next_block_timing new_global_slot i') ) - | At last_slot -> - set_next_producer_timing (`Evaluating_vrf last_slot) - consensus_state ; - poll () - | Start -> - set_next_producer_timing - (`Evaluating_vrf new_global_slot) consensus_state ; - poll () ) - | Some slot_won -> ( - let winning_global_slot = slot_won.global_slot in - let slot, epoch = - let t = - Consensus.Data.Consensus_time.of_global_slot - winning_global_slot ~constants:consensus_constants - in - Consensus.Data.Consensus_time.(slot t, epoch t) - in - [%log info] "Block producer won slot $slot in epoch $epoch" - ~metadata: - [ ( "slot" - , Mina_numbers.Global_slot_since_genesis.( - to_yojson @@ of_uint32 slot) ) - ; ("epoch", Mina_numbers.Length.to_yojson epoch) - ] ; - let now = Block_time.now time_controller in - let curr_global_slot = - Consensus.Data.Consensus_time.( - of_time_exn ~constants:consensus_constants now - |> to_global_slot) - in - let winner_pk = fst slot_won.delegator in - let data = - Consensus.Hooks.get_block_data ~slot_won ~ledger_snapshot - ~coinbase_receiver:!coinbase_receiver - in - if - Mina_numbers.Global_slot_since_hard_fork.( - curr_global_slot = winning_global_slot) - then ( - (*produce now*) - [%log info] "Producing a block now" ; - set_next_producer_timing - (`Produce_now (data, winner_pk)) - consensus_state ; - Mina_metrics.(Counter.inc_one Block_producer.slots_won) ; - let%map () = generate_genesis_proof_if_needed () in - ignore - ( Interruptible.finally - (Singleton_supervisor.dispatch production_supervisor - (now, data, winner_pk) ) - ~f:(check_next_block_timing new_global_slot i') - : (_, _) Interruptible.t ) ) - else - match - Mina_numbers.Global_slot_since_hard_fork.diff - winning_global_slot curr_global_slot - with - | None -> - [%log warn] - "Skipping block production for global slot \ - $slot_won because it has passed. Current global \ - slot is $curr_slot" - ~metadata: - [ ( "slot_won" - , Mina_numbers.Global_slot_since_hard_fork - .to_yojson winning_global_slot ) - ; ( "curr_slot" - , Mina_numbers.Global_slot_since_hard_fork - .to_yojson curr_global_slot ) - ] ; - return (check_next_block_timing new_global_slot i' ()) - | Some slot_diff -> - [%log info] "Producing a block in $slots slots" - ~metadata: - [ ( "slots" - , Mina_numbers.Global_slot_span.to_yojson - slot_diff ) - ] ; - let time = - Consensus.Data.Consensus_time.( - start_time ~constants:consensus_constants - (of_global_slot ~constants:consensus_constants - winning_global_slot )) - |> Block_time.to_span_since_epoch - |> Block_time.Span.to_ms - in - set_next_producer_timing - (`Produce (time, data, winner_pk)) - consensus_state ; - Mina_metrics.(Counter.inc_one Block_producer.slots_won) ; - let scheduled_time = time_of_ms time in - don't_wait_for - ((* Attempt to generate a genesis proof in the slot - immediately before we'll actually need it, so that - it isn't limiting our block production time in the - won slot. - This also allows non-genesis blocks to be received - in the meantime and alleviate the need to produce - one at all, if this won't have block height 1. - *) - let scheduled_genesis_time = - time_of_ms - Int64.( - time - - of_int - constraint_constants - .block_window_duration_ms) - in - let span_till_time = - Block_time.diff scheduled_genesis_time - (Block_time.now time_controller) - |> Block_time.Span.to_time_span - in - let%bind () = after span_till_time in - generate_genesis_proof_if_needed () ) ; - Singleton_scheduler.schedule scheduler scheduled_time - ~f:(fun () -> - ignore - ( Interruptible.finally - (Singleton_supervisor.dispatch - production_supervisor - (scheduled_time, data, winner_pk) ) - ~f: - (check_next_block_timing new_global_slot i') - : (_, _) Interruptible.t ) ) ; - Deferred.return () ) ) + ( iteration + ~schedule_next_vrf_check: + (Fn.compose Deferred.return + (Singleton_scheduler.schedule scheduler + ~f:next_vrf_check_now ) ) + ~produce_block_now: + (Fn.compose Deferred.return produce_block_now) + ~schedule_block_production:(fun (time, data, winner) -> + Singleton_scheduler.schedule scheduler time ~f:(fun () -> + produce_block_now (time, data, winner) ) ; + Deferred.unit ) + ~next_vrf_check_now: + (Fn.compose Deferred.return next_vrf_check_now) + ~genesis_breadcrumb + ~context:(module Context) + ~vrf_evaluator ~time_controller ~coinbase_receiver + ~frontier_reader ~set_next_producer_timing + ~transition_frontier ~vrf_evaluation_state ~epoch_data_for_vrf + ~ledger_snapshot i slot + : unit Deferred.t ) in let start () = check_next_block_timing Mina_numbers.Global_slot_since_hard_fork.zero @@ -1361,9 +1364,6 @@ let run ~context:(module Context : CONTEXT) ~vrf_evaluator ~prover ~verifier let run_precomputed ~context:(module Context : CONTEXT) ~verifier ~trust_system ~time_controller ~frontier_reader ~transition_writer ~precomputed_blocks = let open Context in - let log_bootstrap_mode () = - [%log info] "Pausing block production while bootstrapping" - in let rejected_blocks_logger = Logger.create ~id:Logger.Logger_id.rejected_blocks () in @@ -1393,7 +1393,8 @@ let run_precomputed ~context:(module Context : CONTEXT) ~verifier ~trust_system in match Broadcast_pipe.Reader.peek frontier_reader with | None -> - log_bootstrap_mode () ; return () + log_bootstrap_mode ~logger () ; + return () | Some frontier -> let open Transition_frontier.Extensions in let transition_registry = @@ -1552,7 +1553,7 @@ let run_precomputed ~context:(module Context : CONTEXT) ~verifier ~trust_system (* Begin checking for the ability to produce a block *) match Broadcast_pipe.Reader.peek frontier_reader with | None -> - log_bootstrap_mode () ; + log_bootstrap_mode ~logger () ; let%bind () = Broadcast_pipe.Reader.iter_until frontier_reader ~f:(Fn.compose Deferred.return Option.is_some) diff --git a/src/lib/mina_net2/libp2p_helper.ml b/src/lib/mina_net2/libp2p_helper.ml index 2ef08749016..116a85be3c1 100644 --- a/src/lib/mina_net2/libp2p_helper.ml +++ b/src/lib/mina_net2/libp2p_helper.ml @@ -221,8 +221,7 @@ let handle_incoming_message t msg ~handle_push_message = handle_push_message t (DaemonInterface.PushMessage.get push_msg) ) ) | Undefined n -> - Libp2p_ipc.undefined_union ~context:"DaemonInterface.Message" n ; - Deferred.unit + Libp2p_ipc.undefined_union ~context:"DaemonInterface.Message" n let spawn ?(allow_multiple_instances = false) ~logger ~pids ~conf_dir ~handle_push_message () = diff --git a/src/libp2p_ipc/libp2p_ipc.capnp b/src/libp2p_ipc/libp2p_ipc.capnp index 8e603d46178..fca0b6b1139 100644 --- a/src/libp2p_ipc/libp2p_ipc.capnp +++ b/src/libp2p_ipc/libp2p_ipc.capnp @@ -332,7 +332,7 @@ struct Libp2pHelperInterface { result @1 :ValidationResult; } - struct DeleteResource { + struct RemoveResource { ids @0 :List(RootBlockId); } @@ -420,7 +420,7 @@ struct Libp2pHelperInterface { union { validation @1 :Libp2pHelperInterface.Validation; addResource @2 :Libp2pHelperInterface.AddResource; - deleteResource @3 :Libp2pHelperInterface.DeleteResource; + removeResource @3 :Libp2pHelperInterface.RemoveResource; downloadResource @4 :Libp2pHelperInterface.DownloadResource; heartbeatPeer @5 :Libp2pHelperInterface.HeartbeatPeer; } @@ -475,6 +475,7 @@ struct DaemonInterface { struct ResourceUpdate { type @0 :ResourceUpdateType; ids @1 :List(RootBlockId); + tag @2 :UInt8; } struct PushMessage { diff --git a/src/libp2p_ipc/libp2p_ipc.ml b/src/libp2p_ipc/libp2p_ipc.ml index 6258b436f34..d1f0d40f81b 100644 --- a/src/libp2p_ipc/libp2p_ipc.ml +++ b/src/libp2p_ipc/libp2p_ipc.ml @@ -265,6 +265,40 @@ let push_message_to_outgoing_message request = Builder.Libp2pHelperInterface.Message.( builder_op push_message_set_builder request) +let create_remove_resource_push_message ~ids = + let ids = + List.map ids ~f:(fun id -> + build' + (module Builder.RootBlockId) + Builder.RootBlockId.(op blake2b_hash_set id) ) + in + build' + (module Builder.Libp2pHelperInterface.PushMessage) + Builder.Libp2pHelperInterface.PushMessage.( + builder_op header_set_builder (create_push_message_header ()) + *> reader_op remove_resource_set_reader + (build + (module Builder.Libp2pHelperInterface.RemoveResource) + Builder.Libp2pHelperInterface.RemoveResource.( + list_op ids_set_list ids) )) + +let create_download_resource_push_message ~tag ~ids = + let ids = + List.map ids ~f:(fun id -> + build' + (module Builder.RootBlockId) + Builder.RootBlockId.(op blake2b_hash_set id) ) + in + build' + (module Builder.Libp2pHelperInterface.PushMessage) + Builder.Libp2pHelperInterface.PushMessage.( + builder_op header_set_builder (create_push_message_header ()) + *> reader_op download_resource_set_reader + (build + (module Builder.Libp2pHelperInterface.DownloadResource) + Builder.Libp2pHelperInterface.DownloadResource.( + op tag_set_exn tag *> list_op ids_set_list ids) )) + let create_add_resource_push_message ~tag ~data = build' (module Builder.Libp2pHelperInterface.PushMessage) diff --git a/src/libp2p_ipc/libp2p_ipc.mli b/src/libp2p_ipc/libp2p_ipc.mli index bdb960ffaf8..99e9c75acaa 100644 --- a/src/libp2p_ipc/libp2p_ipc.mli +++ b/src/libp2p_ipc/libp2p_ipc.mli @@ -36,7 +36,7 @@ module Subscription_id : sig val create : unit -> t end -val undefined_union : context:string -> int -> unit +val undefined_union : context:string -> int -> 'a val unsafe_parse_peer_id : peer_id -> Peer.Id.t @@ -97,6 +97,11 @@ val create_validation_push_message : val create_add_resource_push_message : tag:int -> data:string -> push_message +val create_download_resource_push_message : + tag:int -> ids:string list -> push_message + +val create_remove_resource_push_message : ids:string list -> push_message + val create_heartbeat_peer_push_message : peer_id:Peer.Id.t -> push_message val push_message_to_outgoing_message : push_message -> outgoing_message diff --git a/src/test/archive/patch_archive_test/patch_archive_test.ml b/src/test/archive/patch_archive_test/patch_archive_test.ml index 8d0b2f91a67..cdeaa635ddf 100644 --- a/src/test/archive/patch_archive_test/patch_archive_test.ml +++ b/src/test/archive/patch_archive_test/patch_archive_test.ml @@ -77,7 +77,10 @@ let main ~db_uri ~network_data_folder () = let n = List.init missing_blocks_count ~f:(fun _ -> - Random.int (List.length extensional_files) ) + (* never remove last and first block as missing-block-guardian can have issues when patching it + as it patching only gaps + *) + Random.int (List.length extensional_files - 2) + 1 ) in let unpatched_extensional_files =