From bd99f6b4d67c386daf604c94999e5de72584f883 Mon Sep 17 00:00:00 2001 From: sinanmohd Date: Fri, 5 Jul 2024 17:00:54 +0530 Subject: bpf/usage: log to database --- bpf/main.go | 85 +++++++++++++++++++++++++++++++++++++++++++------------- cmd/redq/main.go | 23 +++++++++++---- go.mod | 1 - go.sum | 2 -- 4 files changed, 83 insertions(+), 28 deletions(-) diff --git a/bpf/main.go b/bpf/main.go index 47fdc41..1f5610d 100644 --- a/bpf/main.go +++ b/bpf/main.go @@ -1,6 +1,7 @@ package bpf import ( + "context" "errors" "fmt" "log" @@ -9,17 +10,19 @@ import ( "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" + "github.com/jackc/pgx/v5/pgtype" + "sinanmohd.com/redq/db" ) type UsageStat struct { - lastSeen time.Time + lastSeen time.Time lastDbPush time.Time - ingress uint64 - egress uint64 + ingress uint64 + egress uint64 } type UsageMap map[uint32]UsageStat -func Run(iface *net.Interface) { +func Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) { usageMap := make(UsageMap) objs := bpfObjects{} @@ -57,11 +60,55 @@ func Run(iface *net.Interface) { case <-bpfTicker.C: usageMap.update(objs.IngressIp4UsageMap, objs.EgressIp4UsageMap) case <-dbTicker.C: - continue; + usageMap.dbPush(queries, ctxDb) + continue } } } +func (usageStat UsageStat) expired(timeStart *time.Time) bool { + timeDiff := timeStart.Sub(usageStat.lastSeen) + if timeDiff > time.Minute { + return true + } + + timeDiff = timeStart.Sub(usageStat.lastDbPush) + if timeDiff > time.Hour { + return true + } + + return false +} + +func (usageMap UsageMap) dbPush(queries *db.Queries, ctxDb context.Context) { + timeStart := time.Now() + + for key, value := range usageMap { + if !value.expired(&timeStart) { + continue + } + + err := queries.EnterUsage(ctxDb, db.EnterUsageParams{ + Hardwareaddr: int32(key), + Starttime: pgtype.Timestamp{ + Time: value.lastDbPush, + Valid: true, + }, + Stoptime: pgtype.Timestamp{ + Time: value.lastSeen, + Valid: true, + }, + Egress: int32(value.egress), + Ingress: int32(value.ingress), + }) + if err != nil { + log.Println(err) + } + + delete(usageMap, key) + } +} + func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) { timeStart := time.Now() batchKeys := make([]uint32, 4096) @@ -77,19 +124,19 @@ func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) { if ok { usage.ingress += batchValues[i] usage.lastSeen = timeStart - usageMap[key] = usage; + usageMap[key] = usage } else { - usageMap[key] = UsageStat { - ingress: batchValues[i], + usageMap[key] = UsageStat{ + ingress: batchValues[i], lastDbPush: timeStart, - lastSeen: timeStart, + lastSeen: timeStart, } } } - if (errors.Is(err, ebpf.ErrKeyNotExist)) { - break; - } else if err != nil{ + if errors.Is(err, ebpf.ErrKeyNotExist) { + break + } else if err != nil { fmt.Println(err) break } @@ -104,19 +151,19 @@ func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) { if ok { usage.egress += batchValues[i] usage.lastSeen = timeStart - usageMap[key] = usage; + usageMap[key] = usage } else { - usageMap[key] = UsageStat { - egress: batchValues[i], + usageMap[key] = UsageStat{ + egress: batchValues[i], lastDbPush: timeStart, - lastSeen: timeStart, + lastSeen: timeStart, } } } - if (errors.Is(err, ebpf.ErrKeyNotExist)) { - break; - } else if err != nil{ + if errors.Is(err, ebpf.ErrKeyNotExist) { + break + } else if err != nil { fmt.Println(err) break } diff --git a/cmd/redq/main.go b/cmd/redq/main.go index d8b9b58..4231b52 100644 --- a/cmd/redq/main.go +++ b/cmd/redq/main.go @@ -1,17 +1,28 @@ package main import ( + "context" "log" "net" - redqbpf "sinanmohd.com/redq/bpf" + "github.com/jackc/pgx/v5" + "sinanmohd.com/redq/bpf" + "sinanmohd.com/redq/db" ) func main() { - iface, err := net.InterfaceByName("wlan0") - if err != nil { - log.Fatalf("lookup network: %s", err) - } + iface, err := net.InterfaceByName("wlan0") + if err != nil { + log.Fatalf("lookup network: %s", err) + } - redqbpf.Run(iface) + ctx := context.Background() + conn, err := pgx.Connect(ctx, "user=redq_ebpf dbname=redq_ebpf") + if err != nil { + log.Fatalf("connecting database: %s", err) + } + defer conn.Close(ctx) + queries := db.New(conn) + + bpf.Run(iface, queries, ctx) } diff --git a/go.mod b/go.mod index 84638f1..96b5be8 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.22.0 require ( github.com/cilium/ebpf v0.15.0 github.com/jackc/pgx/v5 v5.6.0 - golang.org/x/net v0.10.0 ) require ( diff --git a/go.sum b/go.sum index 5d343e8..fae3e91 100644 --- a/go.sum +++ b/go.sum @@ -32,8 +32,6 @@ golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 h1:Jvc7gsqn21cJHCmAWx0LiimpP18LZmUxkT5Mp7EZ1mI= golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= -golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -- cgit v1.2.3