Giriş
Maven
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-aws</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-aws-messaging</artifactId>
</dependency>
Gradle
implementation 'io.awspring.cloud:spring-cloud-starter-aws-messaging'
SQS Test
Örnek
version: "3"
services:
sqs:
image: roribio16/alpine-sqs
ports:
- '9334:9324'
- '9335:9325'
volumes:
- my-datavolume:/sqs-data
volumes:
my-datavolume:
Örnek
application.yml dosyasında şöyle
yaparızcloud:
aws:
region:
static: us-east-1 # Region where you create the queue
auto: false
credentials:
access-key: #access key
secret-key: #acess secret key
QueueMessagingTemplate Sınıfı
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
With spring-cloud-starter-aws-messaging we get QueueMessagingTemplate. It is an abstraction to easily publish/receive messages to queue. It also provides apis to configure various converters.
constructor - AmazonSQSAsync
Örnek
Şöyle
yaparız. Burada profile dev ise yerelde çalışan SQS'e bağlanıyor
@Configuration
public class SqsConfig {
private final int BATCH_SIZE = 10;
private final String queueEndpoint;
public SqsConfig(@Value("${queue.endpoint}") String queueEndpoint) {
this.queueEndpoint = queueEndpoint;
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync(@Value("${spring.profiles.active}")
String profile) {
if(profile.equals("dev")){
return AmazonSQSAsyncClientBuilder
.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
queueEndpoint,
Region.AP_SOUTHEAST_1.id()))
.build();
}
else {
return AmazonSQSAsyncClientBuilder
.standard()
.withRegion(Region.AP_SOUTHEAST_1.id())
.build();
}
}
@Bean
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync) {
return new QueueMessagingTemplate(amazonSQSAsync);
}
}Örnek
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
@Configuration
public class SqsConfig {
@Value("${cloud.aws.region.static}")
private String region; // region name
@Value("${cloud.aws.credentials.access-key}")
private String awsAccessKey; // aws access key.
@Value("${cloud.aws.credentials.secret-key}")
private String awsSecretKey; // aws secret key
@Bean
public QueueMessagingTemplate queueMessagingTemplate() {
return new QueueMessagingTemplate(amazonSQSAsync());
}
@Bean
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder
.standard()
.withRegion(region) //region name wich aquired by application.yml file
.withCredentials(new AWSStaticCredentialsProvider(
new BasicAWSCredentials(awsAccessKey, awsSecretKey))) // access key and secret key
.build();
}
}
convertAndSend metodu
Belirtilen kuyruk ismine Java nesnesini gönderir
Örnek
Slf4j
@Service
public class QueueServiceImpl implements QueueService {
private final QueueMessagingTemplate queueMessagingTemplate;
private final String taskQueue;
public QueueServiceImpl(QueueMessagingTemplate queueMessagingTemplate,
@Value("${queue.task}") String taskQueue) {
this.queueMessagingTemplate = queueMessagingTemplate;
this.taskQueue = taskQueue;
}
@Override
public void publishTask(Task task) {
task.setId(UUID.randomUUID().toString());
log.info("Publishing task to queue {}", task);
queueMessagingTemplate.convertAndSend(taskQueue, task);
}
}Örnek
Şöyle
yaparız. @SqsListener ile sadece String mesaj okuyoruz
import com.aws.sqs.model.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
@RestController
@RequestMapping(value = "/sqs")
public class SqsController {
//queue name
private static final String QUEUE = "my-queue";
@Autowired
private QueueMessagingTemplate queueMessagingTemplate;
@PostMapping(value = "/send")
@ResponseStatus(code = HttpStatus.CREATED)
public void sendMessageToSqs(@RequestBody final Message message) {
queueMessagingTemplate.convertAndSend(QUEUE, message); //send to queue
}
// @SqsListener listens to the message from the queue.
@SqsListener(value = QUEUE, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void getMessageFromSqs(Message message, @Header("MessageId") String messageId) {
logger.info("Received message= {} with messageId= {}", message.toString(), messageId);
}
}
Örnek
Açıklaması şöyle
... we need to let SQS know how it could convert the receiving strings to object. So we add a bean of QueueMessageHandlerFactory.
Şöyle
yaparız. @SqsListener Task tipinden Java nesnesi okuyoruz
@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory(
ObjectMapper objectMapper,
AmazonSQSAsync amazonSQSAsync) {
MappingJackson2MessageConverter messageConverter
= new MappingJackson2MessageConverter();
messageConverter.setObjectMapper(objectMapper);
messageConverter.setStrictContentTypeMatch(false);
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
factory.setAmazonSqs(amazonSQSAsync);
List<HandlerMethodArgumentResolver> resolvers = List.of(
new PayloadMethodArgumentResolver(messageConverter,null, false));
factory.setArgumentResolvers(resolvers);
return factory;
}
}
@Service
@Slf4j
public class TaskProcessorImpl implements TaskProcessor {
@Override
@SqsListener(value = "dev-task.std",deletionPolicy = SqsMessageDeletionPolicy.ALWAYS)
@MessageMapping
public void process(Task task) {
log.info("Processing task {}", task);
}
}