diff options
author | sinanmohd <sinan@sinanmohd.com> | 2024-07-04 13:13:23 +0530 |
---|---|---|
committer | sinanmohd <sinan@sinanmohd.com> | 2024-07-05 17:02:06 +0530 |
commit | e2f67996a608346ea3f3525ef2febf6ca5d2b78c (patch) | |
tree | 932f766b65ed5a6ff3d57c772292a2b75210d9aa /bpf/main.go | |
parent | e309091d17720b69c53172a41c0ea45ad7b66911 (diff) |
refactor: drop http api, move to sqlc/postgresql
Diffstat (limited to 'bpf/main.go')
-rw-r--r-- | bpf/main.go | 124 |
1 files changed, 124 insertions, 0 deletions
diff --git a/bpf/main.go b/bpf/main.go new file mode 100644 index 0000000..47fdc41 --- /dev/null +++ b/bpf/main.go @@ -0,0 +1,124 @@ +package bpf + +import ( + "errors" + "fmt" + "log" + "net" + "time" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" +) + +type UsageStat struct { + lastSeen time.Time + lastDbPush time.Time + ingress uint64 + egress uint64 +} +type UsageMap map[uint32]UsageStat + +func Run(iface *net.Interface) { + usageMap := make(UsageMap) + + objs := bpfObjects{} + if err := loadBpfObjects(&objs, nil); err != nil { + log.Fatalf("loading objects: %s", err) + } + defer objs.Close() + + ingressLink, err := link.AttachTCX(link.TCXOptions{ + Interface: iface.Index, + Program: objs.IngressFunc, + Attach: ebpf.AttachTCXIngress, + }) + if err != nil { + log.Fatalf("could not attach TCx program: %s", err) + } + defer ingressLink.Close() + + egressLink, err := link.AttachTCX(link.TCXOptions{ + Interface: iface.Index, + Program: objs.EgressFunc, + Attach: ebpf.AttachTCXEgress, + }) + if err != nil { + log.Fatalf("could not attach TCx program: %s", err) + } + defer egressLink.Close() + + bpfTicker := time.NewTicker(1 * time.Second) + defer bpfTicker.Stop() + dbTicker := time.NewTicker(60 * time.Second) + defer dbTicker.Stop() + for { + select { + case <-bpfTicker.C: + usageMap.update(objs.IngressIp4UsageMap, objs.EgressIp4UsageMap) + case <-dbTicker.C: + continue; + } + } +} + +func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) { + timeStart := time.Now() + batchKeys := make([]uint32, 4096) + batchValues := make([]uint64, 4096) + var key uint32 + + cursor := ebpf.MapBatchCursor{} + for { + _, err := ingress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil) + for i := range batchKeys { + key = batchKeys[i] + usage, ok := usageMap[key] + if ok { + usage.ingress += batchValues[i] + usage.lastSeen = timeStart + usageMap[key] = usage; + } else { + usageMap[key] = UsageStat { + ingress: batchValues[i], + lastDbPush: timeStart, + lastSeen: timeStart, + } + } + } + + if (errors.Is(err, ebpf.ErrKeyNotExist)) { + break; + } else if err != nil{ + fmt.Println(err) + break + } + } + + cursor = ebpf.MapBatchCursor{} + for { + _, err := egress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil) + for i := range batchKeys { + key = batchKeys[i] + usage, ok := usageMap[key] + if ok { + usage.egress += batchValues[i] + usage.lastSeen = timeStart + usageMap[key] = usage; + } else { + usageMap[key] = UsageStat { + egress: batchValues[i], + lastDbPush: timeStart, + lastSeen: timeStart, + } + } + } + + if (errors.Is(err, ebpf.ErrKeyNotExist)) { + break; + } else if err != nil{ + fmt.Println(err) + break + } + } +} |