Транзакционный доступ к почтовому ящику POP3 с интеграцией Spring


Мы хотим использовать Spring Integration 2.1.1 для транзакционного считывания почты с почтового ящика POP3. Транзакционность заключается в том, что мы можем прочитать сообщение электронной почты, сохранить его в Oracle и удалить из почтового ящика. В случае отката транзакции мы хотим, чтобы письмо оставалось в почтовом ящике (т. е. не удалялось).

Наша проблема заключается в том, что, несмотря на то, что в транзакции (подробнее об этом через секунду) почтовый ящик открыт, почта извлекается, помечается для удаления и соединение закрыто независимо от остальной части транзакции. Это означает, что слишком поздно не удалять почту, если что-то не сработает позже (поскольку соединение закрыто и почтовый ящик перешел в состояние обновления, которое навсегда удаляет электронную почту).

Мы пытались сделать это, создав транзакционный опрос inbound-channel-adaptor:

<!-- This inbound channel adaptor interfaces to the sendmail POP3 queue -->
<int-mail:inbound-channel-adapter id="pop3PollingChannelAdaptor" 
                                  store-uri="pop3://myuser:myuser@100.100.100.100/inbox" 
                                  channel="receiveInboundEmailChannel" 
                                  should-delete-messages="true"              
                                  auto-startup="true" 
                                  java-mail-properties="javaMailProperties">
    <!-- Will poll every 20 seconds -->            
    <int:poller fixed-rate="10000" 
                max-messages-per-poll="1">
        <int:transactional transaction-manager="txManager"
                           isolation="DEFAULT"
                           propagation="REQUIRED"
                           read-only="false"
                           timeout="100000" />
    </int:poller>
</int-mail:inbound-channel-adapter>

Который помещает сообщения на входящий канал электронной почты:

Который в свою очередь прослушивается адаптером исходящего канала (конечная точка):

<int:outbound-channel-adapter channel="receiveInboundEmailChannel" 
                              ref="inboundEmailMessageEndpoint" 
                              method="processMessage" />

Реализация конечной точки в настоящее время сохраняет вещи действительно простыми. Мы не приближаемся к базе данных в данный момент, вместо этого мы просто создаем исключение, если хотим вызвать откат.

@MessageEndpoint
public class InboundEmailMessageEndpoint {
    @Transactional(propagation = Propagation.MANDATORY)
    public void processMessage(Message<MimeMessage> message) {

        MimeMessage mailMessage =  message.getPayload();

        // We throw exceptions here to cause a rollback if we need to during investigation...
    }
}

Просматривая все журналы, мы можем увидеть нашу проблему в контексте (обратите внимание, как POP3 "sayonara" до начала остальной нашей работы в конечной точке и, самое главное, до фиксации / отката окружающей транзакции):

15:32:25.256 INFO  [main][org.springframework.transaction.jta.JtaTransactionManager]     Using JTA UserTransaction: com.atomikos.icatch.jta.UserTransactionImp@3228a1
15:32:25.256 INFO  [main][org.springframework.transaction.jta.JtaTransactionManager] Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@10980e7
15:32:25.422 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning NULL!
15:32:25.422 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning NULL!
15:32:25.423 DEBUG [task-scheduler-1][org.springframework.transaction.jta.JtaTransactionManager] Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT,timeout_100000
15:32:25.423 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning NULL!
15:32:25.423 WARN  [task-scheduler-1][com.atomikos.icatch.imp.TransactionServiceImp] Attempt to create a transaction with a timeout that exceeds com.atomikos.icatch.max_timeout - truncating to: 300000
15:32:25.423 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.CoordinatorImp] Coordinator 10.9.21.7.tm0000100022 entering state: ACTIVE
15:32:25.425 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.thread.TaskManager] TaskManager: initializing...
15:32:25.426 INFO  [task-scheduler-1][com.atomikos.icatch.imp.thread.TaskManager] THREADS: using JDK thread pooling...
15:32:25.428 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.thread.TaskManager] THREADS: using executor class com.atomikos.icatch.imp.thread.Java15ExecutorFactory$Executor
15:32:25.428 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.thread.Java15ExecutorFactory] (1.5) executing task: com.atomikos.timing.PooledAlarmTimer@18c4a7f
15:32:25.429 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.thread.ThreadFactory] ThreadFactory: creating new thread: Atomikos:0
15:32:25.429 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.TransactionServiceImp] Creating composite transaction: 10.9.21.7.tm0000100022
15:32:25.433 INFO  [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] createCompositeTransaction ( 100000000 ): created new ROOT transaction with id 10.9.21.7.tm0000100022
DEBUG: JavaMail version 1.4.4
DEBUG: successfully loaded resource: /META-INF/javamail.default.providers
DEBUG: Tables of loaded providers
DEBUG: Providers Listed By Class Name: {com.sun.mail.smtp.SMTPSSLTransport=javax.mail.Provider[TRANSPORT,smtps,com.sun.mail.smtp.SMTPSSLTransport,Sun Microsystems, Inc], com.sun.mail.smtp.SMTPTransport=javax.mail.Provider[TRANSPORT,smtp,com.sun.mail.smtp.SMTPTransport,Sun Microsystems, Inc], com.sun.mail.imap.IMAPSSLStore=javax.mail.Provider[STORE,imaps,com.sun.mail.imap.IMAPSSLStore,Sun Microsystems, Inc], com.sun.mail.pop3.POP3SSLStore=javax.mail.Provider[STORE,pop3s,com.sun.mail.pop3.POP3SSLStore,Sun Microsystems, Inc], com.sun.mail.imap.IMAPStore=javax.mail.Provider[STORE,imap,com.sun.mail.imap.IMAPStore,Sun Microsystems, Inc], com.sun.mail.pop3.POP3Store=javax.mail.Provider[STORE,pop3,com.sun.mail.pop3.POP3Store,Sun Microsystems, Inc]}
DEBUG: Providers Listed By Protocol: {imaps=javax.mail.Provider[STORE,imaps,com.sun.mail.imap.IMAPSSLStore,Sun Microsystems, Inc], imap=javax.mail.Provider[STORE,imap,com.sun.mail.imap.IMAPStore,Sun Microsystems, Inc], smtps=javax.mail.Provider[TRANSPORT,smtps,com.sun.mail.smtp.SMTPSSLTransport,Sun Microsystems, Inc], pop3=javax.mail.Provider[STORE,pop3,com.sun.mail.pop3.POP3Store,Sun Microsystems, Inc], pop3s=javax.mail.Provider[STORE,pop3s,com.sun.mail.pop3.POP3SSLStore,Sun Microsystems, Inc], smtp=javax.mail.Provider[TRANSPORT,smtp,com.sun.mail.smtp.SMTPTransport,Sun Microsystems, Inc]}
DEBUG: successfully loaded resource: /META-INF/javamail.default.address.map
DEBUG: getProvider() returning javax.mail.Provider[STORE,pop3,com.sun.mail.pop3.POP3Store,Sun Microsystems, Inc]
DEBUG POP3: mail.pop3.rsetbeforequit: false
DEBUG POP3: mail.pop3.disabletop: false
DEBUG POP3: mail.pop3.forgettopheaders: false
DEBUG POP3: mail.pop3.cachewriteto: false
DEBUG POP3: mail.pop3.filecache.enable: false
DEBUG POP3: mail.pop3.keepmessagecontent: false
DEBUG POP3: mail.pop3.starttls.enable: false
DEBUG POP3: mail.pop3.starttls.required: false
15:32:25.446 DEBUG [task-scheduler-1][org.springframework.integration.mail.Pop3MailReceiver] connecting to store [pop3://devmail9:*****@100.100.100.100/inbox]
DEBUG POP3: mail.pop3.apop.enable: false
DEBUG POP3: mail.pop3.disablecapa: false
DEBUG POP3: connecting to host "100.100.100.100", port 110, isSSL false
S: +OK POP3 xxxxx.xxxx.xxxx.xxx.xxx.uk v4.39 server ready
C: CAPA
S: -ERR Unknown command in AUTHORIZATION state
C: USER myuser
S: +OK User name accepted, password please
C: PASS myuser
S: +OK Mailbox open, 3 messages
15:32:25.564 DEBUG [task-scheduler-1][org.springframework.integration.mail.Pop3MailReceiver] opening folder [pop3://myuser:*****@100.100.100.100/inbox]
C: STAT
S: +OK 3 5956
15:32:25.569 INFO  [task-scheduler-1][org.springframework.integration.mail.Pop3MailReceiver] attempting to receive mail from folder [inbox]
C: NOOP
S: +OK No-op to you too!
15:32:25.591 DEBUG [task-scheduler-1][org.springframework.integration.mail.Pop3MailReceiver] found 1 new messages
C: TOP 1 0
S: +OK Top of message follows
Received: from xxxxx.xxxx.xxxx.xxx.xxx.uk ([100.100.100.100]) by     xxxxx.xxxx.xxxx.xxx.xxx.uk (AIX5.2/8.11.6p2/8.11.0) with ESMTP id q57EUrG1544330 for C:    LIST 1

....

S: +OK 1 1996
15:32:25.604 DEBUG [task-scheduler-1][org.springframework.integration.mail.Pop3MailReceiver] Recieved 1 messages
15:32:26.097 DEBUG [task-scheduler-1][org.springframework.integration.mail.Pop3MailReceiver] USER flags are not supported by this mail server. Flagging message with system flag
DEBUG POP3: streaming msg 1
C: RETR 1
S: +OK 1996 octets
Received: from xxxxx.xxxx.xxxx.xxx.xxx.uk ([100.100.100.100]) by GLA610.crown.copfs.gsi.gov.uk (AIX5.2/8.11.6p2/8.11.0) with ESMTP id q57EUrG1544330 for <myuser@xxxxx.xxxx.xxxx.xxx.xxx.uk>; Thu, 7 Jun 2012 15:30:53 +0100

....

DEBUG POP3: streaming msg 1
C: RETR 1
S: +OK 1996 octets
Received: from xxxxx.xxxx.xxxx.xxx.xxx.uk ([100.100.100.100]) by xxxxx.xxxx.xxxx.xxx.xxx.uk (AIX5.2/8.11.6p2/8.11.0) with ESMTP id q57EUrG1544330 for <myuser@xxxxx.xxxx.xxxx.xxx.xxx.uk>; Thu, 7 Jun 2012 15:30:53 +0100

....

.
C: NOOP
S: +OK No-op to you too!
C: DELE 1
S: +OK Message deleted
C: QUIT
S: +OK Sayonara
15:32:26.672 DEBUG [task-scheduler-1][org.springframework.integration.mail.MailReceivingMessageSource] received mail message [javax.mail.internet.MimeMessage@ef4504]
15:32:26.696 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction()  returning instance with id 10.9.21.7.tm0000100022
15:32:26.697 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction()  returning instance with id 10.9.21.7.tm0000100022
15:32:26.697 DEBUG [task-scheduler-1][org.springframework.transaction.jta.JtaTransactionManager] Participating in existing transaction
15:32:26.716 INFO  [task-scheduler-1][xx.xxx.xxx.xxx.channel.email.InboundEmailMessageEndpoint] In InboundEmailMessageEndpoint.  processing MailMessage (Subject): 7777777777777777777
15:32:26.735 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction()  returning instance with id 10.9.21.7.tm0000100022
15:32:26.735 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction()  returning instance with id 10.9.21.7.tm0000100022
15:32:26.735 DEBUG [task-scheduler-1][org.springframework.transaction.jta.JtaTransactionManager] Initiating transaction commit
15:32:26.735 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction()  returning instance with id 10.9.21.7.tm0000100022
15:32:26.735 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction()  returning instance with id 10.9.21.7.tm0000100022
15:32:26.735 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction()  returning instance with id 10.9.21.7.tm0000100022
15:32:26.735 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction()  returning instance with id 10.9.21.7.tm0000100022
15:32:26.736 INFO  [task-scheduler-1][com.atomikos.icatch.imp.CompositeTransactionImp] commit() done (by application) of transaction 10.9.21.7.tm0000100022
15:32:26.736 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.CoordinatorImp] Coordinator 10.9.21.7.tm0000100022 entering state: COMMITTING
15:32:26.740 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.CoordinatorImp] Coordinator 10.9.21.7.tm0000100022 entering state: TERMINATED
15:32:26.741 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.CoordinatorImp] Coordinator 10.9.21.7.tm0000100022 : stopping timer...
15:32:26.741 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.CoordinatorImp] Coordinator 10.9.21.7.tm0000100022 : disposing statehandler TERMINATED...
15:32:26.741 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.CoordinatorImp] Coordinator 10.9.21.7.tm0000100022 : disposed.
2 2

2 ответа:

Почтовые серверы не поддерживают транзакции. Вам нужно будет реализовать свой собственный транзакционный ресурс, чтобы обернуть почтовый сервер-журнал транзакций / намерений, восстановление после сбоя и т. д.

POP3 является транзакционным, но не в смысле транзакций базы данных и очереди сообщений. Следовательно, он не может должным образом участвовать в сделке, как мы требуем.

Тем не менее, мы зарегистрировали запрос функции с помощью Spring integration folks для добавления псевдотранзакционной поддержки по линиям шаблона "Best Efforts 1PC" с POP3 "commit" в качестве последнего шага. На данный момент это выглядит так, как если бы оно вошло в выпуск 2.2.0.