I'm using Python 3.9.16 and kafka-python version 2.0.2. I'm running on my MacBook Pro with macOS 11.6.5.
I'm still getting my feet wet with Kafka, so it's entirely possible I'm doing things the wrong way.
What I'm trying to do is test seeking to offsets with my consumer, in case something doesn't get processed and I have to go back and re-read a message.
Anyway, I keep running into this error message. I'm not even sure why it's happening, because sometimes I can process the offset and it works fine; other times it gives me this message:
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10bb669f0>', hit error: unpack requires a buffer of 4 bytes
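For reference, the tail of that message ("unpack requires a buffer of 4 bytes") is the text Python's standard-library struct module produces when a 4-byte field is unpacked from a buffer that is too short. The sketch below only reproduces that text outside Kafka. The big-endian int32 format and the helper name try_unpack are assumptions for illustration, since the message does not say which field kafka-python was decoding.

```python
import struct

# Assumption: a big-endian signed 32-bit integer, a typical length/size field
# in binary protocols. The real format used by the failing call is not shown
# in the error message.
int32 = struct.Struct(">i")


def try_unpack(buf):
    """Return the decoded integer, or the struct error text if buf is too short."""
    try:
        return int32.unpack(buf)[0]
    except struct.error as exc:
        return str(exc)


# A complete 4-byte field decodes normally.
print(try_unpack(b"\x00\x00\x00\x0a"))
# An empty buffer triggers the same wording seen at the end of the ValueError.
print(try_unpack(b""))
```

This suggests the client was handed zero bytes where it expected a 4-byte field, but it does not by itself say why the bytes were missing.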
When it's working, I can see this in pdb, which more or less proves that the values are present in the topic for me to consume:
(Pdb)
> /Users/username/kafka/tkCons.py(41)<module>()
-> print ("{}, {}".format(blah.offset, blah.value))
(Pdb)
10, b'{"number": 10}'
> /Users/username/kafka/tkCons.py(40)<module>()
-> for blah in consumer:
(Pdb)
I wish I could narrow down what I'm doing during testing, but I can't pin down which lines of code I added or commented out make it work or make it give me the above error. Since I'm not 100% sure what's happening under the hood, is my seeking around somehow affecting something in ZooKeeper? What do I need to do to keep whatever is under the hood happy? Here's my code in case it matters.
from kafka import KafkaConsumer, TopicPartition
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(#'my-topic333', 'my-topic222', 'my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
myTP = TopicPartition('my-topic333', 0)
import pdb
pdb.set_trace()
consumer.assign([myTP])
print ("this is the consumer assignment: {}".format(consumer.assignment()))
print ("before this is my position: {} ".format(consumer.position(myTP)))
consumer.seek(myTP, 50)
#consumer.seek_to_beginning()
print ("after seeking this is my position: {} ".format(consumer.position(myTP)))
for blah in consumer:
    print ("{}, {}".format(blah.offset, blah.value))
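One thing that may be worth ruling out while testing is whether the seek target (50 in the code above) actually lies inside the range of offsets the partition holds. The sketch below is a hedged illustration, not a confirmed fix for the error: it clamps the requested offset to the range reported by KafkaConsumer.beginning_offsets() and end_offsets() before calling seek(), then does a single poll(). The helper names clamp_offset and seek_and_read are hypothetical.

```python
def clamp_offset(requested, beginning, end):
    """Clamp a requested offset into the inclusive range [beginning, end]."""
    return max(beginning, min(requested, end))


def seek_and_read(consumer, tp, requested, timeout_ms=1000):
    """Seek tp to a clamped offset and return (target, records from one poll).

    `consumer` is expected to behave like kafka-python's KafkaConsumer and
    `tp` like a TopicPartition. Both helper functions are illustrative only.
    """
    consumer.assign([tp])
    # Both calls return a dict keyed by partition; the end offset is the
    # offset of the next message to be written (last offset + 1).
    beginning = consumer.beginning_offsets([tp])[tp]
    end = consumer.end_offsets([tp])[tp]
    target = clamp_offset(requested, beginning, end)
    consumer.seek(tp, target)
    # poll() returns {TopicPartition: [records]}; the dict is empty if
    # nothing arrived within the timeout.
    batches = consumer.poll(timeout_ms=timeout_ms)
    return target, batches.get(tp, [])


# Example use against a real broker (not executed here):
#   from kafka import KafkaConsumer, TopicPartition
#   consumer = KafkaConsumer(group_id='my-group',
#                            bootstrap_servers=['localhost:9092'])
#   target, records = seek_and_read(consumer, TopicPartition('my-topic333', 0), 50)
#   for record in records:
#       print("{}, {}".format(record.offset, record.value))
```

If the clamped target differs from the requested offset, the requested position was outside the partition's current range, which is useful to know when interpreting what the consumer does next.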