본문 바로가기
Spring/Spring-detail

동시성 문제 해결 - Synchronized, Pessimistic Lock, Optimistic Lock, Redis

by YoonJong 2024. 4. 8.
728x90

https://www.inflearn.com/course/%EB%8F%99%EC%8B%9C%EC%84%B1%EC%9D%B4%EC%8A%88-%EC%9E%AC%EA%B3%A0%EC%8B%9C%EC%8A%A4%ED%85%9C/dashboard

 

재고시스템으로 알아보는 동시성이슈 해결방법 | 최상용 - 인프런

최상용 | 동시성 이슈란 무엇인지 알아보고 처리하는 방법들을 학습합니다., 동시성 이슈 처리도 자신있게! 간단한 재고 시스템으로 차근차근 배워보세요. 백엔드 개발자라면 꼭 알아야 할 동

www.inflearn.com

 

위 강의를 학습하며 작성.

 


  1. 작업환경 세팅
brew install docker
brew link docker
docker version

docker pull mysql
docker run -d -p 3307:3306 -e MYSQL_ROOT_PASSWORD=1234 --name mysql mysql
docker ps

docker exec -it mysql bash
mysql -u root -p
create database stock_example;
use stock_example

spring:
  jpa:
    hibernate:
      ddl-auto: create
    show-sql: true
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3307/stock_example
    username: root
    password: 1234

logging:
  level:
    org:
      hibernate:
        SQL: DEBUG
        type:
          descriptor:
            sql:
              BasicBinder: TRACE

동시에 100개의 요청이 들어오는 테스트 코드 작성

→ Executors : 비동기를 사용할 수 있다 ( 자바 API )

→ CountDownLatch : 요청이 모두 끝날때 까지 기다린다.

@Test
void 동시에_100개_요청() throws InterruptedException {
    int threadCount = 100;

    ExecutorService executorService = Executors.newFixedThreadPool(32);
    CountDownLatch latch = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {
        executorService.submit(() -> {
            try {
                stockService.decrease(1L, 1L);
            } finally {
                latch.countDown();
            }
        });
    }
    latch.await();

    Stock stock = stockRepository.findById(1L).orElseThrow();
    assertThat(stock.getQuantity()).isEqualTo(0L);
}

레이스 컨디션 : 두 개 이상의 스레드가 공유 데이터에 접근할 수 있고, 동시에 변경하려고 할 때

  1. Synchronized → 해결 불가 ( 프로세스 안에서만 보장된다 , 여러 스레드의 접근은 보장 하지 못한다 )
@Transactional
public synchronized void decrease(Long id, Long quantity) {

    Stock stock = stockRepository.findById(id).orElseThrow();
    stock.decrease(quantity);

    stockRepository.saveAndFlush(stock);
}

// 트랜잭션의 내부 동작
@RequiredArgsConstructor
public class TransactionStockService {

  private StockService stockService;

  public void decrease(Long id, Long quantity) {
      startTransaction();

      stockService.decrease(id, quantity);
			// 해당 부분에서 문제 발생 -> 다른 스레드가 decrease 호출
			// decrease() 메서드가 끝나면 다른 스레드가 커밋전에 접근
      endTransaction();
  }

  private void startTransaction() {
      System.out.println("Transaction Start");
  }

  private void endTransaction() {
      System.out.println("Commit");
  }

}
  1. Database 락 사용하기
    • Pessimistic Lock (비관적 락)
      • 동시성 문제가 발생할 것이라고 예상하고 락을 걸어버리는 것
        • 충돌이 자주 발생하는 환경에서 롤백의 횟수를 줄일 수 있다.
        • 데이터 무결성을 보장하는 수준이 높다
        • 서로 자원이 필요한 경우, 락이 걸려있어서 데드락이 걸릴 수 있다.
        • 동시성이 떨어져 성능에 손해를 볼 수 있다.
          • PESSIMISTIC_READ : 다른 트랜잭션에서 읽기는 가능하지만 쓰기는 불가
          • PESSIMISTIC_WRITE : 다른 트랜잭션에서 읽기, 쓰기 모두 불가
    // Repository
    public interface StockRepository extends JpaRepository<Stock, Long> {
    
        @Lock(LockModeType.PESSIMISTIC_WRITE) // 비관적 락
        @Query("select s from Stock s where s.id = :id")
        Stock findByIdWithPessimisticLock(Long id);
    }
    
    // Service
    @Service
    @RequiredArgsConstructor
    public class PessimisticLockStockService {
    
        private final StockRepository stockRepository;
    
        // 재고 감소
        @Transactional
        public void decrease(Long id, Long quantity) {
    
            Stock stock = stockRepository.findByIdWithPessimisticLock(id);
            stock.decrease(quantity);
    
            stockRepository.saveAndFlush(stock);
        }
    }
    
    // 테스트 코드
    @Test
    void 동시에_100개_요청_비관적락() throws InterruptedException {
        int threadCount = 100;
    
        ExecutorService executorService = Executors.newFixedThreadPool(32);
        CountDownLatch latch = new CountDownLatch(threadCount);
    
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    pessimisticLockStockService.decrease(1L, 1L);
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();
    
        Stock stock = stockRepository.findById(1L).orElseThrow();
        assertThat(stock.getQuantity()).isEqualTo(0L);
    }
    
    // 콘솔
    Hibernate: select s1_0.id,s1_0.product_id,s1_0.quantity from stock s1_0 where s1_0.id=? for update
    
    • Optimistic Lock (낙관적 락)
      • 자원에 락을 걸지 않고, 동시성에 문제가 생기면 그때 가서 처리
        • 트랜잭션의 충돌이 발생하지 않을 것으로 기대
        • Version 의 상태를 보고 충돌을 확인, 충돌이 확인되면 롤백을 진행
        • 성능상은 비관적 락보다 효율적이다.
        • 실패했을 때 로직을 직접 구현해야 한다. → 충돌이 빈번하게 일어나지 않을 것이라고 예상할 때 효율적

 

@Entity
@NoArgsConstructor
@Getter
public class Stock {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long productId;
    private Long quantity;

    @Version
    private Long version; // 추가 
}


public interface StockRepository extends JpaRepository<Stock, Long> {

    @Lock(LockModeType.OPTIMISTIC) // 낙관적 락
    @Query("select s from Stock s where s.id = :id")
    Stock findByIdWithOptimisticLock(Long id);
}


@Service
@RequiredArgsConstructor
public class OptimisticLockStockService {

    private final StockRepository stockRepository;

    // 재고 감소
    @Transactional
    public void decrease(Long id, Long quantity) {

        Stock stock = stockRepository.findByIdWithOptimisticLock(id);
        stock.decrease(quantity);
    }
}


// 낙관적 락이 실패할 경우
@Component
@RequiredArgsConstructor
public class OptimisticLockFacade {

    private final OptimisticLockStockService optimisticLockStockService;

    //@Transactional // 무한루프 -> 트랜잭션이 끝나지 않았는데, version 을 계속해서 체크.
    public void decrease(Long id, Long quantity) throws InterruptedException {
        while (true) {
            try {
                optimisticLockStockService.decrease(id, quantity);
                break;
            } catch (Exception e) {
                Thread.sleep(50); // 50ms 후 재시도
            }
        }
    }
}


// 테스트 코드
@Test
void 동시에_100개_요청_낙관적락() throws InterruptedException {
    int threadCount = 100;

    ExecutorService executorService = Executors.newFixedThreadPool(32);
    CountDownLatch latch = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {
        executorService.submit(() -> {
            try {
                try {
                    optimisticLockFacade.decrease(1L, 1L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } finally {
                latch.countDown();
            }
        });
    }
    latch.await();

    Stock stock = stockRepository.findById(1L).orElseThrow();
    assertThat(stock.getQuantity()).isEqualTo(0L);
}

// 콘솔 : Version 을 계속해서 체크
update stock set product_id=?,quantity=?,version=? where id=? and version=?
select s1_0.id,s1_0.product_id,s1_0.quantity,s1_0.version from stock s1_0 where s1_0.id=?

 

  • Named Lock ( 분산 락 )
    • 락 이름을 사용하여 동시 접근을 제어
    • 동일한 락 이름을 가진 스레드만 락을 취득
    • 락을 취득한 스레드는 다른 스레드가 락을 취득할 때까지 데이터를 수정 가능
    • 직접 해제하거나 선점시간이 끝나야 한다.

 

  • @Transactional(propagation = Propagation.REQUIRES_NEW) 을 하는 이유
    • Synchronized 와 동일한 문제가 발생한다.
    • 데이터베이스에 커밋되기 전에 락이 풀리는 현상이 발생
    • 별도의 트랜잭션으로 분리를 해서 데이터베이스에 정상적으로 커밋이 된 후 락을 해제해야 한다.
  • 하나의 트랜잭션에 있을 경우
1. 트랜잭션 1 시작
2. 락 획득
3. stockService.decrease(id, quantity) 메서드 호출
4. 재고 -1 
5. 다른 스레드 접근
6. 정합성 안맞음!
7. 트랜잭션 1 종료 ( 커밋 )
  • @Transactional(propagation = Propagation.REQUIRES_NEW) 옵션
1. 트랜잭션 1 시작
2. 락 획득
3. stockService.decrease(id, quantity) 메서드 호출 
4. 트랜잭션 2 시작
5. 재고 -1 
6. 트랜잭션 2 종료 ( 커밋 )
7. 다른 스레드 접근
8. 트랜잭션 1 종료 ( 커밋 )
public interface LockRepository extends JpaRepository<Stock, Long> {

    @Query(value = "select get_lock(:key, 3000)", nativeQuery = true)
    void getLock(String key);

    @Query(value = "select release_lock(:key)", nativeQuery = true)
    void releaseLock(String key);

}

@Component
@RequiredArgsConstructor
public class NamedLockStockFacade {

    private final LockRepository lockRepository;

    private final StockService stockService;

    @Transactional
    public void decrease(Long id, Long quantity) {
        try {
            lockRepository.getLock(id.toString());
            stockService.decrease(id, quantity);
        } finally {
            lockRepository.releaseLock(id.toString());
        }
    }
}

@Service
@RequiredArgsConstructor
public class StockService {

    private final StockRepository stockRepository;

    // 재고 감소
    @Transactional(propagation = Propagation.REQUIRES_NEW) // (NamedLock)부모의 트랜잭션과 별도로 생성되어야 한다.
    public void decrease(Long id, Long quantity) {

        Stock stock = stockRepository.findById(id).orElseThrow();
        stock.decrease(quantity);

    }

}

@Test
void 동시에_100개_요청_낙관적락() throws InterruptedException {
    int threadCount = 100;

    ExecutorService executorService = Executors.newFixedThreadPool(32);
    CountDownLatch latch = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {
        executorService.submit(() -> {
            try {
                namedLockStockFacade.decrease(1L, 1L);
            } finally {
                latch.countDown();
            }
        });
    }
    latch.await();

    Stock stock = stockRepository.findById(1L).orElseThrow();
    assertThat(stock.getQuantity()).isEqualTo(0L);
}
  1. Redis 를 이용하여 해결하기
    • 재시도가 필요하지 않은 lock 은 lettuce 활용
      • 구현이 간단하다.
      • Spring data redis 를 이용하면 lettuce가 기본이기 때문에 별도의 라이브러리를 사용하지 않아도 된다.
      • spin lock 방식이기 때문에 동시에 많은 스레드가 lock 획득 상태라면 redis에 부하가 갈 수 있다.
    • 재시도가 필요한 경우에는 redisson 활용
      • 락 획득 재시도를 기본으로 제공한다.
      • pub-sub 방식으로 구현되어 있기 때문에 lettuce 와 비교 했을 때 redis 에 부하가 덜 간다.
      • 별도의 라이브러리를 사용해야 한다.
      • lock 을 라이브러리 차원에서 제공해주기 때문에 사용법을 학습해야 한다.
//redis 설정 
docker pull resdis
docker run --name myredis -d -p 6379:6379 redis

docker ps // redis CONTAINER_ID 확인 ex) c51136edec1a
exec -it c51136edec1a redis-cli // redis 접속

// 접속 후
setnx 1 lock // 성공 (Integer)1
setnx 1 lock // 실패 (Integer)0 -> 이미 존재하기 때문에
del 1 // 1번 키 삭제
  1. Lettuce : setnx 명령어를 활용하여 분산락 구현 , Spin Lock 방식
    • 락 획득 실패시 redis 로 계속해서 요청을 보낸다.

 

// 의존성추가
implementation 'org.springframework.boot:spring-boot-starter-data-redis'



@Component
@RequiredArgsConstructor
public class RedisLockRepository {

    private final RedisTemplate<String, String> redisTemplate;

    public Boolean lock(Long key) {
        return redisTemplate
                .opsForValue()
                .setIfAbsent(generateKey(key), "lock", Duration.ofMillis(3_000));
    }

    public Boolean unLock(Long key) {
        return redisTemplate
                .delete(generateKey(key));
    }

    private String generateKey(Long key) {
        return key.toString();
    }

}


@Component
@RequiredArgsConstructor
public class LettuceLockStockFacade {

    private final RedisLockRepository redisLockRepository;
    private final StockService stockService;

    public void decrease(Long id, Long quantity) throws InterruptedException {
        while(!redisLockRepository.lock(id)) {
            Thread.sleep(100); // 레디스 부하 줄임
        }

        try {
            stockService.decrease(id, quantity);
        } finally {
            redisLockRepository.unLock(id);
        }
    }

}



@SpringBootTest
class LettuceLockStockFacadeTest {

    @Autowired
    private LettuceLockStockFacade lettuceLockStockFacade;

    @Autowired
    private StockRepository stockRepository;

    @BeforeEach
    public void before() {
        stockRepository.saveAndFlush(new Stock(1L, 100L));
    }

    @AfterEach
    public void after() {
        stockRepository.deleteAll();
    }

    @Test
    void 동시에_100개_요청_Lettuce() throws InterruptedException {
        int threadCount = 100;

        ExecutorService executorService = Executors.newFixedThreadPool(32);
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    try {
                        lettuceLockStockFacade.decrease(1L, 1L);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();

        Stock stock = stockRepository.findById(1L).orElseThrow();
        assertThat(stock.getQuantity()).isEqualTo(0L);
    }
}

 

b. Redisson : pub - sub 기반으로 Lock 구현 제공

  • 락 획득 실패 시 대기하고 있다가 알림이 오면 그때 1번만 락 획득을 시도한다.

// pub / sub 테스트
// 터미널 A,B(2개) open 및 redis 접속

// A
subscribe ch1 // ch1 구독(접속)

// B
publish ch1 hello // ch1에 hello 메시지 보내기

 

// 의존성 추가
implementation group: 'org.redisson', name: 'redisson-spring-boot-starter', version: '3.23.5'


@Component
@RequiredArgsConstructor
public class RedissonLockStockFacade {

    private final RedissonClient redissonClient;
    private final StockService stockService;

    public void decrease(Long id, Long quantity) {
        RLock lock = redissonClient.getLock(id.toString());

        try {
            boolean available = lock.tryLock(10, 1, TimeUnit.SECONDS);

            if (!available) {
                System.out.println("Lock 획득 실패");
                return;
            }

            stockService.decrease(id, quantity);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

}


@Test
    void 동시에_100개_요청_Redisson() throws InterruptedException {
        int threadCount = 100;

        ExecutorService executorService = Executors.newFixedThreadPool(32);
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    redissonLockStockFacade.decrease(1L, 1L);
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();

        Stock stock = stockRepository.findById(1L).orElseThrow();
        assertThat(stock.getQuantity()).isEqualTo(0L);
    }
728x90

댓글