SQSListener с ThreadpoolExecutor
В приведенном ниже примере я устанавливаю максимальный и основной размер пула равным 1. Однако никакие сообщения не обрабатываются. Когда я включаю журнал отладки, я вижу , что сообщения извлекаются из SQS, но я предполагаю, что они не обрабатываются / удаляются. Однако когда я увеличиваю размер ядра и максимальный размер пула до 2, сообщения, кажется, обрабатываются.
EDIT
Я полагаю, что Spring, возможно, выделяет поток для приемника, который считывает данные из очереди, и поэтому он не может выделить поток к слушателю, который обрабатывает сообщение. Когда я увеличил corepoolsize до 2, я увидел, что сообщения считываются из очереди. Когда я добавил другой прослушиватель (для очереди мертвых писем), я столкнулся с той же проблемой - 2 потока не были достаточными, поскольку сообщения не обрабатывались. Когда я увеличил corepoolsize до 3, он начал обрабатывать сообщения. Я предполагаю , что в этом случае 1 поток был выделен для чтения сообщений из очереди и 2 слушателя были назначены 1 поток каждый.
@Configuration
public class SqsListenerConfiguration {
@Bean
@ConfigurationProperties(prefix = "aws.configuration")
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration();
}
@Bean
@Primary
public AWSCredentialsProvider awsCredentialsProvider() {
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
try {
credentialsProvider.getCredentials();
System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location (~/.aws/credentials), and is in valid format.",
e);
}
return credentialsProvider;
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.standard().
withCredentials(awsCredentialsProvider()).
withClientConfiguration(clientConfiguration()).
build();
}
@Bean
@ConfigurationProperties(prefix = "aws.queue")
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
simpleMessageListenerContainer.setMaxNumberOfMessages(10);
simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
return simpleMessageListenerContainer;
}
@Bean
public QueueMessageHandler queueMessageHandler() {
QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
return queueMessageHandler;
}
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(1);
executor.setThreadNamePrefix("oaoQueueExecutor");
executor.initialize();
return executor;
}
@Bean
public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
return new QueueMessagingTemplate(amazonSQSAsync);
}
}
Конфигурация Слушателя
@SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {
System.out.println(" Data = " + serviceData + " MessageId = " + messageId);
repository.execute(serviceData);
}
1 ответ:
Установив
corePoolSize
иmaximumPoolSize
одинаковыми, вы создадитеfixed-size thread pool
. Очень хорошее объяснение правил задокументировано здесьУстановка
maxPoolSize
неявно позволяет удалять задачи. Однако емкость очереди по умолчанию равнаInteger.MAX_VALUE
, что на практике означает бесконечность.Следует обратить внимание на то, что
ThreadPoolTaskExecutor
используетThreadPoolExecutor
внизу, который имеет несколько необычный подход к организации очередей, описанный в docs :Если
corePoolSize
или больше потоки запущены, исполнитель всегда предпочитает ставить запрос в очередь, а не добавлять новый поток.Это означает, что
maxPoolSize
имеет значение только тогда, когда очередь заполнена, иначе число потоков никогда не будет расти дальшеcorePoolSize
. Например, если мы отправляем задачи , которые никогда не завершаются в пул потоков:
- первые
corePoolSize
представления запустят новый поток каждый;- после этого все заявки отправляются в очередь;
- Если очередь конечна и ее емкость исчерпана, каждое представление запускает новую нить, вплоть до
maxPoolSize
;- когда пул и очередь заполнены, новые заявки отклоняются.
Любые
BlockingQueue
могут использоваться для передачи и хранения представленных заданий. Использование этой очереди взаимодействует с размером пула:
- если запущено меньше потоков corePoolSize, то исполнитель всегда предпочитает добавлять новый поток, а не стоять в очереди.
- Если выполняется corePoolSize или несколько потоков, исполнитель всегда предпочитает ставить запрос в очередь, а не добавлять новый поток.
- если запрос не может быть помещен в очередь, создается новый поток, если только это превысит максимальный размер пула, и в этом случае задача будет быть отвергнутым.
Unbounded queues
. Используя неограниченной очередью (например,LinkedBlockingQueue
без предопределенной емкости) вызовет новые задачи, которые будут поставлены в очередь в случаях, когда все corePoolSize нити заняты. Таким образом, будет создано не болееcorePoolSize
потоков. (И еще: поэтому значениеmaximumPoolSize
не имеет никакого эффекта.)
- если число потоков меньше, чем
corePoolSize
, создайте новый Поток для запуска новой задачи.- если число потоков равно (или больше)
corePoolSize
, поставьте задачу в очередь.- Если очередь заполнена, а количество потоков меньше, чем
maxPoolSize
, Создайте новый поток для выполнения задач.- если очередь заполнена, и количество потоков больше, чем или равный
maxPoolSize
, отклонить задачу.