网站首页 > java教程 正文
一、环境准备
1.1Maven 依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
二、Kafka 生产者实现
2.1 基础配置
通过 Properties 对象设置 Kafka 服务器地址和序列化方式:
java
复制
public class KafkaProducerDemo {
private static final String TOPIC = "test-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(
TOPIC,
"key-" + i,
"Hello Kafka " + i
);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("消息发送成功: 分区=%d, 偏移量=%d%n", metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
}
}
}
}
关键配置说明 :
● bootstrap.servers:Kafka 集群地址
● key.serializer/value.serializer:消息键值序列化器
2.2 高级特性
2.2.1 消息分区控制
通过 partitioner.class 自定义分区策略:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
2.2.2 批量发送与性能优化
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批次大小(字节)
props.put(ProducerConfig.LINGER_MS_CONFIG, 50); // 等待时间(毫秒)
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 确认机制
2.2.3 事务支持
producer.initTransactions();
producer.beginTransaction();
try {
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
三、Kafka 消费者实现
3.1 基础消费
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
public class KafkaConsumerDemo {
private static final String TOPIC = "test-topic";
private static final String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早消息开始消费
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息: 分区=%d, 偏移量=%d, key=%s, value=%s%n",
record.partition(), record.offset(), record.key(), record.value());
}
}
}
}
}
3.2 高级消费模式
3.2.1 手动提交偏移量
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 处理消息
consumer.commitSync(); // 同步提交
}
3.2.2 多线程消费
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executor.submit(() -> {
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
// 轮询消费
}
}
});
}
四、高级功能
4.1 安全连接(SASL/SSL)
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"admin\" " +
"password=\"secret\";");
4.2 自定义 Topic 创建
import org.apache.kafka.clients.admin.*;
import java.util.Collections;
public class TopicCreator {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
try (AdminClient admin = AdminClient.create(props)) {
NewTopic newTopic = new NewTopic("new-topic", 3, (short) 2);
admin.createTopics(Collections.singleton(newTopic)).all().get();
}
}
}
五、最佳实践
1. 幂等生产者
2. 启用 enable.idempotence=true 避免重复消息。 监控与指标
3. 通过 JMX 监控 producer-metrics 和 consumer-metrics。 资源管理
4. 使用连接池管理 KafkaProducer/KafkaConsumer 实例。 异常处理
实现重试机制和死信队列(DLQ)处理失败消息。
六、Spring Boot 集成
6.1 添加依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.0</version>
</dependency>
6.2 配置类
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> factory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
6.3 消息监听
@KafkaListener(topics = "test-topic", groupId = "spring-group")
public void listen(String message) {
System.out.println("Spring Kafka 收到消息: " + message);
}
猜你喜欢
- 2025-07-03 Java双非本科,非科班,自学1年时间终于斩获offer
- 2025-07-03 MySQL 的异步复制(mysql支持异步吗)
- 2025-07-03 深入浅出JVM(一)之Hotspot虚拟机中的对象
- 2025-07-03 高可用MySQL集群实战教程,详解主从复制搭建步骤
- 2025-07-03 一文带你了解MySQL主从复制(Master-Slave)
- 2025-07-03 Java 中的 AI:使用 Spring Boot 和 LangChain 构建 ChatGPT 克隆
- 2025-07-03 Java中List赋值给另一个List的6种方法详解
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- java反编译工具 (77)
- java反射 (57)
- java接口 (61)
- java随机数 (63)
- java7下载 (59)
- java数据结构 (61)
- java 三目运算符 (65)
- java对象转map (63)
- Java继承 (69)
- java字符串替换 (60)
- 快速排序java (59)
- java并发编程 (58)
- java api文档 (60)
- centos安装java (57)
- java调用webservice接口 (61)
- java深拷贝 (61)
- 工厂模式java (59)
- java代理模式 (59)
- java.lang (57)
- java连接mysql数据库 (67)
- java重载 (68)
- java 循环语句 (66)
- java反序列化 (58)
- java时间函数 (60)
- java是值传递还是引用传递 (62)
本文暂时没有评论,来添加一个吧(●'◡'●)