From 3da7837e2e51763b00b2f7af80973b2d5d61933d Mon Sep 17 00:00:00 2001 From: sinanmohd Date: Sun, 7 Jul 2024 18:21:59 +0530 Subject: usage -> bpf/usage --- api/bandwidth.go | 2 +- api/main.go | 2 +- api/usage.go | 2 +- bpf/usage/bpf.c | 102 +++++++++++++++++++++ bpf/usage/bpf_bpfel.go | 125 ++++++++++++++++++++++++++ bpf/usage/bpf_bpfel.o | Bin 0 -> 6584 bytes bpf/usage/gen.go | 3 + bpf/usage/main.go | 240 +++++++++++++++++++++++++++++++++++++++++++++++++ cmd/main.go | 2 +- usage/bpf.c | 102 --------------------- usage/bpf_bpfel.go | 125 -------------------------- usage/bpf_bpfel.o | Bin 6584 -> 0 bytes usage/gen.go | 3 - usage/main.go | 240 ------------------------------------------------- 14 files changed, 474 insertions(+), 474 deletions(-) create mode 100644 bpf/usage/bpf.c create mode 100644 bpf/usage/bpf_bpfel.go create mode 100644 bpf/usage/bpf_bpfel.o create mode 100644 bpf/usage/gen.go create mode 100644 bpf/usage/main.go delete mode 100644 usage/bpf.c delete mode 100644 usage/bpf_bpfel.go delete mode 100644 usage/bpf_bpfel.o delete mode 100644 usage/gen.go delete mode 100644 usage/main.go diff --git a/api/bandwidth.go b/api/bandwidth.go index 51b1760..d42c846 100644 --- a/api/bandwidth.go +++ b/api/bandwidth.go @@ -8,7 +8,7 @@ import ( "github.com/cilium/cilium/pkg/mac" "github.com/dustin/go-humanize" - "sinanmohd.com/redq/usage" + "sinanmohd.com/redq/bpf/usage" ) type BandwidthStat struct { diff --git a/api/main.go b/api/main.go index eb8715f..ea2d7f9 100644 --- a/api/main.go +++ b/api/main.go @@ -8,7 +8,7 @@ import ( "sinanmohd.com/redq/db" "sinanmohd.com/redq/dns" - "sinanmohd.com/redq/usage" + "sinanmohd.com/redq/bpf/usage" ) const ( diff --git a/api/usage.go b/api/usage.go index 5849ba7..378c617 100644 --- a/api/usage.go +++ b/api/usage.go @@ -8,7 +8,7 @@ import ( "github.com/dustin/go-humanize" "sinanmohd.com/redq/db" - "sinanmohd.com/redq/usage" + "sinanmohd.com/redq/bpf/usage" ) type UsageStat struct { diff --git a/bpf/usage/bpf.c b/bpf/usage/bpf.c new file mode 100644 index 0000000..8b2343a --- /dev/null +++ b/bpf/usage/bpf.c @@ -0,0 +1,102 @@ +#include +#include +#include + +#include +#include + +#define MAX_MAP_ENTRIES 4096 + +char __license[] SEC("license") = "GPL"; + +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __uint(max_entries, MAX_MAP_ENTRIES); + __type(key, __u64); // source mac address + __type(value, __u64); // no of bytes +} ingress_ip4_usage_map SEC(".maps"); + +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __uint(max_entries, MAX_MAP_ENTRIES); + __type(key, __u64); // destination mac address + __type(value, __u64); // no of bytes +} egress_ip4_usage_map SEC(".maps"); + +typedef enum { + UPDATE_USAGE_INGRESS, + UPDATE_USAGE_EGRESS, +} update_usage_t; + +static __always_inline __u64 nchar6_to_u64(unsigned char bytes[6]) +{ + union { + char bytes[6]; + __u64 i; + } ret; + + ret.i = 0; +#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ + ret.bytes[0] = bytes[5]; + ret.bytes[1] = bytes[4]; + ret.bytes[2] = bytes[3]; + ret.bytes[3] = bytes[2]; + ret.bytes[4] = bytes[1]; + ret.bytes[5] = bytes[0]; +#elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ + ret.bytes[0] = bytes[0]; + ret.bytes[1] = bytes[1]; + ret.bytes[2] = bytes[2]; + ret.bytes[3] = bytes[3]; + ret.bytes[4] = bytes[4]; + ret.bytes[5] = bytes[5]; +#endif + + return ret.i; +} + +static __always_inline int update_usage(void *map, struct __sk_buff *skb, + update_usage_t traffic) +{ + __u64 mac, len, *usage; + + void *data_end = (void *)(long)skb->data_end; + struct ethhdr *eth = (void *)(long)skb->data; + + if ((void *) (eth + 1) > data_end) + return TCX_PASS; + + if (skb->protocol != bpf_htons(ETH_P_IP) && + skb->protocol != bpf_htons(ETH_P_IPV6)) { + return TCX_PASS; + } + + len = skb->len - sizeof(struct ethhdr); + if (traffic == UPDATE_USAGE_INGRESS) { + mac = nchar6_to_u64(eth->h_source); + } else { + mac = nchar6_to_u64(eth->h_dest); + } + + usage = bpf_map_lookup_elem(map, &mac); + if (!usage) { + /* no entry in the map for this IP address yet. */ + bpf_map_update_elem(map, &mac, &len, BPF_ANY); + } else { + __sync_fetch_and_add(usage, len); + } + + return TCX_PASS; +} + +SEC("tc") +int ingress_func(struct __sk_buff *skb) +{ + return update_usage(&ingress_ip4_usage_map, skb, UPDATE_USAGE_INGRESS); +} + +SEC("tc") +int egress__func(struct __sk_buff *skb) +{ + return update_usage(&egress_ip4_usage_map, skb, UPDATE_USAGE_EGRESS); +} diff --git a/bpf/usage/bpf_bpfel.go b/bpf/usage/bpf_bpfel.go new file mode 100644 index 0000000..754ac37 --- /dev/null +++ b/bpf/usage/bpf_bpfel.go @@ -0,0 +1,125 @@ +// Code generated by bpf2go; DO NOT EDIT. +//go:build 386 || amd64 || arm || arm64 || loong64 || mips64le || mipsle || ppc64le || riscv64 + +package usage + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +// loadBpf returns the embedded CollectionSpec for bpf. +func loadBpf() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_BpfBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf: %w", err) + } + + return spec, err +} + +// loadBpfObjects loads bpf and converts it into a struct. +// +// The following types are suitable as obj argument: +// +// *bpfObjects +// *bpfPrograms +// *bpfMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpfSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfSpecs struct { + bpfProgramSpecs + bpfMapSpecs +} + +// bpfSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfProgramSpecs struct { + EgressFunc *ebpf.ProgramSpec `ebpf:"egress__func"` + IngressFunc *ebpf.ProgramSpec `ebpf:"ingress_func"` +} + +// bpfMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfMapSpecs struct { + EgressIp4UsageMap *ebpf.MapSpec `ebpf:"egress_ip4_usage_map"` + IngressIp4UsageMap *ebpf.MapSpec `ebpf:"ingress_ip4_usage_map"` +} + +// bpfObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfObjects struct { + bpfPrograms + bpfMaps +} + +func (o *bpfObjects) Close() error { + return _BpfClose( + &o.bpfPrograms, + &o.bpfMaps, + ) +} + +// bpfMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfMaps struct { + EgressIp4UsageMap *ebpf.Map `ebpf:"egress_ip4_usage_map"` + IngressIp4UsageMap *ebpf.Map `ebpf:"ingress_ip4_usage_map"` +} + +func (m *bpfMaps) Close() error { + return _BpfClose( + m.EgressIp4UsageMap, + m.IngressIp4UsageMap, + ) +} + +// bpfPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfPrograms struct { + EgressFunc *ebpf.Program `ebpf:"egress__func"` + IngressFunc *ebpf.Program `ebpf:"ingress_func"` +} + +func (p *bpfPrograms) Close() error { + return _BpfClose( + p.EgressFunc, + p.IngressFunc, + ) +} + +func _BpfClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. +// +//go:embed bpf_bpfel.o +var _BpfBytes []byte diff --git a/bpf/usage/bpf_bpfel.o b/bpf/usage/bpf_bpfel.o new file mode 100644 index 0000000..3526a68 Binary files /dev/null and b/bpf/usage/bpf_bpfel.o differ diff --git a/bpf/usage/gen.go b/bpf/usage/gen.go new file mode 100644 index 0000000..60e4dd6 --- /dev/null +++ b/bpf/usage/gen.go @@ -0,0 +1,3 @@ +package usage + +//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel bpf bpf.c diff --git a/bpf/usage/main.go b/bpf/usage/main.go new file mode 100644 index 0000000..526e092 --- /dev/null +++ b/bpf/usage/main.go @@ -0,0 +1,240 @@ +package usage + +import ( + "context" + "errors" + "log" + "net" + "sync" + "time" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" + "github.com/jackc/pgx/v5/pgtype" + "sinanmohd.com/redq/db" +) + +type UsageStat struct { + lastSeen time.Time + lastDbPush time.Time + BandwidthIngress uint64 + BandwidthEgress uint64 + Ingress uint64 + Egress uint64 +} + +type usageMap map[uint64]UsageStat +type Usage struct { + Data usageMap + Mutex sync.RWMutex + objs bpfObjects + egressLink, ingressLink link.Link +} + +func Close(u *Usage, queries *db.Queries, ctxDb context.Context) { + err := u.UpdateDb(queries, ctxDb, false) + if err != nil { + log.Printf("updating Database: %s", err) + } + + u.objs.Close() + u.ingressLink.Close() + u.egressLink.Close() +} + +func New(iface *net.Interface) (*Usage, error) { + var err error + var u Usage + + if err := loadBpfObjects(&u.objs, nil); err != nil { + log.Printf("loading objects: %s", err) + return nil, err + } + defer func() { + if err != nil { + u.objs.Close() + } + }() + + u.ingressLink, err = link.AttachTCX(link.TCXOptions{ + Interface: iface.Index, + Program: u.objs.IngressFunc, + Attach: ebpf.AttachTCXIngress, + }) + if err != nil { + log.Printf("could not attach TCx program: %s", err) + return nil, err + } + defer func() { + if err != nil { + u.ingressLink.Close() + } + }() + + u.egressLink, err = link.AttachTCX(link.TCXOptions{ + Interface: iface.Index, + Program: u.objs.EgressFunc, + Attach: ebpf.AttachTCXEgress, + }) + if err != nil { + log.Printf("could not attach TCx program: %s", err) + return nil, err + } + + u.Data = make(usageMap) + return &u, nil +} + +func (u *Usage) Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) { + bpfTicker := time.NewTicker(time.Second) + defer bpfTicker.Stop() + dbTicker := time.NewTicker(time.Minute) + defer dbTicker.Stop() + + for { + select { + case <-bpfTicker.C: + err := u.update(u.objs.IngressIp4UsageMap, u.objs.EgressIp4UsageMap) + if err != nil { + log.Printf("updating usageMap: %s", err) + } + case <-dbTicker.C: + err := u.UpdateDb(queries, ctxDb, true) + if err != nil { + log.Printf("updating Database: %s", err) + } + } + } +} + +func (u *Usage) UpdateDb(queries *db.Queries, ctxDb context.Context, ifExpired bool) error { + timeStart := time.Now() + + u.Mutex.Lock() + for key, value := range u.Data { + if ifExpired && !value.expired(&timeStart) { + continue + } + + err := queries.EnterUsage(ctxDb, db.EnterUsageParams{ + Hardwareaddr: int64(key), + Starttime: pgtype.Timestamp{ + Time: value.lastDbPush, + Valid: true, + }, + Stoptime: pgtype.Timestamp{ + Time: value.lastSeen, + Valid: true, + }, + Egress: int64(value.Egress), + Ingress: int64(value.Ingress), + }) + if err != nil { + return err + } + + delete(u.Data, key) + } + u.Mutex.Unlock() + + return nil +} + +func (us *UsageStat) expired(timeStart *time.Time) bool { + timeDiff := timeStart.Sub(us.lastSeen) + if timeDiff > time.Minute { + return true + } + + timeDiff = timeStart.Sub(us.lastDbPush) + if timeDiff > time.Hour { + return true + } + + return false +} + +func (u *Usage) update(ingress *ebpf.Map, egress *ebpf.Map) error { + timeStart := time.Now() + batchKeys := make([]uint64, 4096) + batchValues := make([]uint64, 4096) + var key uint64 + + u.Mutex.Lock() + for key, value := range u.Data { + value.BandwidthIngress = 0 + value.BandwidthEgress = 0 + u.Data[key] = value + } + u.Mutex.Unlock() + + cursor := ebpf.MapBatchCursor{} + for { + _, err := ingress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil) + u.Mutex.Lock() + for i := range batchKeys { + if batchValues[i] == 0 { + continue + } + + key = batchKeys[i] + usage, ok := u.Data[key] + if ok { + usage.BandwidthIngress = batchValues[i] + usage.Ingress += batchValues[i] + usage.lastSeen = timeStart + u.Data[key] = usage + } else { + u.Data[key] = UsageStat{ + BandwidthIngress: batchValues[i], + Ingress: batchValues[i], + lastDbPush: timeStart, + lastSeen: timeStart, + } + } + } + u.Mutex.Unlock() + + if errors.Is(err, ebpf.ErrKeyNotExist) { + break + } else if err != nil { + return err + } + } + + cursor = ebpf.MapBatchCursor{} + for { + _, err := egress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil) + u.Mutex.Lock() + for i := range batchKeys { + if batchValues[i] == 0 { + continue + } + + key = batchKeys[i] + usage, ok := u.Data[key] + if ok { + usage.BandwidthEgress = batchValues[i] + usage.Egress += batchValues[i] + usage.lastSeen = timeStart + u.Data[key] = usage + } else { + u.Data[key] = UsageStat{ + BandwidthEgress: batchValues[i], + Egress: batchValues[i], + lastDbPush: timeStart, + lastSeen: timeStart, + } + } + } + u.Mutex.Unlock() + + if errors.Is(err, ebpf.ErrKeyNotExist) { + break + } else if err != nil { + return err + } + } + + return nil +} diff --git a/cmd/main.go b/cmd/main.go index db9cc13..5c58d44 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -12,7 +12,7 @@ import ( "sinanmohd.com/redq/api" "sinanmohd.com/redq/db" "sinanmohd.com/redq/dns" - "sinanmohd.com/redq/usage" + "sinanmohd.com/redq/bpf/usage" ) func main() { diff --git a/usage/bpf.c b/usage/bpf.c deleted file mode 100644 index 8b2343a..0000000 --- a/usage/bpf.c +++ /dev/null @@ -1,102 +0,0 @@ -#include -#include -#include - -#include -#include - -#define MAX_MAP_ENTRIES 4096 - -char __license[] SEC("license") = "GPL"; - -struct { - __uint(type, BPF_MAP_TYPE_LRU_HASH); - __uint(max_entries, MAX_MAP_ENTRIES); - __type(key, __u64); // source mac address - __type(value, __u64); // no of bytes -} ingress_ip4_usage_map SEC(".maps"); - -struct { - __uint(type, BPF_MAP_TYPE_LRU_HASH); - __uint(max_entries, MAX_MAP_ENTRIES); - __type(key, __u64); // destination mac address - __type(value, __u64); // no of bytes -} egress_ip4_usage_map SEC(".maps"); - -typedef enum { - UPDATE_USAGE_INGRESS, - UPDATE_USAGE_EGRESS, -} update_usage_t; - -static __always_inline __u64 nchar6_to_u64(unsigned char bytes[6]) -{ - union { - char bytes[6]; - __u64 i; - } ret; - - ret.i = 0; -#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ - ret.bytes[0] = bytes[5]; - ret.bytes[1] = bytes[4]; - ret.bytes[2] = bytes[3]; - ret.bytes[3] = bytes[2]; - ret.bytes[4] = bytes[1]; - ret.bytes[5] = bytes[0]; -#elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ - ret.bytes[0] = bytes[0]; - ret.bytes[1] = bytes[1]; - ret.bytes[2] = bytes[2]; - ret.bytes[3] = bytes[3]; - ret.bytes[4] = bytes[4]; - ret.bytes[5] = bytes[5]; -#endif - - return ret.i; -} - -static __always_inline int update_usage(void *map, struct __sk_buff *skb, - update_usage_t traffic) -{ - __u64 mac, len, *usage; - - void *data_end = (void *)(long)skb->data_end; - struct ethhdr *eth = (void *)(long)skb->data; - - if ((void *) (eth + 1) > data_end) - return TCX_PASS; - - if (skb->protocol != bpf_htons(ETH_P_IP) && - skb->protocol != bpf_htons(ETH_P_IPV6)) { - return TCX_PASS; - } - - len = skb->len - sizeof(struct ethhdr); - if (traffic == UPDATE_USAGE_INGRESS) { - mac = nchar6_to_u64(eth->h_source); - } else { - mac = nchar6_to_u64(eth->h_dest); - } - - usage = bpf_map_lookup_elem(map, &mac); - if (!usage) { - /* no entry in the map for this IP address yet. */ - bpf_map_update_elem(map, &mac, &len, BPF_ANY); - } else { - __sync_fetch_and_add(usage, len); - } - - return TCX_PASS; -} - -SEC("tc") -int ingress_func(struct __sk_buff *skb) -{ - return update_usage(&ingress_ip4_usage_map, skb, UPDATE_USAGE_INGRESS); -} - -SEC("tc") -int egress__func(struct __sk_buff *skb) -{ - return update_usage(&egress_ip4_usage_map, skb, UPDATE_USAGE_EGRESS); -} diff --git a/usage/bpf_bpfel.go b/usage/bpf_bpfel.go deleted file mode 100644 index 754ac37..0000000 --- a/usage/bpf_bpfel.go +++ /dev/null @@ -1,125 +0,0 @@ -// Code generated by bpf2go; DO NOT EDIT. -//go:build 386 || amd64 || arm || arm64 || loong64 || mips64le || mipsle || ppc64le || riscv64 - -package usage - -import ( - "bytes" - _ "embed" - "fmt" - "io" - - "github.com/cilium/ebpf" -) - -// loadBpf returns the embedded CollectionSpec for bpf. -func loadBpf() (*ebpf.CollectionSpec, error) { - reader := bytes.NewReader(_BpfBytes) - spec, err := ebpf.LoadCollectionSpecFromReader(reader) - if err != nil { - return nil, fmt.Errorf("can't load bpf: %w", err) - } - - return spec, err -} - -// loadBpfObjects loads bpf and converts it into a struct. -// -// The following types are suitable as obj argument: -// -// *bpfObjects -// *bpfPrograms -// *bpfMaps -// -// See ebpf.CollectionSpec.LoadAndAssign documentation for details. -func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { - spec, err := loadBpf() - if err != nil { - return err - } - - return spec.LoadAndAssign(obj, opts) -} - -// bpfSpecs contains maps and programs before they are loaded into the kernel. -// -// It can be passed ebpf.CollectionSpec.Assign. -type bpfSpecs struct { - bpfProgramSpecs - bpfMapSpecs -} - -// bpfSpecs contains programs before they are loaded into the kernel. -// -// It can be passed ebpf.CollectionSpec.Assign. -type bpfProgramSpecs struct { - EgressFunc *ebpf.ProgramSpec `ebpf:"egress__func"` - IngressFunc *ebpf.ProgramSpec `ebpf:"ingress_func"` -} - -// bpfMapSpecs contains maps before they are loaded into the kernel. -// -// It can be passed ebpf.CollectionSpec.Assign. -type bpfMapSpecs struct { - EgressIp4UsageMap *ebpf.MapSpec `ebpf:"egress_ip4_usage_map"` - IngressIp4UsageMap *ebpf.MapSpec `ebpf:"ingress_ip4_usage_map"` -} - -// bpfObjects contains all objects after they have been loaded into the kernel. -// -// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. -type bpfObjects struct { - bpfPrograms - bpfMaps -} - -func (o *bpfObjects) Close() error { - return _BpfClose( - &o.bpfPrograms, - &o.bpfMaps, - ) -} - -// bpfMaps contains all maps after they have been loaded into the kernel. -// -// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. -type bpfMaps struct { - EgressIp4UsageMap *ebpf.Map `ebpf:"egress_ip4_usage_map"` - IngressIp4UsageMap *ebpf.Map `ebpf:"ingress_ip4_usage_map"` -} - -func (m *bpfMaps) Close() error { - return _BpfClose( - m.EgressIp4UsageMap, - m.IngressIp4UsageMap, - ) -} - -// bpfPrograms contains all programs after they have been loaded into the kernel. -// -// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. -type bpfPrograms struct { - EgressFunc *ebpf.Program `ebpf:"egress__func"` - IngressFunc *ebpf.Program `ebpf:"ingress_func"` -} - -func (p *bpfPrograms) Close() error { - return _BpfClose( - p.EgressFunc, - p.IngressFunc, - ) -} - -func _BpfClose(closers ...io.Closer) error { - for _, closer := range closers { - if err := closer.Close(); err != nil { - return err - } - } - return nil -} - -// Do not access this directly. -// -//go:embed bpf_bpfel.o -var _BpfBytes []byte diff --git a/usage/bpf_bpfel.o b/usage/bpf_bpfel.o deleted file mode 100644 index 3526a68..0000000 Binary files a/usage/bpf_bpfel.o and /dev/null differ diff --git a/usage/gen.go b/usage/gen.go deleted file mode 100644 index 60e4dd6..0000000 --- a/usage/gen.go +++ /dev/null @@ -1,3 +0,0 @@ -package usage - -//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel bpf bpf.c diff --git a/usage/main.go b/usage/main.go deleted file mode 100644 index 526e092..0000000 --- a/usage/main.go +++ /dev/null @@ -1,240 +0,0 @@ -package usage - -import ( - "context" - "errors" - "log" - "net" - "sync" - "time" - - "github.com/cilium/ebpf" - "github.com/cilium/ebpf/link" - "github.com/jackc/pgx/v5/pgtype" - "sinanmohd.com/redq/db" -) - -type UsageStat struct { - lastSeen time.Time - lastDbPush time.Time - BandwidthIngress uint64 - BandwidthEgress uint64 - Ingress uint64 - Egress uint64 -} - -type usageMap map[uint64]UsageStat -type Usage struct { - Data usageMap - Mutex sync.RWMutex - objs bpfObjects - egressLink, ingressLink link.Link -} - -func Close(u *Usage, queries *db.Queries, ctxDb context.Context) { - err := u.UpdateDb(queries, ctxDb, false) - if err != nil { - log.Printf("updating Database: %s", err) - } - - u.objs.Close() - u.ingressLink.Close() - u.egressLink.Close() -} - -func New(iface *net.Interface) (*Usage, error) { - var err error - var u Usage - - if err := loadBpfObjects(&u.objs, nil); err != nil { - log.Printf("loading objects: %s", err) - return nil, err - } - defer func() { - if err != nil { - u.objs.Close() - } - }() - - u.ingressLink, err = link.AttachTCX(link.TCXOptions{ - Interface: iface.Index, - Program: u.objs.IngressFunc, - Attach: ebpf.AttachTCXIngress, - }) - if err != nil { - log.Printf("could not attach TCx program: %s", err) - return nil, err - } - defer func() { - if err != nil { - u.ingressLink.Close() - } - }() - - u.egressLink, err = link.AttachTCX(link.TCXOptions{ - Interface: iface.Index, - Program: u.objs.EgressFunc, - Attach: ebpf.AttachTCXEgress, - }) - if err != nil { - log.Printf("could not attach TCx program: %s", err) - return nil, err - } - - u.Data = make(usageMap) - return &u, nil -} - -func (u *Usage) Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) { - bpfTicker := time.NewTicker(time.Second) - defer bpfTicker.Stop() - dbTicker := time.NewTicker(time.Minute) - defer dbTicker.Stop() - - for { - select { - case <-bpfTicker.C: - err := u.update(u.objs.IngressIp4UsageMap, u.objs.EgressIp4UsageMap) - if err != nil { - log.Printf("updating usageMap: %s", err) - } - case <-dbTicker.C: - err := u.UpdateDb(queries, ctxDb, true) - if err != nil { - log.Printf("updating Database: %s", err) - } - } - } -} - -func (u *Usage) UpdateDb(queries *db.Queries, ctxDb context.Context, ifExpired bool) error { - timeStart := time.Now() - - u.Mutex.Lock() - for key, value := range u.Data { - if ifExpired && !value.expired(&timeStart) { - continue - } - - err := queries.EnterUsage(ctxDb, db.EnterUsageParams{ - Hardwareaddr: int64(key), - Starttime: pgtype.Timestamp{ - Time: value.lastDbPush, - Valid: true, - }, - Stoptime: pgtype.Timestamp{ - Time: value.lastSeen, - Valid: true, - }, - Egress: int64(value.Egress), - Ingress: int64(value.Ingress), - }) - if err != nil { - return err - } - - delete(u.Data, key) - } - u.Mutex.Unlock() - - return nil -} - -func (us *UsageStat) expired(timeStart *time.Time) bool { - timeDiff := timeStart.Sub(us.lastSeen) - if timeDiff > time.Minute { - return true - } - - timeDiff = timeStart.Sub(us.lastDbPush) - if timeDiff > time.Hour { - return true - } - - return false -} - -func (u *Usage) update(ingress *ebpf.Map, egress *ebpf.Map) error { - timeStart := time.Now() - batchKeys := make([]uint64, 4096) - batchValues := make([]uint64, 4096) - var key uint64 - - u.Mutex.Lock() - for key, value := range u.Data { - value.BandwidthIngress = 0 - value.BandwidthEgress = 0 - u.Data[key] = value - } - u.Mutex.Unlock() - - cursor := ebpf.MapBatchCursor{} - for { - _, err := ingress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil) - u.Mutex.Lock() - for i := range batchKeys { - if batchValues[i] == 0 { - continue - } - - key = batchKeys[i] - usage, ok := u.Data[key] - if ok { - usage.BandwidthIngress = batchValues[i] - usage.Ingress += batchValues[i] - usage.lastSeen = timeStart - u.Data[key] = usage - } else { - u.Data[key] = UsageStat{ - BandwidthIngress: batchValues[i], - Ingress: batchValues[i], - lastDbPush: timeStart, - lastSeen: timeStart, - } - } - } - u.Mutex.Unlock() - - if errors.Is(err, ebpf.ErrKeyNotExist) { - break - } else if err != nil { - return err - } - } - - cursor = ebpf.MapBatchCursor{} - for { - _, err := egress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil) - u.Mutex.Lock() - for i := range batchKeys { - if batchValues[i] == 0 { - continue - } - - key = batchKeys[i] - usage, ok := u.Data[key] - if ok { - usage.BandwidthEgress = batchValues[i] - usage.Egress += batchValues[i] - usage.lastSeen = timeStart - u.Data[key] = usage - } else { - u.Data[key] = UsageStat{ - BandwidthEgress: batchValues[i], - Egress: batchValues[i], - lastDbPush: timeStart, - lastSeen: timeStart, - } - } - } - u.Mutex.Unlock() - - if errors.Is(err, ebpf.ErrKeyNotExist) { - break - } else if err != nil { - return err - } - } - - return nil -} -- cgit v1.2.3