반응형

 

 

현대의 시스템 아키텍쳐에서 시스템 간 데이터를 주고받을 때, 처리를 즉시 완료하지 않아도 되는 작업이라면 비동기 메시징 방식이 효과적으로 사용된다.
특히 마이크로소프트 아키텍쳐를 위시한 분산 환경에서는 구성 요소 간 결합도를 낮추는 디커플링을 위해 메시지 큐의 활용도가 증가하고 있다.

이번 포스팅에서는 AWS에서 제공하는 메시지 서비스 중 하나인 SQS를 활용하여 서비스 간의 통신을 Spring Application에서 구현하는 과정을 다루어 보려고 한다.

 

 

1. AWS 인프라 설정
- 1-1. SQS 생성
- 1-2. SQS 설정
- 1-3. 엑세스 정책 설정
- 1-4. AWS CLI 설치 및 프로파일 설정
2. Producer 애플리케이션 작성
- 2-1. 애플리케이션 의존성 및 환경 설정
- 2-2. SQS 관련 클래스 작성
- 2-3. API 엔드포인트 작성 및 테스트
3. Consumer 애플리케이션 작성
- 3-1. 애플리케이션 의존성 및 환경 설정
- 3-2. SQS 관련 설정 클래스 작성
- 3-3. 결과 확인
4. 큐 비교 : Standard vs FIFO
- 4-1. 메시지 포멧 비교
- 4-2. 순서 보장 비교

 

 

 


시나리오

 

 

사용할 시나리오로서 주문 처리 시스템과 배송 시스템 간의 통신을 구현할 것이다.

일반적으로 주문 시스템은 사용자의 요청에 빠르게 응답해야 하고, 내부적으로 복잡한 상태 변경(결제 완료, 재고 차감 등)을 수행할 것이다.반면 배송 시스템은 주문 데이터가 생성된 이후, 비교적 느슨한 시간 제약 하에 배송 준비 및 전산 등록 처리를 진행할 것이다.

따라서 메시지 큐를 활용하기 좋은 케이스 중 하나라고 생각한다.

  1. 요청이 발생하면, Producer(주문 시스템)은 필요한 내부 처리(주문 처리)를 수행한 후 메시지를 SQS에 전송한다.
  2. SQS는 메시지를 안전하게 저장하고, Consumer가 이를 가져갈 때까지 보존한다.
  3. Consumer(배송 시스템)은 주문 시스템에서 전송된 메시지를 Polling 방식으로 수신하여 배송 시스템의 전산 처리를 수행한다.

 

 

 

 


1. AWS 인프라 설정

 

애플리케이션 작성에 앞서서 필요한 AWS 인프라들을 구성하는 것부터 시작하자.

 

 

 


1-1. SQS 생성

 

Standard SQS 생성

 

FIFO SQS 생성

 

 

먼저 사용할 SQS 큐를 만들어 주는 작업을 해보자. 이번 포스팅에서는 SQS의 Standard Queue와 FIFO Queue를 둘 다 적용해보면서 차이점을 비교해 볼 예정이기 때문에 두 개의 큐를 생성해주었다.

 

 

 


1-2. SQS 설정

 

설정에서는 SQS에서 중요한 개념들을 설정할 수 있다.

  • 표시 제한 시간 (Visibility Timeout) : 중복 처리를 방지하기 위한 메커니즘. 메시지를 처리하기 충분한 시간, 오류 및 에러 상황에 대응할 수 있는 시간 설정이 필요하다. 
  • 메시지 수신 대기 시간 (Receive Message Wait Time) :  Short Polling 및 Long Polling 방식을 설정할 수 있다. (시간이 길 수록 Long Polling, 최대 20초)

 

 


 

FIFO Queue의 경우에는 추가적인 설정이 가능하다. 

  • 콘텐츠 기반 중복 제거 (Content-based deduplication) : FIFO에서는 완벽한 중복 처리 방지를 위해서 Deduplication ID를 사용하였다.
    • 해당 설정을 활성화한다면 Deduplication ID의 용도로 메시지 바디의 SHA-256 해시를 사용하여 중복을 감지한다.
    • 해당 설정을 비활성화한다면 직접 Deduplication ID를 Sender 측에서 설정해주어 전송해야 한다. 
    • 해당 포스팅에서는 비활성화하고 Application 측에서 Deduplication ID를 설정해주는 방식을 사용하였다.

 

 

<함께 보기> SQS의 개념과 구성 요소

https://sjh9708.tistory.com/270

 

[AWS] 메시지 서비스의 이해 : SQS & SNS & Kinesis

현대의 애플리케이션 아키텍처에서는 서비스 간 결합도를 낮추고 유연한 처리를 가능하게 하기 위해 메시지 큐(Message Queue)를 활용하는 경우가 많다. 특히 마이크로서비스 환경에서는 각 서비스

sjh9708.tistory.com

 

 

 


1-3. 엑세스 정책 설정

 

IAM 사용자 생성
엑세스 정책 설정

다음으로 SQS에 접근할 수 있는 엑세스에 대한 규칙을 정의하였다. 필자는 IAM 사용자 프로파일을 사용하여 Spring Application에서 SQS에 접근하도록 설정할 것이다.

따라서 IAM 사용자를 생성하고, SQS의 엑세스 정책에서 해당 IAM 사용자에 대한 접근을 허용해 주었다.

 

 


1-4. AWS CLI 설치 및 프로파일 설정

 

로컬 개발 환경에서 AWS 인프라에 Profile 기반으로 접근하기 위해서 AWS CLI를 설치해주었다. MAC 기준으로 homebrew를 통해서 AWS CLI를 개발 컴퓨터에 설치해주었다.

brew install awscli

 

 

CLI에서 엑세스 키를 이용하여 방금 생성한 IAM 사용자에 대한 Profile을 생성해주자.

aws configure --profile YOUR_PROFILE_NAME

 

 

 

 


2. Producer 애플리케이션 작성

 

이제 주문 시스템에 해당하는 Producer측의 Spring Application을 작성해 볼 예정이다. 간단하게 아래의 내용을 수행하는 Application을 만들 것이다.

  1. 주문 완료 처리를 하는 API 엔드포인트 생성
  2. API 호출 시 SQS Client를 통해서 SQS에 Message를 전달
  3. Spring Application은 3.4.5를 사용

 


2-1. 애플리케이션 의존성 및 환경 설정

 

▶ build.gradle

implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:3.0.1")
implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs'
  • AWS SQS 연동을 위해 spring-cloud-aws-starter-sqs 의존성을 추가하여주었다.
  • 해당 패키지는 Spring Boot에서의 빠른 AWS 인프라 연동에 특화되어있다. (software.amazon.awssdk 종류는 Java 전체와 호환된다.)

 

▶ application.yml

cloud:
  aws:
    region: ap-northeast-2
    profile: default
    sqs:
      queue-type: standard #fifo
      queue-name: my-sqs-queue #my-sqs.fifo
  • 사용할 AWS 리전, 프로파일, 큐 타입 및 큐 이름을 지정하였다.
  • 큐 이름(queue-name)은 방금 만든 my-sqs-queue(Standard Queue) 또는 my-sqs.fifo(FIFO Queue)를 지정하여, 두 종류의 큐를 모두 테스트해 볼 예정이다.
  • 큐 종류에 따라서 Send 시 보내야 할 메시지 포멧이 달라지기 때문에 프로퍼티로 queue-type을 지정하여 이를 기준으로 메시지 포멧을 작성하도록 설정하였다.

 

 


2-2. SQS 관련 클래스 작성

 

@Configuration
public class SQSConfig {

    @Value("${cloud.aws.region}")
    private String region;

    @Value("${cloud.aws.profile}")
    private String profile;

    // SQS Client
    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder()
                .region(Region.of(region))
                .credentialsProvider(ProfileCredentialsProvider.create(profile)) // CLI 프로파일 사용
                .build();
    }

    // SQS 템플릿
    @Bean
    public SqsTemplate sqsTemplate() {
        return SqsTemplate.newTemplate(sqsAsyncClient());
    }

}
  • Spring 애플리케이션에서 AWS SQS와 통신하기 위한 클라이언트를 구성하였다.
  • 환경 변수에 설정하였던 AWS Region과 AWS Profile 정보를 주입받아 사용해주었다.
  • SqsAsyncClient는 AWS SDK v2의 비동기 클라이언트이며, 지정한 프로파일을 통해 인증하여 클라이언트 Bean을 생성해주었다.
  • SqsTemplate은 메시지 송수신을 간편하게 도와주는 템플릿이다. 마찬가지로 Bean으로 등록해주었다.

 

@Service
@RequiredArgsConstructor
public class SQSSendService {
    private final SqsTemplate template;

    @Value("${cloud.aws.sqs.queue-name}")
    private String queueName;

    @Value("${cloud.aws.sqs.queue-type}")
    private String queueType;

    public SendResult<String> sendMessage(String groupId, String name, String item) {
        try {
            // 고유값 주문번호 생성
            String orderId = UUID.randomUUID().toString();

            // Body를 JSON 형태로 변환
            ObjectMapper objectMapper = new ObjectMapper();
            Map<String, String> messageMap = new HashMap<>();
            messageMap.put("orderId", orderId);
            messageMap.put("name", name);
            messageMap.put("item", item);
            String message = objectMapper.writeValueAsString(messageMap);

            System.out.println(
                    String.format("Send : %s 주문 처리가 완료되었습니다. 배송 시스템으로 정보를 전달합니다: (주문상품 : %s & 주문자 : %s)",
                            orderId, name, item));

            if(queueType.equals("fifo")){ 
            	//FIFO인 경우 메시지에 groupID와 DeduplicationID 지정
                return template.send(to -> to
                        .queue(queueName)
                        .messageGroupId(groupId)
                        .messageDeduplicationId(orderId)
                        .payload(message));
            }
            else{ 
                return template.send(to -> to
                        .queue(queueName)
                        .payload(message));
            }
        } catch (JsonProcessingException e) {
            throw new RuntimeException("JSON 직렬화 실패", e);
        }

    }
}
  • SQS로 메시지를 전송하는 서비스 로직을 담당하는 클래스를 작성해주었다.
  • 입력된 주문자 이름과 상품을 JSON 형태로 변환한 뒤, Message Body로 SQS에 전송한다.
  • FIFO 큐일 경우 메시지 Group ID와 Deduplicated ID를 함께 지정해 순서와 중복 처리를 보장할 수 있도록 설정한다. 특히 FIFO Queue를 만들 때 콘텐츠 기반 중복 제거를 Disable시켰다면 반드시 Deduplicated ID를 함께 메시지에 포함시켜야 한다.

 

 


2-3. API 엔드포인트 작성 및 테스트

 

@RestController
@RequiredArgsConstructor
@RequestMapping("/sqs")
public class SQSSendController {

    private final SQSSendService sqsSendService;

    @PostMapping("/send")
    public ResponseEntity<SendResult> sendOrder(@RequestParam String name,
                                            @RequestParam String item) {
        SendResult messageBody = sqsSendService.sendMessage("order", name, item);

        return ResponseEntity.ok(messageBody);
    }
}

 

  • POST 요청을 통해 주문 요청을 하는 API Endpoint를 작성해주었다.

 

 


Postman을 통해 주문 요청

 

AWS Console에서 확인할 수 있는 SQS 내부의 메시지

  • API 요청을 날려보면 메시지 처리 성공 결과를 확인할 수 있다.
  • AWS Console에서 SQS에 저장된 메시지를 확인할 수 있다.

 

 

 

 


3. Consumer 애플리케이션 작성

 

이제 배송 시스템에 해당하는 Consumer측 Application을 작성해 볼 예정이다. 

  1. SQS Listener를 생성하여 주기적으로 폴링하여 Message를 소비
  2. 메시지를 소비할 때 주문 시스템에 전산 처리

 

 


3-1. 애플리케이션 의존성 및 환경 설정

 

▶ build.gradle

implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:3.0.1")
implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs'

▶ application.yml

cloud:
  aws:
    region: ap-northeast-2
    profile: default
    sqs:
      queue-name: my-sqs-queue #my-sqs.fifo
  • Producer와 동일한 사용 목적으로 Consumer의 의존성과 환경 변수를 설정해주었다.

 

 


3-2. SQS 설정 클래스 작성

 

@Configuration
public class SQSConfig {

    @Value("${cloud.aws.region}")
    private String region;

    @Value("${cloud.aws.profile}")
    private String profile;

    // SQS Client
    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder()
                .region(Region.of(region))
                .credentialsProvider(ProfileCredentialsProvider.create(profile)) // CLI 프로파일 사용
                .build();
    }

    // Listener Factory
    @Bean
    public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
        return SqsMessageListenerContainerFactory
                .builder()
                .sqsAsyncClient(sqsAsyncClient())
                .build();

    }

}
  • Spring 애플리케이션에서 AWS SQS로부터 메시지를 수신하기 위한 리스너 환경을 구성하였다.
  • 환경 변수에 설정한 AWS Region과 AWS Profile 정보를 주입받아 비동기 클라이언트(SqsAsyncClient)를 생성해주었다.
  • SqsMessageListenerContainerFactory는 이 클라이언트를 기반으로 @SqsListener가 사용할 수 있는 리스너 컨테이너를 생성한다.
  • 해당 설정을 통해 애플리케이션은 SQS로부터 수신되는 메시지를 자동으로 Polling하고 처리할 수 있도록 준비되었다.

 

 


@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderMessage {
    private String orderId;
    private String item;
    private String name;
}
@Service
@RequiredArgsConstructor
public class SQSListenerService {

    @SqsListener("${cloud.aws.sqs.queue-name}")
    public void listenOrderQueue(@Headers Map<String, Object> headers, String message) {

        ObjectMapper mapper = new ObjectMapper();
        try {
            OrderMessage order = mapper.readValue(message, OrderMessage.class);
            System.out.println(
                    String.format("Receive : %s 주문 정보가 도착했습니다. 배송현황 데이터베이스에 저장합니다. : (주문상품 : %s & 주문자 : %s)",
                            order.getOrderId(), order.getItem(), order.getName()));

            // SqsListener는 Exception이 나지 않으면 자동으로 삭제 처리를 한다!
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

 

  • SQS로부터 수신한 메시지를 처리하는 리스너 클래스를 작성해주었다.
  • 수신한 메시지를 JSON 형태에서 OrderMessage 객체로 변환하여 메시지의 필요한 정보를 추출한다.
  • 정상적으로 메서드가 종료되면 해당 메시지는 자동으로 SQS 큐에서 삭제되어 재처리되지 않는다.
  • 반대로 예외가 발생할 경우에는 삭제되지 않고 Visibility Timeout 이후 재시도된다. 혹은 DLQ로 이동할 수 있도록 추가적으로 설정할 수 있다.

 

 


3-3. 결과 확인

 

메시지를 Listener가 Consume하는 모습을 확인할 수 있다.

 

 

 

 

 


4. 큐 비교 : Standard vs FIFO

 

앞의 과정에서 SQS를 활용하여 비동기 통신을 하는 Spring Application 2개를 만들어 보았다. 그렇다면 SQS Standard일 경우와 FIFO일 경우에 어떤 차이점들이 존재했는지 확인해보자.

 

 


4-1. 메시지 포멧 비교

 

  • Payload : Message의 본문 내용이 포함된다.
  • Message Group ID : FIFO의 경우 메시지를 그룹 단위로 순서를 보장하여 처리하기 위한 "Sqs_Msa_MessageGroupId"가 포함된다.
  • Sequence Number : FIFO 큐의 "additionalInformation.sequenceNumber"는 메시지의 순서를 식별하는 고유 번호로, SQS 내부에서 순차 처리를 위한 기준이 된다. 
  • Message Deduplication ID : FIFO 큐는 메시지 중복을 방지하기 위해 "Sqs_Msa_MessageDeduplicationId" 필드를 포함하며, 콘텐츠 기반 중복 제거가 비활성화된 경우 반드시 명시해야 한다.

 

 

 


4-2. 순서 보장

 

두 사진은 각각 Standard Queue와 FIFO Queue를 사용했을 때의 메시지 송수신 로그를 보여준다.

Standard Queue에서의 Producer와 Consumer

Standard Queue의 경우 파란색으로 표시된 일부 메시지를 보면 알 수 있듯 Send 순서와 관계없이 Receive 순서가 무작위로 섞여 있는 것을 볼 수 있다.
이는 Standard 큐가 높은 처리량을 제공하는 대신 메시지 순서를 보장하지 않음을 보여준다.

 

FIFO Queue에서의 Producer와 Consumer

반면 FIFO의 경우 Send된 메시지와 Receive된 메시지의 순서가 정확히 일치하며, FIFO 큐의 순서 보장을 확인할 수 있다.

따라서 메시지 순서가 중요한 시스템에서는 FIFO 큐를, 처리량이 중요한 경우에는 Standard 큐를 선택하는 것이 적절하다.

 

 

 

 

 


지금까지 AWS SQS를 활용해 Spring 애플리케이션 간 비동기 메시지 통신을 구현하고, Standard Queue와 FIFO Queue의 차이를 직접 실습을 통해 비교해보았다.
메시지 순서 보장 여부, 중복 처리 방식, 메시지 포맷의 차이 등을 살펴보며 두 큐가 어떤 상황에 적합한지 확인할 수 있었다. SQS를 활용한 마이크로서비스 간 연계나 이벤트 기반 처리 구조를 설계할 때 도움이 되었으면 좋겠다.

 

 

 

 

 

 


References

 

 

https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2/example_code/ec2#code-examples

 

aws-doc-sdk-examples/javav2/example_code/ec2 at main · awsdocs/aws-doc-sdk-examples

Welcome to the AWS Code Examples Repository. This repo contains code examples used in the AWS documentation, AWS SDK Developer Guides, and more. For more information, see the Readme.md file below....

github.com

 

https://devbksheen.tistory.com/entry/Spring-boot-3x-Amazon-SQS-%EC%A0%81%EC%9A%A9%EA%B8%B0

 

Spring boot 3.x + Amazon SQS 적용기

개요 업무상 Amazon SQS를 적용할 일이 있어 예전에 정리한 SQS 설정 방법대로 설정했지만 Listener이 작동하지 않는 이슈가 있었습니다. 서칭해본 결과 Spring Cloud AWS 2.x 버전은 Spring Boot 3.x에 완전히

devbksheen.tistory.com

 

 

반응형

BELATED ARTICLES

more