I have problems polling messages from Kafka in a consumer group. My consumer object is assigned to a given partition with
self.ps = TopicPartition(topic, partition)
and after that the consumer assigns that partition:
self.consumer.assign([self.ps])
After that I am able to count the messages inside the partition with
self.consumer.seek_to_beginning(self.ps)
pos = self.consumer.position(self.ps)
and
self.consumer.seek_to_end(self.ps)
.....
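The counting step above can be sketched as follows. This is only a minimal sketch, assuming the kafka-python `KafkaConsumer` methods `seek_to_beginning`, `seek_to_end`, `position` and `seek`; the helper name `count_messages` is hypothetical and not part of the original code. The offset difference is an upper bound on the number of messages (a compacted topic can have gaps), and the sketch rewinds to the first offset afterwards so that a later `poll()` does not start at the end of the partition.

```python
def count_messages(consumer, tp):
    """Estimate the number of messages in one assigned partition.

    `consumer` is assumed to behave like kafka-python's KafkaConsumer and
    `tp` like a TopicPartition that was already passed to assign().
    """
    consumer.seek_to_beginning(tp)
    first = consumer.position(tp)  # offset of the first available message

    consumer.seek_to_end(tp)
    last = consumer.position(tp)   # offset the next produced message would get

    # Rewind, otherwise the next poll() would only wait for new messages.
    consumer.seek(tp, first)
    return last - first
```

With the names from the post it would be called as `count_messages(self.consumer, self.ps)`.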
There are 30000 messages in the topic. The problem is that I only get 1 message.
The consumer is configured with:
max_poll_records=200
auto_offset_reset='earliest'
And here is the function with which I am trying to get the messages:
def poll_messages(self):
    data = []
    # poll() returns a dict mapping TopicPartition to a list of records
    messages = self.consumer.poll(timeout_ms=6000)
    for partition, msgs in six.iteritems(messages):
        for msg in msgs:
            data.append(msg)
    return data
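A single `poll()` call returns at most `max_poll_records` records and is not guaranteed to return everything in the partition, so one way to read a whole partition is to poll in a loop until the consumer position reaches a known end offset. The following is a minimal sketch under that assumption, not a confirmed fix for the behaviour described here. The names `poll_all`, `end_offset` and `max_empty_polls` are hypothetical, and `consumer` is assumed to behave like kafka-python's `KafkaConsumer`.

```python
def poll_all(consumer, tp, end_offset, timeout_ms=6000, max_empty_polls=3):
    """Poll repeatedly until the position of `tp` reaches `end_offset`.

    Gives up after `max_empty_polls` consecutive polls without records,
    so the loop cannot hang forever if the end offset is never reached.
    """
    data = []
    empty_polls = 0
    while consumer.position(tp) < end_offset and empty_polls < max_empty_polls:
        # poll() returns a dict mapping TopicPartition to a list of records
        batches = consumer.poll(timeout_ms=timeout_ms)
        msgs = batches.get(tp, [])
        if not msgs:
            empty_polls += 1
            continue
        empty_polls = 0
        data.extend(msgs)
    return data
```

Here `end_offset` would be the position read after `seek_to_end`, taken before seeking back to the first offset.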
Even if I go to the first available offset before I start polling messages, I only get 1 message.
self.consumer.seek(self.ps, self.get_first_offset())
I hope someone can explain to me what I am doing wrong. Thanks in advance.
Best wishes, Jörn