Start the ZooKeeper server:

bin/zookeeper-server-start.sh config/zookeeper.properties

Start two Kafka brokers:

bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server.properties
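Here server-1.properties is assumed to be a copy of server.properties with a few overrides, as in the Kafka quickstart; a minimal sketch (the log directory path is an assumption):

# server-1.properties: second broker on the same machine
# broker.id must be unique per broker (server.properties uses 0)
broker.id=1
# server.properties keeps the default port 9092
listeners=PLAINTEXT://localhost:9093
# each broker needs its own log directory
log.dirs=/tmp/kafka-logs-1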
ZooKeeper is used to elect one of the two brokers as the cluster controller; for each topic partition, one replica acts as the leader and the other as a follower.
In this post the Spring Boot version is bumped to 2.0.0; the relevant parts of pom.xml are:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.0.0.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.4.RELEASE</version>
    </dependency>
</dependencies>
@Configuration
@EnableKafka
public class KafkaConfig {

    /* ---------------- producer configuration ---------------- */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /* ---------------- consumer configuration ---------------- */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    // the message listener bean
    @Bean
    public MyListener myListener() {
        return new MyListener();
    }

    /* ---------------- kafka template configuration ---------------- */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
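The @Configuration classes in this post are picked up by component scanning from an ordinary Spring Boot entry point; a minimal sketch (the class name is illustrative, not from the original source):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}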
An automatically created topic has 1 partition and a replication factor of 1.
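Those defaults come from the broker configuration; the relevant server.properties entries (the values shown are Kafka's shipped defaults) look like:

# whether missing topics are created on first use
auto.create.topics.enable=true
# defaults applied to auto-created topics
num.partitions=1
default.replication.factor=1

To get explicit partition counts and replication factors instead, declare the topics as NewTopic beans alongside a KafkaAdmin: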
@Configuration
@EnableKafka
public class TopicConfig {

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic foo() {
        // The first argument is the topic name, the second the number of partitions,
        // the third the replication factor.
        // With a single broker, topic creation fails with:
        //   replication factor: 2 larger than available brokers: 1
        // Kafka's replication feature is only usable in a cluster.
        return new NewTopic("foo", 10, (short) 2);
    }

    @Bean
    public NewTopic bar() {
        return new NewTopic("bar", 10, (short) 2);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("topic1", 10, (short) 2);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic2", 10, (short) 2);
    }
}
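After the application starts and the KafkaAdmin has created the topics, they can be checked from the Kafka CLI; for broker versions of this era the tool still takes --zookeeper (newer versions use --bootstrap-server), for example:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic foo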
topicPartitions cannot be combined with topics or topicPattern on the same @KafkaListener; a pattern-based alternative is sketched after the listener class below.
public class MyListener {

    // id is the listener container id.
    // topicPartitions configures topics and partitions: listen to two topics, topic1 and topic2;
    // topic1 only receives messages from partitions 0 and 3,
    // topic2 receives messages from partitions 0 and 1, with the consumer's
    // initial offset for partition 1 set to 4.
    @KafkaListener(id = "myContainer1", topicPartitions = {
            @TopicPartition(topic = "topic1", partitions = { "0", "3" }),
            @TopicPartition(topic = "topic2", partitions = "0",
                    partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")) })
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println("topic:" + record.topic());
        System.out.println("key:" + record.key());
        System.out.println("value:" + record.value());
    }

    @KafkaListener(id = "myContainer2", topics = { "foo", "bar" })
    public void listen2(ConsumerRecord<?, ?> record) {
        System.out.println("topic:" + record.topic());
        System.out.println("key:" + record.key());
        System.out.println("value:" + record.value());
    }
}
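Since topicPartitions excludes topics and topicPattern, a pattern-based subscription needs its own listener method; a minimal sketch (the container id and pattern are illustrative):

@KafkaListener(id = "myContainer3", topicPattern = "topic.*")
public void listenByPattern(ConsumerRecord<?, ?> record) {
    // receives messages from every partition of any topic matching the pattern
    System.out.println("topic:" + record.topic() + " value:" + record.value());
}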
@RestController
public class KafkaController {

    private final static Logger logger = LoggerFactory.getLogger(KafkaController.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping(value = "/{topic}/send", method = RequestMethod.GET)
    public void sendMessageToTopic(@PathVariable String topic,
            @RequestParam(value = "partition", defaultValue = "0") int partition) {
        logger.info("sending message to {}", topic);
        kafkaTemplate.send(topic, partition, "你", "好");
    }
}
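With the application running on Spring Boot's default port 8080 (an assumption; no server.port is configured in this post), the endpoint can be exercised with, for example:

curl "http://localhost:8080/topic1/send?partition=0"

The message then appears in the output of the myContainer1 listener, since it subscribes to partition 0 of topic1.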
A message listener configured with topicPartitions only receives messages from the specified partitions of the topics it subscribes to.
Source code: https://github.com/NapWells/java_framework_learn/tree/master/springkafka2