Development of a Kafka mirrormaker in Java

The goal is to consume a Kafka topic, parse the JSON messages, and publish the hash of the host key to a second Kafka broker. Example of a consumed message:

{
   "host":"x.x.x.x",
   "message":"<189>date=2019-11-05 time=18:29:09 devname='FW-NAME' devid='FGT5HD0000000260' logid='0000000013' type='traffic' subtype='forward' level='notice' vd='root' eventtime=1572974949 srcip=10.0.0.1 srcport=55434 srcintf='vlan_5' srcintfrole='lan' dstip=8.8.4.4 dstport=53 dstintf='port2' dstintfrole='wan' sessionid=1983606453 proto=6 action='deny' policyid=0 policytype='policy' service='DNS'dstcountry='United States' srccountry='Reserved' trandisp='noop' duration=0 sentbyte=0 rcvdbyte=0 sentpkt=0 appcat='unscanned' crscore=30 craction=131072 crlevel='high'",
   "@timestamp":"2019-11-05T16:32:53.861Z",
   "@version":"1",
   "type":"global"
}
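
For illustration, the core transformation boils down to this sketch (class and variable names are hypothetical; it uses the same Guava hashing call as the Filter class further down):

    package bridge;

    import com.google.common.base.Charsets;
    import com.google.common.hash.Hashing;

    public class HashExample {
        public static void main(String[] args) {
            // Value of the "host" key in the sample message above
            String host = "x.x.x.x";
            // SHA-256 hex digest: this is what gets published to the destination topic
            String sha256hex = Hashing.sha256()
                    .hashString(host, Charsets.UTF_8)
                    .toString();
            System.out.println(sha256hex);
        }
    }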
  • Installing Gradle

    $ sudo apt install gradle
    $ gradle -v
    ------------------------------------------------------------
    Gradle 4.4.1
    ------------------------------------------------------------
    
    Build time:   2012-12-21 00:00:00 UTC
    Revision:     none
    
    Groovy:       2.4.16
    Ant:          Apache Ant(TM) version 1.10.5 compiled on March 28 2019
    JVM:          11.0.4 (Ubuntu 11.0.4+11-post-Ubuntu-1ubuntu218.04.3)
    OS:           Linux 4.4.0-17763-Microsoft amd64
  • Creating the project skeleton

    $ gradle init --type java-library
    Starting a Gradle Daemon (subsequent builds will be faster)
    
    BUILD SUCCESSFUL in 10s
    2 actionable tasks: 2 executed
  • Directory tree

    $ tree
    .
    ├── build.gradle
    ├── gradle
    │   └── wrapper
    │       ├── gradle-wrapper.jar
    │       └── gradle-wrapper.properties
    ├── gradlew
    ├── gradlew.bat
    ├── settings.gradle
    └── src
        ├── main
        │   └── java
        │       └── Library.java
        └── test
            └── java
                └── LibraryTest.java
    
    7 directories, 8 files
  • build.gradle

    apply plugin: 'java'
    apply plugin: 'application'
    
    sourceCompatibility = '1.8'
    
    mainClassName = 'bridge.ProcessingApp'
    
    repositories {
        mavenCentral()
    }
    
    version = '0.1.0'
    jar.baseName = 'project1'
    
    dependencies {
        compile 'org.apache.kafka:kafka-clients:0.11.0.0'
        compile 'com.fasterxml.jackson.core:jackson-databind:2.6.3'
        compile group: 'com.google.guava', name: 'guava', version: '23.0'
    }
    
    jar {
        manifest {
            attributes 'Main-Class' : mainClassName
        }
        // Bundle the runtime dependencies into a fat jar, dropping signature files.
        // The exclude closure must be attached to from(); a standalone block is never executed.
        from {
            configurations.compile.collect {
                it.isDirectory() ? it : zipTree(it)
            }
        } {
            exclude "META-INF/*.SF"
            exclude "META-INF/*.DSA"
            exclude "META-INF/*.RSA"
        }
    }
  • Consumer interface

    package bridge;
    
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    
    public interface Consumer {
        public static Properties createConfig(String servers, String groupId) {
            Properties props = new Properties();
            props.put("bootstrap.servers", servers);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            // No leading space in the class names, otherwise class loading fails
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return props;
        }
    
        public ConsumerRecords<String, String> consume();
    }
  • Producer interface

    package bridge;
    
    import java.util.Properties;
    
    public interface Producer {
        public static Properties createConfig(String servers) {
            Properties props = new Properties();
            props.put("bootstrap.servers", servers);
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", "1000");
            props.put("linger.ms", "1");
            // No leading space in the class names, otherwise class loading fails
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return props;
        }
    
        public void produce(String message);
    }
  • Reader (reads the source Kafka topic), implements Consumer

    package bridge;
    
    import java.util.Arrays;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    public class Reader implements Consumer {
        private final KafkaConsumer<String, String> consumer;
        private final String topic;
    
        public Reader(String servers, String groupId, String topic) {
            this.consumer = new KafkaConsumer<String, String>(Consumer.createConfig(servers, groupId));
            this.topic = topic;
            // Subscribe once at construction rather than on every poll
            this.consumer.subscribe(Arrays.asList(this.topic));
        }
    
        @Override
        public ConsumerRecords<String, String> consume() {
            return consumer.poll(100);
        }
    }
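
A quick hypothetical smoke test for Reader (the broker address is a placeholder; the group id and topic match the run command at the end):

    package bridge;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    public class ReaderCheck {
        public static void main(String[] args) {
            Reader reader = new Reader("localhost:9092", "test", "syslog");
            // One poll; prints whatever is currently available on the topic
            ConsumerRecords<String, String> records = reader.consume();
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }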
  • Filter (parses the message and publishes the result to the destination broker)

    package bridge;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.base.Charsets;
    import com.google.common.hash.Hashing;
    
    public class Filter implements Producer {
    
        private final KafkaProducer<String, String> producer;
        protected static final ObjectMapper MAPPER = new ObjectMapper();
        private final String topic;
    
        public Filter(String servers, String topic) {
            this.producer = new KafkaProducer<String, String>(Producer.createConfig(servers));
            this.topic = topic;
        }
    
        private void parseMessage(String rawMessage) {
            try {
                JsonNode root = MAPPER.readTree(rawMessage);
                JsonNode host = root.get("host");
                if (host == null) {
                    System.out.println("No host key in message " + rawMessage);
                    return;
                }
                // asText() returns the raw value without the surrounding JSON quotes
                String sha256hex = Hashing.sha256().hashString(host.asText(), Charsets.UTF_8).toString();
                System.out.println("host : " + sha256hex);
                ProducerRecord<String, String> pr = new ProducerRecord<String, String>(topic, sha256hex);
                producer.send(pr);
            }
            catch (Exception e) {
                System.out.println("Exception parsing message " + rawMessage);
            }
        }
    
        @Override
        public void produce(String message) {
            parseMessage(message);
        }
    }
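
A matching hypothetical smoke test for Filter (the broker address is a placeholder; the topic matches the run command at the end):

    package bridge;

    public class FilterCheck {
        public static void main(String[] args) {
            Filter filter = new Filter("localhost:9092", "do");
            // Only the "host" key is read, so a minimal JSON document is enough
            filter.produce("{\"host\":\"x.x.x.x\"}");
        }
    }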
  • Main class

    package bridge;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    public class ProcessingApp {
        public static void main(String[] args) {
            String sourceServers = args[0];
            String groupId = args[1];
            String sourceTopic = args[2];
            String targetServers = args[3];
            String targetTopic = args[4];
            Reader reader = new Reader(sourceServers, groupId, sourceTopic);
            Filter filter = new Filter(targetServers, targetTopic);
    
            // Poll the source topic forever, hashing and forwarding each record
            while (true) {
                ConsumerRecords<String, String> consumerRecords = reader.consume();
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    filter.produce(record.value());
                }
            }
        }
    }
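
main() indexes straight into args, so a missing argument crashes with an ArrayIndexOutOfBoundsException; a small hypothetical wrapper adding a usage check could look like this:

    package bridge;

    public class SafeMain {
        public static void main(String[] args) {
            // Refuse to start unless all five positional arguments are present
            if (args.length != 5) {
                System.err.println("Usage: <sourceServers> <groupId> <sourceTopic> <targetServers> <targetTopic>");
                System.exit(1);
            }
            ProcessingApp.main(args);
        }
    }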
  • Compiling and building the jar

    gradle jar
  • Running

    java -jar ./build/libs/project1-0.1.0.jar 192.168.142.224:9092 test syslog do
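
Note that main() reads five positional arguments, while the example above appears to be missing one. The generic form of the invocation is:

    java -jar ./build/libs/project1-0.1.0.jar <sourceServers> <groupId> <sourceTopic> <targetServers> <targetTopic>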
