Skip to content

Conversation

mashhurs
Copy link
Contributor

@mashhurs mashhurs commented Oct 9, 2025

Description

Replaces #40235

It accepts source and destination {IP, port} pair and optionally protocol (eg. TCP) and seed (eg. 1,2,etc...) parameters.
At the result it computes the has values and generates network flow (smaller to greater) ID a.k.a community ID.

Link to tracking issue

Testing

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Generates Community ID for network flow data",
    "processors": [
      {
        "community_id": {}
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "source": {
          "ip": "123.124.125.126",
          "port": 12345
        },
        "destination": {
          "ip": "55.56.57.58",
          "port": 80
        },
        "network": {
          "transport": "TCP"
        }
      }
    }
  ]
}
image
  • unit tests
cd /path-to/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs && go test -v -run TestCommunityID
  • E2E test
Local run
  • config
extensions:
  zpages:
    endpoint: 127.0.0.1:55679

receivers:
  otlp:
    protocols:
      http:
        endpoint: 127.0.0.1:8080

processors:
  batch:
  transform:
    metric_statements:
      - context: resource
        statements:
          - set(attributes["community_id"], CommunityID(attributes["source.ip"], attributes["source.port"], attributes["destination.ip"], attributes["destination.port"], "TCP", 2))

exporters:
  debug:
    verbosity: detailed
  elasticsearch:
    cloudid: "cloud-id"
    user: "elastic"
    password: "pwd"

service:
  telemetry:
    logs:
      level: debug
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [batch, transform]
      exporters: [elasticsearch, debug]

  extensions: [zpages]
  • Payload
curl --location '127.0.0.1:8080/v1/metrics' \
--header 'Content-Type: application/json' \
--data '{
  "resourceMetrics": [
    {
      "resource": {
        "attributes": [
          {
            "key": "service.name",
            "value": {
              "stringValue": "my.service"
            }
          },
          {
            "key": "timestamp",
            "value": {
              "stringValue": "2018-12-01T16:17:18Z"
            }
          },
          {
            "key": "source.ip",
            "value": { "stringValue": "123.124.125.126" }
          },
          {
            "key": "source.port",
            "value": { "intValue": "12345" }
          },
          {
            "key": "destination.ip",
            "value": { "stringValue": "55.56.57.58" }
          },
          {
            "key": "destination.port",
            "value": { "intValue": "80" }
          }
        ]
      },
      "scopeMetrics": [
        {
          "scope": {
            "name": "my.library",
            "version": "1.0.0",
            "attributes": [
              {
                "key": "my.scope.attribute",
                "value": {
                  "stringValue": "some scope attribute"
                }
              }
            ]
          },
          "metrics": [
            {
              "name": "my.counter",
              "unit": "1",
              "description": "I am a Counter",
              "sum": {
                "aggregationTemporality": 1,
                "isMonotonic": true,
                "dataPoints": [
                  {
                    "asDouble": 5,
                    "startTimeUnixNano": "1544712660300000000",
                    "timeUnixNano": "1544712660300000000",
                    "attributes": [
                      {
                        "key": "my.counter.attr",
                        "value": {
                          "stringValue": "some value"
                        }
                      }
                    ]
                  },
                  {
                    "asDouble": 2,
                    "startTimeUnixNano": "1544712660300000000",
                    "timeUnixNano": "1544712660300000000",
                    "attributes": [
                      {
                        "key": "another.counter.attr",
                        "value": {
                          "stringValue": "another value"
                        }
                      }
                    ]
                  }
                ]
              }
            },
            {
              "name": "my.gauge",
              "unit": "1",
              "description": "I am a Gauge",
              "gauge": {
                "dataPoints": [
                  {
                    "asDouble": 10,
                    "timeUnixNano": "1544712660300000000",
                    "attributes": [
                      {
                        "key": "my.gauge.attr",
                        "value": {
                          "stringValue": "some value"
                        }
                      }
                    ]
                  }
                ]
              }
            },
            {
              "name": "my.histogram",
              "unit": "1",
              "description": "I am a Histogram",
              "histogram": {
                "aggregationTemporality": 1,
                "dataPoints": [
                  {
                    "startTimeUnixNano": "1544712660300000000",
                    "timeUnixNano": "1544712660300000000",
                    "count": "2",
                    "sum": 2,
                    "bucketCounts": [
                      "1",
                      "1"
                    ],
                    "explicitBounds": [
                      1
                    ],
                    "min": 0,
                    "max": 2,
                    "attributes": [
                      {
                        "key": "my.histogram.attr",
                        "value": {
                          "stringValue": "some value"
                        }
                      }
                    ]
                  }
                ]
              }
            }
          ]
        }
      ]
    }
  ]
}'
  • Logs
2025-10-09T11:37:49.524-0700    debug   [email protected]/parser.go:37      TransformContext after statement execution      {"resource": {"service.instance.id": "5224a627-dcc7-4f0a-afa0-efad6130016a", "service.name": "otelcontribcol", "service.version": "0.137.0-dev"}, "otelcol.component.id": "transform", "otelcol.component.kind": "processor", "otelcol.pipeline.id": "metrics", "otelcol.signal": "metrics", "statement": "set(resource.attributes[\"community_id\"], CommunityID(resource.attributes[\"source.ip\"], resource.attributes[\"source.port\"], resource.attributes[\"destination.ip\"], resource.attributes[\"destination.port\"], \"TCP\", 2))", "condition matched": true, "TransformContext": {"resource": {"attributes": {"service.name": "my.service", "timestamp": "2018-12-01T16:17:18Z", "source.ip": "123.124.125.126", "source.port": 12345, "destination.ip": "55.56.57.58", "destination.port": 80, "community_id": "1:7oTw+VtCU5XAuMK7fTNdGllNeUs="}, "dropped_attribute_count": 0}, "cache": {}}}
2025-10-09T11:37:49.524-0700    info    Metrics {"resource": {"service.instance.id": "5224a627-dcc7-4f0a-afa0-efad6130016a", "service.name": "otelcontribcol", "service.version": "0.137.0-dev"}, "otelcol.component.id": "debug", "otelcol.component.kind": "exporter", "otelcol.signal": "metrics", "resource metrics": 1, "metrics": 3, "data points": 4}
2025-10-09T11:37:49.524-0700    info    ResourceMetrics #0
Resource SchemaURL: 
Resource attributes:
     -> service.name: Str(my.service)
     -> timestamp: Str(2018-12-01T16:17:18Z)
     -> source.ip: Str(123.124.125.126)
     -> source.port: Int(12345)
     -> destination.ip: Str(55.56.57.58)
     -> destination.port: Int(80)
     -> community_id: Str(1:7oTw+VtCU5XAuMK7fTNdGllNeUs=)

Documentation

  • changelog

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[pkg/ottl] Support community ID network flow
2 participants