Giriş
ReactiveRedisTemplate.opsForStream() ile gerçekleşiyor
Pending mesajları almak için şöyle yaparız
PendingMessages pendingMessages = redisTemplate .opsForStream() .pending("purchase-events", "purchase-events", Range.unbounded(), 100L);
Örnek
Elimizde şöyle bir kod olsun
@Configurationpublic class RedisStreamConfiguration {@Value("${redis.node.ip}")private String ip;alo@Value("${redis.node.port}")private int pageManagerValues;alo@Value("${redis.node.password}")private String password;private RedisClusterConfiguration redisClusterConfiguration() {RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();redisClusterConfiguration.addClusterNode(new RedisNode(ip, port));redisClusterConfiguration.setPassword(password);return redisClusterConfiguration;}@Beanpublic ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {return new LettuceConnectionFactory(redisClusterConfiguration());}}
Config'in devamı şöyle olsun
@Configurationpublic class RedisStreamConfiguration {@Beanpublic ReactiveRedisTemplate<String, String> reactiveRedisTemplate(@Qualifier("reactiveRedisConnectionFactory")ReactiveRedisConnectionFactory factory) {StringRedisSerializer keySerializer = new StringRedisSerializer();Jackson2JsonRedisSerializer<String> valueSerializer = new Jackson2JsonRedisSerializer<>(String.class);RedisSerializationContext.RedisSerializationContextBuilder<String, String> builder =RedisSerializationContext.newSerializationContext(keySerializer);RedisSerializationContext<String, String> context = builder .value(valueSerializer).build();return new ReactiveRedisTemplate<>(factory, context);}@Beanpublic ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {return new LettuceConnectionFactory(redisClusterConfiguration());}}
Publisher şöyle
@RequiredArgsConstructor @Service public class BasketStreamService { @Value("${stream.key}") private String streamKey; private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate; private Gson gson; @PostConstruct public void init(){ gson = new Gson(); } public void publishEvent(Basket basket){ ObjectRecord<String, String> objectRecord = StreamRecords.newRecord() .ofObject(gson.toJson(basket)) .withStreamKey(streamKey); this.reactiveRedisTemplate .opsForStream() .add(objectRecord) .block(); // or subscribe } }
Consumer şöyle
@Component @RequiredArgsConstructor public class RedisConsumeService { @Value("${stream.key}") private String streamKey; private final StreamListener<String, ObjectRecord<String, String>> streamListener; @Bean public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { StreamMessageListenerContainer .StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .targetType(String.class) .build(); StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer = StreamMessageListenerContainer .create(redisConnectionFactory, options); Subscription subscription = listenerContainer.receive( Consumer .from(streamKey, InetAddress.getLocalHost() .getHostName()), StreamOffset.create(streamKey, ReadOffset.lastConsumed()), streamListener); listenerContainer.start(); return subscription; } }
Listener şöyle
@Service @RequiredArgsConstructor @Slf4j public class BasketProductConsumer implements StreamListener<String, ObjectRecord<String, String>> { private final RedisTemplate<String, String> shRedisTemplate; private final SuggestionService suggestionService; private Gson gson; @PostConstruct public void init(){ gson = new Gson(); } @SneakyThrows @Override public void onMessage(ObjectRecord<String, String> record) { Basket basket = gson.fromJson(record.getValue(), Basket.class); List<Product> suggested = suggestionService.getSuggestion(basket); basket.setSuggestionProduct(suggested); shRedisTemplate.opsForHash() .put(basket.getSessionId(), "basket", gson.toJson(basket)); Long ack = shRedisTemplate.opsForStream() .acknowledge("purchase-events", record); } }
Hiç yorum yok:
Yorum Gönder