page contents

Golang使用Kafka教程: Golang实现Kafka消息发送、接收

一:核心概念 kafka是消息中间件的一种,是一种分布式流平台,是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(变态快)等优点。 kafka中涉及的名词: 消息记录(re...

attachments-2021-05-KK5BSHjE60af17b3b5e79.png

一:核心概念

kafka是消息中间件的一种,是一种分布式流平台,是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(变态快)等优点。

kafka中涉及的名词:

  1. 消息记录(record): 由一个key,一个value和一个时间戳构成,消息最终存储在主题下的分区中, 记录在生产者中称为生产者记录(ProducerRecord), 在消费者中称为消费者记录(ConsumerRecord),Kafka集群保持所有的消息,直到它们过期, 无论消息是否被消费了,在一个可配置的时间段内,Kafka集群保留所有发布的消息,不管这些消息有没有被消费。比如,如果消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的。之后它将被丢弃以释放空间。Kafka的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。
  2. 生产者(producer): 生产者用于发布(send)消息
  3. 消费者(consumer): 消费者用于订阅(subscribe)消息
  4. 消费者组(consumer group): 相同的group.id的消费者将视为同一个消费者组, 每个消费者都需要设置一个组id, 每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费
  5. 主题(topic): 消息的一种逻辑分组,用于对消息分门别类,每一类消息称之为一个主题,相同主题的消息放在一个队列中
  6. 分区(partition): 消息的一种物理分组, 一个主题被拆成多个分区,每一个分区就是一个顺序的、不可变的消息队列,并且可以持续添加,分区中的每个消息都被分配了一个唯一的id,称之为偏移量(offset),在每个分区中偏移量都是唯一的。每个分区对应一个逻辑log,有多个segment组成。
  7. 偏移量(offset): 分区中的每个消息都一个一个唯一id,称之为偏移量,它代表已经消费的位置。可以自动或者手动提交偏移量(即自动或者手动控制一条消息是否已经被成功消费)
  8. 代理(broker): 一台kafka服务器称之为一个broker
  9. 副本(replica):副本只是一个分区(partition)的备份。 副本从不读取或写入数据。 它们用于防止数据丢失。
  10. 领导者(leader):Leader 是负责给定分区的所有读取和写入的节点。 每个分区都有一个服务器充当Leader, producer 和 consumer 只跟 leader 交互
  11. 追随者(follower):跟随领导者指令的节点被称为Follower。 如果领导失败,一个追随者将自动成为新的领导者。 跟随者作为正常消费者,拉取消息并更新其自己的数据存储。replica 中的一个角色,从 leader 中复制数据。
  12. zookeeper:Kafka代理是无状态的,所以他们使用ZooKeeper来维护它们的集群状态。ZooKeeper用于管理和协调Kafka代理

kafka功能

  1. 发布订阅:生产者(producer)生产消息(数据流), 将消息发送到到kafka指定的主题队列(topic)中,也可以发送到topic中的指定分区(partition)中,消费者(consumer)从kafka的指定队列中获取消息,然后来处理消息。
  2. 流处理(Stream Process): 将输入topic转换数据流到输出topic
  3. 连接器(Connector) : 将数据从应用程序(源系统)中导入到kafka,或者从kafka导出数据到应用程序(宿主系统sink system), 例如:将文件中的数据导入到kafka,从kafka中将数据导出到文件中

kafka中的消息模型

  1. 队列:同名的消费者组员瓜分消息
  2. 发布订阅:广播消息给多个消费者组(不同名)

生产者(producer)将消息记录(record)发送到kafka中的主题中(topic), 一个主题可以有多个分区(partition), 消息最终存储在分区中,消费者(consumer)最终从主题的分区中获取消息。 

attachments-2021-05-WVuy5IuV60af16a893225.png

attachments-2021-05-PLD8T62T60af16afdc993.pngattachments-2021-05-0BUQnVXH60af16b7d1b76.png


二:安装与启动

一: Mac版安装

brew install kafka

安装kafka是需要依赖于zookeeper的,所以安装kafka的时候也会包含zookeeper 

kafka的安装目录:/usr/local/Cellar/kafka 

kafka的配置文件目录:/usr/local/etc/kafka 

kafka服务的配置文件:/usr/local/etc/kafka/server.properties 

zookeeper配置文件: /usr/local/etc/kafka/zookeeper.properties

# server.properties中的重要配置
 
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/usr/local/var/lib/kafka-logs
# zookeeper.properties
 
dataDir=/usr/local/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0 

二: 启动zookeeper

# 新起一个终端启动zookeeper
cd /usr/local/Cellar/kafka/1.0.0
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

三: 启动kafka

# 新起一个终端启动zookeeper,注意启动kafka之前先启动zookeeper
cd /usr/local/Cellar/kafka/1.0.0
./bin/kafka-server-start /usr/local/etc/kafka/server.properties

四:创建topic

# 新起一个终端来创建主题
cd /usr/local/Cellar/kafka/1.0.0
 
## 创建一个名为“test”的主题,该主题有1个分区
./bin/kafka-topics --create 
    --zookeeper localhost:2181 
    --partitions 1 
    --topic test

五:查看topic

// 创建成功可以通过 list 列举所有的主题
./bin/kafka-topics --list --zookeeper localhost:2181
 
// 查看某个主题的信息
./bin/kafka-topics --describe --zookeeper localhost:2181 --topic <name>

六:发送消息

# 新起一个终端,作为生产者,用于发送消息,每一行算一条消息,将消息发送到kafka服务器
  > ./bin/kafka-console-producer --broker-list localhost:9092 --topic test 
  This is a message
  This is another message

七:消费消息(接收消息)

# 新起一个终端作为消费者,接收消息
cd /usr/local/Cellar/kafka/1.0.0
> ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

八:在生产者发送消息 

在步骤六中新起的终端属于一条消息(任意字符),输入完回车就算一条消息,可以看到在步骤7中的消费者端就会显示刚才输入的消息


三:Go实现消息接收,发送

1). 准备

1.安装依赖库sarama

   go get github.com/Shopify/sarama

   该库要求kafka版本在0.8及以上,支持kafka定义的high-level API和low-level API,但不支持常用的consumer自动rebalance和offset追踪,所以一 般得结合cluster版本使用。

2.sarama-cluster依赖库

   go get github.com/bsm/sarama-cluster

   需要kafka 0.9及以上版本

3.代码示例来自官网(本地已测试),可到官网查看更多信息。

2). 生产者

1. 同步消息模式

import (
    "github.com/Shopify/sarama"
    "time"
    "log"
    "fmt"
    "os"
    "os/signal"
    "sync"
)
 
var Address = []string{"10.130.138.164:9092","10.130.138.164:9093","10.130.138.164:9094"}
 
func main()  {
    syncProducer(Address)
    //asyncProducer1(Address)
}
 
//同步消息模式
func syncProducer(address []string)  {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    p, err := sarama.NewSyncProducer(address, config)
    if err != nil {
        log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
        return
    }
    defer p.Close()
    topic := "test"
    srcValue := "sync: this is a message. index=%d"
    for i:=0; i<10; i++ {
        value := fmt.Sprintf(srcValue, i)
        msg := &sarama.ProducerMessage{
            Topic:topic,
            Value:sarama.ByteEncoder(value),
        }
        part, offset, err := p.SendMessage(msg)
        if err != nil {
            log.Printf("send message(%s) err=%s \n", value, err)
        }else {
            fmt.Fprintf(os.Stdout, value + "发送成功,partition=%d, offset=%d \n", part, offset)
        }
        time.Sleep(2*time.Second)
    }
}

2.异步消息

func SaramaProducer()  {
 
    config := sarama.NewConfig()
    //等待服务器所有副本都保存成功后的响应
    config.Producer.RequiredAcks = sarama.WaitForAll
    //随机向partition发送消息
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    //是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    //设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
    //注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
    config.Version = sarama.V0_10_0_1
 
    fmt.Println("start make producer")
    //使用配置,新建一个异步生产者
    producer, e := sarama.NewAsyncProducer([]string{"182.61.9.153:6667","182.61.9.154:6667","182.61.9.155:6667"}, config)
    if e != nil {
        fmt.Println(e)
        return
    }
    defer producer.AsyncClose()
 
    //循环判断哪个通道发送过来数据.
    fmt.Println("start goroutine")
    go func(p sarama.AsyncProducer) {
        for{
            select {
            case  <-p.Successes():
                //fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
            case fail := <-p.Errors():
                fmt.Println("err: ", fail.Err)
            }
        }
    }(producer)
 
    var value string
    for i:=0;;i++ {
        time.Sleep(500*time.Millisecond)
        time11:=time.Now()
        value = "this is a message 0606 "+time11.Format("15:04:05")
 
        // 发送的消息,主题。 
        // 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系。
        msg := &sarama.ProducerMessage{
            Topic: "0606_test",
        }
 
        //将字符串转化为字节数组
        msg.Value = sarama.ByteEncoder(value)
        //fmt.Println(value)
 
        //使用通道发送
        producer.Input() <- msg
    }
}


3)消费者

    集群模实现

func main()  {
    topic := []string{"test"}
    var wg = &sync.WaitGroup{}
    wg.Add(2)
    //广播式消费:消费者1
    go clusterConsumer(wg, Address, topic, "group-1")
    //广播式消费:消费者2
    go clusterConsumer(wg, Address, topic, "group-2")
 
    wg.Wait()
}
 
// 支持brokers cluster的消费者
func clusterConsumer(wg *sync.WaitGroup,brokers, topics []string, groupId string)  {
    defer wg.Done()
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Group.Return.Notifications = true
    config.Consumer.Offsets.Initial = sarama.OffsetNewest
 
    // init consumer
    consumer, err := cluster.NewConsumer(brokers, groupId, topics, config)
    if err != nil {
        log.Printf("%s: sarama.NewSyncProducer err, message=%s \n", groupId, err)
        return
    }
    defer consumer.Close()
 
    // trap SIGINT to trigger a shutdown
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
 
    // consume errors
    go func() {
        for err := range consumer.Errors() {
            log.Printf("%s:Error: %s\n", groupId, err.Error())
        }
    }()
 
    // consume notifications
    go func() {
        for ntf := range consumer.Notifications() {
            log.Printf("%s:Rebalanced: %+v \n", groupId, ntf)
        }
    }()
 
    // consume messages, watch signals
    var successes int
    Loop:
    for {
        select {
        case msg, ok := <-consumer.Messages():
            if ok {
                fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
                consumer.MarkOffset(msg, "")  // mark message as processed
                successes++
            }
        case <-signals:
            break Loop
        }
    }
    fmt.Fprintf(os.Stdout, "%s consume %d messages \n", groupId, successes)
}

更多相关技术内容咨询欢迎前往并持续关注六星社区了解详情。

如果你想用Python开辟副业赚钱,但不熟悉爬虫与反爬虫技术,没有接单途径,也缺乏兼职经验
关注下方微信公众号:Python编程学习圈,获取价值999元全套Python入门到进阶的学习资料以及教程,还有Python技术交流群一起交流学习哦。

attachments-2022-06-CHkyoXy862ac194d98b44.jpeg

  • 发表于 2021-05-27 11:53
  • 阅读 ( 1119 )
  • 分类:Golang

0 条评论

请先 登录 后评论
轩辕小不懂
轩辕小不懂

2403 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1324 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章