Annotation of the Reward Uploader

type Uploader struct {
    bvsContract        string
    delegation         api.Delegation
    chainIO            io.ChainIO
    rewardsCoordinator api.RewardsCoordinator
}

// NewUploader builds an Uploader from the global configuration in core.C.
//
// It creates the logger and metrics indicators, opens a ChainIO client with a
// keyring, resolves the BVS contract address through the BVS directory, and
// creates the delegation and rewards-coordinator API clients.
//
// It panics if ChainIO creation, keyring setup, or the BVS directory lookup
// fails.
func NewUploader() *Uploader {
    // Logging and metrics plumbing required by ChainIO.
    elk := logger.NewELKLogger("bvs_demo")
    registry := prometheus.NewRegistry()
    indicators := transactionprocess.NewPromIndicators(registry, "bvs_demo")

    // Transaction manager settings: retry count, pause between retries,
    // confirmation timeout and gas price adjustment rate.
    txParams := types.TxManagerParams{
        MaxRetries:             3,
        RetryInterval:          time.Second,
        ConfirmationTimeout:    time.Minute,
        GasPriceAdjustmentRate: "1.1",
    }

    baseIO, err := io.NewChainIO(
        core.C.Chain.Id,
        core.C.Chain.Rpc,
        core.C.Owner.KeyDir,
        core.C.Owner.Bech32Prefix,
        elk,
        indicators,
        txParams,
    )
    if err != nil {
        panic(err)
    }

    // All further calls go through the keyring-enabled client.
    signer, err := baseIO.SetupKeyring(core.C.Owner.KeyName, core.C.Owner.KeyringBackend)
    if err != nil {
        panic(err)
    }

    // Look up the BVS contract address for the configured BVS hash.
    directory := api.NewBVSDirectoryImpl(signer, core.C.Chain.BvsDirectory)
    bvsInfo, err := directory.GetBVSInfo(core.C.Chain.BvsHash)
    if err != nil {
        panic(err)
    }

    delegationAPI := api.NewDelegationImpl(signer, core.C.Chain.DelegationManager)
    coordinator := api.NewRewardsCoordinator(signer)
    coordinator.BindClient(core.C.Chain.RewardCoordinator)

    return &Uploader{
        bvsContract:        bvsInfo.BVSContract,
        delegation:         delegationAPI,
        chainIO:            signer,
        rewardsCoordinator: coordinator,
    }
}

// Run indexes "wasm-TaskResponded" events emitted by the BVS contract, starting
// at the latest block height, and hands every such event to calcReward.
//
// It panics if the latest block cannot be queried or the indexer fails to
// start, and otherwise keeps consuming the indexer's event stream until that
// stream ends.
func (u *Uploader) Run() {
    ctx := context.Background()

    // Only events from the current chain height onwards are indexed.
    startHeight := u.getBlock(ctx)
    fmt.Println("latestBlock: ", startHeight)

    eventTypes := []string{"wasm-TaskResponded"}
    idx := indexer.NewEventIndexer(u.chainIO.GetClientCtx(), u.bvsContract, startHeight, eventTypes, 1, 5)
    events, err := idx.Run(ctx)
    if err != nil {
        panic(err)
    }
    fmt.Println("chain: ", events)

    for evt := range events {
        // Anything other than a task response is only reported.
        if evt.EventType != "wasm-TaskResponded" {
            fmt.Printf("Unknown event type. evt: %+v\n", evt)
            continue
        }

        attrs := evt.AttrMap
        taskId, operators := attrs["taskId"], attrs["operators"]
        fmt.Printf("[TaskResponded] blockHeight: %d, txnHash: %s, taskId: %s, taskResult: %s, taskOperators: %s\n", evt.BlockHeight, evt.TxHash, taskId, attrs["result"], operators)

        // Work out who earns what for this task and upload it.
        u.calcReward(ctx, evt.BlockHeight, taskId, operators)
    }
}

// getBlock returns the latest block height reported by the node status query.
// It panics if the query fails.
func (u *Uploader) getBlock(ctx context.Context) int64 {
    status, err := u.chainIO.QueryNodeStatus(ctx)
    if err != nil {
        panic(err)
    }
    return status.SyncInfo.LatestBlockHeight
}

// calcReward splits the configured reward for one responded task between the
// responding operators and their stakers, submits the per-strategy totals, and
// publishes the Merkle root of the individual earnings.
//
// Parameters:
//   - ctx: context used for the Redis de-duplication calls.
//   - blockHeight: height of the block that emitted the event; currently unused.
//   - taskId: task identifier; recorded in the core.PkSaveTask Redis set.
//   - operators: "&"-separated operator addresses taken from the event.
//
// Nothing is returned: failures are logged and end processing of the affected
// strategy, operator, or the whole task.
func (u *Uploader) calcReward(ctx context.Context, blockHeight int64, taskId string, operators string) {
    // Record the task in a Redis set so the same event is never processed twice.
    // The task is marked before any work is done, so a run that fails part-way
    // is not retried.
    if core.S.RedisConn.SIsMember(ctx, core.PkSaveTask, taskId).Val() {
        fmt.Println("task already processed: ", taskId)
        return
    }
    core.S.RedisConn.SAdd(ctx, core.PkSaveTask, taskId)

    // strings.Split never returns an empty slice: an empty or malformed
    // attribute would otherwise produce a "" operator that receives a reward.
    var operatorList []string
    for _, op := range strings.Split(operators, "&") {
        if op != "" {
            operatorList = append(operatorList, op)
        }
    }
    if len(operatorList) == 0 {
        fmt.Println("no operators to reward for task: ", taskId)
        return
    }

    // Every operator gets an equal slice of the task reward.
    operatorAmount := core.C.Reward.Amount / float64(len(operatorList))
    fmt.Println("operatorAmount: ", operatorAmount)

    // core.C.Reward.OperatorRatio percent of each slice goes to sAmount, which
    // is distributed to the operator's stakers; the remainder (oAmount) is paid
    // to the operator itself.
    // NOTE(review): the name OperatorRatio suggests the opposite split - confirm
    // against the intended configuration semantics before changing.
    sAmount := operatorAmount * core.C.Reward.OperatorRatio / 100
    oAmount := operatorAmount - sAmount
    fmt.Println("sAmount: ", sAmount)
    fmt.Println("oAmount: ", oAmount)

    // The operator strategy and its token are the same for every operator, so
    // resolve them once. Failing here, before anything has been accumulated,
    // avoids submitting staker rewards whose earners never reach the tree.
    operatorStrategyToken, err := u.rpcUnderlyingToken(core.C.Reward.OperatorStrategy)
    if err != nil {
        fmt.Println("get strategy token err: ", err)
        return
    }
    operatorRewardAmount := strconv.FormatFloat(math.Floor(oAmount), 'f', -1, 64)

    submissionMap := make(map[string]*Submission)
    totalEarners := make([]Earner, 0)
    for _, operator := range operatorList {
        // Query the stakers delegated to this operator. The response is not
        // usable when the call fails, so the operator is skipped.
        txnRsp, err := u.delegation.GetOperatorStakers(operator)
        if err != nil {
            fmt.Println("get operator stakers err: ", err)
            continue
        }
        fmt.Println("GetOperatorStakers txnRsp: ", txnRsp)

        totalStakerAmount := 0.0
        earners := make([]Earner, 0)
        for _, staker := range txnRsp.StakersAndShares {
            stakerAmount := 0.0
            earnerTokens := make([]*TokenAmount, 0)
            // Each entry is read as [strategy address, share amount].
            for _, strategy := range staker.SharesPerStrategy {
                if len(strategy) < 2 {
                    fmt.Println("malformed strategy share entry: ", strategy)
                    continue
                }
                // An explicit 64-bit size keeps parsing identical on 32-bit builds.
                amount, err := strconv.ParseUint(strategy[1], 10, 64)
                if err != nil {
                    fmt.Println("parse share amount err: ", err)
                    continue
                }
                strategyToken, err := u.rpcUnderlyingToken(strategy[0])
                if err != nil {
                    fmt.Println("get strategy token err: ", err)
                    continue
                }
                // Count the stake only once its token is known, so the totals
                // used for the pro-rata split match the tokens that can
                // actually be rewarded.
                strategyAmount := float64(amount)
                stakerAmount += strategyAmount
                earnerTokens = append(earnerTokens, &TokenAmount{
                    Strategy:     strategy[0],
                    Token:        strategyToken,
                    RewardAmount: "",
                    StakeAmount:  strategyAmount,
                })
            }
            // A staker without any usable token has nothing to claim and would
            // yield an empty token tree.
            if len(earnerTokens) == 0 {
                continue
            }
            earners = append(earners, Earner{
                Earner:           staker.Staker,
                TotalStakeAmount: stakerAmount,
                Tokens:           earnerTokens,
            })
            totalStakerAmount += stakerAmount
        }

        fmt.Println("totalStakerAmount: ", totalStakerAmount)
        // Distribute sAmount across stakers in proportion to their stake, and
        // each staker's part across their tokens in proportion to the stake per
        // token.
        for _, s := range earners {
            if totalStakerAmount == 0.0 || s.TotalStakeAmount == 0.0 {
                continue
            }
            stakerReward := sAmount * (s.TotalStakeAmount / totalStakerAmount)
            for _, t := range s.Tokens {
                rewardAmount := stakerReward * t.StakeAmount / s.TotalStakeAmount
                if rewardAmount == 0.0 {
                    continue
                }
                fmt.Println("rewardAmount: ", rewardAmount)
                // The per-earner amount is floored; the submission keeps the
                // unrounded sum.
                t.RewardAmount = strconv.FormatFloat(math.Floor(rewardAmount), 'f', -1, 64)
                if sub, ok := submissionMap[t.Strategy]; ok {
                    sub.Amount += rewardAmount
                } else {
                    submissionMap[t.Strategy] = &Submission{
                        Strategy: t.Strategy,
                        Token:    t.Token,
                        Amount:   rewardAmount,
                    }
                }
            }
        }

        // Add the operator's own share. The submission is keyed and labelled by
        // the operator strategy, matching the operator's TokenAmount below.
        if sub, ok := submissionMap[core.C.Reward.OperatorStrategy]; ok {
            sub.Amount += oAmount
        } else {
            submissionMap[core.C.Reward.OperatorStrategy] = &Submission{
                Strategy: core.C.Reward.OperatorStrategy,
                Token:    operatorStrategyToken,
                Amount:   oAmount,
            }
        }
        earners = append(earners, Earner{
            Earner:           operator,
            TotalStakeAmount: oAmount,
            Tokens: []*TokenAmount{
                {
                    Strategy:     core.C.Reward.OperatorStrategy,
                    Token:        operatorStrategyToken,
                    RewardAmount: operatorRewardAmount,
                    StakeAmount:  oAmount,
                },
            },
        })
        fmt.Printf("earners: %+v\n", earners)
        totalEarners = append(totalEarners, earners...)
    }

    // Every operator failed: there is nothing to submit and no tree to build.
    if len(totalEarners) == 0 {
        fmt.Println("no earners to reward for task: ", taskId)
        return
    }

    // Flatten the per-strategy totals into the list of reward submissions.
    submissions := make([]Submission, 0, len(submissionMap))
    for _, submission := range submissionMap {
        submissions = append(submissions, *submission)
        fmt.Printf("strategy: %s, token: %s, amount: %f\n", submission.Strategy, submission.Token, submission.Amount)
    }
    fmt.Printf("earners: %+v\n", totalEarners)

    // Submit the reward submissions.
    if err := u.rpcSubmission(submissions); err != nil {
        fmt.Println("rpc submission err: ", err)
        return
    }
    // Build the Merkle tree over all earners and publish its root.
    rootHash, err := u.merkleTree(totalEarners)
    if err != nil {
        fmt.Println("merkle tree err: ", err)
        return
    }
    fmt.Printf("root Hash: %s\n", rootHash)
    if err := u.rpcSubmitHashRoot(rootHash); err != nil {
        fmt.Println("rpc root hash err: ", err)
        return
    }
}

// merkleTree builds the reward Merkle tree for the given earners and returns
// the hash of its root.
//
// For every earner the token leaves are reduced to a token root
// (calcTokenLeafs), which is combined with the earner address into an earner
// leaf hash (rpcEarnerLeafHash). The earner leaves are then folded into a tree
// by calcMerkleTree.
//
// It returns an error when earners is empty, when an earner has no token root,
// when hashing an earner leaf fails, or when the resulting root hash is empty.
func (u *Uploader) merkleTree(earners []Earner) (string, error) {
    // Without leaves there is no root to return.
    if len(earners) == 0 {
        return "", fmt.Errorf("building reward merkle tree: no earners")
    }

    earnerNodes := make([]*MerkleNode, 0, len(earners))
    for _, earner := range earners {
        tokenHash := u.calcTokenLeafs(earner.Tokens)
        if tokenHash == "" {
            return "", fmt.Errorf("building reward merkle tree: no token root for earner %q", earner.Earner)
        }
        earnerHash, err := u.rpcEarnerLeafHash(earner.Earner, tokenHash)
        if err != nil {
            return "", fmt.Errorf("hashing earner leaf for %q: %w", earner.Earner, err)
        }
        earnerNodes = append(earnerNodes, &MerkleNode{Hash: earnerHash})
    }

    root := u.calcMerkleTree(earnerNodes)
    // Never hand an empty root to the caller as if it were a valid hash.
    if root == nil || root.Hash == "" {
        return "", fmt.Errorf("building reward merkle tree: empty root hash")
    }
    return root.Hash, nil
}

// calcTokenLeafs hashes every token entry of one earner (rpcTokenHash) and
// folds the hashes into a Merkle tree, returning the hash of its root.
//
// Tokens whose hash cannot be computed are logged and left out of the tree.
// When no token can be hashed - including an empty or nil tokens slice - the
// empty string is returned instead of building a tree from zero leaves.
func (u *Uploader) calcTokenLeafs(tokens []*TokenAmount) string {
    tokenNodes := make([]*MerkleNode, 0, len(tokens))
    for _, token := range tokens {
        hash, err := u.rpcTokenHash(token)
        if err != nil {
            fmt.Println("calc token hash err: ", err)
            continue
        }
        tokenNodes = append(tokenNodes, &MerkleNode{Hash: hash})
    }
    fmt.Println("tokenHashs: ", tokenNodes)

    // A tree needs at least one leaf; signal "no root" with an empty hash.
    if len(tokenNodes) == 0 {
        return ""
    }
    root := u.calcMerkleTree(tokenNodes)
    if root == nil {
        return ""
    }
    return root.Hash
}

// calcMerkleTree folds a list of leaf nodes into a binary Merkle tree and
// returns its root.
//
// The tree is built bottom-up: adjacent nodes are paired, each pair is hashed
// with rpcMerkleizeLeaves, and the resulting parents form the next level. A
// level with an odd number of nodes pairs its last node with itself.
//
// Edge cases:
//   - an empty input returns a node with an empty Hash;
//   - a single node is returned as-is;
//   - if hashing any pair fails, the build stops and a node with an empty Hash
//     is returned, because a root computed from the remaining pairs would not
//     cover every leaf.
//
// The caller's slice is never modified.
func (u *Uploader) calcMerkleTree(nodes []*MerkleNode) *MerkleNode {
    if len(nodes) == 0 {
        return &MerkleNode{}
    }

    level := nodes
    for len(level) > 1 {
        if len(level)%2 != 0 {
            // Capping the capacity forces append to copy, so padding the level
            // cannot write into the caller's backing array.
            level = append(level[:len(level):len(level)], level[len(level)-1])
        }
        next := make([]*MerkleNode, 0, len(level)/2)
        for i := 0; i < len(level); i += 2 {
            left, right := level[i], level[i+1]
            hash, err := u.rpcMerkleizeLeaves([]string{left.Hash, right.Hash})
            if err != nil {
                fmt.Println("merkleizeLeaves err: ", err)
                return &MerkleNode{}
            }
            next = append(next, &MerkleNode{
                Left:  left,
                Right: right,
                Hash:  hash,
            })
        }
        level = next
    }
    return level[0]
}

Last updated