How I interface a Kafka Broker with Splunk

I am sharing my learning journey with Splunk and Kafka. My objective here is to send a “significant” amount of data to a Kafka broker, let’s say 1,000,000 messages of around 100 bytes each in less than a minute, and to consume this data in Splunk.

[Architecture diagram]

My architecture is Docker-based to facilitate the tests and easily duplicate the components. The orchestration is driven by docker-compose.
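
For reference, a minimal docker-compose.yml sketch of such a setup could look like the following (the service names, build contexts and port mappings are my assumptions based on the containers described below, not necessarily the exact file I use):

    version: "2"
    services:
      kafka:
        # single broker with embedded ZooKeeper (see kafka-docker below)
        build: ./kafka-docker
        ports:
          - "9092:9092"
      splunk:
        build: ./splunk-docker
        ports:
          - "8000:8000"   # Splunk Web
          - "8088:8088"   # HTTP Event Collector
      kafka-connect:
        build: ./kafka-connect-splunk
        ports:
          - "8083:8083"   # Kafka Connect REST API
        depends_on:
          - kafka
          - splunk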

The producer Kafka-Data-Gen generates random messages, something like:

{
  "eventKey": "999983",
  "uuid": "3cdca466-4201-486e-8ae7-0a5b05031c73",
  "message": "wshbzgtlhoxfvuyqkgqnstbix"
}

and sends them to the Kafka broker, on the topic “syslog” in our example. The consumer, Kafka Connect, will read the messages available on the topic and push them to Splunk via the HTTP Event Collector data input.

Kafka broker

To start quickly, I made a container with an embedded ZooKeeper, since I will use only one broker (Dockerfile available on my GitHub: https://github.com/dakiri/kafka-docker).

Build and start the container:

    make build
    make daemon
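
Once the broker is up, we can pre-create the “syslog” topic used later in this article. This is a sketch to be run from inside the broker container; the exact script path and flags depend on the Kafka version shipped in the image (newer versions use --bootstrap-server kafka:9092 instead of --zookeeper):

    # create the topic used by the producer and the connector
    kafka-topics.sh --create --zookeeper localhost:2181 \
      --replication-factor 1 --partitions 3 --topic syslog
    # check that it exists
    kafka-topics.sh --list --zookeeper localhost:2181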

Splunk

Docker available on my GitHub: soon!

Build and start the container:

    make build
    make daemon
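
To check that Splunk is up before going further, a quick call to the management port does the job (a sketch: the admin credentials are an assumption, and 192.168.2.39 is the Splunk address used in the rest of this article):

    # the Splunk management API answers on port 8089 once the instance is started
    curl -k -u admin:changeme https://192.168.2.39:8089/services/server/info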

Configuration of the HTTP Event Collector

The connector will send the messages to Splunk through the HTTP Event Collector, so we need to activate this data input and create a new token:

  • Settings → Data Inputs → HTTP Event Collector → Global Settings
  • Settings → Data Inputs → HTTP Event Collector → New Token

[Screenshot: creation of the HTTP Event Collector token]

We now have a new data input available with the token 7180508d-6fe4-4831-9594-393f0b2954d3. Please note that we have checked “Enable indexer acknowledgement” (this must also be enabled in the endpoint definition in the connector).
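
Before wiring up the connector, the token can be tested with a plain curl call to the /event endpoint. Since indexer acknowledgement is enabled, HEC also expects a channel GUID in the X-Splunk-Request-Channel header (the GUID below is an arbitrary example):

    curl -k https://192.168.2.39:8088/services/collector/event \
      -H "Authorization: Splunk 7180508d-6fe4-4831-9594-393f0b2954d3" \
      -H "X-Splunk-Request-Channel: 11111111-2222-3333-4444-555555555555" \
      -d '{"event": "hello from curl"}'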

Splunk Kafka connector

Docker available on my GitHub: https://github.com/dakiri/kafka-connect-splunk

This image is based on the source available here: https://github.com/splunk/kafka-connect-splunk. I put inside the container all the stuff needed to compile a new version. Build and start the container:

    make build
    make daemon

Configuration of the broker

config/connect-distributed-quickstart.properties

    bootstrap.servers=kafka:9092
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter.schemas.enable=false
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    # Flush much faster (10s) than normal, which is useful for testing/debugging
    offset.flush.interval.ms=10000
    plugin.path=connectors/
    group.id=kafka-connect-splunk-hec-sink
    config.storage.topic=__kafka-connect-splunk-task-configs
    config.storage.replication.factor=1
    offset.storage.topic=__kafka-connect-splunk-offsets
    offset.storage.replication.factor=1
    offset.storage.partitions=1
    status.storage.topic=__kafka-connect-splunk-statuses
    status.storage.replication.factor=1
    status.storage.partitions=1

Since I work on the same machine, I use an init script (start.sh) that adds entries to the hosts file.
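
For reference, a minimal sketch of what such an init script does (the IP addresses and hostnames are assumptions matching the setup of this article):

    #!/bin/sh
    # make the "kafka" and "splunk" hostnames resolvable from inside the container
    echo "192.168.2.38 kafka"  >> /etc/hosts
    echo "192.168.2.39 splunk" >> /etc/hosts
    exec "$@"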

Define the broker address (or the list of brokers); the format is broker1:9092,broker2:9092,…

[Screenshot: Kafka Connect configuration]

Launch the container with make daemon or make run.
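
Once the worker is up, we can check that the Splunk sink plugin was picked up from plugin.path by querying the Kafka Connect REST API (192.168.2.38 is the address used in the next section):

    # list the connector plugins loaded by the worker;
    # com.splunk.kafka.connect.SplunkSinkConnector should appear in the output
    curl 192.168.2.38:8083/connector-plugins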

Configuration of the Splunk endpoint

The configuration is done via the Kafka Connect REST API.

    curl 192.168.2.38:8083/connectors -X POST -H "Content-Type: application/json" -d '{
      "name": "splunk-prod",
      "config": {
        "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
        "tasks.max": "2",
        "topics": "syslog,t1",
        "splunk.hec.uri": "https://192.168.2.39:8088",
        "splunk.hec.token": "7180508d-6fe4-4831-9594-393f0b2954d3",
        "splunk.hec.ack.enabled": "true",
        "splunk.hec.raw": "false",
        "splunk.hec.json.event.enrichment": "org=fin,bu=daki",
        "splunk.hec.track.data": "true",
        "splunk.hec.ssl.validate.certs": "false"
      }
    }'

Things to know:

  1. name: name of the connector; a new consumer will be created with this name
  2. connector.class: name of the Java class used to do the job; must be set to com.splunk.kafka.connect.SplunkSinkConnector
  3. tasks.max: number of parallel tasks
  4. topics: list of topics, separated by commas
  5. splunk.hec.uri: URL of the Splunk HTTP Event Collector
  6. splunk.hec.token: token of the event collector
  7. splunk.hec.ack.enabled: valid settings are true or false. When set to true, the Splunk Kafka Connector will poll event ACKs for POST events before check-pointing the Kafka offsets. This is used to prevent data loss, as this setting implements guaranteed delivery
  8. splunk.hec.raw: set to true in order for Splunk software to ingest data using the /raw HEC endpoint; false will use the /event endpoint
  9. splunk.hec.json.event.enrichment: only applicable to the /event HEC endpoint. This setting is used to enrich raw data with extra metadata fields. It contains a comma-separated list of key-value pairs. The configured enrichment metadata will be indexed along with raw event data by Splunk software. Note: data enrichment for the /event HEC endpoint is only available in Splunk Enterprise 6.5 and above
  10. splunk.hec.track.data: valid settings are true or false. When set to true, data loss and data injection latency metadata will be indexed along with raw data
  11. splunk.hec.ssl.validate.certs: valid settings are true or false. When set to true, the connector validates the HTTPS certificate of the Splunk HEC endpoint (disabled here since the test Splunk instance uses its default self-signed certificate)

After posting the configuration we receive a confirmation.

[Screenshot: connector creation confirmation]
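
The connector and its tasks can also be checked at any time through the same REST API, which is useful to confirm that the two tasks are RUNNING before injecting data:

    # list the registered connectors
    curl 192.168.2.38:8083/connectors
    # detailed status of the splunk-prod connector and its tasks
    curl 192.168.2.38:8083/connectors/splunk-prod/status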

Kafka Data Gen

In order to generate random messages, I use kafka-data-gen (the source is available here: https://github.com/dtregonning/kafka-data-gen) as a producer. I put a fork on my GitHub with a prebuilt jar:

  • Producer

    time java -jar ./build/libs/kafka-data-gen.jar -message-count 2000000 -message-size 25 -topic syslog -bootstrap.servers "192.168.2.38:9092" -acks all -kafka-retries 0 -kafka-batch-size 60000 -kafka-linger 1 -kafka-buffer-memory 33554432 -eps 0

To test, we can also generate some data with kafkacat:

    kafkacat -b 192.168.2.38 -t syslog -P

  • Consumer

    kafkacat -b 192.168.2.38:9092 -t syslog -o end
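
To verify that the expected number of messages actually reached the topic, a rough count can be done with kafkacat (it consumes the whole topic from the beginning and exits at the end of the partitions):

    kafkacat -b 192.168.2.38:9092 -t syslog -C -o beginning -e -q | wc -l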

View result in Splunk

[Screenshot: events from Kafka searched in Splunk]
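
A quick way to verify the volume on the Splunk side is to count the indexed events over the injection time range, for example with a search like this (assuming the HEC token writes to the default main index):

    index=main | stats count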
