专业的JAVA编程教程与资源

网站首页 > java教程 正文

java配置使用kafka(kafka配置jdk)

temp10 2025-07-03 19:15:44 java教程 1 ℃ 0 评论

一、环境准备

1.1Maven 依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>


二、Kafka 生产者实现

2.1 基础配置

通过 Properties 对象设置 Kafka 服务器地址和序列化方式:

java配置使用kafka(kafka配置jdk)

java

复制



public class KafkaProducerDemo {
    private static final String TOPIC = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    TOPIC, 
                    "key-" + i, 
                    "Hello Kafka " + i
                );
                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("消息发送成功: 分区=%d, 偏移量=%d%n", metadata.partition(), metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
        }
    }
}


关键配置说明

bootstrap.servers:Kafka 集群地址

key.serializer/value.serializer:消息键值序列化器


2.2 高级特性

2.2.1 消息分区控制

通过 partitioner.class 自定义分区策略:


props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());


2.2.2 批量发送与性能优化


props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批次大小(字节)
props.put(ProducerConfig.LINGER_MS_CONFIG, 50);    // 等待时间(毫秒)
props.put(ProducerConfig.ACKS_CONFIG, "all");      // 确认机制


2.2.3 事务支持


producer.initTransactions();
producer.beginTransaction();
try {
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}



三、Kafka 消费者实现

3.1 基础消费


import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;

public class KafkaConsumerDemo {
    private static final String TOPIC = "test-topic";
    private static final String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早消息开始消费

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("收到消息: 分区=%d, 偏移量=%d, key=%s, value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}


3.2 高级消费模式

3.2.1 手动提交偏移量


props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // 处理消息
    consumer.commitSync(); // 同步提交
}


3.2.2 多线程消费


ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
    executor.submit(() -> {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC));
            while (true) {
                // 轮询消费
            }
        }
    });
}



四、高级功能

4.1 安全连接(SASL/SSL)


props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", 
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"admin\" " +
    "password=\"secret\";");


4.2 自定义 Topic 创建


import org.apache.kafka.clients.admin.*;
import java.util.Collections;

public class TopicCreator {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic newTopic = new NewTopic("new-topic", 3, (short) 2);
            admin.createTopics(Collections.singleton(newTopic)).all().get();
        }
    }
}



五、最佳实践

1. 幂等生产者

2. 启用 enable.idempotence=true 避免重复消息。 监控与指标

3. 通过 JMX 监控 producer-metrics 和 consumer-metrics。 资源管理

4. 使用连接池管理 KafkaProducer/KafkaConsumer 实例。 异常处理

实现重试机制和死信队列(DLQ)处理失败消息。

六、Spring Boot 集成

6.1 添加依赖


<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.0</version>
</dependency>


6.2 配置类


@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> factory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}


6.3 消息监听


@KafkaListener(topics = "test-topic", groupId = "spring-group")
public void listen(String message) {
    System.out.println("Spring Kafka 收到消息: " + message);
}

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表