From a1692f732078adad272256639f7a3ff14c9e643a Mon Sep 17 00:00:00 2001 From: sinanmohd Date: Sun, 7 Jul 2024 07:37:57 +0530 Subject: api/bandwidth: init --- api/bandwidth.go | 41 +++++++++++++++++++++++++++++++++++++++++ api/main.go | 37 ++++++++++++++++++++++++++++++++++--- go.mod | 11 +++++++---- go.sum | 44 +++++++++++++++++++++++++++++--------------- usage/main.go | 34 ++++++++++++++++++++-------------- 5 files changed, 131 insertions(+), 36 deletions(-) create mode 100644 api/bandwidth.go diff --git a/api/bandwidth.go b/api/bandwidth.go new file mode 100644 index 0000000..5f23221 --- /dev/null +++ b/api/bandwidth.go @@ -0,0 +1,41 @@ +package api + +import ( + "encoding/json" + "fmt" + "log" + "net" + + "github.com/cilium/cilium/pkg/mac" + "github.com/dustin/go-humanize" + "sinanmohd.com/redq/usage" +) + +type BandwidthStat struct { + Ingress string `json:"ingress"` + Egress string `json:"egress"` +} + +type BandwidthResp map[string]BandwidthStat + +func handleBandwidth(conn net.Conn, u *usage.Usage) { + resp := make(BandwidthResp) + + u.Mutex.RLock() + for key, value := range u.Data { + m := mac.Uint64MAC(key) + resp[m.String()] = BandwidthStat{ + Ingress: fmt.Sprintf("%s/s", humanize.Bytes(value.BandwidthIngress)), + Egress: fmt.Sprintf("%s/s", humanize.Bytes(value.BandwidthEgress)), + } + } + u.Mutex.RUnlock() + + buf, err := json.Marshal(resp) + if err != nil { + log.Printf("marshaling json: %s", err) + return + } + + conn.Write(buf) +} diff --git a/api/main.go b/api/main.go index 69cf981..7081d3c 100644 --- a/api/main.go +++ b/api/main.go @@ -1,13 +1,23 @@ package api import ( + "encoding/json" "log" "net" "sinanmohd.com/redq/usage" ) -const sockPath = "/tmp/redq_ebpf.sock" +const ( + sockPath = "/tmp/redq_ebpf.sock" + bufSize = 4096 +) + +type ApiReq struct { + Type string `json:"type"` + Action string `json:"action"` + Arg string `json:"arg"` +} type Api struct { sock net.Listener @@ -38,10 +48,31 @@ func (a *Api) Run(u *usage.Usage) { continue } - go handleConn(conn) + go handleConn(conn, u) } } -func handleConn(conn net.Conn) { +func handleConn(conn net.Conn, u *usage.Usage) { defer conn.Close() + var req ApiReq + buf := make([]byte, bufSize) + + count, err := conn.Read(buf) + if err != nil { + log.Printf("reading to buffer: %s", err) + return + } + + err = json.Unmarshal(buf[:count], &req) + if err != nil { + log.Printf("unmarshaling json: %s", err) + return + } + + switch req.Type { + case "bandwidth": + handleBandwidth(conn, u) + default: + log.Printf("invalid request type: %s", req.Type) + } } diff --git a/go.mod b/go.mod index 96b5be8..7c2927e 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,19 @@ module sinanmohd.com/redq go 1.22.0 require ( + github.com/cilium/cilium v1.15.6 github.com/cilium/ebpf v0.15.0 + github.com/dustin/go-humanize v1.0.1 github.com/jackc/pgx/v5 v5.6.0 ) require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/stretchr/testify v1.8.4 // indirect - golang.org/x/crypto v0.19.0 // indirect - golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 // indirect - golang.org/x/sys v0.17.0 // indirect + github.com/vishvananda/netlink v1.2.1-beta.2.0.20240524165444-4d4ba1473f21 // indirect + github.com/vishvananda/netns v0.0.4 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb // indirect + golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect ) diff --git a/go.sum b/go.sum index fae3e91..ed8fe35 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,18 @@ +github.com/cilium/checkmate v1.0.3 h1:CQC5eOmlAZeEjPrVZY3ZwEBH64lHlx9mXYdUehEwI5w= +github.com/cilium/checkmate v1.0.3/go.mod h1:KiBTasf39/F2hf2yAmHw21YFl3hcEyP4Yk6filxc12A= +github.com/cilium/cilium v1.15.6 h1:YT6UYuvdua6N1KQ6mRprymCct6Ee7uCE1hckbAR2bRM= +github.com/cilium/cilium v1.15.6/go.mod h1:UEP0tpPVhdrLC7rCHZwZ8hTpd6d01dF/1GvFPo8UhXE= github.com/cilium/ebpf v0.15.0 h1:7NxJhNiBT3NG8pZJ3c+yfrVdHY8ScgKD27sScgjLMMk= github.com/cilium/ebpf v0.15.0/go.mod h1:DHp1WyrLeiBh19Cf/tfiSMhqheEiK8fXFZ4No0P1Hso= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= @@ -19,23 +25,31 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= -golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 h1:Jvc7gsqn21cJHCmAWx0LiimpP18LZmUxkT5Mp7EZ1mI= -golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vishvananda/netlink v1.2.1-beta.2.0.20240524165444-4d4ba1473f21 h1:tcHUxOT8j/R+0S+A1j8D2InqguXFNxAiij+8QFOlX7Y= +github.com/vishvananda/netlink v1.2.1-beta.2.0.20240524165444-4d4ba1473f21/go.mod h1:whJevzBpTrid75eZy99s3DqCmy05NfibNaF2Ol5Ox5A= +github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= +github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= +github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb h1:c0vyKkb6yr3KR7jEfJaOSv4lG7xPkbN6r52aJz1d8a8= +golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20200217220822-9197077df867/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/usage/main.go b/usage/main.go index 72cf4fd..526e092 100644 --- a/usage/main.go +++ b/usage/main.go @@ -14,7 +14,7 @@ import ( "sinanmohd.com/redq/db" ) -type usageStat struct { +type UsageStat struct { lastSeen time.Time lastDbPush time.Time BandwidthIngress uint64 @@ -23,10 +23,10 @@ type usageStat struct { Egress uint64 } -type usageMap map[uint64]usageStat +type usageMap map[uint64]UsageStat type Usage struct { Data usageMap - Mutex sync.Mutex + Mutex sync.RWMutex objs bpfObjects egressLink, ingressLink link.Link } @@ -140,7 +140,7 @@ func (u *Usage) UpdateDb(queries *db.Queries, ctxDb context.Context, ifExpired b return nil } -func (us *usageStat) expired(timeStart *time.Time) bool { +func (us *UsageStat) expired(timeStart *time.Time) bool { timeDiff := timeStart.Sub(us.lastSeen) if timeDiff > time.Minute { return true @@ -160,33 +160,40 @@ func (u *Usage) update(ingress *ebpf.Map, egress *ebpf.Map) error { batchValues := make([]uint64, 4096) var key uint64 + u.Mutex.Lock() + for key, value := range u.Data { + value.BandwidthIngress = 0 + value.BandwidthEgress = 0 + u.Data[key] = value + } + u.Mutex.Unlock() + cursor := ebpf.MapBatchCursor{} for { _, err := ingress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil) + u.Mutex.Lock() for i := range batchKeys { - /* TODO: maybe BatchLookupAndDelete is not the best idea with mostly empty map */ if batchValues[i] == 0 { continue } key = batchKeys[i] - u.Mutex.Lock() usage, ok := u.Data[key] if ok { - usage.BandwidthIngress = batchValues[i] - usage.Ingress + usage.BandwidthIngress = batchValues[i] usage.Ingress += batchValues[i] usage.lastSeen = timeStart u.Data[key] = usage } else { - u.Data[key] = usageStat{ + u.Data[key] = UsageStat{ BandwidthIngress: batchValues[i], Ingress: batchValues[i], lastDbPush: timeStart, lastSeen: timeStart, } } - u.Mutex.Unlock() } + u.Mutex.Unlock() if errors.Is(err, ebpf.ErrKeyNotExist) { break @@ -198,30 +205,29 @@ func (u *Usage) update(ingress *ebpf.Map, egress *ebpf.Map) error { cursor = ebpf.MapBatchCursor{} for { _, err := egress.BatchLookupAndDelete(&cursor, batchKeys, batchValues, nil) + u.Mutex.Lock() for i := range batchKeys { - /* TODO: maybe BatchLookupAndDelete is not the best idea with mostly empty map */ if batchValues[i] == 0 { continue } key = batchKeys[i] - u.Mutex.Lock() usage, ok := u.Data[key] if ok { - usage.BandwidthEgress = batchValues[i] - usage.Egress + usage.BandwidthEgress = batchValues[i] usage.Egress += batchValues[i] usage.lastSeen = timeStart u.Data[key] = usage } else { - u.Data[key] = usageStat{ + u.Data[key] = UsageStat{ BandwidthEgress: batchValues[i], Egress: batchValues[i], lastDbPush: timeStart, lastSeen: timeStart, } } - u.Mutex.Unlock() } + u.Mutex.Unlock() if errors.Is(err, ebpf.ErrKeyNotExist) { break -- cgit v1.2.3