diff --git a/cmd/cnetflow/cnetflow.go b/cmd/cnetflow/cnetflow.go index 1eae6de..16d25e7 100644 --- a/cmd/cnetflow/cnetflow.go +++ b/cmd/cnetflow/cnetflow.go @@ -27,6 +27,7 @@ var ( LogFmt = flag.String("logfmt", "normal", "Log formatter") EnableKafka = flag.Bool("kafka", true, "Enable Kafka") + EnableNats = flag.Bool("nats", false, "Enable Nats (must disable Kafka)") FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") @@ -83,6 +84,13 @@ func main() { } kafkaState.FixedLengthProto = *FixedLength s.Transport = kafkaState + } else if *EnableNats { + natsState, err := transport.StartNatsTransportFromArgs(log.StandardLogger()) + if err != nil { + log.Fatal(err) + } + natsState.FixedLengthProto = *FixedLength + s.Transport = natsState } log.WithFields(log.Fields{ "Type": "NetFlow"}). diff --git a/cmd/cnflegacy/cnflegacy.go b/cmd/cnflegacy/cnflegacy.go index 30c8078..77fb59e 100644 --- a/cmd/cnflegacy/cnflegacy.go +++ b/cmd/cnflegacy/cnflegacy.go @@ -27,6 +27,8 @@ var ( LogFmt = flag.String("logfmt", "normal", "Log formatter") EnableKafka = flag.Bool("kafka", true, "Enable Kafka") + EnableNats = flag.Bool("nats", false, "Enable Nats (must disable Kafka)") + FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") @@ -81,6 +83,14 @@ func main() { } kafkaState.FixedLengthProto = *FixedLength s.Transport = kafkaState + } else if *EnableNats { + natsState, err := transport.StartNatsTransportFromArgs(log.StandardLogger()) + if err != nil { + log.Fatal(err) + } + natsState.FixedLengthProto = *FixedLength + s.Transport = natsState + } log.WithFields(log.Fields{ "Type": "NetFlowLegacy"}). diff --git a/cmd/csflow/csflow.go b/cmd/csflow/csflow.go index c326fe0..b0d5536 100644 --- a/cmd/csflow/csflow.go +++ b/cmd/csflow/csflow.go @@ -27,6 +27,8 @@ var ( LogFmt = flag.String("logfmt", "normal", "Log formatter") EnableKafka = flag.Bool("kafka", true, "Enable Kafka") + EnableNats = flag.Bool("nats", false, "Enable Nats (must disable Kafka)") + FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") @@ -81,6 +83,14 @@ func main() { } kafkaState.FixedLengthProto = *FixedLength s.Transport = kafkaState + } else if *EnableNats { + natsState, err := transport.StartNatsTransportFromArgs(log.StandardLogger()) + if err != nil { + log.Fatal(err) + } + natsState.FixedLengthProto = *FixedLength + s.Transport = natsState + } log.WithFields(log.Fields{ "Type": "sFlow"}). diff --git a/cmd/goflow/goflow.go b/cmd/goflow/goflow.go index 4221d1f..0134052 100644 --- a/cmd/goflow/goflow.go +++ b/cmd/goflow/goflow.go @@ -39,6 +39,7 @@ var ( LogFmt = flag.String("logfmt", "normal", "Log formatter") EnableKafka = flag.Bool("kafka", true, "Enable Kafka") + EnableNats = flag.Bool("nats", false, "Enable Nats (must disable Kafka)") FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf") MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address") MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path") @@ -107,6 +108,16 @@ func main() { sSFlow.Transport = kafkaState sNFL.Transport = kafkaState sNF.Transport = kafkaState + } else if *EnableNats { + natsState, err := transport.StartNatsTransportFromArgs(log.StandardLogger()) + if err != nil { + log.Fatal(err) + } + natsState.FixedLengthProto = *FixedLength + sSFlow.Transport = natsState + sNFL.Transport = natsState + sNF.Transport = natsState + } wg := &sync.WaitGroup{} diff --git a/go.mod b/go.mod index cae068e..0e26337 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,13 @@ go 1.12 require ( github.com/Shopify/sarama v1.22.0 - github.com/golang/protobuf v1.3.1 + github.com/golang/protobuf v1.5.0 github.com/libp2p/go-reuseport v0.0.1 + github.com/nats-io/nats-server/v2 v2.9.1 // indirect + github.com/nats-io/nats.go v1.17.0 github.com/prometheus/client_golang v0.9.2 github.com/sirupsen/logrus v1.4.1 - github.com/stretchr/testify v1.3.0 + github.com/stretchr/testify v1.7.1 + golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect + google.golang.org/protobuf v1.28.1 // indirect ) diff --git a/go.sum b/go.sum index c9f5615..2a8443e 100644 --- a/go.sum +++ b/go.sum @@ -16,16 +16,47 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1 github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.15.10 h1:Ai8UzuomSCDw90e1qNMtb15msBXsNpH6gzkkENQNcJo= +github.com/klauspost/compress v1.15.10/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw= github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQzaaxIFYDhcYEA= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.9.1 h1:JaP6NpCVmSu0AXgbnOkGtJovOxuf8mjNjlX3H+tSpyI= +github.com/nats-io/nats-server/v2 v2.9.1/go.mod h1:T5AEyzrnDGaseK/Y0G6e2IA5tLrHyjLOeGUALq+A8XE= +github.com/nats-io/nats.go v1.16.1-0.20220906180156-a1017eec10b0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.17.0 h1:1jp5BThsdGlN91hW0k3YEfJbfACjiOYtUiLXG0RL4IE= +github.com/nats-io/nats.go v1.17.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU= github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= @@ -33,6 +64,7 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= @@ -46,22 +78,58 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20220919173607-35f4265a4bc0/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e h1:nFYrTHrdrAOpShe27kaFHjsqYSEQ0KWqdWLu3xuZJts= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 h1:h+EGohizhe9XlX18rfpa8k8RAc5XyaeamM+0VHRd4lc= +golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20220920022843-2ce7c2934d45 h1:yuLAip3bfURHClMG9VBdzPrQvCWjWiWUTBGV+/fCbUs= +golang.org/x/time v0.0.0-20220920022843-2ce7c2934d45/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/transport/kafka.go b/transport/kafka.go index ec3dae4..84c431a 100644 --- a/transport/kafka.go +++ b/transport/kafka.go @@ -51,7 +51,7 @@ func ParseKafkaVersion(versionString string) (sarama.KafkaVersion, error) { return sarama.ParseKafkaVersion(versionString) } -func RegisterFlags() { +func registerKafkaFlags() { KafkaTLS = flag.Bool("kafka.tls", false, "Use TLS to connect to Kafka") KafkaSASL = flag.Bool("kafka.sasl", false, "Use SASL/PLAIN data to connect to Kafka (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)") KafkaTopic = flag.String("kafka.topic", "flow-messages", "Kafka topic to produce to") diff --git a/transport/nats.go b/transport/nats.go new file mode 100644 index 0000000..1c6e514 --- /dev/null +++ b/transport/nats.go @@ -0,0 +1,95 @@ +package transport + +import ( + "flag" + + flowmessage "github.com/cloudflare/goflow/v3/pb" + "github.com/cloudflare/goflow/v3/utils" + "github.com/golang/protobuf/proto" + "github.com/nats-io/nats.go" + "time" +) + +var ( + natsSubject string + natsURL string + natsCAPath []string + natsUser string + natsPass string + natsClientCertFile string + natsClientKeyFile string + natsDialTimeout time.Duration + natsPingInterval time.Duration + natsMaxReconnect int + natsReconnectWait time.Duration +) + +func registerNatsFlags() { + flag.StringVar(&natsSubject, "nats.subject", "flow-messages", "Nats subject to publish on") + flag.StringVar(&natsURL, "nats.url", nats.DefaultURL, "Nats URL to connect") + flag.Var((*stringSliceFlag)(&natsCAPath), "nats.root-ca-path", "Root ca paths. can be specified multiple times for multiple CA paths or separated by comma") + flag.StringVar(&natsClientCertFile, "nats.client-cert", "", "Path to the nats client certificate if client auth is to be used.") + flag.StringVar(&natsClientKeyFile, "nats.client-key", "", "Path to the nats client private key if client auth is to be used.") + flag.StringVar(&natsUser, "nats.user", "", "Nats username") + flag.StringVar(&natsPass, "nats.password", "", "Nats password") + flag.DurationVar(&natsDialTimeout, "nats.dialtimeout", nats.GetDefaultOptions().Timeout, "Nats dial timeout for connecting to server(s).") + flag.DurationVar(&natsPingInterval, "nats.ping_interval", nats.GetDefaultOptions().PingInterval, "Nats ping interval") + flag.IntVar(&natsMaxReconnect, "nats.max_reconnect", nats.GetDefaultOptions().MaxReconnect, "Nats max reconnects before giving up") + flag.DurationVar(&natsReconnectWait, "nats.reconnect_wait", nats.GetDefaultOptions().ReconnectWait, "Nats reconnect wait between attempts") +} + +type natsState struct { + FixedLengthProto bool + conn *nats.Conn + subject string + l utils.Logger +} + +func (n natsState) publishMessage(flowMessage *flowmessage.FlowMessage) error { + var b []byte + if !n.FixedLengthProto { + b, _ = proto.Marshal(flowMessage) + } else { + buf := proto.NewBuffer([]byte{}) + buf.EncodeMessage(flowMessage) + b = buf.Bytes() + } + return n.conn.Publish(n.subject, b) +} + +func (n natsState) Publish(messages []*flowmessage.FlowMessage) { + for _, m := range messages { + err := n.publishMessage(m) + if err != nil { + n.l.Errorf("error on publish: %s", err) + } + } +} + +func StartNatsTransportFromArgs(logger utils.Logger) (*natsState, error) { + var options = []nats.Option{ + nats.Timeout(natsDialTimeout), + nats.PingInterval(natsPingInterval), + nats.MaxReconnects(natsMaxReconnect), + nats.ReconnectWait(natsReconnectWait), + } + if len(natsCAPath) > 0 { + options = append(options, nats.RootCAs(natsCAPath...)) + } + if len(natsClientCertFile) > 0 && len(natsClientKeyFile) > 0 { + options = append(options, nats.ClientCert(natsClientCertFile, natsClientKeyFile)) + } + if len(natsUser) > 0 || len(natsPass) > 0 { + options = append(options, nats.UserInfo(natsUser, natsPass)) + } + + conn, err := nats.Connect(natsURL, options...) + if err != nil { + return nil, err + } + return &natsState{ + conn: conn, + subject: natsSubject, + l: logger, + }, nil +} diff --git a/transport/transport.go b/transport/transport.go new file mode 100644 index 0000000..f3f7c01 --- /dev/null +++ b/transport/transport.go @@ -0,0 +1,34 @@ +package transport + +import ( + "strings" +) + +type registerFlagsFunc func() + +var registerFlagsFuncs = []registerFlagsFunc{ + registerKafkaFlags, + registerNatsFlags, +} + +func RegisterFlags() { + for _, f := range registerFlagsFuncs { + f() + } +} + +type stringSliceFlag []string + +// String - implements flag.Value +func (ssf stringSliceFlag) String() string { + return strings.Join(ssf, ",") +} + +// Set - implements flag.Value. If the flag is passed multiple times on the command line, +// each value appends to flags. It also looks for comma separated values, and interprets +// those as individual items as well. +func (ssf *stringSliceFlag) Set(val string) error { + vals := strings.Split(val, ",") + *ssf = append(*ssf, vals...) + return nil +}