AWS SQS와 EventBridge를 활용한 스케줄러 만들기
728x90
반응형

매일 특정 시간마다 알림톡을 발신하는 기능을 만들려고 한다. 스케줄러 역할에는 AWS의 EventBridge를 사용했고 이벤트 메세지 발신용도로 SQS를 사용했다. 개발 환경은 아래와 같다.

 

  • Kotlin 플러그인 버전 1.7.22
  • 스프링부트 3.0.5
    • id("org.springframework.boot") version 3.0.5
  • JDK 17
    • java.sourceCompatibility = JavaVersion.VERSION_17
  • MySQL, JPA, QueryDSL
  • 기타
    • id("io.spring.dependency-management") version 1.1.0

 

지금부터 소개할 내용들은 아래 자료를 참고하였다. 소개할 내용들이 이해가 가지 않는다면 아래 자료를 읽어볼 것을 권장한다.

 

 

당연히 시작은 의존성 추가부터

SQS 메세지를 수신하기 위해 아래와 같은 의존성 추가를 해둔다. 참고로 Spring cloud 버전은 2022년 이후부터 3.0으로 바뀌면서 패키지명도 변경되었다.

// build.gradle.kts
dependencyManagement {
    imports {
        mavenBom("io.awspring.cloud:spring-cloud-aws-dependencies:3.0.0")
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:2022.0.2")
    }
}

dependencies {
    implementation("io.awspring.cloud:spring-cloud-aws-sqs")
    implementation("io.awspring.cloud:spring-cloud-aws-autoconfigure")
}

 

 

SQS 대기열 (SQS큐)

아래 이미지를 참고하여 대기열 생성 버튼을 클릭한다. 이름을 작성한다. 추가로 설정하고 싶은게 있으면 수정하고 마지막으로 "생성" 버튼을 누르면 대기열이 생성된다.

 

대기열을 만들었으면 해당 대기열의 메세지를 수신하기 위한 설정을 해야 한다. application.yml에 대기열 이름을 넣자. 아래 batch.sqs-name은 내가 임의로 만든 변수다. 나중에 코드 내에서 사용하기 위함이다. 값에는 방금 SQS 대기열에 생성한 이름을 넣자.

// application.yml
batch:
  sqs-name: my-queue-test

 

 

BatchSqsListener

SQS 메세지를 수신하는 리스너를 만들자. 확장성을 염두하여 인터페이스를 먼저 만들었지만 인터페이스 없이 간단하게 바로 BatchSqsListener를 구현해도 된다. 아래 BatchSqsListener는 메세지를 확인해서 특정 메세지용 Executor가 실행되는 방식으로 동작한다.

 

interface BatchListener {
    fun messageListener(payload: String)
}
@Component
class BatchSqsListener(
    private val executors: List<BatchEventExecutor>,
    private val mapper: ObjectMapper = jacksonObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
) : BatchListener {
    @SqsListener(value = ["\${batch.sqs-name}"])
    override fun messageListener(payload: String) {
        logger.info(">>>> Batch Start")
        val batchEvent = parse(payload)
        val now = LocalDateTime.now()
        logger.info(">>>> Batch : {} / {}", batchEvent, LocalDateTimeFormatter.parseToPattern(now))
        batchEvent?.let {
            executors.find { executor -> executor.validate(batchEvent) }
                ?.execute(payload)
                ?: logger.error(">>>> unsupported batchEvent: $batchEvent")
        }
    }

    private fun parse(payload: String): BatchEvent? {
        try {
            return mapper.readValue(payload)
        } catch (e: UnrecognizedPropertyException) {
            e.printStackTrace()
        }
        return null
    }

    companion object : KLogging()
}

 

위 코드에서 설명이 추가로 필요한 객체는 아래와 같다. 이 객체는 나중에 설명하겠다.

  • BatchEventExecutor
  • BatchEvent

 

SQS 메세지를 수신하기 위해서는 반드시 SqsListener 어노테이션이 세팅된 함수가 필요하다. 여기서는 messageListener 함수가 이에 해당된다. value에는 메세지가 발신되는 SQS큐 이름이 지정되어야 한다. 여기서는 SQS 이름을 application.yml에 지정했기 때문에 변수 경로를 입력한다.

 

메세지를 해석해서 메세지에 맞는 Executor를 찾아야 한다. 메세지를 해석하는 것은 parse 함수로 한다. JSON으로 메세지가 수신되고 jacksonObjectMapper로 파싱을 한다. 메세지는 아래와 같은 형태로 보낼 예정이다.

// 메세지 예시
{"event_type":"GUIDE"}

 

 

BatchEvent와 BatchEventExecutor

SQS에서 발신되는 메세지는 BatchEvent로 파싱된다. event_type의 value는 enum으로 관리하여 지정된 메세지에만 반응하도록 했다.

data class BatchEvent(
    val eventType: BatchEventType = BatchEventType.UNKNOWN
)

enum class BatchEventType(val description: String) {
    UNKNOWN("미정"),
    GUIDE("회원 가이드 메세지 발신"),
    USER_INACTIVATION("회원 휴면 처리")
}

 

 

참고로 필자의 환경에서는 기본적인 요청/응답 메세지가 Snake case 기준으로 필터되게 해 두었다. 그래서 BatchEvent도 실제 메세지의 key값이 'event_type'이더라도 객체의 변수명인 'eventType'에 맞게 파싱이 된다.

// application.yml
spring:
  jackson:
    property-naming-strategy: SNAKE_CASE

// SnakeCaseFilter.kt
@Configuration
@Order(Ordered.HIGHEST_PRECEDENCE)
class SnakeCaseFilter : OncePerRequestFilter() {
  // 생략
}

 

이렇게 파싱된 BatchEvent는 BatchSqsListener에 주입된 BatchEventExecutor에 의해 어떤 타입의 이벤트인지 확인한다. validate 함수의 역할은 이벤트가 어떤 BatchEventExecutor를 수행시킬지 확인하는 것이다. execute 함수는 실제로 동작할 기능을 구현한 코드가 들어있다.

interface BatchEventExecutor {
    fun validate(event: BatchEvent): Boolean
    fun execute(payload: String)
}

 

Component로 구현된 여러 BatchEventExecutor들은 BatchSqsListener에 주입된 상태이다. 각 BatchEventExecutor는 validate를 통해 해당 이벤트가 스스로에 의해 수행되어야 하는지 확인한다. 이벤트 대상이 본인임을 확인하면 수행한다. BatchEventExecutor 구현체의 예시는 아래와 같다.

 

@Component
class TempUserGuideExecutor(
    private val userRepository: UserRepository,
    private val smsSender: SmsSender,
) : BatchEventExecutor {
    override fun validate(event: BatchEvent): Boolean =
        event.eventType == BatchEventType.GUIDE

    override fun execute(payload: String) {
        val baseUsers = userRepository.findUserByCreatedAt()
        if (baseUsers.isEmpty()) {
            return
        }
        val params = baseUsers.map {
            NotificationParameter(
                recipientNo = it.mobilePhoneNo,
                parameterMap = mutableMapOf<String, String>()
            )
        }
        smsSender.sendAllMmsByTemplate(
            templateCode = NotificationType.SIGN_UP_GUIDANCE,
            params = params,
            isErrorIgnore = true
        )
    }
}

 

메세지 예시로 보여준 JSON이 실제로 수신하여 실행되는 BatchEventExecutor가 바로 위 코드다. smsSender는 휴대폰 메세지를 보내기 위한 인터페이스다. 본 컨텐츠에는 중요한 부분이 아니기에 무시해도 좋다. 이런 BatchEventExecutor를 여러개 만들어두면 BatchSqsListener에서는 모두 주입받고 있다가 메세지를 해석해서 적절한 BatchEventExecutor를 수행하게 된다.

 

 

SQS 메세지 수신 테스트

스케줄러를 만들기 전에 메세지 수신이 잘 되는지 테스트를 해보자. 지금까지 짠 코드가 들어간 어플리케이션을 실행(Run)한다. 어플리케이션이 잘 떴으면 AWS SQS 콘솔화면으로 접근하자. 그리고 앞에서 만든 SQS 이름을 클릭한다. 그러면 아래와 같은 화면을 볼 수 있다. 상단에 "메시지 전송 및 수신"이라는 버튼을 클릭하자.

 

아래와 같은 화면을 볼 수 있다. 이제 메시지 본문에 예시의 JSON을 입력하자. 그리고 오른쪽 상단의 메시지 전송을 클릭한다. 

 

Logger를 잘 넣어두었다면 ">>>> Batch Start"로 시작하는 로그들이 보일 것이다. 만약 이런 로그가 보이지 않는다면 콘솔 화면에서 "사용 가능한 메시지"나 "이동 중인 메시지"를 찾아보자. 숫자 0으로 보이면 어디선가 수신이 된 것이다. 만약 숫자 1 이상의 값이 있다면 메세지를 발신했지만 어디에서도 수신을 하지 못한 것이다. 관련한 정보는 특정 대기열의 정보 내에서 "모니터링"탭을 통해서도 확인이 가능하다.

 

 

 

EventBridge

스케줄러에 해당하는 AWS 서비스인 이벤트브릿지를 이용해보자. 아래 이미지를 보면 왼쪽 카테고리에 여러개의 서비스가 보인다. 스케줄러로 이용 가능한 것은 "규칙"과 "일정"이 있는데 비교적 사용이 더 쉬운 "일정"을 선택해보자. 그리고 일정 "생성 버튼"을 클릭한다.

 

 

아래와 같은 화면이 나타나면 일정 이름과 설명을 작성한다. 만들어야 할 일정이 많다면 일정 그룹을 만들어서 사용하면 된다. 스케줄러는 기본적으로 반복적으로 발생되는 경우를 말한다. 그러므로 "반복 일정"을 선택하고 "Cron 기반 일정"을 선택해서 특정 날짜에 반복해서 발생하도록 만든다. 30분간 혹은 몇시간 간격으로 만들고 싶다면 그 옆의 "Rate 기반 일정"을 선택하면 된다. 

 

Cron 작성법을 몰라도 된다. Cron 표현식 옆에 "정보"라고 적혀있는 링크를 누르면 작성법을 알려준다. 이를 참고하여 작성해보자. 유연한 기간은 "꺼짐"으로 사용했다. 이 기능을 잘 이해가 되지 않고 또한 유연하게 발생하게 만들고 싶지 않아서 이렇게 했다. 그 아래에 기간 설정하는 것이 있었는데 필요시 작성하면 된다. 작성이 완료되면 "다음" 버튼을 클릭하자.

 

스케줄러 이벤트가 발생되었을 때 발생 대상을 어디로 둘지를 세팅할 차례이다. 대상 API를 Amazon SQS - SendMessage를 선택한다. 아래에 SendMessage 창이 생긴다. 여기에 application.yml에 작성한 SQS 이름을 찾아서 선택하자. 그리고 발송할 메세지를 입력하자. 여기서는 앞에서 소개한 JSON String을 넣으면 된다.

 

본 스케줄러를 완성하자마자 활성화 시키고 싶다면 일정 상태의 활성화를 ON에 두면 된다. 재시도 정책도 본인이 원하는대로 설정하면 된다. 필자의 경우에는 재시도를 하되 너무 반복적으로 발생되면 오히려 어플리케이션에 부담을 주기 때문에 최소화 형태로 세팅하였다. 

 

 

마지막으로 권한의 역할 이름을 작성하자. 기존 역할을 사용해도 되지만 작성법을 모른다면 "이 일정에 맞는 새 역할 생성"을 통해 자동으로 생성되도록 하자. 가급적이면 역할 이름은 구분하기 쉽게 변경하도록 하자. 나중에 반복적인 일정을 만들 때 권한 관리가 쉬워진다. 

 

 

작성이 완료되면 다음 버튼을 누르자. 요약 화면이 보일 것이다. 일정 생성 버튼을 클릭하면 스케줄러 생성을 완료하게 된다. 만약 스케줄러가 제대로 작동되는지 궁금하다면 현재시간과 가까운 시간으로 Cron을 수정해보자. 정상작동을 한다면 위의 SQS 메시지 수신 테스트과 유사한 결과가 나타날 것이다.

 

728x90
반응형