SQSListener с ThreadpoolExecutor


В приведенном ниже примере я устанавливаю максимальный и основной размер пула равным 1. Однако никакие сообщения не обрабатываются. Когда я включаю журнал отладки, я вижу , что сообщения извлекаются из SQS, но я предполагаю, что они не обрабатываются / удаляются. Однако когда я увеличиваю размер ядра и максимальный размер пула до 2, сообщения, кажется, обрабатываются.

EDIT

Я полагаю, что Spring, возможно, выделяет поток для приемника, который считывает данные из очереди, и поэтому он не может выделить поток к слушателю, который обрабатывает сообщение. Когда я увеличил corepoolsize до 2, я увидел, что сообщения считываются из очереди. Когда я добавил другой прослушиватель (для очереди мертвых писем), я столкнулся с той же проблемой - 2 потока не были достаточными, поскольку сообщения не обрабатывались. Когда я увеличил corepoolsize до 3, он начал обрабатывать сообщения. Я предполагаю , что в этом случае 1 поток был выделен для чтения сообщений из очереди и 2 слушателя были назначены 1 поток каждый.

@Configuration
public class SqsListenerConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "aws.configuration")
    public ClientConfiguration clientConfiguration() {
        return new ClientConfiguration();
    }


    @Bean
    @Primary
    public AWSCredentialsProvider awsCredentialsProvider() {

        ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("credential");
        try {
            credentialsProvider.getCredentials();
            System.out.println(credentialsProvider.getCredentials().getAWSAccessKeyId());
            System.out.println(credentialsProvider.getCredentials().getAWSSecretKey());

        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. " +
                            "Please make sure that your credentials file is at the correct " +
                            "location (~/.aws/credentials), and is in valid format.",
                    e);
        }
        return credentialsProvider;
    }


    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard().
                withCredentials(awsCredentialsProvider()).
                withClientConfiguration(clientConfiguration()).
                build();
    }


    @Bean
    @ConfigurationProperties(prefix = "aws.queue")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
        simpleMessageListenerContainer.setMessageHandler(queueMessageHandler());
        simpleMessageListenerContainer.setMaxNumberOfMessages(10);
        simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
        return simpleMessageListenerContainer;
    }


    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync());
        QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
        return queueMessageHandler;
    }


    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("oaoQueueExecutor");
        executor.initialize();
        return executor;
    }


    @Bean
    public QueueMessagingTemplate messagingTemplate(@Autowired AmazonSQSAsync amazonSQSAsync) {
        return new QueueMessagingTemplate(amazonSQSAsync);
    }


}

Конфигурация Слушателя

    @SqsListener(value = "${oao.sqs.url}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void onMessage(String serviceData, @Header("MessageId") String messageId, @Header("ApproximateFirstReceiveTimestamp") String approximateFirstReceiveTimestamp) {

        System.out.println(" Data = " + serviceData + " MessageId = " + messageId);

        repository.execute(serviceData);
}
1 8

1 ответ:

Установив corePoolSize и maximumPoolSize одинаковыми, вы создадите fixed-size thread pool. Очень хорошее объяснение правил задокументировано здесь

Установка maxPoolSize неявно позволяет удалять задачи. Однако емкость очереди по умолчанию равна Integer.MAX_VALUE, что на практике означает бесконечность.

Следует обратить внимание на то, что ThreadPoolTaskExecutor использует ThreadPoolExecutor внизу, который имеет несколько необычный подход к организации очередей, описанный в docs :

Если corePoolSize или больше потоки запущены, исполнитель всегда предпочитает ставить запрос в очередь, а не добавлять новый поток.

Это означает, что maxPoolSize имеет значение только тогда, когда очередь заполнена, иначе число потоков никогда не будет расти дальше corePoolSize. Например, если мы отправляем задачи , которые никогда не завершаются в пул потоков:

  • первые corePoolSize представления запустят новый поток каждый;
  • после этого все заявки отправляются в очередь;
  • Если очередь конечна и ее емкость исчерпана, каждое представление запускает новую нить, вплоть до maxPoolSize;
  • когда пул и очередь заполнены, новые заявки отклоняются.

Очередь - чтение документов

Любые BlockingQueue могут использоваться для передачи и хранения представленных заданий. Использование этой очереди взаимодействует с размером пула:

  • если запущено меньше потоков corePoolSize, то исполнитель всегда предпочитает добавлять новый поток, а не стоять в очереди.
  • Если выполняется corePoolSize или несколько потоков, исполнитель всегда предпочитает ставить запрос в очередь, а не добавлять новый поток.
  • если запрос не может быть помещен в очередь, создается новый поток, если только это превысит максимальный размер пула, и в этом случае задача будет быть отвергнутым.

Unbounded queues. Используя неограниченной очередью (например, LinkedBlockingQueue без предопределенной емкости) вызовет новые задачи, которые будут поставлены в очередь в случаях, когда все corePoolSize нити заняты. Таким образом, будет создано не более corePoolSize потоков. (И еще: поэтому значение maximumPoolSize не имеет никакого эффекта.)

  1. если число потоков меньше, чем corePoolSize, создайте новый Поток для запуска новой задачи.
  2. если число потоков равно (или больше) corePoolSize, поставьте задачу в очередь.
  3. Если очередь заполнена, а количество потоков меньше, чем maxPoolSize, Создайте новый поток для выполнения задач.
  4. если очередь заполнена, и количество потоков больше, чем или равный maxPoolSize, отклонить задачу.