Annotation of the Aggregator
// This is the structure that holds the Aggregator
type Monitor struct {
bvsContract string // Address of the BVS contract
bvsDirectoryApi api.BVSDirectory // BVS Directory API wrapper
chainIO io.ChainIO // ChainIO allows easy interaction to contracts
// NewMonitor creates a new Monitor instance with a Cosmos client and BVS contract.
func NewMonitor() *Monitor {
// Figure our the path of the current executable. This is used later on to
// figure out where the Babylon keys and home is (just a fixed structure happend
// to be used in the project)
_, currentFile, _, ok := runtime.Caller(0)
if !ok {
panic("cannot get current file")
// Figure out the parent directory of the executable. The babylon home directory
// should be $EXE_DIR/../../$KEY_DIR
configDir := filepath.Dir(currentFile)
homeDir := configDir + "/../../" + core.C.Owner.KeyDir
fmt.Printf("homeDir: %s\n", homeDir)
// Initialize a logger and prometheus for monitoring
elkLogger := logger.NewELKLogger("bvs_demo")
reg := prometheus.NewRegistry()
metricsIndicators := transactionprocess.NewPromIndicators(reg, "bvs_demo")
// Initialize ChainIO, this should be very familare by now
chainIO, err := io.NewChainIO(core.C.Chain.Id, core.C.Chain.Rpc, homeDir, core.C.Owner.Bech32Prefix, elkLogger, metricsIndicators, types.TxManagerParams{
MaxRetries: 3,
RetryInterval: 1 * time.Second,
ConfirmationTimeout: 60 * time.Second,
GasPriceAdjustmentRate: "1.1",
if err != nil {
// Setup key ring to access private keys
chainIO, err = chainIO.SetupKeyring(core.C.Owner.KeyName, core.C.Owner.KeyringBackend)
if err != nil {
// Grab the current account for use later
account, err := chainIO.GetCurrentAccount()
if err != nil {
address := account.GetAddress().String()
fmt.Printf("address: %s\n", address)
// Query the BVS directory and find the BVS we are monitoring
txResp, err := api.NewBVSDirectoryImpl(chainIO, core.C.Chain.BvsDirectory).GetBVSInfo(core.C.Chain.BvsHash)
if err != nil {
// Construct the Aggregator and return it to the caller
bvsDirectoryApi := api.NewBVSDirectoryImpl(chainIO, core.C.Chain.BvsDirectory)
return &Monitor{
bvsContract: txResp.BVSContract,
bvsDirectoryApi: bvsDirectoryApi,
chainIO: chainIO,
// Starts monitoring of the task queue
func (m *Monitor) Run(ctx context.Context) {
core.L.Info("Start to monitor task queue")
for {
// The HTTP endpoint (see below) pushes finished task to Redis. Here we
// anticipate results from Redis and forward it to the BVS contract
results, err := core.S.RedisConn.BLPop(context.Background(), 0, core.PkTaskQueue).Result()
fmt.Printf("results: %+v\n", results)
if err != nil {
core.L.Error(fmt.Sprintf("Failed to read task queue, due to {%s}", err))
fmt.Printf("result--->: %s\n", results[1])
task := core.Task{}
if err := json.Unmarshal([]byte(results[1]), &task); err != nil {
core.L.Error(fmt.Sprintf("Failed to parse task queue, due to {%s}", err))
fmt.Printf("task: %+v\n", task)
pkTaskResult := fmt.Sprintf("%s%d", core.PkTaskResult, task.TaskId)
taskResultStr, err := json.Marshal(task.TaskResult)
if err != nil {
core.L.Error(fmt.Sprintf("Failed to marshal task result, due to {%s}", err))
// Put the data onto Redis. Later on verifyTask() pulls data from Redis
if err := core.S.RedisConn.LPush(ctx, pkTaskResult, taskResultStr).Err(); err != nil {
core.L.Error(fmt.Sprintf("Failed to save task result, due to {%s}", err))
// calls verifyTask() to check if enough results have been sent to the aggregator
// and if the result should be forwarded to the contract
m.verifyTask(ctx, task.TaskId)
// verifyTask checks if a result has been submitted by enough off-chain processes,
// then submit back to the BVS contract.
func (m *Monitor) verifyTask(ctx context.Context, taskId uint64) {
pkTaskResult := fmt.Sprintf("%s%d", core.PkTaskResult, taskId)
// Grab the task info from Redis
results, err := core.S.RedisConn.LRange(ctx, pkTaskResult, 0, -1).Result()
fmt.Printf("verify results: %s\n", results)
if err != nil {
core.L.Error(fmt.Sprintf("Failed to read task result, due to {%s}", err))
resultCntMap := make(map[int64]uint)
resultOperatorMap := make(map[int64][]string)
var taskResult core.TaskResult
for _, result := range results {
fmt.Printf("verify result: %s\n", result)
if err := json.Unmarshal([]byte(result), &taskResult); err != nil {
core.L.Error(fmt.Sprintf("Failed to parse task result, due to {%s}", err))
// Sum up the number of results submitted and how manu operator has submitted
resultOperatorMap[taskResult.Result] = append(resultOperatorMap[taskResult.Result], taskResult.Operator)
// If the number of results submitted is larger then a predefined threshold.
// Mark it as done on Redis (avoid duplicated send), submit to BVS contract
// and remove the processing record from Redis
if resultCntMap[taskResult.Result] >= core.C.App.Threshold {
pkTaskFinished := fmt.Sprintf("%s%d", core.PkTaskFinished, taskId)
// Set as finished
if err := core.S.RedisConn.Set(ctx, pkTaskFinished, taskResult.Result, 0).Err(); err != nil {
core.L.Error(fmt.Sprintf("Failed to set task finished, due to {%s}", err))
// Submit to BVS contract
operators := strings.Join(resultOperatorMap[taskResult.Result], "&")
core.L.Info(fmt.Sprintf("Task {%d} is finished. The result is {%d}. The operators are {%s}", taskId, taskResult.Result, operators))
if err := m.sendTaskResult(taskId, taskResult.Result, operators); err != nil {
core.L.Error(fmt.Sprintf("Failed to send task result, due to {%s}", err))
// Remove processing recordes from Redis
pkTaskOperator := fmt.Sprintf("%s%d", core.PkTaskOperator, taskId)
core.S.RedisConn.Del(ctx, pkTaskResult)
core.S.RedisConn.Del(ctx, pkTaskOperator)
// sendTaskResult sends the task result to BVS contract via the BVSSquaring API
func (m *Monitor) sendTaskResult(taskId uint64, result int64, operators string) error {
fmt.Println("sendTaskResult", taskId, result, operators)
// Create a new BvsSquaring interface
bvsSquaring := BvsSquaringApi.NewBVSSquaring(m.chainIO)
// Send result to BVS Contract
_, err := bvsSquaring.RespondToTask(context.Background(), int64(taskId), result, operators)
if err != nil {
return err
return nil
// Check if the operator is registered with BVS contract
func (m *Monitor) VerifyOperator(operator string) (bool, error) {
rsp, err := m.bvsDirectoryApi.QueryOperator(operator, operator)
if err != nil {
core.L.Error(fmt.Sprintf("Failed to query operator, due to {%s}", err))
return false, err
fmt.Printf("txnRsp: %+v\n", rsp)
if rsp.Status == "registered" {
return true, nil
return false, nil
That concludes most of the aggregator flow. If you are paying attention, you SHOULD (and I mean RFC 2119 SHOULD) be asking, "Where is the code that accepts input from the off-chain compute process?" It is in another file.
// Payload is the JSON request body accepted by the aggregator endpoint. It
// carries one signed task result.
//
// NOTE(review): with gin's default validator, `binding:"required"` on a
// numeric field is expected to reject the zero value, so a TaskId or Result of
// 0 would fail binding — confirm this is intended.
type Payload struct {
	TaskId    uint64 `json:"taskID" binding:"required"`    // Task the result belongs to
	Result    int64  `json:"result" binding:"required"`    // Reported result value
	Timestamp int64  `json:"timestamp" binding:"required"` // Unix time (seconds) of signing
	Signature string `json:"signature" binding:"required"` // Signature over the message payload
	PubKey    string `json:"pubKey" binding:"required"`    // Signer's public key
}
// Aggregator handles the aggregator endpoint for the API.
// It parses the payload from the request body and verifies the signature.
// It checks if the timestamp is within the allowed range.
// It verifies if the task is finished and if the operator has already sent the task.
// If all checks pass, it saves the task to the queue.
// It returns an HTTP response with the status of the operation.
func Aggregator(c *gin.Context) {
// parse payload
var payload Payload
if err := c.ShouldBindJSON(&payload); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
fmt.Printf("payload: %+v\n", payload)
// get current timestamp, make sure it is within a defined limit
nowTs := time.Now().Unix()
if payload.Timestamp > nowTs || payload.Timestamp < nowTs-60*2 {
c.JSON(http.StatusBadRequest, gin.H{"error": "timestamp out of range"})
// verify signature
pubKey, address, err := util.PubKeyToAddress(payload.PubKey)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
fmt.Printf("pubKey: %s\n", pubKey)
fmt.Printf("address: %s\n", address)
fmt.Printf("payload.PubKey: %s\n", payload.PubKey)
// The Aggregator reconstructs the payload and validaes both that
// 1. The same msgPayload is created by the off-chain process
// 2. The off-chain process indeed signed the message and it is not altered
msgPayload := fmt.Sprintf("%s-%d-%d-%d", core.C.Chain.BvsHash, payload.Timestamp, payload.TaskId, payload.Result)
msgBytes := []byte(msgPayload)
if isValid, err := signer.VerifySignature(pubKey, msgBytes, payload.Signature); err != nil || !isValid {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid signature"})
// verify task hasn't already been finished
pkTaskFinished := fmt.Sprintf("%s%d", core.PkTaskFinished, payload.TaskId)
if isExist, err := core.S.RedisConn.Exists(c, pkTaskFinished).Result(); err != nil || isExist == 1 {
c.JSON(http.StatusBadRequest, gin.H{"error": "task already finished"})
// Check the operator is valid (see annotaion above)
if ok, err := svc.MONITOR.VerifyOperator(address); err != nil || !ok {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid operator"})
// verify we haven't sent the result yet
taskOperatorKey := fmt.Sprintf("%s%d", core.PkTaskOperator, payload.TaskId)
if result, err := core.S.RedisConn.Eval(c, core.LuaScript, []string{taskOperatorKey}, address).Result(); err != nil || result.(int64) == 1 {
c.JSON(http.StatusBadRequest, gin.H{"error": "task already send"})
// save task to queue to be sent to the BVS contract
task := core.Task{TaskId: payload.TaskId, TaskResult: core.TaskResult{Operator: address, Result: payload.Result}}
taskStr, err := json.Marshal(task)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
if _, err := core.S.RedisConn.LPush(c, core.PkTaskQueue, taskStr).Result(); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
c.JSON(http.StatusOK, gin.H{"status": "success"})
Last updated