我第一次使用“https://github.com/seminmentio/kafka-go” rel=“nofollow noreferrer”>>Kafka-go ,我在当地安装了清洁机器上的所有设备,并使用前两个代码样本:生成信息并发送以下信息。请注意,我刚刚复制了上述链接的代码。
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
func main() {
ProduceMessages()
ConsumeMessages()
}
func ProduceMessages() {
// to produce messages
topic := "my-topic"
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := conn.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}
func ConsumeMessages() {
// to consume messages
topic := "my-topic"
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
b := make([]byte, 10e3) // 10KB max per message
for {
n, err := batch.Read(b)
if err != nil {
break
}
fmt.Println(string(b[:n]))
}
if err := batch.Close(); err != nil {
log.Fatal("failed to close batch:", err)
}
if err := conn.Close(); err != nil {
log.Fatal("failed to close connection:", err)
}
}
然而,每当试图结束批量会议时,我都会得到以下结果(在等待几秒钟之后):
one!
two!
three!
目前为止还不错 但之后总是有以下错误
2022/12/10 10:35:50 failed to close batch:[7] Request Timed Out: the request exceeded the user-specified time limit in the request
exit status 1
为什么它未能关闭这批货物(为什么超时)?
PS> PS> 请注意, 我无法添加 kafka-go
, 与已经存在的 confluent-kafka-go
标签不同 。