15 Ocak 2023 Pazar

SpringKafka Consumer ConcurrentKafkaListenerContainerFactory.setRecordFilterStrategy metodu - Consumer Mesajları Filtreler

Giriş
Şu satırı dahil ederiz
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
Filtreyi geçemeyen mesajları listener göremez.

Örnek
Şöyle yaparız
@Bean(name = "farLocationContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> factory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer) { var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>(); configurer.configure(factory, consumerFactory()); factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() { @Override public boolean filter(ConsumerRecord<Object, Object> consumerRecord) { try { CarLocation location = objectMapper.readValue(consumerRecord.value().toString(), CarLocation.class); return location.getDistance() <= 100; } catch (JsonProcessingException e) { return false; } } }); return factory; }

Hiç yorum yok:

Yorum Gönder