유실된 SNS메시지를 스케쥴러와 스프링 배치로 처리하는 방법을 설명한다.
우리는 MSA프로젝트에서 Transactional Outbox pattern을 사용해서 스프링 이벤트를 발행해서 SNS메시지 발행 + 발행여부를 본인 DB에 저장하는 것을 이벤트 리스너를 통해 동시에 진행하도록 설계를 완료했다. 이제 우리는 스프링 이벤트가 발행되어 SNS메시지 발행내역을 DB에 저장하는 것은 Transactional로 묶어두었으니 메시지 유실이 발생해도 발행여부는 false로 남으니 이것들만 찾아서 메시지 재발행을 하면 된다.
우리 팀은 이 유실된 메시지를 스케쥴러를 통해 5분마다 스프링 배치로 event_record_table에서 발행여부가 false인 유실된 메시지를 가지고 와서 SNS메시지를 재발행시키기로 결정했고 지금 그 내용을 공유한다.
설계한 MSA프로젝트는 아래의 글을 읽어보면 된다.
SpringBoot3에서 배치를 사용하고자 하는 분은 배치 테이블 자동 생성에 약간 오류가 있으니 아래의 포스트를 통해 수동으로 테이블을 생성하는 방법을 확인하고 넘어가도록 하자
1. 배치코드 작성하기
1-1. 배치 Config 클래스 선언하기
- 스프링 5부터는 DefaultBatchConfiguration을 확장하는 방식을 사용한다. 이는 @EnableBatchProcessing의 기능을 포함하며, 기본 배치 인프라 구성을 제공한다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class BatchConfig extends DefaultBatchConfiguration {
}
1-2. chunkSize, pageSize, 상수 선언하기
- 앞으로 사용할 값들을 클래스 상단에 선언했다.
1-2-1. chunkSize
- chunkSize는 배치 작업에서 한 번에 처리할 데이터의 양을 결정하는 중요한 값이다. 여기서는 이 값을 application.yml 파일에서 설정할 수 있게 해서 유연성을 높였다. @Value 어노테이션을 사용해서 application.yml에 설정된 값을 가져오고, 만약 값이 설정되지 않았다면 기본값으로 100을 사용하도록 했다. 이렇게 함으로써 코드를 수정하지 않고도 설정 파일을 통해 값을 변경할 수 있어서 편리하다.
1-2-2. pageSize - JPA의 paging
- pageSize는 페이징 처리를 할 때 페이지당 몇 개의 항목을 보여줄지 결정하는 값이다. 이것도 마찬가지로 @Value 어노테이션을 사용해 application.yml에서 값을 가져오고, 설정되지 않았다면 기본값으로 10을 사용한다. 이 값을 통해 데이터를 페이지별로 나눠서 처리할 수 있어, 특히 대용량 데이터 처리에 유용하게 사용된다.
// 배치 작업에서 사용할 chunkSize 값 설정. 기본값은 100
@Value("${chunkSize:100}")
private int chunkSize;
// 페이징을 위한 사이즈 설정. 기본값은 10
@Value("${pageSize:10}")
private int pageSize;
지금부터는 우리팀이 작성한 Batch코드에 대해서 설명하도록 하겠다. 먼저 Job과 Step을 선언했다.
2. Job, Step 선언하기
2-1. 스프링 배치의 구성
- Spring Batch에서 Job과 Step은 배치 처리의 핵심 요소다. Job은 하나의 배치 작업 단위를 의미하고, Step은 Job 안에서 실제 처리를 담당하는 각 단계를 말한다.
2-2. Job 선언
- Job을 선언할 때는 @Bean 어노테이션을 사용해서 스프링 컨테이너에 등록해줘야 한다. 이렇게 하면 스프링이 어플리케이션 실행 시에 자동으로 Job을 인식하고 관리할 수 있게 된다.
- JobBuilder를 사용해 Job을 구성하는데, 여기서 "job"이라는 이름을 가진 Job을 생성하고, JobRepository를 사용해서 Job의 실행 상태를 관리한다. start 메서드에는 Job이 실행될 때 처음으로 실행할 Step을 지정해줘야 한다. 여기선 내가 선언해 준 Step을 Bean으로 주입받아서 시작점으로 설정해 줬다.
2-3. Step 선언
- Step은 Job 내에서 실행되는 개별 처리 단계를 의미하며, 실제 데이터 처리 작업이 이루어지는 곳이다. 이를 위해 @Bean 어노테이션을 사용하여 Step을 선언하고, @Transactional 어노테이션을 추가해서 데이터베이스 작업이 안정적으로 처리되도록 관리한다.
- Step을 정의할 때는 StepBuilder 클래스를 활용한다. 여기서 "step"이라는 이름의 Step을 생성하고, chunk 메서드를 통해 한 번에 처리할 데이터의 양을 지정한다. 이런 설정은 특히 대량의 데이터를 효율적으로 처리해야 할 때 중요하다.
- Step에서 사용되는 reader, processor, writer는 각각 데이터를 읽고, 처리하고, 쓰는 역할을 하는 구성 요소들이다. 이들을 조합해서 Step의 로직을 완성시킨다.
위의 Step코드를 보면 나는 메서드 상단에 @Transactional을 적어줬다. 각 단계가 아닌 Step에 @Transactional을 적어준 이유는 배치의 트랜잭션 동작과 관련이 있다.
2-3. Step에 적용된 @Transactional의 동작
@Transactional 어노테이션은 Step에 적용되어 reader, processor, writer의 데이터 처리 과정이 하나의 트랜잭션 내에서 실행되도록 한다. 이는 다음과 같은 중요한 목적을 가지고 있다.
- 데이터 일관성 보장
- 데이터베이스와 상호작용하는 모든 작업이 성공적으로 완료되거나, 실패할 경우 전체 작업을 취소(롤백)하여 데이터의 일관성을 유지한다.
- 데이터베이스와 상호작용하는 모든 작업이 성공적으로 완료되거나, 실패할 경우 전체 작업을 취소(롤백)하여 데이터의 일관성을 유지한다.
- 어플리케이션 안정성 강화
- 예기치 않은 에러 상황에서도 데이터베이스의 상태가 불완전한 상태로 남지 않게 하여, 애플리케이션의 안정성을 강화한다.
- 예기치 않은 에러 상황에서도 데이터베이스의 상태가 불완전한 상태로 남지 않게 하여, 애플리케이션의 안정성을 강화한다.
- 데이터 처리의 무결성 유지
- 특히 대량의 데이터를 다룰 때, 중간에 발생할 수 있는 예외 상황에서도 전체 배치 처리의 무결성을 유지한다.
- 특히 대량의 데이터를 다룰 때, 중간에 발생할 수 있는 예외 상황에서도 전체 배치 처리의 무결성을 유지한다.
지금 내 상황에서는 Reader 단계에서 데이터를 읽어올 때 데이터베이스에 접근하기 때문에 Reader에만 @Transactional을 적어줄 수도 있었다. 그렇지만 비즈니스 로직을 동작시키는 Processor단계와 최종적으로 데이터를 쓰는 Writer 단계에서 혹여 발생할 수도 있는 오류에 대비하여 배치 작업의 전체 단계를 트랜잭션으로 묶는 것이 좋다고 판단했고 가장 큰 단위인 Step에 @Transactional을 선언해 주었다.
@Bean
public Job job(final JobRepository jobRepository, final Step step) {
return new JobBuilder("job", jobRepository)
.start(step)
.build();
}
/**
* Step을 정의. 데이터 처리의 실질적인 단계를 구성함.
* - reader: JpaPagingItemReader를 사용해 publishedAt이 false인 Member 데이터를 읽음
* - writer: 읽어온 데이터를 SnsService를 통해 처리함
*/
@Bean
@Transactional
public Step step(final JobRepository jobRepository,
final PlatformTransactionManager transactionManager) {
return new StepBuilder("step", jobRepository)
.<MemberEventRecord, MemberEventRecord>chunk(this.chunkSize, transactionManager)
.reader(this.jpaPagingItemReader(entityManagerFactory))
.processor(this.itemProcessor())
.writer(this.itemWriter())
.build();
}
이제 Step의 내부에서 단계별로 동작하는 Reader, Processor, Writer 코드를 알아보자
3. Reader, Processor, Writer 코드 작성
3-1. 먼저 Spring Batch Step의 각 실행 단계에 대해 간단히 알아보자
- Reader (읽기)
- 이 단계에서는 주로 데이터베이스, 파일 시스템, 외부 시스템 등에서 데이터를 읽어온다. Reader는 데이터를 가져오는 역할에 집중하며, 데이터 처리 로직은 포함하지 않는다.
- 이 단계에서는 주로 데이터베이스, 파일 시스템, 외부 시스템 등에서 데이터를 읽어온다. Reader는 데이터를 가져오는 역할에 집중하며, 데이터 처리 로직은 포함하지 않는다.
- Processor (처리)
- Processor 단계에서는 Reader를 통해 읽어온 데이터에 대해 비즈니스 로직을 적용한다. 이 단계는 데이터 변환, 필터링, 또는 복잡한 계산과 같은 작업을 수행할 수 있다. 일반적으로, Processor는 데이터를 처리만 하고, 데이터베이스에 대한 쓰기 작업은 수행하지 않는다. 하지만, 특정 요구 사항에 따라 Processor에서 간단한 데이터 조회나 상태 체크를 수행할 수도 있다.
- Processor 단계에서는 Reader를 통해 읽어온 데이터에 대해 비즈니스 로직을 적용한다. 이 단계는 데이터 변환, 필터링, 또는 복잡한 계산과 같은 작업을 수행할 수 있다. 일반적으로, Processor는 데이터를 처리만 하고, 데이터베이스에 대한 쓰기 작업은 수행하지 않는다. 하지만, 특정 요구 사항에 따라 Processor에서 간단한 데이터 조회나 상태 체크를 수행할 수도 있다.
- Writer (쓰기)
- Writer 단계는 처리된 데이터를 최종적으로 저장한다. 이는 데이터베이스에 데이터를 쓰거나, 파일 시스템에 파
일을 생성하는 등의 작업을 포함할 수 있다. Writer는 Processor를 통해 처리된 데이터에 대한 쓰기 작업에 집중한다.
- Writer 단계는 처리된 데이터를 최종적으로 저장한다. 이는 데이터베이스에 데이터를 쓰거나, 파일 시스템에 파
Step의 단계별 동작을 간단히 알아봤으니 Reader 코드에 대한 설명을 진행하겠다.
3-2. Reader 설명
- JpaPagingItemReader는 Spring Batch에서 제공하는 컴포넌트로, 데이터베이스에서 데이터를 효율적으로 읽기 위해 페이징 방식을 사용한다. 이 컴포넌트의 주요 목적은 대량의 데이터를 메모리에 한꺼번에 로드하지 않고, 페이지 단위로 나누어서 처리하는 것이다. 이는 메모리 사용량을 최적화하고, 대용량 데이터 처리 시 성능 문제를 방지하는 데 유용하다.
- 여기서 구현한 jpaPagingItemReader 메서드는 JPQL(Java Persistence Query Language)을 사용하여 MemberEventRecord 엔티티 중에서 아직 발행되지 않은 (published = false) 레코드들 중, 각 member.id와 snsTopic 조합별로 가장 최신의 레코드를 선택하고, 이를 id 순서대로 정렬하여 반환한다.
조합별로 가장 최신의 레코드를 선택한다는 게 무슨 의미일까?
이 경우에 "조합별로"라는 표현은 MemberEventRecord 엔티티의 member.id와 snsTopic 필드 값을 기준으로 데이터를 그룹화하는 것을 의미한다. 즉, 각각의 member.id와 snsTopic의 고유한 조합을 찾아 그 조합에 해당하는 레코드들 중에서 특정한 조건(여기서는 가장 큰 id 값을 가진 레코드)을 만족하는 데이터를 선택하는 것이다.
1. 예를 들어, 만약 다음과 같은 데이터가 있다고 가정해 보자
id | member.id | snsTopic | published |
1 | 101 | TopicA | false |
2 | 101 | TopicA | false |
3 | 102 | TopicB | false |
4 | 102 | TopicA | false |
5 | 101 | TopicB | false |
2. 위에서 작성한 "GROUP BY innerMer.member.id, innerMer.snsTopic" 는 다음과 같은 고유한 조합들을 만들어낼 것이다.
조합 1: member.id = 101 및 snsTopic = TopicA
조합 2: member.id = 102 및 snsTopic = TopicB
조합 3: member.id = 102 및 snsTopic = TopicA
조합 4: member.id = 101 및 snsTopic = TopicB
3. 그리고 이 조합들 중에서 각 조합별로 id 값이 가장 큰 레코드를 선택하게 된다.
- 즉, 조합 1에서는 id가 2인 레코드, 조합 2에서는 id가 3인 레코드, 조합 3에서는 id가 4인 레코드, 그리고 조합 4에서는 id가 5인 레코드를 선택하게 되는 것이다. 이런 식으로 각 조합별로 가장 최신(가장 높은 id 값을 가진) 레코드를 선택하는 것이다.
이렇게 나는 memberId와 snsTopic 조합이 중복되는 데이터 중에서 가장 최신의 데이터만을 가져와서 다시 SNS 메시지를 발행하도록 했다. 즉, 같은 member.id를 가진 레코드들 중에서 가장 최근의 정보만을 사용하여 SNS 메시지를 재발행하는 것이다.
이제 Reader를 통해 가져온 데이터를 Processor에서 활용하여 SNS에 메시지를 재발행하자
3-3. Processor 설명
- ItemProcessor는 배치 처리 과정에서 읽어온 데이터에 대한 비즈니스 로직을 적용하는 중요한 단계다. 이 과정에서 데이터를 변환하거나, 필요한 계산을 수행하거나, 조건에 따라 필터링을 할 수 있다.
- 내가 작성한 itemProcessor 코드는 스위치문 조건을 통과하면 snsService의 publishNicknameToTopic 메서드를 호출하여 SNS에 유실되었던 메시지를 재발행하는 비즈니스 로직을 실행한다. (switch문을 사용하는 이유는 서버에서 발행할 SNS메시지는 여러 개가 존재할 예정이라 각각 구분해서 SNS 메시지를 재발행하기 위해서다.)
Processor에서 비즈니스 로직을 수행해서 SNS이벤트를 재발행했다면 최종적으로 데이터를 저장하는 Writer 코드를 동작시키자
3-4. Writer 설명
- Spring Batch에서 ItemWriter는 처리된 데이터를 최종적으로 저장하거나 쓰는 역할을 한다. 이 단계는 전체 배치 처리 과정 중에서 데이터를 최종적으로 데이터베이스에 쓰거나, 필요한 경우에는 업데이트하는 작업을 수행한다.
- Spring Batch에서 청크 기반 처리를 사용하는 경우, 트랜잭션은 각 청크 단위로 관리된다. 즉, 한 청크의 모든 작업(읽기, 처리, 쓰기)이 완료된 후에 해당 청크에 대한 트랜잭션이 커밋되는 것이다.
- 예시를 들자면 청크의 크기가 10이고 총데이터가 100개라면, 전체 배치 처리는 다음과 같이 진행된다.
1. 각 청크(10개의 데이터)에 대해 Reader, Processor, 그리고 Writer가 순차적으로 실행된다.
2. 한 청크의 모든 데이터에 대한 읽기, 처리, 쓰기가 완료되면, 해당 청크에 대한 트랜잭션이 커밋된다.
3. 이 과정은 데이터가 모두 처리될 때까지 반복된다. 총 100개의 데이터가 있으므로, 청크 크기가 10이기 때문에 총 10개의 청크가 처리된다.
결과적으로, 각 청크가 독립적인 트랜잭션 단위로 취급되어 총 10번의 트랜잭션이 발생하고, 각 트랜잭션은 해당 청크의 모든 작업이 완료된 후에 커밋된다. 이 방식은 각 청크별로 데이터 처리의 일관성과 무결성을 보장하는 데 도움을 준다.
이제 내가 작성한 Writer를 알아보자
여기서 ItemWriter의 구현인 JpaItemWriter<MemberEventRecord>는 실제로 데이터베이스에 대한 작업을 하지 않는다. Read단계를 제외하고는 데이터베이스에 대한 처리가 없어서 커밋할 게 없다. 이 Writer 코드는 단지 ItemProcessor에서 수행된 비즈니스 로직 (여기서는 SNS 메시지 발행) 이후의 "쓰기" 단계를 대표하도록 작성해서 배치의 전체적인 구조만 잡아줬을 뿐이다.
결론적으로, 나한테 ItemWriter는 필요 없지만 Spring Batch의 처리 흐름을 완성하기 위해 작성했다.
작성한 Step의 3단계 코드
/**
* JpaPagingItemReader를 설정. publishedAt이 false인 Member 데이터를 조회하는 쿼리를 실행함.
*/
@Bean
public JpaPagingItemReader<MemberEventRecord> jpaPagingItemReader(EntityManagerFactory entityManagerFactory) {
log.info("jpaPagingItemReader 동작!");
JpaPagingItemReader<MemberEventRecord> reader = new JpaPagingItemReader<>();
String jpql = """
SELECT mer
FROM MemberEventRecord mer
WHERE mer.published = false
AND mer.id IN (
SELECT MAX(innerMer.id)
FROM MemberEventRecord innerMer
WHERE innerMer.published = false
GROUP BY innerMer.member.id, innerMer.snsTopic
)
ORDER BY mer.id ASC
""";
reader.setQueryString(jpql);
reader.setEntityManagerFactory(entityManagerFactory);
reader.setPageSize(pageSize); // 페이지 크기 설정
return reader;
}
@Bean
public ItemProcessor<MemberEventRecord, MemberEventRecord> itemProcessor() {
return item -> {
log.info("processItem 동작중");
snsService.publishNicknameToTopic(item.getAttribute(), item.getTraceId());
return item;
};
}
/**
* ItemWriter를 설정. 업데이트된 엔터티를 처리. 여기서 작업이 마무리되면 트랜잭션이 종료되고 커밋된다.
*/
@Bean
public ItemWriter<MemberEventRecord> itemWriter() {
log.info("itemWriter 동작!");
JpaItemWriter<MemberEventRecord> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManagerFactory);
return writer;
}
지금까지 SNS발행도중 http 네트워크의 문제로 유실된 메시지를 배치를 통해 재발행하는 과정을 알아봤다. 근데 여기서 SNS와 배치 트랜잭션의 관계성에 대해 궁금증이 생길 수도 있다. 이에 설명을 추가한다.
4. SNS 메시지 재발행과 트랜잭션의 관리의 독립성
4-1. SNS 메시지 재발행과 트랜잭션의 관리의 독립성
- AWS SNS를 사용한 메시지 발행은 데이터베이스 트랜잭션과 독립적이다. 즉, SNS 메시지를 보내는 작업은 스프링의 @Transactional 어노테이션으로 관리되거나 롤백되지 않는다. 트랜잭션은 데이터베이스와 관련된 작업에만 적용되는 것이다.
4-2. 데이터베이스 트랜잭션과의 차이점
- Spring Batch에서는 @Transactional 어노테이션이 데이터베이스 작업에 사용된다. 만약 데이터베이스 작업에 문제가 생기면, 그 트랜잭션은 롤백돼서 원래 상태로 돌아간다. 하지만, SNS 같은 외부 시스템 호출은 이런 트랜잭션 관리에 포함되지 않는다. 예를 들어, SNS 메시지는 성공적으로 보내졌는데 데이터베이스 작업에서 문제가 생겼다면, 메시지는 그대로 남아 있고 데이터베이스 작업만 롤백되는 상황이 발생한다.
4-3. 실패 처리와 예외 대응
- SNS 메시지 발행 실패 같은 외부 시스템과의 상호작용은 별도로 관리해야 한다. 예외 상황에서 어떻게 대응할지 사전에 계획하는 것이 중요하고, 실패 처리 로직도 별도로 구현해야 한다. 나의 경우처럼 데이터베이스 쓰기가 없다면, 읽기에서 쿼리나 데이터의 문제로 읽어오는 도중에 오류가 발생하는 상황을 제외하고는 실패 처리를 크게 고려할 필요는 없을것이다.
나는 배치코드를 작성하고 나서 스프링 스케쥴러를 통해 5분마다 배치코드가 동작하도록 설정했다.
5. 스케쥴러 적용하기
5-1. 스케쥴러 설정 클래스 생성
- 스케쥴러를 설정하려면 SchedulerConfig 클래스를 직접 선언해야 한다. 이 클래스는 @EnableScheduling 어노테이션을 사용해서 스케줄링 기능을 활성화한다.
5-2. 스케쥴러의 역할
- @Scheduled 어노테이션으로 정해진 크론 표현식에 따라 Job을 실행시킨다. 예를 들어, 내가 작성한 "0 */5 * * * *" 설정은 매 5분마다 Job을 실행시키고, "*/10 * * * * *" 설정은 10초마다 실행시킨다.
5-3. JobLauncher란
- 스케쥴러를 설정하는 과정에서 핵심적인 역할을 하는 것이 바로 JobLauncher다. JobLauncher는 스프링 배치에서 잡을 실행하는 주요 인터페이스로, 잡과 잡 파라미터를 받아 실행을 시작하고, JobExecution 객체를 반환한다.
5-4. 스케쥴러의 흐름 설명
- 스케쥴러가 5분마다 runBatchJob 메서드를 호출한다.
- JobParametersBuilder를 사용해 각 실행을 구별하는 JobParameters를 생성한다.
- jobLauncher.run 메서드로 잡을 실행 시켜서 유실된 SNS메시지를 재발행한다.
이렇게 배치와 스케쥴러를 함께 적용시켜서 5분마다 유실된 SNS 메시지를 재발행하도록 했다.
작성한 Batch 전체 코드 (SpringBoot3)
import com.fasterxml.jackson.databind.ObjectMapper;
import com.recipia.member.aws.SnsService;
import com.recipia.member.domain.event.MemberEventRecord;
import jakarta.persistence.EntityManagerFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.support.DefaultBatchConfiguration;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;
/**
* DefaultBatchConfiguration을 확장하는 방식은 스프링 배치 5에서 새로 도입된 접근 방식이다.
* 이는 @EnableBatchProcessing의 기능을 포함하며, 기본 배치 인프라 구성을 제공한다.
*/
@Slf4j
@RequiredArgsConstructor
@Configuration
public class BatchConfig extends DefaultBatchConfiguration {
private final SnsService snsService;
private final EntityManagerFactory entityManagerFactory;
private final ObjectMapper objectMapper;
// 배치 작업에서 사용할 chunkSize 값 설정. 기본값은 100
@Value("${chunkSize:100}")
private int chunkSize;
// 페이징을 위한 사이즈 설정. 기본값은 10
@Value("${pageSize:10}")
private int pageSize;
/**
* Job을 정의. 하나 이상의 Step을 포함할 수 있으며, 여기서는 step을 시작점으로 설정함.
*/
@Bean
public Job job(final JobRepository jobRepository, final Step step) {
return new JobBuilder("job", jobRepository)
.start(step)
.build();
}
/**
* Step을 정의. 데이터 처리의 실질적인 단계를 구성함.
* - reader: JpaPagingItemReader를 사용해 publishedAt이 false인 Member 데이터를 읽음
* - writer: 읽어온 데이터를 SnsService를 통해 처리함
*/
@Bean
@Transactional
public Step step(final JobRepository jobRepository,
final PlatformTransactionManager transactionManager) {
return new StepBuilder("step", jobRepository)
.<MemberEventRecord, MemberEventRecord>chunk(this.chunkSize, transactionManager)
.reader(this.jpaPagingItemReader(entityManagerFactory))
.processor(this.itemProcessor())
.writer(this.itemWriter())
.build();
}
/**
* JpaPagingItemReader를 설정. publishedAt이 false인 Member 데이터를 조회하는 쿼리를 실행함.
*/
@Bean
public JpaPagingItemReader<MemberEventRecord> jpaPagingItemReader(EntityManagerFactory entityManagerFactory) {
log.info("jpaPagingItemReader 동작!");
JpaPagingItemReader<MemberEventRecord> reader = new JpaPagingItemReader<>();
String jpql = """
SELECT mer
FROM MemberEventRecord mer
WHERE mer.published = false
AND mer.id IN (
SELECT MAX(innerMer.id)
FROM MemberEventRecord innerMer
WHERE innerMer.published = false
GROUP BY innerMer.member.id, innerMer.snsTopic
)
ORDER BY mer.id ASC
""";
reader.setQueryString(jpql);
reader.setEntityManagerFactory(entityManagerFactory);
reader.setPageSize(pageSize); // 페이지 크기 설정
return reader;
}
@Bean
public ItemProcessor<MemberEventRecord, MemberEventRecord> itemProcessor() {
return item -> {
log.info("processItem 동작중");
switch (item.getSnsTopic()) {
case "NicknameChange" -> snsService.publishNicknameToTopic(item.getAttribute(), item.getTraceId());
}
return item;
};
}
/**
* ItemWriter를 설정. 업데이트된 엔터티를 처리. 여기서 작업이 마무리되면 트랜잭션이 종료되고 커밋된다.
*/
@Bean
public ItemWriter<MemberEventRecord> itemWriter() {
log.info("itemWriter 동작!");
JpaItemWriter<MemberEventRecord> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManagerFactory);
return writer;
}
}
MSA의 분산추적 로깅 Zipkin 적용방법이 궁금하다면?
이 포스트는 Team chillwave에서 사이드 프로젝트 중 적용했던 부분을 다시 공부하며 기록한 것입니다.
시간이 괜찮다면 팀원 '평양냉면 7'님의 블로그도 한번 봐주세요 :)
'MSA' 카테고리의 다른 글
[Spring] 헥사고날 아키텍처 (2) | 2024.08.10 |
---|---|
MSA 서버 간 통신: SNS의 MessageAttributes로 완벽한 Zeropayload 전략 구현하기 (1) | 2023.12.01 |
Spring MSA 프로젝트에서 단일 책임 원칙을 지키기 위한 리팩토링 (3) | 2023.11.24 |
Spring MSA: Sampling으로 원하는 http요청만 Zipkin으로 추적하기 (0) | 2023.11.23 |
Zipkin 로그 최적화: AWS ALB 헬스 체크 설정과 로그 추적 간소화 (2) | 2023.11.22 |