同福

Java操作Kafka消息系统的方法【20210422】

介绍

介绍

上一课大家跟福哥学会了在我们的TFLinux系统上面安装Kafka软件,今天福哥要带着大家学习使用Java去操作Kafka消息系统的方法。

Java操作Kafka可以使用apache官方提供的库,既然是官方的库,自然兼容性是没有问题的了。

安装

依赖

在pom.xml里面增加依赖项

<!-- kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

使用

消费者

概念

低级消费者模式就是一个人接了一个项目,所有事情都要自己一点点做。如果当天没有做完,我会记录做到第几件了,然后第二天来了继续做下面的工作。

代码

低级消费者示例代码,由于福哥的测试环境是SpringBoot,所以没有长时间运行。

String out = "";

String topic;
Properties properties = new Properties();
KafkaConsumer<String, String> kafkaConsumer;

topic = "test";
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.168:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "lowLevel");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaConsumer = new KafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Collections.singletonList(topic));

while(true){
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(120);
    if(consumerRecords.count() > 0){
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords){
            out = consumerRecord.value();
        }
        break;
    }
}

return out;

效果

生产一个消息

acks = 1
batch.size = 16384
bootstrap.servers = [192.168.2.168:9092]
buffer.memory = 33554432
client.id = 
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer

消费一个消息

auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [192.168.2.168:9092]
check.crcs = true
client.id = 
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = lowLevel
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

home/topic/2021/0422/23/4e1e4088b5040bf70438515096a901b5.jpg

生产者

代码

生产者示例代码。

String out = "";

String topic, msg;
Properties properties = new Properties();
KafkaProducer<String, String> kafkaProducer;

topic = "test";
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.168:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaProducer = new KafkaProducer<String, String>(properties);
msg = "福哥说:现在是" + TFDatetime.toString(TFDatetime.date(), "HH:mm:ss");
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, msg);
kafkaProducer.send(producerRecord);
kafkaProducer.close();

return out;

总结

今天福哥带着童鞋们学习了使用Java语言连接操作Kafka消息系统的方法,本节课的内容比较少,福哥后面会逐步完善关于Java操作Kafka的各种技巧的教程给大家。

好了,下一课福哥会给大家讲解如何使用Java语言来操作Kafka消息系统,敬请期待~~