i have problems polling messages kafka in consumer group. consumer object assigns given partition with
self.ps = topicpartition(topic, partition ) and after consumer assigns partition:
self.consumer.assign([self.ps]) after able count messages inside partition with
self.consumer.seek_to_beginning(self.ps) pos = self.consumer.position(self.ps) and self.consumer.seek_to_end(self.ps) .....
in tpoic on 30000 messages. problem 1 message.
consumer configuration with: max_poll_records= 200 auto_offset_reset earliest
and here function trying messages:
def poll_messages(self): data = [] messages = self.consumer.poll(timeout_ms=6000) partition, msgs in six.iteritems(messages): msg in msgs: data.append(msg) return data even if go first available offset before start polling messages 1 message.
self.consumer.seek(self.ps, self.get_first_offset()) i hope can explain me doing wrong. in advance.
best wishes jörn
Comments
Post a Comment