SQS + Lambda

예제와 함께 무작정 따라하기

주식회사 브랜디

Overview

안녕하세요. 저는 브랜디 R&D 본부 개발1팀의 기둥을 담당하는 이상근입니다. 오늘은 SQS(Simple Queue Service)와 Lambda를 간단한 예제와 함께 정리해보려고 합니다. 각 서비스에 대한 설명은 이미 매뉴얼로 쉽게 정리되어 있으므로, 이번 글에서는 서비스 간 구성을 집중적으로 살펴보겠습니다.1)



SQS와 Lambda에 대하여

SQS(Simple Queue Service)는 마이크로 서비스와 분산 시스템, 그리고 서버리스 애플리케이션을 쉽게 분리하고 확장할 수 있는 ‘완전관리형 메시지 대기열 서비스’입니다. 그리고 Lambda는 ‘이벤트 처리 방식의 서버리스 컴퓨팅 서비스’입니다. 아래 그림은 SQS와 Lambda Function을 이용해 메시지를 등록-조회-처리하는데 필요한 구성요소를 정리한 것입니다. 

SQS, Lambda Architecture


  1. Producer - 처리할 작업 메시지를 SQS에 등록
  2. Trigger - 큐(Queue) 대기열에 있는 메시지들을 조회하기 위해 CloueWatch의 스케줄 이벤트를 이용하여 매 분마다 Lambda Consumer 실행
  3. Consumer - Lambda Consumer는 큐 대기열에 있는 메시지 목록을 조회하여 각 메시지를 Lambda Worker에서 처리할 수 있도록 실행
  4. Worker - Lambda Worker는 메시지를 받아 작업을 처리하고 해당 메시지를 삭제



큐 생성하기

이번에는 큐 생성에 대해 살펴보겠습니다. ‘Create New Queue’를 클릭했을 때 지역(Region)에 따라 각각 다른 화면이 노출됩니다. 

Create New Queue Button


타입 선택 화면


항목 입력 화면


두 번째 이미지와 같이 SQS에서는 Standard, FIFO 두 가지 타입을 제공하고 있습니다. 표준 대기열은 순서에 맞지 않게 메시지가 전송될 수 있습니다. 만약 순서를 반드시 유지해야 한다면 FIFO 대기열을 사용하거나, 순서 정보를 추가하고 사용해야 합니다. 

하지만 FIFO 대기열의 경우 현재 미국 동부(버지니아 북부), 미국 동부(오하이오), 미국 서부(오레곤) 및 EU(아일랜드) 지역(Region)이서만 제공되고 있기 때문에 다른 곳에서는 사용할 수 없습니다. 2) 3) 

1.Create New Queue 
‘Create New Queue’에는 여러 항목이 있습니다. 우선 아래를 참조하여 각 항목에 적절한 내용을 기재합니다. 


  • Default Visibility Timeout : 대기열에서 조회한 메시지가 중복 조회되지 않기 위한 시간
  • Message Retention Period : 메시지 보관 기간
  • Maximum Message Size : 메시지 최대 사이즈
  • Delivery Delay : 신규 메시지 전달 지연 시간
  • Receive Message Wait Time : 조회된 메시지가 없을 경우, 사용 가능한 메시지를 기다리는 long polling 시간 설정
  • Dead Letter Queue Settings : 정상적으로 처리되지 못한 메시지를 보관하기 위하여 메시지 수신 최대 수를 지정, 지정한 수신을 초과할 경우 지정한 큐에 메시지 저장



2.큐 등록 확인 
기본 값으로 설정한 큐 등록을 확인합니다. 

Queue List



3.SQS 메시지 등록 

import boto3, json 

sqs_client = boto3.client(
    service_name='sqs',
    region_name='xxxxxx'
) 

SQS 메시지 등록
 response = sqs_client.send_message(
    QueueUrl='https://sqs.xxxxxx.amazonaws.com/xxxxxx/sqs-test-1',
    MessageBody='메시지 내용'
)
 
print(json.dumps(response))

  {"MD5OfMessageBody": "xxxxxxx", "MessageId": "xxxxx-xxxx-xxxxxx", "ResponseMetadata": {"RequestId": "xxxxxxx", "HTTPStatusCode": 200, "HTTPHeaders": {"server": "Server", "date": "Fri, 09 Feb 2018 08:01:13 GMT", "content-type": "text/xml", "content-length": "378", "connection": "keep-alive", "x-amzn-requestid": "xxxxxxx"}, "RetryAttempts": 0}}



4.AWS Console 메시지 등록 확인 

View Message


Detail Message


5.조회와 실행 
1)SQS 메시지를 조회합니다.
2)LambdaWorker 함수를 실행하고 > InvocationType으로 동기, 비동기 등의 실행 유형을 설정합니다. 

import boto3, json
 
def handle(event, context):
    queue_url = 'https://sqs.xxxxxx.amazonaws.com/xxxxxx/sqs-test-1'

    sqs_client = boto3.client(
        service_name='sqs',
        region_name='xxxxxx'
    )

     lambda_client = boto3.client(
        service_name='lambda',
        region_name='ap-northeast-1'
    )

     # SQS 메시지 조회
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        AttributeNames=[
            'All'
        ]
    )

     print(json.dumps(response))

     # {"Messages": [{"MessageId": "xxxxx-xxxx-xxxxxx", "ReceiptHandle": "xxxxx-xxxx-xxxxxx", "MD5OfBody": "xxxxxxx", "Body": "\uba54\uc2dc\uc9c0 \ub0b4\uc6a9", "Attributes": {"SenderId": "xxxxxxx", "ApproximateFirstReceiveTimestamp": "1518163931724", "ApproximateReceiveCount": "1", "SentTimestamp": "1518163466941"}}], "ResponseMetadata": {"RequestId": "", "HTTPStatusCode": 200, "HTTPHeaders": {"server": "Server", "date": "Fri, 09 Feb 2018 08:12:11 GMT", "content-type": "text/xml", "content-length": "1195", "connection": "keep-alive", "x-amzn-requestid": "xxxxxxx"}, "RetryAttempts": 0}}

     for message in response['Messages']:
        payload = {'message': message, 'queueUrl': queue_url}
         # Lambda Worker 함수 실행
        lambda_client.invoke(
            FunctionName='lambda_worker',
            InvocationType='Event',
            Payload=json.dumps(payload)
        )



6.Lambda Consumer 함수 등록 
Execution role : SQS ReceiveMessage, Lambda InvokeFunction, CloudWatchLogs

7.확인-실행-삭제 
1) 이벤트로 넘어온 메시지 내용을 확인하고
2) 메시지 프로세스를 실행한 후
3) SQS 메시지를 삭제합니다. 

import boto3, json
 
def handle(event, context):
    sqs_client = boto3.client(
        service_name='sqs',
        region_name='xxxxxx'
    )

     message_body = json.loads(event['message']['Body'])

     queue_url = event['queueUrl']
    receipt_handle = event['message']['ReceiptHandle']

     ###############
    # 큐 메시지 처리
    ###############

    # SQS 메시지 삭제
    sqs_client.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle
    )



8.Lambda Worker 함수 등록 
Execution role : SQS DeleteMessage, CloudWatchLogs

9.CloudWatch의 Event Rule 등록 

Event Rules


Create Rule



10.1분에 한 번씩 지정한 Lambda 함수를 실행하여 SQS 메시지 확인 


참고)이것만은 꼭 알아두세요! 
여러 대의 서버에 메시지 사본을 저장하기 때문에 가끔씩 메시지 사본을 받거나 삭제하는 중엔 저장 서버 중 하나를 사용할 수 없을 수도 있다고 합니다. 이 경우, 해당 문제가 발생하면 사용할 수 없는 서버의 메시지가 삭제되지 않아, 메시지를 다시 가져와야 하는 문제가 생길 수 있습니다. 그러므로 애플리케이션에서 동일 메시지를 두 번 이상 처리하는 것도 대비해야 합니다.



Conclusion

지금까지 AWS 환경에서 SQS, Lambda, CloudWatch EventRule을 이용한 메시지 대기열 등록과 처리에 대한 기본 예제들을 실행해봤습니다. AWS의 다른 서비스들과 같이 아주 간단한 방법으로 메시지 대기열을 이용할 수 있었습니다. 오늘 살펴본 방법들을 활용하면 동영상 트랜스 코딩 등의 작업을 비롯해 분산 애플리케이션 간의 데이터 처리에도 유용하게 사용할 수 있을 겁니다. 

ps.아마존 형님들의 IT 인프라를 이용하여 편하게 개발에만 집중합시다. 


참고 
1) 각 서비스 매뉴얼은 아래 페이지 링크 참조하면 된다.

2)2018년 2월 기준이다. 

3)표준 대기열과 FIFO 대기열의 특징은 아래와 같으며 자세한 내용은 매뉴얼에 정리되어 있다. 

  • 표준 대기열 : 무제한 처리량, 최선 정렬
  • FIFO 대기열 : 높은 처리량, 선입선출 전송 

이상근 팀장 | R&D 개발1팀

leesg@brandi.co.kr

브랜디, 오직 예쁜 옷만


#브랜디 #개발문화 #개발팀 #업무환경 #인사이트 #경험공유


관련 스택

기업문화 엿볼 때, 더팀스

로그인

/