java操作kafka

今天使用了java客户端生产和消费消息。简单例子:

生产者:

package com.hnshop.goods.demo;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Minimal demo producer built on the plain kafka-clients API.
 *
 * <p>It writes a fixed series of string messages to {@link #TOPIC}, either waiting for
 * each send to be acknowledged (synchronous mode) or handing records to the client
 * without waiting (asynchronous mode).
 */
public class KafkaProducer {

    /** Underlying client; fully qualified because this demo class shares its simple name. */
    private final org.apache.kafka.clients.producer.KafkaProducer<String,String> producer;

    /** Topic every demo message is written to. */
    public static final String TOPIC = "test";

    /** Creates the underlying client with string key/value serializers and a fixed broker address. */
    private KafkaProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.231.39.48:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        producer = new org.apache.kafka.clients.producer.KafkaProducer<String,String>(props);
    }

    /**
     * Sends the demo messages numbered 1 through 99 to {@link #TOPIC}.
     *
     * @param sync when {@code true}, block on every send until its future completes;
     *             when {@code false}, send without waiting for the result
     */
    void produce(boolean sync) {
        final int count = 100;
        for (int messageno = 1; messageno < count; messageno++) {
            String key = "send" + messageno;
            String data = "mesage from producer " + messageno;
            System.out.println(data);
            // The key was previously computed but never attached to the record.
            ProducerRecord<String,String> record = new ProducerRecord<String,String>(TOPIC,key,data);
            if (sync) {
                try {
                    producer.send(record).get();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop producing instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // Best effort: report the failed send and continue with the next message.
                    e.printStackTrace();
                }
            } else {
                producer.send(record);
            }
        }
        // Flush once after the loop; flushing after every record defeats client-side batching.
        producer.flush();
    }

    /**
     * Runs the demo. Pass {@code true} as the first argument for synchronous sends;
     * with no argument the sends are asynchronous.
     */
    public static void main(String []args){
        boolean sync = args.length > 0 && Boolean.parseBoolean(args[0]);
        KafkaProducer demo = new KafkaProducer();
        try {
            demo.produce(sync);
        } finally {
            // Release the client's network resources and background thread.
            demo.producer.close();
        }
    }
}

消费者:

package com.hnshop.goods.demo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal demo consumer built on the plain kafka-clients API.
 *
 * <p>It subscribes to {@link #TOPIC} as a member of {@link #GROUP_ID}, performs one poll,
 * prints whatever records were returned, and then closes the client.
 */
public class KafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    /** Underlying client; fully qualified because this demo class shares its simple name. */
    private final org.apache.kafka.clients.consumer.KafkaConsumer<String,String> consumer;

    private final static String TOPIC = "test";

    private final static String GROUP_ID = "fortest";

    /** Creates the underlying client with string deserializers, a fixed broker address and group id. */
    private KafkaConsumer(){
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.231.39.48:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String,String>(props);
    }

    /**
     * Subscribes to {@link #TOPIC}, polls once with a 100 ms timeout, and prints each
     * record's offset and value followed by the number of records received.
     *
     * <p>The client is always closed afterwards. Per the KafkaConsumer#close documentation,
     * closing commits the current offsets when auto-commit is enabled; without it, a
     * restarted consumer is likely to read the same records again.
     */
    private void consumer() {
        try {
            consumer.subscribe(Arrays.asList(TOPIC));
            // Single poll only; a long-running consumer would repeat this in a loop.
            ConsumerRecords<String,String> records = consumer.poll(100);
            for (ConsumerRecord<String,String> record : records) {
                System.out.printf("offset=%d,value=%s",record.offset(),record.value());
                System.out.println(" ");
            }
            System.out.println("长度:" + records.count());
        } finally {
            consumer.close();
        }
    }

    /** Runs the demo consumer once. */
    public static void main(String []args){
        new KafkaConsumer().consumer();
    }
}

这里消费者一定要指定一个 group id(即 ConsumerConfig.GROUP_ID_CONFIG,对应配置项 group.id);如果不指定,使用 subscribe() 订阅 topic 时就会抛出异常。

此外,客户端需要直接连接 Kafka broker(bootstrap.servers),不能连接 ZooKeeper。

还有,如果是异步方式,消费者非常有可能重复消费消息。比如消费者重启了,那么还会再次消费之前已经消费过的消息。对于重复的消息,就应该在取出数据后进行判断(去重)了。

我们也完全可以采取同步的方式。

..................................................................分割线..............................................................................................................................................

上面说的是直接使用 kafka-clients 操作 Kafka。下面介绍 spring-kafka 的用法,它更加灵活方便。

1、引入依赖包

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

2、配置生产者和消费者




/**
 * Spring configuration for the producer side: exposes the raw producer settings,
 * a producer factory built from them, and a {@code KafkaTemplate} for sending messages.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /** Producer factory backed by the settings from {@link #producerConfigs()}. */
    @Bean
    public ProducerFactory<String,String> producerFactory() {
        Map<String,Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }


    /** Producer settings: broker address plus string serializers for keys and values. */
    @Bean
    public Map<String,Object> producerConfigs() {
        final String stringSerializer = StringSerializer.class.getName();

        Map<String,Object> settings = new HashMap<>();
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.231.39.48:9092");
        return settings;
    }

    /** Template used by application code to publish string messages. */
    @Bean
    public KafkaTemplate<String,String> kafkaTemplate() {
        ProducerFactory<String,String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}


注意:配置类上的 @Configuration 和 @EnableKafka 这两个注解很重要,不能省略。

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    private final static String GROUP_ID = "springkafka";


    @Bean
    public ConsumerFactory<String,String> consumerFactory() {
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.231.39.48:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }


3、实现生产者

package com.hnshop.goods.service.impl;

import com.hnshop.goods.service.KafkaProduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;


/**
 * {@link KafkaProduceService} implementation that publishes string messages
 * through the injected {@link KafkaTemplate}.
 */
@Service
public class KafkaProduceServiceImpl implements KafkaProduceService {


    // Parameterized (previously a raw type) so that send() is type-checked by the compiler.
    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Sends {@code msg} to {@code topic}, printing a line before the send and on failure.
     *
     * <p>Only exceptions thrown by the {@code send} call itself are caught here.
     * NOTE(review): the send result is returned as a future that this method ignores,
     * so failures reported only through that future are not printed — confirm whether
     * a callback is wanted.
     *
     * @param topic destination topic
     * @param msg   message payload
     */
    @Override
    public void sendMsg(String topic, String msg) {
        System.out.println("开始发送消息。topic:" + topic + "消息:" + msg);
        try {
            template.send(topic,msg);
        }catch (Exception e){
            System.out.println("发送消息出错。topic:" + topic + "消息:" + msg);
            System.out.println(e.getMessage());
        }
    }
}

4、实现消费者

/**
 * {@link KafkaConsumerService} implementation that listens on the {@code test} topic
 * and logs every message it receives.
 */
@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {

    private final Logger logger =
            LoggerFactory.getLogger(KafkaConsumerServiceImpl.class.getName());

    /**
     * Receives one message from topic {@code test} as a member of group {@code fortest}
     * and writes it to the log at INFO level.
     *
     * @param msg message payload
     */
    @KafkaListener(groupId = "fortest", topics = "test")
    public void listen(String msg) {
        this.logger.info(msg);
    }
}

方法加上 @KafkaListener 注解后,当 Spring Boot 应用启动起来,就会自动运行消费者,一直监听目标 Kafka topic。

--------EOF---------
本文微信分享/扫码阅读