博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Apache Kafka 编程实战
阅读量:6350 次
发布时间:2019-06-22

本文共 5530 字,大约阅读时间需要 18 分钟。

Apache Kafka 编程实战您可能感性的文章:

....

本章通过实际例子,讲解了如何使用java进行kafka开发。

添加依赖:

org.apache.kafka
kafka-clients
2.0.0
复制代码

下面是创建主题的代码:

public class TopicProcessor {private static final String ZK_CONNECT="localhost:2181";private static final int SESSION_TIME_OUT=30000;private static final int CONNECT_OUT=30000;public static void createTopic(String topicName,int partitionNumber,int replicaNumber,Properties properties){ZkUtils zkUtils = null;try{zkUtils=ZkUtils.apply(ZK_CONNECT,SESSION_TIME_OUT,CONNECT_OUT, JaasUtils.isZkSecurityEnabled());if(!AdminUtils.topicExists(zkUtils,topicName)){AdminUtils.createTopic(zkUtils,topicName,partitionNumber,replicaNumber,properties,AdminUtils.createTopic$default$6());}}catch (Exception e){e.printStackTrace();}finally {zkUtils.close();}}public static void main(String[] args){createTopic("javatopic",1,1,new Properties());}}复制代码

首先定义了zookeeper相关连接信息。然后在createTopic中,先初始化ZkUtils,和zookeeper交互依赖于它。然后通过AdminUtils先判断是否存在你要创建的主题,如果不存在,则通过createTopic方法进行创建。传入参数包括主题名称,分区数量,副本数量等。

生产者生产消息

生产者生产消息代码如下:

public class MessageProducer {private static final String TOPIC="education-info";private static final String BROKER_LIST="localhost:9092";private static KafkaProducer
producer = null;static{Properties configs = initConfig();producer = new KafkaProducer
(configs);}private static Properties initConfig(){Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);properties.put(ProducerConfig.ACKS_CONFIG,"all");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());return properties;}public static void main(String[] args){try{String message = "hello world";ProducerRecord
record = new ProducerRecord
(TOPIC,message);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(null==exception){System.out.println("perfect!");}if(null!=metadata){System.out.print("offset:"+metadata.offset()+";partition:"+metadata.partition());}}}).get();}catch (Exception e){e.printStackTrace();}finally {producer.close();}}}复制代码

1、首先初始化KafkaProducer对象。

producer = new KafkaProducer
(configs);复制代码

2、创建要发送的消息对象。

ProducerRecord
record = new ProducerRecord
(TOPIC,message);复制代码

3、通过producer的send方法,发送消息

4、发送消息时,可以通过回调函数,取得消息发送的结果。异常发生时,对异常进行处理。

初始化producer时候,需要注意下面属性设置:

properties.put(ProducerConfig.ACKS_CONFIG,"all");复制代码

这里有三种值可供选择:

  • 0,不等服务器响应,直接返回发送成功。速度最快,但是丢了消息是无法知道的
  • 1,leader副本收到消息后返回成功
  • all,所有参与的副本都复制完成后返回成功。这样最安全,但是延迟最高。

消费者消费消息

我们直接看代码

public class MessageConsumer {private static final String TOPIC="education-info";private static final String BROKER_LIST="localhost:9092";private static KafkaConsumer
kafkaConsumer = null;static {Properties properties = initConfig();kafkaConsumer = new KafkaConsumer
(properties);kafkaConsumer.subscribe(Arrays.asList(TOPIC));}private static Properties initConfig(){Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"test");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());return properties;}public static void main(String[] args){try{while(true){ConsumerRecords
records = kafkaConsumer.poll(100);for(ConsumerRecord record:records){try{System.out.println(record.value());}catch(Exception e){e.printStackTrace();}}}}catch(Exception e){e.printStackTrace();}finally {kafkaConsumer.close();}}}复制代码

代码逻辑如下:

1、初始化消费者KafkaConsumer,并订阅主题。

kafkaConsumer = new KafkaConsumer
(properties);kafkaConsumer.subscribe(Arrays.asList(TOPIC));复制代码

2、循环拉取消息

ConsumerRecords
records = kafkaConsumer.poll(100);复制代码

poll方法传入的参数100,是等待broker返回数据的时间,如果超过100ms没有响应,则不再等待。

3、拉取回消息后,循环处理。

for(ConsumerRecord record:records){try{System.out.println(record.value());}catch(Exception e){e.printStackTrace();}}复制代码

消费相关代码比较简单,不过这个版本没有处理偏移量提交。学习过第四章-协调器相关的同学应该还记得偏移量提交的问题。我曾说过最佳实践是同步和异步提交相结合,同时在特定的时间点,比如再均衡前进行手动提交。

加入偏移量提交,需要做如下修改:

1、enable.auto.commit设置为false

2、消费代码如下:

public static void main(String[] args){try{while(true){ConsumerRecords
records =kafkaConsumer.poll(100);for(ConsumerRecord record:records){try{System.out.println(record.value());}catch(Exception e){e.printStackTrace();}}kafkaConsumer.commitAsync();}}catch(Exception e){e.printStackTrace();}finally {try{kafkaConsumer.commitSync();}finally {kafkaConsumer.close();}}}复制代码

3、订阅消息时,实现再均衡的回调方法,在此方法中手动提交偏移量

kafkaConsumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection
partitions) {//再均衡之前和消费者停止读取消息之后调用kafkaConsumer.commitSync(currentOffsets);}});复制代码

通过以上三步,我们把自动提交偏移量改为了手动提交。正常消费时,异步提交kafkaConsumer.commitAsync()。即使偶尔失败,也会被后续成功的提交覆盖掉。而在发生异常的时候,手动提交 kafkaConsumer.commitSync()。此外在步骤3中,我们通过实现再均衡时的回调方法,手动同步提交偏移量,确保了再均衡前偏移量提交成功。

以上面的最佳实践提交偏移量,既能保证消费时较高的效率,又能够尽量避免重复消费。不过由于重复消费无法100%避免,消费逻辑需要自己处理重复消费的判断。

更多你可能感兴趣的文章:

Alibaba Blink新特性

你真的不关注一下嘛~

转载地址:http://mgtla.baihongyu.com/

你可能感兴趣的文章
你还在看《深入理解Java虚拟机》的运行时数据模型吗?
查看>>
RIS,创建 React 应用的新选择
查看>>
线性结构上的动态规划--算法竞赛入门经典笔记
查看>>
面试官:你使用webpack时手写过loader,分离过模块吗?
查看>>
Ubuntu 16.04系统下 对OpenJDK编译好的Hotspot 进行调试
查看>>
00-利用思维导图梳理JavaSE基础知识-持续更新中!
查看>>
java中三种注释及其实际应用的意义
查看>>
Emacs 24.2 for Mac OS X 最新版的 MAC Emacs 安装包
查看>>
【三石jQuery视频教程】01.图片循环展示
查看>>
ngrok
查看>>
ThinkPHP 模板变量输出
查看>>
android系统信息(内存、cpu、sd卡、电量、版本)获取
查看>>
HTML5、WebKit与移动应用开发
查看>>
面google的试题,对google面试题的衍生推导
查看>>
Eclipse Debug Android Native Application
查看>>
java动态代理
查看>>
node.js原型继承
查看>>
揭露让Linux与Windows隔阂消失的奥秘(1)
查看>>
我的友情链接
查看>>
Mysql备份和恢复策略
查看>>