отметьте чтение данных как" обработка " флагом столбца таблицы, а затем восстановите в конце
Ниже приведена соответствующая часть кода для считывателя, процессора, писателя и шага для пакетного задания, которое я создаю.
У меня есть требование обновить столбец флага в таблице, из которой данные считываются ( исходная таблица), чтобы отметить, что эти данные обрабатываются этим заданием, чтобы другие приложения не забирали эти данные. Затем, как только обработка прочитанных записей завершена, мне нужно восстановить этот столбец в исходное значение, чтобы другие приложения тоже могли работать с этими записями.
Наверное, слушатель-это подход, который нужно принять ( ItemReadListener ? ). Reader listener кажется подходящим только для первого шага (т. е. для обновления столбца флага), но не для восстановления в конце блока. Задача, по-видимому, состоит в том, чтобы сделать данные для чтения доступными в конце процессора.
Может ли кто-нибудь предложить возможные подходы?
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
ItemReader<RemittanceVO> reader, ItemWriter<RemittanceClaimVO> writer,
ItemProcessor<RemittanceVO, RemittanceClaimVO> processor) {
return stepBuilderFactory.get("step1")
.<RemittanceVO, RemittanceClaimVO> chunk(Constants.SPRING_BATCH_CHUNK_SIZE)
.reader(reader)
.processor(processor)
.writer(writer)
.taskExecutor(simpleAsyntaskExecutor)
.throttleLimit(Constants.THROTTLE_LIMIT)
.build();
}
@Bean
public ItemReader<RemittanceVO> reader() {
JdbcPagingItemReader<RemittanceVO> reader = new JdbcPagingItemReader<RemittanceVO>();
reader.setDataSource(dataSource);
reader.setRowMapper(new RemittanceRowMapper());
reader.setQueryProvider(queryProvider);
reader.setPageSize(Constants.SPRING_BATCH_READER_PAGE_SIZE);
return reader;
}
@Bean
public ItemProcessor<RemittanceVO, RemittanceClaimVO> processor() {
return new MatchClaimProcessor();
}
@Bean
public ItemWriter<RemittanceClaimVO> writer(DataSource dataSource) {
return new MatchedClaimWriter();
}
Я начал с весенней партии несколько дней назад, поэтому не знаком со всеми представленными моделями и шаблонами.
1 ответ:
Во-первых, небольшая подсказка об использовании asyncTaskExecutor: вы должны синхронизировать считыватель, иначе вы столкнетесь с проблемами параллелизма. Для этого можно использовать SynchronizedItemStreamReader:
@Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<RemittanceVO> reader, ItemWriter<RemittanceClaimVO> writer, ItemProcessor<RemittanceVO, RemittanceClaimVO> processor) { return stepBuilderFactory.get("step1") .<RemittanceVO, RemittanceClaimVO> chunk(Constants.SPRING_BATCH_CHUNK_SIZE) .reader(syncReader) .processor(processor) .writer(writer) .taskExecutor(simpleAsyntaskExecutor) .throttleLimit(Constants.THROTTLE_LIMIT) .build(); } @Bean public ItemReader<RemittanceVO> syncReader() { SynchronizedItemStreamReader<RemittanceVO> syncReader = new SynchronizedItemStreamReader<>(); syncReader.setDelegate(reader()); return syncReader; } @Bean public ItemReader<RemittanceVO> reader() { JdbcPagingItemReader<RemittanceVO> reader = new JdbcPagingItemReader<RemittanceVO>(); reader.setDataSource(dataSource); reader.setRowMapper(new RemittanceRowMapper()); reader.setQueryProvider(queryProvider); reader.setPageSize(Constants.SPRING_BATCH_READER_PAGE_SIZE); return reader; }
Во-вторых, возможный подход к вашему реальному вопросу:
Я бы использовал простой тасклет, чтобы" пометить " записи, которые вы хотите обработать. Вы можете сделать это с помощью одной простой инструкции обновления, так как вы знаете свои критерии выбора. Таким образом, вам нужно только один звонок и поэтому только одна сделка.
После этого я бы реализовал обычный шаг с reader, processor и writer. Читатель должен читать только отмеченные записи, что делает ваше предложение select также очень простым.
Чтобы восстановить флаг, вы можете сделать это в третьем шаге, который реализован как tasklet и использует соответствующий оператор UPDATE (как и первый шаг). Чтобы гарантировать, что флаг будет восстановлен в случае исключения, просто настройте свой jobflow соответствующим образом, так что что Шаг 3 выполняется, даже если Шаг 2 терпит неудачу (- >смотрите мой ответ на этот вопрос Spring Batch Java Config: пропустить шаг при исключении и перейти к следующим шагам )
Конечно, вы также можете восстановить флаг при написании чанка, если используете compositeItemWriter. Однако вам нужна стратегия, как восстановить флаг в случае исключения на Шаге 2.
ИМО, использование слушателя не является хорошей идеей, так как обработка транзакций происходит по-другому.