21 Kasım 2021 Pazar

SpringKafka Consumer KafkaListenerEndpointRegistry Sınıfı - Dinamik Olarak Listener Takılabilir

Giriş
Şu satırı dahil ederiz
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
Açıklaması şöyle
... we cannot delete the consumers we have registered, because it is not possible to do this using the KafkaListenerEndpointRegistry class. In order to make this possible, we need to create our own registry class. 
getListenerContainer
MessageListenerContainer nesnesi döner

getListenerContainerIds metodu
Örnek
Şöyle yaparız. Burada MessageListenerContainer elde edildikten sonra start(), pause(), resume(), getAssignedPartitions(), getListenerId(), getGroupId() bir sürü metodu kullanılabilir.
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@GetMapping
public List<KafkaConsumerResponse> getConsumerIds() {
  return kafkaListenerEndpointRegistry.getListenerContainerIds()
    .stream()
    .map(this::createKafkaConsumerResponse)
    .collect(Collectors.toList());
}

private KafkaConsumerResponse createKafkaConsumerResponse(String consumerId) {
  MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry .getListenerContainer(consumerId);
  ...
}
registerListenerContainer metodu
Açıklaması şöyle
When we create a kafka consumer using the @KafkaListener annotation, it will be read by the spring when the application is run. KafkaListenerAnnotationBeanPostProcessor is class that responsible to read those annotation. Furthermore, the kafka consumers that we have created will be registered in the KafkaListenerEndpointRegistry class by KafkaListenerEndpointRegistrar class. When the application is running, KafkaListenerEndpointRegistry will start the consumers that have been registered based on the autoStartup property.
Açıklaması şöyle
The method accepts 3 parameters, that is KafkaListenerEndpoint, KafkaListenerContainerFactory, and Boolean startImmediately. 
KafkaListenerEndpoint yazısına bakabilirsiniz

Örnek
Şöyle yaparız
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Autowired
private KafkaListenerContainerFactory kafkaListenerContainerFactory;

MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint = ...;
boolean startImmediately = ...;

kafkaListenerEndpointRegistry.registerListenerContainer(kafkaListenerEndpoint,
  kafkaListenerContainerFactory, 
  startImmediately);
Örnek
Şöyle yaparız
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Autowired
private KafkaListenerContainerFactory kafkaListenerContainerFactory;

public void createAndRegisterListener(String topic) {
  KafkaListenerEndpoint listener = ...;
  kafkaListenerEndpointRegistry.registerListenerContainer(listener, 
    kafkaListenerContainerFactory, true);
}

Hiç yorum yok:

Yorum Gönder