Annotation of the Aggregator

// This is the structure that holds the Aggregator
type Monitor struct {
    bvsContract     string             // Address of the BVS contract
    bvsDirectoryApi api.BVSDirectory   // BVS Directory API wrapper
    chainIO         io.ChainIO         // ChainIO allows easy interaction to contracts
}

// NewMonitor creates a new Monitor instance with a Cosmos client and BVS contract.
func NewMonitor() *Monitor {
    // Figure our the path of the current executable. This is used later on to
    // figure out where the Babylon keys and home is (just a fixed structure happend
    // to be used in the project)
    _, currentFile, _, ok := runtime.Caller(0)
    if !ok {
        panic("cannot get current file")
    }
    // Figure out the parent directory of the executable. The babylon home directory
    // should be $EXE_DIR/../../$KEY_DIR
    configDir := filepath.Dir(currentFile)
    homeDir := configDir + "/../../" + core.C.Owner.KeyDir
    fmt.Printf("homeDir: %s\n", homeDir)

    // Initialize a logger and prometheus for monitoring
    elkLogger := logger.NewELKLogger("bvs_demo")
    reg := prometheus.NewRegistry()
    metricsIndicators := transactionprocess.NewPromIndicators(reg, "bvs_demo")
    // Initialize ChainIO, this should be very familare by now
    chainIO, err := io.NewChainIO(core.C.Chain.Id, core.C.Chain.Rpc, homeDir, core.C.Owner.Bech32Prefix, elkLogger, metricsIndicators, types.TxManagerParams{
        MaxRetries:             3,
        RetryInterval:          1 * time.Second,
        ConfirmationTimeout:    60 * time.Second,
        GasPriceAdjustmentRate: "1.1",
    })
    if err != nil {
        panic(err)
    }
    // Setup key ring to access private keys
    chainIO, err = chainIO.SetupKeyring(core.C.Owner.KeyName, core.C.Owner.KeyringBackend)
    if err != nil {
        panic(err)
    }
    // Grab the current account for use later
    account, err := chainIO.GetCurrentAccount()
    if err != nil {
        panic(err)
    }
    address := account.GetAddress().String()
    fmt.Printf("address: %s\n", address)
    
    // Query the BVS directory and find the BVS we are monitoring
    txResp, err := api.NewBVSDirectoryImpl(chainIO, core.C.Chain.BvsDirectory).GetBVSInfo(core.C.Chain.BvsHash)
    if err != nil {
        panic(err)
    }
    
    // Construct the Aggregator and return it to the caller
    bvsDirectoryApi := api.NewBVSDirectoryImpl(chainIO, core.C.Chain.BvsDirectory)
    return &Monitor{
        bvsContract:     txResp.BVSContract,
        bvsDirectoryApi: bvsDirectoryApi,
        chainIO:         chainIO,
    }
}

// Starts monitoring of the task queue
func (m *Monitor) Run(ctx context.Context) {
    core.L.Info("Start to monitor task queue")
    for {
        // The HTTP endpoint (see below) pushes finished task to Redis. Here we 
        // anticipate results from Redis and forward it to the BVS contract
        results, err := core.S.RedisConn.BLPop(context.Background(), 0, core.PkTaskQueue).Result()
        fmt.Printf("results: %+v\n", results)
        if err != nil {
            core.L.Error(fmt.Sprintf("Failed to read task queue, due to {%s}", err))
            continue
        }
        fmt.Printf("result--->: %s\n", results[1])
        task := core.Task{}
        if err := json.Unmarshal([]byte(results[1]), &task); err != nil {
            core.L.Error(fmt.Sprintf("Failed to parse task queue, due to {%s}", err))
            continue
        }
        fmt.Printf("task: %+v\n", task)
        pkTaskResult := fmt.Sprintf("%s%d", core.PkTaskResult, task.TaskId)
        taskResultStr, err := json.Marshal(task.TaskResult)
        if err != nil {
            core.L.Error(fmt.Sprintf("Failed to marshal task result, due to {%s}", err))
            return
        }
        // Put the data onto Redis. Later on verifyTask() pulls data from Redis
        if err := core.S.RedisConn.LPush(ctx, pkTaskResult, taskResultStr).Err(); err != nil {
            core.L.Error(fmt.Sprintf("Failed to save task result, due to {%s}", err))
            return
        }
        // calls verifyTask() to check if enough results have been sent to the aggregator
        // and if the result should be forwarded to the contract
        m.verifyTask(ctx, task.TaskId)
    }
}

// verifyTask checks if a result has been submitted by enough off-chain processes,
// then submit back to the BVS contract.
func (m *Monitor) verifyTask(ctx context.Context, taskId uint64) {
    pkTaskResult := fmt.Sprintf("%s%d", core.PkTaskResult, taskId)
    // Grab the task info from Redis
    results, err := core.S.RedisConn.LRange(ctx, pkTaskResult, 0, -1).Result()
    fmt.Printf("verify results: %s\n", results)
    if err != nil {
        core.L.Error(fmt.Sprintf("Failed to read task result, due to {%s}", err))
        return
    }

    resultCntMap := make(map[int64]uint)
    resultOperatorMap := make(map[int64][]string)
    var taskResult core.TaskResult
    for _, result := range results {
        fmt.Printf("verify result: %s\n", result)
        if err := json.Unmarshal([]byte(result), &taskResult); err != nil {
            core.L.Error(fmt.Sprintf("Failed to parse task result, due to {%s}", err))
            return
        }
        // Sum up the number of results submitted and how manu operator has submitted
        resultCntMap[taskResult.Result]++
        resultOperatorMap[taskResult.Result] = append(resultOperatorMap[taskResult.Result], taskResult.Operator)
        // If the number of results submitted is larger then a predefined threshold. 
        // Mark it as done on Redis (avoid duplicated send), submit to BVS contract
        // and remove the processing record from Redis
        if resultCntMap[taskResult.Result] >= core.C.App.Threshold {
            pkTaskFinished := fmt.Sprintf("%s%d", core.PkTaskFinished, taskId)
            // Set as finished
            if err := core.S.RedisConn.Set(ctx, pkTaskFinished, taskResult.Result, 0).Err(); err != nil {
                core.L.Error(fmt.Sprintf("Failed to set task finished, due to {%s}", err))
                return
            }
            // Submit to BVS contract
            operators := strings.Join(resultOperatorMap[taskResult.Result], "&")
            core.L.Info(fmt.Sprintf("Task {%d} is finished. The result is {%d}. The operators are {%s}", taskId, taskResult.Result, operators))
            if err := m.sendTaskResult(taskId, taskResult.Result, operators); err != nil {
                core.L.Error(fmt.Sprintf("Failed to send task result, due to {%s}", err))
            }
            // Remove processing recordes from Redis
            pkTaskOperator := fmt.Sprintf("%s%d", core.PkTaskOperator, taskId)
            core.S.RedisConn.Del(ctx, pkTaskResult)
            core.S.RedisConn.Del(ctx, pkTaskOperator)
            break
        }
    }
}

// sendTaskResult sends the task result to BVS contract via the BVSSquaring API
func (m *Monitor) sendTaskResult(taskId uint64, result int64, operators string) error {
    fmt.Println("sendTaskResult", taskId, result, operators)

    // Create a new BvsSquaring interface
    bvsSquaring := BvsSquaringApi.NewBVSSquaring(m.chainIO)
    bvsSquaring.BindClient(m.bvsContract)
    // Send result to BVS Contract
    _, err := bvsSquaring.RespondToTask(context.Background(), int64(taskId), result, operators)
    if err != nil {
        return err
    }

    return nil

}

// Check if the operator is registered with BVS contract
func (m *Monitor) VerifyOperator(operator string) (bool, error) {
    rsp, err := m.bvsDirectoryApi.QueryOperator(operator, operator)
    if err != nil {
        core.L.Error(fmt.Sprintf("Failed to query operator, due to {%s}", err))
        return false, err
    }
    fmt.Printf("txnRsp: %+v\n", rsp)
    if rsp.Status == "registered" {
        return true, nil
    }

    return false, nil
}

That concludes most of the aggregator flow. If you are paying attention, you SHOULD (and I mean RFC 2119 SHOULD) be questioning "where is the code that accepts input from the off-chain compute process? It is in another file.

type Payload struct {
    TaskId    uint64 `json:"taskID" binding:"required"`
    Result    int64  `json:"result" binding:"required"`
    Timestamp int64  `json:"timestamp" binding:"required"`
    Signature string `json:"signature" binding:"required"`
    PubKey    string `json:"pubKey" binding:"required"`
}

// Aggregator handles the aggregator endpoint for the API.
//
// It parses the payload from the request body and verifies the signature.
// It checks if the timestamp is within the allowed range.
// It verifies if the task is finished and if the operator has already sent the task.
// If all checks pass, it saves the task to the queue.
// It returns an HTTP response with the status of the operation.
func Aggregator(c *gin.Context) {
    // parse payload
    var payload Payload
    if err := c.ShouldBindJSON(&payload); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }
    fmt.Printf("payload: %+v\n", payload)

    // get current timestamp, make sure it is within a defined limit
    nowTs := time.Now().Unix()
    if payload.Timestamp > nowTs || payload.Timestamp < nowTs-60*2 {
        c.JSON(http.StatusBadRequest, gin.H{"error": "timestamp out of range"})
        return
    }

    // verify signature
    pubKey, address, err := util.PubKeyToAddress(payload.PubKey)
    if err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }
    fmt.Printf("pubKey: %s\n", pubKey)
    fmt.Printf("address: %s\n", address)
    fmt.Printf("payload.PubKey: %s\n", payload.PubKey)

    // The Aggregator reconstructs the payload and validaes both that
    // 1. The same msgPayload is created by the off-chain process
    // 2. The off-chain process indeed signed the message and it is not altered
    msgPayload := fmt.Sprintf("%s-%d-%d-%d", core.C.Chain.BvsHash, payload.Timestamp, payload.TaskId, payload.Result)
    msgBytes := []byte(msgPayload)
    if isValid, err := signer.VerifySignature(pubKey, msgBytes, payload.Signature); err != nil || !isValid {
        c.JSON(http.StatusBadRequest, gin.H{"error": "invalid signature"})
        return
    }

    // verify task hasn't already been finished
    pkTaskFinished := fmt.Sprintf("%s%d", core.PkTaskFinished, payload.TaskId)
    if isExist, err := core.S.RedisConn.Exists(c, pkTaskFinished).Result(); err != nil || isExist == 1 {
        c.JSON(http.StatusBadRequest, gin.H{"error": "task already finished"})
        return
    }

    // Check the operator is valid (see annotaion above)
    if ok, err := svc.MONITOR.VerifyOperator(address); err != nil || !ok {
        c.JSON(http.StatusBadRequest, gin.H{"error": "invalid operator"})
        return
    }

    // verify we haven't sent the result yet
    taskOperatorKey := fmt.Sprintf("%s%d", core.PkTaskOperator, payload.TaskId)
    if result, err := core.S.RedisConn.Eval(c, core.LuaScript, []string{taskOperatorKey}, address).Result(); err != nil || result.(int64) == 1 {
        c.JSON(http.StatusBadRequest, gin.H{"error": "task already send"})
        return
    }

    // save task to queue to be sent to the BVS contract
    task := core.Task{TaskId: payload.TaskId, TaskResult: core.TaskResult{Operator: address, Result: payload.Result}}
    taskStr, err := json.Marshal(task)
    if err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        return
    }
    if _, err := core.S.RedisConn.LPush(c, core.PkTaskQueue, taskStr).Result(); err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
        return
    }

    c.JSON(http.StatusOK, gin.H{"status": "success"})
}

Last updated