summaryrefslogtreecommitdiff
path: root/bpf/usage
diff options
context:
space:
mode:
Diffstat (limited to 'bpf/usage')
-rw-r--r--bpf/usage/bpf.c102
-rw-r--r--bpf/usage/bpf_bpfel.go125
-rw-r--r--bpf/usage/bpf_bpfel.obin0 -> 6584 bytes
-rw-r--r--bpf/usage/gen.go3
-rw-r--r--bpf/usage/main.go240
5 files changed, 470 insertions, 0 deletions
diff --git a/bpf/usage/bpf.c b/bpf/usage/bpf.c
new file mode 100644
index 0000000..8b2343a
--- /dev/null
+++ b/bpf/usage/bpf.c
@@ -0,0 +1,102 @@
+#include <linux/bpf.h>
+#include <linux/if_ether.h>
+#include <linux/ip.h>
+
+#include <bpf/bpf_endian.h>
+#include <bpf/bpf_helpers.h>
+
+#define MAX_MAP_ENTRIES 4096
+
+char __license[] SEC("license") = "GPL";
+
+struct {
+ __uint(type, BPF_MAP_TYPE_LRU_HASH);
+ __uint(max_entries, MAX_MAP_ENTRIES);
+ __type(key, __u64); // source mac address
+ __type(value, __u64); // no of bytes
+} ingress_ip4_usage_map SEC(".maps");
+
+struct {
+ __uint(type, BPF_MAP_TYPE_LRU_HASH);
+ __uint(max_entries, MAX_MAP_ENTRIES);
+ __type(key, __u64); // destination mac address
+ __type(value, __u64); // no of bytes
+} egress_ip4_usage_map SEC(".maps");
+
+typedef enum {
+ UPDATE_USAGE_INGRESS,
+ UPDATE_USAGE_EGRESS,
+} update_usage_t;
+
+static __always_inline __u64 nchar6_to_u64(unsigned char bytes[6])
+{
+ union {
+ char bytes[6];
+ __u64 i;
+ } ret;
+
+ ret.i = 0;
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+ ret.bytes[0] = bytes[5];
+ ret.bytes[1] = bytes[4];
+ ret.bytes[2] = bytes[3];
+ ret.bytes[3] = bytes[2];
+ ret.bytes[4] = bytes[1];
+ ret.bytes[5] = bytes[0];
+#elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
+ ret.bytes[0] = bytes[0];
+ ret.bytes[1] = bytes[1];
+ ret.bytes[2] = bytes[2];
+ ret.bytes[3] = bytes[3];
+ ret.bytes[4] = bytes[4];
+ ret.bytes[5] = bytes[5];
+#endif
+
+ return ret.i;
+}
+
+static __always_inline int update_usage(void *map, struct __sk_buff *skb,
+ update_usage_t traffic)
+{
+ __u64 mac, len, *usage;
+
+ void *data_end = (void *)(long)skb->data_end;
+ struct ethhdr *eth = (void *)(long)skb->data;
+
+ if ((void *) (eth + 1) > data_end)
+ return TCX_PASS;
+
+ if (skb->protocol != bpf_htons(ETH_P_IP) &&
+ skb->protocol != bpf_htons(ETH_P_IPV6)) {
+ return TCX_PASS;
+ }
+
+ len = skb->len - sizeof(struct ethhdr);
+ if (traffic == UPDATE_USAGE_INGRESS) {
+ mac = nchar6_to_u64(eth->h_source);
+ } else {
+ mac = nchar6_to_u64(eth->h_dest);
+ }
+
+ usage = bpf_map_lookup_elem(map, &mac);
+ if (!usage) {
+ /* no entry in the map for this IP address yet. */
+ bpf_map_update_elem(map, &mac, &len, BPF_ANY);
+ } else {
+ __sync_fetch_and_add(usage, len);
+ }
+
+ return TCX_PASS;
+}
+
+SEC("tc")
+int ingress_func(struct __sk_buff *skb)
+{
+ return update_usage(&ingress_ip4_usage_map, skb, UPDATE_USAGE_INGRESS);
+}
+
+SEC("tc")
+int egress__func(struct __sk_buff *skb)
+{
+ return update_usage(&egress_ip4_usage_map, skb, UPDATE_USAGE_EGRESS);
+}
diff --git a/bpf/usage/bpf_bpfel.go b/bpf/usage/bpf_bpfel.go
new file mode 100644
index 0000000..754ac37
--- /dev/null
+++ b/bpf/usage/bpf_bpfel.go
@@ -0,0 +1,125 @@
+// Code generated by bpf2go; DO NOT EDIT.
+//go:build 386 || amd64 || arm || arm64 || loong64 || mips64le || mipsle || ppc64le || riscv64
+
+package usage
+
+import (
+ "bytes"
+ _ "embed"
+ "fmt"
+ "io"
+
+ "github.com/cilium/ebpf"
+)
+
+// loadBpf returns the embedded CollectionSpec for bpf.
+func loadBpf() (*ebpf.CollectionSpec, error) {
+ reader := bytes.NewReader(_BpfBytes)
+ spec, err := ebpf.LoadCollectionSpecFromReader(reader)
+ if err != nil {
+ return nil, fmt.Errorf("can't load bpf: %w", err)
+ }
+
+ return spec, err
+}
+
+// loadBpfObjects loads bpf and converts it into a struct.
+//
+// The following types are suitable as obj argument:
+//
+// *bpfObjects
+// *bpfPrograms
+// *bpfMaps
+//
+// See ebpf.CollectionSpec.LoadAndAssign documentation for details.
+func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error {
+ spec, err := loadBpf()
+ if err != nil {
+ return err
+ }
+
+ return spec.LoadAndAssign(obj, opts)
+}
+
+// bpfSpecs contains maps and programs before they are loaded into the kernel.
+//
+// It can be passed ebpf.CollectionSpec.Assign.
+type bpfSpecs struct {
+ bpfProgramSpecs
+ bpfMapSpecs
+}
+
+// bpfSpecs contains programs before they are loaded into the kernel.
+//
+// It can be passed ebpf.CollectionSpec.Assign.
+type bpfProgramSpecs struct {
+ EgressFunc *ebpf.ProgramSpec `ebpf:"egress__func"`
+ IngressFunc *ebpf.ProgramSpec `ebpf:"ingress_func"`
+}
+
+// bpfMapSpecs contains maps before they are loaded into the kernel.
+//
+// It can be passed ebpf.CollectionSpec.Assign.
+type bpfMapSpecs struct {
+ EgressIp4UsageMap *ebpf.MapSpec `ebpf:"egress_ip4_usage_map"`
+ IngressIp4UsageMap *ebpf.MapSpec `ebpf:"ingress_ip4_usage_map"`
+}
+
+// bpfObjects contains all objects after they have been loaded into the kernel.
+//
+// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign.
+type bpfObjects struct {
+ bpfPrograms
+ bpfMaps
+}
+
+func (o *bpfObjects) Close() error {
+ return _BpfClose(
+ &o.bpfPrograms,
+ &o.bpfMaps,
+ )
+}
+
+// bpfMaps contains all maps after they have been loaded into the kernel.
+//
+// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign.
+type bpfMaps struct {
+ EgressIp4UsageMap *ebpf.Map `ebpf:"egress_ip4_usage_map"`
+ IngressIp4UsageMap *ebpf.Map `ebpf:"ingress_ip4_usage_map"`
+}
+
+func (m *bpfMaps) Close() error {
+ return _BpfClose(
+ m.EgressIp4UsageMap,
+ m.IngressIp4UsageMap,
+ )
+}
+
+// bpfPrograms contains all programs after they have been loaded into the kernel.
+//
+// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign.
+type bpfPrograms struct {
+ EgressFunc *ebpf.Program `ebpf:"egress__func"`
+ IngressFunc *ebpf.Program `ebpf:"ingress_func"`
+}
+
+func (p *bpfPrograms) Close() error {
+ return _BpfClose(
+ p.EgressFunc,
+ p.IngressFunc,
+ )
+}
+
+func _BpfClose(closers ...io.Closer) error {
+ for _, closer := range closers {
+ if err := closer.Close(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Do not access this directly.
+//
+//go:embed bpf_bpfel.o
+var _BpfBytes []byte
diff --git a/bpf/usage/bpf_bpfel.o b/bpf/usage/bpf_bpfel.o
new file mode 100644
index 0000000..3526a68
--- /dev/null
+++ b/bpf/usage/bpf_bpfel.o
Binary files differ
diff --git a/bpf/usage/gen.go b/bpf/usage/gen.go
new file mode 100644
index 0000000..60e4dd6
--- /dev/null
+++ b/bpf/usage/gen.go
@@ -0,0 +1,3 @@
+package usage
+
+//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel bpf bpf.c
diff --git a/bpf/usage/main.go b/bpf/usage/main.go
new file mode 100644
index 0000000..526e092
--- /dev/null
+++ b/bpf/usage/main.go
@@ -0,0 +1,240 @@
+package usage
+
+import (
+ "context"
+ "errors"
+ "log"
+ "net"
+ "sync"
+ "time"
+
+ "github.com/cilium/ebpf"
+ "github.com/cilium/ebpf/link"
+ "github.com/jackc/pgx/v5/pgtype"
+ "sinanmohd.com/redq/db"
+)
+
+type UsageStat struct {
+ lastSeen time.Time
+ lastDbPush time.Time
+ BandwidthIngress uint64
+ BandwidthEgress uint64
+ Ingress uint64
+ Egress uint64
+}
+
+type usageMap map[uint64]UsageStat
+type Usage struct {
+ Data usageMap
+ Mutex sync.RWMutex
+ objs bpfObjects
+ egressLink, ingressLink link.Link
+}
+
+func Close(u *Usage, queries *db.Queries, ctxDb context.Context) {
+ err := u.UpdateDb(queries, ctxDb, false)
+ if err != nil {
+ log.Printf("updating Database: %s", err)
+ }
+
+ u.objs.Close()
+ u.ingressLink.Close()
+ u.egressLink.Close()
+}
+
+func New(iface *net.Interface) (*Usage, error) {
+ var err error
+ var u Usage
+
+ if err := loadBpfObjects(&u.objs, nil); err != nil {
+ log.Printf("loading objects: %s", err)
+ return nil, err
+ }
+ defer func() {
+ if err != nil {
+ u.objs.Close()
+ }
+ }()
+
+ u.ingressLink, err = link.AttachTCX(link.TCXOptions{
+ Interface: iface.Index,
+ Program: u.objs.IngressFunc,
+ Attach: ebpf.AttachTCXIngress,
+ })
+ if err != nil {
+ log.Printf("could not attach TCx program: %s", err)
+ return nil, err
+ }
+ defer func() {
+ if err != nil {
+ u.ingressLink.Close()
+ }
+ }()
+
+ u.egressLink, err = link.AttachTCX(link.TCXOptions{
+ Interface: iface.Index,
+ Program: u.objs.EgressFunc,
+ Attach: ebpf.AttachTCXEgress,
+ })
+ if err != nil {
+ log.Printf("could not attach TCx program: %s", err)
+ return nil, err
+ }
+
+ u.Data = make(usageMap)
+ return &u, nil
+}
+
+func (u *Usage) Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) {
+ bpfTicker := time.NewTicker(time.Second)
+ defer bpfTicker.Stop()
+ dbTicker := time.NewTicker(time.Minute)
+ defer dbTicker.Stop()
+
+ for {
+ select {
+ case <-bpfTicker.C:
+ err := u.update(u.objs.IngressIp4UsageMap, u.objs.EgressIp4UsageMap)
+ if err != nil {
+ log.Printf("updating usageMap: %s", err)
+ }
+ case <-dbTicker.C:
+ err := u.UpdateDb(queries, ctxDb, true)
+ if err != nil {
+ log.Printf("updating Database: %s", err)
+ }
+ }
+ }
+}
+
+func (u *Usage) UpdateDb(queries *db.Queries, ctxDb context.Context, ifExpired bool) error {
+ timeStart := time.Now()
+
+ u.Mutex.Lock()
+ for key, value := range u.Data {
+ if ifExpired && !value.expired(&timeStart) {
+ continue
+ }
+
+ err := queries.EnterUsage(ctxDb, db.EnterUsageParams{
+ Hardwareaddr: int64(key),
+ Starttime: pgtype.Timestamp{
+ Time: value.lastDbPush,
+ Valid: true,
+ },
+ Stoptime: pgtype.Timestamp{
+ Time: value.lastSeen,
+ Valid: true,
+ },
+ Egress: int64(value.Egress),
+ Ingress: int64(value.Ingress),
+ })
+ if err != nil {
+ return err
+ }
+
+ delete(u.Data, key)
+ }
+ u.Mutex.Unlock()
+
+ return nil
+}
+
+func (us *UsageStat) expired(timeStart *time.Time) bool {
+ timeDiff := timeStart.Sub(us.lastSeen)
+ if timeDiff > time.Minute {
+ return true
+ }
+
+ timeDiff = timeStart.Sub(us.lastDbPush)
+ if timeDiff > time.Hour {
+ return true
+ }
+
+ return false
+}
+
+func (u *Usage) update(ingress *ebpf.Map, egress *ebpf.Map) error {
+ timeStart := time.Now()
+ batchKeys := make([]uint64, 4096)
+ batchValues := make([]uint64, 4096)
+ var key uint64
+
+ u.Mutex.Lock()
+ for key, value := range u.Data {
+ value.BandwidthIngress = 0
+ value.BandwidthEgress = 0
+ u.Data[key] = value
+ }
+ u.Mutex.Unlock()
+
+ cursor := ebpf.MapBatchCursor{}
+ for {
+ _, err := ingress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil)
+ u.Mutex.Lock()
+ for i := range batchKeys {
+ if batchValues[i] == 0 {
+ continue
+ }
+
+ key = batchKeys[i]
+ usage, ok := u.Data[key]
+ if ok {
+ usage.BandwidthIngress = batchValues[i]
+ usage.Ingress += batchValues[i]
+ usage.lastSeen = timeStart
+ u.Data[key] = usage
+ } else {
+ u.Data[key] = UsageStat{
+ BandwidthIngress: batchValues[i],
+ Ingress: batchValues[i],
+ lastDbPush: timeStart,
+ lastSeen: timeStart,
+ }
+ }
+ }
+ u.Mutex.Unlock()
+
+ if errors.Is(err, ebpf.ErrKeyNotExist) {
+ break
+ } else if err != nil {
+ return err
+ }
+ }
+
+ cursor = ebpf.MapBatchCursor{}
+ for {
+ _, err := egress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil)
+ u.Mutex.Lock()
+ for i := range batchKeys {
+ if batchValues[i] == 0 {
+ continue
+ }
+
+ key = batchKeys[i]
+ usage, ok := u.Data[key]
+ if ok {
+ usage.BandwidthEgress = batchValues[i]
+ usage.Egress += batchValues[i]
+ usage.lastSeen = timeStart
+ u.Data[key] = usage
+ } else {
+ u.Data[key] = UsageStat{
+ BandwidthEgress: batchValues[i],
+ Egress: batchValues[i],
+ lastDbPush: timeStart,
+ lastSeen: timeStart,
+ }
+ }
+ }
+ u.Mutex.Unlock()
+
+ if errors.Is(err, ebpf.ErrKeyNotExist) {
+ break
+ } else if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}