Annotation of the off-chain compute program
NOTE TO SELF: This needs a much better introduction. For now I am powering through the actual code annotation first.
The code used in this annotation is taken from commit c5764b62732516f634c08251258474cb8f4a97db
// Node is the off-chain compute node. It bundles the addresses of the on-chain
// contracts it works with, the node's own public key, and the clients it uses
// to talk to the chain and to the StateBank.
type Node struct {
	// Contract addresses. NewNode fills them in from the BVS directory lookup.
	bvsContract, stateBankContract, bvsDriverContract string

	// pubKeyStr is the base64 (standard encoding) form of the current
	// account's public key; it is attached to every result sent out.
	pubKeyStr string

	// chainIO is the interface from this off-chain program to on-chain
	// contracts (status queries, signing, client context).
	chainIO io.ChainIO

	// stateBank provides the local view of the BVS state stored on-chain.
	stateBank api.StateBank
}
// Payload is the JSON document posted to the aggregator after a task has been
// computed. Field order and tags define the wire format and must stay as-is.
type Payload struct {
	// TaskId identifies the task the result belongs to.
	TaskId int64 `json:"taskID"`

	// Result is the computed value for the task.
	Result int64 `json:"result"`

	// Timestamp is the Unix time (seconds) taken when the result was sent.
	Timestamp int64 `json:"timestamp"`

	// Signature is the node's signature over the message payload string.
	Signature string `json:"signature"`

	// PubKey is the node's base64-encoded public key.
	PubKey string `json:"pubKey"`
}
// NewNode builds a ready-to-run Node from the global configuration in core.C.
// According to the original annotation it mirrors NewMonitor from the previous
// section, with extra initialisation for the signing key and the StateBank.
//
// Any failure while talking to the chain or the keyring is fatal: NewNode
// panics instead of returning an error.
func NewNode() *Node {
	const serviceName = "bvs_demo"

	// Logging and metrics plumbing required by ChainIO.
	elk := logger.NewELKLogger(serviceName)
	metrics := transactionprocess.NewPromIndicators(prometheus.NewRegistry(), serviceName)

	// Transaction-manager settings handed to ChainIO.
	txParams := types.TxManagerParams{
		MaxRetries:             3,
		RetryInterval:          time.Second,
		ConfirmationTimeout:    60 * time.Second,
		GasPriceAdjustmentRate: "1.1",
	}

	// ChainIO is the main interface from an off-chain program to any
	// on-chain contract.
	client, err := io.NewChainIO(
		core.C.Chain.Id,
		core.C.Chain.Rpc,
		core.C.Owner.KeyDir,
		core.C.Owner.Bech32Prefix,
		elk,
		metrics,
		txParams,
	)
	if err != nil {
		panic(err)
	}

	// Attach the keyring so the private key is available for signing.
	client, err = client.SetupKeyring(core.C.Owner.KeyName, core.C.Owner.KeyringBackend)
	if err != nil {
		panic(err)
	}

	// The public key travels with every result, base64-encoded.
	account, err := client.GetCurrentAccount()
	if err != nil {
		panic(err)
	}
	rawPubKey := account.GetPubKey().Bytes()
	encodedPubKey := base64.StdEncoding.EncodeToString(rawPubKey)

	// Look up the BVS in the directory to learn its contract addresses.
	directory := api.NewBVSDirectoryImpl(client, core.C.Chain.BvsDirectory)
	bvsInfo, err := directory.GetBVSInfo(core.C.Chain.BvsHash)
	if err != nil {
		panic(err)
	}

	// The StateBank stores BVS state on-chain. Off-chain processes follow its
	// events and keep a local view up to date so they can react to the
	// current BVS state.
	bank := api.NewStateBankImpl(client)

	return &Node{
		chainIO:           client,
		stateBank:         bank,
		pubKeyStr:         encodedPubKey,
		bvsContract:       bvsInfo.BVSContract,
		stateBankContract: bvsInfo.StateBank,
		bvsDriverContract: bvsInfo.BVSDriver,
	}
}
// Run starts the node and blocks until driver monitoring ends.
//
// It first starts StateBank synchronisation and then consumes driver events on
// the calling goroutine. Any error from either step is fatal and causes a
// panic.
func (n *Node) Run(ctx context.Context) {
	// syncStateBank only sets up its indexer and hands event processing to a
	// background goroutine, so it returns promptly. Calling it inline (rather
	// than from its own goroutine) guarantees that StateBank syncing has been
	// started before the first driver event can be handled.
	if err := n.syncStateBank(ctx); err != nil {
		panic(err)
	}

	// Listen for driver events and run the computation for each of them.
	// This call blocks until the driver event channel is closed.
	if err := n.monitorDriver(ctx); err != nil {
		panic(err)
	}
}
// syncStateBank starts keeping the local StateBank view in sync with the chain.
//
// It queries the RPC node for the current block height, starts an indexer on
// the StateBank contract for "wasm-UpdateState" events from that height, and
// feeds those events to the StateBank in a background goroutine. The function
// itself returns as soon as the indexer is running; the goroutine lives for as
// long as the indexer keeps its channel open.
//
// It returns an error if the node status cannot be queried or the indexer
// cannot be started.
func (n *Node) syncStateBank(ctx context.Context) (err error) {
	// The current block height is where indexing starts.
	status, err := n.chainIO.QueryNodeStatus(ctx)
	if err != nil {
		return fmt.Errorf("querying node status for StateBank sync: %w", err)
	}
	startHeight := status.SyncInfo.LatestBlockHeight

	// NOTE(review): the trailing 1 and 10 are passed through unchanged; their
	// meaning is defined by the Indexer API and is not visible here.
	idx := n.stateBank.Indexer(
		n.chainIO.GetClientCtx(),
		n.stateBankContract,
		startHeight,
		[]string{"wasm-UpdateState"},
		1,
		10,
	)
	events, err := idx.Run(ctx)
	if err != nil {
		return fmt.Errorf("starting StateBank indexer: %w", err)
	}

	// EventHandler consumes the channel and applies each update to the
	// StateBank until the channel ends, which may be the whole process
	// lifetime.
	go n.stateBank.EventHandler(events)
	return nil
}
// monitorDriver watches the BVS driver contract and runs the computation for
// every "wasm-ExecuteBVSOffchain" event it emits.
//
// It queries the current block height, starts an event indexer on the driver
// contract from that height and then blocks, handling events until the
// indexer's channel is closed. A failure while handling a single task is
// logged and does not stop the loop.
//
// It returns an error if the node status cannot be queried or the indexer
// cannot be started, and nil once the event channel is closed.
func (n *Node) monitorDriver(ctx context.Context) (err error) {
	// The current block height is where indexing starts.
	status, err := n.chainIO.QueryNodeStatus(ctx)
	if err != nil {
		return fmt.Errorf("querying node status for driver monitoring: %w", err)
	}
	latestBlock := status.SyncInfo.LatestBlockHeight
	fmt.Println("latestBlock: ", latestBlock)

	// NOTE(review): the trailing 1 and 10 are passed through unchanged; their
	// meaning is defined by indexer.NewEventIndexer and is not visible here.
	evtIndexer := indexer.NewEventIndexer(
		n.chainIO.GetClientCtx(),
		n.bvsDriverContract,
		latestBlock,
		[]string{"wasm-ExecuteBVSOffchain"},
		1,
		10,
	)
	evtChain, err := evtIndexer.Run(ctx)
	if err != nil {
		return fmt.Errorf("starting driver event indexer: %w", err)
	}
	fmt.Println("chain: ", evtChain)

	// For every task event, compute the result and forward it to the
	// aggregator via calcTask.
	for evt := range evtChain {
		switch evt.EventType {
		case "wasm-ExecuteBVSOffchain":
			// Event data is carried in evt.AttrMap.
			taskId := evt.AttrMap["taskId"]
			fmt.Println("taskId: ", taskId)
			if err := n.calcTask(taskId); err != nil {
				fmt.Println("ExecuteBVSOffchain error: ", err)
			}
		default:
			// Not expected: the indexer was only asked for the event type
			// handled above.
			fmt.Println("unhandled event: ", evt.EventType)
		}
	}
	return nil
}
// calcTask computes the result for one task and sends it to the aggregator.
//
// taskId is the decimal task identifier taken from the driver event. The task
// input is read from the StateBank under the key "taskId.<taskId>", squared,
// and forwarded together with the numeric task id.
//
// It returns an error if the input cannot be read from the StateBank, if the
// input or the task id is not a valid base-10 int64, or if sending the result
// fails. Errors are returned rather than raised as panics so that the caller
// can log a failed task and keep processing later ones.
func (n *Node) calcTask(taskId string) (err error) {
	stateKey := fmt.Sprintf("taskId.%s", taskId)

	// Retrieve the task input from the local StateBank view.
	value, err := n.stateBank.GetWasmUpdateState(stateKey)
	if err != nil {
		return fmt.Errorf("reading state %q: %w", stateKey, err)
	}

	// Convert the input and the task id to integers. Each conversion is
	// checked on its own so a malformed input is reported instead of being
	// silently treated as zero.
	input, err := strconv.ParseInt(value, 10, 64)
	if err != nil {
		return fmt.Errorf("parsing input %q of task %q: %w", value, taskId, err)
	}
	task, err := strconv.ParseInt(taskId, 10, 64)
	if err != nil {
		return fmt.Errorf("parsing task id %q: %w", taskId, err)
	}

	// Compute the square of the input.
	// NOTE: per the original annotation, the whole point of the SatLayer
	// library, contracts and programs is to make this computation verifiable,
	// so that the off-chain process has no way to fake the result.
	result := n.square(input)

	// Send the result to the aggregator.
	if err := n.sendAggregator(task, result); err != nil {
		return fmt.Errorf("sending result of task %d to aggregator: %w", task, err)
	}
	return nil
}
// square returns input multiplied by itself. The multiplication is plain
// int64 arithmetic, so a product outside the int64 range wraps around.
func (n *Node) square(input int64) int64 {
	squared := input * input
	return squared
}
// sendAggregator signs the result of a task and posts it to the aggregator.
//
// The signed message is "<BvsHash>-<timestamp>-<taskId>-<result>", where the
// timestamp is the current Unix time in seconds. The aggregator is expected to
// rebuild the same string from the payload fields and verify the signature
// against it.
//
// It returns an error if signing or JSON encoding fails, if the HTTP request
// fails, or if the aggregator answers with any status other than 200 OK.
func (n *Node) sendAggregator(taskId int64, result int64) (err error) {
	// nowTs is the Unix timestamp of the current time (no timezone offset);
	// effectively the time at which the task was finished.
	nowTs := time.Now().Unix()
	msgPayload := fmt.Sprintf("%s-%d-%d-%d", core.C.Chain.BvsHash, nowTs, taskId, result)
	core.L.Info(fmt.Sprintf("msgPayload: %s\n", msgPayload))

	// Sign the message. The error is checked before the signature is used so
	// that an invalid signature never ends up in a payload.
	signature, err := n.chainIO.GetSigner().Sign([]byte(msgPayload))
	if err != nil {
		return fmt.Errorf("signing result of task %d: %w", taskId, err)
	}

	payload := Payload{
		TaskId:    taskId,
		Result:    result,
		Timestamp: nowTs,
		Signature: signature,
		PubKey:    n.pubKeyStr,
	}
	fmt.Printf("task result send aggregator payload: %+v\n", payload)

	jsonData, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshaling aggregator payload for task %d: %w", taskId, err)
	}

	// The demo uses HTTP because it is popular and fast, but that is not a
	// hard requirement; gRPC, WebSockets or other protocols work just as well.
	// NOTE(review): http.Post uses the default client, which has no timeout.
	resp, err := http.Post(core.C.Aggregator.Url, "application/json", bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("posting result of task %d to aggregator: %w", taskId, err)
	}
	// Always release the response body, otherwise the connection leaks.
	defer resp.Body.Close()

	// A non-200 answer means the aggregator did not accept the result; report
	// it as an error instead of silently treating it as success.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("aggregator rejected result of task %d: unexpected status %s", taskId, resp.Status)
	}

	// Done; the off-chain process goes back to listening for new tasks.
	return nil
}
Last updated