Annotation of the off-chain compute program

NOTE FOR SELF: Need much better introduction. Trying power through the actual code annotation first RN.

The code used in this annotation is taken from commit c5764b62732516f634c08251258474cb8f4a97db

type Node struct {
    bvsContract       string
    stateBankContract string
    bvsDriverContract string
    pubKeyStr         string
    chainIO           io.ChainIO
    stateBank         api.StateBank
}

type Payload struct {
    TaskId    int64  `json:"taskID"`
    Result    int64  `json:"result"`
    Timestamp int64  `json:"timestamp"`
    Signature string `json:"signature"`
    PubKey    string `json:"pubKey"`
}

// The NewNode function basically is the same as NewMonitor in the previous section. But with a bit more
// initalization for keys and state
func NewNode() *Node {
    elkLogger := logger.NewELKLogger("bvs_demo")
    reg := prometheus.NewRegistry()
    metricsIndicators := transactionprocess.NewPromIndicators(reg, "bvs_demo")
    // Create a new ChainIO instance
    // ChainIO is our main interfaace from a off-chain program to any on-chain contract
    chainIO, err := io.NewChainIO(core.C.Chain.Id, core.C.Chain.Rpc, core.C.Owner.KeyDir, core.C.Owner.Bech32Prefix, elkLogger, metricsIndicators, types.TxManagerParams{
        MaxRetries:             3,
        RetryInterval:          1 * time.Second,
        ConfirmationTimeout:    60 * time.Second,
        GasPriceAdjustmentRate: "1.1",
    })
    if err != nil {
        panic(err)
    }
    // Setup the keyring so we have access to private keys
    chainIO, err = chainIO.SetupKeyring(core.C.Owner.KeyName, core.C.Owner.KeyringBackend)
    if err != nil {
        panic(err)
    }
    account, err := chainIO.GetCurrentAccount()
    if err != nil {
        panic(err)
    }
    pubKeyStr := base64.StdEncoding.EncodeToString(account.GetPubKey().Bytes())
    // Reterve the information about the BVS
    txResp, err := api.NewBVSDirectoryImpl(chainIO, core.C.Chain.BvsDirectory).GetBVSInfo(core.C.Chain.BvsHash)
    if err != nil {
        panic(err)
    }
    // Initalize the StateBank
    // A StateBank is used to store BVS state on-chain. Off-chain processes are responsible
    // to keep track of events from the StateBank and update it's local view accordingly
    // (in order to react differently depending on the BVS state)
    stateBank := api.NewStateBankImpl(chainIO)

    // construce the Node struct and return it
    return &Node{
        bvsContract:       txResp.BVSContract,
        stateBankContract: txResp.StateBank,
        bvsDriverContract: txResp.BVSDriver,
        stateBank:         stateBank,
        chainIO:           chainIO,
        pubKeyStr:         pubKeyStr,
    }
}

// Run starts the node's main execution loop.
func (n *Node) Run(ctx context.Context) {
    go func() {
        // Synchronize StateBank. Note that even though it is a plain function call,
        // internally, syncStateBank() runs forever and thus StateBack will not desync
        if err := n.syncStateBank(ctx); err != nil {
            panic(err)
        }
    }()
    // Listens to events and run the computation 
    if err := n.monitorDriver(ctx); err != nil {
        panic(err)
    }
}

// Initialize and synchronize the StateBank. Even though this function returns very
// soon, it starts a goruntine that is long-lived and this StateBank will not loose
// it's synchronization
func (n *Node) syncStateBank(ctx context.Context) (err error) {
    // Query the RPC node to reterve the current block height then initializes an
    // Indexer to listen to the "wasm-UpdateState" event. This event then is fed into
    // the StateBank to update it's view of the BVS state
    // Queries current block height 
    res, err := n.chainIO.QueryNodeStatus(ctx)
    if err != nil {
        panic(err)
    }
    latestBlock := res.SyncInfo.LatestBlockHeight
    // Create an indexer and listens for the "wasm-UpdateState" event
    idx := n.stateBank.Indexer(n.chainIO.GetClientCtx(), n.stateBankContract, latestBlock, []string{"wasm-UpdateState"}, 1, 10)
    processingQueue, err := idx.Run(ctx)
    if err != nil {
        panic(err)
    }
    
    // EventHandler() accepts a channel, listens for update and loops. It keeps the
    // StateBank in sync with the BVS until channel ends. Potentially EventHandler()
    // can run forever as long as the process is running
    go func() {
        n.stateBank.EventHandler(processingQueue)
    }()
    return
}

// monitorDriver monitors the driver contract for events and performs computation
func (n *Node) monitorDriver(ctx context.Context) (err error) {
    // DITTO. Query the RPC node to reterve the current block height then initializes
    // an indexer. This time for the "wasm-ExecuteBVSOffchain" event.
    // Queries current block height
    res, err := n.chainIO.QueryNodeStatus(ctx)
    if err != nil {
        panic(err)
    }
    latestBlock := res.SyncInfo.LatestBlockHeight
    fmt.Println("latestBlock: ", latestBlock)
    // Create an indexer and listens for the "wasm-ExecuteBVSOffchain" event
    evtIndexer := indexer.NewEventIndexer(
        n.chainIO.GetClientCtx(),
        n.bvsDriverContract,
        latestBlock,
        []string{"wasm-ExecuteBVSOffchain"},
        1,
        10)
    evtChain, err := evtIndexer.Run(ctx)
    if err != nil {
        panic(err)
    }
    fmt.Println("chain: ", evtChain)
    // Listen for the "wasm-ExecuteBVSOffchain" event and call calcTask() to
    // perform the computation as well as forwarding the computed result to 
    // the aggregator
    for evt := range evtChain {
        switch evt.EventType {
        case "wasm-ExecuteBVSOffchain":
            taskId := evt.AttrMap["taskId"]  // Data is stored in evt.AttrMap
            fmt.Println("taskId: ", taskId)
            if err := n.calcTask(taskId); err != nil {
                fmt.Println("ExecuteBVSOffchain error: ", err)
            }
        default:
            // Should not reach here what so ever since we handled the only event
            // type passed to the indexer above
            fmt.Println("unhandled event: ", evt.EventType)
        }
    }
    return
}

// calcTask calculates the task result and sends it to the aggregator.
func (n *Node) calcTask(taskId string) (err error) {
    stateKey := fmt.Sprintf("taskId.%s", taskId)
    // Reterve the associated value from the StateBank
    value, err := n.stateBank.GetWasmUpdateState(stateKey)
    if err != nil {
        return
    }
    // Convert both taskId and value to string
    input, err := strconv.Atoi(value)
    task, err := strconv.Atoi(taskId)
    if err != nil {
        fmt.Println("format err:", err)
        return
    }
    // Compute the square of the input.
    // NOTE: The entire SatLayer library, contracts and programs is to make
    // this computation verifiable and the off-chain process have no way to
    // fake this result.
    result := n.square(int64(input))
    
    // send the resut to the aggregator
    err = n.sendAggregator(int64(task), result)
    if err != nil {
        panic(err)
    }
    return
}

// square calculates the square of a given integer.
func (n *Node) square(input int64) int64 {
    return input * input
}

// sendAggregator sends the task result to the aggregator.
func (n *Node) sendAggregator(taskId int64, result int64) (err error) {
    // nowTs is the UNIX timestamp of the current time (without timezone offset)
    // this is effectively the time when the task is finished. 
    nowTs := time.Now().Unix()
    msgPayload := fmt.Sprintf("%s-%d-%d-%d", core.C.Chain.BvsHash, nowTs, taskId, result)
    // Log the infomation
    core.L.Info(fmt.Sprintf("msgPayload: %s\n", msgPayload))
    // Sign the message we just created
    signature, err := n.chainIO.GetSigner().Sign([]byte(msgPayload))

    // Prepare the payload to be sent to the aggregator. The aggregator will then
    // reconstruct it's expected msgPayload and verify the signature to validate
    // the message
    payload := Payload{
        TaskId:    taskId,
        Result:    result,
        Timestamp: nowTs,
        Signature: signature,
        PubKey:    n.pubKeyStr,
    }
    fmt.Printf("task result send aggregator payload: %+v\n", payload)
    if err != nil {
        return
    }
    // Serialize the data
    jsonData, err := json.Marshal(payload)
    if err != nil {
        fmt.Printf("Error marshaling JSON: %s", err)
        return
    }

    // Send the result to an aggregator. We use HTTP in the demo as it is a very popular
    // protocol and is quite fast. But using HTTP is not a hard requirment. Feel free to
    // use gRPC, WebSockets or other protocols
    resp, err := http.Post(core.C.Aggregator.Url, "application/json", bytes.NewBuffer(jsonData))
    if err != nil || resp.StatusCode != 200 {
        fmt.Printf("Error sending aggregator : %s\n", err)
        return
    }

    // Now we are done sending the result back to the aggregator. The off-chain process
    // will continue on to listen to and process new tasks
    return
}

Last updated