Como obter o mais recente deslocamento para uma partição para um tópico kafka?

Eu estou usando o consumidor de alto nível do Python para o Kafka e quero saber as últimas compensações para cada partição de um tópico. No entanto, não consigo fazê-lo funcionar.

from kafka import TopicPartition from kafka.consumer import KafkaConsumer con = KafkaConsumer(bootstrap_servers = brokers) ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)] con.assign(ps) for p in ps: print "For partition %s highwater is %s"%(p.partition,con.highwater(p)) print "Subscription = %s"%con.subscription() print "con.seek_to_beginning() = %s"%con.seek_to_beginning() 

Mas a saída que recebo é

 For partition 0 highwater is None For partition 1 highwater is None For partition 2 highwater is None For partition 3 highwater is None For partition 4 highwater is None For partition 5 highwater is None .... For partition 96 highwater is None For partition 97 highwater is None For partition 98 highwater is None For partition 99 highwater is None Subscription = None con.seek_to_beginning() = None con.seek_to_end() = None 

Eu tenho uma abordagem alternativa usando assign mas o resultado é o mesmo

 con = KafkaConsumer(bootstrap_servers = brokers) ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)] con.assign(ps) for p in ps: print "For partition %s highwater is %s"%(p.partition,con.highwater(p)) print "Subscription = %s"%con.subscription() print "con.seek_to_beginning() = %s"%con.seek_to_beginning() print "con.seek_to_end() = %s"%con.seek_to_end() 

Parece que de alguma documentação posso obter esse comportamento se uma fetch não tiver sido emitida. Mas não consigo encontrar uma maneira de forçar isso. O que estou fazendo de errado?

Ou existe uma maneira diferente / mais simples de obter as últimas compensações para um tópico?

Finalmente, depois de passar um dia neste e em vários começos falsos, consegui encontrar uma solução e fazê-la funcionar. Postando-a para que outros possam se referir a ela.

 from kafka import SimpleClient from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy from kafka.common import OffsetRequestPayload client = SimpleClient(brokers) partitions = client.topic_partitions[topic] offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()] offsets_responses = client.send_offset_request(offset_requests) for r in offsets_responses: print "partition = %s, offset = %s"%(r.partition, r.offsets[0]) 

Se você deseja usar os shell scripts do Kafka presentes no kafka / bin, então você pode obter os menores e mais recentes offsets usando kafka-run-class.sh.

Para obter o mais recente comando de deslocamento será semelhante a este

 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topiname 

Para obter o menor comando de deslocamento será semelhante a este

 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topiname 

Você pode encontrar mais informações sobre Get Offsets Shell no seguinte link

Espero que isto ajude!

 from kafka import KafkaConsumer, TopicPartition TOPIC = 'MYTOPIC' GROUP = 'MYGROUP' BOOTSTRAP_SERVERS = ['kafka01:9092', 'kafka02:9092'] consumer = KafkaConsumer( bootstrap_servers=BOOTSTRAP_SERVERS, group_id=GROUP, enable_auto_commit=False ) for p in consumer.partitions_for_topic(TOPIC): tp = TopicPartition(TOPIC, p) consumer.assign([tp]) committed = consumer.committed(tp) consumer.seek_to_end(tp) last_offset = consumer.position(tp) print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset, (last_offset - committed))) consumer.close(autocommit=False) 

Com o kafka-python>=1.3.4 você pode usar:

kafka.KafkaConsumer.end_offsets (partições)

Obtenha o último deslocamento para as partições dadas. O último deslocamento de uma partição é o deslocamento da próxima mensagem, ou seja, o deslocamento da última mensagem disponível + 1.

 from kafka import TopicPartition from kafka.consumer import KafkaConsumer con = KafkaConsumer(bootstrap_servers = brokers) ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)] con.end_offsets(ps) 

Outra maneira de conseguir isso é pesquisando o consumidor para obter o último deslocamento consumido e, em seguida, usando o método seek_to_end para obter a partição offset mais recente disponível.

 from kafka import KafkaConsumer consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092']) consumer.poll() consumer.seek_to_end() 

Este método é particularmente útil quando se usa grupos de consumidores.

FONTES:

  1. https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.poll
  2. https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.seek_to_end
    Intereting Posts