4 Ocak 2023 Çarşamba

SpringData Redis Stream - Persistent Pub/Sub

Giriş
ReactiveRedisTemplate.opsForStream() ile gerçekleşiyor

Pending mesajları almak için şöyle yaparız
PendingMessages pendingMessages = redisTemplate
  .opsForStream()
  .pending("purchase-events", "purchase-events", Range.unbounded(), 100L);
Örnek
Elimizde şöyle bir kod olsun
@Configuration
public class RedisStreamConfiguration {
  @Value("${redis.node.ip}")
  private String ip;alo
  @Value("${redis.node.port}")
  private int pageManagerValues;alo
  @Value("${redis.node.password}")
  private String password;

  private RedisClusterConfiguration redisClusterConfiguration() {
    RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
    redisClusterConfiguration.addClusterNode(new RedisNode(ip, port));
    redisClusterConfiguration.setPassword(password);
    return redisClusterConfiguration;
  }
 
  @Bean
  public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
    return new LettuceConnectionFactory(redisClusterConfiguration());
  }
}
Config'in devamı şöyle olsun
@Configuration
public class RedisStreamConfiguration {
 
  @Bean
  public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(
    @Qualifier("reactiveRedisConnectionFactory")ReactiveRedisConnectionFactory factory) {
    
    StringRedisSerializer keySerializer = new StringRedisSerializer();
    Jackson2JsonRedisSerializer<String> valueSerializer = new Jackson2JsonRedisSerializer<>(String.class);

    RedisSerializationContext.RedisSerializationContextBuilder<String, String> builder =
      RedisSerializationContext.newSerializationContext(keySerializer);

    RedisSerializationContext<String, String> context = builder .value(valueSerializer).build();
    return new ReactiveRedisTemplate<>(factory, context);
  }
  @Bean
  public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
    return new LettuceConnectionFactory(redisClusterConfiguration());
  }
}
Publisher şöyle
@RequiredArgsConstructor
@Service
public class BasketStreamService {

  @Value("${stream.key}")
  private String streamKey;
  private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;
  private Gson gson;

  @PostConstruct
  public void init(){
    gson = new Gson();
  }

  public void publishEvent(Basket basket){

    ObjectRecord<String, String> objectRecord = StreamRecords.newRecord()
      .ofObject(gson.toJson(basket))
      .withStreamKey(streamKey);
    this.reactiveRedisTemplate
      .opsForStream()
      .add(objectRecord)
      .block(); // or subscribe
    }
}
Consumer şöyle
@Component
@RequiredArgsConstructor
public class RedisConsumeService {

  @Value("${stream.key}")
  private String streamKey;

  private final StreamListener<String, ObjectRecord<String, String>> streamListener;

  @Bean
  public Subscription subscription(RedisConnectionFactory redisConnectionFactory) 
  throws UnknownHostException {
  
    StreamMessageListenerContainer
      .StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>>
      options = 
        StreamMessageListenerContainer
        .StreamMessageListenerContainerOptions
        .builder()
        .pollTimeout(Duration.ofSeconds(1))
        .targetType(String.class)
        .build();

    StreamMessageListenerContainer<String, ObjectRecord<String, String>>  
    listenerContainer = 
      StreamMessageListenerContainer
        .create(redisConnectionFactory, options);

    Subscription subscription = listenerContainer.receive(
    Consumer
      .from(streamKey, InetAddress.getLocalHost()
      .getHostName()),
    StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
    streamListener);

    listenerContainer.start();
    return subscription;
  }
}
Listener şöyle
@Service
@RequiredArgsConstructor
@Slf4j
public class BasketProductConsumer implements 
  StreamListener<String, ObjectRecord<String, String>> {

  private final RedisTemplate<String, String> shRedisTemplate;
  private final SuggestionService suggestionService;
  private Gson gson;
  @PostConstruct
  public void init(){
    gson = new Gson();
  }

  @SneakyThrows
  @Override
  public void onMessage(ObjectRecord<String, String> record) {
       
    Basket basket = gson.fromJson(record.getValue(), Basket.class);
    List<Product> suggested = suggestionService.getSuggestion(basket);

    basket.setSuggestionProduct(suggested);

    shRedisTemplate.opsForHash()
      .put(basket.getSessionId(), "basket", gson.toJson(basket));
    Long ack = shRedisTemplate.opsForStream()
      .acknowledge("purchase-events", record);
  }
}




Hiç yorum yok:

Yorum Gönder