summaryrefslogtreecommitdiff
path: root/bpf
diff options
context:
space:
mode:
authorsinanmohd <sinan@sinanmohd.com>2024-07-05 17:00:54 +0530
committersinanmohd <sinan@sinanmohd.com>2024-07-05 18:02:53 +0530
commitbd99f6b4d67c386daf604c94999e5de72584f883 (patch)
tree11d8434fc5691c163c939c4cf3eacff4cf9bdec4 /bpf
parente2f67996a608346ea3f3525ef2febf6ca5d2b78c (diff)
bpf/usage: log to database
Diffstat (limited to 'bpf')
-rw-r--r--bpf/main.go85
1 files changed, 66 insertions, 19 deletions
diff --git a/bpf/main.go b/bpf/main.go
index 47fdc41..1f5610d 100644
--- a/bpf/main.go
+++ b/bpf/main.go
@@ -1,6 +1,7 @@
package bpf
import (
+ "context"
"errors"
"fmt"
"log"
@@ -9,17 +10,19 @@ import (
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/link"
+ "github.com/jackc/pgx/v5/pgtype"
+ "sinanmohd.com/redq/db"
)
type UsageStat struct {
- lastSeen time.Time
+ lastSeen time.Time
lastDbPush time.Time
- ingress uint64
- egress uint64
+ ingress uint64
+ egress uint64
}
type UsageMap map[uint32]UsageStat
-func Run(iface *net.Interface) {
+func Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) {
usageMap := make(UsageMap)
objs := bpfObjects{}
@@ -57,11 +60,55 @@ func Run(iface *net.Interface) {
case <-bpfTicker.C:
usageMap.update(objs.IngressIp4UsageMap, objs.EgressIp4UsageMap)
case <-dbTicker.C:
- continue;
+ usageMap.dbPush(queries, ctxDb)
+ continue
}
}
}
+func (usageStat UsageStat) expired(timeStart *time.Time) bool {
+ timeDiff := timeStart.Sub(usageStat.lastSeen)
+ if timeDiff > time.Minute {
+ return true
+ }
+
+ timeDiff = timeStart.Sub(usageStat.lastDbPush)
+ if timeDiff > time.Hour {
+ return true
+ }
+
+ return false
+}
+
+func (usageMap UsageMap) dbPush(queries *db.Queries, ctxDb context.Context) {
+ timeStart := time.Now()
+
+ for key, value := range usageMap {
+ if !value.expired(&timeStart) {
+ continue
+ }
+
+ err := queries.EnterUsage(ctxDb, db.EnterUsageParams{
+ Hardwareaddr: int32(key),
+ Starttime: pgtype.Timestamp{
+ Time: value.lastDbPush,
+ Valid: true,
+ },
+ Stoptime: pgtype.Timestamp{
+ Time: value.lastSeen,
+ Valid: true,
+ },
+ Egress: int32(value.egress),
+ Ingress: int32(value.ingress),
+ })
+ if err != nil {
+ log.Println(err)
+ }
+
+ delete(usageMap, key)
+ }
+}
+
func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) {
timeStart := time.Now()
batchKeys := make([]uint32, 4096)
@@ -77,19 +124,19 @@ func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) {
if ok {
usage.ingress += batchValues[i]
usage.lastSeen = timeStart
- usageMap[key] = usage;
+ usageMap[key] = usage
} else {
- usageMap[key] = UsageStat {
- ingress: batchValues[i],
+ usageMap[key] = UsageStat{
+ ingress: batchValues[i],
lastDbPush: timeStart,
- lastSeen: timeStart,
+ lastSeen: timeStart,
}
}
}
- if (errors.Is(err, ebpf.ErrKeyNotExist)) {
- break;
- } else if err != nil{
+ if errors.Is(err, ebpf.ErrKeyNotExist) {
+ break
+ } else if err != nil {
fmt.Println(err)
break
}
@@ -104,19 +151,19 @@ func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) {
if ok {
usage.egress += batchValues[i]
usage.lastSeen = timeStart
- usageMap[key] = usage;
+ usageMap[key] = usage
} else {
- usageMap[key] = UsageStat {
- egress: batchValues[i],
+ usageMap[key] = UsageStat{
+ egress: batchValues[i],
lastDbPush: timeStart,
- lastSeen: timeStart,
+ lastSeen: timeStart,
}
}
}
- if (errors.Is(err, ebpf.ErrKeyNotExist)) {
- break;
- } else if err != nil{
+ if errors.Is(err, ebpf.ErrKeyNotExist) {
+ break
+ } else if err != nil {
fmt.Println(err)
break
}