[Tech Blog] How we pipe data

 

버즈빌

버즈빌에서는 미국과 일본을 비롯한 전 세계 30개국에서 1,700만 이상의 유저의 행동에 대한 데이터를 수집하고 있습니다. 이 데이터에는 유저들이 잠금화면에서 어떤 Action을 수행하는지부터 잠금화면에 어떤 광고가 노출되고 유저들이 어떤 광고를 클릭 하는 지 등의 정보들이 포함되는데요. 이러한 데이터는 여러 종류의 다른 소스로부터 오고 각기 다른 종류의 DB (MySQL, DynamoDB, Redis, S3 등등) 에 저장됩니다. 하지만 데이터를 분석하고 활용하기 위해서는 이렇게 흩어져서 저장된 데이터들을 한 곳으로 모으는게 필수적입니다. 그래서 저희 팀에서는 이렇게 다양한 소스로 부터 발생해서 다양한 DB에 저장된 데이터를 어떤 과정을 통해 한 곳으로 모을 것인가에 대해서 고민하게 되었습니다.

 

그리고 고민 끝에 각각의 DB에 저장된 데이터를 하나의 큰 데이터 스토리지에 모을 수 있는 ‘데이터 파이프라인’을 구축하는 계획을 세우게 되었습니다. 하지만 다양한 소스로부터 수집된 수많은 데이터들을 잘 유지해가며 하나의 큰 DB에 모을 수 있는 데이터 파이프라인을 구축하는 것이 쉽지 않았는데요. 이 포스팅을 통해서 버즈빌에서는 어떻게 각각의 데이터들을 수집하고 저장하는지 또 이런 데이터들을 통합하기 위한 파이프라인을 어떻게 구축했는지 공유하고자 합니다.

 

본격적인 이야기에 앞서 현재 버즈빌에서 모든 데이터가 모이는 데이터 스토리지로 사용 중인 RedShift에 대해 이야기하고 싶습니다. 개인적으로는 정말 쓰면 쓸수록 감탄이 나오는 데이터 스토리지라고 생각합니다. Redshift는 AWS에서 관리하는 SQL기반의 열기반 스토리지(SQL based columnar data warehouse)이며 복잡하고 대규모의 데이터 분석에 적합합니다. 고객들로부터 생성된 수많은 종류의 데이터를 기반으로 다양한 인사이트를 얻고자 하는 많은 기업들(Yelp, Coursera, Pinterest 등)이 사용하고 있는 솔루션 이기도 합니다. 버즈빌에서는 여러가지 특징을 고려하여 Redshift를 도입하게 되었는데요. 그 이유는 아래와 같습니다.

  
  1.  
  2.   Column 기반 스토리지 -> 필요한 Column에만 접근한다.
  3.  
  4.   Join이나 aggregation이 많은 복잡한 쿼리도 쉽게 계산할 수 있다.
  5.  
  6.   분산 저장 방식 (Distributed Storage)
  7.  
  8.   Date Ingestion이 빠르다. (Ingest first, index and clean later)
  9.  
 

 

 
  •  
  • Horizontal Scalability

  •  
 
  1.  
  2. sharding이나 clustering에 추가적인 complexity가 필요하지 않다.
  3.  
  4. 데이터가 원래 노드에 저장되기 때문에 horizontal scaling을 위해서는 그냥 추가적인 노드만 붙이면 된다.
  5.  
  6. 다른 AWS서비스들과 쉽게 연동이 가능하다. (장점 이자 단점)
  7.  
 
 
 

하지만 몇 개의 아쉬운 점들도 있습니다. :

 
  1.  
  2. 다른 RDBMS와 달리 Mutilple indice를 지원하지 않는다. 
    •  
    • 1 Distribution Key and 1 Sort Key
    •  
  3.  
  4.  
  5. MySQL이나 다른 RDBMS처럼 uniqueness나 foreign key constraint를 걸 수 없다.
  6.  
 


 

 

 

모은 데이터를 어떤 방식으로 Redshift로 옮겨야 할까요?

 

버즈빌이 구축한 데이터 파이프 라인은 크게 3갈래의 메인 루트가 있습니다.

 
 
 

1) Athena Preprocessing Batch job을 통해서 (잠금화면 활동, 광고 할당)

 

Why?

 

전처리 작업(Preprocessing)이 필요한 가장 큰 이유는 들어오는 데이터의 어마어마한 크기 때문입니다. 또 어떤 데이터들은 너무 raw하기 때문에 애널리스트나 데이터 사이언티스트가 분석에 활용할 수 있는 형태로 바꾸기위해 전처리가 필요하기도 합니다. 버즈빌에서는 이런 데이터들을 처리하기 위해서 AWS Athena를 사용하고 있습니다. Athena는 과금 방식이 Athena 쿼리로 읽은 데이터의 사이즈를 기반으로 하기 때문에 다른 EMR이나 MapReduce solution들을 사용했을때보다 상대적으로 적은 비용으로 활용할 수 있다는 장점이 있습니다.

 

How?

 
  1.  
  2. 먼저 S3로 데이터를 보냅니다.
  3.  
  4. 그 후, Athena를 활용하여 데이터를 가공/처리합니다.
  5.  
  6. 가공된 데이터를 읽어서 Redshift로 보냅니다. (COPY command 활용)
  7.  
 

Pros?

 
  1.  
  2. 서버를 따로 가질 필요가 없습니다. (EMR 클러스터나 서버를 관리할 필요가 없음)
  3.  
  4. 경제적입니다. (S3에서 1TB를 읽을때마다 $5 정도의 비용)
  5.  
 

Cons?

 
  1.  
  2. 사용량이 몰리는 시간대 (12:00 AM UTC)에는 일부 쿼리가 실패할 수 있습니다. -> 중요하고 필수적인 데이터는 Athena가 아닌 다른 방법을 통해 처리하는것이 적합합니다.
  3.  
  4. PRESTO DB의 기능을 (아직은) 온전히 활용할 수 없습니다.
  5.  
 


 

 

 

2) Firehose를 통해서 (Impression, Clicks, Device, Events)

 

Why?

 

Kinesis Firehose는 Redshift, Elasticsearch, S3와 같은 최종 목적지까지 다양한 데이터들을 안정적으로 옮길 수 있는 파이프라인을 제공할 뿐 아니라 Fluentd와 매끄럽게 잘 연동된다는 점에서 굉장히 뛰어난 서비스 입니다. Fluentd는 서버로부터 firehose까지 데이터가 안정적이고 꾸준하게 전달 될 수 있도록 도와줍니다. 

 

따라서 firehose와 fluentd의 연동을 통해서 따로 두개의 파이프라인 ( SERVER -> S3, S3 -> Redshift) 을 관리할 필요 없이 데이터 소스부터 최종 저장소까지 이어지는 하나의 파이프 라인만 관리할 수 있게 됩니다.

 

How?

 
 

(https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html)

 
  1.  
  2. 적절한 data format과 원하는 ingestion period를 설정하여 Firehose delivery stream을 만듭니다.
  3.  
 
 conf["user_activity"] = {     "DataTableName": "user_activity",     "DataTableColumns": "user_id, app_id, activity_type, timestamp",     "CopyOptions": "FORMAT AS JSON "s3://buzzvil-firehose/sample/user_activity/jsonpaths/user_activity_log-0001.jsonpaths" gzip TIMEFORMAT AS "YYYY-MM-DDTHH:MI:SS" ACCEPTINVCHARS TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF",     "jsonpaths_file": "buzzvil-firehose/sample/user_activity/jsonpaths/user_activity_log-0001.jsonpaths", }  configuration = {         "RoleARN": "arn:aws:iam::xxxxxxxxxxxx:role/firehose_delivery_role",         "ClusterJDBCURL": "jdbc:redshift://buzzvil.xxxxxxxxx.us-west-2.redshift.amazonaws.com:5439/sample_db",         "CopyCommand": {             "DataTableName": sample_table,             "DataTableColumns": conf[type]["DataTableColumns"],             "CopyOptions": conf[type]["CopyOptions"],         },         "Username": db_user,         "Password": db_password,         "S3Configuration": {             "RoleARN": "arn:aws:iam::xxxxxxxxxxxx:role/firehose_delivery_role",             "BucketARN": "arn:aws:s3:::firehose_bucket",             "Prefix": "buzzvil/user_activity/",             "BufferingHints": {                 "SizeInMBs": 64,                 "IntervalInSeconds": 60             },             "CompressionFormat": "GZIP",             "EncryptionConfiguration": {                 "NoEncryptionConfig": "NoEncryption",             }         } } 
 

2. Fluentd docker containers을 각각의 서버에서 세팅하고 실행합니다.

 
   @type tail  path /var/log/containers/buzzad/impression.json  pos_file /var/log/containers/td-agent/impression-json.pos  format none  tag firehose.impression     @type kinesis_firehose  region us-west-2  delivery_stream_name "prod-buzzad-impression-stream"  flush_interval 1s  data_key message  
 

3. Firehose에서 데이터를 잘 모아서 Redshift 문제없이 보내고 있는지 모니터링 합니다.

 
789u
 

Pros?

 
  1.  
  2. 빠르고 안정적인 데이터 전송이 가능합니다.
  3.  
  4. 모니터링이 편합니다.
  5.  
 

Cons?

 
  1.  
  2. Schema가 자동으로 바뀌지 않습니다.( Redshift의 Schema를 수동으로 일일히 변경해주어야 합니다.)
  3.  
 


 

 

 

3) MySQL Asynchronous Loads를 통해 (Ads, Contents, Ad Provider, Ad Publishers)

 

Why?

 

여러대의 RDS MySQL DB로부터오는 데이터간의 sync를 맞춰가며 Redshift로 데이터를 복제하기 위해서는 3가지의 테크닉을 활용해야만 합니다. (이 방법은 소개하고 있는 세 메인 루트 중에서 가장 매력도가 떨어지는 방법입니다..)

 

How?

 
  •  
  • FULL_COPY 
    •  
    • MySQL 테이블 전체를 복사해서 SQL insert를 통해서 Redshift에 복사합니다.
    •  
  •  
  •  
 
  •  
  • INCREMENTAL_COPY 
    •  
    • 이전에 복사한 가장 마지막 Primary key부터 시작해서 새로생긴 row들을 읽어서 Redshift로 복사합니다.
    •  
  •  
  •  
 
  •  
  • UPDATE_LATEST_COPY 
    •  
    • 이전에 복사한 가장 마지막 타임스탬프부터 시작해서 새로 생성되거나 업데이트된 row들을 Redshift로 복사합니다.(중복된 값은 삭제).
    •  
  •  
  •  
 

Pros?

 
  1.  
  2. 데이터의 특징에 맞게 잘 조정된 방법입니다.
  3.  
  4. binary log를 통한 Replication보다 훨씬 다루기 쉽습니다.
  5.  
 

Cons?

 
  1.  
  2. MySQL을 잘 조정하기 위해 여러대의 서버나 lambda를 다루어야만 합니다. -> Redshift sync task를 위해서
  3.  
  4. 안정적인 schema altering을 할 수 있을 만큼 Redshift의 ORM이 발전된 상황은 아닙니다..
  5.  
 

 

 

어떤 데이터를 다루는지에 따라서 위에서 소개한 3가지 방법 중 어떤 방법을 활용해야할지가 달라진다고 할 수 있습니다. 예를 들어 Transactianl log 같은 데이터들의 경우에는 firehose를 통해 전달하는 방법이나 먼저 aggregate하는 과정을 거친 후에 Redshift에 저장하는 식으로 처리를 해야 합니다. 그리고 MySQL에 저장된 fact table같은 데이터들은 CDC (change data capture) sync method를 통해서 Redshift에 데이터를 전달하고 동기화를 하는 과정이 필요합니다. 버즈빌에서는 위에서 소개해드린 3가지 방법을 적절히 조합해가면서 BD 매니저나 애널리스트들이 서비스간 플랫폼간의 데이터분석을 쉽게 할 수 있는 데이터 환경을 구축하기 위해서 노력하고 있습니다.



기업문화 엿볼 때, 더팀스

로그인

/