summaryrefslogblamecommitdiff
path: root/bpf/main.go
blob: 4f1360e61de53b165af36a37e1885bf2db184d67 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12


           
                 







                                     

                                        


                       
                            
                            

                         


                                  
                                                                            


































                                                                   




                                                                                               
                                  



                                                                        



                 













                                                               
                                                                                     




















                                                                     
                                  



                                     

                  

 
                                                                            













                                                                                            
                                                     
                                

                                                                   
                                                              
                                                              



                                 


                                                        
                                    











                                                                                           
                                                     
                                

                                                                   
                                                              
                                                              



                                 


                                                        
                                  

                 

                  
 
package bpf

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"time"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/link"
	"github.com/jackc/pgx/v5/pgtype"
	"sinanmohd.com/redq/db"
)

type UsageStat struct {
	lastSeen   time.Time
	lastDbPush time.Time
	ingress    uint64
	egress     uint64
}
type UsageMap map[uint32]UsageStat

func Run(iface *net.Interface, queries *db.Queries, ctxDb context.Context) {
	usageMap := make(UsageMap)

	objs := bpfObjects{}
	if err := loadBpfObjects(&objs, nil); err != nil {
		log.Fatalf("loading objects: %s", err)
	}
	defer objs.Close()

	ingressLink, err := link.AttachTCX(link.TCXOptions{
		Interface: iface.Index,
		Program:   objs.IngressFunc,
		Attach:    ebpf.AttachTCXIngress,
	})
	if err != nil {
		log.Fatalf("could not attach TCx program: %s", err)
	}
	defer ingressLink.Close()

	egressLink, err := link.AttachTCX(link.TCXOptions{
		Interface: iface.Index,
		Program:   objs.EgressFunc,
		Attach:    ebpf.AttachTCXEgress,
	})
	if err != nil {
		log.Fatalf("could not attach TCx program: %s", err)
	}
	defer egressLink.Close()

	bpfTicker := time.NewTicker(1 * time.Second)
	defer bpfTicker.Stop()
	dbTicker := time.NewTicker(60 * time.Second)
	defer dbTicker.Stop()
	for {
		select {
		case <-bpfTicker.C:
			err := usageMap.update(objs.IngressIp4UsageMap, objs.EgressIp4UsageMap)
			if err != nil {
				log.Printf("updating usageMap: %s", err)
			}

		case <-dbTicker.C:
			err := usageMap.updateDb(queries, ctxDb)
			if err != nil {
				log.Printf("updating Database: %s", err)
			}
		}
	}
}

func (usageStat UsageStat) expired(timeStart *time.Time) bool {
	timeDiff := timeStart.Sub(usageStat.lastSeen)
	if timeDiff > time.Minute {
		return true
	}

	timeDiff = timeStart.Sub(usageStat.lastDbPush)
	if timeDiff > time.Hour {
		return true
	}

	return false
}

func (usageMap UsageMap) updateDb(queries *db.Queries, ctxDb context.Context) error {
	timeStart := time.Now()

	for key, value := range usageMap {
		if !value.expired(&timeStart) {
			continue
		}

		err := queries.EnterUsage(ctxDb, db.EnterUsageParams{
			Hardwareaddr: int32(key),
			Starttime: pgtype.Timestamp{
				Time:  value.lastDbPush,
				Valid: true,
			},
			Stoptime: pgtype.Timestamp{
				Time:  value.lastSeen,
				Valid: true,
			},
			Egress:  int32(value.egress),
			Ingress: int32(value.ingress),
		})
		if err != nil {
			return err
		}

		delete(usageMap, key)
	}

	return nil
}

func (usageMap UsageMap) update(ingress *ebpf.Map, egress *ebpf.Map) error {
	timeStart := time.Now()
	batchKeys := make([]uint32, 4096)
	batchValues := make([]uint64, 4096)
	var key uint32

	cursor := ebpf.MapBatchCursor{}
	for {
		_, err := ingress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil)
		for i := range batchKeys {
			key = batchKeys[i]
			usage, ok := usageMap[key]
			if ok {
				usage.ingress += batchValues[i]
				usage.lastSeen = timeStart
				usageMap[key] = usage
			} else {
				usageMap[key] = UsageStat{
					ingress:    batchValues[i],
					lastDbPush: timeStart,
					lastSeen:   timeStart,
				}
			}
		}

		if errors.Is(err, ebpf.ErrKeyNotExist) {
			break
		} else if err != nil {
			return  err;
		}
	}

	cursor = ebpf.MapBatchCursor{}
	for {
		_, err := egress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil)
		for i := range batchKeys {
			key = batchKeys[i]
			usage, ok := usageMap[key]
			if ok {
				usage.egress += batchValues[i]
				usage.lastSeen = timeStart
				usageMap[key] = usage
			} else {
				usageMap[key] = UsageStat{
					egress:     batchValues[i],
					lastDbPush: timeStart,
					lastSeen:   timeStart,
				}
			}
		}

		if errors.Is(err, ebpf.ErrKeyNotExist) {
			break
		} else if err != nil {
			return err
		}
	}

	return nil
}