Skip to content

Commit

Permalink
Merge pull request #16174 from MinaProtocol/dkijania/port_revive_arch…
Browse files Browse the repository at this point in the history
…ive_test_stability_fix_port

Port revive patch archive test again
  • Loading branch information
dkijania authored Oct 17, 2024
2 parents e118c42 + c410507 commit 4b6844c
Show file tree
Hide file tree
Showing 19 changed files with 912 additions and 686 deletions.
1 change: 1 addition & 0 deletions buildkite/src/Command/Base.dhall
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ let targetToAgent =
, Integration = toMap { size = "integration" }
, QA = toMap { size = "qa" }
, Hardfork = toMap { size = "hardfork" }
, Perf = toMap { size = "perf" }
, Multi = toMap { size = "generic-multi" }
}
target
Expand Down
99 changes: 99 additions & 0 deletions buildkite/src/Command/Bench/Base.dhall
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
let PipelineMode = ../../Pipeline/Mode.dhall

let PipelineTag = ../../Pipeline/Tag.dhall

let Pipeline = ../../Pipeline/Dsl.dhall

let JobSpec = ../../Pipeline/JobSpec.dhall

let DebianVersions = ../../Constants/DebianVersions.dhall

let RunInToolchain = ../../Command/RunInToolchain.dhall

let Network = ../../Constants/Network.dhall

let Profiles = ../../Constants/Profiles.dhall

let Command = ../../Command/Base.dhall

let Docker = ../../Command/Docker/Type.dhall

let Size = ../Size.dhall

let Benchmarks = ../../Constants/Benchmarks.dhall

let SelectFiles = ../../Lib/SelectFiles.dhall

let Spec =
{ Type =
{ key : Text
, bench : Text
, label : Text
, size : Size
, name : Text
, path : Text
, mode : PipelineMode.Type
, dependsOn : List Command.TaggedKey.Type
, additionalDirtyWhen : List SelectFiles.Type
, yellowThreshold : Double
, redThreshold : Double
}
, default =
{ mode = PipelineMode.Type.PullRequest
, size = Size.Medium
, dependsOn =
DebianVersions.dependsOn
DebianVersions.DebVersion.Bullseye
Network.Type.Devnet
Profiles.Type.Standard
, additionalDirtyWhen = [] : List SelectFiles.Type
, yellowThreshold = 0.1
, redThreshold = 0.2
}
}

let command
: Spec.Type -> Command.Type
= \(spec : Spec.Type)
-> Command.build
Command.Config::{
, commands =
RunInToolchain.runInToolchain
(Benchmarks.toEnvList Benchmarks.Type::{=})
"./buildkite/scripts/benchmarks.sh ${spec.bench} --red-threshold ${Double/show
spec.redThreshold} --yellow-threshold ${Double/show
spec.yellowThreshold}"
, label = "Perf: ${spec.label}"
, key = spec.key
, target = spec.size
, docker = None Docker.Type
, depends_on = spec.dependsOn
}

let pipeline
: Spec.Type -> Pipeline.Config.Type
= \(spec : Spec.Type)
-> Pipeline.Config::{
, spec = JobSpec::{
, dirtyWhen =
[ SelectFiles.strictlyStart (SelectFiles.contains "src")
, SelectFiles.exactly
"buildkite/src/Command/Bench/Base"
"dhall"
, SelectFiles.contains "scripts/benchmark"
, SelectFiles.contains "buildkite/scripts/benchmark"
]
# spec.additionalDirtyWhen
, path = spec.path
, name = spec.name
, mode = spec.mode
, tags =
[ PipelineTag.Type.Long
, PipelineTag.Type.Test
, PipelineTag.Type.Stable
]
}
, steps = [ command spec ]
}

in { command = command, pipeline = pipeline, Spec = Spec }
2 changes: 1 addition & 1 deletion buildkite/src/Command/Size.dhall
Original file line number Diff line number Diff line change
@@ -1 +1 @@
< XLarge | Large | Medium | Small | Integration | QA | Hardfork | Multi >
< XLarge | Large | Medium | Small | Integration | QA | Hardfork | Multi | Perf >
21 changes: 21 additions & 0 deletions buildkite/src/Constants/Benchmarks.dhall
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
let Spec =
{ Type = { tokenEnvName : Text, bucket : Text, org : Text, host : Text }
, default =
{ tokenEnvName = "\\\${INFLUX_TOKEN}"
, bucket = "\\\${INFLUX_BUCKET_NAME}"
, org = "\\\${INFLUX_ORG}"
, host = "\\\${INFLUX_HOST}"
}
}

let toEnvList =
\(spec : Spec.Type)
-> [ "INFLUX_HOST=${spec.host}"
, "INFLUX_TOKEN=${spec.tokenEnvName}"
, "INFLUX_ORG=${spec.org}"
, "INFLUX_BUCKET_NAME=${spec.bucket}"
]

let mainlineBranches = "[develop,compatible,master]"

in { Type = Spec, toEnvList = toEnvList, mainlineBranches = mainlineBranches }
71 changes: 12 additions & 59 deletions src/app/libp2p_helper/src/libp2p_helper/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
ipld "github.com/ipfs/go-ipld-format"
)

type bitswapDeleteCmd struct {
Expand Down Expand Up @@ -91,62 +90,14 @@ func announceNewRootBlock(ctx context.Context, engine *bitswap.Bitswap, storage
return storage.SetStatus(ctx, root, codanet.Full)
}

func (bs *BitswapCtx) deleteRoot(root BitswapBlockLink) error {
if err := bs.storage.SetStatus(bs.ctx, root, codanet.Deleting); err != nil {
return err
}
ClearRootDownloadState(bs, root)
allDescendants := []BitswapBlockLink{root}
viewBlockF := func(b []byte) error {
links, _, err := ReadBitswapBlock(b)
if err == nil {
for _, l := range links {
var l2 BitswapBlockLink
copy(l2[:], l[:])
allDescendants = append(allDescendants, l2)
}
}
return err
}
for _, block := range allDescendants {
if err := bs.storage.ViewBlock(bs.ctx, block, viewBlockF); err != nil && err != (ipld.ErrNotFound{Cid: codanet.BlockHashToCid(block)}) {
return err
}
}
if err := bs.storage.DeleteBlocks(bs.ctx, allDescendants); err != nil {
return err
}
return bs.storage.DeleteStatus(bs.ctx, root)
}

func ClearRootDownloadState(bs BitswapState, root root) {
rootStates := bs.RootDownloadStates()
state, has := rootStates[root]
if !has {
return
}
nodeParams := bs.NodeDownloadParams()
delete(rootStates, root)
state.allDescendants.ForEach(func(c cid.Cid) error {
np, hasNp := nodeParams[c]
if hasNp {
delete(np, root)
if len(np) == 0 {
delete(nodeParams, c)
}
}
return nil
})
state.cancelF()
func (bs *BitswapCtx) SendResourceUpdate(type_ ipc.ResourceUpdateType, tag BitswapDataTag, root root) {
bs.SendResourceUpdates(type_, tag, root)
}

func (bs *BitswapCtx) SendResourceUpdate(type_ ipc.ResourceUpdateType, root root) {
bs.SendResourceUpdates(type_, root)
}
func (bs *BitswapCtx) SendResourceUpdates(type_ ipc.ResourceUpdateType, roots ...root) {
func (bs *BitswapCtx) SendResourceUpdates(type_ ipc.ResourceUpdateType, tag BitswapDataTag, roots ...root) {
// Non-blocking upcall sending
select {
case bs.outMsgChan <- mkResourceUpdatedUpcall(type_, roots):
case bs.outMsgChan <- mkResourceUpdatedUpcall(type_, tag, roots):
default:
for _, root := range roots {
bitswapLogger.Errorf("Failed to send resource update of type %d"+
Expand Down Expand Up @@ -242,25 +193,27 @@ func (bs *BitswapCtx) Loop() {
ClearRootDownloadState(bs, root)
case cmd := <-bs.addCmds:
configuredCheck()
blocks, root := SplitDataToBitswapBlocksLengthPrefixedWithTag(bs.maxBlockSize, cmd.data, BlockBodyTag)
blocks, root := SplitDataToBitswapBlocksLengthPrefixedWithTag(bs.maxBlockSize, cmd.data, cmd.tag)
err := announceNewRootBlock(bs.ctx, bs.engine, bs.storage, blocks, root)
if err == nil {
bs.SendResourceUpdate(ipc.ResourceUpdateType_added, root)
bs.SendResourceUpdate(ipc.ResourceUpdateType_added, cmd.tag, root)
} else {
bitswapLogger.Errorf("Failed to announce root cid %s (%s)", codanet.BlockHashToCidSuffix(root), err)
}
case cmd := <-bs.deleteCmds:
configuredCheck()
success := []root{}
success := map[BitswapDataTag][]root{}
for _, root := range cmd.rootIds {
err := bs.deleteRoot(root)
tag, err := DeleteRoot(bs, root)
if err == nil {
success = append(success, root)
success[tag] = append(success[tag], root)
} else {
bitswapLogger.Errorf("Error processing delete request for %s: %s", codanet.BlockHashToCidSuffix(root), err)
}
}
bs.SendResourceUpdates(ipc.ResourceUpdateType_removed, success...)
for tag, roots := range success {
bs.SendResourceUpdates(ipc.ResourceUpdateType_removed, tag, roots...)
}
case cmd := <-bs.downloadCmds:
configuredCheck()
// We put all ids to map to avoid
Expand Down
104 changes: 104 additions & 0 deletions src/app/libp2p_helper/src/libp2p_helper/bitswap_delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package main

import (
"codanet"
"errors"

"github.com/ipfs/go-cid"
)

func ClearRootDownloadState(bs BitswapState, root root) {
rootStates := bs.RootDownloadStates()
state, has := rootStates[root]
if !has {
return
}
nodeParams := bs.NodeDownloadParams()
delete(rootStates, root)
state.allDescendants.ForEach(func(c cid.Cid) error {
np, hasNp := nodeParams[c]
if hasNp {
delete(np, root)
if len(np) == 0 {
delete(nodeParams, c)
}
}
return nil
})
state.cancelF()
}

// getTag retrieves root's tag, whether the root is still being processed
// or its processing was completed
func getTag(bs BitswapState, root BitswapBlockLink) (tag BitswapDataTag, err error) {
state, has := bs.RootDownloadStates()[root]
if has {
tag = state.getTag()
} else {
err = bs.ViewBlock(root, func(b []byte) error {
_, fullBlockData, err := ReadBitswapBlock(b)
if err != nil {
return err
}
if len(fullBlockData) < 5 {
return errors.New("root block is too short")
}
tag = BitswapDataTag(fullBlockData[4])
return nil
})
}
return
}

func DeleteRoot(bs BitswapState, root BitswapBlockLink) (BitswapDataTag, error) {
if err := bs.SetStatus(root, codanet.Deleting); err != nil {
return 255, err
}
tag, err := getTag(bs, root)
if err != nil {
return tag, err
}
ClearRootDownloadState(bs, root)

// Performing breadth-first search (BFS)

// descendantMap is a "visited" set, to ensure we do not
// traverse into nodes we once visited
descendantMap := map[[32]byte]struct{}{root: {}}

// allDescendants is a list of all discovered nodes,
// serving as both "queue" to be iterated over during BFS,
// and as a list of all nodes visited at the end of
// BFS iteration
allDescendants := []BitswapBlockLink{root}
viewBlockF := func(b []byte) error {
links, _, err := ReadBitswapBlock(b)
if err == nil {
for _, l := range links {
var l2 BitswapBlockLink
copy(l2[:], l[:])
_, has := descendantMap[l2]
// Checking if the nodes was visited before
if !has {
descendantMap[l2] = struct{}{}
// Add an item to BFS queue
allDescendants = append(allDescendants, l2)
}
}
}
return err
}
// Iteration is done via index-based loop, because underlying
// array gets extended during iteration, and regular iterator
// wouldn't see these changes
for i := 0; i < len(allDescendants); i++ {
block := allDescendants[i]
if err := bs.ViewBlock(block, viewBlockF); err != nil && !isBlockNotFound(block, err) {
return tag, err
}
}
if err := bs.DeleteBlocks(allDescendants); err != nil {
return tag, err
}
return tag, bs.DeleteStatus(root)
}
Loading

0 comments on commit 4b6844c

Please sign in to comment.