summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/redq/main.go8
-rw-r--r--usage/bpf.c (renamed from bpf/bpf_usage.c)0
-rw-r--r--usage/bpf_bpfel.go (renamed from bpf/bpf_bpfel.go)2
-rw-r--r--usage/bpf_bpfel.o (renamed from bpf/bpf_bpfel.o)bin6592 -> 6584 bytes
-rw-r--r--usage/gen.go (renamed from bpf/gen.go)4
-rw-r--r--usage/main.go (renamed from bpf/main.go)74
6 files changed, 51 insertions, 37 deletions
diff --git a/cmd/redq/main.go b/cmd/redq/main.go
index 4231b52..8807105 100644
--- a/cmd/redq/main.go
+++ b/cmd/redq/main.go
@@ -6,11 +6,15 @@ import (
"net"
"github.com/jackc/pgx/v5"
- "sinanmohd.com/redq/bpf"
+ "sinanmohd.com/redq/usage"
"sinanmohd.com/redq/db"
)
func main() {
+ u := &usage.Usage {
+ Data : make(usage.UsageMap),
+ }
+
iface, err := net.InterfaceByName("wlan0")
if err != nil {
log.Fatalf("lookup network: %s", err)
@@ -24,5 +28,5 @@ func main() {
defer conn.Close(ctx)
queries := db.New(conn)
- bpf.Run(iface, queries, ctx)
+ u.Run(iface, queries, ctx)
}
diff --git a/bpf/bpf_usage.c b/usage/bpf.c
index 8b2343a..8b2343a 100644
--- a/bpf/bpf_usage.c
+++ b/usage/bpf.c
diff --git a/bpf/bpf_bpfel.go b/usage/bpf_bpfel.go
index f4ffb76..754ac37 100644
--- a/bpf/bpf_bpfel.go
+++ b/usage/bpf_bpfel.go
@@ -1,7 +1,7 @@
// Code generated by bpf2go; DO NOT EDIT.
//go:build 386 || amd64 || arm || arm64 || loong64 || mips64le || mipsle || ppc64le || riscv64
-package bpf
+package usage
import (
"bytes"
diff --git a/bpf/bpf_bpfel.o b/usage/bpf_bpfel.o
index d9a44b4..3526a68 100644
--- a/bpf/bpf_bpfel.o
+++ b/usage/bpf_bpfel.o
Binary files differ
diff --git a/bpf/gen.go b/usage/gen.go
index ff585db..60e4dd6 100644
--- a/bpf/gen.go
+++ b/usage/gen.go
@@ -1,3 +1,3 @@
-package bpf
+package usage
-//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel bpf bpf_usage.c
+//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel bpf bpf.c
diff --git a/bpf/main.go b/usage/main.go
index d956a9b..8015aa0 100644
--- a/bpf/main.go
+++ b/usage/main.go
@@ -1,4 +1,4 @@
-package bpf
+package usage
import (
"context"
@@ -7,6 +7,7 @@ import (
"net"
"os"
"os/signal"
+ "sync"
"syscall"
"time"
@@ -19,16 +20,19 @@ import (
type UsageStat struct {
lastSeen time.Time
lastDbPush time.Time
- bandwidthIngress uint64
- bandwidthEgress uint64
- ingress uint64
- egress uint64
+ BandwidthIngress uint64
+ BandwidthEgress uint64
+ Ingress uint64
+ Egress uint64
}
-type UsageMap map[uint64]UsageStat
-func Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) {
- usageMap := make(UsageMap)
+type UsageMap map[uint64]UsageStat
+type Usage struct {
+ Data UsageMap
+ Mutex sync.Mutex
+}
+func (u *Usage) Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) {
objs := bpfObjects{}
if err := loadBpfObjects(&objs, nil); err != nil {
log.Fatalf("loading objects: %s", err)
@@ -65,18 +69,18 @@ func Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) {
for {
select {
case <-bpfTicker.C:
- err := usageMap.update(objs.IngressIp4UsageMap, objs.EgressIp4UsageMap)
+ err := u.update(objs.IngressIp4UsageMap, objs.EgressIp4UsageMap)
if err != nil {
log.Printf("updating usageMap: %s", err)
}
case <-sigs:
- err := usageMap.updateDb(queries, ctxDb, false)
+ err := u.updateDb(queries, ctxDb, false)
if err != nil {
log.Printf("updating Database: %s", err)
}
os.Exit(0)
case <-dbTicker.C:
- err := usageMap.updateDb(queries, ctxDb, true)
+ err := u.updateDb(queries, ctxDb, true)
if err != nil {
log.Printf("updating Database: %s", err)
}
@@ -84,7 +88,7 @@ func Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) {
}
}
-func (usageStat UsageStat) expired(timeStart *time.Time) bool {
+func (usageStat *UsageStat) expired(timeStart *time.Time) bool {
timeDiff := timeStart.Sub(usageStat.lastSeen)
if timeDiff > time.Minute {
return true
@@ -98,10 +102,11 @@ func (usageStat UsageStat) expired(timeStart *time.Time) bool {
return false
}
-func (usageMap UsageMap) updateDb(queries *db.Queries, ctxDb context.Context, ifExpired bool) error {
+func (u *Usage) updateDb(queries *db.Queries, ctxDb context.Context, ifExpired bool) error {
timeStart := time.Now()
- for key, value := range usageMap {
+ u.Mutex.Lock()
+ for key, value := range u.Data {
if ifExpired && !value.expired(&timeStart) {
continue
}
@@ -116,20 +121,21 @@ func (usageMap UsageMap) updateDb(queries *db.Queries, ctxDb context.Context, if
Time: value.lastSeen,
Valid: true,
},
- Egress: int64(value.egress),
- Ingress: int64(value.ingress),
+ Egress: int64(value.Egress),
+ Ingress: int64(value.Ingress),
})
if err != nil {
return err
}
- delete(usageMap, key)
+ delete(u.Data, key)
}
+ u.Mutex.Unlock()
return nil
}
-func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) error {
+func (u *Usage) update(ingress *ebpf.Map, egress *ebpf.Map) error {
timeStart := time.Now()
batchKeys := make([]uint64, 4096)
batchValues := make([]uint64, 4096)
@@ -145,20 +151,22 @@ func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) error {
}
key = batchKeys[i]
- usage, ok := usageMap[key]
+ u.Mutex.Lock()
+ usage, ok := u.Data[key]
if ok {
- usage.bandwidthIngress = batchValues[i] - usage.ingress
- usage.ingress += batchValues[i]
+ usage.BandwidthIngress = batchValues[i] - usage.Ingress
+ usage.Ingress += batchValues[i]
usage.lastSeen = timeStart
- usageMap[key] = usage
+ u.Data[key] = usage
} else {
- usageMap[key] = UsageStat{
- bandwidthIngress: batchValues[i],
- ingress: batchValues[i],
+ u.Data[key] = UsageStat{
+ BandwidthIngress: batchValues[i],
+ Ingress: batchValues[i],
lastDbPush: timeStart,
lastSeen: timeStart,
}
}
+ u.Mutex.Unlock()
}
if errors.Is(err, ebpf.ErrKeyNotExist) {
@@ -178,20 +186,22 @@ func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) error {
}
key = batchKeys[i]
- usage, ok := usageMap[key]
+ u.Mutex.Lock()
+ usage, ok := u.Data[key]
if ok {
- usage.bandwidthEgress = batchValues[i] - usage.egress
- usage.egress += batchValues[i]
+ usage.BandwidthEgress = batchValues[i] - usage.Egress
+ usage.Egress += batchValues[i]
usage.lastSeen = timeStart
- usageMap[key] = usage
+ u.Data[key] = usage
} else {
- usageMap[key] = UsageStat{
- bandwidthEgress: batchValues[i],
- egress: batchValues[i],
+ u.Data[key] = UsageStat{
+ BandwidthEgress: batchValues[i],
+ Egress: batchValues[i],
lastDbPush: timeStart,
lastSeen: timeStart,
}
}
+ u.Mutex.Unlock()
}
if errors.Is(err, ebpf.ErrKeyNotExist) {