package usage import ( "context" "errors" "log" "net" "sync" "time" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "github.com/jackc/pgx/v5/pgtype" "sinanmohd.com/redq/db" ) type UsageStat struct { lastSeen time.Time lastDbPush time.Time BandwidthIngress uint64 BandwidthEgress uint64 Ingress uint64 Egress uint64 } type usageMap map[uint64]UsageStat type Usage struct { Data usageMap Mutex sync.RWMutex objs bpfObjects egressLink, ingressLink link.Link } func Close(u *Usage, queries *db.Queries, ctxDb context.Context) { err := u.UpdateDb(queries, ctxDb, false) if err != nil { log.Printf("updating Database: %s", err) } u.objs.Close() u.ingressLink.Close() u.egressLink.Close() } func New(iface *net.Interface) (*Usage, error) { var err error var u Usage if err := loadBpfObjects(&u.objs, nil); err != nil { log.Printf("loading objects: %s", err) return nil, err } defer func() { if err != nil { u.objs.Close() } }() u.ingressLink, err = link.AttachTCX(link.TCXOptions{ Interface: iface.Index, Program: u.objs.IngressFunc, Attach: ebpf.AttachTCXIngress, }) if err != nil { log.Printf("could not attach TCx program: %s", err) return nil, err } defer func() { if err != nil { u.ingressLink.Close() } }() u.egressLink, err = link.AttachTCX(link.TCXOptions{ Interface: iface.Index, Program: u.objs.EgressFunc, Attach: ebpf.AttachTCXEgress, }) if err != nil { log.Printf("could not attach TCx program: %s", err) return nil, err } u.Data = make(usageMap) return &u, nil } func (u *Usage) Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) { bpfTicker := time.NewTicker(time.Second) defer bpfTicker.Stop() dbTicker := time.NewTicker(time.Minute) defer dbTicker.Stop() for { select { case <-bpfTicker.C: err := u.update(u.objs.IngressIp4UsageMap, u.objs.EgressIp4UsageMap) if err != nil { log.Printf("updating usageMap: %s", err) } case <-dbTicker.C: err := u.UpdateDb(queries, ctxDb, true) if err != nil { log.Printf("updating Database: %s", err) } } } } func (u *Usage) UpdateDb(queries *db.Queries, ctxDb context.Context, ifExpired bool) error { timeStart := time.Now() u.Mutex.Lock() for key, value := range u.Data { if ifExpired && !value.expired(&timeStart) { continue } err := queries.EnterUsage(ctxDb, db.EnterUsageParams{ Hardwareaddr: int64(key), Starttime: pgtype.Timestamp{ Time: value.lastDbPush, Valid: true, }, Stoptime: pgtype.Timestamp{ Time: value.lastSeen, Valid: true, }, Egress: int64(value.Egress), Ingress: int64(value.Ingress), }) if err != nil { return err } delete(u.Data, key) } u.Mutex.Unlock() return nil } func (us *UsageStat) expired(timeStart *time.Time) bool { timeDiff := timeStart.Sub(us.lastSeen) if timeDiff > time.Minute { return true } timeDiff = timeStart.Sub(us.lastDbPush) if timeDiff > time.Hour { return true } return false } func (u *Usage) update(ingress *ebpf.Map, egress *ebpf.Map) error { timeStart := time.Now() batchKeys := make([]uint64, 4096) batchValues := make([]uint64, 4096) var key uint64 u.Mutex.Lock() for key, value := range u.Data { value.BandwidthIngress = 0 value.BandwidthEgress = 0 u.Data[key] = value } u.Mutex.Unlock() cursor := ebpf.MapBatchCursor{} for { _, err := ingress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil) u.Mutex.Lock() for i := range batchKeys { if batchValues[i] == 0 { continue } key = batchKeys[i] usage, ok := u.Data[key] if ok { usage.BandwidthIngress = batchValues[i] usage.Ingress += batchValues[i] usage.lastSeen = timeStart u.Data[key] = usage } else { u.Data[key] = UsageStat{ BandwidthIngress: batchValues[i], Ingress: batchValues[i], lastDbPush: timeStart, lastSeen: timeStart, } } } u.Mutex.Unlock() if errors.Is(err, ebpf.ErrKeyNotExist) { break } else if err != nil { return err } } cursor = ebpf.MapBatchCursor{} for { _, err := egress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil) u.Mutex.Lock() for i := range batchKeys { if batchValues[i] == 0 { continue } key = batchKeys[i] usage, ok := u.Data[key] if ok { usage.BandwidthEgress = batchValues[i] usage.Egress += batchValues[i] usage.lastSeen = timeStart u.Data[key] = usage } else { u.Data[key] = UsageStat{ BandwidthEgress: batchValues[i], Egress: batchValues[i], lastDbPush: timeStart, lastSeen: timeStart, } } } u.Mutex.Unlock() if errors.Is(err, ebpf.ErrKeyNotExist) { break } else if err != nil { return err } } return nil }