본문 바로가기
Books/도메인 주도 개발 시작하기

Chapter10. 이벤트

by YoonJong 2024. 4. 17.
728x90

 

외부 서비스가 정상이 아닐 경우, 트랜잭션 처리를 어떻게 해야할까 ?

비동기 이벤트를 사용해서 두 시스템 간의 결합도를 낮춘다. ( 강결합 → 약결합 )

 

이벤트가 발생한다 → 상태가 변경됐다 → 해당 이벤트에 반응하여 원하는 동작을 수행하는 기능 작성

  • 이벤트 클래스 : 이벤트를 표현
  • 디스패처 : 스프링이 제공하는 ApplicationEventPublisher 를 이용
  • Events: 이벤트를 발행, 이벤트 발행을 위해 ApplicationEventPublisher 를 사용
  • 이벤트 핸들러 : 생성 주체가 발생한 이벤트를 전달받아 이벤트에 담긴 데이터를 이용해서 원하는 기능을 실행
  • ex) ‘주문 취소’ 이벤트를 받는 이벤트 핸들러는 해당 주문의 주문자에게 SMS로 주문 취소 사실을 알린다.
  • 이벤트 디스패처 : 이벤트 생성 주체와 이벤트 핸들러를 연결
  • 이벤트의 구성
    1. 이벤트 종류
    2. 이벤트 발생 시간
    3. 추가 데이터 : 주문번호, 신규 배송지 정보 등 이벤트와 관련된 정보
  • 이벤트의 용도
    1. 트리거 : 도메인의 상태가 바뀔 때 다른 후처리가 필요하면 후처리를 실행
    2. 서로 다른 시스템 간의 데이터 동기화 : ex) 배송지를 변경하면 외부 배송 서비스에 바뀐 배송지 정보를 전송
    3. 서로 다른 도메인 로직이 섞이는 것을 방지할 수 있다.

// 이벤트 클래스 : 이벤트를 처리하는데 필요한 최소한의 데이터를 포함해야 한다.
public class ShippingAddressChangedEvent {
    private final Long orderId;
    private final Address previousAddress;
    private final Address newAddress;

    //...
}

 

  1. Configuration 설정
  2. Events 클래스 생성 → ApplicationEventPublisher 사용
  3. EventHandler → 이벤트를 처리할 @EventListener 사용하여 구현
//1
@Configuration
public class EventsConfiguration {
    @Autowired
    private ApplicationContext applicationContext;

    @Bean
    public InitializingBean eventsInitializer() {
        return () -> Events.setPublisher(applicationContext);
    }
}

//2
public class Events {
    private static ApplicationEventPublisher publisher;

    static void setPublisher(ApplicationEventPublisher publisher) {
        Events.publisher = publisher;
    }

    public static void raise(Object event) {
        if (publisher != null) {
            publisher.publishEvent(event);
        }
    }
}

//3
public void initializePassword() {
    String newPassword = generateRandomPassword();
    this.password = new Password(newPassword);
    Events.raise(new PasswordChangedEvent(id.getId(), newPassword));
}

@Component
public class PasswordChangedEventHandler {
    @EventListener(PasswordChangedEvent.class)
    public void handle(PasswordChangedEvent event) {
        // 이메일 발송 코드
    }
}

public class PasswordChangedEvent {
    private String id;
    private String newPassword;

    public PasswordChangedEvent(String id, String newPassword) {
        this.id = id;
        this.newPassword = newPassword;
    }

    public String getId() {
        return id;
    }

    public String getNewPassword() {
        return newPassword;
    }
}

외부 서비스에 영향을 받는 경우는 어떻게 할까?

ex) 외부 환불 시스템의 요청이 오래걸려서 취소에서 딜레이가 될 경우

 

  1. 비동기 이벤트 처리 방법을 사용
    1. 로컬 핸들러를 비동기로 실행하기
@SpringBootApplication
@EnableAsync  // 스프링의 비동기 실행 기능 활성화
public class ShopApplication {

    public static void main(String[] args) {
        SpringApplication.run(ShopApplication.class, args);
    }

}

@Service
public class OrderCanceledEventHandler {
    private RefundService refundService;

    public OrderCanceledEventHandler(RefundService refundService) {
        this.refundService = refundService;
    }

    @Async // 추가
    public void handle(OrderCanceledEvent event) {
        refundService.refund(event.getOrderNumber());
    }
}

 

2. 메시징 시스템을 이용한 비동기 구현 : RabbitMQ, Kafka 등 ..

 

3.  이벤트 저장소를 이용한 비동기 처리

이벤트가 발생하면 핸들러는 스토리지에 이벤트를 저장한다.

포워더는 주기적으로 이벤트를 저장소에서 이벤트를 가져와 이벤트 핸들러를 실행

 

 

API 로 구현 : 클라이언트 API 를 이용해서 이벤트 처리에 실패하면 다시 실패한 이벤트부터 읽어와 재처리할 수 있다.

public class EventEntry {
    private Long id;
    private String type;
    private String contentType;
    private String payload;
    private long timestamp;

	 public EventEntry(String type, String contentType, String payload) {
        this.type = type;
        this.contentType = contentType;
        this.payload = payload;
        this.timestamp = System.currentTimeMillis();
    }

    public EventEntry(Long id, String type, String contentType, String payload,
                      long timestamp) {
        this.id = id;
        this.type = type;
        this.contentType = contentType;
        this.payload = payload;
        this.timestamp = timestamp;
    }
		//..
}

// 저장과 조회만 , 수정은 기능 제공X
public interface EventStore {
    void save(Object event);

    List<EventEntry> get(long offset, long limit);
}

// EventStore 구현
@Component
public class JdbcEventStore implements EventStore {
    private ObjectMapper objectMapper;
    private JdbcTemplate jdbcTemplate;

    public JdbcEventStore(ObjectMapper objectMapper, JdbcTemplate jdbcTemplate) {
        this.objectMapper = objectMapper;
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void save(Object event) {
        EventEntry entry = new EventEntry(event.getClass().getName(),
                "application/json", toJson(event));
        jdbcTemplate.update(
                "insert into evententry " +
                        "(type, content_type, payload, timestamp) " +
                        "values (?, ?, ?, ?)",
                ps -> {
                    ps.setString(1, entry.getType());
                    ps.setString(2, entry.getContentType());
                    ps.setString(3, entry.getPayload());
                    ps.setTimestamp(4, new Timestamp(entry.getTimestamp()));
                });
    }

    private String toJson(Object event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new PayloadConvertException(e);
        }
    }

    @Override
    public List<EventEntry> get(long offset, long limit) {
        return jdbcTemplate.query(
                "select * from evententry order by id asc limit ?, ?",
                ps -> {
                    ps.setLong(1, offset);
                    ps.setLong(2, limit);
                },
                (rs, rowNum) -> {
                    return new EventEntry(
                            rs.getLong("id"),
                            rs.getString("type"),
                            rs.getString("content_type"),
                            rs.getString("payload"),
                            rs.getTimestamp("timestamp").getTime());
                });
    }
}

// 이벤트 저장소에 보관하기 위한 Handler
@Component
public class EventStoreHandler {
    private EventStore eventStore;

    public EventStoreHandler(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    @EventListener(Event.class)
    public void handle(Event event) {
        eventStore.save(event);
    }
}

// API
@RestController
public class EventApi {
    private EventStore eventStore;

    public EventApi(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    @RequestMapping(value = "/api/events", method = RequestMethod.GET)
    public List<EventEntry> list(
            @RequestParam("offset") Long offset,
            @RequestParam("limit") Long limit) {
        return eventStore.get(offset, limit);
    }
}

/**
1. 가장 마지막에 처리한 데이터의 offset 인 lastOffset 을 구한다. 처음에는 0
2. 마지막 처리한 lastOffset 을 offset 으로 사용해서 API 실행
3. API 결과로 받은 데이터를 처리
4. offset + 데이터 처리개수를 lastOffset 으로 저장한
**/
  1. 포워더 구현
    @Component
    public class EventForwarder {
        private static final int DEFAULT_LIMIT_SIZE = 100;
    
        private EventStore eventStore;
        private OffsetStore offsetStore;
        private EventSender eventSender;
        private int limitSize = DEFAULT_LIMIT_SIZE;
    
        public EventForwarder(EventStore eventStore,
                              OffsetStore offsetStore,
                              EventSender eventSender) {
            this.eventStore = eventStore;
            this.offsetStore = offsetStore;
            this.eventSender = eventSender;
        }
    
        @Scheduled(initialDelay = 1000L, fixedDelay = 1000L)
        public void getAndSend() {
            long nextOffset = getNextOffset();
            List<EventEntry> events = eventStore.get(nextOffset, limitSize);
            if (!events.isEmpty()) {
                int processedCount = sendEvent(events);
                if (processedCount > 0) {
                    saveNextOffset(nextOffset + processedCount);
                }
            }
        }
    
        private long getNextOffset() {
            return offsetStore.get();
        }
    
        private int sendEvent(List<EventEntry> events) {
            int processedCount = 0;
            try {
                for (EventEntry entry : events) {
                    eventSender.send(entry);
                    processedCount++;
                }
            } catch(Exception ex) {
                // 로깅 처리
            }
            return processedCount;
        }
    
        private void saveNextOffset(long nextOffset) {
            offsetStore.update(nextOffset);
        }
    
    }
    
  2. 일정 주기로 EventStore 에서 이벤트를 읽어와 이벤트 핸들러에 전달

트랜잭션 실패를 고려해야 한다.

@TransactionalEventListener : 스프링 트랜잭션 상태에 따라 이벤트 핸들러를 실행할 수 있게 한다.

@Service
public class OrderCanceledEventHandler {
    private RefundService refundService;

    public OrderCanceledEventHandler(RefundService refundService) {
        this.refundService = refundService;
    }

    @Async
    @TransactionalEventListener(
            classes = OrderCanceledEvent.class,
            phase = TransactionPhase.AFTER_COMMIT // 트랜잭션 커밋에 성공한 뒤에 핸들러 메서드를 실행
    )
    public void handle(OrderCanceledEvent event) {
        refundService.refund(event.getOrderNumber());
    }
}
728x90

댓글