首先引入依赖
org.apache.kafka kafka-clients 0.10.2.1 org.apache.kafka kafka_2.11 0.10.2.1
然后代码
package com.scc.flume.source.kafkasource;import java.util.Map;import java.util.Map.Entry;import java.util.Properties;import org.apache.kafka.common.security.JaasUtils;import kafka.admin.AdminUtils;import kafka.utils.ZkUtils;import scala.collection.JavaConverters;public class KafkaSource { public static void main(String[] args) { ZkUtils zkUtils = ZkUtils.apply("172.16.40.4:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); Maptopics = JavaConverters.mapAsJavaMapConverter(AdminUtils.fetchAllTopicConfigs(zkUtils)) .asJava(); for (Entry entry : topics.entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); System.out.println(key + ":" + value); } zkUtils.close(); }}
或者直接使用kafka的api
Map> topics = consumer.listTopics(); if (null != topics) { for (String topic : topics.keySet()) { LOGGER.info("get a topic ={}", topic); ret.add(topic); } }