Annotation of the Reward Uploader
type Uploader struct {
bvsContract string
delegation api.Delegation
chainIO io.ChainIO
rewardsCoordinator api.RewardsCoordinator
}
// NewUploader builds an Uploader from the global configuration in core.C.
//
// It creates a ChainIO client (ELK logger, Prometheus indicators and
// transaction-manager settings), attaches the owner's keyring, looks up the
// BVS contract address in the BVS directory and prepares the delegation and
// rewards-coordinator API clients. Any failure during setup panics.
func NewUploader() *Uploader {
	// Settings handed to the ChainIO transaction manager.
	txParams := types.TxManagerParams{
		MaxRetries:             3,
		RetryInterval:          1 * time.Second,
		ConfirmationTimeout:    60 * time.Second,
		GasPriceAdjustmentRate: "1.1",
	}

	elk := logger.NewELKLogger("bvs_demo")
	registry := prometheus.NewRegistry()
	indicators := transactionprocess.NewPromIndicators(registry, "bvs_demo")

	baseIO, err := io.NewChainIO(
		core.C.Chain.Id,
		core.C.Chain.Rpc,
		core.C.Owner.KeyDir,
		core.C.Owner.Bech32Prefix,
		elk,
		indicators,
		txParams,
	)
	if err != nil {
		panic(err)
	}

	// All API clients below share the keyring-enabled client.
	client, err := baseIO.SetupKeyring(core.C.Owner.KeyName, core.C.Owner.KeyringBackend)
	if err != nil {
		panic(err)
	}

	// The BVS contract address is not configured directly; it is resolved
	// from the BVS directory using the configured BVS hash.
	directory := api.NewBVSDirectoryImpl(client, core.C.Chain.BvsDirectory)
	bvsInfo, err := directory.GetBVSInfo(core.C.Chain.BvsHash)
	if err != nil {
		panic(err)
	}

	delegationAPI := api.NewDelegationImpl(client, core.C.Chain.DelegationManager)

	coordinator := api.NewRewardsCoordinator(client)
	coordinator.BindClient(core.C.Chain.RewardCoordinator)

	return &Uploader{
		chainIO:            client,
		delegation:         delegationAPI,
		bvsContract:        bvsInfo.BVSContract,
		rewardsCoordinator: coordinator,
	}
}
// Run indexes "wasm-TaskResponded" events emitted by the BVS contract,
// starting at the latest block height reported by the node, and hands every
// such event to calcReward. Events of any other type are only logged.
//
// Run blocks for as long as the indexer's event channel stays open and panics
// if the indexer cannot be started.
func (u *Uploader) Run() {
	ctx := context.Background()

	startHeight := u.getBlock(ctx)
	fmt.Println("latestBlock: ", startHeight)

	eventIndexer := indexer.NewEventIndexer(
		u.chainIO.GetClientCtx(),
		u.bvsContract,
		startHeight,
		[]string{"wasm-TaskResponded"},
		1,
		5,
	)
	events, err := eventIndexer.Run(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println("chain: ", events)

	for evt := range events {
		if evt.EventType != "wasm-TaskResponded" {
			fmt.Printf("Unknown event type. evt: %+v\n", evt)
			continue
		}

		// Pull the task details out of the event attributes.
		height, txHash := evt.BlockHeight, evt.TxHash
		attrs := evt.AttrMap
		id, result, operators := attrs["taskId"], attrs["result"], attrs["operators"]
		fmt.Printf("[TaskResponded] blockHeight: %d, txnHash: %s, taskId: %s, taskResult: %s, taskOperators: %s\n", height, txHash, id, result, operators)

		u.calcReward(ctx, height, id, operators)
	}
}
// getBlock asks the connected node for its status and returns the latest
// block height it reports. It panics if the status query fails.
func (u *Uploader) getBlock(ctx context.Context) int64 {
	status, err := u.chainIO.QueryNodeStatus(ctx)
	if err != nil {
		panic(err)
	}
	return status.SyncInfo.LatestBlockHeight
}
// calcReward splits the configured reward for one responded task between the
// operators that answered it and the stakers delegated to those operators,
// then submits the per-strategy totals and the merkle root of all earners.
//
// operators is an "&"-separated list of operator addresses taken from the
// event attributes. blockHeight is accepted for the caller's convenience but
// is not used in the computation.
//
// Every failure is logged and stops the remaining work; nothing is returned.
func (u *Uploader) calcReward(ctx context.Context, blockHeight int64, taskId string, operators string) {
	// A Redis set remembers handled task ids so an event is not processed twice.
	// NOTE(review): the task is recorded before any work is done, so a task
	// whose processing fails below is never retried — confirm this is intended.
	if core.S.RedisConn.SIsMember(ctx, core.PkSaveTask, taskId).Val() {
		fmt.Println("task already processed: ", taskId)
		return
	}
	core.S.RedisConn.SAdd(ctx, core.PkSaveTask, taskId)

	// The task reward is shared equally between the responding operators.
	// strings.Split always yields at least one element, so the divisor is
	// never zero.
	operatorList := strings.Split(operators, "&")
	operatorCnt := len(operatorList)
	operatorAmount := core.C.Reward.Amount / float64(operatorCnt)
	fmt.Println("operatorAmount: ", operatorAmount)

	// Each operator's portion is split again: sAmount is shared by its
	// stakers, oAmount stays with the operator itself.
	// NOTE(review): OperatorRatio percent is assigned to the STAKERS here and
	// the remainder to the operator, which contradicts the field name —
	// confirm which side the ratio is meant to describe.
	sAmount := operatorAmount * core.C.Reward.OperatorRatio / 100
	oAmount := operatorAmount - sAmount
	fmt.Println("sAmount: ", sAmount)
	fmt.Println("oAmount: ", oAmount)

	// The operator strategy is the same for every operator, so resolve its
	// token once, before any submission is recorded. Doing this inside the
	// loop (after the staker submissions were already added) could leave
	// submissions without matching earners when the lookup failed.
	operatorStrategyToken, err := u.rpcUnderlyingToken(core.C.Reward.OperatorStrategy)
	if err != nil {
		fmt.Println("get strategy token err: ", err)
		return
	}

	submissionMap := make(map[string]*Submission)
	totalEarners := make([]Earner, 0)

	for _, operator := range operatorList {
		// Query the stakers delegated to this operator.
		txnRsp, err := u.delegation.GetOperatorStakers(operator)
		if err != nil {
			// The response must not be used after a failed query; skip the
			// operator instead of dereferencing it.
			fmt.Println("get operator stakers err: ", err)
			continue
		}
		fmt.Println("GetOperatorStakers txnRsp: ", txnRsp)

		totalStakerAmount := 0.0
		earners := make([]Earner, 0)

		// Collect, per staker, the staked amount and token of every strategy.
		for _, staker := range txnRsp.StakersAndShares {
			stakerAmount := 0.0
			earnerTokens := make([]*TokenAmount, 0)
			for _, strategy := range staker.SharesPerStrategy {
				// Each entry is read as [strategy address, share amount].
				if len(strategy) < 2 {
					fmt.Println("malformed strategy share entry: ", strategy)
					continue
				}
				amount, err := strconv.ParseUint(strategy[1], 10, 0)
				if err != nil {
					fmt.Println("parse float err: ", err)
					continue
				}
				strategyAmount := float64(amount)
				// NOTE(review): the amount is counted even when the token
				// lookup below fails, so that part of the staker's reward is
				// never paid out — confirm this is acceptable.
				stakerAmount += strategyAmount
				strategyToken, err := u.rpcUnderlyingToken(strategy[0])
				if err != nil {
					fmt.Println("get strategy token err: ", err)
					continue
				}
				earnerTokens = append(earnerTokens, &TokenAmount{
					Strategy:     strategy[0],
					Token:        strategyToken,
					RewardAmount: "",
					StakeAmount:  strategyAmount,
				})
			}
			// A staker without any resolvable token has nothing to claim and
			// cannot be turned into a merkle leaf, so it is left out.
			if len(earnerTokens) == 0 {
				continue
			}
			earners = append(earners, Earner{
				Earner:           staker.Staker,
				TotalStakeAmount: stakerAmount,
				Tokens:           earnerTokens,
			})
			totalStakerAmount += stakerAmount
		}
		fmt.Println("totalStakerAmount: ", totalStakerAmount)

		// Distribute sAmount between the stakers proportionally to stake, and
		// each staker's part between its tokens proportionally to the token's
		// stake. Tokens are pointers, so RewardAmount updates are kept even
		// though s is a copy.
		for _, s := range earners {
			if totalStakerAmount == 0.0 || s.TotalStakeAmount == 0.0 {
				continue
			}
			stakerReward := sAmount * (s.TotalStakeAmount / totalStakerAmount)
			for _, t := range s.Tokens {
				rewardAmount := stakerReward * t.StakeAmount / s.TotalStakeAmount
				if rewardAmount == 0.0 {
					continue
				}
				fmt.Println("rewardAmount: ", rewardAmount)
				// The leaf carries the floored amount; the submission total
				// accumulates the unrounded value.
				t.RewardAmount = strconv.FormatFloat(math.Floor(rewardAmount), 'f', -1, 64)
				if a, ok := submissionMap[t.Strategy]; ok {
					a.Amount += rewardAmount
				} else {
					submissionMap[t.Strategy] = &Submission{
						Strategy: t.Strategy,
						Token:    t.Token,
						Amount:   rewardAmount,
					}
				}
			}
		}

		// Add the operator's own share under the configured operator strategy.
		if a, ok := submissionMap[core.C.Reward.OperatorStrategy]; ok {
			a.Amount += oAmount
		} else {
			submissionMap[core.C.Reward.OperatorStrategy] = &Submission{
				// The strategy address, not the token address, identifies the
				// submission (it is also the map key).
				Strategy: core.C.Reward.OperatorStrategy,
				Token:    operatorStrategyToken,
				Amount:   oAmount,
			}
		}

		// The operator is an earner too.
		operatorRewardAmount := strconv.FormatFloat(math.Floor(oAmount), 'f', -1, 64)
		earners = append(earners, Earner{
			Earner:           operator,
			TotalStakeAmount: oAmount,
			Tokens: []*TokenAmount{
				{
					Strategy:     core.C.Reward.OperatorStrategy,
					Token:        operatorStrategyToken,
					RewardAmount: operatorRewardAmount,
					StakeAmount:  oAmount,
				},
			},
		})
		fmt.Printf("earners: %+v\n", earners)
		totalEarners = append(totalEarners, earners...)
	}

	// Every operator may have been skipped; with no earners there is nothing
	// to submit and no merkle tree to build.
	if len(totalEarners) == 0 {
		fmt.Println("no earners to reward for task: ", taskId)
		return
	}

	// Flatten the per-strategy totals into the list of reward submissions.
	submissions := make([]Submission, 0, len(submissionMap))
	for _, submission := range submissionMap {
		submissions = append(submissions, *submission)
		fmt.Printf("strategy: %s, token: %s, amount: %f\n", submission.Strategy, submission.Token, submission.Amount)
	}
	fmt.Printf("earners: %+v\n", totalEarners)

	if err := u.rpcSubmission(submissions); err != nil {
		fmt.Println("rpc submission err: ", err)
		return
	}

	// Build the merkle tree over all earners and publish its root.
	rootHash, err := u.merkleTree(totalEarners)
	if err != nil {
		fmt.Println("merkle tree err: ", err)
		return
	}
	fmt.Printf("root Hash: %s\n", rootHash)

	if err := u.rpcSubmitHashRoot(rootHash); err != nil {
		fmt.Println("rpc root hash err: ", err)
	}
}
// merkleTree builds a merkle tree with one leaf per earner and returns the
// hash of its root.
//
// Each earner leaf is derived from the earner's address and the root hash of
// that earner's own token tree. An error is returned when earners is empty,
// when an earner has no token root, when an earner leaf cannot be hashed, or
// when no root could be built.
func (u *Uploader) merkleTree(earners []Earner) (string, error) {
	// Without leaves there is no root to take.
	if len(earners) == 0 {
		return "", fmt.Errorf("merkle tree: no earners")
	}

	earnerNodes := make([]*MerkleNode, 0, len(earners))
	for _, earner := range earners {
		tokenHash := u.calcTokenLeafs(earner.Tokens)
		// An empty token root cannot describe any claimable reward.
		if tokenHash == "" {
			return "", fmt.Errorf("merkle tree: no token root for earner %v", earner.Earner)
		}
		earnerHash, err := u.rpcEarnerLeafHash(earner.Earner, tokenHash)
		if err != nil {
			return "", fmt.Errorf("calc earner hash for %v: %w", earner.Earner, err)
		}
		earnerNodes = append(earnerNodes, &MerkleNode{Hash: earnerHash})
	}

	root := u.calcMerkleTree(earnerNodes)
	if root == nil {
		return "", fmt.Errorf("merkle tree: no root could be built")
	}
	return root.Hash, nil
}
// calcTokenLeafs hashes every token entry of one earner, combines the hashes
// into a merkle tree and returns the hash of that tree's root.
//
// Tokens whose hash cannot be computed are logged and left out. The empty
// string is returned when no token could be hashed or no root could be built.
func (u *Uploader) calcTokenLeafs(tokens []*TokenAmount) string {
	tokenNodes := make([]*MerkleNode, 0, len(tokens))
	for _, token := range tokens {
		hash, err := u.rpcTokenHash(token)
		if err != nil {
			// NOTE(review): the root then covers only the remaining tokens —
			// confirm that a partial token tree is acceptable.
			fmt.Println("calc token hash err: ", err)
			continue
		}
		tokenNodes = append(tokenNodes, &MerkleNode{Hash: hash})
	}
	fmt.Println("tokenHashs: ", tokenNodes)

	// No leaves means no tree; taking a root here would fail.
	if len(tokenNodes) == 0 {
		return ""
	}
	root := u.calcMerkleTree(tokenNodes)
	if root == nil {
		return ""
	}
	return root.Hash
}
// calcMerkleTree reduces nodes pairwise, level by level, until a single root
// node remains, and returns that root.
//
// A level with an odd number of nodes is padded by repeating its last node.
// A single input node is returned as is. nil is returned when nodes is empty
// or when no parent node could be produced on some level, so callers must
// check the result before using it.
func (u *Uploader) calcMerkleTree(nodes []*MerkleNode) *MerkleNode {
	for len(nodes) > 1 {
		// Pad to an even count so every node has a sibling.
		if len(nodes)%2 != 0 {
			nodes = append(nodes, nodes[len(nodes)-1])
		}

		newLevel := make([]*MerkleNode, 0, len(nodes)/2)
		for i := 0; i < len(nodes); i += 2 {
			left, right := nodes[i], nodes[i+1]
			rootHash, err := u.rpcMerkleizeLeaves([]string{left.Hash, right.Hash})
			if err != nil {
				// NOTE(review): a pair that fails to hash is dropped, so the
				// root no longer covers those leaves — confirm this
				// best-effort behavior is intended.
				fmt.Println("merkleizeLeaves err: ", err)
				continue
			}
			newLevel = append(newLevel, &MerkleNode{
				Left:  left,
				Right: right,
				Hash:  rootHash,
			})
		}
		nodes = newLevel
	}

	// Empty input, or every pair of a level failed: there is no root.
	if len(nodes) == 0 {
		return nil
	}
	return nodes[0]
}
Last updated