Introduction
In the last lesson, Brother Fu showed everyone how to install the Kafka software on our TFLinux system. Today, Brother Fu will walk you through operating the Kafka messaging system from Java.
To operate Kafka from Java you can use the client library provided by Apache itself; since it is the official library, compatibility is naturally not a concern.
Installation
Dependencies
Add the following dependency to pom.xml:
<!-- kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
Usage
Consumer
Concept
The low-level consumer pattern is like one person taking on an entire project and doing every task personally, step by step. If the work is not finished by the end of the day, the person writes down which task they got to, and the next day picks up from exactly that point.
Code
Example code for a low-level consumer. Since Brother Fu's test environment is SpringBoot, the loop does not run for long: it returns as soon as the first non-empty batch of messages arrives.
String out = "";
String topic;
Properties properties = new Properties();
KafkaConsumer<String, String> kafkaConsumer;

topic = "test";

// Connection and deserialization settings
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.168:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "lowLevel");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

kafkaConsumer = new KafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Collections.singletonList(topic));

// Poll until the first non-empty batch arrives, keep the last value, then stop
while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(120);
    if (consumerRecords.count() > 0) {
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            out = consumerRecord.value();
        }
        break;
    }
}

// Release the network connection and the group membership
kafkaConsumer.close();

return out;
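In the example above, offsets are committed automatically (enable.auto.commit defaults to true, as the configuration dump below also shows). To make the "write down which task I got to" part of the analogy explicit, here is a minimal sketch with manual commits; the broker address, topic, and group id are the same assumptions as in the example above, and the bounded loop is only there so the sketch terminates:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.168:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "lowLevel");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Disable auto commit so the consumer only "writes down" its position explicitly
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singletonList("test"));
        try {
            for (int i = 0; i < 10; i++) { // bounded so the sketch terminates
                ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                // Commit the offsets just processed; after a restart,
                // the group resumes from here instead of re-reading everything
                kafkaConsumer.commitSync();
            }
        } finally {
            kafkaConsumer.close();
        }
    }
}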
Result
Producing a message. When it starts up, the producer client logs its effective configuration:
acks = 1
batch.size = 16384
bootstrap.servers = [192.168.2.168:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
Consuming a message. Likewise, the consumer client logs its effective configuration on startup:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [192.168.2.168:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = lowLevel
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
Producer
Code
Producer example code.
String out = "";
String topic, msg;
Properties properties = new Properties();
KafkaProducer<String, String> kafkaProducer;

topic = "test";

// Connection and serialization settings
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.168:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

kafkaProducer = new KafkaProducer<String, String>(properties);

msg = "Brother Fu says: it is now " + TFDatetime.toString(TFDatetime.date(), "HH:mm:ss");

ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, msg);
kafkaProducer.send(producerRecord);

// close() flushes any buffered records before releasing resources
kafkaProducer.close();

return out;
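Note that send() is asynchronous: the example above fires and forgets, relying on close() to flush the record out. If you want to know whether the broker actually accepted the record, send() also takes a callback that fires once the broker acknowledges (or rejects) it. A minimal sketch, assuming the same broker address and topic as above:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class CallbackProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.168:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>("test", "hello kafka");

        // The callback runs when the broker acknowledges (or rejects) the record
        kafkaProducer.send(record, (RecordMetadata metadata, Exception exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("sent to partition " + metadata.partition()
                        + " at offset " + metadata.offset());
            }
        });

        kafkaProducer.close(); // flushes pending records, so the callback gets a chance to run
    }
}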
Summary
Today Brother Fu showed everyone how to connect to and operate the Kafka messaging system from Java. This lesson's content was fairly light; Brother Fu will gradually add tutorials covering more techniques for working with Kafka from Java.
Alright, in the next lesson Brother Fu will dig further into working with the Kafka messaging system. Stay tuned~~