Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
[network_map.eth]
check_url = "http://104.197.233.185:8000/subgraphs/name/ojo-network/unidexer"
check_url = "https://api.studio.thegraph.com/query/46403/unidexer/version/latest"
truth_url = "https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3"
base_asset = "ETH"
deviation = 0
cron_interval = "20s"
deviation = 2
cron_interval = "20s"

[[network_map.eth.pools]]
address = "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"
token_name = "WETH"
token_address = "0xc02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"

[[network_map.eth.pools]]
address = "0xa4e0faa58465a2d369aa21b3e42d43374c6f9613"
token_name = "rETH"
token_address = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
12 changes: 9 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@ type (
}

Source struct {
TruthUrl string `mapstructure:"truth_url"`
CheckUrl string `mapstructure:"check_url"`
BaseAsset string `mapstructure:"base_asset"`
TruthUrl string `mapstructure:"truth_url"`
CheckUrl string `mapstructure:"check_url"`
Pools []Pool `mapstructure:"pools"`

// should match most of the time
Deviation float64 `mapstructure:"deviation"`

CronInterval string `mapstructure:"cron_interval"`
}

Pool struct {
Address string `mapstructure:"address"`
TokenName string `mapstructure:"token_name"`
TokenAddress string `mapstructure:"token_address"`
}

AccessToken struct {
SlackToken string
SlackChannel string
Expand Down
186 changes: 144 additions & 42 deletions monitor/ethservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -16,26 +17,44 @@ import (
)

type (
BundleQuery struct {
Bundle struct {
EthPriceUSD string `graphql:"ethPriceUSD"`
ID string `graphql:"id"`
} `graphql:"bundle(id: \"1\")"`
Pool struct {
ID string `graphql:"id"`
Token0Price string `graphql:"token0Price"`
Token1Price string `graphql:"token1Price"`
Token0 struct {
ID string `graphql:"id"`
} `json:"token0"`
Token1 struct {
ID string `graphql:"id"`
} `json:"token1"`
}

Data struct {
Pools []Pool `graphql:"pools(where:{id_in: $ids})"`
}

ethChecker struct {
logger zerolog.Logger
network string
truthUrl *gql.Client
checkUrl *gql.Client
baseAsset string
deviation float64
cronDuration time.Duration

truth float64
check float64

mut sync.Mutex

//id to map detail
pools map[string]*PoolDetail

//asset name to id

assetIdMap map[string]string
}

PoolDetail struct {
config.Pool
truePrice float64
expectedPrice float64
}

EthService struct {
Expand All @@ -51,11 +70,10 @@ func StartEthServices(ctx context.Context, logger zerolog.Logger, config config.

for network, details := range config.NetworkMap {
cronDuration, _ := time.ParseDuration(details.CronInterval)
service := newEthMonitorService(ctx, eLogger, details.Deviation, network, details.TruthUrl, details.CheckUrl, details.BaseAsset, cronDuration)
service := newEthMonitorService(ctx, eLogger, details.Deviation, network, details.TruthUrl, details.CheckUrl, cronDuration, details.Pools)
es.services[network] = service

eLogger.Info().Str("network", network).
Str("base_asset", details.BaseAsset).
Str("check_url", details.CheckUrl).
Str("truth_url", details.TruthUrl).
Msg("monitoring")
Expand All @@ -64,31 +82,45 @@ func StartEthServices(ctx context.Context, logger zerolog.Logger, config config.
return &es
}

func newEthMonitorService(ctx context.Context, logger zerolog.Logger, deviation float64, network, truthUrl, checkUrl, baseAsset string, cronDuration time.Duration) *ethChecker {
func newEthMonitorService(ctx context.Context, logger zerolog.Logger, deviation float64, network, truthUrl, checkUrl string, cronDuration time.Duration, pools []config.Pool) *ethChecker {
poolIndex := make(map[string]*PoolDetail)
assetIdMap := make(map[string]string)
for _, pool := range pools {
pool.TokenAddress = strings.ToLower(pool.TokenAddress)
poolIndex[strings.ToLower(pool.Address)] = &PoolDetail{pool, 0, 0}
assetIdMap[strings.ToLower(pool.TokenName)] = pool.Address
}

service := &ethChecker{
logger: logger.With().Str("network", network).Logger(),
deviation: deviation,
network: network,
cronDuration: cronDuration,
truthUrl: gql.NewClient(truthUrl, nil),
checkUrl: gql.NewClient(checkUrl, nil),
baseAsset: baseAsset,
pools: poolIndex,
assetIdMap: assetIdMap,
}

go service.startCron(ctx)
go service.startCron(ctx, pools)

return service
}

func (es *ethChecker) startCron(ctx context.Context) {
func (es *ethChecker) startCron(ctx context.Context, pools []config.Pool) {
ids := make([]string, len(pools))
for i, pool := range pools {
ids[i] = pool.Address
}

for {
select {
case <-ctx.Done():
wg.Done()
return

default:
err := es.checkAssetPrice(ctx)
err := es.checkAssetPrice(ctx, ids)
if err != nil {
es.logger.Err(err).
Str("network", es.network).
Expand All @@ -100,31 +132,32 @@ func (es *ethChecker) startCron(ctx context.Context) {
}
}

func (es *ethChecker) checkAssetPrice(ctx context.Context) error {
func (es *ethChecker) checkAssetPrice(ctx context.Context, ids []string) error {
g, _ := errgroup.WithContext(ctx)
var (
truth float64
check float64
truth Data
check Data
)

g.Go(func() error {
price, err := GetBundle(es.truthUrl)
ids := ids
priceData, err := GetPrices(es.truthUrl, ids)
if err != nil {
return err
}

truth = price
truth = priceData

return nil
})

g.Go(func() error {
price, err := GetBundle(es.checkUrl)
priceData, err := GetPrices(es.checkUrl, ids)
if err != nil {
return err
}

check = price
check = priceData

return nil
})
Expand All @@ -134,42 +167,111 @@ func (es *ethChecker) checkAssetPrice(ctx context.Context) error {
return err
}

if truth != check {
// check deviation
if (math.Abs(truth-check)/truth)*100 > es.deviation {
slackChan <- createMismatchedDeviationAttachment(
truth,
check,
es.network,
es.baseAsset,
)
}
truthIndex := make(map[string]Pool)
for _, truth := range truth.Pools {
truthIndex[truth.ID] = truth
}
for _, check := range check.Pools {
if truth, found := truthIndex[check.ID]; !found {
// asset not found error
es.logger.Err(fmt.Errorf("%s asset not found", check.ID)).Send()
continue
} else {
var truePrice, expectedPrice float64
var tokenName = es.pools[check.ID].TokenName

switch es.pools[check.ID].TokenAddress {
// quote for a specific base
case check.Token0.ID:
truePrice, err = strconv.ParseFloat(truth.Token1Price, 64)
if err != nil {
return err
}

expectedPrice, err = strconv.ParseFloat(check.Token1Price, 64)
if err != nil {
return err
}

case check.Token1.ID:
truePrice, err = strconv.ParseFloat(truth.Token0Price, 64)
if err != nil {
return err
}

expectedPrice, err = strconv.ParseFloat(check.Token0Price, 64)
if err != nil {
return err
}
}

es.check = check
es.truth = truth
if truePrice != expectedPrice {
if (math.Abs(truePrice-expectedPrice)/truePrice)*100 > es.deviation {
slackChan <- createMismatchedDeviationAttachment(
truePrice,
expectedPrice,
es.network,
tokenName,
)
}
}

pool := es.pools[check.ID]
pool.expectedPrice = expectedPrice
pool.truePrice = truePrice
}
}

return nil
}

func GetBundle(c *gql.Client) (float64, error) {
var bundle BundleQuery
err := c.Query(context.Background(), &bundle, nil)
func GetPrices(c *gql.Client, ids []string) (Data, error) {
var bundle Data
err := c.Query(context.Background(), &bundle, map[string]interface{}{
"ids": ids,
})

if err != nil {
return 0, err
return Data{}, err
}

return strconv.ParseFloat(bundle.Bundle.EthPriceUSD, 64)
return bundle, nil
}

func (ec *EthService) getPrices(network string) (float64, float64, error) {
func (ec *EthService) getPrices(network, assetName string) (float64, float64, error) {
service, found := ec.services[network]
if !found {
return 0, 0, fmt.Errorf("network %s not found", network)
}

service.mut.Lock()
defer service.mut.Unlock()
id, found := service.assetIdMap[assetName]
if !found {
return 0, 0, fmt.Errorf("asset %s not found on network %s", assetName, network)
}

pool, found := service.pools[id]
if !found {
return 0, 0, fmt.Errorf("asset %s not synced on indexer %s", assetName, network)
}

return pool.truePrice, pool.expectedPrice, nil
}

func (ec *EthService) getAllTokenNames(network string) (names []string, ids []string, err error) {
service, found := ec.services[network]
if !found {
err = fmt.Errorf("network %s not found", network)
return
}

service.mut.Lock()
defer service.mut.Unlock()
for assetName, id := range service.assetIdMap {
names = append(names, assetName)
ids = append(ids, id)
}

return service.check, service.truth, nil
return
}
17 changes: 14 additions & 3 deletions monitor/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,27 @@ func handleSlashCommand(es *EthService, command *slack.SlashCommand) error {
return fmt.Errorf("no network")
}

network := commands[0]
switch command.Command {
case "/netstatus":
check, truth, err := es.getPrices(network)
if len(commands) < 2 {
return fmt.Errorf("no token name")
}

network, tokenName := commands[0], commands[1]
check, truth, err := es.getPrices(network, tokenName)
if err != nil {
return err
}

slackChan <- checkPriceAttachment(check, truth, network)
slackChan <- checkPriceAttachment(check, truth, tokenName, network)

case "/listassets":
names, ids, err := es.getAllTokenNames(commands[0])
if err != nil {
return err
}

slackChan <- listAttachment(commands[0], names, ids)
}
return nil
}
Loading