netflow2clickhouse/main.go

211 lines
4.2 KiB
Go

package main
import (
"github.com/cloudflare/goflow/v3/utils"
"github.com/cloudflare/goflow/v3/pb"
log "github.com/sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/oschwald/geoip2-golang"
_ "github.com/ClickHouse/clickhouse-go/v2"
"github.com/google/uuid"
"net"
"net/http"
"database/sql"
"time"
"flag"
)
// Command-line flags, fixed settings and Prometheus counters shared by the
// whole program.
var (
	// Port is the port the NetFlow/IPFIX listener binds to.
	Port = flag.Int("port", 2055, "NetFlow/IPFIX listening port")
	// Reuse enables so_reuseport on the NetFlow/IPFIX listening socket.
	Reuse = flag.Bool("reuse", false, "Enable so_reuseport for NetFlow/IPFIX listening port")
	// Addr is the address the NetFlow/IPFIX listener binds to.
	Addr = flag.String("addr", "", "NetFlow/IPFIX listening address")
	// Workers is the worker count handed to the flow routine in main.
	Workers = 1
	// MetricsPath is the HTTP path the Prometheus handler is mounted on.
	MetricsPath = "/metrics"
	// MetricsAddr is the listen address of the metrics HTTP server.
	MetricsAddr = flag.String("metrics.addr", ":3000", "Metrics address")
	// ConnStr is the data source name passed to sql.Open for ClickHouse.
	ConnStr = flag.String("db.addr", "clickhouse://localhost:9000/netflow", "Database address")
	// GeoIpAsnDB is the path of the GeoIP ASN database file.
	GeoIpAsnDB = flag.String("geoip.asn", "geoip/GeoLite2-ASN.mmdb", "GeoIP ASN database path")
	// GeoIpCityDB is the path of the GeoIP city database file.
	GeoIpCityDB = flag.String("geoip.city", "geoip/GeoLite2-City.mmdb", "GeoIP city database path")

	// flowProcessedSuccess counts flow messages that were processed
	// successfully.
	flowProcessedSuccess = promauto.NewCounter(prometheus.CounterOpts{
		Name: "netflow_flow_processed_success",
		Help: "Number of flow messages processed successfully.",
	})
	// flowProcessedError counts flow messages that failed processing.
	//
	// NOTE(review): the metric name contains a typo ("proccessed"). It is kept
	// unchanged because renaming a metric breaks existing queries and alerts.
	flowProcessedError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "netflow_flow_proccessed_error",
		Help: "Number of flow messages that failed processing.",
	})
)
// Transport bundles the handles needed to enrich flow messages and store them:
// a SQL database handle plus two GeoIP readers.
type Transport struct {
	// db is the database the flows are inserted into.
	db *sql.DB
	// dbASN is queried for autonomous-system data and dbCity for country and
	// coordinates.
	dbASN, dbCity *geoip2.Reader
}
// Publish enriches every flow message in msgs with GeoIP data and inserts the
// batch into the `flows` table inside a single transaction.
//
// Failures are logged rather than returned. A row whose Exec fails is counted
// in flowProcessedError and skipped; the remaining rows are still committed.
// Rows are only counted in flowProcessedSuccess once the commit has succeeded;
// if the commit fails, the rows that had been queued are counted as errors.
func (t *Transport) Publish(msgs []*flowprotob.FlowMessage) {
	log.Infof("Received %d messages", len(msgs))
	if len(msgs) == 0 {
		// Nothing to store; do not open an empty transaction.
		return
	}

	scope, err := t.db.Begin()
	if err != nil {
		log.Errorf("Error starting transaction: %v", err)
		return
	}

	// NOTE(review): the column names (src_lng vs dst_long, *_contry) are
	// reproduced exactly as found; presumably they match the table schema.
	stmt, err := scope.Prepare(`
INSERT INTO flows (
id,
src_addr,
dst_addr,
src_port,
dst_port,
protocol,
bytes,
packets,
src_asn,
src_as_name,
src_lat,
src_lng,
src_contry,
dst_asn,
dst_as_name,
dst_lat,
dst_long,
dst_contry,
flow_time
)
`)
	if err != nil {
		log.Errorf("Error preparing query: %v", err)
		// Release the transaction, otherwise its connection is never
		// returned to the pool.
		if rbErr := scope.Rollback(); rbErr != nil {
			log.Errorf("Error rolling back transaction: %v", rbErr)
		}
		return
	}

	queued := 0
	for _, msg := range msgs {
		srcAddr := net.IP(msg.SrcAddr)
		dstAddr := net.IP(msg.DstAddr)
		// getIpInfo always returns a non-nil result and a nil error (lookup
		// failures are logged there), so the error is deliberately dropped.
		srcInfo, _ := t.getIpInfo(srcAddr)
		dstInfo, _ := t.getIpInfo(dstAddr)

		if _, err := stmt.Exec(
			uuid.New(),
			srcAddr.String(),
			dstAddr.String(),
			uint16(msg.SrcPort),
			uint16(msg.DstPort),
			uint16(msg.Proto),
			uint64(msg.Bytes),
			uint32(msg.Packets),
			uint32(srcInfo.ASN),
			srcInfo.AS,
			srcInfo.Lat,
			srcInfo.Lng,
			srcInfo.CountryCode,
			uint32(dstInfo.ASN),
			dstInfo.AS,
			dstInfo.Lat,
			dstInfo.Lng,
			dstInfo.CountryCode,
			time.Unix(int64(msg.TimeReceived), 0),
		); err != nil {
			log.Errorf("Error inserting flow in database: %v", err)
			flowProcessedError.Inc()
			continue
		}
		queued++
	}

	if err := scope.Commit(); err != nil {
		log.Errorf("Error commiting transaction: %v", err)
		// The queued rows were not persisted, so they count as failures.
		flowProcessedError.Add(float64(queued))
		return
	}
	flowProcessedSuccess.Add(float64(queued))
	log.Infof("Inserted %d messages", queued)
}
// IpInfo holds the GeoIP attributes resolved for a single IP address. Fields
// keep their zero value when the corresponding lookup did not succeed.
type IpInfo struct {
	// CountryCode is the ISO code of the country.
	CountryCode string
	// ASN is the autonomous system number.
	ASN uint
	// AS is the organization name of the autonomous system.
	AS string
	// Lat and Lng are the latitude and longitude of the location.
	Lat, Lng float64
}
// getIpInfo looks ip up in the city and ASN GeoIP databases and returns
// whatever could be resolved.
//
// A failed lookup is logged at info level and leaves the related fields at
// their zero value. The result is never nil and the returned error is always
// nil.
func (t *Transport) getIpInfo(ip net.IP) (*IpInfo, error) {
	info := &IpInfo{}

	// Country and coordinates come from the city database.
	if cityRec, cityErr := t.dbCity.City(ip); cityErr == nil {
		info.CountryCode = cityRec.Country.IsoCode
		info.Lat = cityRec.Location.Latitude
		info.Lng = cityRec.Location.Longitude
	} else {
		log.Infof("Could not found city for %s: %v", ip, cityErr)
	}

	// Autonomous-system number and organization come from the ASN database.
	if asnRec, asnErr := t.dbASN.ASN(ip); asnErr == nil {
		info.ASN = asnRec.AutonomousSystemNumber
		info.AS = asnRec.AutonomousSystemOrganization
	} else {
		log.Infof("Could not found as for %s: %v", ip, asnErr)
	}

	return info, nil
}
// main parses the command-line flags, opens the ClickHouse handle and the two
// GeoIP databases, serves the Prometheus metrics over HTTP in the background
// and then runs the NetFlow listener with a Transport that stores the flows.
func main() {
	flag.Parse()

	db, err := sql.Open("clickhouse", *ConnStr)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// Surface connectivity problems at startup. This is only a warning so the
	// collector still starts when the database is unreachable, as before.
	if err := db.Ping(); err != nil {
		log.Warnf("Could not ping database: %v", err)
	}

	dbASN, err := geoip2.Open(*GeoIpAsnDB)
	if err != nil {
		log.Fatal(err)
	}
	dbCity, err := geoip2.Open(*GeoIpCityDB)
	if err != nil {
		log.Fatal(err)
	}
	defer dbCity.Close()
	defer dbASN.Close()

	transport := Transport{
		db:     db,
		dbASN:  dbASN,
		dbCity: dbCity,
	}
	state := &utils.StateNetFlow{
		Transport: &transport,
		Logger:    log.StandardLogger(),
	}

	http.Handle(MetricsPath, promhttp.Handler())
	// The metrics server runs for the lifetime of the process. A failure here
	// is reported but does not stop flow collection.
	go func() {
		if err := http.ListenAndServe(*MetricsAddr, nil); err != nil {
			log.Errorf("Metrics server stopped: %v", err)
		}
	}()

	log.Infof("Starting listener on %s:%d", *Addr, *Port)
	if err := state.FlowRoutine(Workers, *Addr, *Port, *Reuse); err != nil {
		// Fatalf exits without running the deferred Close calls; the process
		// is terminating, so the operating system reclaims those resources.
		log.Fatalf("NetFlow listener stopped: %v", err)
	}
}