summaryrefslogtreecommitdiff
path: root/usage/main.go
diff options
context:
space:
mode:
authorsinanmohd <sinan@sinanmohd.com>2024-07-07 07:37:57 +0530
committersinanmohd <sinan@sinanmohd.com>2024-07-07 07:44:48 +0530
commita1692f732078adad272256639f7a3ff14c9e643a (patch)
tree8e3507ebd905e6fec65ac63bb6260b21489c2278 /usage/main.go
parent22ae2f9b78dfdd2f59340d272ef598deb56cc245 (diff)
api/bandwidth: init
Diffstat (limited to 'usage/main.go')
-rw-r--r--usage/main.go34
1 files changed, 20 insertions, 14 deletions
diff --git a/usage/main.go b/usage/main.go
index 72cf4fd..526e092 100644
--- a/usage/main.go
+++ b/usage/main.go
@@ -14,7 +14,7 @@ import (
"sinanmohd.com/redq/db"
)
-type usageStat struct {
+type UsageStat struct {
lastSeen time.Time
lastDbPush time.Time
BandwidthIngress uint64
@@ -23,10 +23,10 @@ type usageStat struct {
Egress uint64
}
-type usageMap map[uint64]usageStat
+type usageMap map[uint64]UsageStat
type Usage struct {
Data usageMap
- Mutex sync.Mutex
+ Mutex sync.RWMutex
objs bpfObjects
egressLink, ingressLink link.Link
}
@@ -140,7 +140,7 @@ func (u *Usage) UpdateDb(queries *db.Queries, ctxDb context.Context, ifExpired b
return nil
}
-func (us *usageStat) expired(timeStart *time.Time) bool {
+func (us *UsageStat) expired(timeStart *time.Time) bool {
timeDiff := timeStart.Sub(us.lastSeen)
if timeDiff > time.Minute {
return true
@@ -160,33 +160,40 @@ func (u *Usage) update(ingress *ebpf.Map, egress *ebpf.Map) error {
batchValues := make([]uint64, 4096)
var key uint64
+ u.Mutex.Lock()
+ for key, value := range u.Data {
+ value.BandwidthIngress = 0
+ value.BandwidthEgress = 0
+ u.Data[key] = value
+ }
+ u.Mutex.Unlock()
+
cursor := ebpf.MapBatchCursor{}
for {
_, err := ingress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil)
+ u.Mutex.Lock()
for i := range batchKeys {
- /* TODO: maybe BatchLookupAndDelete is not the best idea with mostly empty map */
if batchValues[i] == 0 {
continue
}
key = batchKeys[i]
- u.Mutex.Lock()
usage, ok := u.Data[key]
if ok {
- usage.BandwidthIngress = batchValues[i] - usage.Ingress
+ usage.BandwidthIngress = batchValues[i]
usage.Ingress += batchValues[i]
usage.lastSeen = timeStart
u.Data[key] = usage
} else {
- u.Data[key] = usageStat{
+ u.Data[key] = UsageStat{
BandwidthIngress: batchValues[i],
Ingress: batchValues[i],
lastDbPush: timeStart,
lastSeen: timeStart,
}
}
- u.Mutex.Unlock()
}
+ u.Mutex.Unlock()
if errors.Is(err, ebpf.ErrKeyNotExist) {
break
@@ -198,30 +205,29 @@ func (u *Usage) update(ingress *ebpf.Map, egress *ebpf.Map) error {
cursor = ebpf.MapBatchCursor{}
for {
_, err := egress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil)
+ u.Mutex.Lock()
for i := range batchKeys {
- /* TODO: maybe BatchLookupAndDelete is not the best idea with mostly empty map */
if batchValues[i] == 0 {
continue
}
key = batchKeys[i]
- u.Mutex.Lock()
usage, ok := u.Data[key]
if ok {
- usage.BandwidthEgress = batchValues[i] - usage.Egress
+ usage.BandwidthEgress = batchValues[i]
usage.Egress += batchValues[i]
usage.lastSeen = timeStart
u.Data[key] = usage
} else {
- u.Data[key] = usageStat{
+ u.Data[key] = UsageStat{
BandwidthEgress: batchValues[i],
Egress: batchValues[i],
lastDbPush: timeStart,
lastSeen: timeStart,
}
}
- u.Mutex.Unlock()
}
+ u.Mutex.Unlock()
if errors.Is(err, ebpf.ErrKeyNotExist) {
break