diff --git a/engine/execution/computation/committer/committer.go b/engine/execution/computation/committer/committer.go index d5a1a43d2df..bdfe5537c4e 100644 --- a/engine/execution/computation/committer/committer.go +++ b/engine/execution/computation/committer/committer.go @@ -44,17 +44,7 @@ func (s *LedgerViewCommitter) CommitView(view state.View, baseState flow.StateCo } func (s *LedgerViewCommitter) commitView(view state.View, baseState flow.StateCommitment) (newCommit flow.StateCommitment, err error) { - ids, values := view.RegisterUpdates() - update, err := ledger.NewUpdate( - baseState, - execState.RegisterIDSToKeys(ids), - execState.RegisterValuesToValues(values), - ) - if err != nil { - return nil, fmt.Errorf("cannot create ledger update: %w", err) - } - - return s.ldg.Set(update) + return execState.CommitDelta(s.ldg, view, baseState) } func (s *LedgerViewCommitter) collectProofs(view state.View, baseState flow.StateCommitment) (proof []byte, err error) { diff --git a/engine/execution/ingestion/engine.go b/engine/execution/ingestion/engine.go index 7a735caed29..f5013b30656 100644 --- a/engine/execution/ingestion/engine.go +++ b/engine/execution/ingestion/engine.go @@ -454,7 +454,8 @@ func (e *Engine) enqueueBlockAndCheckExecutable( // if it's not added, it means the block is not a new block, it already // exists in the queue, then bail if !added { - log.Debug().Msg("block already exists in the execution queue") + log.Debug().Hex("block_id", logging.Entity(executableBlock)). + Msg("block already exists in the execution queue") return nil } @@ -842,8 +843,8 @@ func newQueue(blockify queue.Blockify, queues *stdmap.QueuesBackdata) (*queue.Qu // G func enqueue(blockify queue.Blockify, queues *stdmap.QueuesBackdata) (*queue.Queue, bool) { for _, queue := range queues.All() { - if queue.TryAdd(blockify) { - return queue, true + if stored, isNew := queue.TryAdd(blockify); stored { + return queue, isNew } } return newQueue(blockify, queues) diff --git a/engine/execution/ingestion/engine_test.go b/engine/execution/ingestion/engine_test.go index b0e1d1ccfc4..0ba6fd7087b 100644 --- a/engine/execution/ingestion/engine_test.go +++ b/engine/execution/ingestion/engine_test.go @@ -65,6 +65,7 @@ type testingContext struct { snapshot *protocol.Snapshot identity *flow.Identity broadcastedReceipts map[flow.Identifier]*flow.ExecutionReceipt + collectionRequester *module.MockRequester } func runWithEngine(t *testing.T, f func(testingContext)) { @@ -178,6 +179,7 @@ func runWithEngine(t *testing.T, f func(testingContext)) { blocks: blocks, collections: collections, state: protocolState, + collectionRequester: request, conduit: conduit, collectionConduit: collectionConduit, computationManager: computationManager, @@ -193,25 +195,13 @@ func runWithEngine(t *testing.T, f func(testingContext)) { func (ctx *testingContext) assertSuccessfulBlockComputation(commits map[flow.Identifier]flow.StateCommitment, onPersisted func(blockID flow.Identifier, commit flow.StateCommitment), executableBlock *entity.ExecutableBlock, previousExecutionResultID flow.Identifier, expectBroadcast bool) { computationResult := executionUnittest.ComputationResultForBlockFixture(executableBlock) - newStateCommitment := unittest.StateCommitmentFixture() - if len(computationResult.StateSnapshots) == 0 { // if block was empty, no new state commitment is produced - newStateCommitment = executableBlock.StartState - } + newStateCommitment := executableBlock.StartState ctx.computationManager. - On("ComputeBlock", mock.Anything, executableBlock, mock.Anything). + On("ComputeBlock", mock.Anything, executableBlock, mock.Anything).Run(func(args mock.Arguments) { + }). Return(computationResult, nil).Once() - for _, view := range computationResult.StateSnapshots { - ctx.executionState. - On("CommitDelta", mock.Anything, view.Delta, executableBlock.StartState). - Return(newStateCommitment, nil) - - ctx.executionState. - On("GetRegistersWithProofs", mock.Anything, mock.Anything, mock.Anything). - Return(nil, nil, nil) - } - ctx.executionState.On("NewView", executableBlock.StartState).Return(new(delta.View)) ctx.executionState. @@ -391,6 +381,104 @@ func TestExecuteOneBlock(t *testing.T) { }) } +func TestBlocksArentExecutedMultipleTimes(t *testing.T) { + runWithEngine(t, func(ctx testingContext) { + + colSigner := unittest.IdentifierFixture() + + // A <- B <- C + blockA := unittest.BlockHeaderFixture() + blockB := unittest.ExecutableBlockFixtureWithParent(nil, &blockA) + blockB.StartState = unittest.StateCommitmentFixture() + + blockC := unittest.ExecutableBlockFixtureWithParent([][]flow.Identifier{{colSigner}}, blockB.Block.Header) + blockC.StartState = blockB.StartState //blocks are empty, so no state change is expected + + logBlocks(map[string]*entity.ExecutableBlock{ + "B": blockB, + "C": blockC, + }) + + collection := blockC.Collections()[0].Collection() + + commits := make(map[flow.Identifier]flow.StateCommitment) + commits[blockB.Block.Header.ParentID] = blockB.StartState + + wg := sync.WaitGroup{} + ctx.mockStateCommitsWithMap(commits) + + ctx.state.On("Sealed").Return(ctx.snapshot) + ctx.snapshot.On("Head").Return(&blockA, nil) + + // wait finishing execution until all the blocks are sent to execution + wgPut := sync.WaitGroup{} + wgPut.Add(1) + + // add extra flag to make sure B was indeed executed before C + wasBExecuted := false + + ctx.assertSuccessfulBlockComputation(commits, func(blockID flow.Identifier, commit flow.StateCommitment) { + wgPut.Wait() + wg.Done() + + wasBExecuted = true + }, blockB, unittest.IdentifierFixture(), true) + + ctx.assertSuccessfulBlockComputation(commits, func(blockID flow.Identifier, commit flow.StateCommitment) { + wg.Done() + require.True(t, wasBExecuted) + }, blockC, unittest.IdentifierFixture(), true) + + // make sure collection requests are sent + // first, the collection should not be found, so the request will be sent. Next, it will be queried again, and this time + // it should return fine + gomock.InOrder( + ctx.collections.EXPECT().ByID(blockC.Collections()[0].Guarantee.CollectionID).DoAndReturn(func(_ flow.Identifier) (*flow.Collection, error) { + // make sure request for collection from block C are sent before block B finishes execution + require.False(t, wasBExecuted) + return nil, storageerr.ErrNotFound + }), + ctx.collections.EXPECT().ByID(blockC.Collections()[0].Guarantee.CollectionID).DoAndReturn(func(_ flow.Identifier) (*flow.Collection, error) { + return &collection, nil + }), + ) + + ctx.collectionRequester.EXPECT().EntityByID(gomock.Any(), gomock.Any()).DoAndReturn(func(_ flow.Identifier, _ flow.IdentityFilter) { + // parallel run to avoid deadlock, ingestion engine is thread-safe + go func() { + err := ctx.engine.handleCollection(unittest.IdentifierFixture(), &collection) + require.NoError(t, err) + }() + }) + ctx.collections.EXPECT().Store(&collection) + + times := 4 + + wg.Add(1) // wait for block B to be executed + for i := 0; i < times; i++ { + err := ctx.engine.handleBlock(context.Background(), blockB.Block) + require.NoError(t, err) + } + wg.Add(1) // wait for block C to be executed + // add extra block to ensure the execution can continue after duplicated blocks + err := ctx.engine.handleBlock(context.Background(), blockC.Block) + require.NoError(t, err) + wgPut.Done() + + unittest.AssertReturnsBefore(t, wg.Wait, 5*time.Second) + + _, more := <-ctx.engine.Done() //wait for all the blocks to be processed + require.False(t, more) + + _, ok := commits[blockB.ID()] + require.True(t, ok) + + _, ok = commits[blockC.ID()] + require.True(t, ok) + + }) +} + func logBlocks(blocks map[string]*entity.ExecutableBlock) { log := unittest.Logger() for name, b := range blocks { diff --git a/engine/execution/state/state.go b/engine/execution/state/state.go index 2884ecc4963..cb77cf6b0e2 100644 --- a/engine/execution/state/state.go +++ b/engine/execution/state/state.go @@ -61,9 +61,6 @@ type ReadOnlyExecutionState interface { type ExecutionState interface { ReadOnlyExecutionState - // CommitDelta commits a register delta and returns the new state commitment. - CommitDelta(context.Context, delta.Delta, flow.StateCommitment) (flow.StateCommitment, error) - UpdateHighestExecutedBlockIfHigher(context.Context, *flow.Header) error PersistExecutionState(ctx context.Context, header *flow.Header, endState flow.StateCommitment, chunkDataPacks []*flow.ChunkDataPack, executionReceipt *flow.ExecutionReceipt, events []flow.Event, serviceEvents []flow.Event, results []flow.TransactionResult) error @@ -211,8 +208,12 @@ func (s *state) NewView(commitment flow.StateCommitment) *delta.View { return delta.NewView(LedgerGetRegister(s.ls, commitment)) } -func CommitDelta(ldg ledger.Ledger, delta delta.Delta, baseState flow.StateCommitment) (flow.StateCommitment, error) { - ids, values := delta.RegisterUpdates() +type RegisterUpdatesHolder interface { + RegisterUpdates() ([]flow.RegisterID, []flow.RegisterValue) +} + +func CommitDelta(ldg ledger.Ledger, ruh RegisterUpdatesHolder, baseState flow.StateCommitment) (flow.StateCommitment, error) { + ids, values := ruh.RegisterUpdates() update, err := ledger.NewUpdate( baseState, @@ -232,12 +233,12 @@ func CommitDelta(ldg ledger.Ledger, delta delta.Delta, baseState flow.StateCommi return commit, nil } -func (s *state) CommitDelta(ctx context.Context, delta delta.Delta, baseState flow.StateCommitment) (flow.StateCommitment, error) { - span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXECommitDelta) - defer span.Finish() - - return CommitDelta(s.ls, delta, baseState) -} +//func (s *state) CommitDelta(ctx context.Context, delta delta.Delta, baseState flow.StateCommitment) (flow.StateCommitment, error) { +// span, _ := s.tracer.StartSpanFromContext(ctx, trace.EXECommitDelta) +// defer span.Finish() +// +// return CommitDelta(s.ls, delta, baseState) +//} func (s *state) getRegisters(commit flow.StateCommitment, registerIDs []flow.RegisterID) (*ledger.Query, []ledger.Value, error) { diff --git a/engine/execution/state/state_test.go b/engine/execution/state/state_test.go index a1b297a2a67..fb50f352da6 100644 --- a/engine/execution/state/state_test.go +++ b/engine/execution/state/state_test.go @@ -21,7 +21,7 @@ import ( "github.com/onflow/flow-go/utils/unittest" ) -func prepareTest(f func(t *testing.T, es state.ExecutionState)) func(*testing.T) { +func prepareTest(f func(t *testing.T, es state.ExecutionState, l *ledger.Ledger)) func(*testing.T) { return func(t *testing.T) { unittest.RunWithBadgerDB(t, func(badgerDB *badger.DB) { metricsCollector := &metrics.NoopCollector{} @@ -53,7 +53,7 @@ func prepareTest(f func(t *testing.T, es state.ExecutionState)) func(*testing.T) ls, stateCommitments, blocks, headers, collections, chunkDataPacks, results, receipts, myReceipts, events, serviceEvents, txResults, badgerDB, trace.NewNoopTracer(), ) - f(t, es) + f(t, es, ls) }) } } @@ -63,7 +63,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) { registerID2 := "vegetable" - t.Run("commit write and read new state", prepareTest(func(t *testing.T, es state.ExecutionState) { + t.Run("commit write and read new state", prepareTest(func(t *testing.T, es state.ExecutionState, l *ledger.Ledger) { // TODO: use real block ID sc1, err := es.StateCommitmentByBlockID(context.Background(), flow.Identifier{}) assert.NoError(t, err) @@ -75,7 +75,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) { err = view1.Set(registerID2, "", "", flow.RegisterValue("carrot")) assert.NoError(t, err) - sc2, err := es.CommitDelta(context.Background(), view1.Delta(), sc1) + sc2, err := state.CommitDelta(l, view1.Delta(), sc1) assert.NoError(t, err) view2 := es.NewView(sc2) @@ -89,7 +89,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) { assert.Equal(t, flow.RegisterValue("carrot"), b2) })) - t.Run("commit write and read previous state", prepareTest(func(t *testing.T, es state.ExecutionState) { + t.Run("commit write and read previous state", prepareTest(func(t *testing.T, es state.ExecutionState, l *ledger.Ledger) { // TODO: use real block ID sc1, err := es.StateCommitmentByBlockID(context.Background(), flow.Identifier{}) assert.NoError(t, err) @@ -98,8 +98,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) { err = view1.Set(registerID1, "", "", []byte("apple")) assert.NoError(t, err) - - sc2, err := es.CommitDelta(context.Background(), view1.Delta(), sc1) + sc2, err := state.CommitDelta(l, view1.Delta(), sc1) assert.NoError(t, err) // update value and get resulting state commitment @@ -107,7 +106,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) { err = view2.Set(registerID1, "", "", []byte("orange")) assert.NoError(t, err) - sc3, err := es.CommitDelta(context.Background(), view2.Delta(), sc2) + sc3, err := state.CommitDelta(l, view2.Delta(), sc2) assert.NoError(t, err) // create a view for previous state version @@ -127,7 +126,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) { assert.Equal(t, flow.RegisterValue("orange"), b2) })) - t.Run("commit delete and read new state", prepareTest(func(t *testing.T, es state.ExecutionState) { + t.Run("commit delete and read new state", prepareTest(func(t *testing.T, es state.ExecutionState, l *ledger.Ledger) { // TODO: use real block ID sc1, err := es.StateCommitmentByBlockID(context.Background(), flow.Identifier{}) assert.NoError(t, err) @@ -139,7 +138,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) { err = view1.Set(registerID2, "", "", []byte("apple")) assert.NoError(t, err) - sc2, err := es.CommitDelta(context.Background(), view1.Delta(), sc1) + sc2, err := state.CommitDelta(l, view1.Delta(), sc1) assert.NoError(t, err) // update value and get resulting state commitment @@ -147,7 +146,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) { err = view2.Delete(registerID1, "", "") assert.NoError(t, err) - sc3, err := es.CommitDelta(context.Background(), view2.Delta(), sc2) + sc3, err := state.CommitDelta(l, view2.Delta(), sc2) assert.NoError(t, err) // create a view for previous state version @@ -167,7 +166,7 @@ func TestExecutionStateWithTrieStorage(t *testing.T) { assert.Empty(t, b2) })) - t.Run("commit delta and persist state commit for the second time should be OK", prepareTest(func(t *testing.T, es state.ExecutionState) { + t.Run("commit delta and persist state commit for the second time should be OK", prepareTest(func(t *testing.T, es state.ExecutionState, l *ledger.Ledger) { // TODO: use real block ID sc1, err := es.StateCommitmentByBlockID(context.Background(), flow.Identifier{}) assert.NoError(t, err) @@ -179,11 +178,11 @@ func TestExecutionStateWithTrieStorage(t *testing.T) { err = view1.Set(registerID2, "", "", flow.RegisterValue("apple")) assert.NoError(t, err) - sc2, err := es.CommitDelta(context.Background(), view1.Delta(), sc1) + sc2, err := state.CommitDelta(l, view1.Delta(), sc1) assert.NoError(t, err) // committing for the second time should be OK - sc2Same, err := es.CommitDelta(context.Background(), view1.Delta(), sc1) + sc2Same, err := state.CommitDelta(l, view1.Delta(), sc1) assert.NoError(t, err) require.Equal(t, sc2, sc2Same) diff --git a/module/mempool/queue/queue.go b/module/mempool/queue/queue.go index b2563704a3e..c2ad5941d88 100644 --- a/module/mempool/queue/queue.go +++ b/module/mempool/queue/queue.go @@ -117,25 +117,27 @@ func dequeue(queue *Queue) *Queue { // TryAdd(elmt) is an idempotent operation for the same elmt, i.e. // after the first, subsequent additions of the same elements are NoOps. // Returns: -// True if and only if _after_ the operation, the element is stored in the +// stored = True if and only if _after_ the operation, the element is stored in the // queue. This is the case if (a) element was newly added to the queue or // (b) element was already stored in the queue _before_ the call. -// Adding an element fails with return value `false` in the following cases: +// new = Indicates if element was new to the queue, when `stored` was true. It lets +// distinguish (a) and (b) cases. +// Adding an element fails with return value `false` for `stored` in the following cases: // * element.ParentID() is _not_ stored in the queue // * element's height is _unequal to_ its parent's height + 1 -func (q *Queue) TryAdd(element Blockify) bool { +func (q *Queue) TryAdd(element Blockify) (stored bool, new bool) { if _, found := q.Nodes[element.ID()]; found { // (b) element was already stored in the queue _before_ the call. - return true + return true, false } // at this point, we are sure that the element is _not_ in the queue and therefore, // the element cannot be referenced as a child by any other element in the queue n, ok := q.Nodes[element.ParentID()] if !ok { - return false + return false, false } if n.Item.Height() != element.Height()-1 { - return false + return false, false } newNode := &Node{ Item: element, @@ -150,7 +152,7 @@ func (q *Queue) TryAdd(element Blockify) bool { if element.Height() > q.Highest.Item.Height() { q.Highest = newNode } - return true + return true, true } // Attach joins the other queue to this one, modifying this queue in-place. diff --git a/module/mempool/queue/queue_test.go b/module/mempool/queue/queue_test.go index 5488a2b68da..1ee10cbd339 100644 --- a/module/mempool/queue/queue_test.go +++ b/module/mempool/queue/queue_test.go @@ -35,58 +35,72 @@ func TestQueue(t *testing.T) { queue := NewQueue(a) t.Run("Adding", func(t *testing.T) { - added := queue.TryAdd(b) //parent not added yet + stored, _ := queue.TryAdd(b) //parent not stored yet size := queue.Size() height := queue.Height() - assert.False(t, added) + assert.False(t, stored) assert.Equal(t, 1, size) assert.Equal(t, uint64(0), height) - added = queue.TryAdd(c) + stored, new := queue.TryAdd(c) size = queue.Size() height = queue.Height() - assert.True(t, added) + assert.True(t, stored) + assert.True(t, new) assert.Equal(t, 2, size) assert.Equal(t, uint64(1), height) - added = queue.TryAdd(b) + stored, new = queue.TryAdd(b) size = queue.Size() height = queue.Height() - assert.True(t, added) + assert.True(t, stored) + assert.True(t, new) + assert.Equal(t, 3, size) + assert.Equal(t, uint64(2), height) + + stored, new = queue.TryAdd(b) //repeat + size = queue.Size() + height = queue.Height() + assert.True(t, stored) + assert.False(t, new) assert.Equal(t, 3, size) assert.Equal(t, uint64(2), height) - added = queue.TryAdd(f) //parent not added yet - assert.False(t, added) + stored, _ = queue.TryAdd(f) //parent not stored yet + assert.False(t, stored) - added = queue.TryAdd(d) + stored, new = queue.TryAdd(d) size = queue.Size() height = queue.Height() - assert.True(t, added) + assert.True(t, stored) + assert.True(t, new) assert.Equal(t, 4, size) assert.Equal(t, uint64(2), height) - added = queue.TryAdd(dBroken) // wrong height - assert.False(t, added) + stored, _ = queue.TryAdd(dBroken) // wrong height + assert.False(t, stored) - added = queue.TryAdd(e) + stored, new = queue.TryAdd(e) size = queue.Size() height = queue.Height() - assert.True(t, added) + assert.True(t, stored) + assert.True(t, new) assert.Equal(t, 5, size) assert.Equal(t, uint64(3), height) - added = queue.TryAdd(f) + stored, new = queue.TryAdd(f) size = queue.Size() height = queue.Height() - assert.True(t, added) + assert.True(t, stored) + assert.True(t, new) assert.Equal(t, 6, size) assert.Equal(t, uint64(3), height) - added = queue.TryAdd(g) + stored, new = queue.TryAdd(g) size = queue.Size() height = queue.Height() - assert.True(t, added) + assert.True(t, stored) + assert.True(t, new) assert.Equal(t, 7, size) assert.Equal(t, uint64(3), height) }) @@ -173,14 +187,16 @@ func TestQueue(t *testing.T) { t.Run("Attaching", func(t *testing.T) { queue := NewQueue(a) - added := queue.TryAdd(c) + added, new := queue.TryAdd(c) assert.True(t, added) + assert.True(t, new) assert.Equal(t, 2, queue.Size()) assert.Equal(t, uint64(1), queue.Height()) queueB := NewQueue(b) - added = queueB.TryAdd(g) + added, new = queueB.TryAdd(g) assert.True(t, added) + assert.True(t, new) assert.Equal(t, 2, queueB.Size()) assert.Equal(t, uint64(1), queueB.Height()) @@ -195,8 +211,9 @@ func TestQueue(t *testing.T) { assert.Equal(t, 4, queue.Size()) assert.Equal(t, uint64(3), queue.Height()) - added = queue.TryAdd(d) + added, new = queue.TryAdd(d) assert.True(t, added) + assert.True(t, new) assert.Equal(t, 5, queue.Size()) assert.Equal(t, uint64(3), queue.Height()) @@ -214,15 +231,25 @@ func TestQueue(t *testing.T) { // we should only get one child queue f--d--c t.Run("Adding_Idempotent", func(t *testing.T) { queue := NewQueue(a) - assert.True(t, queue.TryAdd(c)) - assert.True(t, queue.TryAdd(d)) - assert.True(t, queue.TryAdd(f)) + add, new := queue.TryAdd(c) + assert.True(t, add) + assert.True(t, new) + + add, new = queue.TryAdd(d) + assert.True(t, add) + assert.True(t, new) + + add, new = queue.TryAdd(f) + assert.True(t, add) + assert.True(t, new) assert.Equal(t, 4, queue.Size()) assert.Equal(t, uint64(3), queue.Height()) // adding c a second time - assert.True(t, queue.TryAdd(c)) + add, new = queue.TryAdd(c) + assert.True(t, add) + assert.False(t, new) // Dequeueing a head, childQueues := queue.Dismount() @@ -241,12 +268,22 @@ func TestQueue(t *testing.T) { // attach queueA to queueB: we expect an error as the queues have nodes in common t.Run("Attaching_partially_overlapped_queue", func(t *testing.T) { queueA := NewQueue(c) - assert.True(t, queueA.TryAdd(b)) - assert.True(t, queueA.TryAdd(g)) + add, new := queueA.TryAdd(b) + assert.True(t, add) + assert.True(t, new) + + add, new = queueA.TryAdd(g) + assert.True(t, add) + assert.True(t, new) queueB := NewQueue(a) - assert.True(t, queueB.TryAdd(c)) - assert.True(t, queueB.TryAdd(d)) + add, new = queueB.TryAdd(c) + assert.True(t, add) + assert.True(t, new) + + add, new = queueB.TryAdd(d) + assert.True(t, add) + assert.True(t, new) err := queueB.Attach(queueA) assert.Error(t, err)