Problema de ID do grupo de consumidores kapka em Python

ATÉ ONDE SEI,

O conceito de partições e grupos (consumidores) em kafka foi introduzido para implementar o paralelismo. Eu estou trabalhando com o kafka através do python. Eu tenho um determinado tópico, que tem (digamos) 2 partições. Isso significa que, se eu iniciar um grupo de consumidores com dois consumidores, eles serão mapeados (inscritos) em diferentes partições.

Mas, usando a biblioteca kafka em python, me deparei com um problema estranho. Eu iniciei 2 consumidores com essencialmente os mesmos IDs de grupo e iniciei os segmentos para eles consumirem mensagens.

Mas toda mensagem no kafka-stream está sendo consumida por ambos !! Isso parece ridículo para mim e até mesmo conceitualmente incorreto. Existe alguma maneira que eu possa mapear os consumidores para determinadas partições (distintas) manualmente (se elas não forem mapeadas para diferentes partições automaticamente)?

Aqui está o código:

 from kafka import KafkaConsumer import thread def con1(consumer): for msg in consumer: print msg consumer1 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092']) consumer2 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092']) thread.start_new_thread(con1, (consumer1,)) thread.start_new_thread(con1, (consumer2,)) 

Aqui está a saída para algumas mensagens que eu produzi usando o kafka-console-producer:

 ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki') ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki') ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg') ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg') ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas') ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas') 

enquanto esperado era um de cada. BTW, este tópico k-test tem 2 partições.

Eu acho que você está trabalhando com o Kafka 0.8 ou versão menor, que não suporta esse recurso com base nos documentos :

… Alguns resources só serão ativados em novos corretores, no entanto; por exemplo, grupos de consumidores totalmente coordenados – isto é, atribuição dinâmica de partições a múltiplos consumidores no mesmo grupo – requer o uso de 0.9+ corretores kafka …

 from kafka import KafkaConsumer from kafka import TopicPartition TOPIC = "k-test" PARTITION_0 = 0 PARTITION_1 = 1 consumer_0 = KafkaConsumer( TOPIC, group_id='grp1', bootstrap_servers=['10.50.23.120:9092'] ) consumer_1 = KafkaConsumer( TOPIC, group_id='grp1', bootstrap_servers=['10.50.23.120:9092'] ) topic_partition_0 = TopicPartition(TOPIC, PARTITION_0) topic_partition_1 = TopicPartition(TOPIC, PARTITION_1) # format: topic, partition consumer_0.assign([topic_partition_0]) consumer_1.assign([topic_partition_1]) 

assign () pode funcionar para você, mas uma vez que você o usa, o kafka não irá balancear os consumidores automaticamente quando houver consumidores parando de trabalhar.

Tente executar a ferramenta de linha de comando bin / kafka-consumer-groups.sh para verificar se o cliente Python Kafka que você está usando suporta o gerenciamento apropriado do grupo de consumidores. Se ambos os consumidores estiverem realmente no mesmo grupo, eles deverão receber mensagens de partições mutuamente exclusivas.

    Intereting Posts