Giriş
ReactiveRedisTemplate.opsForStream() ile gerçekleşiyor
Pending mesajları almak için şöyle yaparız
PendingMessages pendingMessages = redisTemplate
  .opsForStream()
  .pending("purchase-events", "purchase-events", Range.unbounded(), 100L);Örnek
Elimizde şöyle bir kod olsun
@Configurationpublic class RedisStreamConfiguration {@Value("${redis.node.ip}")private String ip;alo@Value("${redis.node.port}")private int pageManagerValues;alo@Value("${redis.node.password}")private String password;private RedisClusterConfiguration redisClusterConfiguration() {RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();redisClusterConfiguration.addClusterNode(new RedisNode(ip, port));redisClusterConfiguration.setPassword(password);return redisClusterConfiguration;}@Beanpublic ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {return new LettuceConnectionFactory(redisClusterConfiguration());}}
Config'in devamı şöyle olsun
@Configurationpublic class RedisStreamConfiguration {@Beanpublic ReactiveRedisTemplate<String, String> reactiveRedisTemplate(@Qualifier("reactiveRedisConnectionFactory")ReactiveRedisConnectionFactory factory) {StringRedisSerializer keySerializer = new StringRedisSerializer();Jackson2JsonRedisSerializer<String> valueSerializer = new Jackson2JsonRedisSerializer<>(String.class);RedisSerializationContext.RedisSerializationContextBuilder<String, String> builder =RedisSerializationContext.newSerializationContext(keySerializer);RedisSerializationContext<String, String> context = builder .value(valueSerializer).build();return new ReactiveRedisTemplate<>(factory, context);}@Beanpublic ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {return new LettuceConnectionFactory(redisClusterConfiguration());}}
Publisher şöyle
@RequiredArgsConstructor
@Service
public class BasketStreamService {
  @Value("${stream.key}")
  private String streamKey;
  private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;
  private Gson gson;
  @PostConstruct
  public void init(){
    gson = new Gson();
  }
  public void publishEvent(Basket basket){
    ObjectRecord<String, String> objectRecord = StreamRecords.newRecord()
      .ofObject(gson.toJson(basket))
      .withStreamKey(streamKey);
    this.reactiveRedisTemplate
      .opsForStream()
      .add(objectRecord)
      .block(); // or subscribe
    }
}Consumer şöyle
@Component
@RequiredArgsConstructor
public class RedisConsumeService {
  @Value("${stream.key}")
  private String streamKey;
  private final StreamListener<String, ObjectRecord<String, String>> streamListener;
  @Bean
  public Subscription subscription(RedisConnectionFactory redisConnectionFactory) 
  throws UnknownHostException {
  
    StreamMessageListenerContainer
      .StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>>
      options = 
        StreamMessageListenerContainer
        .StreamMessageListenerContainerOptions
        .builder()
        .pollTimeout(Duration.ofSeconds(1))
        .targetType(String.class)
        .build();
    StreamMessageListenerContainer<String, ObjectRecord<String, String>>  
    listenerContainer = 
      StreamMessageListenerContainer
        .create(redisConnectionFactory, options);
    Subscription subscription = listenerContainer.receive(
    Consumer
      .from(streamKey, InetAddress.getLocalHost()
      .getHostName()),
    StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
    streamListener);
    listenerContainer.start();
    return subscription;
  }
}Listener şöyle
@Service
@RequiredArgsConstructor
@Slf4j
public class BasketProductConsumer implements 
  StreamListener<String, ObjectRecord<String, String>> {
  private final RedisTemplate<String, String> shRedisTemplate;
  private final SuggestionService suggestionService;
  private Gson gson;
  @PostConstruct
  public void init(){
    gson = new Gson();
  }
  @SneakyThrows
  @Override
  public void onMessage(ObjectRecord<String, String> record) {
       
    Basket basket = gson.fromJson(record.getValue(), Basket.class);
    List<Product> suggested = suggestionService.getSuggestion(basket);
    basket.setSuggestionProduct(suggested);
    shRedisTemplate.opsForHash()
      .put(basket.getSessionId(), "basket", gson.toJson(basket));
    Long ack = shRedisTemplate.opsForStream()
      .acknowledge("purchase-events", record);
  }
} 
Hiç yorum yok:
Yorum Gönder