Skip to content

Commit

Permalink
Fix repeated block execution (#545)
Browse files Browse the repository at this point in the history
* Fix multiple block execution

* linting

* Remove unnecessary delay

* Ensure blocks are being enqueued and executed afterwards

* Ensure blocks are being enqueued and executed afterwards

* Update tests and queue to make sure new blocks arent wrongly treated as duplicates

* Move view commit to use one share function

* Update engine/execution/ingestion/engine.go

Co-authored-by: Leo Zhang <zhangchiqing@gmail.com>

Co-authored-by: Leo Zhang <zhangchiqing@gmail.com>
Co-authored-by: Kan Zhang <kan@axiomzen.co>
  • Loading branch information
3 people authored Apr 14, 2021
1 parent 6c2188e commit 438f333
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 90 deletions.
12 changes: 1 addition & 11 deletions engine/execution/computation/committer/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,7 @@ func (s *LedgerViewCommitter) CommitView(view state.View, baseState flow.StateCo
}

func (s *LedgerViewCommitter) commitView(view state.View, baseState flow.StateCommitment) (newCommit flow.StateCommitment, err error) {
ids, values := view.RegisterUpdates()
update, err := ledger.NewUpdate(
baseState,
execState.RegisterIDSToKeys(ids),
execState.RegisterValuesToValues(values),
)
if err != nil {
return nil, fmt.Errorf("cannot create ledger update: %w", err)
}

return s.ldg.Set(update)
return execState.CommitDelta(s.ldg, view, baseState)
}

func (s *LedgerViewCommitter) collectProofs(view state.View, baseState flow.StateCommitment) (proof []byte, err error) {
Expand Down
7 changes: 4 additions & 3 deletions engine/execution/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ func (e *Engine) enqueueBlockAndCheckExecutable(
// if it's not added, it means the block is not a new block, it already
// exists in the queue, then bail
if !added {
log.Debug().Msg("block already exists in the execution queue")
log.Debug().Hex("block_id", logging.Entity(executableBlock)).
Msg("block already exists in the execution queue")
return nil
}

Expand Down Expand Up @@ -842,8 +843,8 @@ func newQueue(blockify queue.Blockify, queues *stdmap.QueuesBackdata) (*queue.Qu
// G
func enqueue(blockify queue.Blockify, queues *stdmap.QueuesBackdata) (*queue.Queue, bool) {
for _, queue := range queues.All() {
if queue.TryAdd(blockify) {
return queue, true
if stored, isNew := queue.TryAdd(blockify); stored {
return queue, isNew
}
}
return newQueue(blockify, queues)
Expand Down
118 changes: 103 additions & 15 deletions engine/execution/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type testingContext struct {
snapshot *protocol.Snapshot
identity *flow.Identity
broadcastedReceipts map[flow.Identifier]*flow.ExecutionReceipt
collectionRequester *module.MockRequester
}

func runWithEngine(t *testing.T, f func(testingContext)) {
Expand Down Expand Up @@ -178,6 +179,7 @@ func runWithEngine(t *testing.T, f func(testingContext)) {
blocks: blocks,
collections: collections,
state: protocolState,
collectionRequester: request,
conduit: conduit,
collectionConduit: collectionConduit,
computationManager: computationManager,
Expand All @@ -193,25 +195,13 @@ func runWithEngine(t *testing.T, f func(testingContext)) {

func (ctx *testingContext) assertSuccessfulBlockComputation(commits map[flow.Identifier]flow.StateCommitment, onPersisted func(blockID flow.Identifier, commit flow.StateCommitment), executableBlock *entity.ExecutableBlock, previousExecutionResultID flow.Identifier, expectBroadcast bool) {
computationResult := executionUnittest.ComputationResultForBlockFixture(executableBlock)
newStateCommitment := unittest.StateCommitmentFixture()
if len(computationResult.StateSnapshots) == 0 { // if block was empty, no new state commitment is produced
newStateCommitment = executableBlock.StartState
}
newStateCommitment := executableBlock.StartState

ctx.computationManager.
On("ComputeBlock", mock.Anything, executableBlock, mock.Anything).
On("ComputeBlock", mock.Anything, executableBlock, mock.Anything).Run(func(args mock.Arguments) {
}).
Return(computationResult, nil).Once()

for _, view := range computationResult.StateSnapshots {
ctx.executionState.
On("CommitDelta", mock.Anything, view.Delta, executableBlock.StartState).
Return(newStateCommitment, nil)

ctx.executionState.
On("GetRegistersWithProofs", mock.Anything, mock.Anything, mock.Anything).
Return(nil, nil, nil)
}

ctx.executionState.On("NewView", executableBlock.StartState).Return(new(delta.View))

ctx.executionState.
Expand Down Expand Up @@ -391,6 +381,104 @@ func TestExecuteOneBlock(t *testing.T) {
})
}

func TestBlocksArentExecutedMultipleTimes(t *testing.T) {
runWithEngine(t, func(ctx testingContext) {

colSigner := unittest.IdentifierFixture()

// A <- B <- C
blockA := unittest.BlockHeaderFixture()
blockB := unittest.ExecutableBlockFixtureWithParent(nil, &blockA)
blockB.StartState = unittest.StateCommitmentFixture()

blockC := unittest.ExecutableBlockFixtureWithParent([][]flow.Identifier{{colSigner}}, blockB.Block.Header)
blockC.StartState = blockB.StartState //blocks are empty, so no state change is expected

logBlocks(map[string]*entity.ExecutableBlock{
"B": blockB,
"C": blockC,
})

collection := blockC.Collections()[0].Collection()

commits := make(map[flow.Identifier]flow.StateCommitment)
commits[blockB.Block.Header.ParentID] = blockB.StartState

wg := sync.WaitGroup{}
ctx.mockStateCommitsWithMap(commits)

ctx.state.On("Sealed").Return(ctx.snapshot)
ctx.snapshot.On("Head").Return(&blockA, nil)

// wait finishing execution until all the blocks are sent to execution
wgPut := sync.WaitGroup{}
wgPut.Add(1)

// add extra flag to make sure B was indeed executed before C
wasBExecuted := false

ctx.assertSuccessfulBlockComputation(commits, func(blockID flow.Identifier, commit flow.StateCommitment) {
wgPut.Wait()
wg.Done()

wasBExecuted = true
}, blockB, unittest.IdentifierFixture(), true)

ctx.assertSuccessfulBlockComputation(commits, func(blockID flow.Identifier, commit flow.StateCommitment) {
wg.Done()
require.True(t, wasBExecuted)
}, blockC, unittest.IdentifierFixture(), true)

// make sure collection requests are sent
// first, the collection should not be found, so the request will be sent. Next, it will be queried again, and this time
// it should return fine
gomock.InOrder(
ctx.collections.EXPECT().ByID(blockC.Collections()[0].Guarantee.CollectionID).DoAndReturn(func(_ flow.Identifier) (*flow.Collection, error) {
// make sure request for collection from block C are sent before block B finishes execution
require.False(t, wasBExecuted)
return nil, storageerr.ErrNotFound
}),
ctx.collections.EXPECT().ByID(blockC.Collections()[0].Guarantee.CollectionID).DoAndReturn(func(_ flow.Identifier) (*flow.Collection, error) {
return &collection, nil
}),
)

ctx.collectionRequester.EXPECT().EntityByID(gomock.Any(), gomock.Any()).DoAndReturn(func(_ flow.Identifier, _ flow.IdentityFilter) {
// parallel run to avoid deadlock, ingestion engine is thread-safe
go func() {
err := ctx.engine.handleCollection(unittest.IdentifierFixture(), &collection)
require.NoError(t, err)
}()
})
ctx.collections.EXPECT().Store(&collection)

times := 4

wg.Add(1) // wait for block B to be executed
for i := 0; i < times; i++ {
err := ctx.engine.handleBlock(context.Background(), blockB.Block)
require.NoError(t, err)
}
wg.Add(1) // wait for block C to be executed
// add extra block to ensure the execution can continue after duplicated blocks
err := ctx.engine.handleBlock(context.Background(), blockC.Block)
require.NoError(t, err)
wgPut.Done()

unittest.AssertReturnsBefore(t, wg.Wait, 5*time.Second)

_, more := <-ctx.engine.Done() //wait for all the blocks to be processed
require.False(t, more)

_, ok := commits[blockB.ID()]
require.True(t, ok)

_, ok = commits[blockC.ID()]
require.True(t, ok)

})
}

func logBlocks(blocks map[string]*entity.ExecutableBlock) {
log := unittest.Logger()
for name, b := range blocks {
Expand Down
23 changes: 12 additions & 11 deletions engine/execution/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ type ReadOnlyExecutionState interface {
type ExecutionState interface {
ReadOnlyExecutionState

// CommitDelta commits a register delta and returns the new state commitment.
CommitDelta(context.Context, delta.Delta, flow.StateCommitment) (flow.StateCommitment, error)

UpdateHighestExecutedBlockIfHigher(context.Context, *flow.Header) error

PersistExecutionState(ctx context.Context, header *flow.Header, endState flow.StateCommitment, chunkDataPacks []*flow.ChunkDataPack, executionReceipt *flow.ExecutionReceipt, events []flow.Event, serviceEvents []flow.Event, results []flow.TransactionResult) error
Expand Down Expand Up @@ -211,8 +208,12 @@ func (s *state) NewView(commitment flow.StateCommitment) *delta.View {
return delta.NewView(LedgerGetRegister(s.ls, commitment))
}

func CommitDelta(ldg ledger.Ledger, delta delta.Delta, baseState flow.StateCommitment) (flow.StateCommitment, error) {
ids, values := delta.RegisterUpdates()
type RegisterUpdatesHolder interface {
RegisterUpdates() ([]flow.RegisterID, []flow.RegisterValue)
}

func CommitDelta(ldg ledger.Ledger, ruh RegisterUpdatesHolder, baseState flow.StateCommitment) (flow.StateCommitment, error) {
ids, values := ruh.RegisterUpdates()

update, err := ledger.NewUpdate(
baseState,
Expand All @@ -232,12 +233,12 @@ func CommitDelta(ldg ledger.Ledger, delta delta.Delta, baseState flow.StateCommi
return commit, nil
}

func (s *state) CommitDelta(ctx context.Context, delta delta.Delta, baseState flow.StateCommitment) (flow.StateCommitment, error) {
span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXECommitDelta)
defer span.Finish()

return CommitDelta(s.ls, delta, baseState)
}
//func (s *state) CommitDelta(ctx context.Context, delta delta.Delta, baseState flow.StateCommitment) (flow.StateCommitment, error) {
// span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXECommitDelta)
// defer span.Finish()
//
// return CommitDelta(s.ls, delta, baseState)
//}

func (s *state) getRegisters(commit flow.StateCommitment, registerIDs []flow.RegisterID) (*ledger.Query, []ledger.Value, error) {

Expand Down
27 changes: 13 additions & 14 deletions engine/execution/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/onflow/flow-go/utils/unittest"
)

func prepareTest(f func(t *testing.T, es state.ExecutionState)) func(*testing.T) {
func prepareTest(f func(t *testing.T, es state.ExecutionState, l *ledger.Ledger)) func(*testing.T) {
return func(t *testing.T) {
unittest.RunWithBadgerDB(t, func(badgerDB *badger.DB) {
metricsCollector := &metrics.NoopCollector{}
Expand Down Expand Up @@ -53,7 +53,7 @@ func prepareTest(f func(t *testing.T, es state.ExecutionState)) func(*testing.T)
ls, stateCommitments, blocks, headers, collections, chunkDataPacks, results, receipts, myReceipts, events, serviceEvents, txResults, badgerDB, trace.NewNoopTracer(),
)

f(t, es)
f(t, es, ls)
})
}
}
Expand All @@ -63,7 +63,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) {

registerID2 := "vegetable"

t.Run("commit write and read new state", prepareTest(func(t *testing.T, es state.ExecutionState) {
t.Run("commit write and read new state", prepareTest(func(t *testing.T, es state.ExecutionState, l *ledger.Ledger) {
// TODO: use real block ID
sc1, err := es.StateCommitmentByBlockID(context.Background(), flow.Identifier{})
assert.NoError(t, err)
Expand All @@ -75,7 +75,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) {
err = view1.Set(registerID2, "", "", flow.RegisterValue("carrot"))
assert.NoError(t, err)

sc2, err := es.CommitDelta(context.Background(), view1.Delta(), sc1)
sc2, err := state.CommitDelta(l, view1.Delta(), sc1)
assert.NoError(t, err)

view2 := es.NewView(sc2)
Expand All @@ -89,7 +89,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) {
assert.Equal(t, flow.RegisterValue("carrot"), b2)
}))

t.Run("commit write and read previous state", prepareTest(func(t *testing.T, es state.ExecutionState) {
t.Run("commit write and read previous state", prepareTest(func(t *testing.T, es state.ExecutionState, l *ledger.Ledger) {
// TODO: use real block ID
sc1, err := es.StateCommitmentByBlockID(context.Background(), flow.Identifier{})
assert.NoError(t, err)
Expand All @@ -98,16 +98,15 @@ func TestExecutionStateWithTrieStorage(t *testing.T) {

err = view1.Set(registerID1, "", "", []byte("apple"))
assert.NoError(t, err)

sc2, err := es.CommitDelta(context.Background(), view1.Delta(), sc1)
sc2, err := state.CommitDelta(l, view1.Delta(), sc1)
assert.NoError(t, err)

// update value and get resulting state commitment
view2 := es.NewView(sc2)
err = view2.Set(registerID1, "", "", []byte("orange"))
assert.NoError(t, err)

sc3, err := es.CommitDelta(context.Background(), view2.Delta(), sc2)
sc3, err := state.CommitDelta(l, view2.Delta(), sc2)
assert.NoError(t, err)

// create a view for previous state version
Expand All @@ -127,7 +126,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) {
assert.Equal(t, flow.RegisterValue("orange"), b2)
}))

t.Run("commit delete and read new state", prepareTest(func(t *testing.T, es state.ExecutionState) {
t.Run("commit delete and read new state", prepareTest(func(t *testing.T, es state.ExecutionState, l *ledger.Ledger) {
// TODO: use real block ID
sc1, err := es.StateCommitmentByBlockID(context.Background(), flow.Identifier{})
assert.NoError(t, err)
Expand All @@ -139,15 +138,15 @@ func TestExecutionStateWithTrieStorage(t *testing.T) {
err = view1.Set(registerID2, "", "", []byte("apple"))
assert.NoError(t, err)

sc2, err := es.CommitDelta(context.Background(), view1.Delta(), sc1)
sc2, err := state.CommitDelta(l, view1.Delta(), sc1)
assert.NoError(t, err)

// update value and get resulting state commitment
view2 := es.NewView(sc2)
err = view2.Delete(registerID1, "", "")
assert.NoError(t, err)

sc3, err := es.CommitDelta(context.Background(), view2.Delta(), sc2)
sc3, err := state.CommitDelta(l, view2.Delta(), sc2)
assert.NoError(t, err)

// create a view for previous state version
Expand All @@ -167,7 +166,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) {
assert.Empty(t, b2)
}))

t.Run("commit delta and persist state commit for the second time should be OK", prepareTest(func(t *testing.T, es state.ExecutionState) {
t.Run("commit delta and persist state commit for the second time should be OK", prepareTest(func(t *testing.T, es state.ExecutionState, l *ledger.Ledger) {
// TODO: use real block ID
sc1, err := es.StateCommitmentByBlockID(context.Background(), flow.Identifier{})
assert.NoError(t, err)
Expand All @@ -179,11 +178,11 @@ func TestExecutionStateWithTrieStorage(t *testing.T) {
err = view1.Set(registerID2, "", "", flow.RegisterValue("apple"))
assert.NoError(t, err)

sc2, err := es.CommitDelta(context.Background(), view1.Delta(), sc1)
sc2, err := state.CommitDelta(l, view1.Delta(), sc1)
assert.NoError(t, err)

// committing for the second time should be OK
sc2Same, err := es.CommitDelta(context.Background(), view1.Delta(), sc1)
sc2Same, err := state.CommitDelta(l, view1.Delta(), sc1)
assert.NoError(t, err)

require.Equal(t, sc2, sc2Same)
Expand Down
16 changes: 9 additions & 7 deletions module/mempool/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,25 +117,27 @@ func dequeue(queue *Queue) *Queue {
// TryAdd(elmt) is an idempotent operation for the same elmt, i.e.
// after the first, subsequent additions of the same elements are NoOps.
// Returns:
// True if and only if _after_ the operation, the element is stored in the
// stored = True if and only if _after_ the operation, the element is stored in the
// queue. This is the case if (a) element was newly added to the queue or
// (b) element was already stored in the queue _before_ the call.
// Adding an element fails with return value `false` in the following cases:
// new = Indicates if element was new to the queue, when `stored` was true. It lets
// distinguish (a) and (b) cases.
// Adding an element fails with return value `false` for `stored` in the following cases:
// * element.ParentID() is _not_ stored in the queue
// * element's height is _unequal to_ its parent's height + 1
func (q *Queue) TryAdd(element Blockify) bool {
func (q *Queue) TryAdd(element Blockify) (stored bool, new bool) {
if _, found := q.Nodes[element.ID()]; found {
// (b) element was already stored in the queue _before_ the call.
return true
return true, false
}
// at this point, we are sure that the element is _not_ in the queue and therefore,
// the element cannot be referenced as a child by any other element in the queue
n, ok := q.Nodes[element.ParentID()]
if !ok {
return false
return false, false
}
if n.Item.Height() != element.Height()-1 {
return false
return false, false
}
newNode := &Node{
Item: element,
Expand All @@ -150,7 +152,7 @@ func (q *Queue) TryAdd(element Blockify) bool {
if element.Height() > q.Highest.Item.Height() {
q.Highest = newNode
}
return true
return true, true
}

// Attach joins the other queue to this one, modifying this queue in-place.
Expand Down
Loading

0 comments on commit 438f333

Please sign in to comment.