Spring Integration Inbound-Channel-Adapter для чтения больших файлов построчно


В настоящее время я использую Spring Integration 4.1.0 с Spring 4.1.2. У меня есть требование, чтобы иметь возможность читать файл построчно и использовать каждую строку, прочитанную как сообщение. В принципе, я хочу разрешить "воспроизведение" для одного из наших источников сообщений, но сообщения сохраняются не в отдельных файлах, а в одном файле. У меня нет требований к транзакциям для этого варианта использования. Мои требования аналогичны этой публикации, за исключением файла, находящегося на том же сервере, на котором работает JVM: весной интеграции - прочитать удаленный файл построчно

На мой взгляд, у меня есть следующие варианты:

1. Используйте int-file:inbound-channel-adapter для чтения файла, а затем "разбейте" этот файл так, чтобы 1 сообщение Теперь стало несколькими сообщениями. Пример файла конфигурации:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

        <int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
        <int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
        <int:channel id="channel1"/>
        <int:splitter input-channel="channel2" output-channel="nullChannel"/>
        <int:channel id="channel2"/>
    </beans>
Проблема заключается в том, что файл очень большой, и при использовании вышеупомянутого метода весь файл сначала считывается в память, а затем разбивается, и JVM исчерпывает пространство кучи. На самом деле необходимые шаги: прочитайте строку и преобразовать строку в сообщение, отправить сообщение, удалить сообщение из памяти, повторить.
  1. Используйте int-file:tail-inbound-channel-adapter с end="false" (что в основном указывает на чтение с начала файла). Запуск и остановка этого адаптера по мере необходимости для каждого файла (изменение имени файла перед каждым запуском). Пример файла конфигурации:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    
        <int-file:tail-inbound-channel-adapter id="apache"
            channel="exchangeSpringQueueChannel"
            task-executor="exchangeFileReplayTaskExecutor"
            file="C:p2-test.txt"
            delay="1"
            end="false"
            reopen="true"
            file-delay="10000" />
    
        <int:channel id="exchangeSpringQueueChannel" />
        <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
    </beans>
    
  2. Вызовите Spring Integration в Spring Batch и используйте ItemReader для обработки файла. Конечно, позволяет более мелкозернистый контроль над всем процессом, но справедливый объем работы по настройке, что с репозиторием заданий и так далее (и я не забочусь об истории заданий, поэтому я бы либо сказал заданию не регистрировать статус и/или использовать in-memory MapJobRepository).

4. Создайте свой собственный FileLineByLineInboundChannelAdapter путем расширения MessageProducerSupport. Большая часть кода может быть заимствована из ApacheCommonsFileTailingMessageProducer (Также см. http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter). ниже приведен пример, но требуется некоторая работа, чтобы поместить чтение в его собственный Thread так что я чту команду stop(), пока читаю строку за строкой.

    package com.xxx.exchgateway.common.util.springintegration;

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import org.apache.commons.io.IOUtils;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.endpoint.MessageProducerSupport;
    import org.springframework.integration.file.FileHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.util.Assert;

    /**
     * A lot of the logic for this class came from {@link #ApacheCommonsFileTailingMessageProducer}.
     * See {@link http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter}
     */
    public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
        private volatile File file;

        /**
         * The name of the file you wish to tail.
         * @param file The absolute path of the file.
         */
        public void setFile(File file) {
            Assert.notNull("'file' cannot be null");
            this.file = file;
        }

        protected File getFile() {
            if (this.file == null) {
                throw new IllegalStateException("No 'file' has been provided");
            }
            return this.file;
        }

        @Override
        public String getComponentType() {
            return "file:line-by-line-inbound-channel-adapter";
        }

        private void readFile() {
            FileInputStream fstream;
            try {
                fstream = new FileInputStream(getFile());

                BufferedReader br = new BufferedReader(new InputStreamReader(fstream));

                String strLine;

                // Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
                while ((strLine = br.readLine()) != null && isRunning()) {
                    send(strLine);
                }

                //Close the input stream
                IOUtils.closeQuietly(br);
                IOUtils.closeQuietly(fstream);
            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        @Override
        protected void doStart() {
            super.doStart();

            // TODO this needs to be moved into it's own thread since isRunning() will return "false" until this method has completed
            // and we want to honor the stop() command while we read line-by-line
            readFile();
        }

        protected void send(String line) {
            Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
            super.sendMessage(message);
        }

        @Override
        public Message<String> receive() {
            // TODO Auto-generated method stub
            return null;
        }
    }
Мне не кажется, что мой прецедент находится вне сферы типичных вещей, которые люди могли бы сделать, поэтому я удивлен, что не могу найти решение для него из коробки. Я искал довольно много, однако, и посмотрел на многие примеры и, к сожалению, до сих пор не нашел что-то, что соответствует моим потребностям.

Я предполагаю, что, возможно, я упустил что-то очевидное, что уже предлагает фреймворк (хотя, возможно, это попадает в размытую линию между Spring Integraton и Spring Batch). Может ли кто-нибудь дать мне знать, если я полностью не согласен со своими идеями или если есть простое решение, которое я пропустил, или предложить альтернативные предложения?

2 3

2 ответа:

Весеннее Интегрирование 4.X есть хорошо новая функция, с помощью итератора сообщения:

Ссылка На Весеннее Интегрирование

Начиная с версии 4.1, AbstractMessageSplitter поддерживает тип итератора для разбиения значения.

Это позволяет отправлять итератор в виде сообщений, не считывая весь файл в память.

Вот простой пример весеннего контекста, разбивающего CSV-файлы на одно сообщение. строка:

<int-file:inbound-channel-adapter 
        directory="${inputFileDirectory:/tmp}"
        channel="inputFiles"/>

<int:channel id="inputFiles">
    <int:dispatcher task-executor="executor"/>
</int:channel>

<int:splitter 
    input-channel="inputFiles" 
    output-channel="output">
    <bean 
        class="FileSplitter" 
        p:commentPrefix="${commentPrefix:#}" />
</int:splitter>

<task:executor 
    id="executor" 
    pool-size="${poolSize:8}" 
    queue-capacity="${aueueCapacity:0}" 
    rejection-policy="CALLER_RUNS" />

<int:channel id="output"/>

И это реализация splitter :

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class FileSplitter extends AbstractMessageSplitter {
    private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);

    private String commentPrefix = "#";

    public Object splitMessage(Message<?> message) {
        if(log.isDebugEnabled()) {
            log.debug(message.toString());
        }
        try {

            Object payload = message.getPayload();
            Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload"); 

            return new BufferedReaderFileIterator((File) payload);
        } 
        catch (IOException e) {
            String msg = "Unable to transform file: " + e.getMessage();
            log.error(msg);
            throw new MessageTransformationException(msg, e);
        }
    }

    public void setCommentPrefix(String commentPrefix) {
        this.commentPrefix = commentPrefix;
    }

    public class BufferedReaderFileIterator implements Iterator<String> {

        private File file;
        private BufferedReader bufferedReader;
        private String line;

        public BufferedReaderFileIterator(File file) throws IOException {
            this.file = file;
            this.bufferedReader = new BufferedReader(new FileReader(file));
            readNextLine();
        }

        @Override
        public boolean hasNext() {
            return line != null;
        }

        @Override
        public String next() {
            try {
                String res = this.line;
                readNextLine();
                return res;
            } 
            catch (IOException e) {
                log.error("Error reading file", e);
                throw new RuntimeException(e);
            }   
        }

        void readNextLine() throws IOException {
            do {
                line = bufferedReader.readLine();
            }
            while(line != null && line.trim().startsWith(commentPrefix));

            if(log.isTraceEnabled()) {
                log.trace("Read next line: {}", line);
            }

            if(line == null) {
                close();
            }
        }

        void close() throws IOException {
            bufferedReader.close();
            file.delete();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

    }

}
Обратите внимание, что объект итератора возвращается из метода обработчика splitMessage ().

У меня также есть это , я также копирую файлы в другую папку и читаю данные из файла также

FileCopyApplicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:file="http://www.springframework.org/schema/integration/file"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file
            http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/context 
            http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder />

    <file:inbound-channel-adapter id="filesIn"
        directory="E:/usmandata/logs/input/" filter="onlyPropertyFiles"
        auto-startup="true">
        <int:poller id="poller" fixed-delay="500" />
    </file:inbound-channel-adapter>



    <int:service-activator input-channel="filesIn"
        output-channel="filesOut" ref="handler" />

    <file:outbound-channel-adapter id="filesOut"
        directory="E:/usmandata/logs/output/" />




    <bean id="handler" class="com.javarticles.spring.integration.file.FileHandler" />
    <bean id="onlyPropertyFiles"
        class="org.springframework.integration.file.config.FileListFilterFactoryBean"
        p:filenamePattern="*.log" />
</beans>

Файлообменник.java

package com.javarticles.spring.integration.file;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class FileHandler {
    public File handleFile(File input) throws IOException {
       // System.out.println("Copying file: " + input.getAbsolutePath());


        RandomAccessFile file = new RandomAccessFile(input,"r");

        FileChannel channel = file.getChannel();

        //System.out.println("File size is: " + channel.size());

        ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());

        channel.read(buffer);

        buffer.flip();//Restore buffer to position 0 to read it

        System.out.println("Reading content and printing ... ");

        for (int i = 0; i < channel.size(); i++) {
            System.out.print((char) buffer.get());
        }

        channel.close();
        file.close();
        return input;
    }
}

SpringIntegrationFileCopyExample.java

package com.javarticles.spring.integration.file;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringIntegrationFileCopyExample {

    public static void main(String[] args) throws InterruptedException, IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "fileCopyApplicationContext.xml");

    }

}