조회수 1571
SQS + Lambda
Overview안녕하세요. 저는 브랜디 R&D 본부 개발1팀의 기둥을 담당하는 이상근입니다. 오늘은 SQS(Simple Queue Service)와 Lambda를 간단한 예제와 함께 정리해보려고 합니다. 각 서비스에 대한 설명은 이미 매뉴얼로 쉽게 정리되어 있으므로, 이번 글에서는 서비스 간 구성을 집중적으로 살펴보겠습니다.1)SQS와 Lambda에 대하여SQS(Simple Queue Service)는 마이크로 서비스와 분산 시스템, 그리고 서버리스 애플리케이션을 쉽게 분리하고 확장할 수 있는 ‘완전관리형 메시지 대기열 서비스’입니다. 그리고 Lambda는 ‘이벤트 처리 방식의 서버리스 컴퓨팅 서비스’입니다. 아래 그림은 SQS와 Lambda Function을 이용해 메시지를 등록-조회-처리하는데 필요한 구성요소를 정리한 것입니다. SQS, Lambda ArchitectureProducer - 처리할 작업 메시지를 SQS에 등록Trigger - 큐(Queue) 대기열에 있는 메시지들을 조회하기 위해 CloueWatch의 스케줄 이벤트를 이용하여 매 분마다 Lambda Consumer 실행Consumer - Lambda Consumer는 큐 대기열에 있는 메시지 목록을 조회하여 각 메시지를 Lambda Worker에서 처리할 수 있도록 실행Worker - Lambda Worker는 메시지를 받아 작업을 처리하고 해당 메시지를 삭제큐 생성하기이번에는 큐 생성에 대해 살펴보겠습니다. ‘Create New Queue’를 클릭했을 때 지역(Region)에 따라 각각 다른 화면이 노출됩니다. Create New Queue Button타입 선택 화면항목 입력 화면두 번째 이미지와 같이 SQS에서는 Standard, FIFO 두 가지 타입을 제공하고 있습니다. 표준 대기열은 순서에 맞지 않게 메시지가 전송될 수 있습니다. 만약 순서를 반드시 유지해야 한다면 FIFO 대기열을 사용하거나, 순서 정보를 추가하고 사용해야 합니다. 하지만 FIFO 대기열의 경우 현재 미국 동부(버지니아 북부), 미국 동부(오하이오), 미국 서부(오레곤) 및 EU(아일랜드) 지역(Region)이서만 제공되고 있기 때문에 다른 곳에서는 사용할 수 없습니다. 2) 3) 1.Create New Queue ‘Create New Queue’에는 여러 항목이 있습니다. 우선 아래를 참조하여 각 항목에 적절한 내용을 기재합니다. Default Visibility Timeout : 대기열에서 조회한 메시지가 중복 조회되지 않기 위한 시간Message Retention Period : 메시지 보관 기간Maximum Message Size : 메시지 최대 사이즈Delivery Delay : 신규 메시지 전달 지연 시간Receive Message Wait Time : 조회된 메시지가 없을 경우, 사용 가능한 메시지를 기다리는 long polling 시간 설정Dead Letter Queue Settings : 정상적으로 처리되지 못한 메시지를 보관하기 위하여 메시지 수신 최대 수를 지정, 지정한 수신을 초과할 경우 지정한 큐에 메시지 저장2.큐 등록 확인 기본 값으로 설정한 큐 등록을 확인합니다. Queue List3.SQS 메시지 등록 import boto3, json
sqs_client = boto3.client(
service_name='sqs',
region_name='xxxxxx'
)
SQS 메시지 등록
response = sqs_client.send_message(
QueueUrl='https://sqs.xxxxxx.amazonaws.com/xxxxxx/sqs-test-1',
MessageBody='메시지 내용'
)
print(json.dumps(response))
{"MD5OfMessageBody": "xxxxxxx", "MessageId": "xxxxx-xxxx-xxxxxx", "ResponseMetadata": {"RequestId": "xxxxxxx", "HTTPStatusCode": 200, "HTTPHeaders": {"server": "Server", "date": "Fri, 09 Feb 2018 08:01:13 GMT", "content-type": "text/xml", "content-length": "378", "connection": "keep-alive", "x-amzn-requestid": "xxxxxxx"}, "RetryAttempts": 0}}
4.AWS Console 메시지 등록 확인 View MessageDetail Message5.조회와 실행 1)SQS 메시지를 조회합니다.2)LambdaWorker 함수를 실행하고 > InvocationType으로 동기, 비동기 등의 실행 유형을 설정합니다. import boto3, json
def handle(event, context):
queue_url = 'https://sqs.xxxxxx.amazonaws.com/xxxxxx/sqs-test-1'
sqs_client = boto3.client(
service_name='sqs',
region_name='xxxxxx'
)
lambda_client = boto3.client(
service_name='lambda',
region_name='ap-northeast-1'
)
# SQS 메시지 조회
response = sqs_client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
AttributeNames=[
'All'
]
)
print(json.dumps(response))
# {"Messages": [{"MessageId": "xxxxx-xxxx-xxxxxx", "ReceiptHandle": "xxxxx-xxxx-xxxxxx", "MD5OfBody": "xxxxxxx", "Body": "\uba54\uc2dc\uc9c0 \ub0b4\uc6a9", "Attributes": {"SenderId": "xxxxxxx", "ApproximateFirstReceiveTimestamp": "1518163931724", "ApproximateReceiveCount": "1", "SentTimestamp": "1518163466941"}}], "ResponseMetadata": {"RequestId": "", "HTTPStatusCode": 200, "HTTPHeaders": {"server": "Server", "date": "Fri, 09 Feb 2018 08:12:11 GMT", "content-type": "text/xml", "content-length": "1195", "connection": "keep-alive", "x-amzn-requestid": "xxxxxxx"}, "RetryAttempts": 0}}
for message in response['Messages']:
payload = {'message': message, 'queueUrl': queue_url}
# Lambda Worker 함수 실행
lambda_client.invoke(
FunctionName='lambda_worker',
InvocationType='Event',
Payload=json.dumps(payload)
)
6.Lambda Consumer 함수 등록 Execution role : SQS ReceiveMessage, Lambda InvokeFunction, CloudWatchLogs7.확인-실행-삭제 1) 이벤트로 넘어온 메시지 내용을 확인하고2) 메시지 프로세스를 실행한 후3) SQS 메시지를 삭제합니다. import boto3, json
def handle(event, context):
sqs_client = boto3.client(
service_name='sqs',
region_name='xxxxxx'
)
message_body = json.loads(event['message']['Body'])
queue_url = event['queueUrl']
receipt_handle = event['message']['ReceiptHandle']
###############
# 큐 메시지 처리
###############
# SQS 메시지 삭제
sqs_client.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
8.Lambda Worker 함수 등록 Execution role : SQS DeleteMessage, CloudWatchLogs9.CloudWatch의 Event Rule 등록 Event RulesCreate Rule10.1분에 한 번씩 지정한 Lambda 함수를 실행하여 SQS 메시지 확인 참고)이것만은 꼭 알아두세요! 여러 대의 서버에 메시지 사본을 저장하기 때문에 가끔씩 메시지 사본을 받거나 삭제하는 중엔 저장 서버 중 하나를 사용할 수 없을 수도 있다고 합니다. 이 경우, 해당 문제가 발생하면 사용할 수 없는 서버의 메시지가 삭제되지 않아, 메시지를 다시 가져와야 하는 문제가 생길 수 있습니다. 그러므로 애플리케이션에서 동일 메시지를 두 번 이상 처리하는 것도 대비해야 합니다.Conclusion지금까지 AWS 환경에서 SQS, Lambda, CloudWatch EventRule을 이용한 메시지 대기열 등록과 처리에 대한 기본 예제들을 실행해봤습니다. AWS의 다른 서비스들과 같이 아주 간단한 방법으로 메시지 대기열을 이용할 수 있었습니다. 오늘 살펴본 방법들을 활용하면 동영상 트랜스 코딩 등의 작업을 비롯해 분산 애플리케이션 간의 데이터 처리에도 유용하게 사용할 수 있을 겁니다. ps.아마존 형님들의 IT 인프라를 이용하여 편하게 개발에만 집중합시다. 참고 1) 각 서비스 매뉴얼은 아래 페이지 링크 참조하면 된다.SQSLambdaboto3 2)2018년 2월 기준이다. 3)표준 대기열과 FIFO 대기열의 특징은 아래와 같으며 자세한 내용은 매뉴얼에 정리되어 있다. 표준 대기열 : 무제한 처리량, 최선 정렬FIFO 대기열 : 높은 처리량, 선입선출 전송 글이상근 팀장 | R&D 개발1팀[email protected]브랜디, 오직 예쁜 옷만#브랜디 #개발문화 #개발팀 #업무환경 #인사이트 #경험공유