Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,55 +15,57 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;

/** Transaction manager which uses spring-tx and Hibernate. */
@Slf4j
@Service
public class SpringTransactionManager implements ThreadLocalContextTransactionManager {

private final SpringTransaction transactionInstance = new SpringTransaction();
private final PlatformTransactionManager platformTransactionManager;
private final DataSource dataSource;

@Autowired
protected SpringTransactionManager(DataSource dataSource) {
public SpringTransactionManager(
PlatformTransactionManager platformTransactionManager, DataSource dataSource) {
this.platformTransactionManager = platformTransactionManager;
this.dataSource = dataSource;
}

@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void inTransaction(Runnable runnable) {
uncheck(() -> inTransactionReturnsThrows(ThrowingTransactionalSupplier.fromRunnable(runnable)));
}

@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void inTransaction(TransactionalWork work) {
uncheck(() -> inTransactionReturnsThrows(ThrowingTransactionalSupplier.fromWork(work)));
}

@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public <T> T inTransactionReturns(TransactionalSupplier<T> supplier) {
return uncheckedly(
() -> inTransactionReturnsThrows(ThrowingTransactionalSupplier.fromSupplier(supplier)));
}

@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public <E extends Exception> void inTransactionThrows(ThrowingTransactionalWork<E> work)
throws E {
inTransactionReturnsThrows(ThrowingTransactionalSupplier.fromWork(work));
}

@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public <T, E extends Exception> T inTransactionReturnsThrows(
ThrowingTransactionalSupplier<T, E> work) throws E {
return work.doWork(transactionInstance);
TransactionTemplate transactionTemplate = new TransactionTemplate(platformTransactionManager);
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
return transactionTemplate.execute(
status -> uncheckedly(() -> work.doWork(transactionInstance)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.gruelbox.transactionoutbox.spring.example.multipledatasources;

import static org.springframework.http.HttpStatus.NOT_FOUND;

import com.gruelbox.transactionoutbox.TransactionOutbox;
import com.gruelbox.transactionoutbox.spring.example.multipledatasources.computer.Computer;
import com.gruelbox.transactionoutbox.spring.example.multipledatasources.computer.ComputerExternalQueueService;
import com.gruelbox.transactionoutbox.spring.example.multipledatasources.computer.ComputerRepository;
import com.gruelbox.transactionoutbox.spring.example.multipledatasources.employee.Employee;
import com.gruelbox.transactionoutbox.spring.example.multipledatasources.employee.EmployeeExternalQueueService;
import com.gruelbox.transactionoutbox.spring.example.multipledatasources.employee.EmployeeRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;

@SuppressWarnings("unused")
@RestController
public class EventuallyConsistentController {

@Autowired
private ComputerRepository computerRepository;

@Autowired
private TransactionOutbox computerTransactionOutbox;

@Autowired
private EmployeeRepository employeeRepository;

@Autowired
private TransactionOutbox employeeTransactionOutbox;

@SuppressWarnings("SameReturnValue")
@PostMapping("/computer")
@Transactional(transactionManager = "computerTransactionManager")
public void createComputer(
@RequestBody Computer computer,
@RequestParam(name = "ordered", required = false) Boolean ordered) {

computerRepository.save(computer);
if (ordered != null && ordered) {
computerTransactionOutbox
.with()
.ordered("justonetopic")
.schedule(ComputerExternalQueueService.class)
.sendComputerCreatedEvent(computer);
} else {
computerTransactionOutbox.schedule(ComputerExternalQueueService.class).sendComputerCreatedEvent(computer);
}
}

@GetMapping("/computer/{id}")
public Computer getComputer(@PathVariable long id) {
return computerRepository
.findById(id)
.orElseThrow(() -> new ResponseStatusException(NOT_FOUND));
}

@SuppressWarnings("SameReturnValue")
@PostMapping("/employee")
@Transactional(transactionManager = "employeeTransactionManager")
public void createEmployee(
@RequestBody Employee employee,
@RequestParam(name = "ordered", required = false) Boolean ordered) {

employeeRepository.save(employee);
if (ordered != null && ordered) {
employeeTransactionOutbox
.with()
.ordered("justonetopic")
.schedule(EmployeeExternalQueueService.class)
.sendEmployeeCreatedEvent(employee);
} else {
employeeTransactionOutbox.schedule(EmployeeExternalQueueService.class).sendEmployeeCreatedEvent(employee);
}
}

@GetMapping("/employee/{id}")
public Employee getEmployee(@PathVariable long id) {
return employeeRepository
.findById(id)
.orElseThrow(() -> new ResponseStatusException(NOT_FOUND));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package com.gruelbox.transactionoutbox.spring.example.multipledatasources;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.gruelbox.transactionoutbox.spring.example.multipledatasources.computer.Computer;
import com.gruelbox.transactionoutbox.spring.example.multipledatasources.computer.Computer.Type;
import com.gruelbox.transactionoutbox.spring.example.multipledatasources.computer.ComputerExternalQueueService;
import com.gruelbox.transactionoutbox.spring.example.multipledatasources.employee.Employee;
import com.gruelbox.transactionoutbox.spring.example.multipledatasources.employee.EmployeeExternalQueueService;
import java.net.URL;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.jdbc.core.JdbcTemplate;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class EventuallyConsistentControllerTest {

@SuppressWarnings("unused")
@LocalServerPort
private int port;

private URL base;

@SuppressWarnings("unused")
@Autowired
private TestRestTemplate template;

@Autowired
private JdbcTemplate employeeJdbcTemplate;

@Autowired
private JdbcTemplate computerJdbcTemplate;

@Autowired
private EmployeeExternalQueueService employeeExternalQueueService;
@Autowired
private ComputerExternalQueueService computerExternalQueueService;

@BeforeEach
void setUp() throws Exception {
this.base = new URL("http://localhost:" + port + "/");
employeeExternalQueueService.clear();
computerExternalQueueService.clear();
}

@Test
void testCheckNormalEmployees() throws InterruptedException {
var joe = new Employee(1L, "Joe", "Strummer");
var dave = new Employee(2L, "Dave", "Grohl");
var neil = new Employee(3L, "Neil", "Diamond");
var tupac = new Employee(4L, "Tupac", "Shakur");
var jeff = new Employee(5L, "Jeff", "Mills");

var url = base.toString() + "/employee";
assertTrue(template.postForEntity(url, joe, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(url, dave, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(url, neil, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(url, tupac, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(url, jeff, Void.class).getStatusCode().is2xxSuccessful());

employeeJdbcTemplate.execute(
"UPDATE txno_outbox SET invocation='non-deserializable invocation' WHERE invocation LIKE '%"
+ neil.getLastName()
+ "%'");

await()
.atMost(10, SECONDS)
.pollDelay(1, SECONDS)
.untilAsserted(
() ->
assertThat(employeeExternalQueueService.getSent())
.containsExactlyInAnyOrder(joe, dave, tupac, jeff));
}

@Test
void testCheckNormalComputers() throws InterruptedException {
var computerPc1 = new Computer(1L, "pc-001", Type.DESKTOP);
var computerPc2 = new Computer(2L, "pc-002", Type.LAPTOP);
var computerPc3 = new Computer(3L, "pc-003", Type.LAPTOP);
var computerWebserver1 = new Computer(4L, "webserver-001", Type.SERVER);
var computerWebserver2 = new Computer(5L, "webserver-002", Type.SERVER);

var computerUrl = base.toString() + "/computer";
assertTrue(template.postForEntity(computerUrl, computerPc1, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(computerUrl, computerPc2, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(computerUrl, computerPc3, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(computerUrl, computerWebserver1, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(computerUrl, computerWebserver2, Void.class).getStatusCode().is2xxSuccessful());

computerJdbcTemplate.execute(
"UPDATE txno_outbox SET invocation='non-deserializable invocation' WHERE invocation LIKE '%"
+ computerPc3.getName()
+ "%'");

await()
.atMost(10, SECONDS)
.pollDelay(1, SECONDS)
.untilAsserted(
() ->
assertThat(computerExternalQueueService.getSent())
.containsExactlyInAnyOrder(computerPc1, computerPc2, computerWebserver1, computerWebserver2));
}

@Test
void testCheckOrderedEmployees() {

var joe = new Employee(1L, "Joe", "Strummer");
var dave = new Employee(2L, "Dave", "Grohl");
var neil = new Employee(3L, "Neil", "Diamond");
var tupac = new Employee(4L, "Tupac", "Shakur");
var jeff = new Employee(5L, "Jeff", "Mills");

var url = base.toString() + "/employee?ordered=true";
assertTrue(template.postForEntity(url, joe, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(url, dave, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(url, neil, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(url, tupac, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(url, jeff, Void.class).getStatusCode().is2xxSuccessful());

employeeJdbcTemplate.execute(
"UPDATE txno_outbox SET invocation='non-deserializable invocation' WHERE invocation LIKE '%"
+ neil.getLastName()
+ "%'");

await()
.atMost(10, SECONDS)
.pollDelay(1, SECONDS)
.untilAsserted(() -> assertThat(employeeExternalQueueService.getSent()).containsExactly(joe, dave));
}

@Test
void testCheckOrderedComputers() {

var computerPc1 = new Computer(1L, "pc-001", Type.DESKTOP);
var computerPc2 = new Computer(2L, "pc-002", Type.LAPTOP);
var computerPc3 = new Computer(3L, "pc-003", Type.LAPTOP);
var computerWebserver1 = new Computer(4L, "webserver-001", Type.SERVER);
var computerWebserver2 = new Computer(5L, "webserver-002", Type.SERVER);

var url = base.toString() + "/computer?ordered=true";
assertTrue(template.postForEntity(url, computerPc1, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(url, computerPc2, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(url, computerPc3, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(url, computerWebserver1, Void.class).getStatusCode().is2xxSuccessful());
assertTrue(template.postForEntity(url, computerWebserver2, Void.class).getStatusCode().is2xxSuccessful());

employeeJdbcTemplate.execute(
"UPDATE txno_outbox SET invocation='non-deserializable invocation' WHERE invocation LIKE '%"
+ computerPc3.getName()
+ "%'");

await()
.atMost(10, SECONDS)
.pollDelay(1, SECONDS)
.untilAsserted(() -> assertThat(computerExternalQueueService.getSent()).containsExactly(computerPc1, computerPc2));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.gruelbox.transactionoutbox.spring.example.multipledatasources;

import com.gruelbox.transactionoutbox.spring.SpringInstantiator;
import com.gruelbox.transactionoutbox.spring.SpringTransactionManager;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@Import({SpringInstantiator.class})
class ExternalsConfiguration {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.gruelbox.transactionoutbox.spring.example.multipledatasources;

import com.gruelbox.transactionoutbox.TransactionOutbox;
import java.util.Collection;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
* Simple implementation of a background processor for {@link TransactionOutbox}. You don't need to use this if you need different semantics, but this is a good start for most purposes.
*/
@Component
@Slf4j
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
class TransactionOutboxBackgroundProcessor {

private final Collection<TransactionOutbox> outboxes;

@Scheduled(fixedRateString = "${outbox.repeatEvery}")
void poll() {
outboxes.forEach(outbox -> {
try {
do {
log.info("Flushing");
} while (outbox.flush());
} catch (Exception e) {
log.error("Error flushing transaction outbox. Pausing", e);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.gruelbox.transactionoutbox.spring.example.multipledatasources;

import java.time.Duration;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties("outbox")
@Data
class TransactionOutboxProperties {
private Duration repeatEvery;
private boolean useJackson;
private Duration attemptFrequency;
private int blockAfterAttempts;
}
Loading
Loading