Что такое правильный способ управления транзакциями в Услуги RxJava?


Я недавно начал экспериментировать с RxJava и наткнулся на презентацию инженера Netflix, который предложил перенести наши бизнес-API на наблюдаемые API, например:

public interface VideoService {
    Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic);
    Observable<VideoBasicInfo> getVideoBasicInfo(Integer videoId);
    Observable<VideoRating> getVideoRating(Integer videoId);
}
Однако я не нашел ни одного места, которое объясняло бы, как транзакционность должна управляться в этих сервисах. Сначала я просто аннотировал свою реализацию сервиса с помощью @Transactional
@Service
@Transactional
public class VideoServiceImpl implements VideoService{

    @Autowired
    private VideoBasicInfoRepository basicInfoRepo;
    @Autowired
    private VideoRatingRepository ratingRepo;

    public Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic){
        return Observable.create( s -> {
            s.onNext(basicInfoRepo.save(videBasic));
        });
    }

Мы хотели бы, чтобы выполнение всего кода внутри Object.create лямбды (s -> { // This code }) происходило в торговая операция. однако , на самом деле происходит следующее:

  1. вызов createVideoBasicInfo() выполняется транзакционным способом, возвращая холодное наблюдаемое.
  2. save() выполняется как атомарная транзакция.

Очевидно, что это имеет смысл, так как прокси Spring применяется к методам serviceImpl. Я думал о том, как сделать то, что я действительно ожидаю, например, начать программную транзакцию:

return Observable.create( s -> {
    VideoBasicInfo savedBasic = transactionTemplate.execute( status -> {
        VideoBasicInfo basicInfo = basicInfoRepo.save(videoBasicInfo);
        return basicInfo;
    });
    s.onNext(savedBasic);
});

Является ли это рекомендуемым способом управления транзакциями, когда работа с реактивными API?

2 8

2 ответа:

Подписи метода Spring Data JpaRepository уже помечены @Transactional, поэтому, если вы используете только один, вам не нужно делать ничего особенного:

public interface PersonRepository extends JpaRepository<Person, Integer> {
}

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
public class PersonRepositoryTest {
    private PersonRepository personRepository;

    @Autowired
    public void setPersonRepository(PersonRepository PersonRepository) {
        this.personRepository = PersonRepository;
    }

    @Test
    public void testReactiveSavePerson() {
        Person person = new Person("Jane", "Doe");
        assertNull(person.getId()); //null before save

        //save person
        Observable.create(s -> {
            s.onNext(personRepository.save(person));
        }).subscribe();

        //fetch from DB
        Person fetchedPerson = personRepository.findOne(person.getId());

        //should not be null
        assertNotNull(fetchedPerson);

        //should equal
        assertEquals(person.getId(), fetchedPerson.getId());
        assertEquals(person.getFirstName(), fetchedPerson.getFirstName());
    }
}

Если вам нужно объединить несколько репозиториев в одну транзакцию, вы можете использовать что-то вроде следующего класса:

@Component()
public class ObservableTxFactory {
    public final <T> Observable<T> create(Observable.OnSubscribe<T> f) {
        return new ObservableTx<>(this, f);
    }

    @Transactional
    public void call(Observable.OnSubscribe onSubscribe, Subscriber subscriber) {
        onSubscribe.call(subscriber);
    }

    private static class ObservableTx<T> extends Observable<T> {

        public ObservableTx(ObservableTxFactory observableTxFactory, OnSubscribe<T> f) {
            super(new OnSubscribeDecorator<>(observableTxFactory, f));
        }
    }

    private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> {

        private final ObservableTxFactory observableTxFactory;
        private final Observable.OnSubscribe<T> onSubscribe;

        OnSubscribeDecorator(final ObservableTxFactory observableTxFactory, final Observable.OnSubscribe<T> s) {
            this.onSubscribe = s;
            this.observableTxFactory = observableTxFactory;
        }

        @Override
        public void call(Subscriber<? super T> subscriber) {
            observableTxFactory.call(onSubscribe, subscriber);
        }
    }
}

Фабричный боб также должен быть определен:

@Bean
ObservableTxFactory observableTxFactory() {
    return new ObservableTxFactory();
}

Обслуживание:

@Service
public class PersonService {
    @Autowired
    PersonRepository personRepository;
    @Autowired
    ObservableTxFactory observableTxFactory;

    public Observable<Person> createPerson(String firstName, String lastName) {
        return observableTxFactory.create(s -> {
            Person p = new Person(firstName, lastName);
            s.onNext(personRepository.save(p));
        });
    }
}

Тест:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
public class PersonServiceTest {
    @Autowired
    PersonRepository personRepository;
    @Autowired
    ObservableTxFactory observableTxFactory;

    @Test
    public void testPersonService() {
        final PersonService service = new PersonService();
        service.personRepository = personRepository;
        service.observableTxFactory = observableTxFactory;

        final Observable<Person> personObservable = service.createPerson("John", "Doe");
        personObservable.subscribe();

        //fetch from DB
        final Person fetchedPerson = StreamSupport.stream(personRepository.findAll().spliterator(), false)
                .filter(p -> p.getFirstName().equals("John") && p.getLastName().equals("Doe"))
                .findFirst()
                .get();

        //should not be null
        assertNotNull(fetchedPerson);
    }

}

Скриншот, показывающий прокси-сервер: Введите описание изображения здесь

Я хотел бы вернуться к замечательному ответу Джона Скаттергуда. Я обычно использую Observable.fromCallable(), поэтому я искал способ сделать это вместо реализации Observable.OnSubscribe, поэтому я адаптировал его технику, чтобы вы могли использовать ее, передавая в Callable

Класс Завода:

@Component
public class ObservableTxFactory {
    public final <T> Observable.OnSubscribe<T> createFromCallable(Callable<? extends T> resultFactory) {
        return new OnSubscribeDecorator<>(this, resultFactory);
    }

    @SuppressWarnings("unchecked")
    @Transactional
    public <T> void call(Callable<? extends T> resultFactory, Subscriber subscriber) {
        final SingleDelayedProducer<T> singleDelayedProducer = new SingleDelayedProducer<>(subscriber);

        subscriber.setProducer(singleDelayedProducer);

        try {
            singleDelayedProducer.setValue(resultFactory.call());
        } catch (Throwable t) {
            Exceptions.throwOrReport(t, subscriber);
        }
    }

    private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> {

        private final ObservableTxFactory observableTxFactory;
        private final Callable<? extends T> resultFactory;

        OnSubscribeDecorator(final ObservableTxFactory observableTxFactory, Callable<? extends T> resultFactory) {
            this.resultFactory = resultFactory;
            this.observableTxFactory = observableTxFactory;
        }

        @Override
        public void call(Subscriber<? super T> subscriber) {
            observableTxFactory.call(resultFactory, subscriber);
        }
    }
}

Исходный Код:

Observable.fromCallable(() -> fooRepository.findOne(fooID));

Новый Код:

Observable.create(observableTxFactory.createFromCallable(() -> fooRepository.findOne(fooID)));

Убедитесь, что метод, который вы добавляете @Transactional , является public, иначе Spring AOP не сможет рекомендовать его