java操作kafka
今天使用了java客户端生产和消费消息。简单例子:
生产者:
package com.hnshop.goods.demo;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducer {
private final org.apache.kafka.clients.producer.KafkaProducer<String,String> producer;
public static final String TOPIC = "test";
private KafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.231.39.48:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
producer = new org.apache.kafka.clients.producer.KafkaProducer<String,String>(props);
}
void produce(boolean sync) {
int messageno = 1;
final int count = 100;
while (messageno < count){
String key = "send" + messageno;
String data = "mesage from producer " + messageno;
System.out.println(data);
if(sync){
try{
producer.send(new ProducerRecord<String,String>(TOPIC,data)).get();
}catch (Exception e){
e.printStackTrace();
}
}else {
producer.send(new ProducerRecord<String,String>(TOPIC,data));
}
producer.flush();
messageno ++;
}
}
public static void main(String []args){
boolean sync = true;
new KafkaProducer().produce(false);
}
}
消费者:
package com.hnshop.goods.demo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumer {
private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class.getName());
private final org.apache.kafka.clients.consumer.KafkaConsumer<String,String> consumer;
private final static String TOPIC = "test";
private final static String GROUP_ID = "fortest";
private KafkaConsumer(){
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.231.39.48:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String,String>(props);
}
private void consumer() {
consumer.subscribe(Arrays.asList(TOPIC));
while (true) {
ConsumerRecords<String,String> records = consumer.poll(100);
for (ConsumerRecord<String,String> record:records){
System.out.printf("offset=%d,value=%s",record.offset(),record.value());
System.out.println(" ");
}
System.out.println("长度:" + records.count());
break;
}
}
public static void main(String []args){
new KafkaConsumer().consumer();
}
}
这里一定要指定一个groupid,如果不指定的话,就会抛出异常的。
此外,客户端需要直接联系kafka,不能连接zookeeper。
还有,如果是异步方式,消费者非常有可能重复消息。比如我消费者重起了,那么还会消费之前的消费。对于重复就应该在取出数据进行判断了。
我们也完全可以采取同步的方式。
..................................................................分割线..............................................................................................................................................
上面说的是直接使用kafka-client操作kafka.下面介绍spring-kafka的用法。更加灵活方便。
1、引入依赖包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、配置生产者和消费者
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String,String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String,Object> producerConfigs() {
Map<String,Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.231.39.48:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
return props;
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate() {
return new KafkaTemplate<String,String>(producerFactory());
}
}
注解很重要
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
private final static String GROUP_ID = "springkafka";
@Bean
public ConsumerFactory<String,String> consumerFactory() {
Map<String,Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.231.39.48:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
3、实现生产者
package com.hnshop.goods.service.impl;
import com.hnshop.goods.service.KafkaProduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProduceServiceImpl implements KafkaProduceService {
@Autowired
private KafkaTemplate template;
@Override
public void sendMsg(String topic, String msg) {
System.out.println("开始发送消息。topic:" + topic + "消息:" + msg);
try {
template.send(topic,msg);
}catch (Exception e){
System.out.println("发送消息出错。topic:" + topic + "消息:" + msg);
System.out.println(e.getMessage());
}
}
}
4、实现消费者
@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {
private Logger logger = LoggerFactory.getLogger(KafkaConsumerServiceImpl.class.getName());
@KafkaListener(topics = "test",groupId = "fortest")
public void listen(String msg) {
logger.info(msg);
}
}
@KafkaListener注解后,当spirngboot 应用服务起来后,会自动运行消费者,一直监听目标kafka.
--------EOF---------
微信分享/微信扫码阅读
微信分享/微信扫码阅读