拉取kafka消息

广告位

package kafka; import org.apache.kafka.clients.consumer…

 package kafka;  import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;  import java.util.Collections; import java.util.Properties;  public class MyConsumer {         private static KafkaConsumer consumer;      static {         Properties properties = new Properties();         properties.put("bootstrap.servers", "127.0.0.1:9092");         properties.put("group.id", "CountryCounter");         properties.put("key.deserializer",                 "org.apache.kafka.common.serialization.StringDeserializer");         properties.put("value.deserializer",                 "org.apache.kafka.common.serialization.StringDeserializer");         consumer = new KafkaConsumer(properties);     }      private static void consumerTest(){         //订阅topic         consumer.subscribe(Collections.singleton("kafka-study"));          try {             while (true){                 //在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回                 ConsumerRecords records =  consumer.poll(100);                  for (ConsumerRecord record : records) {                      System.out.println(("topic:"+record.topic() +" " + "offset:"+ record.offset() +" "+"key:"+ record.key()                              +" "+"value:"+ record.value()));               }             }         }catch (Exception e){             System.out.println("error:"+e);         }finally {             consumer.close();         }      }      public static void main(String[] args) {         consumerTest();     }   } 

?

陈晨数据工程师

关于作者: 陈晨数据工程师

为您推荐

广告位

发表评论