page contents

Java教程——使用Java访问Kafka实例

本文讲述了Java教程——使用Java访问Kafka实例!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

attachments-2023-08-3WLEgPF064ebf7e6f00f4.jpg本文讲述了Java教程——使用Java访问Kafka实例!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

Kafka作为消息系统的一种, 当然可以像其他消息中间件一样作为消息数据中转的平台。下面以Java语言为例,看一下如何使用Kafka来发送和接收消息。

01引入依赖

<dependency>

    <groupId>org.apache.kafka</groupId>

    <artifactId>kafka-clients</artifactId>

    <version>0.11.0.1</version>

</dependency>

添加Kafka客户端访问支持,具体版本和本地安装的Kafka版本一致即可。

02消息生产者

package org.study.mq.kafka.java;

import org.apache.kafka.clients.producer.*;

import java.util.HashMap;

import java.util.Map;

public class ProducerSample {

    public static void main(String[] args) {

        Map<String, Object> props = new HashMap<String, Object>();

        props.put("bootstrap.servers", "localhost:9092");

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String topic = "test-topic";

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        producer.send(new ProducerRecord(topic, "idea-key2", "java-message 1"));

        producer.send(new ProducerRecord(topic, "idea-key2", "java-message 2"));

        producer.send(new ProducerRecord(topic, "idea-key2", "java-message 3"));

        producer.close();

    }

}

示例 中用 KafkaProducer 类来创建一个消息生产者,该类的构造函数入参是一系列属性值。下面看一下这些属性具体都是什么含义。

bootstrap.servers 表示 Kafka 集群 。如果集群中有多台物理服务器,则服务器地址之间用逗号分隔, 比如” 192.168.1.1 :9092,192.168.1.2:9092” 。localhost 是笔者电脑的地址,9092 是 Kafka 服务器默认监听的端口号。

key.serializer 和 value.serializer 表示消息的序列化类型 。Kafka 的消息是以键值对的形式发送到 Kafka 服务器的,在消息被发送到服务器之前,消息生产者需要把不同类型的 消息序列化为 二 进制类型,示例中是发送文本消息到服务器 , 所以使用的是StringSerializer。

key.deserializer 和 value.deserializer 表示消息的反序列化类型。把来自 Kafka 集群的二进制消 息反序列 化 为指定 的 类型,因为序列化用的是String类型,所以用StringDeserializer 来反序列化。

zk.connect 用于指定 Kafka 连接 ZooKeeper 的 URL ,提供了基于 ZooKeeper 的集群服务器自动感知功能, 可以动态从 ZooKeeper 中读取 Kafka 集群配置信息。

有 了 消息生产者之后 , 就可以调用 send 方法发送消息了。该方法的入参是 ProducerRecord类型对象 , ProducerRecord 类提供了多种构造函数形参,常见的有如下三种 :

ProducerRecord(topic,partition,key,value);

ProducerRecord(topic,key,value);

ProducerRecord(topic, value) ;

其中 topic 和 value 是必填的, partition 和 key 是可选的 。如果指定了 pa时tion,那么消息会被发送至指定的 partition ;如果没指定 partition 但指定了 Key,那么消息会按照 hash(key)发送至对应的 partition:如果既没指定 partition 也没指定 key,那么 消息会按照 round-robin 模式发送(即以轮询的方式依次发送〉到每一个 partition。示例中将向 test-topic 主题发送三条消息。

03消息消费者

package org.study.mq.kafka.java;

import org.apache.kafka.clients.consumer.*;

import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;

import java.util.Map;

import java.util.Properties;

public class ConsumerSample {

    public static void main(String[] args) {

        String topic = "test-topic";

        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");

        props.put("group.id", "testGroup1");

        props.put("enable.auto.commit", "true");

        props.put("auto.commit.interval.ms", "1000");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer(props);

        consumer.subscribe(Arrays.asList(topic));

        while (true) {

            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records)

                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());

        }



    }

}

和消息生产者类似,这里用 KafkaConsumer 类来创建一个消息消费者,该类的构造函数入参也是一系列属性值。

bootstrap. servers 和生产者一样,表示 Kafka 集群。

group.id 表示消费者的分组 ID。

enable.auto.commit 表示 Consumer 的 offset 是否自 动提交 。

auto.commit.interval .ms 用于设置自动提交 offset 到 ZooKeeper 的时间间隔,时间单位是毫秒。

key. deserializer 和 value.deserializer 表示用字符串来反序列化消息数据。

消息消费者使用 subscribe 方法 订阅了 Topic 为 test-topic 的消息。Consumer 调用poll 方法来轮询 Kafka 集群的消息, 一直等到 Kafka 集群中没有消息或达到超时时间(示例中设置超时时间为 100 毫秒)为止 。如果读取到消息,则打印出消息记录的 pa此ition, offset、key 等。

04启动服务器

#启动ZooKeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

#启动Kafka服务器

kafka-server-start /usr/local/etc/kafka/server.properties

05运行Consumer

先运行Consumer,这样当生产者发送消息时就能在消费者后端看到消息记录。

06运行Producer

再运行Producer,发布几条消息,在Consumer的控制台就能看到所接收收到的消息。

更多相关技术内容咨询欢迎前往并持续关注六星社区了解详情。

想高效系统的学习Java编程语言,推荐大家关注一个微信公众号:Java圈子。每天分享行业资讯、技术干货供大家阅读,关注即可免费领取整套Java入门到进阶的学习资料以及教程,感兴趣的小伙伴赶紧行动起来吧。

attachments-2023-03-2AoKIjPQ64014b4ad30a3.jpg

  • 发表于 2023-08-28 09:27
  • 阅读 ( 210 )
  • 分类:Java开发

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
王昭君
王昭君

209 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1316 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章