We include the following line:
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
The explanation is as follows:
... we cannot delete the consumers we have registered, because it is not possible to do this using the KafkaListenerEndpointRegistry class. In order to make this possible, we need to create our own registry class.
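The idea in the quote, a registry of our own that also supports removal, can be sketched in plain Java without Spring. Everything below is hypothetical and for illustration only: `SimpleListenerRegistry`, `ManagedListener`, and their methods are stand-ins, not Spring Kafka types. A real implementation would presumably hold MessageListenerContainer instances and stop each one before dropping it.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a listener container; not a Spring Kafka type.
class ManagedListener {
    private final String id;
    private volatile boolean running;

    ManagedListener(String id) {
        this.id = id;
    }

    String getId() { return id; }
    boolean isRunning() { return running; }
    void start() { running = true; }
    void stop() { running = false; }
}

// Hypothetical registry that supports removal as well as registration.
class SimpleListenerRegistry {
    private final Map<String, ManagedListener> listeners = new ConcurrentHashMap<>();

    // Registers a listener under its id; duplicate ids are rejected.
    void register(ManagedListener listener) {
        ManagedListener previous = listeners.putIfAbsent(listener.getId(), listener);
        if (previous != null) {
            throw new IllegalStateException("Listener already registered: " + listener.getId());
        }
    }

    // Removes the listener and stops it; returns false if the id is unknown.
    boolean unregister(String id) {
        ManagedListener removed = listeners.remove(id);
        if (removed == null) {
            return false;
        }
        removed.stop();
        return true;
    }

    ManagedListener get(String id) { return listeners.get(id); }
    Set<String> ids() { return listeners.keySet(); }
}
```

The point of the sketch is that `unregister` stops the listener as well as removing it, so a removed consumer is not left running without a handle to reach it.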
getListenerContainer
Returns a MessageListenerContainer object.
Example
We do it as follows. Once the MessageListenerContainer has been obtained, many of its methods can be used, such as start(), pause(), resume(), getAssignedPartitions(), getListenerId(), and getGroupId().
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@GetMapping
public List<KafkaConsumerResponse> getConsumerIds() {
  return kafkaListenerEndpointRegistry.getListenerContainerIds()
    .stream()
    .map(this::createKafkaConsumerResponse)
    .collect(Collectors.toList());
}

private KafkaConsumerResponse createKafkaConsumerResponse(String consumerId) {
  MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry
    .getListenerContainer(consumerId);
  ...
}
registerListenerContainer method
The explanation is as follows:
When we create a Kafka consumer using the @KafkaListener annotation, Spring reads it when the application starts. KafkaListenerAnnotationBeanPostProcessor is the class responsible for reading those annotations. The Kafka consumers we have created are then registered in the KafkaListenerEndpointRegistry class by the KafkaListenerEndpointRegistrar class. When the application is running, KafkaListenerEndpointRegistry starts the registered consumers based on the autoStartup property.
The explanation of its parameters is as follows:
The method accepts three parameters: a KafkaListenerEndpoint, a KafkaListenerContainerFactory, and a boolean startImmediately.
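The two start paths described above (start at application startup according to autoStartup, or start right away through the startImmediately flag) can be illustrated with a toy model in plain Java. This is a deliberately simplified sketch, not Spring's implementation: `ToyContainer`, `ToyRegistry`, `register`, and `startAll` are hypothetical names, the container factory argument is left out, and the real registry's start conditions may be more nuanced.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a listener container; not a Spring Kafka type.
class ToyContainer {
    final String id;
    final boolean autoStartup;
    boolean running;

    ToyContainer(String id, boolean autoStartup) {
        this.id = id;
        this.autoStartup = autoStartup;
    }
}

// Hypothetical, simplified registry modelling the two start paths.
class ToyRegistry {
    private final Map<String, ToyContainer> containers = new LinkedHashMap<>();

    // Stores the container under its id and optionally starts it right away.
    void register(ToyContainer container, boolean startImmediately) {
        if (containers.containsKey(container.id)) {
            throw new IllegalStateException("Already registered: " + container.id);
        }
        containers.put(container.id, container);
        if (startImmediately) {
            container.running = true;
        }
    }

    // Models application startup: only containers flagged autoStartup are started.
    void startAll() {
        for (ToyContainer container : containers.values()) {
            if (container.autoStartup) {
                container.running = true;
            }
        }
    }

    boolean isRunning(String id) {
        ToyContainer container = containers.get(id);
        return container != null && container.running;
    }
}
```

In this model, a container registered before startup with autoStartup set to false stays idle until something starts it explicitly, while a container registered later with startImmediately set to true begins running as soon as it is registered.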
Example
We do it as follows:
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Autowired
private KafkaListenerContainerFactory kafkaListenerContainerFactory;

MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint = ...;
boolean startImmediately = ...;
kafkaListenerEndpointRegistry.registerListenerContainer(kafkaListenerEndpoint,
  kafkaListenerContainerFactory, startImmediately);
Example
We do it as follows:
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Autowired
private KafkaListenerContainerFactory kafkaListenerContainerFactory;

public void createAndRegisterListener(String topic) {
  KafkaListenerEndpoint listener = ...;
  // true: start the container as soon as it is registered
  kafkaListenerEndpointRegistry.registerListenerContainer(listener,
    kafkaListenerContainerFactory, true);
}