Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/v0.6.0' into update_to_0_6
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielbosio committed Sep 11, 2024
2 parents 6b062a7 + 339aa71 commit 49a9f1f
Show file tree
Hide file tree
Showing 37 changed files with 1,062 additions and 388 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ You can use the link to the explorer to check the status of your transaction.
aligned verify-proof-onchain \
--aligned-verification-data ~/.aligned/aligned_verification_data/*.json \
--rpc_url https://ethereum-holesky-rpc.publicnode.com \
--chain holesky
--chain holesky \
--payment_service_addr 0x815aeCA64a974297942D2Bbf034ABEe22a38A003
```
This is reading the result of the verification of the proof in Ethereum.
Expand Down Expand Up @@ -179,4 +180,4 @@ For submitting proofs generated by your own project to the network via CLI, see
## Integrating Aligned into your Project
If you are developing applications using Aligned, we offer a [Rust-SDK](docs/3_guides/1_SDK_how_to.md) for submitting proofs directly to the network within your applications.
If you are developing applications using Aligned, we offer a [Rust-SDK](docs/guides/1_SDK.md) for submitting proofs directly to the network within your applications.
2 changes: 1 addition & 1 deletion aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func aggregatorMain(ctx *cli.Context) error {
return err
}

// Listen for new task created in the ServiceManager contract in a separate goroutine
// Listen for new task created in the ServiceManager contract in a separate goroutine, both V1 and V2 subscriptions:
go func() {
listenErr := aggregator.SubscribeToNewTasks()
if listenErr != nil {
Expand Down
127 changes: 110 additions & 17 deletions aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type BatchData struct {

type Aggregator struct {
AggregatorConfig *config.AggregatorConfig
NewBatchChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2
NewBatchChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatch
NewBatchChanV2 chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2
avsReader *chainio.AvsReader
avsSubscriber *chainio.AvsSubscriber
avsWriter *chainio.AvsWriter
Expand Down Expand Up @@ -91,7 +92,8 @@ type Aggregator struct {
}

func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error) {
newBatchChan := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)
newBatchChan := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatch)
newBatchChanV2 := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)

avsReader, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig, aggregatorConfig.EcdsaConfig)
if err != nil {
Expand Down Expand Up @@ -162,6 +164,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
avsSubscriber: avsSubscriber,
avsWriter: avsWriter,
NewBatchChan: newBatchChan,
NewBatchChanV2: newBatchChanV2,

batchesIdentifierHashByIdx: batchesIdentifierHashByIdx,
batchesIdxByIdentifierHash: batchesIdxByIdentifierHash,
Expand Down Expand Up @@ -215,6 +218,8 @@ func (agg *Aggregator) Start(ctx context.Context) error {

const MaxSentTxRetries = 5

var switchBlockNumber = uint64(2_268_375) // 2_268_375 is the block at sep 3th 15:00

func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
if blsAggServiceResp.Err != nil {
agg.taskMutex.Lock()
Expand Down Expand Up @@ -276,21 +281,38 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA

agg.logger.Info("Sending aggregated response onchain", "taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

current_task_block := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
for i := 0; i < MaxSentTxRetries; i++ {
receipt, err := agg.sendAggregatedResponse(batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
if err == nil {
agg.logger.Info("Gas cost used to send aggregated response", "gasUsed", receipt.GasUsed)

agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
if current_task_block < switchBlockNumber {
agg.logger.Info("agg if V1")
receipt, err := agg.sendAggregatedResponse(batchData.BatchMerkleRoot, nonSignerStakesAndSignature)
if err == nil {
agg.logger.Info("Gas cost used to send aggregated response", "gasUsed", receipt.GasUsed)
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
}

// Sleep for a bit before retrying
time.Sleep(2 * time.Second)

} else {
agg.logger.Info("agg if V2")
_, err = agg.sendAggregatedResponseV2(batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
if err == nil {
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
}

// Sleep for a bit before retrying
time.Sleep(2 * time.Second)
}

// Sleep for a bit before retrying
time.Sleep(2 * time.Second)
}

agg.logger.Error("Aggregator failed to respond to task, this batch will be lost",
Expand All @@ -303,7 +325,32 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA

// / Sends response to contract and waits for transaction receipt
// / Returns error if it fails to send tx or receipt is not found
func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*gethtypes.Receipt, error) {
func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*gethtypes.Receipt, error) {
agg.walletMutex.Lock()
agg.logger.Infof("- Locked Wallet Resources: Sending aggregated response for batch",
"merkleRoot", hex.EncodeToString(batchMerkleRoot[:]))

txHash, err := agg.avsWriter.SendAggregatedResponse(batchMerkleRoot, nonSignerStakesAndSignature)
if err != nil {
agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchMerkleRoot[:]), err)
return nil, err
}

agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchMerkleRoot[:]))

receipt, err := utils.WaitForTransactionReceipt(
agg.AggregatorConfig.BaseConfig.EthRpcClient, context.Background(), *txHash)
if err != nil {
return nil, err
}

agg.metrics.IncAggregatedResponses()

return receipt, nil
}
func (agg *Aggregator) sendAggregatedResponseV2(batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*gethtypes.Receipt, error) {
batchIdentifier := append(batchMerkleRoot[:], senderAddress[:]...)
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))

Expand All @@ -313,7 +360,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, senderAd
"senderAddress", hex.EncodeToString(senderAddress[:]),
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))

txHash, err := agg.avsWriter.SendAggregatedResponse(batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
txHash, err := agg.avsWriter.SendAggregatedResponseV2(batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
if err != nil {
agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
Expand All @@ -334,7 +381,53 @@ func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, senderAd
return receipt, nil
}

func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]byte, taskCreatedBlock uint32) {
func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, taskCreatedBlock uint32) {
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task",
"Batch merkle root", "0x"+hex.EncodeToString(batchMerkleRoot[:]))

agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Adding new task")

// --- UPDATE BATCH - INDEX CACHES ---
batchIndex := agg.nextBatchIndex
if _, ok := agg.batchesIdxByIdentifierHash[batchMerkleRoot]; ok {
agg.logger.Warn("Batch already exists", "batchIndex", batchIndex, "batchIdentifierHash (actually batchMerkleRoot)", batchMerkleRoot)
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
return
}

// This shouldn't happen, since both maps are updated together
if _, ok := agg.batchesIdentifierHashByIdx[batchIndex]; ok {
agg.logger.Warn("Batch already exists", "batchIndex", batchIndex, "batchIdentifierHash (actually batchMerkleRoot)", batchMerkleRoot)
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
return
}

agg.batchesIdxByIdentifierHash[batchMerkleRoot] = batchIndex
agg.batchCreatedBlockByIdx[batchIndex] = uint64(taskCreatedBlock)
agg.batchesIdentifierHashByIdx[batchIndex] = batchMerkleRoot
agg.batchDataByIdentifierHash[batchMerkleRoot] = BatchData{
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: [20]byte{},
}
agg.nextBatchIndex += 1

quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}

err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, 100*time.Second)
// FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
if err != nil {
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
}

agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
agg.logger.Info("New task added", "batchIndex", batchIndex, "batchIdentifierHash (actually batchMerkleRoot)", "0x"+hex.EncodeToString(batchMerkleRoot[:]))
}
func (agg *Aggregator) AddNewTaskV2(batchMerkleRoot [32]byte, senderAddress [20]byte, taskCreatedBlock uint32) {
batchIdentifier := append(batchMerkleRoot[:], senderAddress[:]...)
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))

Expand Down
93 changes: 92 additions & 1 deletion aggregator/internal/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,98 @@ func (agg *Aggregator) ServeOperators() error {
// Returns:
// - 0: Success
// - 1: Error
func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *types.SignedTaskResponse, reply *uint8) error {
func (agg *Aggregator) ProcessOperatorSignedTaskResponse(signedTaskResponse *types.SignedTaskResponse, reply *uint8) error {
agg.AggregatorConfig.BaseConfig.Logger.Info("New task response",
"BatchMerkleRoot", "0x"+hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))

taskIndex := uint32(0)
ok := false

for i := 0; i < waitForEventRetries; i++ {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
taskIndex, ok = agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchMerkleRoot]
if !ok {
agg.taskMutex.Unlock()
agg.logger.Info("- Unlocked Resources: Task not found in the internal map")
time.Sleep(waitForEventSleepSeconds)
} else {
break
}
}

if !ok {
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
*reply = 1
return nil
}

// Note: we already have lock here
agg.logger.Debug("- Checking if operator already responded")
batchResponses, ok := agg.operatorRespondedBatch[taskIndex]
if !ok {
batchResponses = make(map[eigentypes.Bytes32]struct{})
agg.operatorRespondedBatch[taskIndex] = batchResponses
}

if _, ok := batchResponses[signedTaskResponse.OperatorId]; ok {
*reply = 0
agg.logger.Warn("Operator already responded, ignoring",
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]),
"taskIndex", taskIndex, "batchMerkleRoot", hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]))

agg.taskMutex.Unlock()
return nil
}

batchResponses[signedTaskResponse.OperatorId] = struct{}{}

// Don't wait infinitely if it can't answer
// Create a context with a timeout of 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() // Ensure the cancel function is called to release resources

// Create a channel to signal when the task is done
done := make(chan struct{})

agg.logger.Info("Starting bls signature process")
go func() {
err := agg.blsAggregationService.ProcessNewSignature(
context.Background(), taskIndex, signedTaskResponse.BatchMerkleRoot,
&signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
)

if err != nil {
agg.logger.Warnf("BLS aggregation service error: %s", err)
// remove operator from the list of operators that responded
// so that it can try again
delete(batchResponses, signedTaskResponse.OperatorId)
} else {
agg.logger.Info("BLS process succeeded")
}

close(done)
}()

*reply = 1
// Wait for either the context to be done or the task to complete
select {
case <-ctx.Done():
// The context's deadline was exceeded or it was canceled
agg.logger.Info("Bls process timed out, operator signature will be lost. Batch may not reach quorum")
case <-done:
// The task completed successfully
agg.logger.Info("Bls context finished correctly")
*reply = 0
}

agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Task response processing finished")
agg.taskMutex.Unlock()

return nil
}
func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *types.SignedTaskResponseV2, reply *uint8) error {
agg.AggregatorConfig.BaseConfig.Logger.Info("New task response",
"BatchMerkleRoot", "0x"+hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
"SenderAddress", "0x"+hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
Expand Down
34 changes: 32 additions & 2 deletions aggregator/internal/pkg/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,37 @@ func (agg *Aggregator) SubscribeToNewTasks() error {
if err != nil {
return err
}
err = agg.subscribeToNewTasksV2()
if err != nil {
return err
}

var switchBlockNumber = uint32(2_268_375) // 2_268_375 is the block at sep 3th 15:00

for {
select {
case err := <-agg.taskSubscriber:
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to subscribe to new tasks", "err", err)

// TODO not sure if this is the best way, but no way to calculate blocknumber from here
err = agg.subscribeToNewTasks()
errV2 := agg.subscribeToNewTasksV2()
if err != nil {
return err
}
if errV2 != nil {
return err
}
case newBatch := <-agg.NewBatchChan:
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task")
agg.AddNewTask(newBatch.BatchMerkleRoot, newBatch.SenderAddress, newBatch.TaskCreatedBlock)
if newBatch.TaskCreatedBlock < switchBlockNumber {
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task, V1")
agg.AddNewTask(newBatch.BatchMerkleRoot, newBatch.TaskCreatedBlock)
}
case newBatchV2 := <-agg.NewBatchChanV2:
if newBatchV2.TaskCreatedBlock >= switchBlockNumber {
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task, V2")
agg.AddNewTaskV2(newBatchV2.BatchMerkleRoot, newBatchV2.SenderAddress, newBatchV2.TaskCreatedBlock)
}
}
}
}
Expand All @@ -32,3 +51,14 @@ func (agg *Aggregator) subscribeToNewTasks() error {

return err
}
func (agg *Aggregator) subscribeToNewTasksV2() error {
var err error

agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasksV2(agg.NewBatchChanV2)

if err != nil {
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err)
}

return err
}
2 changes: 1 addition & 1 deletion batcher/aligned-sdk/abi/AlignedLayerServiceManager.json

Large diffs are not rendered by default.

341 changes: 181 additions & 160 deletions contracts/bindings/AlignedLayerServiceManager/binding.go

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Loading

0 comments on commit 49a9f1f

Please sign in to comment.