Giriş
Redis Stream işlemleri ReactiveRedisTemplate.opsForStream() ile gerçekleştirilir.
Pending mesajları almak için şöyle yaparız
PendingMessages pendingMessages = redisTemplate
.opsForStream()
.pending("purchase-events", "purchase-events", Range.unbounded(), 100L);
Örnek
Elimizde şöyle bir kod olsun
@Configurationpublic class RedisStreamConfiguration {@Value("${redis.node.ip}")private String ip;alo@Value("${redis.node.port}")private int pageManagerValues;alo@Value("${redis.node.password}")private String password;private RedisClusterConfiguration redisClusterConfiguration() {RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();redisClusterConfiguration.addClusterNode(new RedisNode(ip, port));redisClusterConfiguration.setPassword(password);return redisClusterConfiguration;}@Beanpublic ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {return new LettuceConnectionFactory(redisClusterConfiguration());}}
Config'in devamı şöyle olsun
/**
 * Continuation of the Redis stream configuration: declares the
 * {@code ReactiveRedisTemplate<String, String>} bean and the connection
 * factory it is wired with.
 */
@Configuration
public class RedisStreamConfiguration {

    /**
     * Creates a reactive template whose serialization context uses a
     * {@code StringRedisSerializer} as the default serializer and a
     * {@code Jackson2JsonRedisSerializer<String>} for values.
     *
     * @param factory the connection factory bean named {@code reactiveRedisConnectionFactory}
     * @return the configured reactive template
     */
    @Bean
    public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(
            @Qualifier("reactiveRedisConnectionFactory") ReactiveRedisConnectionFactory factory) {
        Jackson2JsonRedisSerializer<String> jsonValueSerializer =
                new Jackson2JsonRedisSerializer<>(String.class);

        // The explicit type witness keeps <String, String> when the builder calls are chained.
        RedisSerializationContext<String, String> serializationContext =
                RedisSerializationContext
                        .<String, String>newSerializationContext(new StringRedisSerializer())
                        .value(jsonValueSerializer)
                        .build();

        return new ReactiveRedisTemplate<>(factory, serializationContext);
    }

    /**
     * Creates the Lettuce-based reactive connection factory.
     *
     * <p>NOTE(review): {@code redisClusterConfiguration()} is not defined in this
     * excerpt; presumably it is the helper from the first part of this class.
     *
     * @return a connection factory built from the cluster configuration
     */
    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        return new LettuceConnectionFactory(redisClusterConfiguration());
    }
}
Publisher şöyle
/**
 * Publishes {@code Basket} objects to a Redis stream as JSON strings.
 *
 * <p>The target stream key is injected from the {@code stream.key} property.
 */
@Service
@RequiredArgsConstructor
public class BasketStreamService {

    private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;

    @Value("${stream.key}")
    private String streamKey;

    private Gson gson;

    /** Creates the Gson instance once the bean has been constructed. */
    @PostConstruct
    public void init() {
        this.gson = new Gson();
    }

    /**
     * Serializes the basket to JSON, wraps it in a stream record addressed to
     * the configured stream key, and adds it to the stream.
     *
     * @param basket the basket to publish
     */
    public void publishEvent(Basket basket) {
        String payload = gson.toJson(basket);
        ObjectRecord<String, String> event =
                StreamRecords.newRecord().ofObject(payload).withStreamKey(streamKey);

        // Waits for the add to complete before returning.
        reactiveRedisTemplate.opsForStream().add(event).block(); // or subscribe
    }
}
Consumer şöyle
/**
 * Registers the injected {@code StreamListener} on the configured Redis stream
 * and exposes the resulting {@code Subscription} as a bean.
 */
@Component
@RequiredArgsConstructor
public class RedisConsumeService {

    private final StreamListener<String, ObjectRecord<String, String>> streamListener;

    @Value("${stream.key}")
    private String streamKey;

    /**
     * Creates a listener container, subscribes the listener to the stream and
     * starts the container.
     *
     * <p>The consumer is created with the stream key as its group and the local
     * host name as its name; reading starts at the last consumed offset.
     *
     * @param redisConnectionFactory the connection factory the container polls with
     * @return the subscription returned by the container for this listener
     * @throws UnknownHostException if the local host name cannot be resolved
     */
    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory)
            throws UnknownHostException {
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
                StreamMessageListenerContainer.create(redisConnectionFactory, containerOptions());

        String consumerName = InetAddress.getLocalHost().getHostName();
        Consumer consumer = Consumer.from(streamKey, consumerName);
        StreamOffset<String> offset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());

        Subscription subscription = container.receive(consumer, offset, streamListener);
        container.start();
        return subscription;
    }

    /** Builds container options with a one-second poll timeout and {@code String} as the target type. */
    private StreamMessageListenerContainer
            .StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>>
            containerOptions() {
        return StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .targetType(String.class)
                .build();
    }
}
Listener şöyle
/**
 * Stream listener that enriches each received basket with product suggestions,
 * stores the result in a Redis hash and acknowledges the stream record.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class BasketProductConsumer implements
        StreamListener<String, ObjectRecord<String, String>> {

    private final RedisTemplate<String, String> shRedisTemplate;
    private final SuggestionService suggestionService;

    private Gson gson;

    /** Creates the Gson instance once the bean has been constructed. */
    @PostConstruct
    public void init() {
        gson = new Gson();
    }

    /**
     * Handles one stream record whose value is a JSON-encoded {@code Basket}.
     *
     * <p>The basket is parsed, its suggestions are set, and the updated JSON is
     * written to the hash keyed by the basket's session id under the field
     * {@code "basket"}. The record is then acknowledged.
     *
     * @param record the stream record carrying the basket JSON
     */
    @SneakyThrows
    @Override
    public void onMessage(ObjectRecord<String, String> record) {
        Basket basket = gson.fromJson(record.getValue(), Basket.class);
        List<Product> suggested = suggestionService.getSuggestion(basket);
        basket.setSuggestionProduct(suggested);

        shRedisTemplate.opsForHash()
                .put(basket.getSessionId(), "basket", gson.toJson(basket));

        // The subscription in this file uses the stream key as the consumer group
        // name, so acknowledge against the record's own stream instead of a
        // hard-coded group that only matches one particular configuration.
        // NOTE(review): assumes the group name always equals the stream key - confirm.
        String group = record.getStream();
        Long acknowledged = shRedisTemplate.opsForStream().acknowledge(group, record);
        log.debug("Acknowledged {} record(s) for group {}", acknowledged, group);
    }
}
Hiç yorum yok:
Yorum Gönder