English 中文(简体)
关闭批次失败:[7] 请求超时:请求超过请求中用户指定的时限
原标题:failed to close batch:[7] Request Timed Out: the request exceeded the user-specified time limit in the request

我第一次使用“https://github.com/seminmentio/kafka-go” rel=“nofollow noreferrer”>>Kafka-go ,我在当地安装了清洁机器上的所有设备,并使用前两个代码样本:生成信息并发送以下信息。请注意,我刚刚复制了上述链接的代码。

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

func main() {

    ProduceMessages()
    ConsumeMessages()
}

func ProduceMessages() {
    // to produce messages
    topic := "my-topic"
    partition := 0

    conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
    if err != nil {
        log.Fatal("failed to dial leader:", err)
    }

    conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
    _, err = conn.WriteMessages(
        kafka.Message{Value: []byte("one!")},
        kafka.Message{Value: []byte("two!")},
        kafka.Message{Value: []byte("three!")},
    )
    if err != nil {
        log.Fatal("failed to write messages:", err)
    }

    if err := conn.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
    }
}

func ConsumeMessages() {
    // to consume messages
    topic := "my-topic"
    partition := 0

    conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
    if err != nil {
        log.Fatal("failed to dial leader:", err)
    }

    conn.SetReadDeadline(time.Now().Add(10 * time.Second))
    batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

    b := make([]byte, 10e3) // 10KB max per message
    for {
        n, err := batch.Read(b)
        if err != nil {
            break
        }
        fmt.Println(string(b[:n]))
    }

    if err := batch.Close(); err != nil {
        log.Fatal("failed to close batch:", err)
    }

    if err := conn.Close(); err != nil {
        log.Fatal("failed to close connection:", err)
    }
}

然而,每当试图结束批量会议时,我都会得到以下结果(在等待几秒钟之后):

one!
two!
three!

目前为止还不错 但之后总是有以下错误

2022/12/10 10:35:50 failed to close batch:[7] Request Timed Out: the request exceeded the user-specified time limit in the request
exit status 1

为什么它未能关闭这批货物(为什么超时)?

PS> PS> 请注意, 我无法添加 kafka-go , 与已经存在的 confluent-kafka-go 标签不同 。

问题回答

The reason for this error is that the size of your batch is less than 10 kb. Try to reduce the value of the first argument in this line:

batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

“ 批量 : = conn. readBatch( 10e3, 1e6) ” 中的两个参数给出它想要从 kafka 服务器接收的字节的最小和最大数量 。

在此情况下,“ 1” 的长度小于 10e3, 这意味着 10* (103) 字节, 所以您应该将第一个参数缩小到一个较小的值, 如 1e1 。





相关问题
minimum work size of a goroutine [closed]

Does anyone know approximately what the minimum work size is needed in order for a goroutine to be beneficial (assuming that there are free cores for the work to be offloaded to)?

How do you get the terminal size in Go?

How do I get the terminal size in Go. In C it would look like this: struct ttysize ts; ioctl(0, TIOCGWINSZ, &ts); But how to i access TIOCGWINSZ in Go

What do you use to write Go [closed]

I know its a bit too early, but I ve been trying out Go (Google s Programming Language) and its kindof annoying to write code in gedit. So, my question: What do you use to experiment with Go?

Shared memory vs. Go channel communication

One of Go s slogans is Do not communicate by sharing memory; instead, share memory by communicating. I am wondering whether Go allows two different Go-compiled binaries running on the same machine to ...

Embedding instead of inheritance in Go

What is your opinion of this design decision? What advantages does it have and what disadvantages? Links: Embedding description

热门标签