How I interface a Kafka Broker with Splunk

I am sharing my learning journey with Splunk and Kafka. The objective here is to send a “significant” amount of data to a Kafka broker, say 1,000,000 messages of around 100 bytes each, in less than one minute, and to consume this data in Splunk.

My architecture is Docker based to facilitate the tests and easily duplicate the components. The orchestration is driven by docker-compose.
The producer Kafka-Data-Gen generates random messages that look like this:
{
  "eventKey": "999983",
  "uuid": "3cdca466-4201-486e-8ae7-0a5b05031c73",
  "message": "wshbzgtlhoxfvuyqkgqnstbix"
}
and sends them to the Kafka broker, on the topic “syslog” in our example. The consumer, Kafka Connect, reads the messages available on the topic and pushes them to Splunk via the HTTP Event Collector data input.
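To see the producer side of this flow in isolation, a message with the same shape can be pushed to the topic by hand with kafkacat. This is only a sketch: it assumes kafkacat is installed and that the broker answers on kafka:9092, the address used later in this post.
# Produce one hand-written message in the same JSON shape on the "syslog" topic.
echo '{"eventKey":"1","uuid":"00000000-0000-0000-0000-000000000000","message":"hello from kafkacat"}' | kafkacat -b kafka:9092 -t syslog -P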
Kafka broker
To start quickly, I made a container with an embedded ZooKeeper, since I will use only one broker (Dockerfile available on my GitHub: https://github.com/dakiri/kafka-docker).
Build and start the container:
make build
make daemon
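Once the container is up, it is worth checking that the broker answers and that the “syslog” topic exists. A minimal sanity check, assuming the container is named kafka and ships the standard Kafka CLI scripts on its PATH (older Kafka versions use --zookeeper localhost:2181 instead of --bootstrap-server):
# List the topics known to the broker.
docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 --list
# Create the "syslog" topic if it was not auto-created.
docker exec -it kafka kafka-topics.sh --bootstrap-server localhost:9092 --create --topic syslog --partitions 1 --replication-factor 1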
Splunk
Docker available on my GitHub: coming soon!
Build and start the container:
make build
make daemon
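A quick way to confirm that Splunk is up is to query its management port; the port and the credentials below are the usual defaults and are assumptions that may differ in your build:
# Query the management API; -k skips validation of the self-signed certificate.
curl -k -u admin:changeme https://localhost:8089/services/server/info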
Configuration of the HTTP Event Collector
The connector will send the messages to Splunk through the HTTP Event Collector, so we need to activate this data input and create a new token:
- Settings → Data Input → HTTP Event Collector → Global settings
- Settings → Data Input → HTTP Event Collector → New Token

We now have a new data input available with the token 7180508d-6fe4-4831-9594-393f0b2954d3. Please note that we have checked “Enable indexer acknowledgement” (this must also be enabled in the endpoint definition of the connector).
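Before involving the connector, the token can be tested with a direct POST to the HEC /event endpoint. Since indexer acknowledgement is enabled on this token, HEC also expects a channel identifier (any GUID) in the X-Splunk-Request-Channel header; the host, port and GUID below are assumptions to adapt to your setup.
# Send a test event straight to the HTTP Event Collector; -k because the certificate is self-signed.
curl -k https://192.168.2.39:8088/services/collector/event -H "Authorization: Splunk 7180508d-6fe4-4831-9594-393f0b2954d3" -H "X-Splunk-Request-Channel: 11111111-2222-3333-4444-555555555555" -d '{"event": "HEC smoke test", "sourcetype": "manual"}'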
Splunk kafka connector
Docker available on my GitHub: https://github.com/dakiri/kafka-connect-splunk
This image is based on the source available here: https://github.com/splunk/kafka-connect-splunk. I put everything needed to compile a new version inside the container. Build and start the container:
make build
make daemon
Configuration of the broker
config/connect-distributed-quickstart.properties
bootstrap.servers=kafka:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Flush much faster (10s) than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
plugin.path=connectors/
group.id=kafka-connect-splunk-hec-sink
config.storage.topic=__kafka-connect-splunk-task-configs
config.storage.replication.factor=1
offset.storage.topic=__kafka-connect-splunk-offsets
offset.storage.replication.factor=1
offset.storage.partitions=1
status.storage.topic=__kafka-connect-splunk-statuses
status.storage.replication.factor=1
status.storage.partitions=1
Since I work on the same machine, I use an init script (start.sh) that adds entries to the hosts file.
Define the broker address (or the list of brokers); the format is broker1:9092,broker2:9092,…

Launch the container with make daemon or make run.
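Once the worker is running, its REST API on port 8083 can confirm that it started and that the Splunk sink plugin was picked up from plugin.path (the IP address is the one used later in this post):
# Check that the worker is alive and which version it runs.
curl http://192.168.2.38:8083/
# List the connector plugins the worker discovered; com.splunk.kafka.connect.SplunkSinkConnector should appear here.
curl http://192.168.2.38:8083/connector-plugins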
Configuration of the Splunk endpoint
The configuration is done via the REST API.
curl 192.168.2.38:8083/connectors -X POST -H "Content-Type: application/json" -d '{
  "name": "splunk-prod",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "2",
    "topics": "syslog,t1",
    "splunk.hec.uri": "https://192.168.2.39:8088",
    "splunk.hec.token": "7180508d-6fe4-4831-9594-393f0b2954d3",
    "splunk.hec.ack.enabled": "true",
    "splunk.hec.raw": "false",
    "splunk.hec.json.event.enrichment": "org=fin,bu=daki",
    "splunk.hec.track.data": "true",
    "splunk.hec.ssl.validate.certs": "false"
  }
}'
Things to know:
- name: name of the connector; a new consumer group will be created with this name
- connector.class: name of the Java class used to do the job; must be set to com.splunk.kafka.connect.SplunkSinkConnector
- tasks.max: number of parallel tasks
- topics: list of topics, separated by commas
- splunk.hec.uri: URL of the Splunk HTTP Event Collector
- splunk.hec.token: token of the event collector
- splunk.hec.ack.enabled: valid settings are true or false. When set to true, the Splunk Kafka Connector will poll event ACKs for POSTed events before check-pointing the Kafka offsets. This is used to prevent data loss, as this setting implements guaranteed delivery
- splunk.hec.raw: set to true for Splunk software to ingest data using the /raw HEC endpoint; false will use the /event endpoint
- splunk.hec.json.event.enrichment: only applicable to the /event HEC endpoint. This setting is used to enrich raw data with extra metadata fields. It contains a comma-separated list of key-value pairs. The configured enrichment metadata will be indexed along with the raw event data by Splunk software. Note: data enrichment for the /event HEC endpoint is only available in Splunk Enterprise 6.5 and above
- splunk.hec.track.data: valid settings are true or false. When set to true, data loss and data injection latency metadata will be indexed along with raw data
- splunk.hec.ssl.validate.certs: valid settings are true or false. When set to true, the HTTPS certificate of the HEC endpoint will be validated
After posting the configuration we receive a confirmation.
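The same REST API can also be queried afterwards to check that the connector and its tasks are actually running:
# List the connectors registered on the worker.
curl http://192.168.2.38:8083/connectors
# Show the state of the splunk-prod connector and of its two tasks.
curl http://192.168.2.38:8083/connectors/splunk-prod/status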

Kafka Data Gen
In order to generate random messages, I use kafka-data-gen (the sources are available here: https://github.com/dtregonning/kafka-data-gen) as a producer. I put a fork with a prebuilt jar on my GitHub.
Producer
time java -jar ./build/libs/kafka-data-gen.jar -message-count 2000000 -message-size 25 -topic syslog -bootstrap.servers "192.168.2.38:9092" -acks all -kafka-retries 0 -kafka-batch-size 60000 -kafka-linger 1 -kafka-buffer-memory 33554432 -eps 0
To test, we can generate some data with kafkacat:
kafkacat -b 192.168.2.38 -t syslog -P
Consumer
kafkacat -b 192.168.2.38:9092 -t syslog -o end
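To check that the expected number of messages actually reached the topic, kafkacat can also consume everything from the beginning and count the lines (this reads the whole topic, so it takes a moment):
# Consume the whole topic (-C) from the beginning, exit at the end (-e), stay quiet (-q) and count the messages.
kafkacat -b 192.168.2.38:9092 -t syslog -C -o beginning -e -q | wc -l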
View the results in Splunk
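On the Splunk side, the indexed events can be checked from the search bar, or from the command line through the search REST API. This is only a sketch: the credentials and the index below are assumptions to adapt to your environment.
# Run a one-shot search that counts the events pushed by the connector.
curl -k -u admin:changeme https://192.168.2.39:8089/services/search/jobs --data-urlencode search='search index=main | stats count' -d exec_mode=oneshot -d output_mode=json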
