Spring/Spring-detail
동시성 문제 해결 - Synchronized, Pessimistic Lock, Optimistic Lock, Redis
YoonJong
2024. 4. 8. 18:09
728x90
위 강의를 학습하며 작성.
- 작업환경 세팅
brew install docker
brew link docker
docker version
docker pull mysql
docker run -d -p 3307:3306 -e MYSQL_ROOT_PASSWORD=1234 --name mysql mysql
docker ps
docker exec -it mysql bash
mysql -u root -p
create database stock_example;
use stock_example
spring:
jpa:
hibernate:
ddl-auto: create
show-sql: true
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3307/stock_example
username: root
password: 1234
logging:
level:
org:
hibernate:
SQL: DEBUG
type:
descriptor:
sql:
BasicBinder: TRACE
동시에 100개의 요청이 들어오는 테스트 코드 작성
→ Executors : 비동기를 사용할 수 있다 ( 자바 API )
→ CountDownLatch : 요청이 모두 끝날때 까지 기다린다.
@Test
void 동시에_100개_요청() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
stockService.decrease(1L, 1L);
} finally {
latch.countDown();
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
assertThat(stock.getQuantity()).isEqualTo(0L);
}
레이스 컨디션 : 두 개 이상의 스레드가 공유 데이터에 접근할 수 있고, 동시에 변경하려고 할 때
- Synchronized → 해결 불가 ( 프로세스 안에서만 보장된다 , 여러 스레드의 접근은 보장 하지 못한다 )
- 참고 : https://velog.io/@balparang/Transactional과-synchronized를-같이-사용할-때의-문제점
- 운영환경에서는 사용이 불가능. ( 단 하나의 서버에서는 가능 )
- @Transactional 을 제거하면 정상적으로 테스트 성공 → proxy 가 원인이기 때문에 제거
@Transactional
public synchronized void decrease(Long id, Long quantity) {
Stock stock = stockRepository.findById(id).orElseThrow();
stock.decrease(quantity);
stockRepository.saveAndFlush(stock);
}
// 트랜잭션의 내부 동작
@RequiredArgsConstructor
public class TransactionStockService {
private StockService stockService;
public void decrease(Long id, Long quantity) {
startTransaction();
stockService.decrease(id, quantity);
// 해당 부분에서 문제 발생 -> 다른 스레드가 decrease 호출
// decrease() 메서드가 끝나면 다른 스레드가 커밋전에 접근
endTransaction();
}
private void startTransaction() {
System.out.println("Transaction Start");
}
private void endTransaction() {
System.out.println("Commit");
}
}
- Database 락 사용하기
- Pessimistic Lock (비관적 락)
- 동시성 문제가 발생할 것이라고 예상하고 락을 걸어버리는 것
- 충돌이 자주 발생하는 환경에서 롤백의 횟수를 줄일 수 있다.
- 데이터 무결성을 보장하는 수준이 높다
- 서로 자원이 필요한 경우, 락이 걸려있어서 데드락이 걸릴 수 있다.
- 동시성이 떨어져 성능에 손해를 볼 수 있다.
- PESSIMISTIC_READ : 다른 트랜잭션에서 읽기는 가능하지만 쓰기는 불가
- PESSIMISTIC_WRITE : 다른 트랜잭션에서 읽기, 쓰기 모두 불가
- 동시성 문제가 발생할 것이라고 예상하고 락을 걸어버리는 것
// Repository public interface StockRepository extends JpaRepository<Stock, Long> { @Lock(LockModeType.PESSIMISTIC_WRITE) // 비관적 락 @Query("select s from Stock s where s.id = :id") Stock findByIdWithPessimisticLock(Long id); } // Service @Service @RequiredArgsConstructor public class PessimisticLockStockService { private final StockRepository stockRepository; // 재고 감소 @Transactional public void decrease(Long id, Long quantity) { Stock stock = stockRepository.findByIdWithPessimisticLock(id); stock.decrease(quantity); stockRepository.saveAndFlush(stock); } } // 테스트 코드 @Test void 동시에_100개_요청_비관적락() throws InterruptedException { int threadCount = 100; ExecutorService executorService = Executors.newFixedThreadPool(32); CountDownLatch latch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { executorService.submit(() -> { try { pessimisticLockStockService.decrease(1L, 1L); } finally { latch.countDown(); } }); } latch.await(); Stock stock = stockRepository.findById(1L).orElseThrow(); assertThat(stock.getQuantity()).isEqualTo(0L); } // 콘솔 Hibernate: select s1_0.id,s1_0.product_id,s1_0.quantity from stock s1_0 where s1_0.id=? for update
- Optimistic Lock (낙관적 락)
- 자원에 락을 걸지 않고, 동시성에 문제가 생기면 그때 가서 처리
- 트랜잭션의 충돌이 발생하지 않을 것으로 기대
- Version 의 상태를 보고 충돌을 확인, 충돌이 확인되면 롤백을 진행
- 성능상은 비관적 락보다 효율적이다.
- 실패했을 때 로직을 직접 구현해야 한다. → 충돌이 빈번하게 일어나지 않을 것이라고 예상할 때 효율적
- 자원에 락을 걸지 않고, 동시성에 문제가 생기면 그때 가서 처리
- Pessimistic Lock (비관적 락)
@Entity
@NoArgsConstructor
@Getter
public class Stock {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long productId;
private Long quantity;
@Version
private Long version; // 추가
}
public interface StockRepository extends JpaRepository<Stock, Long> {
@Lock(LockModeType.OPTIMISTIC) // 낙관적 락
@Query("select s from Stock s where s.id = :id")
Stock findByIdWithOptimisticLock(Long id);
}
@Service
@RequiredArgsConstructor
public class OptimisticLockStockService {
private final StockRepository stockRepository;
// 재고 감소
@Transactional
public void decrease(Long id, Long quantity) {
Stock stock = stockRepository.findByIdWithOptimisticLock(id);
stock.decrease(quantity);
}
}
// 낙관적 락이 실패할 경우
@Component
@RequiredArgsConstructor
public class OptimisticLockFacade {
private final OptimisticLockStockService optimisticLockStockService;
//@Transactional // 무한루프 -> 트랜잭션이 끝나지 않았는데, version 을 계속해서 체크.
public void decrease(Long id, Long quantity) throws InterruptedException {
while (true) {
try {
optimisticLockStockService.decrease(id, quantity);
break;
} catch (Exception e) {
Thread.sleep(50); // 50ms 후 재시도
}
}
}
}
// 테스트 코드
@Test
void 동시에_100개_요청_낙관적락() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
try {
optimisticLockFacade.decrease(1L, 1L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} finally {
latch.countDown();
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
assertThat(stock.getQuantity()).isEqualTo(0L);
}
// 콘솔 : Version 을 계속해서 체크
update stock set product_id=?,quantity=?,version=? where id=? and version=?
select s1_0.id,s1_0.product_id,s1_0.quantity,s1_0.version from stock s1_0 where s1_0.id=?
- Named Lock ( 분산 락 )
- 락 이름을 사용하여 동시 접근을 제어
- 동일한 락 이름을 가진 스레드만 락을 취득
- 락을 취득한 스레드는 다른 스레드가 락을 취득할 때까지 데이터를 수정 가능
- 직접 해제하거나 선점시간이 끝나야 한다.
- @Transactional(propagation = Propagation.REQUIRES_NEW) 을 하는 이유
- Synchronized 와 동일한 문제가 발생한다.
- 데이터베이스에 커밋되기 전에 락이 풀리는 현상이 발생
- 별도의 트랜잭션으로 분리를 해서 데이터베이스에 정상적으로 커밋이 된 후 락을 해제해야 한다.
- 하나의 트랜잭션에 있을 경우
1. 트랜잭션 1 시작
2. 락 획득
3. stockService.decrease(id, quantity) 메서드 호출
4. 재고 -1
5. 다른 스레드 접근
6. 정합성 안맞음!
7. 트랜잭션 1 종료 ( 커밋 )
- @Transactional(propagation = Propagation.REQUIRES_NEW) 옵션
1. 트랜잭션 1 시작
2. 락 획득
3. stockService.decrease(id, quantity) 메서드 호출
4. 트랜잭션 2 시작
5. 재고 -1
6. 트랜잭션 2 종료 ( 커밋 )
7. 다른 스레드 접근
8. 트랜잭션 1 종료 ( 커밋 )
public interface LockRepository extends JpaRepository<Stock, Long> {
@Query(value = "select get_lock(:key, 3000)", nativeQuery = true)
void getLock(String key);
@Query(value = "select release_lock(:key)", nativeQuery = true)
void releaseLock(String key);
}
@Component
@RequiredArgsConstructor
public class NamedLockStockFacade {
private final LockRepository lockRepository;
private final StockService stockService;
@Transactional
public void decrease(Long id, Long quantity) {
try {
lockRepository.getLock(id.toString());
stockService.decrease(id, quantity);
} finally {
lockRepository.releaseLock(id.toString());
}
}
}
@Service
@RequiredArgsConstructor
public class StockService {
private final StockRepository stockRepository;
// 재고 감소
@Transactional(propagation = Propagation.REQUIRES_NEW) // (NamedLock)부모의 트랜잭션과 별도로 생성되어야 한다.
public void decrease(Long id, Long quantity) {
Stock stock = stockRepository.findById(id).orElseThrow();
stock.decrease(quantity);
}
}
@Test
void 동시에_100개_요청_낙관적락() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
namedLockStockFacade.decrease(1L, 1L);
} finally {
latch.countDown();
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
assertThat(stock.getQuantity()).isEqualTo(0L);
}
- Redis 를 이용하여 해결하기
- 재시도가 필요하지 않은 lock 은 lettuce 활용
- 구현이 간단하다.
- Spring data redis 를 이용하면 lettuce가 기본이기 때문에 별도의 라이브러리를 사용하지 않아도 된다.
- spin lock 방식이기 때문에 동시에 많은 스레드가 lock 획득 상태라면 redis에 부하가 갈 수 있다.
- 재시도가 필요한 경우에는 redisson 활용
- 락 획득 재시도를 기본으로 제공한다.
- pub-sub 방식으로 구현되어 있기 때문에 lettuce 와 비교 했을 때 redis 에 부하가 덜 간다.
- 별도의 라이브러리를 사용해야 한다.
- lock 을 라이브러리 차원에서 제공해주기 때문에 사용법을 학습해야 한다.
- 재시도가 필요하지 않은 lock 은 lettuce 활용
//redis 설정
docker pull resdis
docker run --name myredis -d -p 6379:6379 redis
docker ps // redis CONTAINER_ID 확인 ex) c51136edec1a
exec -it c51136edec1a redis-cli // redis 접속
// 접속 후
setnx 1 lock // 성공 (Integer)1
setnx 1 lock // 실패 (Integer)0 -> 이미 존재하기 때문에
del 1 // 1번 키 삭제
- Lettuce : setnx 명령어를 활용하여 분산락 구현 , Spin Lock 방식
- 락 획득 실패시 redis 로 계속해서 요청을 보낸다.
// 의존성추가
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
@Component
@RequiredArgsConstructor
public class RedisLockRepository {
private final RedisTemplate<String, String> redisTemplate;
public Boolean lock(Long key) {
return redisTemplate
.opsForValue()
.setIfAbsent(generateKey(key), "lock", Duration.ofMillis(3_000));
}
public Boolean unLock(Long key) {
return redisTemplate
.delete(generateKey(key));
}
private String generateKey(Long key) {
return key.toString();
}
}
@Component
@RequiredArgsConstructor
public class LettuceLockStockFacade {
private final RedisLockRepository redisLockRepository;
private final StockService stockService;
public void decrease(Long id, Long quantity) throws InterruptedException {
while(!redisLockRepository.lock(id)) {
Thread.sleep(100); // 레디스 부하 줄임
}
try {
stockService.decrease(id, quantity);
} finally {
redisLockRepository.unLock(id);
}
}
}
@SpringBootTest
class LettuceLockStockFacadeTest {
@Autowired
private LettuceLockStockFacade lettuceLockStockFacade;
@Autowired
private StockRepository stockRepository;
@BeforeEach
public void before() {
stockRepository.saveAndFlush(new Stock(1L, 100L));
}
@AfterEach
public void after() {
stockRepository.deleteAll();
}
@Test
void 동시에_100개_요청_Lettuce() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
try {
lettuceLockStockFacade.decrease(1L, 1L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} finally {
latch.countDown();
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
assertThat(stock.getQuantity()).isEqualTo(0L);
}
}
b. Redisson : pub - sub 기반으로 Lock 구현 제공
- 락 획득 실패 시 대기하고 있다가 알림이 오면 그때 1번만 락 획득을 시도한다.
// pub / sub 테스트
// 터미널 A,B(2개) open 및 redis 접속
// A
subscribe ch1 // ch1 구독(접속)
// B
publish ch1 hello // ch1에 hello 메시지 보내기
// 의존성 추가
implementation group: 'org.redisson', name: 'redisson-spring-boot-starter', version: '3.23.5'
@Component
@RequiredArgsConstructor
public class RedissonLockStockFacade {
private final RedissonClient redissonClient;
private final StockService stockService;
public void decrease(Long id, Long quantity) {
RLock lock = redissonClient.getLock(id.toString());
try {
boolean available = lock.tryLock(10, 1, TimeUnit.SECONDS);
if (!available) {
System.out.println("Lock 획득 실패");
return;
}
stockService.decrease(id, quantity);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
@Test
void 동시에_100개_요청_Redisson() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
redissonLockStockFacade.decrease(1L, 1L);
} finally {
latch.countDown();
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
assertThat(stock.getQuantity()).isEqualTo(0L);
}
728x90