인프라/Kafaka

[실습] Kafka CDC 트랜잭션을 이용한 Pub/sub 예제

물에서육지로 2024. 12. 12. 14:32

트랜잭션을 활용하여 CDC 를 처리하려고 합니다.

트랜잭션을 잘 사용하면 데이터 처리에 용이하지만 트랜잭션의 묶여있는 순서에 따라 결과가 달라질 수 있으니

잘 보고 사용해야합니다.


1. @RestController 생성

    MyController에서 앞으로 url 호출 예정 (post: /message/f 호출)

2. Service 설정

3. ServiceImpl 설정

  • MyModel 객체를 받아 이를 JPA가 처리할 수 있는 MyEntity 객체로 변환합니다.
  • 변환된 MyEntity 객체를 JPA 저장소에 저장합니다.
  • 저장된 결과를 다시 MyModel로 변환한 후 반환합니다. (kafka로 보낼 것)

 

단, 위와 같이 throw 를 제일 아래에 작성해 주었을 경우는 결과가 달라진다.

트랜잭션으로 묶은 이유는 에러 발생 시 db의 커밋이 이뤄지지 않고 롤백이 발생하게 되는데,

카프카의 메세지는 롤백이 안된다.

저렇게 throw위치를 주면 db 저장은 롤백이 되어 데이터가 저장되지 않는데, 메세지는 날아가게 된다.

만약 저장 시 메세지를 받는 구조에서 이렇게 개발되어 있으면 문제가 야기 될 수 있다.

그래서 트랜잭션으로 cdc를 구현한 것이 완벽한 원자성을 보존할 수 있는 방법은 아니다.

 

4. Repository 설정

 

5. Producer 설정

 

6.Consumer

중첩 클래스는 하나의 클래스 안에 정의된 또 다른 클래스를 의미합니다. Java에서 중첩 클래스는 특정 클래스와 긴밀하게 관련된 클래스를 캡슐화하여 코드 구조를 더 깔끔하고 논리적으로 만들기 위해 사용됩니다.

위 코드에서 Payload는 MyCdcMessage 내부에 정의된 중첩 클래스입니다. 이는 Payload가 MyCdcMessage와 밀접하게 관련되어 있음을 나타냅니다.

 

 

7. 결과