13 Ocak 2021 Çarşamba

SpringCloud AWS SQS

Giriş
Bir örnek burada

Maven
Şu satırı dahil ederiz
<!-- Spring Cloud AWS starter. -->
<!-- NOTE(review): no <version> is declared; presumably managed by a parent POM or BOM, confirm. -->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-aws</artifactId>
</dependency>
<!-- Spring Cloud AWS messaging starter. -->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-aws-messaging</artifactId>
</dependency>
Gradle
Şu satırı dahil ederiz
// NOTE(review): the group id here (io.awspring.cloud) differs from the Maven snippet
// (org.springframework.cloud) and from the org.springframework.cloud.aws.* imports used
// in the Java examples; confirm which artifact/version is intended.
implementation 'io.awspring.cloud:spring-cloud-starter-aws-messaging'
SQS Test
Örnek
Localstack yerine şöyle yaparız
# Docker Compose file that runs a single "sqs" service from the roribio16/alpine-sqs image.
version: "3"

services:
  sqs:
    image: roribio16/alpine-sqs
    ports:
      # Format is host:container. Container ports 9324 and 9325 are published
      # on host ports 9334 and 9335 respectively.
      - '9334:9324'
      - '9335:9325'
    volumes:
      # Named volume (declared at the bottom of this file) mounted at /sqs-data.
      - my-datavolume:/sqs-data
volumes:
  my-datavolume:

Örnek
application.yml dosyasında şöyle yaparız
# AWS settings under cloud.aws: a fixed region and static credentials.
# The access-key and secret-key values are intentionally left blank here.
cloud:
  aws:
    region:
      static: us-east-1  # Region where you create the queue
      auto: false
    credentials:
      access-key: # access key
      secret-key: # secret access key
QueueMessagingTemplate Sınıfı
Şu satırı dahil ederiz
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
Açıklaması şöyle
With spring-cloud-starter-aws-messaging we get QueueMessagingTemplate. It is an abstraction to easily publish/receive messages to queue. It also provides apis to configure various converters.

constructor - AmazonSQSAsync 
Örnek
Şöyle yaparız. Burada profile dev ise yerelde çalışan SQS'e bağlanıyor
/**
 * Spring configuration that provides the SQS client and a
 * {@link QueueMessagingTemplate} built on top of it.
 *
 * <p>When the {@code dev} profile is active the client is pointed at the endpoint
 * given by the {@code queue.endpoint} property; otherwise the client is built for
 * the region only.
 */
@Configuration
public class SqsConfig {
  /** Profile name that selects the explicitly configured endpoint. */
  private static final String DEV_PROFILE = "dev";

  private final String queueEndpoint;

  /**
   * @param queueEndpoint value of the {@code queue.endpoint} property; only used
   *     when the {@code dev} profile is active
   */
  public SqsConfig(@Value("${queue.endpoint}") String queueEndpoint) {
    this.queueEndpoint = queueEndpoint;
  }

  /**
   * Creates the asynchronous SQS client.
   *
   * @param profile value of {@code spring.profiles.active}; defaults to an empty
   *     string when the property is not set, so a missing profile no longer
   *     prevents the application context from starting
   * @return a client bound to {@code queue.endpoint} for the {@code dev} profile,
   *     or a region-only client otherwise
   */
  @Bean
  @Primary
  public AmazonSQSAsync amazonSQSAsync(@Value("${spring.profiles.active:}")
    String profile) {

    // Same region for both branches; computed once instead of being repeated.
    String region = Region.AP_SOUTHEAST_1.id();

    if (isDevProfile(profile)) {
      return AmazonSQSAsyncClientBuilder
        .standard()
        .withEndpointConfiguration(
          new AwsClientBuilder.EndpointConfiguration(queueEndpoint, region))
        .build();
    }

    return AmazonSQSAsyncClientBuilder
      .standard()
      .withRegion(region)
      .build();
  }

  /**
   * Creates the messaging template on top of the given client.
   *
   * @param amazonSQSAsync the SQS client bean
   * @return a template that sends and receives through {@code amazonSQSAsync}
   */
  @Bean
  public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync) {
    return new QueueMessagingTemplate(amazonSQSAsync);
  }

  /**
   * Tells whether {@code dev} is among the active profiles.
   *
   * <p>{@code spring.profiles.active} may hold a comma-separated list (for example
   * {@code dev,local}), so each entry is checked rather than the whole string.
   * The comparison is null-safe.
   */
  private static boolean isDevProfile(String activeProfiles) {
    if (activeProfiles == null) {
      return false;
    }
    for (String name : activeProfiles.split(",")) {
      if (DEV_PROFILE.equals(name.trim())) {
        return true;
      }
    }
    return false;
  }
}
Örnek
Şöyle yaparız
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
@Configuration
public class SqsConfig {

  @Value("${cloud.aws.region.static}")
  private String region;   // region name

  @Value("${cloud.aws.credentials.access-key}")
  private String awsAccessKey;   // aws access key.

  @Value("${cloud.aws.credentials.secret-key}")
  private String awsSecretKey; // aws secret key

  @Bean
  public QueueMessagingTemplate queueMessagingTemplate() {
    return new QueueMessagingTemplate(amazonSQSAsync());
  }

  @Bean
  public AmazonSQSAsync amazonSQSAsync() {
    return AmazonSQSAsyncClientBuilder
      .standard()
      .withRegion(region) //region name wich aquired by application.yml file
      .withCredentials(new AWSStaticCredentialsProvider(
       new BasicAWSCredentials(awsAccessKey, awsSecretKey))) // access key and secret key
      .build();
  }
}
convertAndSend metodu
Belirtilen kuyruk ismine Java nesnesini gönderir

Örnek
Şöyle yaparız
Slf4j
@Service
public class QueueServiceImpl implements QueueService {
  private final QueueMessagingTemplate queueMessagingTemplate;
  private final String taskQueue;

  public QueueServiceImpl(QueueMessagingTemplate queueMessagingTemplate,
                         @Value("${queue.task}") String taskQueue) {
    this.queueMessagingTemplate = queueMessagingTemplate;
    this.taskQueue = taskQueue;
  }

  @Override
  public void publishTask(Task task) {

   task.setId(UUID.randomUUID().toString());
   log.info("Publishing task to queue {}", task);

   queueMessagingTemplate.convertAndSend(taskQueue, task);
  }
}
Örnek
Şöyle yaparız. @SqsListener ile sadece String mesaj okuyoruz
import com.aws.sqs.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;

@RestController
@RequestMapping(value = "/sqs")
public class SqsController {
  //queue name
  private static final String QUEUE = "my-queue";

  @Autowired
  private QueueMessagingTemplate queueMessagingTemplate;

  @PostMapping(value = "/send")
  @ResponseStatus(code = HttpStatus.CREATED)
  public void sendMessageToSqs(@RequestBody final Message message) {
    queueMessagingTemplate.convertAndSend(QUEUE, message); //send to queue
  }

  // @SqsListener listens to the message from the queue.
  @SqsListener(value = QUEUE, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
  public void getMessageFromSqs(Message message, @Header("MessageId") String messageId) {
    logger.info("Received message= {} with messageId= {}", message.toString(), messageId);
  }
}
Örnek
Açıklaması şöyle
... we need to let SQS know how it could convert the receiving strings to object. So we add a bean of QueueMessageHandlerFactory.

Şöyle yaparız. @SqsListener Task tipinden Java nesnesi okuyoruz
/**
 * Builds the handler factory used for queue listeners, with a single payload
 * argument resolver backed by a Jackson message converter.
 *
 * @param objectMapper mapper handed to the Jackson message converter
 * @param amazonSQSAsync SQS client assigned to the factory
 * @return the configured factory
 */
@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory(
  ObjectMapper objectMapper,
  AmazonSQSAsync amazonSQSAsync) {

  // Jackson-based converter with strict content-type matching switched off.
  MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
  jsonConverter.setObjectMapper(objectMapper);
  jsonConverter.setStrictContentTypeMatch(false);

  // The resolver is the only one registered on the factory.
  HandlerMethodArgumentResolver payloadResolver =
    new PayloadMethodArgumentResolver(jsonConverter, null, false);

  QueueMessageHandlerFactory handlerFactory = new QueueMessageHandlerFactory();
  handlerFactory.setAmazonSqs(amazonSQSAsync);
  handlerFactory.setArgumentResolvers(List.of(payloadResolver));
  return handlerFactory;
}
}

/**
 * {@link TaskProcessor} implementation that listens on the {@code dev-task.std}
 * queue and logs every task it receives.
 */
@Slf4j
@Service
public class TaskProcessorImpl implements TaskProcessor {

  /**
   * Handles one task delivered from the queue by logging it.
   *
   * @param task the task read from the queue
   */
  @Override
  @MessageMapping
  @SqsListener(
    value = "dev-task.std",
    deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
  public void process(Task task) {
    log.info("Processing task {}", task);
  }
}

Hiç yorum yok:

Yorum Gönder