스토리 홈

인터뷰

피드

개발에 관심있다면 꼭 읽어야하는 글
조회수 4598

AWS Instance Scheduler Bot 적용기

이 포스팅은 총 2부로 이어지며 현재는 2부입니다.1부 : AWS 비용 얼마까지 줄여봤니?2부 : AWS Instance Scheduler Bot 적용기1부에서 AWS 비용을 절감하기 위한 Instance Scheduler에 대한 소개를 하였습니다. 2부에서는 Instance Scheduler의 설정을 손쉽게 변경하기 위한 Bot을 적용한 사례에 대해서 소개합니다.Bot의 필요성Instance Scheduler의 설정을 변경하기 위해서는 정보를 담고 있는 Dynamo DB 의 데이터를 변경해야 합니다. AWS Console을 이용하여 직접 수정할 수도 있지만 여전히 불편하고 느립니다. 더군다나 이를 이용하는 사용자가 DB Table의 구조와 AWS Console 사용법을 알고 있어야 하고 비 개발자라면 더 쉽지 않은 문제입니다. 하지만 Bot을 이용하면 사용자는 어려운 DB Query나 구조를 알아야 할 필요도 없고 손쉽게 채팅 메시지를 통해 Bot에게 질의하고 처리 결과를 응답받을 수 있습니다.Outgoing WebhookJANDI에서는 Incoming Webhook과 반대되는 개념으로 Outgoing Webhook을 제공합니다. 특정 키워드로 시작하는 메시지가 있을 경우 내용을 설정된 URL Endpoint에 POST로 Webhook을 보내줍니다. Webhook을 수신한 곳에서는 일련의 처리 후 메시지 데이터 형식을 맞춰 응답하게 되면 채팅창에 메시지를 표시하게 됩니다. 이를 통해 다른 외부 시스템과 연동할 수 있습니다.POST Data예를 들어 날씨 키워드로 Outgoing Webhook을 생성했다면 /날씨 메시지가 시작될 때 다음과 같은 데이터가 Webhook으로 발송됩니다.{ "token": "YE1ronbbuoZkq7h3J5KMI4Tn", "teamName": "Toss Lab, Inc.", "roomName": "토스랩 코리아", "writerName": "Gloria", "text": "/날씨 서울", "keyword": "날씨", "createdAt": "2017-07-19T14:49:11.266Z" } token을 이용하여 요청의 유효성 체크를 할 수 있고 text를 적절히 파싱 하여 요청에 부합하는 처리를 할 수 있습니다.ResponsePOST Data를 적절히 처리 후 결과를 채팅창에 응답 메시지를 표시하고 싶다면 아래와 같은 JSON Data를 Response body에 넣어주면 됩니다.{ "body" : "서울의 현재 날씨", "connectColor" : "#FAC11B", "connectInfo" : [{ "title" : "온도", "description" : "최고:28.00, 최저:24.00, 현재: 24.30" }, { "title": "날씨", "description": "흐리고 비" }] } 이를 이용하여 Instance Scheduler에도 적용해봤습니다.Schedule BotSchedule Bot은 Instance Scheduler의 Lambda 함수에 함께 포함되어 작동하며 스케쥴 조회 / 예외 설정, 서버 강제 시작/중지, 서버 상태 조회 등의 기능을 수행합니다.API Gateway와 Lambda 함수를 연결하여 Endpoint URL을 생성하고 Outgoing Webhook URL로 설정하여 Webhook으로 Lambda 함수가 실행될 수 있도록 하였습니다. Lambda 함수는 Cloudwatch를 통해서 실행되면 Scheduler가 작동되고 API Gateway를 통해 실행되면 Schedule Bot이 작동됩니다.Schedule Bot 명령어Schedule Bot은 다음과 같은 명령어를 수행합니다./서버 help : 도움말 /서버 [스케쥴명] status : 현재 서버 상태 조회 /서버 [스케쥴명] info : 오늘의 스케쥴 조회 /서버 [스케쥴명] info [YYYY-MM-DD] : 특정일 스케쥴 조회 /서버 [스케쥴명] exception info : 오늘의 스케쥴 예외 조회 /서버 [스케쥴명] exception info [YYYY-MM-DD] : 특정일 스케쥴 예외 조회 /서버 [스케쥴명] exception set [YYYY-MM-DD] [start|stop] [h:m] : 예외 설정 /서버 [스케쥴명] exception del [YYYY-MM-DD] [start|stop] : 예외 삭제 /서버 [스케쥴명] force_start : 서버 강제 실행 /서버 [스케쥴명] force_stop : 서버 강제 중지 Schedule Bot 작동 화면Schedule Bot은 서버병이라는 컨셉으로 인격화(?)에 힘썼습니다.스케쥴 정보 조회서버 상태 조회서버 강제 시작/중지명령어 오류마무리AWS 기반의 서비스를 운영하는 스타트업이라면 더욱더 현실적으로 부딪히는 비용 문제에 대해서 저희가 고민한 내용과 솔루션에 대해서 공유하였습니다.아직 적용기간이 길지 않아 절감비용에 대해 수치적인 데이터를 언급하기는 힘들지만 많은 금액이 절감될 거라 예상하고 있습니다.저희와 같은 고민을 하고 있다면 Instance Scheduler를 적극 권장합니다.#토스랩 #잔디 #JANDI #개발 #개발자 #AWS #도입후기 #일지 #인사이트 #경험공유
조회수 3422

[Tech Blog] Go 서버 개발하기

Go 서버 개발을 시작하며   특정 API만 다른 언어로 구현해서 최대의 성능을 내보자! 저희 서버는 대부분 Django framework 위에서 구현된 광고 할당 / 컨텐츠 할당 / 허니스크린 앱 서비스 이렇게 나눌 수 있는데 Python 이라는 언어 특성상 높은 성능을 기대하기가 어려웠습니다. 하지만 세가지 서비스에서 락스크린에서 어떤 컨텐츠나 광고를 보여줄지 결정하는 Allocation(할당) API 가 가장 많이 호출되고 있었는데 빈도로 보면 80% 정도로 높은 비중을 차지하고 있어서 이 Allocation API 들을 성능이 좋은 다른 언어로 구현하면 어떨까 하는 팀내 의견이 있었습니다. Why Go? 저는 예전부터 Java,  C# 등의 컴파일 언어에 익숙해서 기존 Java 와 C, 그리고 Go 라는 최근에 새로 나온 언어 중에서 아래 블로그글과 같이 여러 reference 들을 통해 성능이 좋다는 Go 로 이 API 들을 포팅하는 작업을 시작하게 되었습니다. Go 에 대한 첫 인상은 Java, C계열 언어보다 덜 verbose 보였고 python 보다는 strongly-typed, encapsulated 하다보니 자유도를 제한해서 코드를 보기 쉽게 하는 것을 선호하는 저의 성격과도 잘 맞는 언어였습니다.     출처: Carles Mateo, Performance of several languages서버 개발 환경   Server design How to import libraries  GVT (https://github.com/FiloSottile/gvt) – Go 는 vendering tool 을 통해 dependency 를 관리할 수 있습니다. GVT 의 경우 처음 도입했을 때 별로 유명하지 않았는데 사용법이 간단해서 도입하게 되었습니다. 아래와 같이 참조하고 있는 revision 을 관리해주며 update 통해서 최신 소스를 받아 올수 있습니다.   { "version": 0, "dependencies": [ { "importpath": "github.com/Buzzvil/go-env", "repository": "https://github.com/Buzzvil/go-env", "vcs": "git", "revision": "2d8489d40184a12c4d09d09ce1ff717e5dbb0745", "branch": "master", "notests": true }, ....  Design pattern  Go 언어에서는 package level cycling dependency 를 허용하지 않아서 좀더 명확한 구조를 만들기 좋았습니다. 예를들어 Service 에서는 Controller 를 참조할수 없고 Model 에서는 Controller / Service / DTO 등을 참조할수 없도록 강제했습니다. 모든 API 요청은 Route 를 통해 Controller 에게 전달되고 이 때 생성된 DTO (Data transfer object) 들을 Controller 가 직접 혹은 Service layer 에서 처리하도록 하였고 DB 에 접근할 때는 모델을 통해 혹은 직접 접근하도록 했지만 추후 구조가 복잡해지면 DB 쿼리 등을 담당하는 DAO (Data access object) 를 도입할 계획입니다   Libraries                  요소이름선택 이유NetworkGinWeb 서버이다 보니 네트워크 성능을 최우선으로 고려, 벤치마크 표를 보고 이 라이브러리를 선택Redis & cachego-redis역시 성능을 가장 중요한 지표로 보고 이 라이브러리 선택MysqlGormORM 없이는 개발하기 힘든 시대이죠. 여러 Database를 지원하고 ORM 중에서도 method chaining 을 사용하는 Gorm 을 선택Dynamoguregu dynamoAWS에서 제공하는 Dynamo 패키지를 그대로 사용하면 코드 양이 너무 많아지고 역시 method chaining 을 지원해서 선택Environment variablescaarlos0 envGo 에서는 tag 를 이용하면 좀더 코드를 간결하고 읽기 쉽게 사용할수 있는데 이 라이브러리가 환경변수를 읽어오기 쉽도록 해줌   Redis cache  func SetCache(key string, obj interface{}, expiration time.Duration) error { err := getCodec().Set(&cache.Item{ Key: key, Object: obj, Expiration: expiration, }) return err } func GetCache(key string, obj interface{}) error { return getCodec().Get(key, obj) }  Mysql  var config model.DeviceContentConfig env.GetDatabase().Where(&model.DeviceContentConfig{DeviceId: deviceId}).FirstOrInit(&config)  Dynamo if err := env.GetDynamoDb().Table(env.Config.DynamoTableProfile).Get(keyId, deviceId).All(&profiles); err == nil && len(profiles) > 0 { ... }  Environment variables  var ( Config = ServerConfigStruct{} onceConfig sync.Once ) type ( ServerConfigStruct struct { ServerEnv string `env:"SERVER_ENV"` LogLevel string .... } ) func LoadServerConfig(configDir string) { onceConfig.Do(func() {//최초 한번반 호출되도록 env.Parse(&Config) } }    Unit test   환경 구성 Test 환경에는 Redis / Mysql / Elastic search 등에 대한 independent / isolated 된 환경이 필요해서 이를 위해 docker 환경을 따로 구성하였습니다. Test case 작성은 아래와 같이 package 를 분리해서 작성했습니다.  package buzzscreen_test var ts *httptest.Server func TestMain(m *testing.M) { ts = tests.GetTestServer(m) // 환경 시작 tearDownElasticSearch := tests.SetupElasticSearch() tearDownDatabase := tests.SetupDatabase() code := m.Run() // 여기서 작성한 TestCase 들 실행 // 환경 종료 tearDownDatabase() tearDownElasticSearch() ts.Close() os.Exit(code) }  Mock server는 은 http.RoundTripper interface 를 구현해서 http.Client 의 Transport 멤버로 설정해서 구현했습니다. 아래는 Test case 작성 예제입니다.  httpClient := network.DefaultHttpClient mockServer := mock.NewTargetServer(network.GetHost(MockServerUrl)) .AddResponseHandler(&mock.ResponseHandler{ WriteToBody: func() []byte { return []byte(mockRes) }, Path: "/path", Method: http.MethodGet, }) clientPatcher := mock.PatchClient(httpClient, mockServer) defer clientPatcher.RemovePatch()  Unit test 관련해서는 내용이 방대해서 추후 다른 포스트를 통해 자세히 소개하도록 하겠습니다.  Infra API 요청 분할 AWS Application load balancer 여러 API 중에서 할당 API 를 제외한 요청은 기존의 Django 서버로 요청을 보내고 할당요청에 대해서만 Go서버로 요청을 보내도록 구현하기 위해 먼저 시도 했던 것은 AWS Application load balancer (이후 ALB) 였습니다. ALB 의 특징이 path 로 요청을 구별해서 처리할수 있었기 때문에 Allocation API 만 Go 서버 로 요청이 가도록 구현했습니다.  출처: Amazon Devops Blog, Introducing Application Load Balancer   하지만 이렇게 오랫동안 서비스 하지 못했는데 그 이유는 서버 구성이 하나 더 늘어나고 앞단에 ALB 까지 추가되다 보니 이를 관리하는데 추가 리소스가 들어가게 되어서 어떻게 하면 이러한 비용을 줄일수 있을까 고민하게 되었습니다.   Using docker & nginx  Go로 작성된 서버가 독립적인 Micro service 냐 아니면 Django 서버에서 특정 API 를 독립시켜 성능을 강화한 모듈이냐 의 정체성을 두고 생각해봤을때 후자가 조금더 적합하다보니 Go / Django 서버는 한 묶음으로 관리하는 것이 명확했습니다. Docker 를 도입하면서 nginx container 가 proxy 역할을 하고 path를 보고 Go container / Django container 로 요청을 보내는 구성을 가지게 되었습니다.  글을 마치며   시작은 미약하였으나 끝은 창대하리라 하나의 API를 이전했음에도 불구하고 Allocation API 에 대해서는 약 1/3, 서버 Instance 비용은 1/2.5 수준으로 감소했습니다.   설명: 기존 4개의 Django 인스턴스의 CPU 사용률이 모두 13% 정도 감소, Go 인스턴스의 CPU 사용율은 17% 정도   17 / (13 * 4)  ≒ 1 / 3  충분히 만족할만한 성과가 나와서 그 뒤로 몇가지 API도 Go 로 옮겼고 새로 작성하는 API 는 Go 환경 안에서 직접 구현하는 중입니다. 처음에는 호출이 많은 하나의 API 를 다른 언어로 포팅하기 위해 시작한 작업이었는데 Container 기술을 도입하는 등 서버 Infra 까지 변경하면서 상당히 큰 작업이 뒤따르게 되었습니다. 하지만 이 작업을 하면서 많은 동료들의 도움과 조언이 있었고 결국 완성할수 있었습니다. 이렇게 실험적인 도전을 성공 할수 있는 환경에 여러분을 초대하고 싶습니다! Go언어에 대한 문의나 좋은 의견도 환영합니다.
조회수 1212

RxJava2 함수 파헤치기!

Overview지난 글 Rxjava를 이용한 안드로이드 개발에서는 RxJava의 Android 연결 방법과 기본적인 사용법을 다뤘습니다. 이번 글에서는 RxJava의 강력하고 다양한 함수들을 살펴보고자 합니다. Android에서 복잡하게 구현되는 내용들을 단 몇 개의 함수로 처리할 수 있는 RxJava를 꼭 사용해보길 권합니다.1. just2. fromArray/fromlterable3. range/rangLong4. interval5. timer6. map7. flatMap8. concatMap9. toList10. toMap11. toMultiMap12. filter13. distinct14. take15. skip16. throttleFirst17. throttleLast18. throttleWithTimeout참고: 공통적으로 사용하는 구독(수신) 클래스는 아래와 같습니다.static class CustomSubscriber<T> extends DisposableSubscriber<T> { @Override public void onNext(T t) { System.out.println(Thread.currentThread().getName() + " onNext( " + t + " )"); } @Override public void onError(Throwable t) { System.out.println(Thread.currentThread().getName() + " onError( " + t + ")"); } @Override public void onComplete() { System.out.println(Thread.currentThread().getName() + " onComplete()"); } } 1. just파라미터를 통해 받은 데이터로 Flowable을 생성하는 연산자입니다. 최대 10까지 전달할 수 있고, 모든 데이터가 수신되면 onComplete() 수신됩니다. 기본적인 Flowable 생성자 함수로 볼 수 있으며 단순 작업에서 많이 사용합니다.public static void just() { //파라미터 값을 순차적으로 송신하는 Flowable 생성 Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E", "F"); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 main onNext( A ) main onNext( B ) main onNext( C ) main onNext( D ) main onNext( E ) main onNext( F ) main onComplete() 2. fromArray/fromIterablefromArray, fromIterable 함수는 파리미터로 배열 또는 Iterable(리스트 등)에 담긴 데이터를 순서대로 Flowable을 생성하는 연산자입니다. 모든 데이터를 순차적으로 송신 후 완료됩니다. 반복적인 데이터 변환 작업 같은 경우 for 문 대신 대체할 수 있습니다. 결과를 보면 main Thread 에서 작업 결과가 나오지만, flatMap 을 사용한다면 별도의 Thread로 main Thread의 부하를 막을 수 있습니다.1. fromArray public static void fromArray() { //fromArray 배열로 파라미터를 전달 받는다. Flowable<String> flowable = Flowable.fromArray("A", "B", "C", "D", "E"); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 main onNext( A ) main onNext( B ) main onNext( C ) main onNext( D ) main onNext( E ) main onComplete() 2. fromIterable public static void fromIterable() { List<String> list = Arrays.asList("A", "B", "C", "D", "E"); //fromIterable 리스트로 파라미터를 전달받는다. Flowable<String> flowable = Flowable.fromIterable(list); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 main onNext( A ) main onNext( B ) main onNext( C ) main onNext( D ) main onNext( E ) main onComplete() 파라미터와 함수는 다르지만 동일하게 처리된다. 3. range/rangLongrange 함수는 지정한 숫자부터 지정한 개수만큼 증가하는 Integer 값 데이터를 송신하는 Flowable를 생성합니다. rangLong 함수는 range와 동일하며 데이터 타입은 Long을 사용합니다. 두 함수 데이터 송신을 마치면 onComplete를 송신합니다.1. range public static void range() { //range(int start, int count) //start : 시작 값 //end : 발생하는 횟수 Flowable<Integer> flowable = Flowable.range(10, 5); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 main onNext( 10 ) main onNext( 11 ) main onNext( 12 ) main onNext( 13 ) main onNext( 14 ) main onComplete() 2. rangLong public static void rangeLong() { //range(int start, int count) //start : 시작 값 //end : 발생하는 횟수 Flowable<Long> flowable = Flowable.rangeLong(10, 5); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 main onNext( 10 ) main onNext( 11 ) main onNext( 12 ) main onNext( 13 ) main onNext( 14 ) main onComplete() 4. interval지정한 간격마다 0부터 시작해 Long 타입 숫자의 데이터를 송신하는 Flowable을 생성합니다. 데이터는 0, 1, 2, 4 순차적으로 증가된 데이터를 송신합니다. Android 에서는 반복적인 작업인 TimerTask를 대신해서 interval로 간단하게 처리할 수 있습니다. UI 변경이 필요한 부분에서는 interval scheduler를 AndroidSchedulers.mainThread() 를 변경해 적용할 수 있습니다.public static void interval() { //(long time, TimeUnit unit, Scheduler scheduler) //time : 발생 간격 시간 //unit : 간격 시간 단위 //scheduler : 발생 scheduler를 변경하여 사용할 수 있습니다. // ex)AndroidSchedulers.mainThread() // - 1 - 2 - 3 - 4 - 5 - 6 - 7 - 8 - 9 // 1초 간격으로 데이터 요청을 송신하다. Flowable<Long> flowable = Flowable .interval(1000L, TimeUnit.MILLISECONDS).take(10); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( 0 ) RxComputationThreadPool-1 onNext( 1 ) RxComputationThreadPool-1 onNext( 2 ) RxComputationThreadPool-1 onNext( 3 ) RxComputationThreadPool-1 onNext( 4 ) RxComputationThreadPool-1 onNext( 5 ) RxComputationThreadPool-1 onNext( 6 ) RxComputationThreadPool-1 onNext( 7 ) RxComputationThreadPool-1 onNext( 8 ) RxComputationThreadPool-1 onNext( 9 ) 5. timertimer 함수는 호출된 시간부터 일정한 시간 동안 대기하고 Long 타입 0을 송신 및 종료하는 flowable을 생성합니다. interval이 조건까지 반복적으로 송신한다면, timer는 한번만 송신하고 종료됩니다.public static void timer() { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy.MM.dd hh:mm ss"); System.out.println("현재시간 : " + simpleDateFormat.format(System.currentTimeMillis())); //(long time, TimeUnit unit, Scheduler scheduler) //time : 발생 간격 시간 //unit : 간격 시간 단위 //scheduler : 발생 scheduler를 변경하여 사용할 수 있습니다. // ex)AndroidSchedulers.mainThread() Flowable<Long> flowable = Flowable.timer(1000L, TimeUnit.MILLISECONDS); //구독을 시작한다. flowable.subscribe(value -> { System.out.println(" timer : " + simpleDateFormat.format(System.currentTimeMillis())); }, throwable -> { System.out.println(throwable); }, () -> { System.out.println(" complete"); }); } 결과 현재시간 : 2019.04.29 09:09 56 timer : 2019.04.29 09:09 57 complete 6. mapFlowable 에서 송신하는 데이터를 변환하고, 변환된 데이터를 송신하는 연산자입니다. 하나의 데이터만 송신할 수 있으며, 반드시 데이터를 송신해야 합니다. 혹여 송신되는 데이터가 null 을 포함하면 map 대신 아래의 flatMap 을사용하는 것이 좋습니다.public static void map() { Flowable<String> flowable = Flowable.just("A", "B", "C", "D", "E") //map(Function mapper) //mapper : 받은 데이터를 가공하는 함수형 인터페이스 //알파벳 값을 소문자로 변경하여 return 한다 .map(value -> value.toLowerCase()); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 main onNext( a ) main onNext( b ) main onNext( c ) main onNext( d ) main onNext( e ) main onComplete() 7. flatMapflatMap은 map과 동일한 함수이지만, map과는 달리 여러 데이터가 담긴 Flowable을 반환할 수 있습니다. 또한 빈 Flowable를 리턴해 특정 데이터를 건너뛰거나 에러 Flowable를 송신할 수 있습니다.파라미터 mapper에서 새로운 Flowable의 데이터 전달이 아닌 다른 타임라인 Flowable로 작업하면 들어온 데이터 순서대로 출력을 지원하지 않습니다. 타임라인 Flowable(timer, delay, interval 등)에서는 가급적 사용을 피하거나, 순서에 지장이 없을 때 사용하는 것이 좋습니다.public static void flatMap() { Flowable<String> flowable = Flowable.range(10, 2) //flatMap(Function mapper, BiFunction combiner) //mapper : 받은 데이터로 새로운 Flowable를 생성하는 함수형 인터페이스 //combiner : mapper가 새로 생성한 Flowable 과 원본 데이터를 조합해 새로운 송신 데이트를 생성하는 함수형 인터페이스 //첫 번째 데이터를 받으면 새로운 Flowable를 생성한다. //take(3) : 3개까지만 발생한다. .flatMap(value -> Flowable.interval(100L, TimeUnit.MILLISECONDS).take(3), (value, newData) -> "value " + value + " newData " + newData); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( value 10 newData 0 ) RxComputationThreadPool-2 onNext( value 11 newData 0 ) RxComputationThreadPool-1 onNext( value 10 newData 1 ) RxComputationThreadPool-2 onNext( value 11 newData 1 ) RxComputationThreadPool-1 onNext( value 10 newData 2 ) RxComputationThreadPool-2 onNext( value 11 newData 2 ) RxComputationThreadPool-2 onComplete() 결과를 보면 각기 생성된 Flowable이 비동기식으로 송신 되기때문에 서로 다른 스레드에서 실행돼 데이터를 받는 순서대로 송신하지 않는다는 점을 주목하자 8. concatMap받은 데이터를 Flowable로 변환하고 변환된 Flowable을 하나씩 순서대로 실행해서 수신자에서 송신합니다. 다시 말해 여러 데이터를 계속 받더라도 첫 번째 데이터로 생성한 Flowable 의 처리가 끝나야 다음 데이터로 생성한 Flowable을 실행하는 것입니다.생성된 Flowable의 스레드에서 실행되더라도 데이터를 받은 순서대로 처리하는 것을 보장하지만, 처리 성능에 영향을 줄 수 있습니다.public static void concatMap() { Flowable<String> flowable = Flowable.range(10, 5) //map(Function mapper) //mapper : 받은 데이터를 가공하는 함수형 인터페이스 .concatMap(value -> Flowable.interval(100L, TimeUnit.MILLISECONDS).take(2) .map(data -> ("value : " + value + " data : " + data))); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( value : 10 data : 0 ) RxComputationThreadPool-1 onNext( value : 10 data : 1 ) RxComputationThreadPool-2 onNext( value : 11 data : 0 ) RxComputationThreadPool-2 onNext( value : 11 data : 1 ) RxComputationThreadPool-3 onNext( value : 12 data : 0 ) RxComputationThreadPool-3 onNext( value : 12 data : 1 ) RxComputationThreadPool-4 onNext( value : 13 data : 0 ) RxComputationThreadPool-4 onNext( value : 13 data : 1 ) RxComputationThreadPool-5 onNext( value : 14 data : 0 ) RxComputationThreadPool-5 onNext( value : 14 data : 1 ) RxComputationThreadPool-5 onComplete() 결과를 보면 생성된 Flowable 스레드와 데이터 순서대로 출력이 보장된다 것을 알 수 있다. 9. toListtoList는 송신할 데이터를 모두 리스트에 담아 전달합니다. 한꺼번에 데이터를 List로 가공해서 받기에 좋습니다. 하지만 많은 양의 데이터를 처리할 경우 버퍼가 생길 수 있고, 쌓은 데이터 때문에 메모리가 부족해질 수도 있습니다. 또한 수신되는 데이터는 하나이므로 Flowable이 아닌 Single 반환값을 사용합니다.public static void toList() { Single<List<String>> single = Flowable.just("A", "B", "C", "D", "E", "F") .toList(); // 구독을 시작한다. single.subscribe(new SingleObserver<List<String>>() { @Override public void onSubscribe(Disposable d) { System.out.println(Thread.currentThread().getName() + " onNext()"); } @Override public void onSuccess(List<String> strings) { //최종 완료된 리스트를 순서대로 출력한다. for (String text : strings) { System.out.println(Thread.currentThread().getName() + " onSuccess( " + text + " )"); } } @Override public void onError(Throwable e) { System.out.println(Thread.currentThread().getName() + " onError() " + e); } }); } 결과 main onNext() main onSuccess( A ) main onSuccess( B ) main onSuccess( C ) main onSuccess( D ) main onSuccess( E ) main onSuccess( F ) 10. toMaptoMap은 송신할 데이터를 모두 키와 값의 쌍으로 Map에 담아 전달합니다. 나머지는 toList의 특징과 같습니다. 송신되는 데이터 타입은 Map에 담아서 송신하는데 동일한 key에서 value는 마지막 데이터가 덮어 씁니다. 요청되는 값보다 결과 값이 적을 수도 있습니다. List 값을 손쉽게 key, value로 분리할 수 있는 함수이기도 합니다.public static void toMap() { Single<Map<Long, String>> single = Flowable.just("1A", "2B", "3C", "1D", "2E") //toMap(Fuction keySelector, Function valueSelector, Callable mapSupplier) //keySelector : 받은 데이터로 Map에서 사용할 키를 생성하는 함수형 인터페이스 //valueSelector : 받은 데이터로 Map 넣을 값을 생성하는 함수형 인터페이스 .toMap(value -> Long.valueOf(value.substring(0, 1)), data -> data.substring(1)); //구독을 시작한다. single.subscribe(new SingleObserver<Map<Long, String>>() { @Override public void onSubscribe(Disposable d) { System.out.println(Thread.currentThread().getName() + " onNext()"); } @Override public void onSuccess(Map<Long, String> longStringMap) { //최종 완료된 map을 순서대로 출력한다. for (long id : longStringMap.keySet()) { System.out.println(Thread.currentThread().getName() + " onSuccess( id : " + id + ", value " + longStringMap.get(id) + " )"); } } @Override public void onError(Throwable e) { System.out.println(Thread.currentThread().getName() + " onError() " + e); } }); } 결과 main onNext() main onSuccess( id : 1, value D ) main onSuccess( id : 2, value E ) main onSuccess( id : 3, value C ) 11. toMultiMap키와 컬렉션 값으로 이루어진 Map을 데이터로 변환하여 송신하는 함수입니다. 나머지 특징은 toList, toMap과 같습니다. toMap에서 중복되는 value를 관리하는 건 없었지만, value를 collection으로 관리하여 전달되는 데이터를 모두 수신할 수 있습니다.public static void toMultiMap() { Single<Map<String, Collection<Long>>> single = Flowable.interval(100L, TimeUnit.MILLISECONDS) .take(5) //toMultimap(Function keySelector, Function valueSelector) .toMultimap(value -> { //value가 홀수인지 짝수 인지 판단해서 key값을 리턴한다. if (value % 2 == 0) { return "짝수"; } else { return "홀수"; } }); //구독을 시작한다. single.subscribe(new SingleObserver<Map<String, Collection<Long>>>() { @Override public void onSubscribe(Disposable d) { System.out.println(Thread.currentThread().getName() + " onNext( " + d + " )"); } @Override public void onSuccess(Map<String, Collection<Long>> stringCollectionMap) { for (String key : stringCollectionMap.keySet()) { StringBuffer stringBuffer = new StringBuffer(); for (long value : stringCollectionMap.get(key)) { stringBuffer.append(" " + value); } System.out.println(Thread.currentThread().getName() + " onSuccess( id : " + key + ", value " + stringBuffer.toString() + ")"); } } @Override public void onError(Throwable e) { System.out.println(Thread.currentThread().getName() + " onError() " + e); } }); } 결과 main onNext() RxComputationThreadPool-1 onSuccess( id : 짝수, value 0 2 4 ) RxComputationThreadPool-1 onSuccess( id : 홀수, value 1 3 ) 12. filterfilter는 받은 데이터가 조건에 맞는지 판단해 결과가 true인 값만 송신합니다. 위의 just, fromArray, interval이 반복적인 케이스였다면, filter는 if문처럼 조건문의 역할을 할 수 있습니다. 반복문 함수와 조건문 함수를 같이 사용해 몇 줄 안에 for, if와 똑같이 구현할 수 있죠.public static void filter() { Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS) //짝수만 통과한다. 3개만큼 .filter(value -> value % 2 == 0).take(3); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( 0 ) RxComputationThreadPool-1 onNext( 2 ) RxComputationThreadPool-1 onNext( 4 ) RxComputationThreadPool-1 onComplete() 13. distinct이미 처리된 데이터를 다시 볼 필요가 없을 때 사용하는 함수입니다. 송신하려는 데이터가 이미 송신된 데이터와 같다면 해당 데이터는 무시합니다. 이 함수는 내부에서 HashSet으로 데이터가 같은지 확인합니다.public static void distinct() { Flowable<String> flowable = Flowable.just("A", "a", "B", "b", "A", "a", "B", "b") //distinct(Function keySelector) //keySelector : 받은 데이터와 비교할 데이터를 확인하는 함수 //모두 소문자로 변환하여 알파벳 기준으로 데이터를 판단한다. .distinct(value -> value.toLowerCase()); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 main onNext( A ) main onNext( B ) main onComplete() 14. take1.taketake 함수로 지정된 횟수만큼 받은 데이터를 송신합니다. 지정된 횟수에 도달하면 완료를 송신해 처리 종료합니다.2.takeUntil지정된 조건까지 데이터를 송신하는 연산자입니다. 조건이 되면 완료를 송신해 종료합니다.3.takeWhile지정된 조건이 해당할 때만 데이터를 송신하는 연산자입니다.4.takeLast데이터의 끝에서부터 지정한 조건까지 데이터를 송신하는 연산자입니다.take 함수는 한 화면에 출력되거나 칠요한 데이터만큼 리스트에서 값을 하나씩 수신할 때 사용합니다. 예를 들어 화면에 데이터가 6개가 필요하면 take를 이용해 원하는 만큼의 데이터를 가져올 수 있습니다.Flowable.take(6) 또한 이후에 나올 skip 함수를 같이 사용하면 두 번째 화면에서 필요한 데이터를 6개 가져올 수 있습니다.Flowable.skip(6).take(12) 1. take public static void take() { // 100 밀리세컨드만큼 반복하며 총 5개를 출력후 종료한다. Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS) .take(5); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( 0 ) RxComputationThreadPool-1 onNext( 1 ) RxComputationThreadPool-1 onNext( 2 ) RxComputationThreadPool-1 onNext( 3 ) RxComputationThreadPool-1 onNext( 4 ) RxComputationThreadPool-1 onComplete() 2. takeUntil public static void takeUntil() { // 100 밀리세컨드만큼 반복하며 값이 5가 될때까지 송신한다. Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS) .takeUntil(value -> value == 5); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( 0 ) RxComputationThreadPool-1 onNext( 1 ) RxComputationThreadPool-1 onNext( 2 ) RxComputationThreadPool-1 onNext( 3 ) RxComputationThreadPool-1 onNext( 4 ) RxComputationThreadPool-1 onNext( 5 ) RxComputationThreadPool-1 onComplete() 3. takeWhile public static void takeWhile() { // 100 밀리세컨드만큼 반복하며 값이 5가 아닐경우까지 송신한다. Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS) .takeWhile(value -> value != 5); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( 0 ) RxComputationThreadPool-1 onNext( 1 ) RxComputationThreadPool-1 onNext( 2 ) RxComputationThreadPool-1 onNext( 3 ) RxComputationThreadPool-1 onNext( 4 ) RxComputationThreadPool-1 onComplete() 4. takeLast public static void takeLast() { //100밀리 세컨트만큼 반복하며 5개의 출력중 뒤에 2개만 송신한다. Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS) .take(5) .takeLast(2); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( 3 ) RxComputationThreadPool-1 onNext( 4 ) RxComputationThreadPool-1 onComplete() 15. skip1.skip함수로 지정된 횟수만큼 받은 데이터 송신을 제외합니다. 지정된 횟수가 초과되면 나머지 데이터를 송신합니다.2.skipUntil지정된 조건까지 데이터 송신을 제외하는 연산자입니다. 조건이 되면 나머지 데이터를 송신합니다.3.skipWhile지정된 조건이 해당될 때만 데이터 송신을 제외하는 함수입니다.4.skipLast데이터의 끝에서부터 지정한 조건까지 데이터 송신을 제외하는 함수입니다.take와 반대의 기능을 갖고 있습니다. 보통 페이저나 리스트에서 paging을 처리할 때는 take와 skip을 혼용합니다.1. skip public static void skip() { //100 밀리세컨드만큼 반복하며 5번 발행하고, 처음 2개를 제외합니다. Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS) .take(5) .skip(2); //구독을 시잔한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( 2 ) RxComputationThreadPool-1 onNext( 3 ) RxComputationThreadPool-1 onNext( 4 ) RxComputationThreadPool-1 onComplete() 2. skipUntil public static void skipUntil() { //300밀리 세컨드만큼 반복하며 5개를 발행하고, 1000 밀리세컨드 제외 후 송신합니다. Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS) .skipUntil(Flowable.timer(1000L, TimeUnit.MILLISECONDS)) .take(5); //구독을 시잔한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-2 onNext( 3 ) RxComputationThreadPool-2 onNext( 4 ) RxComputationThreadPool-2 onNext( 5 ) RxComputationThreadPool-2 onNext( 6 ) RxComputationThreadPool-2 onNext( 7 ) RxComputationThreadPool-2 onComplete() 3. skipWhile public static void skipWhile() { //300밀리세컨드만큼 반복하며 5개를 발행하고, 데이터 3이 올때까지 데이터를 제외힙니다. Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS) .skipWhile(value -> value != 3) .take(5); //구독을 시잔한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( 3 ) RxComputationThreadPool-1 onNext( 4 ) RxComputationThreadPool-1 onNext( 5 ) RxComputationThreadPool-1 onNext( 6 ) RxComputationThreadPool-1 onNext( 7 ) RxComputationThreadPool-1 onComplete() 4. skipLast public static void skipLast() { //1000 밀리세컨드만큼 반복하며 5개를 발행하고 마지막 2개는 제외합니다 Flowable<Long> flowable = Flowable.interval(1000L, TimeUnit.MILLISECONDS) .take(5) .skipLast(2); //구독을 시작한다. flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( 0 ) RxComputationThreadPool-1 onNext( 1 ) RxComputationThreadPool-1 onNext( 2 ) RxComputationThreadPool-1 onComplete() 16. throttleFirst데이터를 송신하고 지정된 시간 동안 들어오는 요청을 무시합니다. 이 함수는 View의 Event 처리에서 많이 사용됩니다. 중복되는 처리를 막기 위해 최초 실행 후 일정 시간 동안 View의 클릭 이벤트나 API 이벤트를 막을 수 있기 때문에 비동기 처리와 화면에 직접적인 피드백이 발생했을 때 throttleFirst를 자주 사용하고 있습니다. //데이터 요청이 30 밀리초마다 5번 발생합니다. //데이터 요청 발생시 100 밀리세컨트 동안 들어오는 데이터 요청을 무시합니다. // — 0 — 1 — 2 — 3 — 4 interval 30 밀리초 마다 // — — -*- — throttleFirst 100 밀리초 무시 Flowable<Long> flowable = Flowable.interval(30L, TimeUnit.MILLISECONDS) .take(5).throttleFirst(100L, TimeUnit.MILLISECONDS); flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( 0 ) RxComputationThreadPool-1 onNext( 4 ) RxComputationThreadPool-1 onComplete() 17. throttleLastthrottleLast 함수는 데이터를 송신하고 지정된 시간 동안 들어오는 마지막 요청을 송신합니다. 이 함수도 throttleFirst처럼 반복적인 선택 이벤트 처리에 유용하게 사용할 수 있습니다. 간단하게 장바구니 카운트 변경을 요청할 때 마지막 변경 이벤트 데이터만 처리하면 되므로 값이 선택되고 일정 시간이 지났을 때 API를 요청해 리소스 낭비를 줄일 수 있습니다.public static void throttleLast() { //데이터 요청이 1 초 마다 6번 발생합니다. //데이터 요청 발생시 2 초 동안 들어오는 마지막 요청을 송신하다. // - 0 - 1 - 2 - 3 - 4 interval 1 초 마다 // - - -* - throttleLast 2 초의 마지막 값 송신 Flowable<Long> flowable = Flowable.interval(1, TimeUnit.SECONDS) .take(5) .throttleLast(2, TimeUnit.SECONDS); flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( 2 ) RxComputationThreadPool-1 onNext( 4 ) RxComputationThreadPool-1 onComplete() 18. throttleWithTimeoutthrottleWithTimeout 함수는 데이터를 송신하고 지정된 시간 동안 다음 데이터를 받지 못하면 현재 데이터를 송신합니다. 완료 시엔 마지막 데이터를 송신하고 종료됩니다.public static void throttleWithTimeout() { Flowable<String> flowable = Flowable.<String>create(emitter -> { emitter.onNext("A"); Thread.sleep(1000L); // 1000 밀리세컨드 슬립 // 500 밀리세컨드 동안 데이터 다음 데이터 요청이 없으므로 A 송신 emitter.onNext("B"); Thread.sleep(300L); // 300 밀리세컨드 슬립 emitter.onNext("C"); Thread.sleep(300L); // 300 밀리세컨드 슬립 emitter.onNext("D"); Thread.sleep(1000L); // 1000 밀리세컨드 슬립 // 500 밀리세컨드 동안 데이터 다음 데이터 요청이 없으므로 D 송신 emitter.onNext("E"); Thread.sleep(100L); // 100 밀리세컨드 슬립 emitter.onComplete(); //완료 요청 시 마지막 데이터 송신 후 종료 }, BackpressureStrategy.BUFFER) .throttleWithTimeout(500L, TimeUnit.MILLISECONDS); flowable.subscribe(new CustomSubscriber<>()); } 결과 RxComputationThreadPool-1 onNext( A ) RxComputationThreadPool-1 onNext( D ) main onNext( E ) main onComplete() ConclusionRxJava에서 많이 사용되고, 또 알고 있으면 좋은 함수들을 살펴봤습니다. 브랜디에서도 이 함수들을 응용해 그동안 다양한 기능을 구현했고, 복잡한 함수도 사용하고 있습니다. 지금까지는 Flowable로 송신과 수신이 1 : 1 로 진행되었지만, 다양한 수신자를 사용해 하나의 Flowable로도 다른 화면에서 여러 수신자를 등록하여 반복적인 작업을 할 수 있습니다. 덕분에 같은 작업을 코드 중복 없이 간단하게 구현할 수 있죠.다음 글에서는 2개 이상의 Flowable을 결합해 사용하는 방법과 Android View에서 RxJava를 응용하는 방법, 구독을 관리하는 방법 등 Android에서 유용하게 쓰는 방법들을 알아보겠습니다.글고재성 팀장 | R&D 개발MA팀gojs@brandi.co.kr브랜디, 오직 예쁜 옷만
조회수 3257

크몽 개발팀 문화와 구조 이야기

안녕하세요. 크몽 개발자들과 함께하고 있는 크레이그(a.k.a. 크알)입니다.크몽 개발자 그룹은 1년 내 그 규모가 3배로 커지고, Data Science, Growth Hacking 조직이 만들어지는 등 질적, 양적으로 급성장하고 있는 팀입니다.크몽 개발 부서에 계신 분들은 크몽에 대해 이렇게 이야기 합니다.(참고 : 크몽 개발팀원 더팀스 인터뷰 - '신뢰할 수 있는 동료와 함께 초고속 성장을 만들어가는 크몽 팀' )"제가 크몽에서 전반적으로 느낀 인상은 능동적인 분들이 많다는 거예요. 수동적인 업무를 책임감 있게 하는 것도 중요하지만 문제를 스스로 찾고, 동료들에게 제기하고, 문제를 해결했을 때 진심으로 기뻐하면서 행복감을 느끼시는 분들이 많아요. 그게 큰 조직에 있다가 온 저에게는 정말 많은 자극이 되었어요. "- 데이터분석 KM님"크몽이 저의 개발자 커리어에서 마지막 회사였으면 좋겠다고 생각해요. 실은 진심이고요. 그동안 회사의 성장을 지켜봤고 개발적으로도 많은 변화를 경험했어요"- BackEnd Sean님이렇게 개발자들이 행복하게 개발할 수 있는 환경을 우선시하고 있습니다. 그리고 크몽의 오픈 커뮤니케이션 문화를 지향함과 동시에 ‘Work Happy’와 'Freedom with Responsibility’ 란 가치 아래 최대한 자율성을 보장된 실무자 중심의 개발 문화를 추구합니다.크몽 개발 조직 구조위 핵심 가치 아래 크몽 개발 조직 구조는 크게 ‘Go’와 ‘Chapter’로 구성되어 있습니다.Go  ; 고우선 ‘Go’는 프로젝트 개발 팀 단위로 크몽 서비스를 개선하기 위한 목표 중심의 조직입니다. 다른 회사에서는 ‘Silo’, ‘Team'로 명칭 하기도 합니다. 물리적으로 한 공간에서 스크럼을 이루어 일할 수 있도록 자원을 갖추고 있습니다. Go 안에는 Go Leader(GL) 가 있어 팀 업무 관리 및 우선순위를 정합니다.현재 크몽 개발 파트의 Go는 아래와 같이 구성되어 있습니다.UX-Go크몽 서비스 UX를 개선하기 위한 목표로 데이터를 기반으로한 UX Iteration & Growth Mission 을 수행하는 팀Data-Go데이터 파이프라인을 구축, 활용하여 조직 내 필요한 데이터 자료를 공급하고, 크몽 서비스안에 머신러닝/딥러닝 등의 인공지능 기술 영역을 담당하는 팀Dasi-Go서비스 안정적인 운영 및 릴리즈,  CRM 기술 지원을 담당하는 팀Mobile-Go검색 서비스, 서비스 카테고리 개선 등 크몽 서비스 향상을 위한 모듈 개발팀크몽 라운지Chapter  ; 챕터'Chapter'는 직군별 조직 단위로 주 1회 정도의 커뮤니케이션 타임을 통해 업무 및 기술 동향을 교환합니다. 더불어 챕터 안에서 필요한 스터디, 외부 교육 등의 직군별 자기 능력 향상을 도모하고, 회사에선 이를 적극 지원합니다. 그리고 챕터 내 프로젝트를 통해 서비스 개선에 기여하기도 합니다.크몽 개발 파트는 아래와 같은 챕터가 있습니다.(참고 : 웹 프로트엔드 챕터의 'gulp 개선기' -  https://brunch.co.kr/@kmongdev/5 )**챕터 프로젝트는 챕터 내에서 개발자분들이 스스로 필요하다는 판단 하에 빌딩 된 프로젝트입니다. 챕터 내에는 CL(Chapter Leader)가 존재하며, Chapter 구성원 관리 및 의견을 모아 조직에 전파하는 역할을 담당합니다.Guild  ; 길드개발 파트 안에서의 'Guild'는 토이 프로젝트 같은 성격의 공통 관심 분야를 지닌 프로젝트 팀이라고 볼 수 있습니다. 길드 기획 단계에서 회사 전사적으로 적용되면서, 동호회 성격으로 피보팅(Pivoting) 되어 있지만, 기본적으로 공통의 관심 분야를 같이 학습하고 프로젝트에 적용하는 팀입니다. 매주 수요일 오후 2~3시 사이의 시간은 챕터(Chapter), 고(Go)를 떠나 본인이 원하는 길드에 들어가서 새로운 영역을 탐색하고 연구하는 시간입니다.크몽 개발 파트는 아래와 같은 길드가 있습니다.(참고 : 코틀린 길드의 코틀린 리서치 이야기  https://brunch.co.kr/@kmongdev/9 )정리모든 개발 조직은 '성과 중심' 또는 '성장 중심'의 문화를 가지고 있습니다. 균형을 꾀하는 게 이상적이긴 하지만 스타트업에선 쉽지 않은 일입니다.하지만 크몽 개발 부서에선 인적 성장 중심 문화를 고민하고, 끊임없이 시도하고 있습니다. 이를 위해 여러 전문 교육 기관과 협약을 맺고 교육 지원을 하고 있으며, 국내 정상급 권위자 분들로 구성된 외부 컨설턴트 그룹을 구성해 개발자 분들께 배움과 성장의 기회를 부여하려고 노력하고 있습니다. 1년의 기간 동안 이직률3%의 수치를 기록하고 있는 크몽 개발 파트에선 신규 인력 채용 시 제 1의 인사 기준은 '높은 학력'도, '화려한 커리어'도 아닌우리와 '오랫동안' 함께 '성장'할 수 있는가?입니다. 이를 위해선 개발자 성장을 돕기 위한 환경 구축 및 관리가 필수이고,  그것이 궁극적으로는 회사 및 팀원에게도 장기적인 발전을 가져올 꺼란 굳은 믿음이 있습니다.크몽 개발 그룹CTO#크몽 #개발팀 #개발자 #사내복지 #기업문화 #조직문화 #사내스터디 #CTO

기업문화 엿볼 때, 더팀스

로그인

/