page contents

java 编程使用kafka教程

本文讲述了关于Java编程使用kafka教程!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

attachments-2023-03-KkAPcj3i6424dfd81c065.png

本文介绍如何在 Java 程序中使用 Kafka,内容包括 Maven 依赖、生产者与消费者示例代码以及配置文件,具有一定的参考价值,希望对大家有所帮助。具体如下:

maven依赖:

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.11</artifactId>

<version>0.9.0.0</version>

</dependency>

kafka producer连接,注意import:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;

import config.Configloader;

import config.MsgCreater;

import config.Timer;

import threads.WriteKFKThreads;

public class TestKFKWriteQPS {

private static int count_callback=0;

private static long timeStart=0;

private static int count;

private static int threads;

private static int length;

private static String topic;

public static void main(String[] args) throws Exception {

Properties kp=Configloader.getProperties("../kafka.properties");

Properties mp=Configloader.getProperties("../main.properties");

count=Integer.parseInt(mp.getProperty("w_count"));

threads=Integer.parseInt(mp.getProperty("w_threads"));

count_callback = threads;

length=Integer.parseInt(mp.getProperty("msg_len"));

topic=mp.getProperty("topic");

ProducerRecord data = new ProducerRecord(topic, MsgCreater.getMsg(length));

timeStart=System.currentTimeMillis();

for(int i=0;i

new Thread(new WriteKFKThreads(count, kp, data)).start();

}

}

public static void threadCallback(){

count_callback--;

if(count_callback==0){

long used =System.currentTimeMillis()-timeStart;

System.out.println("SET --END-- "+count*threads+" time : "+ ((double) used/1000));

System.out.println("QPS: "+count*threads/((double) used/1000));

System.out.println(Timer.getAllDelayDistribution());

}

}

}

kafka 写入(set)线程,代码中的 Timer 仅用于统计耗时,可以忽略:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

import config.Timer;

import runner.TestKFKWriteQPS;

public class WriteKFKThreads implements Runnable{

int count=0;

Producer producer;

ProducerRecord data;

public WriteKFKThreads(int count , Properties config ,ProducerRecord data) {

this.count=count;

this.producer =new KafkaProducer(config);

this.data = data;

}

@Override

public void run() {

Timer t=new Timer();

for(int i=0;i

long start = System.currentTimeMillis();

try {

producer.send(data).get();

long take =System.currentTimeMillis() - start;

t.inTime(take);

} catch (Exception e) {

e.printStackTrace();

}

}

Timer.threadAdder(t);

TestKFKWriteQPS.threadCallback();

producer.close();

}

}

kafka consumer连接  :

import java.util.Arrays;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import config.Configloader;

/**
 * Read-throughput benchmark: subscribes to the configured topic, consumes
 * until {@code r_count * r_threads} records have been received (or too many
 * polls came back empty), then prints elapsed time and QPS.
 *
 * <p>Timing starts when the first record is received, not when the consumer
 * is created.
 */
public class TestKFKReadQPS {

    /** Wall-clock time (ms) of the first received record; 0 until then. */
    private static long timeStart = 0;

    private static int count;

    private static int threads;

    private static String topic;

    /** Records still outstanding; counts down from {@code count * threads}. */
    private static int total;

    /**
     * Runs the benchmark.
     *
     * @param args unused
     * @throws Exception if a setting is missing or not a valid integer, or if
     *                   loading the property files fails
     */
    public static void main(String[] args) throws Exception {
        Properties kp = Configloader.getProperties("../kafka.properties");
        Properties mp = Configloader.getProperties("../main.properties");

        count = Integer.parseInt(mp.getProperty("r_count"));
        threads = Integer.parseInt(mp.getProperty("r_threads"));
        topic = mp.getProperty("topic");
        total = count * threads;
        final int expected = total;

        // Optional: read a fixed partition/offset instead of subscribing.
        //partition=Integer.parseInt(mp.getProperty("partition"));
        //offset = Long.parseLong(mp.getProperty("offset"));

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kp);
        long used;
        try {
            //consumer.seek(new TopicPartition(topic, partition), offset);
            consumer.subscribe(Arrays.asList(topic));

            // Budget of empty polls (cumulative, not consecutive) before giving up.
            int times = 1000;
            boolean goon = true;

            while (goon) {
                ConsumerRecords<String, String> records = consumer.poll(100);

                if (records.count() == 0) {
                    times--;
                    if (times <= 0) {
                        System.out.println("data loose , now get : " + (expected - total));
                        break;
                    }
                }

                for (ConsumerRecord<String, String> record : records) {
                    // System.out.printf("partition = %d, offset = %d, key = %s,value = %s \n",
                    //         record.partition(), record.offset(), record.key(), record.value());
                    setStart();
                    total--;
                    if (total <= 0) {
                        goon = false;
                        break;
                    }
                }
            }

            // Stop the clock before close() so consumer shutdown is not counted.
            // If nothing was ever received timeStart is still 0; report 0 elapsed
            // rather than the time since the epoch.
            used = (timeStart == 0) ? 0 : System.currentTimeMillis() - timeStart;
        } finally {
            consumer.close();
        }

        // Report what was actually read, which is less than `expected` on data loss.
        int received = expected - total;
        double seconds = (double) used / 1000;

        System.out.println("GET --END-- " + received + " time : " + seconds);
        if (used > 0) {
            System.out.println("QPS: " + received / seconds);
        } else {
            System.out.println("QPS: N/A (no measurable elapsed time)");
        }
    }

    /** Records the start time and logs it, the first time it is called only. */
    private static void setStart() {
        if (timeStart == 0) {
            timeStart = System.currentTimeMillis();
            System.out.println("Read start!");
        }
    }
}

配置文件 kafka.properties:

# Shared client configuration: this one file is passed to both KafkaProducer
# (write test) and KafkaConsumer (read test).

# Consumer group used by the read test.
group.id=test

# Producer side: keys and values are sent as Strings.
key.serializer=org.apache.kafka.common.serialization.StringSerializer

value.serializer=org.apache.kafka.common.serialization.StringSerializer

# NOTE(review): looks like a setting for the old ZooKeeper-based clients;
# presumably ignored by KafkaProducer/KafkaConsumer - confirm.
zookeeper.connect=127.0.0.1:2181

# Broker address the clients connect to.
bootstrap.servers=127.0.0.1:9092

# NOTE(review): the next two keys look like old Scala-producer settings;
# presumably ignored by KafkaProducer - confirm.
metadata.broker.list=127.0.0.1:9092

serializer.class=kafka.serializer.StringEncoder

# Consumer side: offset auto-commit and session timeout (values in ms).
enable.auto.commit=true

auto.commit.interval.ms=1000

session.timeout.ms=30000

# Consumer side: keys and values are read as Strings.
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

main.properties参数很简单,各位自己改改就行。

注:Kafka 部署后,broker 默认使用所在 Linux 机器的 hostname 作为对外广播给客户端的地址(在 Mac 下貌似是 IP?),客户端必须能够解析并访问该地址;也可以显式设置要广播的 hostname。

server.properties中添加:advertised.host.name = XX

XX 最好配置为 IP;否则需要在客户端本机的 hosts 文件中添加一条 "IP XX" 的映射,使 XX 能解析到 broker 的 IP。

更多相关技术内容咨询欢迎前往并持续关注六星社区了解详情。

想高效系统的学习Java编程语言,推荐大家关注一个微信公众号:Java圈子。每天分享行业资讯、技术干货供大家阅读,关注即可免费领取整套Java入门到进阶的学习资料以及教程,感兴趣的小伙伴赶紧行动起来吧。

attachments-2023-03-2AoKIjPQ64014b4ad30a3.jpg

  • 发表于 2023-03-30 09:03
  • 阅读 ( 435 )
  • 分类:Java开发

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
小柒
小柒

1658 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1658 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章