210 lines
4.2 KiB
Go
210 lines
4.2 KiB
Go
package main
|
|
|
|
import (
|
|
"github.com/cloudflare/goflow/v3/utils"
|
|
"github.com/cloudflare/goflow/v3/pb"
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/oschwald/geoip2-golang"
|
|
_ "github.com/ClickHouse/clickhouse-go/v2"
|
|
"github.com/google/uuid"
|
|
|
|
"net"
|
|
"net/http"
|
|
"database/sql"
|
|
"time"
|
|
"flag"
|
|
)
|
|
|
|
var (
|
|
Port = flag.Int("port", 2055, "NetFlow/IPFIX listening port")
|
|
Reuse = flag.Bool("reuse", false, "Enable so_reuseport for NetFlow/IPFIX listening port")
|
|
Addr = flag.String("addr", "", "NetFlow/IPFIX listening address")
|
|
Workers = 1
|
|
MetricsPath = "/metrics"
|
|
MetricsAddr = flag.String("metrics.addr", ":3000", "Metrics address")
|
|
ConnStr = flag.String("db.addr", "clickhouse://localhost:9000/netflow", "Database address")
|
|
GeoIpAsnDB = flag.String("geoip.asn", "geoip/GeoLite2-ASN.mmdb", "GeoIP ASN database path")
|
|
GeoIpCityDB = flag.String("geoip.city", "geoip/GeoLite2-City.mmdb", "GeoIP city database path")
|
|
|
|
flowProcessedSuccess = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "netflow_flow_processed_success",
|
|
})
|
|
|
|
flowProcessedError = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "netflow_flow_proccessed_error",
|
|
})
|
|
)
|
|
|
|
type Transport struct {
|
|
db *sql.DB
|
|
dbASN *geoip2.Reader
|
|
dbCity *geoip2.Reader
|
|
}
|
|
|
|
func (t *Transport) Publish(msgs []*flowprotob.FlowMessage) {
|
|
log.Infof("Received %d messages", len(msgs))
|
|
totInserted := 0
|
|
scope, err := t.db.Begin()
|
|
if err != nil {
|
|
log.Errorf("Error starting transaction: %v", err)
|
|
return
|
|
}
|
|
|
|
stmt, err := scope.Prepare(`
|
|
INSERT INTO flows (
|
|
id,
|
|
src_addr,
|
|
dst_addr,
|
|
src_port,
|
|
dst_port,
|
|
protocol,
|
|
bytes,
|
|
packets,
|
|
|
|
src_asn,
|
|
src_as_name,
|
|
src_lat,
|
|
src_lng,
|
|
src_contry,
|
|
dst_asn,
|
|
dst_as_name,
|
|
dst_lat,
|
|
dst_long,
|
|
dst_contry,
|
|
|
|
flow_time
|
|
)
|
|
`)
|
|
|
|
if err != nil {
|
|
log.Errorf("Error preparing query: %v", err)
|
|
return
|
|
}
|
|
|
|
for _, msg := range(msgs) {
|
|
srcAddr := net.IP(msg.SrcAddr)
|
|
dstAddr := net.IP(msg.DstAddr)
|
|
|
|
srcInfo, _ := t.getIpInfo(srcAddr)
|
|
dstInfo, _ := t.getIpInfo(dstAddr)
|
|
|
|
_, err = stmt.Exec(
|
|
uuid.New(),
|
|
srcAddr.String(),
|
|
dstAddr.String(),
|
|
uint16(msg.SrcPort),
|
|
uint16(msg.DstPort),
|
|
uint16(msg.Proto),
|
|
uint64(msg.Bytes),
|
|
uint32(msg.Packets),
|
|
|
|
uint32(srcInfo.ASN),
|
|
srcInfo.AS,
|
|
srcInfo.Lat,
|
|
srcInfo.Lng,
|
|
srcInfo.CountryCode,
|
|
|
|
uint32(dstInfo.ASN),
|
|
dstInfo.AS,
|
|
dstInfo.Lat,
|
|
dstInfo.Lng,
|
|
dstInfo.CountryCode,
|
|
|
|
time.Unix(int64(msg.TimeReceived), 0),
|
|
)
|
|
|
|
if err != nil {
|
|
log.Errorf("Error inserting flow in database: %v", err)
|
|
flowProcessedError.Inc()
|
|
} else {
|
|
flowProcessedSuccess.Inc()
|
|
totInserted += 1
|
|
}
|
|
}
|
|
|
|
if err := scope.Commit(); err != nil {
|
|
log.Errorf("Error commiting transaction: %v", err)
|
|
return
|
|
}
|
|
|
|
log.Infof("Inserted %d messages", len(msgs))
|
|
}
|
|
|
|
// IpInfo is the enrichment record produced by getIpInfo for one address.
// Fields that a lookup could not fill keep their zero value.
type IpInfo struct {
	// CountryCode is the ISO code taken from the city database record.
	CountryCode string

	// ASN is the autonomous system number; AS is its organization name.
	ASN uint
	AS  string

	// Lat and Lng are the latitude and longitude from the city database record.
	Lat, Lng float64
}
|
|
|
|
func (t *Transport) getIpInfo(ip net.IP) (*IpInfo, error) {
|
|
res := IpInfo {}
|
|
city, err := t.dbCity.City(ip)
|
|
|
|
if err != nil {
|
|
log.Infof("Could not found city for %s: %v", ip, err)
|
|
} else {
|
|
res.CountryCode = city.Country.IsoCode
|
|
res.Lat = city.Location.Latitude
|
|
res.Lng = city.Location.Longitude
|
|
}
|
|
|
|
as, err := t.dbASN.ASN(ip)
|
|
|
|
if err != nil {
|
|
log.Infof("Could not found as for %s: %v", ip, err)
|
|
} else {
|
|
res.ASN = as.AutonomousSystemNumber
|
|
res.AS = as.AutonomousSystemOrganization
|
|
}
|
|
|
|
return &res, nil
|
|
}
|
|
|
|
func main () {
|
|
flag.Parse()
|
|
db, err := sql.Open("clickhouse", *ConnStr)
|
|
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
db.Ping()
|
|
|
|
dbASN, err := geoip2.Open(*GeoIpAsnDB)
|
|
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
dbCity, err := geoip2.Open(*GeoIpCityDB)
|
|
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
defer dbCity.Close()
|
|
defer dbASN.Close()
|
|
|
|
transport := Transport{
|
|
db: db,
|
|
dbASN: dbASN,
|
|
dbCity: dbCity,
|
|
}
|
|
|
|
state := &utils.StateNetFlow{
|
|
Transport: &transport,
|
|
Logger: log.StandardLogger(),
|
|
}
|
|
|
|
http.Handle(MetricsPath, promhttp.Handler())
|
|
go http.ListenAndServe(*MetricsAddr, nil)
|
|
|
|
log.Infof("Starting listener on %s:%d", *Addr, *Port)
|
|
|
|
state.FlowRoutine(Workers, *Addr, *Port, *Reuse)
|
|
}
|