kafka consumer api - KafkaStream not receiving any messages from Topic -


i'm playing around kafkastreams , kafkaconnect trying consume messages topic. have "standard" batch consumer set topic , works charm. first send couple of records kafka , consume them afterwards. want same using kakfa streams, don't single message topic. here's consumer code i'm using.

final int number_of_partitions = 4; final properties consumerconfig = new properties(); consumerconfig.setproperty("zookeeper.connect", rule.getconfiguration().kafka.getzookeeperurl()); consumerconfig.setproperty("backoff.increment.ms", "100"); consumerconfig.setproperty("group.id", "java-consumer-example"); consumerconfig.setproperty("consumer.timeout.ms", "1000000"); consumerconfig.setproperty("client.id", "someclient"); consumerconfig.setproperty("auto.offset.reset", "smallest"); consumerconfig.setproperty("enable.auto.commit", "false"); consumerconfig.setproperty("bootstrap.servers", rule.getconfiguration().kafka.gethosts());  final consumerconnector connector = consumer.createjavaconsumerconnector(new consumerconfig(consumerconfig)); final topicfilter sourcetopicfilter = new whitelist(rule.getconfiguration().kafka.gettopic());  final verifiableproperties decoderprops = new verifiableproperties(); decoderprops.props().setproperty("schema.registry.url", rule.getconfiguration().kafka.getregistry()); decoderprops.props().setproperty("max.schemas.per.subject", "1"); final list<kafkastream<string, object>> streams = connector     .createmessagestreamsbyfilter(sourcetopicfilter, number_of_partitions, new stringdecoder(decoderprops), new kafkaavrodecoder(decoderprops));  final executorservice executorservice = executors.newfixedthreadpool(number_of_partitions); (final kafkastream stream : streams) {     executorservice.submit(() -> {         try {             final consumeriterator = stream.iterator();             while (it.hasnext()) {                 final messageandmetadata messageandmetadata = it.next();                 final string key = (string) messageandmetadata.key();                 system.out.println("key" + key);             }         } catch (final exception ex) {             logger.error("error", ex);         }     }); } 

my problem is, code keeps on waiting in it.hasnext() condition until timeout reached. i'm missing detail here, can't figure out, why don't out of topic. part of test, have producer sends number of records topic right before consumer starts, can't offset problem. ideas highly welcome.

i found solution. error outside code posted. timeout provided shutdown of executorservice short killed without providing enough time consumer job.


Comments

Popular posts from this blog

account - Script error login visual studio DefaultLogin_PCore.js -

xcode - CocoaPod Storyboard error: -