Donate. I desperately need donations to survive due to my health

Get paid by answering surveys Click here

Click here to donate

Remote/Work from Home jobs

Unable to publish message to multiple topics using confluent kafka

I am using confluent-kafka to publish a message to multiple kafka topics. When i publish to a single topic, everything works fine, however when I add a second send call(either to the same topic again or a new topic) I get a message in logs saying the topic has 0 partitions - Invalid topic. I've checked the broker and the default partition count is set to 1(tried raising it same thing).

If I produce messages from a console-producer, I have no issues. This leads me to believe the problem is something related to producer configuration.

Here is the producer I'm using:

    conf = {
        'bootstrap.servers': broker_url,
        'api.version.request': 'true',
        'log.connection.close': 'false',
        'log_level': 2,
        'debug': 'all'
    }
    producer = Producer(**conf)

And I am just making calls to producer.send()

Here is a snippet of log messages from the producer with debug set to all:

%7|1543519538.466|TOPIC|rdkafka#producer-1| [thrd:app]: New local topic: topic1 %7|1543519538.466|TOPPARNEW|rdkafka#producer-1| [thrd:app]: NEW topic1[-1] 0x2d536a0 (at rd_kafka_topic_new0:325) %7|1543519538.768|NOINFO|rdkafka#producer-1| [thrd:main]: Topic topic1 partition count is zero: should refresh metadata %7|1543519538.768|METADATA|rdkafka#producer-1| [thrd:main]: Requesting metadata for 1/1 topics: refresh unavailable topics %7|1543519538.768|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: Request metadata for 1 topic(s): refresh unavailable topics %7|1543519538.768|SEND|rdkafka#producer-1| [thrd:broker:6667/bootstrap]: broker:6667/1001: Sent MetadataRequest (v2, 51 bytes @ 0, CorrId 3) %7|1543519538.768|RECV|rdkafka#producer-1| [thrd:broker:6667/bootstrap]: broker:6667/1001: Received MetadataResponse (v2, 153 bytes, CorrId 3, rtt 0.31ms) %7|1543519538.768|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: ===== Received metadata (for 1 requested topics): refresh unavailable topics ===== %7|1543519538.768|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: ClusterId: rp0_ZkpuTASF7mlaSHnDwQ, ControllerId: 1001 %7|1543519538.768|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: 1 brokers, 1 topics %7|1543519538.768|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: Broker #0/1: broker:6667 NodeId 1001 %7|1543519538.768|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: Topic #0/1: topic1 with 1 partitions %7|1543519538.768|STATE|rdkafka#producer-1| [thrd:main]: Topic topic1 changed state unknown -> exists %7|1543519538.768|PARTCNT|rdkafka#producer-1| [thrd:main]: Topic topic1 partition count changed from 0 to 1 %7|1543519538.768|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW topic1 [0] 0x7f691c002910 (at rd_kafka_topic_partition_cnt_update:550) %7|1543519538.768|METADATA|rdkafka#producer-1| [thrd:main]: Topic topic1 partition 0 Leader 1001 %7|1543519538.768|BRKDELGT|rdkafka#producer-1| [thrd:main]: topic1 [0]: delegate to broker broker:6667/1001 (rktp 0x7f691c002910, term 0, ref 2, remove 0) %7|1543519538.768|BRKDELGT|rdkafka#producer-1| [thrd:main]: topic1 [0]: broker broker:6667/1001 is now leader for partition with 0 messages (0 bytes) queued %7|1543519538.769|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic topic1 [0] 0x7f691c002910 from (none) to broker:6667/1001 (sending PARTITION_JOIN to broker:6667/1001) %7|1543519538.769|PARTCNT|rdkafka#producer-1| [thrd:main]: Partitioning 1 unassigned messages in topic topic1 to 1 partitions %7|1543519538.769|TOPBRK|rdkafka#producer-1| [thrd:broker:6667/bootstrap]: broker:6667/1001: Topic broker [0]: joining broker (rktp 0x7f691c002910) %7|1543519538.769|FETCHADD|rdkafka#producer-1| [thrd:broker:6667/bootstrap]: broker:6667/1001: Added topic1 [0] to active list (1 entries, opv 0) %7|1543519538.769|BROADCAST|rdkafka#producer-1| [thrd:broker:6667/bootstrap]: Broadcasting state change %7|1543519538.769|UAS|rdkafka#producer-1| [thrd:main]: 1/1 messages were partitioned in topic topic1 %7|1543519538.769|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: 1/1 requested topic(s) seen in metadata %7|1543519538.769|TOPPAR|rdkafka#producer-1| [thrd:broker:6667/bootstrap]: broker:6667/1001: topic1 [0] 1 message(s) in xmit queue (1 added from partition queue) %7|1543519538.769|PRODUCE|rdkafka#producer-1| [thrd:broker:6667/bootstrap]: broker:6667/1001: topic1 [0]: Produce MessageSet with 1 message(s) (885 bytes, ApiVersion 3, MsgVersion 2) %7|1543519538.769|SEND|rdkafka#producer-1| [thrd:boker:6667/bootstrap]: broker:6667/1001: Sent ProduceRequest (v3, 956 bytes @ 0, CorrId 4) %7|1543519538.769|RECV|rdkafka#producer-1| [thrd:broker:6667/bootstrap]: broker:6667/1001: Received ProduceResponse (v3, 60 bytes, CorrId 4, rtt 0.36ms) %7|1543519538.769|MSGSET|rdkafka#producer-1| [thrd:broker:6667/bootstrap]: broker:6667/1001: topic1 [0]: MessageSet with 1 message(s) delivered %7|1543519538.770|TOPIC|rdkafka#producer-1| [thrd:app]: New local topic: topic2 %7|1543519538.770|TOPPARNEW|rdkafka#producer-1| [thrd:app]: NEW topic2 [-1] 0x31969e0 (at rd_kafka_topic_new0:325) %7|1543519539.768|NOINFO|rdkafka#producer-1| [thrd:main]: Topic topic2 partition count is zero: should refresh metadata %7|1543519539.768|METADATA|rdkafka#producer-1| [thrd:main]: Requesting metadata for 1/1 topics: refresh unavailable topics %7|1543519539.768|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: Request metadata for 1 topic(s): refresh unavailable topics %7|1543519539.768|SEND|rdkafka#producer-1| [thrd:broker:6667/bootstrap]: broker:6667/1001: Sent MetadataRequest (v2, 50 bytes @ 0, CorrId 5) %7|1543519539.771|RECV|rdkafka#producer-1| [thrd:broker:6667/bootstrap]: broker:6667/1001: Received MetadataResponse (v2, 126 bytes, CorrId 5, rtt 2.58ms) %7|1543519539.771|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: ===== Received metadata (for 1 requested topics): refresh unavailable topics ===== %7|1543519539.771|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: ClusterId: rp0_ZkpuTASF7mlaSHnDwQ, ControllerId: 1001 %7|1543519539.771|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: 1 brokers, 1 topics %7|1543519539.771|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: Broker #0/1: broker:6667 NodeId 1001 %7|1543519539.771|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: Topic #0/1: topic2 with 0 partitions: Broker: Invalid topic %7|1543519539.771|METADATA|rdkafka#producer-1| [thrd:main]: Error in metadata reply for topic topic2 (PartCnt 0): Broker: Invalid topic %7|1543519539.771|STATE|rdkafka#producer-1| [thrd:main]: Topic topic2 changed state unknown -> notexists %7|1543519539.771|PARTCNT|rdkafka#producer-1| [thrd:main]: Partitioning 1 unassigned messages in topic topic2 to 0 partitions %7|1543519539.771|UAS|rdkafka#producer-1| [thrd:main]: 0/1 messages were partitioned in topic topic2 %7|1543519539.771|UAS|rdkafka#producer-1| [thrd:main]: 1/1 messages failed partitioning in topic topic2 %7|1543519539.771|METADATA|rdkafka#producer-1| [thrd:main]: broker:6667/1001: 1/1 requested topic(s) seen in metadata

Any suggestions would be greatly appreciated.

Comments