Что такое правильный способ управления транзакциями в Услуги RxJava?
Я недавно начал экспериментировать с RxJava и наткнулся на презентацию инженера Netflix, который предложил перенести наши бизнес-API на наблюдаемые API, например:
public interface VideoService {
Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic);
Observable<VideoBasicInfo> getVideoBasicInfo(Integer videoId);
Observable<VideoRating> getVideoRating(Integer videoId);
}
Однако я не нашел ни одного места, которое объясняло бы, как транзакционность должна управляться в этих сервисах. Сначала я просто аннотировал свою реализацию сервиса с помощью @Transactional
@Service
@Transactional
public class VideoServiceImpl implements VideoService{
@Autowired
private VideoBasicInfoRepository basicInfoRepo;
@Autowired
private VideoRatingRepository ratingRepo;
public Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic){
return Observable.create( s -> {
s.onNext(basicInfoRepo.save(videBasic));
});
}
Мы хотели бы, чтобы выполнение всего кода внутри Object.create
лямбды (s -> { // This code }
) происходило в торговая операция. однако , на самом деле происходит следующее:
- вызов
createVideoBasicInfo()
выполняется транзакционным способом, возвращая холодное наблюдаемое. -
save()
выполняется как атомарная транзакция.
Очевидно, что это имеет смысл, так как прокси Spring применяется к методам serviceImpl. Я думал о том, как сделать то, что я действительно ожидаю, например, начать программную транзакцию:
return Observable.create( s -> {
VideoBasicInfo savedBasic = transactionTemplate.execute( status -> {
VideoBasicInfo basicInfo = basicInfoRepo.save(videoBasicInfo);
return basicInfo;
});
s.onNext(savedBasic);
});
Является ли это рекомендуемым способом управления транзакциями, когда работа с реактивными API?
2 ответа:
Подписи метода Spring Data JpaRepository уже помечены @Transactional, поэтому, если вы используете только один, вам не нужно делать ничего особенного:
public interface PersonRepository extends JpaRepository<Person, Integer> { }
@RunWith(SpringJUnit4ClassRunner.class) @SpringApplicationConfiguration(classes = {RepositoryConfiguration.class}) public class PersonRepositoryTest { private PersonRepository personRepository; @Autowired public void setPersonRepository(PersonRepository PersonRepository) { this.personRepository = PersonRepository; } @Test public void testReactiveSavePerson() { Person person = new Person("Jane", "Doe"); assertNull(person.getId()); //null before save //save person Observable.create(s -> { s.onNext(personRepository.save(person)); }).subscribe(); //fetch from DB Person fetchedPerson = personRepository.findOne(person.getId()); //should not be null assertNotNull(fetchedPerson); //should equal assertEquals(person.getId(), fetchedPerson.getId()); assertEquals(person.getFirstName(), fetchedPerson.getFirstName()); } }
Если вам нужно объединить несколько репозиториев в одну транзакцию, вы можете использовать что-то вроде следующего класса:
@Component() public class ObservableTxFactory { public final <T> Observable<T> create(Observable.OnSubscribe<T> f) { return new ObservableTx<>(this, f); } @Transactional public void call(Observable.OnSubscribe onSubscribe, Subscriber subscriber) { onSubscribe.call(subscriber); } private static class ObservableTx<T> extends Observable<T> { public ObservableTx(ObservableTxFactory observableTxFactory, OnSubscribe<T> f) { super(new OnSubscribeDecorator<>(observableTxFactory, f)); } } private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> { private final ObservableTxFactory observableTxFactory; private final Observable.OnSubscribe<T> onSubscribe; OnSubscribeDecorator(final ObservableTxFactory observableTxFactory, final Observable.OnSubscribe<T> s) { this.onSubscribe = s; this.observableTxFactory = observableTxFactory; } @Override public void call(Subscriber<? super T> subscriber) { observableTxFactory.call(onSubscribe, subscriber); } } }
Фабричный боб также должен быть определен:
@Bean ObservableTxFactory observableTxFactory() { return new ObservableTxFactory(); }
Обслуживание:
@Service public class PersonService { @Autowired PersonRepository personRepository; @Autowired ObservableTxFactory observableTxFactory; public Observable<Person> createPerson(String firstName, String lastName) { return observableTxFactory.create(s -> { Person p = new Person(firstName, lastName); s.onNext(personRepository.save(p)); }); } }
Тест:
@RunWith(SpringJUnit4ClassRunner.class) @SpringApplicationConfiguration(classes = {RepositoryConfiguration.class}) public class PersonServiceTest { @Autowired PersonRepository personRepository; @Autowired ObservableTxFactory observableTxFactory; @Test public void testPersonService() { final PersonService service = new PersonService(); service.personRepository = personRepository; service.observableTxFactory = observableTxFactory; final Observable<Person> personObservable = service.createPerson("John", "Doe"); personObservable.subscribe(); //fetch from DB final Person fetchedPerson = StreamSupport.stream(personRepository.findAll().spliterator(), false) .filter(p -> p.getFirstName().equals("John") && p.getLastName().equals("Doe")) .findFirst() .get(); //should not be null assertNotNull(fetchedPerson); } }
Я хотел бы вернуться к замечательному ответу Джона Скаттергуда. Я обычно использую
Observable.fromCallable()
, поэтому я искал способ сделать это вместо реализацииObservable.OnSubscribe
, поэтому я адаптировал его технику, чтобы вы могли использовать ее, передавая вCallable
Класс Завода:
@Component public class ObservableTxFactory { public final <T> Observable.OnSubscribe<T> createFromCallable(Callable<? extends T> resultFactory) { return new OnSubscribeDecorator<>(this, resultFactory); } @SuppressWarnings("unchecked") @Transactional public <T> void call(Callable<? extends T> resultFactory, Subscriber subscriber) { final SingleDelayedProducer<T> singleDelayedProducer = new SingleDelayedProducer<>(subscriber); subscriber.setProducer(singleDelayedProducer); try { singleDelayedProducer.setValue(resultFactory.call()); } catch (Throwable t) { Exceptions.throwOrReport(t, subscriber); } } private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> { private final ObservableTxFactory observableTxFactory; private final Callable<? extends T> resultFactory; OnSubscribeDecorator(final ObservableTxFactory observableTxFactory, Callable<? extends T> resultFactory) { this.resultFactory = resultFactory; this.observableTxFactory = observableTxFactory; } @Override public void call(Subscriber<? super T> subscriber) { observableTxFactory.call(resultFactory, subscriber); } } }
Исходный Код:
Observable.fromCallable(() -> fooRepository.findOne(fooID));
Новый Код:
Observable.create(observableTxFactory.createFromCallable(() -> fooRepository.findOne(fooID)));
Убедитесь, что метод, который вы добавляете
@Transactional
, являетсяpublic
, иначе Spring AOP не сможет рекомендовать его