diff options
-rw-r--r-- | cmd/redq/main.go | 8 | ||||
-rw-r--r-- | usage/bpf.c (renamed from bpf/bpf_usage.c) | 0 | ||||
-rw-r--r-- | usage/bpf_bpfel.go (renamed from bpf/bpf_bpfel.go) | 2 | ||||
-rw-r--r-- | usage/bpf_bpfel.o (renamed from bpf/bpf_bpfel.o) | bin | 6592 -> 6584 bytes | |||
-rw-r--r-- | usage/gen.go (renamed from bpf/gen.go) | 4 | ||||
-rw-r--r-- | usage/main.go (renamed from bpf/main.go) | 74 |
6 files changed, 51 insertions, 37 deletions
diff --git a/cmd/redq/main.go b/cmd/redq/main.go index 4231b52..8807105 100644 --- a/cmd/redq/main.go +++ b/cmd/redq/main.go @@ -6,11 +6,15 @@ import ( "net" "github.com/jackc/pgx/v5" - "sinanmohd.com/redq/bpf" + "sinanmohd.com/redq/usage" "sinanmohd.com/redq/db" ) func main() { + u := &usage.Usage { + Data : make(usage.UsageMap), + } + iface, err := net.InterfaceByName("wlan0") if err != nil { log.Fatalf("lookup network: %s", err) @@ -24,5 +28,5 @@ func main() { defer conn.Close(ctx) queries := db.New(conn) - bpf.Run(iface, queries, ctx) + u.Run(iface, queries, ctx) } diff --git a/bpf/bpf_usage.c b/usage/bpf.c index 8b2343a..8b2343a 100644 --- a/bpf/bpf_usage.c +++ b/usage/bpf.c diff --git a/bpf/bpf_bpfel.go b/usage/bpf_bpfel.go index f4ffb76..754ac37 100644 --- a/bpf/bpf_bpfel.go +++ b/usage/bpf_bpfel.go @@ -1,7 +1,7 @@ // Code generated by bpf2go; DO NOT EDIT. //go:build 386 || amd64 || arm || arm64 || loong64 || mips64le || mipsle || ppc64le || riscv64 -package bpf +package usage import ( "bytes" diff --git a/bpf/bpf_bpfel.o b/usage/bpf_bpfel.o Binary files differindex d9a44b4..3526a68 100644 --- a/bpf/bpf_bpfel.o +++ b/usage/bpf_bpfel.o diff --git a/bpf/gen.go b/usage/gen.go index ff585db..60e4dd6 100644 --- a/bpf/gen.go +++ b/usage/gen.go @@ -1,3 +1,3 @@ -package bpf +package usage -//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel bpf bpf_usage.c +//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel bpf bpf.c diff --git a/bpf/main.go b/usage/main.go index d956a9b..8015aa0 100644 --- a/bpf/main.go +++ b/usage/main.go @@ -1,4 +1,4 @@ -package bpf +package usage import ( "context" @@ -7,6 +7,7 @@ import ( "net" "os" "os/signal" + "sync" "syscall" "time" @@ -19,16 +20,19 @@ import ( type UsageStat struct { lastSeen time.Time lastDbPush time.Time - bandwidthIngress uint64 - bandwidthEgress uint64 - ingress uint64 - egress uint64 + BandwidthIngress uint64 + BandwidthEgress uint64 + Ingress uint64 + Egress uint64 } -type UsageMap map[uint64]UsageStat -func Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) { - usageMap := make(UsageMap) +type UsageMap map[uint64]UsageStat +type Usage struct { + Data UsageMap + Mutex sync.Mutex +} +func (u *Usage) Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) { objs := bpfObjects{} if err := loadBpfObjects(&objs, nil); err != nil { log.Fatalf("loading objects: %s", err) @@ -65,18 +69,18 @@ func Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) { for { select { case <-bpfTicker.C: - err := usageMap.update(objs.IngressIp4UsageMap, objs.EgressIp4UsageMap) + err := u.update(objs.IngressIp4UsageMap, objs.EgressIp4UsageMap) if err != nil { log.Printf("updating usageMap: %s", err) } case <-sigs: - err := usageMap.updateDb(queries, ctxDb, false) + err := u.updateDb(queries, ctxDb, false) if err != nil { log.Printf("updating Database: %s", err) } os.Exit(0) case <-dbTicker.C: - err := usageMap.updateDb(queries, ctxDb, true) + err := u.updateDb(queries, ctxDb, true) if err != nil { log.Printf("updating Database: %s", err) } @@ -84,7 +88,7 @@ func Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) { } } -func (usageStat UsageStat) expired(timeStart *time.Time) bool { +func (usageStat *UsageStat) expired(timeStart *time.Time) bool { timeDiff := timeStart.Sub(usageStat.lastSeen) if timeDiff > time.Minute { return true @@ -98,10 +102,11 @@ func (usageStat UsageStat) expired(timeStart *time.Time) bool { return false } -func (usageMap UsageMap) updateDb(queries *db.Queries, ctxDb context.Context, ifExpired bool) error { +func (u *Usage) updateDb(queries *db.Queries, ctxDb context.Context, ifExpired bool) error { timeStart := time.Now() - for key, value := range usageMap { + u.Mutex.Lock() + for key, value := range u.Data { if ifExpired && !value.expired(&timeStart) { continue } @@ -116,20 +121,21 @@ func (usageMap UsageMap) updateDb(queries *db.Queries, ctxDb context.Context, if Time: value.lastSeen, Valid: true, }, - Egress: int64(value.egress), - Ingress: int64(value.ingress), + Egress: int64(value.Egress), + Ingress: int64(value.Ingress), }) if err != nil { return err } - delete(usageMap, key) + delete(u.Data, key) } + u.Mutex.Unlock() return nil } -func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) error { +func (u *Usage) update(ingress *ebpf.Map, egress *ebpf.Map) error { timeStart := time.Now() batchKeys := make([]uint64, 4096) batchValues := make([]uint64, 4096) @@ -145,20 +151,22 @@ func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) error { } key = batchKeys[i] - usage, ok := usageMap[key] + u.Mutex.Lock() + usage, ok := u.Data[key] if ok { - usage.bandwidthIngress = batchValues[i] - usage.ingress - usage.ingress += batchValues[i] + usage.BandwidthIngress = batchValues[i] - usage.Ingress + usage.Ingress += batchValues[i] usage.lastSeen = timeStart - usageMap[key] = usage + u.Data[key] = usage } else { - usageMap[key] = UsageStat{ - bandwidthIngress: batchValues[i], - ingress: batchValues[i], + u.Data[key] = UsageStat{ + BandwidthIngress: batchValues[i], + Ingress: batchValues[i], lastDbPush: timeStart, lastSeen: timeStart, } } + u.Mutex.Unlock() } if errors.Is(err, ebpf.ErrKeyNotExist) { @@ -178,20 +186,22 @@ func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) error { } key = batchKeys[i] - usage, ok := usageMap[key] + u.Mutex.Lock() + usage, ok := u.Data[key] if ok { - usage.bandwidthEgress = batchValues[i] - usage.egress - usage.egress += batchValues[i] + usage.BandwidthEgress = batchValues[i] - usage.Egress + usage.Egress += batchValues[i] usage.lastSeen = timeStart - usageMap[key] = usage + u.Data[key] = usage } else { - usageMap[key] = UsageStat{ - bandwidthEgress: batchValues[i], - egress: batchValues[i], + u.Data[key] = UsageStat{ + BandwidthEgress: batchValues[i], + Egress: batchValues[i], lastDbPush: timeStart, lastSeen: timeStart, } } + u.Mutex.Unlock() } if errors.Is(err, ebpf.ErrKeyNotExist) { |