Spring Integration Inbound-Channel-Adapter для чтения больших файлов построчно
В настоящее время я использую Spring Integration 4.1.0 с Spring 4.1.2. У меня есть требование, чтобы иметь возможность читать файл построчно и использовать каждую строку, прочитанную как сообщение. В принципе, я хочу разрешить "воспроизведение" для одного из наших источников сообщений, но сообщения сохраняются не в отдельных файлах, а в одном файле. У меня нет требований к транзакциям для этого варианта использования. Мои требования аналогичны этой публикации, за исключением файла, находящегося на том же сервере, на котором работает JVM: весной интеграции - прочитать удаленный файл построчно
На мой взгляд, у меня есть следующие варианты:
1. Используйте int-file:inbound-channel-adapter
для чтения файла, а затем "разбейте" этот файл так, чтобы 1 сообщение Теперь стало несколькими сообщениями.
Пример файла конфигурации:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
<int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
<int:channel id="channel1"/>
<int:splitter input-channel="channel2" output-channel="nullChannel"/>
<int:channel id="channel2"/>
</beans>
Проблема заключается в том, что файл очень большой, и при использовании вышеупомянутого метода весь файл сначала считывается в память, а затем разбивается, и JVM исчерпывает пространство кучи. На самом деле необходимые шаги: прочитайте строку и преобразовать строку в сообщение, отправить сообщение, удалить сообщение из памяти, повторить.
-
Используйте
int-file:tail-inbound-channel-adapter
сend="false"
(что в основном указывает на чтение с начала файла). Запуск и остановка этого адаптера по мере необходимости для каждого файла (изменение имени файла перед каждым запуском). Пример файла конфигурации:<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <int-file:tail-inbound-channel-adapter id="apache" channel="exchangeSpringQueueChannel" task-executor="exchangeFileReplayTaskExecutor" file="C:p2-test.txt" delay="1" end="false" reopen="true" file-delay="10000" /> <int:channel id="exchangeSpringQueueChannel" /> <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" /> </beans>
-
Вызовите Spring Integration в Spring Batch и используйте
ItemReader
для обработки файла. Конечно, позволяет более мелкозернистый контроль над всем процессом, но справедливый объем работы по настройке, что с репозиторием заданий и так далее (и я не забочусь об истории заданий, поэтому я бы либо сказал заданию не регистрировать статус и/или использовать in-memoryMapJobRepository
).
4. Создайте свой собственный FileLineByLineInboundChannelAdapter
путем расширения MessageProducerSupport
.
Большая часть кода может быть заимствована из ApacheCommonsFileTailingMessageProducer
(Также см. http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter). ниже приведен пример, но требуется некоторая работа, чтобы поместить чтение в его собственный Thread
так что я чту команду stop()
, пока читаю строку за строкой.
package com.xxx.exchgateway.common.util.springintegration;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* A lot of the logic for this class came from {@link #ApacheCommonsFileTailingMessageProducer}.
* See {@link http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter}
*/
public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
private volatile File file;
/**
* The name of the file you wish to tail.
* @param file The absolute path of the file.
*/
public void setFile(File file) {
Assert.notNull("'file' cannot be null");
this.file = file;
}
protected File getFile() {
if (this.file == null) {
throw new IllegalStateException("No 'file' has been provided");
}
return this.file;
}
@Override
public String getComponentType() {
return "file:line-by-line-inbound-channel-adapter";
}
private void readFile() {
FileInputStream fstream;
try {
fstream = new FileInputStream(getFile());
BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
String strLine;
// Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
while ((strLine = br.readLine()) != null && isRunning()) {
send(strLine);
}
//Close the input stream
IOUtils.closeQuietly(br);
IOUtils.closeQuietly(fstream);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
protected void doStart() {
super.doStart();
// TODO this needs to be moved into it's own thread since isRunning() will return "false" until this method has completed
// and we want to honor the stop() command while we read line-by-line
readFile();
}
protected void send(String line) {
Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
super.sendMessage(message);
}
@Override
public Message<String> receive() {
// TODO Auto-generated method stub
return null;
}
}
Мне не кажется, что мой прецедент находится вне сферы типичных вещей, которые люди могли бы сделать, поэтому я удивлен, что не могу найти решение для него из коробки. Я искал довольно много, однако, и посмотрел на многие примеры и, к сожалению, до сих пор не нашел что-то, что соответствует моим потребностям.
Я предполагаю, что, возможно, я упустил что-то очевидное, что уже предлагает фреймворк (хотя, возможно, это попадает в размытую линию между Spring Integraton и Spring Batch). Может ли кто-нибудь дать мне знать, если я полностью не согласен со своими идеями или если есть простое решение, которое я пропустил, или предложить альтернативные предложения?
2 ответа:
Весеннее Интегрирование 4.X есть хорошо новая функция, с помощью итератора сообщения:
Ссылка На Весеннее Интегрирование
Начиная с версии 4.1, AbstractMessageSplitter поддерживает тип итератора для разбиения значения.
Это позволяет отправлять итератор в виде сообщений, не считывая весь файл в память.
Вот простой пример весеннего контекста, разбивающего CSV-файлы на одно сообщение. строка:
<int-file:inbound-channel-adapter directory="${inputFileDirectory:/tmp}" channel="inputFiles"/> <int:channel id="inputFiles"> <int:dispatcher task-executor="executor"/> </int:channel> <int:splitter input-channel="inputFiles" output-channel="output"> <bean class="FileSplitter" p:commentPrefix="${commentPrefix:#}" /> </int:splitter> <task:executor id="executor" pool-size="${poolSize:8}" queue-capacity="${aueueCapacity:0}" rejection-policy="CALLER_RUNS" /> <int:channel id="output"/>
И это реализация splitter :
Обратите внимание, что объект итератора возвращается из метода обработчика splitMessage ().import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.integration.splitter.AbstractMessageSplitter; import org.springframework.integration.transformer.MessageTransformationException; import org.springframework.messaging.Message; import org.springframework.util.Assert; public class FileSplitter extends AbstractMessageSplitter { private static final Logger log = LoggerFactory.getLogger(FileSplitter.class); private String commentPrefix = "#"; public Object splitMessage(Message<?> message) { if(log.isDebugEnabled()) { log.debug(message.toString()); } try { Object payload = message.getPayload(); Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload"); return new BufferedReaderFileIterator((File) payload); } catch (IOException e) { String msg = "Unable to transform file: " + e.getMessage(); log.error(msg); throw new MessageTransformationException(msg, e); } } public void setCommentPrefix(String commentPrefix) { this.commentPrefix = commentPrefix; } public class BufferedReaderFileIterator implements Iterator<String> { private File file; private BufferedReader bufferedReader; private String line; public BufferedReaderFileIterator(File file) throws IOException { this.file = file; this.bufferedReader = new BufferedReader(new FileReader(file)); readNextLine(); } @Override public boolean hasNext() { return line != null; } @Override public String next() { try { String res = this.line; readNextLine(); return res; } catch (IOException e) { log.error("Error reading file", e); throw new RuntimeException(e); } } void readNextLine() throws IOException { do { line = bufferedReader.readLine(); } while(line != null && line.trim().startsWith(commentPrefix)); if(log.isTraceEnabled()) { log.trace("Read next line: {}", line); } if(line == null) { close(); } } void close() throws IOException { bufferedReader.close(); file.delete(); } @Override public void remove() { throw new UnsupportedOperationException(); } } }
У меня также есть это , я также копирую файлы в другую папку и читаю данные из файла также
FileCopyApplicationContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:file="http://www.springframework.org/schema/integration/file" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:property-placeholder /> <file:inbound-channel-adapter id="filesIn" directory="E:/usmandata/logs/input/" filter="onlyPropertyFiles" auto-startup="true"> <int:poller id="poller" fixed-delay="500" /> </file:inbound-channel-adapter> <int:service-activator input-channel="filesIn" output-channel="filesOut" ref="handler" /> <file:outbound-channel-adapter id="filesOut" directory="E:/usmandata/logs/output/" /> <bean id="handler" class="com.javarticles.spring.integration.file.FileHandler" /> <bean id="onlyPropertyFiles" class="org.springframework.integration.file.config.FileListFilterFactoryBean" p:filenamePattern="*.log" /> </beans>
Файлообменник.java
package com.javarticles.spring.integration.file; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class FileHandler { public File handleFile(File input) throws IOException { // System.out.println("Copying file: " + input.getAbsolutePath()); RandomAccessFile file = new RandomAccessFile(input,"r"); FileChannel channel = file.getChannel(); //System.out.println("File size is: " + channel.size()); ByteBuffer buffer = ByteBuffer.allocate((int) channel.size()); channel.read(buffer); buffer.flip();//Restore buffer to position 0 to read it System.out.println("Reading content and printing ... "); for (int i = 0; i < channel.size(); i++) { System.out.print((char) buffer.get()); } channel.close(); file.close(); return input; } }
SpringIntegrationFileCopyExample.java
package com.javarticles.spring.integration.file; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.Properties; import org.springframework.context.support.ClassPathXmlApplicationContext; public class SpringIntegrationFileCopyExample { public static void main(String[] args) throws InterruptedException, IOException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "fileCopyApplicationContext.xml"); } }