Kafka consumer: polling messages with Python


I have problems polling messages from Kafka in a consumer group. My consumer object is assigned to a given partition with

self.ps = TopicPartition(topic, partition)

and after that the consumer is assigned that partition:

self.consumer.assign([self.ps]) 

After that I am able to count the messages inside the partition with

self.consumer.seek_to_beginning(self.ps)
pos = self.consumer.position(self.ps)

and self.consumer.seek_to_end(self.ps) .....
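The counting approach described above (seek to the beginning, read the position, seek to the end, read the position again) can be sketched roughly as follows. This is only a minimal sketch: the helper name `partition_offsets` is hypothetical, `consumer` is assumed to be a kafka-python `KafkaConsumer` that already has `tp` assigned, and the offset difference equals the message count only if the partition has no gaps (for example from log compaction).

```python
def partition_offsets(consumer, tp):
    """Return (first_offset, end_offset) for an already assigned partition.

    Hypothetical helper; `consumer` is assumed to behave like a
    kafka-python KafkaConsumer and `tp` like a TopicPartition.
    """
    # Move to the oldest available record and read its offset.
    consumer.seek_to_beginning(tp)
    first_offset = consumer.position(tp)

    # Move past the newest record; position() is then the next offset to be written.
    consumer.seek_to_end(tp)
    end_offset = consumer.position(tp)

    return first_offset, end_offset


def count_messages(consumer, tp):
    """Approximate number of records in the partition (assumes no offset gaps)."""
    first_offset, end_offset = partition_offsets(consumer, tp)
    return end_offset - first_offset
```

Note that after this call the consumer is positioned at the end of the partition, so it has to be moved back with `seek()` before polling for existing messages.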

The topic contains 30000 messages. The problem is that I only get 1 message.

The consumer is configured with max_poll_records=200 and auto_offset_reset='earliest'.

And here is the function with which I am trying to get the messages:

def poll_messages(self):
    data = []
    # poll() returns a dict mapping each TopicPartition to a list of records
    messages = self.consumer.poll(timeout_ms=6000)
    for partition, msgs in six.iteritems(messages):
        for msg in msgs:
            data.append(msg)
    return data

Even if I go to the first available offset before I start polling, I only get 1 message.

self.consumer.seek(self.ps, self.get_first_offset()) 
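One thing that may be worth checking: in kafka-python a single `poll()` call returns whatever records are already available, up to `max_poll_records`, so it is not guaranteed to return a full batch. A possible sketch is to seek once and then keep polling until the end offset is reached. The function name `drain_partition`, its parameters, and the `max_empty_polls` cutoff are assumptions made for illustration and are not part of the original code.

```python
def drain_partition(consumer, tp, start_offset, end_offset,
                    timeout_ms=6000, max_empty_polls=3):
    """Collect records from start_offset up to (not including) end_offset.

    Hypothetical helper; `consumer` is assumed to behave like a
    kafka-python KafkaConsumer with `tp` already assigned.
    """
    consumer.seek(tp, start_offset)
    data = []
    empty_polls = 0

    # Keep polling: each call may return anywhere from 0 to max_poll_records records.
    while consumer.position(tp) < end_offset and empty_polls < max_empty_polls:
        batches = consumer.poll(timeout_ms=timeout_ms)
        msgs = batches.get(tp, [])
        if msgs:
            data.extend(msgs)
            empty_polls = 0
        else:
            # Give up after several consecutive empty polls to avoid looping forever.
            empty_polls += 1

    return data
```

With the names from the question this might be called as `drain_partition(self.consumer, self.ps, self.get_first_offset(), end_offset)`, where `end_offset` is the position read after `seek_to_end`.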

I hope someone can explain to me what I am doing wrong. Thanks in advance.

Best wishes,
Jörn


Comments