diff options
Diffstat (limited to 'usage')
-rw-r--r-- | usage/main.go | 34 |
1 files changed, 20 insertions, 14 deletions
diff --git a/usage/main.go b/usage/main.go index 72cf4fd..526e092 100644 --- a/usage/main.go +++ b/usage/main.go @@ -14,7 +14,7 @@ import ( "sinanmohd.com/redq/db" ) -type usageStat struct { +type UsageStat struct { lastSeen time.Time lastDbPush time.Time BandwidthIngress uint64 @@ -23,10 +23,10 @@ type usageStat struct { Egress uint64 } -type usageMap map[uint64]usageStat +type usageMap map[uint64]UsageStat type Usage struct { Data usageMap - Mutex sync.Mutex + Mutex sync.RWMutex objs bpfObjects egressLink, ingressLink link.Link } @@ -140,7 +140,7 @@ func (u *Usage) UpdateDb(queries *db.Queries, ctxDb context.Context, ifExpired b return nil } -func (us *usageStat) expired(timeStart *time.Time) bool { +func (us *UsageStat) expired(timeStart *time.Time) bool { timeDiff := timeStart.Sub(us.lastSeen) if timeDiff > time.Minute { return true @@ -160,33 +160,40 @@ func (u *Usage) update(ingress *ebpf.Map, egress *ebpf.Map) error { batchValues := make([]uint64, 4096) var key uint64 + u.Mutex.Lock() + for key, value := range u.Data { + value.BandwidthIngress = 0 + value.BandwidthEgress = 0 + u.Data[key] = value + } + u.Mutex.Unlock() + cursor := ebpf.MapBatchCursor{} for { _, err := ingress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil) + u.Mutex.Lock() for i := range batchKeys { - /* TODO: maybe BatchLookupAndDelete is not the best idea with mostly empty map */ if batchValues[i] == 0 { continue } key = batchKeys[i] - u.Mutex.Lock() usage, ok := u.Data[key] if ok { - usage.BandwidthIngress = batchValues[i] - usage.Ingress + usage.BandwidthIngress = batchValues[i] usage.Ingress += batchValues[i] usage.lastSeen = timeStart u.Data[key] = usage } else { - u.Data[key] = usageStat{ + u.Data[key] = UsageStat{ BandwidthIngress: batchValues[i], Ingress: batchValues[i], lastDbPush: timeStart, lastSeen: timeStart, } } - u.Mutex.Unlock() } + u.Mutex.Unlock() if errors.Is(err, ebpf.ErrKeyNotExist) { break @@ -198,30 +205,29 @@ func (u *Usage) update(ingress *ebpf.Map, egress *ebpf.Map) error { cursor = ebpf.MapBatchCursor{} for { _, err := egress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil) + u.Mutex.Lock() for i := range batchKeys { - /* TODO: maybe BatchLookupAndDelete is not the best idea with mostly empty map */ if batchValues[i] == 0 { continue } key = batchKeys[i] - u.Mutex.Lock() usage, ok := u.Data[key] if ok { - usage.BandwidthEgress = batchValues[i] - usage.Egress + usage.BandwidthEgress = batchValues[i] usage.Egress += batchValues[i] usage.lastSeen = timeStart u.Data[key] = usage } else { - u.Data[key] = usageStat{ + u.Data[key] = UsageStat{ BandwidthEgress: batchValues[i], Egress: batchValues[i], lastDbPush: timeStart, lastSeen: timeStart, } } - u.Mutex.Unlock() } + u.Mutex.Unlock() if errors.Is(err, ebpf.ErrKeyNotExist) { break |