отметьте чтение данных как" обработка " флагом столбца таблицы, а затем восстановите в конце


Ниже приведена соответствующая часть кода для считывателя, процессора, писателя и шага для пакетного задания, которое я создаю.

У меня есть требование обновить столбец флага в таблице, из которой данные считываются ( исходная таблица), чтобы отметить, что эти данные обрабатываются этим заданием, чтобы другие приложения не забирали эти данные. Затем, как только обработка прочитанных записей завершена, мне нужно восстановить этот столбец в исходное значение, чтобы другие приложения тоже могли работать с этими записями.

Наверное, слушатель-это подход, который нужно принять ( ItemReadListener ? ). Reader listener кажется подходящим только для первого шага (т. е. для обновления столбца флага), но не для восстановления в конце блока. Задача, по-видимому, состоит в том, чтобы сделать данные для чтения доступными в конце процессора.

Может ли кто-нибудь предложить возможные подходы?

@Bean
    public Step step1(StepBuilderFactory stepBuilderFactory,
            ItemReader<RemittanceVO> reader, ItemWriter<RemittanceClaimVO> writer,
            ItemProcessor<RemittanceVO, RemittanceClaimVO> processor) {

        return stepBuilderFactory.get("step1")
                .<RemittanceVO, RemittanceClaimVO> chunk(Constants.SPRING_BATCH_CHUNK_SIZE)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .taskExecutor(simpleAsyntaskExecutor)
                .throttleLimit(Constants.THROTTLE_LIMIT)
                .build();
    }

@Bean
    public ItemReader<RemittanceVO> reader() {
        JdbcPagingItemReader<RemittanceVO> reader = new JdbcPagingItemReader<RemittanceVO>();
        reader.setDataSource(dataSource);
        reader.setRowMapper(new RemittanceRowMapper());
        reader.setQueryProvider(queryProvider);
        reader.setPageSize(Constants.SPRING_BATCH_READER_PAGE_SIZE);
        return reader;
    }



@Bean
 public ItemProcessor<RemittanceVO, RemittanceClaimVO> processor() {
            return new MatchClaimProcessor();
        }

@Bean
        public ItemWriter<RemittanceClaimVO> writer(DataSource dataSource) {
            return new MatchedClaimWriter();
        }

Я начал с весенней партии несколько дней назад, поэтому не знаком со всеми представленными моделями и шаблонами.

1 2

1 ответ:

Во-первых, небольшая подсказка об использовании asyncTaskExecutor: вы должны синхронизировать считыватель, иначе вы столкнетесь с проблемами параллелизма. Для этого можно использовать SynchronizedItemStreamReader:

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
        ItemReader<RemittanceVO> reader, ItemWriter<RemittanceClaimVO> writer,
        ItemProcessor<RemittanceVO, RemittanceClaimVO> processor) {

    return stepBuilderFactory.get("step1")
            .<RemittanceVO, RemittanceClaimVO> chunk(Constants.SPRING_BATCH_CHUNK_SIZE)
            .reader(syncReader)
            .processor(processor)
            .writer(writer)
            .taskExecutor(simpleAsyntaskExecutor)
            .throttleLimit(Constants.THROTTLE_LIMIT)
            .build();
}


@Bean
public ItemReader<RemittanceVO> syncReader() {
    SynchronizedItemStreamReader<RemittanceVO> syncReader = new SynchronizedItemStreamReader<>();

    syncReader.setDelegate(reader());

    return syncReader;
}


@Bean
public ItemReader<RemittanceVO> reader() {
    JdbcPagingItemReader<RemittanceVO> reader = new JdbcPagingItemReader<RemittanceVO>();
    reader.setDataSource(dataSource);
    reader.setRowMapper(new RemittanceRowMapper());
    reader.setQueryProvider(queryProvider);
    reader.setPageSize(Constants.SPRING_BATCH_READER_PAGE_SIZE);
    return reader;
}

Во-вторых, возможный подход к вашему реальному вопросу:

Я бы использовал простой тасклет, чтобы" пометить " записи, которые вы хотите обработать. Вы можете сделать это с помощью одной простой инструкции обновления, так как вы знаете свои критерии выбора. Таким образом, вам нужно только один звонок и поэтому только одна сделка.

После этого я бы реализовал обычный шаг с reader, processor и writer. Читатель должен читать только отмеченные записи, что делает ваше предложение select также очень простым.

Чтобы восстановить флаг, вы можете сделать это в третьем шаге, который реализован как tasklet и использует соответствующий оператор UPDATE (как и первый шаг). Чтобы гарантировать, что флаг будет восстановлен в случае исключения, просто настройте свой jobflow соответствующим образом, так что что Шаг 3 выполняется, даже если Шаг 2 терпит неудачу (- >смотрите мой ответ на этот вопрос Spring Batch Java Config: пропустить шаг при исключении и перейти к следующим шагам )

Конечно, вы также можете восстановить флаг при написании чанка, если используете compositeItemWriter. Однако вам нужна стратегия, как восстановить флаг в случае исключения на Шаге 2.

ИМО, использование слушателя не является хорошей идеей, так как обработка транзакций происходит по-другому.