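Kafka Advanced Features: Topics
# 2.3 Topics
# 2.3.1 Management
Topics are managed with the kafka-topics.sh script:
Parameters that can be defined on a topic:
# 2.3.1.1 Creating a topic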
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02 --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760
# 2.3.1.2 Viewing topics
kafka-topics.sh --zookeeper localhost:2181/myKafka --list
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x
kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides --describe
# 2.3.1.3 Modifying a topic
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 --partitions 2 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config max.message.bytes=1048576
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config segment.bytes=10485760
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01
# 2.3.1.4 Deleting a topic
kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x
This command only marks the topic for deletion:
The topic itself is removed some time later.
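# 2.3.2 Adding partitions
With the command-line tool, the number of partitions of a topic can only be increased, never decreased. An attempt to decrease it fails with: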
ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic myTop1 currently has 2 partitions, 1 would not be an increase.
Use --alter to change the partition count of a topic and add partitions:
kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 --partitions 2
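# 2.3.3 Partition replica assignment (for reference)
Replica assignment has three goals:
- Spread replicas evenly across the brokers.
- For a partition assigned to a given broker, place its other replicas on other brokers.
- If all brokers have rack information, place the replicas of each partition on brokers in different racks where possible.
Without rack information:
The first replica of each partition is assigned by round-robin over the broker list, starting from a random position in that list.
The remaining replicas are assigned at increasing offsets from the first replica.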
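The rack-unaware procedure described above can be sketched in plain Java as follows. This is a minimal illustration modeled on the description, not Kafka's actual code: the class and method names are hypothetical, and the start position is passed in as a fixed `startIndex` (instead of being chosen at random) so that the result is reproducible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, written only to illustrate the rack-unaware assignment idea.
final class RackUnawareAssignment {

    // Position of a follower replica: the first replica's position plus a shift in [1, nBrokers - 1],
    // so a follower never lands on the same broker as the first replica.
    static int replicaIndex(int firstReplicaIndex, int replicaShift, int followerIdx, int nBrokers) {
        int shift = 1 + (replicaShift + followerIdx) % (nBrokers - 1);
        return (firstReplicaIndex + shift) % nBrokers;
    }

    // Returns partition id -> ordered replica list (first entry is the first replica).
    static Map<Integer, List<Integer>> assign(List<Integer> brokers, int nPartitions,
                                              int replicationFactor, int startIndex) {
        int n = brokers.size();
        if (replicationFactor < 1 || replicationFactor > n) {
            throw new IllegalArgumentException("replication factor must be between 1 and the broker count");
        }
        Map<Integer, List<Integer>> result = new LinkedHashMap<>();
        int nextReplicaShift = startIndex; // assumption: fixed instead of random
        for (int p = 0; p < nPartitions; p++) {
            // After every full pass over the broker list, shift the followers by one more position.
            if (p > 0 && p % n == 0) {
                nextReplicaShift++;
            }
            int first = (p + startIndex) % n; // round-robin choice of the first replica
            List<Integer> replicas = new ArrayList<>();
            replicas.add(brokers.get(first));
            for (int j = 0; j < replicationFactor - 1; j++) {
                replicas.add(brokers.get(replicaIndex(first, nextReplicaShift, j, n)));
            }
            result.put(p, replicas);
        }
        return result;
    }

    public static void main(String[] args) {
        assign(Arrays.asList(0, 1, 2, 3, 4), 10, 3, 0)
                .forEach((partition, replicas) -> System.out.println(partition + " -> " + replicas));
    }
}
```

With brokers 0 to 4, three replicas and `startIndex` 0, partition 0 is placed on brokers 0, 1, 2. Once every broker has served as a first replica, partition 5 shifts its followers and is placed on 0, 2, 3.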
Assignment example:
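Taking rack information into account, first build a broker list for each rack. For example:
Three racks (rack1, rack2, rack3) and six brokers (0, 1, 2, 3, 4, 5)
brokerID -> rack
- 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
rack1:0,5
rack2:3,4
rack3:1,2
Taking one broker from each rack in turn then gives the broker list: 0 from rack1, 3 from rack2, 1 from rack3, 5 from rack1, 4 from rack2, 2 from rack3, that is: 0, 3, 1, 5, 4, 2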
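Building this interleaved broker list can be sketched in a few lines of Java. The class and method names below are hypothetical, and sorting racks by name and brokers by id is an assumption made here so that the output is deterministic; the sketch only illustrates the interleaving step, not the full rack-aware assignment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that interleaves brokers rack by rack.
final class RackAlternatedBrokerList {

    static List<Integer> interleave(Map<Integer, String> brokerToRack) {
        // Group broker ids by rack. The TreeMap keeps racks sorted by name, and iterating
        // a sorted copy of the input keeps broker ids ascending within each rack.
        Map<String, List<Integer>> brokersByRack = new TreeMap<>();
        Map<Integer, String> sortedBrokers = new TreeMap<>(brokerToRack);
        for (Map.Entry<Integer, String> e : sortedBrokers.entrySet()) {
            brokersByRack.computeIfAbsent(e.getValue(), rack -> new ArrayList<>()).add(e.getKey());
        }
        List<Iterator<Integer>> rackIterators = new ArrayList<>();
        for (List<Integer> brokers : brokersByRack.values()) {
            rackIterators.add(brokers.iterator());
        }
        // Visit the racks in a cycle and take the next unused broker from each one.
        List<Integer> result = new ArrayList<>();
        int rackIndex = 0;
        while (result.size() < brokerToRack.size()) {
            Iterator<Integer> it = rackIterators.get(rackIndex);
            if (it.hasNext()) {
                result.add(it.next());
            }
            rackIndex = (rackIndex + 1) % rackIterators.size();
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> brokerToRack = new HashMap<>();
        brokerToRack.put(0, "rack1");
        brokerToRack.put(1, "rack3");
        brokerToRack.put(2, "rack3");
        brokerToRack.put(3, "rack2");
        brokerToRack.put(4, "rack2");
        brokerToRack.put(5, "rack1");
        System.out.println(interleave(brokerToRack)); // prints [0, 3, 1, 5, 4, 2]
    }
}
```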
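Partitions are then assigned to brokers in different racks by simple round-robin over this list. With six partitions and three replicas each, this gives:
0 -> 0,3,1
1 -> 3,1,5
2 -> 1,5,4
3 -> 5,4,2
4 -> 4,2,0
5 -> 2,0,3
For each partition, assignment starts one position to the right of where the first replica of the previous partition was placed.
With six brokers and six partitions, the first replica of the last partition lands exactly on the last broker in the list.
If more partitions need to be assigned, the algorithm starts shifting the follower replicas.
This is mainly to avoid producing the same assignment sequence every time.
For example, if one more partition (partition 6) is waiting to be assigned, it is assigned as follows:
6 -> 0,4,2 (instead of repeating 0,3,1 as for partition 0)
In rack-aware replica assignment, the first replica is always assigned by round-robin over the rack-based broker list.
For the remaining replicas, brokers in racks that do not yet hold a replica are preferred, unless every rack already holds one.
After that, any further replicas are again assigned to brokers by round-robin.
As a result, if the number of replicas is greater than or equal to the number of racks, every rack is guaranteed to hold at least one replica.
Otherwise, each rack holds at most one replica.
If the number of replicas equals the number of racks and every rack contains the same number of brokers, replicas are guaranteed to be evenly distributed across racks and brokers.
In the figure above, the assignment for partition 0 of topic tp_eagle_01 shows that the leader is on broker 1 and the in-sync replicas are 1 and 2. In other words, the two replicas on broker 1 and broker 2 are in sync, and one of them is the leader.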
# 2.3.4 Required parameter configuration
kafka-topics.sh --config xx=xx --config yy=yy
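# 2.3.5 Using KafkaAdminClient
Overview
Besides the script tools in Kafka's bin directory, Kafka can also be managed through its administration API, which lets you build selected management and inspection functions into your own system. Before Kafka 0.11.0.0, some cluster management operations could be performed with AdminClient and AdminUtils from the kafka-core package (the Kafka server side, written in Scala). Since Kafka 0.11.0.0 there is an additional AdminClient in the kafka-clients package. It is an abstract class whose concrete implementation is org.apache.kafka.clients.admin.KafkaAdminClient.
Features and internals
From the Kafka website: The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects.
KafkaAdminClient provides the following functions (as of Kafka 1.0.2):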
Create topics:
createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options)
Delete topics:
deleteTopics(final Collection<String> topicNames, DeleteTopicsOptions options)
List all topics:
listTopics(final ListTopicsOptions options)
Describe topics:
describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options)
Describe the cluster:
describeCluster(DescribeClusterOptions options)
Describe configuration:
describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options)
Alter configuration:
alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options)
Change the log directory of replicas:
alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options)
Describe the log directories of brokers:
describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options)
Describe the log directories of replicas:
describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options)
Add partitions:
createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options)
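Internally, these functions are implemented on top of Kafka's own binary protocol; see the Kafka protocol documentation for details.
Parameters used:
Main steps:
The client builds the protocol request that corresponds to the method called. For example, createTopics sends a CreateTopicsRequest internally.
The client sends the request to a Kafka broker.
The Kafka broker handles the request and replies. For example, the response to CreateTopicsRequest is CreateTopicsResponse.
The client receives the response and parses it.
Most protocol request and response classes live in the org.apache.kafka.common.requests package; AbstractRequest and AbstractResponse are the two parent classes of these request and response classes.
In summary, implementing a custom function takes only three steps:
- Define a custom XXXOptions;
- Define a custom XXXResult return value;
- Define a custom Call, then pick suitable XXXRequest and XXXResponse classes to implement the three abstract methods of the Call class.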
package com.lagou.kafka.demo;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class MyAdminClient
{
private KafkaAdminClient client;
@Before
public void before()
{
Map < String, Object > conf = new HashMap < > ();
conf.put("bootstrap.servers", "node1:9092");
conf.put("client.id", "adminclient-1");
client = (KafkaAdminClient) KafkaAdminClient.create(conf);
}
@After
public void after()
{
client.close();
}
@Test
public void testListTopics1() throws ExecutionException, InterruptedException
{
ListTopicsResult listTopicsResult = client.listTopics();
// KafkaFuture<Collection<TopicListing>> listings =listTopicsResult.listings();
// Collection<TopicListing> topicListings = listings.get();
//
// topicListings.forEach(new Consumer<TopicListing>() {
// @Override
// public void accept(TopicListing topicListing) {
// boolean internal = topicListing.isInternal();
// String name = topicListing.name();
// String s = topicListing.toString();
// System.out.println(s + "\t" + name + "\t" + internal);
// }
// });
// KafkaFuture<Set<String>> names = listTopicsResult.names();
// Set<String> strings = names.get();
//
// strings.forEach(name -> {
// System.out.println(name);
// });
// KafkaFuture<Map<String, TopicListing>> mapKafkaFuture =listTopicsResult.namesToListings();
// Map<String, TopicListing> stringTopicListingMap =mapKafkaFuture.get();
//
// stringTopicListingMap.forEach((k, v) -> {
// System.out.println(k + "\t" + v);
// });
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(false);
options.timeoutMs(500);
ListTopicsResult listTopicsResult1 = client.listTopics(options);
Map < String, TopicListing > stringTopicListingMap = listTopicsResult1.namesToListings().get();
stringTopicListingMap.forEach((k, v) ->
{
System.out.println(k + "\t" + v);
});
// No client.close() here: after() already closes the admin client once the test finishes.
}
@Test
public void testCreateTopic() throws ExecutionException, InterruptedException
{
Map < String, String > configs = new HashMap < > ();
configs.put("max.message.bytes", "1048576");
configs.put("segment.bytes", "1048576000");
NewTopic newTopic = new NewTopic("adm_tp_01", 2, (short) 1);
newTopic.configs(configs);
CreateTopicsResult topics = client.createTopics(Collections.singleton(newTopic));
KafkaFuture < Void > all = topics.all();
Void aVoid = all.get();
System.out.println(aVoid);
}
@Test
public void testDeleteTopic() throws ExecutionException, InterruptedException
{
DeleteTopicsOptions options = new DeleteTopicsOptions();
options.timeoutMs(500);
DeleteTopicsResult deleteResult = client.deleteTopics(Collections.singleton("adm_tp_01"), options);
deleteResult.all().get();
}
@Test
public void testAlterTopic() throws ExecutionException, InterruptedException
{
NewPartitions newPartitions = NewPartitions.increaseTo(5);
Map < String, NewPartitions > newPartitionsMap = new HashMap < > ();
newPartitionsMap.put("adm_tp_01", newPartitions);
CreatePartitionsOptions option = new CreatePartitionsOptions();
// Set to true if the request should be validated without creating new partitions.
// option.validateOnly(true);
CreatePartitionsResult partitionsResult = client.createPartitions(newPartitionsMap, option);
Void aVoid = partitionsResult.all().get();
}
@Test
public void testDescribeTopics() throws ExecutionException, InterruptedException
{
DescribeTopicsOptions options = new DescribeTopicsOptions();
options.timeoutMs(3000);
DescribeTopicsResult topicsResult = client.describeTopics(Collections.singleton("adm_tp_01"), options);
Map < String, TopicDescription > stringTopicDescriptionMap = topicsResult.all().get();
stringTopicDescriptionMap.forEach((k, v) ->
{
System.out.println(k + "\t" + v);
System.out.println("=======================================");
System.out.println(k);
boolean internal = v.isInternal();
String name = v.name();
List < TopicPartitionInfo > partitions = v.partitions();
String partitionStr = Arrays.toString(partitions.toArray());
System.out.println("internal? " + internal);
System.out.println("topic name = " + name);
System.out.println("partitions: " + partitionStr);
partitions.forEach(partition ->
{
System.out.println(partition);
});
});
}
@Test
public void testDescribeCluster() throws ExecutionException, InterruptedException
{
DescribeClusterResult describeClusterResult = client.describeCluster();
KafkaFuture < String > stringKafkaFuture = describeClusterResult.clusterId();
String s = stringKafkaFuture.get();
System.out.println("cluster id = " + s);
KafkaFuture < Node > controller = describeClusterResult.controller();
Node node = controller.get();
System.out.println("cluster controller: " + node);
Collection < Node > nodes = describeClusterResult.nodes().get();
nodes.forEach(node1 ->
{
System.out.println(node1);
});
}
@Test
public void testDescribeConfigs() throws ExecutionException, InterruptedException, TimeoutException
{
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singleton(configResource));
Map < ConfigResource, Config > configMap = describeConfigsResult.all().get(15, TimeUnit.SECONDS);
configMap.forEach(new BiConsumer < ConfigResource, Config > ()
{
@Override
public void accept(ConfigResource configResource, Config config)
{
ConfigResource.Type type = configResource.type();
String name = configResource.name();
System.out.println("resource name: " + name);
Collection < ConfigEntry > entries = config.entries();
entries.forEach(new Consumer < ConfigEntry > ()
{
@Override
public void accept(ConfigEntry configEntry)
{
boolean aDefault = configEntry.isDefault();
boolean readOnly = configEntry.isReadOnly();
boolean sensitive = configEntry.isSensitive();
String name1 = configEntry.name();
String value = configEntry.value();
System.out.println("default? " + aDefault + "\tread-only? " + readOnly + "\tsensitive? " + sensitive + "\t" + name1 + " --> " + value);
}
});
ConfigEntry retries = config.get("retries");
if(retries != null)
{
System.out.println(retries.name() + " -->" + retries.value());
}
else
{
System.out.println("no such property");
}
}
});
}
@Test
public void testAlterConfig() throws ExecutionException, InterruptedException
{
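// alterConfigs replaces the resource's whole configuration with what is set here: existing settings not listed are lost, even if they do not conflict.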
Map < ConfigResource, Config > configMap = new HashMap < > ();
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "adm_tp_01");
Config config = new Config(Collections.singleton(new ConfigEntry("segment.bytes", "1048576000")));
configMap.put(resource, config);
AlterConfigsResult alterConfigsResult = client.alterConfigs(configMap);
Void aVoid = alterConfigsResult.all().get();
}
@Test
public void testDescribeLogDirs() throws ExecutionException, InterruptedException
{
DescribeLogDirsOptions option = new DescribeLogDirsOptions();
option.timeoutMs(1000);
DescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0), option);
Map < Integer, Map < String, DescribeLogDirsResponse.LogDirInfo >> integerMapMap = describeLogDirsResult.all().get();
integerMapMap.forEach(new BiConsumer < Integer, Map < String, DescribeLogDirsResponse.LogDirInfo >> ()
{
@Override
public void accept(Integer integer, Map < String, DescribeLogDirsResponse.LogDirInfo > stringLogDirInfoMap)
{
System.out.println("broker.id = " + integer);
stringLogDirInfoMap.forEach(new BiConsumer < String, DescribeLogDirsResponse.LogDirInfo > ()
{
@Override
public void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo)
{
System.out.println("log.dirs:" + s);
// Inspect the topic, partition and offset information held on this broker
// logDirInfo.replicaInfos.forEach(new BiConsumer<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>() {
// @Override
// public void accept(TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
// int partition =topicPartition.partition();
// String topic = topicPartition.topic();
// boolean isFuture = replicaInfo.isFuture;
// long offsetLag = replicaInfo.offsetLag;
// long size = replicaInfo.size;
// System.out.println("partition:" +partition + "\ttopic:" + topic
// + "\tisFuture:" + isFuture
// + "\toffsetLag:" + offsetLag
// + "\tsize:" + size);
// }
// });
}
});
}
});
}
}
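# 2.3.6 Offset management
In Kafka 1.0.2, the offsets of each consumer group are stored in the __consumer_offsets topic.
In earlier versions, consumer group offsets were managed by ZooKeeper.
How to query:
Use the tool script that ships with Kafka.
The script is located at bin/kafka-consumer-groups.sh
First run the script to see its help:
Next, we write a producer and consumer example:
Start the consumer first, then the producer, and then query the consumer offsets with bin/kafka-consumer-groups.sh.
Kafka consumers record a group's offsets in one of two ways:
1) maintained by Kafka itself (new)
2) maintained by ZooKeeper (old), which is gradually being deprecated
The script therefore shows only the offsets maintained by the brokers. To see offsets maintained by ZooKeeper, replace --bootstrap-server with --zookeeper.
**1. List the group IDs that are currently consuming:**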
[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
Note: This will not show information about old Zookeeper-based consumers.
group
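Note:
- No topic is specified here, so this lists the group.id of the consumers of all topics.
- A group.id that is used more than once is shown only once.
**2. Show the consumption status of the consumers for a given group.id:**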
[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group group
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tp_demo_02 0 923 923 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1
tp_demo_02 1 872 872 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1
tp_demo_02 2 935 935 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1
[root@node11 ~]#
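The LAG column in the output above is the difference between LOG-END-OFFSET and CURRENT-OFFSET for each partition. The small Java sketch below recomputes it from a row of that output. The class and method names are hypothetical, and the parser assumes a whitespace-separated row whose third and fourth fields are numeric, as in the rows shown above.

```java
// Hypothetical helper that recomputes the lag from one row of the describe output.
final class ConsumerLag {

    // Row layout assumed: TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET ...
    static long lagOf(String describeRow) {
        String[] fields = describeRow.trim().split("\\s+");
        long currentOffset = Long.parseLong(fields[2]);
        long logEndOffset = Long.parseLong(fields[3]);
        // Messages written to the partition but not yet covered by the group's committed offset.
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        String row = "tp_demo_02 0 923 923 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1";
        System.out.println("lag = " + lagOf(row)); // prints lag = 0
    }
}
```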
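If the consumer is stopped, view the offset information:
Reset the offsets to the earliest:
Reset the offsets to the latest:
Move the offsets of the specified partitions of the specified topic forward by 10 messages: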
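The three reset operations listed above differ only in how the target offset is derived. The sketch below models that arithmetic in plain Java and does not call Kafka. The class and method names are hypothetical, and clamping a shifted offset to the range between the earliest and latest offsets is an assumption about how such a reset should behave, not something stated in this section.

```java
// Hypothetical model of how a target offset could be derived for each reset mode.
final class OffsetResetModel {

    // Reset to the earliest offset still available in the partition.
    static long toEarliest(long earliest, long latest) {
        return earliest;
    }

    // Reset to the latest offset (the log end offset).
    static long toLatest(long earliest, long latest) {
        return latest;
    }

    // Move the current offset by a signed number of messages,
    // keeping the result inside [earliest, latest] (assumed behavior).
    static long shiftBy(long current, long shift, long earliest, long latest) {
        long target = current + shift;
        return Math.max(earliest, Math.min(latest, target));
    }

    public static void main(String[] args) {
        long earliest = 0L;
        long latest = 923L;
        long current = 923L;
        System.out.println("to earliest: " + toEarliest(earliest, latest));
        System.out.println("to latest:   " + toLatest(earliest, latest));
        System.out.println("shift -10:   " + shiftBy(current, -10, earliest, latest));
    }
}
```

A negative shift moves the group back so that messages are consumed again, while a positive shift skips messages; which sign a given tool uses for "forward" should be checked against its help output.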
Code:
KafkaProducerSingleton.java
package com.lagou.kafka.demo.producer;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
public class KafkaProducerSingleton
{
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerSingleton.class);
private static KafkaProducer < String, String > kafkaProducer;
private Random random = new Random();
private String topic;
private int retry;
private KafkaProducerSingleton()
{}
/**
* Static nested holder class, used for lazy initialization of the singleton
*
* @author tanjie
*/
private static class LazyHandler
{
private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
}
/**
* Singleton accessor. KafkaProducer is thread-safe, so one instance can be shared by multiple threads.
* @return the singleton instance
*/
public static final KafkaProducerSingleton getInstance()
{
return LazyHandler.instance;
}
/**
* Initializes the Kafka producer.
*
* @param topic the topic to send to
* @param retry the maximum number of retries after a failed send
*/
public void init(String topic, int retry)
{
this.topic = topic;
this.retry = retry;
if(null == kafkaProducer)
{
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducer = new KafkaProducer < String, String > (props);
}
}
/**
* Sends a message through kafkaProducer.
* @param message the message to send
*/
public void sendKafkaMessage(final String message)
{
ProducerRecord < String, String > record = new ProducerRecord < String, String > (topic, random.nextInt(3), "", message);
kafkaProducer.send(record, new Callback()
{
public void onCompletion(RecordMetadata recordMetadata, Exception exception)
{
if(null != exception)
{
LOGGER.error("Failed to send Kafka message: " + exception.getMessage(), exception);
retryKakfaMessage(message);
}
}
});
}
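/**
* Retries sending a message after a failed send, at most retry times.
*
* @param retryMessage the message to resend
*/
private void retryKakfaMessage(final String retryMessage)
{
ProducerRecord < String, String > record = new ProducerRecord < String, String > (topic, random.nextInt(3), "", retryMessage);
for(int i = 1; i <= retry; i++)
{
try
{
// send() is asynchronous, so only errors thrown synchronously are caught here.
kafkaProducer.send(record);
return;
}
catch (Exception e)
{
// Log and let the loop try again; calling this method recursively here would retry without bound.
LOGGER.error("Failed to send Kafka message: " + e.getMessage(), e);
}
}
}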
/**
* Closes the Kafka producer.
*/
public void close()
{
if(null != kafkaProducer)
{
kafkaProducer.close();
}
}
public String getTopic()
{
return topic;
}
public void setTopic(String topic)
{
this.topic = topic;
}
public int getRetry()
{
return retry;
}
public void setRetry(int retry)
{
this.retry = retry;
}
}
ProducerHandler.java
package com.lagou.kafka.demo.producer;
public class ProducerHandler implements Runnable
{
private String message;
public ProducerHandler(String message)
{
this.message = message;
}
@Override
public void run()
{
KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton.getInstance();
kafkaProducerSingleton.init("tp_demo_02", 3);
int i = 0;
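while(true)
{
try
{
System.out.println("Current thread: " + Thread.currentThread().getName() + "\tKafka producer instance: " + kafkaProducerSingleton);
kafkaProducerSingleton.sendKafkaMessage("Sending message: " + message + " " + (++i));
Thread.sleep(100);
}
catch (InterruptedException e)
{
// Restore the interrupt flag and stop producing.
Thread.currentThread().interrupt();
return;
}
catch (Exception e)
{
// Report other failures instead of silently swallowing them, then keep producing.
e.printStackTrace();
}
}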
}
}
MyProducer.java
package com.lagou.kafka.demo.producer;
public class MyProducer
{
public static void main(String[] args)
{
Thread thread = new Thread(new ProducerHandler("hello lagou "));
thread.start();
}
}
KafkaConsumerAuto.java
package com.lagou.kafka.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class KafkaConsumerAuto
{
/**
* KafkaConsumer is not thread-safe
*/
private final KafkaConsumer < String, String > consumer;
private ExecutorService executorService;
public KafkaConsumerAuto()
{
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
// Enable auto commit
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put("auto.commit.interval.ms", "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer < String, String > (props);
// Subscribe to the topic
consumer.subscribe(Collections.singleton("tp_demo_02"));
}
public void execute() throws InterruptedException
{
executorService = Executors.newFixedThreadPool(2);
while(true)
{
ConsumerRecords < String, String > records = consumer.poll(2_000);
if(null != records)
{
executorService.submit(new ConsumerThreadAuto(records, consumer));
}
Thread.sleep(1000);
}
}
public void shutdown()
{
try
{
if(consumer != null)
{
consumer.close();
}
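if(executorService != null)
{
executorService.shutdown();
// Wait inside the null check so that a missing thread pool cannot cause a NullPointerException.
if(!executorService.awaitTermination(10, TimeUnit.SECONDS))
{
System.out.println("Timed out while shutting down the thread pool.");
}
}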
}
catch (InterruptedException ex)
{
Thread.currentThread().interrupt();
}
}
}
ConsumerThreadAuto.java
package com.lagou.kafka.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerThreadAuto implements Runnable
{
private ConsumerRecords < String, String > records;
private KafkaConsumer < String, String > consumer;
public ConsumerThreadAuto(ConsumerRecords < String, String > records, KafkaConsumer < String, String > consumer)
{
this.records = records;
this.consumer = consumer;
}
@Override
public void run()
{
for(ConsumerRecord < String, String > record: records)
{
System.out.println("Current thread: " + Thread.currentThread() + "\ttopic: " + record.topic() + "\toffset: " + record.offset() + "\tpartition: " + record.partition() + "\tmessage: " + record.value());
}
}
}
ConsumerAutoMain.java
package com.lagou.kafka.demo.consumer;
public class ConsumerAutoMain
{
public static void main(String[] args)
{
KafkaConsumerAuto kafka_consumerAuto = new KafkaConsumerAuto();
try
{
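// execute() polls in an endless loop, so the sleep below is reached only if execute() is changed to return.
kafka_consumerAuto.execute();
Thread.sleep(20000);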
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
kafka_consumerAuto.shutdown();
}
}
}