Druid PDF資料についてのメモ

まえがき

Apache Druid を利用する機会ができたので、まずはドキュメントとか参考資料でインプットを増やしてる。これはその1つの資料として “Druid” A Real-time Analytical Data Store をざっと読んだ際の記録となる

読む目的としては Druid というものがそもそもどういうモチベーションで作られたもので、どういったシチュエーションでパフォーマンスを発揮するものなのか、どういったことが苦手なのかというのを理解すること

資料読メモ

Abstract

  • Druid とは大規模データセットのリアルタイム探索分析のために設計されたオープンソースのデータストア
  • カラム指向のストレージレイアウト、分散型のシェアードナッシングアーキテクチャ、先進的なインデックス構造を組み合わせている
  • 10億行のテーブルを1秒以下のレイテンシで任意に探索できるようになっている
  • この論文では、高速な集約、柔軟なフィルタ、低遅延データインジェストをどのようにサポートしているかを詳細に説明する

1.INTRODUCTION

  • Hadoop は大量なデータを保存しアクセスすることに優れている。これに対して以下の点に難がある
    • どれだけ速くそのデータのアクセスするかについては性能が保証されていない
    • 同時実行時の負荷が大きい場合にパフォーマンスが低下する
    • データを ingest (データの取得・取り込み)してすぐに読めるようにするためには最適化されていない
  • 同時並行性の高い環境(1000人以上のユーザー)でクエリのパフォーマンスとデータの可用性を製品レベルで保証している企業としては、Hadoopではニーズを満たすことができない
  • 当時のオープンソースでは要件を十分に満たすものがないため、オープンソースの分散型カラム指向リアルタイム分析データストアであるDruidを開発することにした

ここで話されているデータの取得(ingestion)というのは、イベント発生からクエリによって集計できるまでの間を指している

2.PROBLEM DEFINITION

  • 以下の要求を満たす必要がある
    • クエリのレイテンシは1秒以内
    • マルチテナントで可用性の高いシステム
    • 同時進行性の高い環境
    • できるだけダウンタイムのない環境
    • ユーザーとアラートシステムが「リアルタイム」でビジネス上の意思決定を行えるようにする

3.ARCHITECTURE

  • Druid は異なるタイプのノードで構成され、異なるノードタイプは互いに独立して動作し、ノード間の相互作用は最小限に抑えられている。これによりクラスタ内の通信障害がデータの可用性に与える影響は最小限になっている
  • Druid という名前はゲームで出てくる Druidクラス(shape-shifter)に由来する

3.1Real-time Nodes

  • data ingest とイベントストリームのクエリの機能をカプセル化しており、このノードを経由してインデックス化されたイベントは、すぐにクエリに利用できまる。
  • 小さな時間範囲のイベントのみを扱い、定期的にこの小さな時間範囲で収集したイベントのバッチをDruid クラスタ内の他のノードに渡す。
  • Druid クラスタとの連携のために Zookeeperを利用しており、Zookeeper にオンライン状態であることと提供するデータについて知らせる
  • すべての着信イベントのためのインメモリインデックスバッファを保持します。これらのインデックスは、イベントがインジェストされるとインクリメンタルに生成され、インデックスは直接クエリ可能
  • ヒープオーバーフローの問題を回避するために、定期的に、または最大行数に達した後に、インメモリインデックスをカラム指向のストレージ形式に変換してディスクに永続化する
  • パーシステッドインデックスをオフヒープメモリにロードして照会できるようにする
  • 定期的に ローカルに永続化されたすべてのインデックスを検索 するバックグラウンドタスクが走り、インデックスをマージして一定時間に ingect したすべてのイベントを含む不変のデータ・ブロックが作成される。これを segment という
  • handoff の段階ではこのセグメントを恒久的なバックアップストレージ(S3, HDFS など Druid では deep storage とよんでいるもの)にアップロードする
図3: リアルタイムノードは、イベントをインメモリインデックスにバッファリングし、定期的にディスクに永続化します。定期的に、永続化されたインデックスはマージされてからハンドオフされます。クエリは、インメモリインデックスとパーシステッドインデックスの両方にヒットします。

3.1.1 Availability and Scalability

  • 耐障害性について
    • ノードがディスク(データ)を失っていない場合は、ディスクから永続化されたインデックスのリロードをし、オフセットからイベントを読み続けることで復旧可能。回復に必要な時間としても数秒程度。
    • 複数のリアルタイムノードがイベントを読み込める単一のエンドポイントとして機能するため、 イベントのレプリケーションを作成することと同義。そのため、 ノードが完全に故障してディスクを失うシナリオでは、レプリケートされたストリームによってデータが失われることはない
  • スケールについて
    • 複数のリアルタイムノードがそれぞれストリームの一部をインジェストするようにデータストリームを分割することができるので、 追加のリアルタイムノードをシームレスに追加することができる。
    • 実績として 約500MB/s(150,000イベント/sまたは2TB/hour)の生データを消費することが可能

3.2 Historical Nodes

  • リアルタイムノードによって作成された segment のロードおよび提供する機能をカプセル化している。多くのワークフローにおいてDruidクラスタにロードされるデータの大部分は不変であるため、ヒストリカルノードがDruidクラスタのメインワーカーとなる
  • ヒストリカルノードは sharednothing アーキテクチャを採用しており、ノード間で競合する単一のポイントは存在しない。機能的には単純で、不変セグメントのロード、ドロップ、サーブのみ。
  • オンライン状態であることと提供しているデータをZookeeperに通知する 。
  • セグメントのロードとドロップの指示は Zookeeper を介して送信され、セグメントがディープストレージ内のどこにあるか、セグメントを処理する方法などの情報が含まれている。
  • セグメントに関する情報がキャッシュに存在しない場合、ヒストリカル・ノードは、ディープ・ストレージからセグメントをダウンロードし始める。 処理が完了すると、セグメントはZookeeper内で通知され問い合わせ可能となる
  • ローカルキャッシュを使用することで、履歴ノードを迅速に更新して再起動することができる。起動時に、ノードはキャッシュを検査し、見つけたデータをすぐに利用可能な状態とする。
  • 不変データのみを扱うため、読み取りの一貫性をサポートすることができ、不変データブロックはまた、単純な並列化モデルを可能にする。ヒストリカルノードは、ブロッキングすることなく不変ブロックを同時にスキャンして集約することもできる。
図5: ヒストリカルノードは、ディープストレージからセグメントをダウンロードします。セグメントは、クエリを実行する前にメモリにロードされている必要があります。

3.2.1 Tiers

  • ヒストリカルノードは異なる階層にグループ化することができ、特定の階層のすべてのノードが同じように構成される。階層ごとに異なるパフォーマンスとフォールトトレランスパラメータを設定できる。
  • 階層化ノードの目的は、優先度の高いセグメントや低いセグメントを重要度に応じて分散できるようにすること

3.2.2 Availability

  • ヒストリカルノードは、セグメントのロードおよびアンロード命令を Zookeeper に依存しており、Zookeeper が使用できなくなると新しいデータを提供したり、古いデータを削除したりすることができなくなるが、クエリーは HTTP で提供されるため、現在提供しているデータに対するクエリー要求に応答することはできる
  • Zookeeper の停止がヒストリカルノードの現在のデータ可用性に影響を与えることはないと書いてあるが、0.18.0 のバージョンでは Zookeeper の停止によりDruid も停止となるように修正されている

3.3 Broker Nodes

  • Broker ノードは、ヒストリカルノードやリアルタイムノードへの問い合わせルータとして動作する
  • Zookeeper で公開されているメタデータを確認し、どのセグメントがクエリ可能で、そのセグメントがどこにあるかを知る
  • クエリが適切なヒストリカルノードまたはリアルタイムノードにヒットするように、受信したクエリをルーティングし、ヒストリカルノードとリアルタイムノードからの部分的な結果をマージして、最終的な統合結果を呼び出し元に返す。

3.3.1 Caching

  • Brokerノードは、LRU [31, 20]の無効化戦略を持つキャッシュを含む。キャッシュはローカルのヒープメモリを使用するか、Memcached [16]のような外部分散型のキー/値ストアを使用することができる
  • キャッシュに存在しない結果については、ヒストリカルノードとリアルタイムノードにクエリを転送する。ヒストリカルノードが結果を返すと、Broker はこれらの結果をセグメントごとにキャッシュし、将来の使用に備える
  • リアルタイムデータは永続的に変化しており、結果をキャッシュすることは信頼性がないため、リアルタイムデータは決してキャッシュされない。リアルタイムデータへのリクエストは常にリアルタイムノードに転送されます。
  • すべてのヒストリカルノードが故障した場合でも、キャッシュに結果が既に存在する場合は結果を返すことができる

3.3.2 Availability

(Zookeeper が死んでも提供できる旨が書かれているが、0.18.0 で Zookeeper と疎通が取れない場合は Druid のプロセスを停止させるようになっているため、このあたりは最新のドキュメントを参考にするのが良さそう)

3.4 Coordinator Nodes

  • コーディネーターノードは、主にヒストリカルノードでのデータ管理や配信を担当しており、ヒストリカルノードに新しいデータのロード、古いデータのドロップ、データの複製、ロードバランスのためのデータの移動を指示する。
  • セグメントに新しいセグメントによって最新とは異なったデータが含まれている場合、その古いセグメントはクラスターから削除される。
  • リーダー選出プロセスを経て、コーディネータ機能を実行する1つのノードを決定し、残りのコーディネータノードは冗長バックアップとして機能する。
  • コーディネータノードは定期的にクラスタの現在の状態を予想されるクラスタの状態と実行時のクラスタの実際の状態を比較して判断する
  • 現在のクラスタ情報のために Zookeeper 接続をもつ。また、追加の運用パラメータや設定を含む MySQL データベースへの接続ももつ
  • MySQL データベースにある重要な情報の 1 つは、ヒストリカルノードがサービスを提供するすべてのセグメントのリストを含むテーブル。このテーブルは、リアルタイムノードなど、セグメントを作成するサービスによって更新される。MySQL データベースには、セグメントがクラスタ内でどのように作成、破棄、複製されるかを管理するルールテーブルも含まれている

3.4.1 Rules

  • ヒストリカルノードのセグメントに対して、どのようにクラスタからロード、ドロップするかを定めたもの。セグメントを異なるヒストリカルノード層にどのように割り当てるか、また、各層にセグメントのレプリケートが何個存在するかも示す
  • セグメントをクラスタから完全に削除するタイミングを示すこともできる(以下は例)
    • 最新の1ヶ月分のセグメントを「ホット」クラスタにロード
    • 最新の1年分のセグメントを「コールド」クラスタにロード
    • 古いセグメントはすべて削除
  • コーディネータノードは、MySQLデータベースのルールテーブルからルールのセットをロードする
    • ルールは特定のデータソースに固有のものでも、デフォルトのルールセットが設定されていてもよい。コーディネータ・ノードは、利用可能なすべてのセグメントを順番にそのセグメントに適用される最初のルールと一致させる

3.4.2 Load Balancing

  • クラスタの負荷が過度に不均衡にならないようにセグメントをクラスタ間で分散させる必要がある。 クラスタ間でセグメントを最適に分配し、バランスをとるために、セグメントデータのソース、再帰性、サイズを考慮に入れたコストベースの最適化手順を開発した(アルゴリズムについてはここでは話さない)

3.4.3 Replication

  • 異なるヒストリカルノードに同じセグメントのコピーをロードするように指示することができる。高いレベルの耐障害性を必要とする場合、レプリカの数を多く設定することができる。
  • 複製されたセグメントはオリジナルと同じように扱われ、同じ負荷分散アルゴリズムに従う。セグメントを複製することで、単一の履歴ノードの障害はDruidクラスタ内で透過的になり、ソフトウェアのアップグレードなどに利用できる。

3.4.4 Availability

  • コーディネータノードは、Zookeeper と MySQLと連携している
    • Zookeeper :クラスタ内にどのような履歴ノードが既に存在しているかを判断しており、Zookeeper が利用できなくなると、コーディネーターはセグメントの割り当て、バランス、およびドロップの指示を送信できなくなるが、これらの操作はデータの可用性には全く影響はない
    • MySQL:MySQL がダウンした場合、MySQL上のセグメントのメタ情報はコーディネータノードから利用できなくなる。しかしこれはデータ自体が利用できないという意味ではない。コーディネータノードがMySQLと通信できなくなると、新しいセグメントの割り当てを停止し、古くなったセグメントを削除する。ブローカーノード、ヒストリカルノード、およびリアルタイムノードは、MySQL が停止している間も照会可能

4. STORAGE FORMAT

  • Druidのデータテーブル(データソースと呼ばれる)は、タイムスタンプの付いたイベントの集合体であり、セグメントのセットに分割され、各セグメントは通常500万~1,000万行になる。正式には、セグメントをある期間に渡るデータの行の集合体と定義する。セグメントはDruidの基本的なストレージユニットを表し、レプリケーションと配布はセグメントレベルで行われる
  • データ分配ポリシー、データ保持ポリシー、および第一レベルのクエリ・プルーニングを簡素化する方法として、常にタイムスタンプ・カラムを必要としている
  • データソースを明確に定義された時間間隔(一般的には1時間または1日)に分割し、さらに他の列の値を分割して、希望のセグメ ントサイズに収めることができる
  • セグメントは、データソースの識別子、データの時間間隔、新しいセグメントが作成されるたびに増加するバージョン文字列によって一意に定まる
  • バージョン文字列はセグメントデータの新しさを示し、バージョンが古いセグメントよりも、バージョンが新しいセグメントの方が(ある時間範囲内で)新しいデータであることを示す
  • セグメントはカラムナーベースで保存される
  • LZF [24] 圧縮アルゴリズムを基本的には使用

4.1 Indices for Filtering Data

  • Druidは、特定のクエリ・フィルタに関連する行のみがスキャンされるように、文字列列用の追加のルックアップ・インデックスを作成する
  • Druid ではビットマップ圧縮アルゴリズムとしてConciseアルゴリズム[10]を使用することを選択した

4.2 Storage Engine

  • Druidのパーシステンスコンポーネントでは、異なるストレージエンジンを接続することができる
  • これらのストレージエンジンは、JVMヒープのような完全なインメモリ構造でデータを保存したり、メモリマップされた構造でデータを保存したりする
  • デフォルトでは、メモリマップされたストレージエンジンが使用されるが、パフォーマンスが必要な場合は、高価ではあるがインメモリ・ストレージ・エンジンを使用することもできる
  • メモリマップド・ストレージ・エンジンを使用する場合、Druid はセグメントをメモリの中に入れたり出したりする際にオペレーテ ィング・システムに依存する
  • メモリマップ型ストレージエンジンを使用した場合の主な欠点は、クエリの実行時に、あるノードの容量を超えて多くのセグメントをメモリにページする必要がある場合、セグメントをメモリ内でページングしたり、メモリ外でページングしたりするためのコストに悩まされる点

5. QUERY API

  • Druidは独自のクエリ言語を持ち、POSTリクエストとしてクエリを受け付ける。ブローカー、ヒストリカル、リアルタイムノードはすべて同じクエリAPIを共有している。
  • POSTリクエストの本文は、様々なクエリパラメータを指定したkey-valueペアを含むJSONオブジェクト。典型的なクエリには、データソース名、結果データの粒度、対象の時間範囲、リクエストのタイプ、集約するメトリクスが含まれ、結果は期間にわたって集約されたメトリクスを含む JSON オブジェクトになる
  • 執筆当初、Druid用の結合クエリはまだ実装されていない(0.18.0 で実装された)。 組織にとって実装コストは投資に見合うものではないという選択をしたため

6. PERFORMANCE

  • 2014年初頭の時点でMetamarketsで運用されているメインの運用クラスタの数値
  • 他のデータベースとの比較のために、TPC-Hデータ上での合成ワークロードの結果も掲載

6.1 Query Performance in Production

  • Druidクエリのパフォーマンスは、発行されるクエリによって大きく変化する
  • 実運用のDruidクラスタにおけるクエリの平均レイテンシを示すために、最もクエリされたデータソースの中から8つを選択し計測した
  • パフォーマンス
    • クエリの平均レイテンシは約550ミリ秒
    • 90%のクエリが1秒未満で返されている
    • 95%のクエリが2秒未満で返されている
    • 99%のクエリが10秒未満で返されている

6.2 Query Benchmarks on TPC-H Data

  • Druid のほうが断然速いという内容以外特に特筆することはなし
  • MySQL側の実行エンジンとして MyISAMを使用してたのは気になる(執筆当時はまだあるが、2020-04 だと InnoDBが主流)

6.3 Data Ingestion Performance

  • Druidのデータインジェストのレイテンシは、インジェストされるデータセットの複雑さに大きく依存する。データの複雑さは、各イベントに含まれるディメンジョンの数、各イベントに含まれるメトリクスの数、およびそれらのメトリクスで実行したい集計の種類によって決まる。
  • 最も基本的なデータセット(タイムスタンプ列のみを持つデータセット)では、800,000イベント/秒/コアの速度でデータをインジェストできますが、現実のデータセットは決してこれほど単純ではない。
  • スループットを、リアルタイムノードがインジェストし、クエリ可能なイベントの数と定義
  • あまりにも多くのイベントがリアルタイムノードに送られた場合、リアルタイムノードがそれらのイベントを受け入れる余裕ができるまで、それらのイベントはブロックされる。本番環境で測定したピークインジェストレイテンシは、Amazon cc2.8xlarge インスタンスを実行している30のディメンションと19のメトリクスを持つデータソースで22914.43イベント/秒/コア

7. DRUID IN PRODUCTION

  • QueryPattern
    • 探索的なユースケースでは、1人のユーザーが発行するクエリの数は多くなる。
    • 探索的なクエリでは、結果を絞り込むために同じ時間範囲のフィルタを段階的に追加することがよくあり、最近のデータの短い時間間隔を探索する傾向がある。
    • レポートを生成するユースケースでは、ユーザーはより長いデータ間隔でクエリを行うが、これらのクエリは一般的に数が少ない
  • Multitenancy
    • 負荷の高い同時実行クエリは、マルチテナントでは問題となりえる。
    • 負荷の高いクエリにより実行できないクエリが出ることへの対応として、クエリの優先順位付けを導入した。
    • 各ヒストリカルノードは、スキャンする必要のあるセグメントに優先順位をつけることができる
    • かなりの量のデータに対するクエリは、レポーティングのユースケースのためのものである傾向があり、優先順位を下げることができる。このユースケースでは、データを探索するときと同じレベルのインタラクティブ性をユーザーは期待していない
  • Node Failures
    • ヒストリカルノードが完全に 失敗して回復しない場合は、セグメントを再割り当てする必要がある。これはつまり、このデータをロードするためには余力が必要になる。
    • 経験から、2つ以上のノードが一度に完全に故障することは非常に稀であり、2つのヒストリカルノードからのデータを完全に再割り当てできるだけの余力をクラスタに残している
  • Data Center Outage
    • 完全なクラスタ停止はありえるが、非常にまれ。
    • 完全停止した場合、ディープストレージがまだ利用可能である限り、過去のノードがディープストレージからすべてのセグメントを再ダウンロードするだけで済むため、クラスタのリカバリ時間はネットワークに依存する
    • 過去の障害では、Amazon AWSのエコシステムで数テラバイトのデータに対して数時間のリカバリータイムが発生した

7.1 Operational Monitoring

  • 各Druidノードは、定期的に一連の運用メトリクスを出力するように設計されている
    • CPU使用率
    • 使用可能なメモリ
    • ディスク容量
    • ガベージコレクション時間
    • ヒープ使用率
    • セグメントスキャン時間
    • キャッシュヒット率
    • データインジェストレイテンシ
    • クエリごとのメトリクス

7.2 Pairing Druid with a Stream Processor

  • Druidは完全に非正規化されたデータストリームしか使用できない
  • 本番環境で完全なビジネスロジックを提供するために、DruidはApache Storm [27]のようなストリームプロセッサとペアにすることができる

7.3 Multiple Data Center Distribution

  • 大規模な停止は、単一ノードだけでなく、データセンター全体にも影響を及ぼす可能性がある
  • Druid コーディネーターノードのティア構成では、セグメントを複数のティアにまたがってレプリケートすることができる。そのため、セグメントは複数のデータセンターにあるヒストリカルノード間で正確に複製することができる。
  • クエリの優先順位を異なる階層に割り当てることができる
  • あるデータセンターのノードをプライマリクラスタとして動作させ別のデータセンターに冗長クラスタを設置することも可能

以下全文日本語訳

1.INTRODUCTION

近年、インターネット技術の普及により、機械が生成するイベントが急増している。これらのイベントは個々には有用な情報がほとんど含まれておらず、価値が低い。大規模なイベントのコレクションから意味を抽出するのに必要な時間とリソースを考えると、多くの企業はこのようなデータを破棄することになります。イベントベースのデータを扱うためのインフラストラクチャは構築されているが(IBMのNetezza[37]、HPのVertica[5]、EMCのGreenplum[29]など)、それらは大部分が高価格で販売されており、余裕のある企業のみを対象としている。数年前、GoogleはMapReduce [11]を同社の 汎用ハードウェアを活用してインターネットのインデックスを作成し、ログを分析する仕組み。Hadoop [36] プロジェクトがすぐに続き から出てきた洞察を主にパターン化したものです。MapReduceの論文。Hadoopは現在、大量のログデータを保存・分析するために多くの組織に導入されています。[…] は、価値の低いイベントストリームを、ビジネスインテリジェンスやA-Bテストなどのさまざまなアプリケーションのための価値の高いアグリゲートに変換する企業を支援することに多くの貢献をしてきました。 Hadoopは、企業が価値の低いイベントストリームを、ビジネスインテリジェンスやA-Bテストなどのさまざまなアプリケーションのための価値の高いアグリゲートに変換するのに多くの貢献をしてきました。

多くの優れたシステムと同様に、Hadoopは新たな問題に目を向けさせてくれました。具体的には、Hadoopは大量のデータを保存し、アクセスを提供することに優れていますが、そのデータにどれだけ素早くアクセスできるかについては、パフォーマンスが保証されていません。さらに、Hadoop は可用性の高いシステムではありますが、同時実行時の負荷が大きい場合にはパフォーマンスが低下します。最後に、Hadoop はデータの保存には適していますが、データをインジェストしてそのデータをすぐに読めるようにするためには最適化されていません。Metamarkets製品の開発の初期段階で、これらの問題に直面し、Hadoopは優れたバックオフィス、バッチ処理、データウェアハウスシステムであることに気付きました。しかし、同時並行性の高い環境(1000人以上のユーザー)でクエリのパフォーマンスとデータの可用性を製品レベルで保証している企業としては、Hadoopは私たちのニーズを満たすことができませんでした。この分野のさまざまなソリューションを検討し、リレーショナルデータベース管理システムとNoSQLアーキテクチャの両方を試した結果、オープンソースの世界には、当社の要件を十分に活用できるものはないという結論に達しました。結局、オープンソースの分散型カラム指向リアルタイム分析データストアであるDruidを開発することになりました。多くの点で、Druidは他のOLAPシステム[30, 35, 22]、対話型クエリシステム[28]、メインメモリデータベース[14]や として広く知られている分散データストア[7, 12, 23]。
分散型やクエリモデルは現時点の検索インフラからアイデアを借りています。[25, 3, 4]。

この論文では、Druid のアーキテクチャについて説明し、ホストされたサービスに電力を供給する(?)常時稼働型の生産システムを作成する際に行われた様々な設計上の決定事項を探り、同様の問題に直面している人に、解決の可能性のある方法についての情報を提供することを試みています。Druidは、複数のテクノロジー企業で本番さながらに展開されています. 本稿の構成は以下の通りである。セクション2で問題について説明し、次に、システムアーキテクチャについて、データがどのようにシステムを流れるかという観点から詳細について話す。次に、データがバイナリに変換される方法と理由について説明します。セクション4でフォーマットを説明し、クエリAPIについてはセクション5で簡単に説明し、性能結果をセクション6で紹介する.最後に セクション7では、本番でDruidを運用していた時の教訓を交えながら 第8節に関連研究を示す。

2.PROBLEM DEFINITION

Druidは元々、大量のトランザクションイベント(ログデータ)をインジェストして探索する問題を解決するために設計されました。
この形式の時系列データは、OLAPワークフローで一般的に見られ、データの性質上、非常に重い追加処理が必要になる傾向があります。次のような場合に使用します。例として、表1に示すデータを考えてみましょう。表1にはウィキペディア上で発生した編集の情報が含まれています。ユーザーが編集するたびに メタデータを含むイベントが生成されます。このメタデータは3つの異なるコンポーネントで構成されています。第一に、編集された時間の timestamp。次に、編集されたページや 編集を行ったユーザーと、そのユーザーの場所といったカラムのセット。最後に編集により追加・削除された文字数といったメトリクス郡(通常は数値)

私たちの目標は、このデータのドリルダウンや集計を迅速に計算することです。サンフランシスコの男性からJustin Bieberのページに何回編集が行われたか」や「カルガリーの人が1ヶ月間に追加した平均文字数は何文字か」といった質問に答えたいと考えています。また、任意の任意のディメンションの組み合わせに対するクエリは、1秒以内のレイテンシーで返すようにしたい。

Druidの必要性は、既存のオープンな RDBMS と NoSQLのキー/値ストアがインタラクティブなアプリケーションに対して低レイテンシでのデータ取得とクエリプラットフォームを提供できなかったことにより促進された[40]。ダッシュボードを支えるデータ・ストアは、その上に構築されたデータ・ビジュアライゼーションがユーザーにインタラクティブな体験を提供できるように、十分な速さでクエリを返す必要がありました。

クエリのレイテンシーのニーズに加えて、マルチテナントで可用性の高いシステムである必要がありました。Metamarkets製品は、高度に同時進行性の高い環境で使用されています。ダウンタイムにはコストがかかり、多くの企業はソフトウェアのアップグレードやネットワーク障害に直面してシステムが利用できなくなった場合、待っている余裕はありません。適切な社内運用管理を行っていないことが多い新興企業のダウンタイムは、ビジネスの成否を左右する可能性があります。

最後に、メタマーケッツが初期に直面したもう一つの課題は、ユーザーとアラートシステムが「リアルタイム」でビジネス上の意思決定を行えるようにすることでした。イベントが作成されてから、そのイベントがクエリ可能な状態になるまでの時間は、関心のある関係者がシステム内の潜在的に破滅的な状況にどれだけ早く対応できるかを決定します。Hadoopのような人気のあるオープンソースのデータウェアハウスシステムでは、私たちが必要とするサブセコンドのデータ取り込みレイテンシを提供することができませんでした。

データの探索、取り込み、可用性の問題は、複数の業界にまたがる。Druidは2012年10月にオープンソース化されて以来、ビデオ、ネットワーク監視、運用監視、オンライン広告分析プラットフォームとして複数の企業に導入されています。

3.ARCHITECTURE

Druidクラスタは異なるタイプのノードで構成され、各ノードタイプは特定のセットを実行するように設計されています。この設計は、懸念事項を分離し、システム全体の複雑さを単純化すると考えています。異なるノードタイプは互いに独立して動作し、ノード間の相互作用は最小限に抑えられています。したがって、クラスタ内の通信障害がデータの可用性に与える影響は最小限に抑えられています。

複雑なデータ分析の問題を解決するために、異なるノードタイプが一緒になって完全に動作するシステムを形成します。Druidという名前は、多くのロールプレイングゲームに登場するDruidクラスに由来しています。これはシェイプシフターで、グループ内で様々な役割を果たすために多くの異なる形態を取ることができます。
Druidクラスタ内のデータの構成と流れを図1に示します。

3.1Real-time Nodes

リアルタイムノードは、インジェストとイベントストリームのクエリの機能をカプセル化しています。これらのノードを経由してインデックス化されたイベントは、すぐにクエリに利用できます。ノードは小さな時間範囲のイベントのみを扱い、定期的にこの小さな時間範囲で収集した不変イベントのバッチを、不変イベントのバッチを扱うことに特化した Druid クラスタ内の他のノードに渡します。リアルタイムノードは、残りの Druid クラスタとの連携のために Zookeeper [19] を利用する。ノードは Zookeeper にオンライン状態であることと提供するデータについて知らせる

リアルタイムノードは、すべての着信イベントのためのインメモリインデックスバッファを保持します。これらのインデックスは、イベントがインジェストされるとインクリメンタルに生成され、インデックスは直接クエリ可能です。Druidは、このJVMヒープベースのバッファに存在するイベントに対するクエリのための行ストアとして動作します。ヒープオーバーフローの問題を回避するために、リアルタイムノードは、定期的に、または最大行数に達した後に、インメモリインデックスをディスクに永続化します。このパーシストプロセスは、インメモリバッファに格納されたデータをセクション4で説明したカラム指向のストレージ形式に変換します。各パーシステッドインデックスは不変であり、リアルタイムノードはパーシステッドインデックスをオフヒープメモリにロードして、まだ照会できるようにします。このプロセスは[33]で詳しく説明されており、図2に示されています。

図2:ノードは起動し、データをインジェストし、永続化し、定期的にデータを渡します。このプロセスは無期限に繰り返されます。異なるリアルタイム・ノード操作の間の期間は、構成可能です。

定期的に、各リアルタイムノードは ローカルに永続化されたすべてのインデックスを検索 するバックグラウンドタスクをスケジュールします。このタスクは、これらのインデックスをマージして、リアルタイム・ノードがある時間の間に摂取したすべてのイベントを含む不変のデータ・ブロックを構築します。このデータブロックを「セグメント」と呼びます。ハンドオフの段階では、リアルタイムノードはこのセグメントを恒久的なバックアップストレージ、一般的にはS3 [12]やHDFS [36]などの分散ファイルシステムにアップロードします。インジェスト、パーシスト、マージ、およびハンドオフの各段階は流動的です。

図3は、リアルタイムノードの動作を示す図である。ノードは13:37に開始し、現在の1時間(13時台)、あるいは次の1時間(14時台)のイベントのみを受け入れます。イベントがインジェストされると、ノードは13:00から14:00までの間隔でデータのセグメントを提供していることを知らせる。10分ごと(持続時間は設定可能)に、ノードはそのインメモリバッファをフラッシュしてディスクに持続させます。1時間の終わり近くになると、ノードは14:00~15:00のイベントを見ることになるでしょう。これが発生すると、ノードは次の時間のデータを提供する準備をし、新しいインメモリインデックスを作成します。その後、ノードは14:00から15:00までのセグメントも提供していることを知らせる。ノードは、13:00から14:00までの永続インデックスをすぐにマージするのではなく、13:00から14:00までのイベントが到着するまで、構成可能なウィンドウ期間を待ちます。このウィンドウ期間は、イベント配信の遅延によるデータ損失のリスクを最小限に抑えます。ウィンドウ期間の終了時に、ノードは13:00から14:00までのすべての永続インデックスを1つの不変セグメントにマージし、セグメントを渡します。このセグメントがロードされ、Druidクラスタ内のどこか他の場所でクエリ可能になると、リアルタイムノードは13:00~14:00の間に収集したデータに関するすべての情報をフラッシュし、このデータを提供していることを知らせないようになる。

図3: リアルタイムノードは、イベントをインメモリインデックスにバッファリングし、定期的にディスクに永続化します。定期的に、永続化されたインデックスはマージされてからハンドオフされます。クエリは、インメモリインデックスとパーシステッドインデックスの両方にヒットします。

3.1.1 Availability and Scalability

リアルタイムノードはデータの消費者であり、データストリームを提供するために対応するプロデューサを必要とします。一般的に、データの耐久性を目的として、図4に示すように、Kafka [21]のようなメッセージバスがプロデューサーとリアルタイムノードの間に配置されています。リアルタイムノードは、メッセージバスからイベントを読み取ることによってデータをインジェストする。イベント作成からイベント消費までの時間は通常数百ミリ秒のオーダーです。

図4:複数のリアルタイムノードが同じメッセージバスから読み出すことができます。各ノードは独自のオフセットを維持します。

図4のメッセージバスの目的は2つある。第一に、メッセージバスは受信イベントのバッファとして機能します。Kafka のようなメッセージバスは、コンシューマ(リアルタイムノード)がイベントストリームの中でどれだけの距離を読んだかを示す位置オフセットを保持しています。コンシューマはこれらのオフセットをプログラムで更新することができます。リアルタイムノードは、メモリ内のバッファをディスクに永続化するたびに、このオフセットを更新します。フェイルアンドリカバリーのシナリオでは、ノードがディスクを失っていない場合、ディスクからすべての永続化されたインデックスをリロードして、最後にコミットしたオフセットからイベントを読み続けることができます。最近コミットしたオフセットからイベントを取り込むことで、ノードのリカバリ時間が大幅に短縮されます。実際には、ノードがこのような障害シナリオから数秒で回復するのを確認している。

メッセージバスの第二の目的は、複数のリアルタイムノードがイベントを読み込める単一のエンドポイントとして機能することです。複数のリアルタイムノードは、バスから同じイベントセットをインジェストし、イベントのレプリケーションを作成することができます。ノードが完全に故障してディスクを失うシナリオでは、レプリケートされたストリームによってデータが失われることはありません。また、単一のインジェストエンドポイントにより、複数のリアルタイムノードがそれぞれストリームの一部をインジェストするようにデータストリームを分割することができます。これにより、追加のリアルタイムノードをシームレスに追加することができます。実際には、このモデルにより、最大規模のプロダクションDruidクラスタでは、約500MB/s(150,000イベント/sまたは2TB/hour)の生データを消費することが可能になりました。

3.2 Historical Nodes

ヒストリカルノードは、 リアルタイムノードによって作成されたデータの不変ブロック(セグメント) のロードおよび提供する機能をカプセル化しています。多くの実世界のワークフローでは、Druidクラスタにロードされるデータの大部分は不変であるため、ヒストリカルノードがDruidクラスタのメインワーカーとなります。ヒストリカルノードはsharednothingアーキテクチャを採用しており、ノード間で競合する単一のポイントは存在しません。ノードはお互いの知識を持たず、操作的には単純で、不変セグメントのロード、ドロップ、サーブの方法を知っているだけです。

リアルタイムノードと同様に、ヒストリカルノードは、そのオンライン状態と提供しているデータをZookeeperに通知する。セグメントのロードとドロップの指示は Zookeeper を介して送信され、セグメントがディープストレージ内のどこにあるか、セグメントを解凍して処理する方法などの情報が含まれています。ヒストリカルノードは、ディープストレージから特定のセグメントをダウンロードする前に、まず、ノードに既に存在するセグメントの情報を保持するローカルキャッシュをチェックします。セグメントに関する情報がキャッシュに存在しない場合、ヒストリカル・ノードは、ディープ・ストレージからセグメントをダウンロードし始める。この処理を図5に示す。処理が完了すると、セグメントはZookeeper内で通知される。この時点で、セグメントは問い合わせ可能です。また、ローカルキャッシュを使用することで、履歴ノードを迅速に更新して再起動することができます。起動時に、ノードはキャッシュを検査し、見つけたデータをすぐに提供します。

図5: ヒストリカルノードは、ディープストレージから不変のセグメントをダウンロードします。セグメントは、クエリを実行する前にメモリにロードされている必要があります。

ヒストリカルノードは不変データのみを扱うため、読み取りの一貫性をサポートすることができます。不変データブロックはまた、単純な並列化モデルを可能にします。ヒストリカルノードは、ブロッキングすることなく、不変ブロックを同時にスキャンして集約することができます。

3.2.1 Tiers

ヒストリカルノードは異なる階層にグループ化することができ、特定の階層のすべてのノードが同じように構成されます。階層ごとに異なるパフォーマンスとフォールトトレランスパラメータを設定できます。階層化ノードの目的は、優先度の高いセグメントや低いセグメントを重要度に応じて分散できるようにすることです。例えば、コア数が多く、メモリ容量が大きい履歴ノードの「ホット」ティアをスピンアップすることが可能です。「ホット」クラスタは、より頻繁にアクセスされるデータをダウンロードするように構成することができます。並列の「コールド」クラスタは、性能の低いバッキングハードウェアを使用して作成することもできます。コールド」クラスタには、アクセス頻度の低いセグメントのみが含まれます。

3.2.2 AVAILABILITY

履歴ノードは、セグメントのロードおよびアンロード命令を Zookeeper に依存しています。Zookeeper が使用できなくなると、ヒストリカルノードは新しいデータを提供したり、古いデータを削除したりすることができなくなりますが、クエリーは HTTP で提供されるため、ヒストリカルノードは現在提供しているデータに対するクエリー要求に応答することができます。つまり、Zookeeper の停止がヒストリカルノードの現在のデータ可用性に影響を与えることはありません。

3.3 Broker Nodes

Broker ノードは、ヒストリカルノードやリアルタイムノードへの問い合わせルータとして動作します。Broker ノードは、Zookeeper で公開されているメタデータを確認し、どのセグメントがクエリ可能で、そのセグメントがどこにあるかを知る。Broker ノードは、クエリが適切なヒストリカルノードまたはリアルタイムノードにヒットするように、受信したクエリをルーティングします。また、Broker ノードは、ヒストリカルノードとリアルタイムノードからの部分的な結果をマージして、最終的な統合結果を呼び出し元に返す。

3.3.1 CACHING

Brokerノードは、LRU [31, 20]の無効化戦略を持つキャッシュを含む。キャッシュはローカルのヒープメモリを使用するか、Memcached [16]のような外部分散型のキー/値ストアを使用することができる。特定のセグメントの結果はすでにキャッシュに存在している可能性があり、再計算の必要はありません。キャッシュに存在しない結果については、ブローカー・ノードが正しいヒストリカルノードとリアルタイムノードにクエリを転送します。ヒストリカルノードが結果を返すと、ブローカはこれらの結果をセグメントごとにキャッシュし、将来の使用に備えます。このプロセスを図6に示します。リアルタイムデータは決してキャッシュされないため、リアルタイムデータへのリクエストは常にリアルタイムノードに転送されます。リアルタイムデータは永続的に変化しており、結果をキャッシュすることは信頼性がありません。

キャッシュは、データ耐久性の追加レベルとしても機能します。すべての履歴ノードが故障した場合でも、キャッシュに結果が既に存在する場合は結果を照会することができます。

図6:結果はセグメントごとにキャッシュされます。クエリは、キャッシュされた結果と、ヒストリカルノードおよびリアルタイムノードで計算された結果を組み合わせます。

3.3.2 AVAILABILITY

Zookeeper が完全に停止した場合でも、データは照会可能である。ブローカーノードが Zookeeper と通信できない場合は、最後に知っているクラスタのビューを使用し、リアルタイムノードとヒストリカルノードにクエリを転送し続けます。ブローカーノードは、クラスタの構造が停止前と同じであることを前提としている。実際には、この可用性モデルにより、Zookeeper の停止を診断している間、Druid クラスタはかなりの期間、クエリを提供し続けることができました。

3.4 Coordinator Nodes

ドルイドのコーディネーターノードは、主にヒストリカルノードでのデータ管理や配信を担当しています。コーディネーターノードは、ヒストリカルノードに新しいデータのロード、古いデータのドロップ、データの複製、ロードバランスのためのデータの移動を指示します。Druidは、安定したビューを維持するために、不変セグメントを管理するためにマルチバージョン同時実行制御スワッピングプロトコルを使用しています。不変セグメントに新しいセグメントによって完全に時代遅れになったデータが含まれている場合、その時代遅れのセグメントはクラスターから削除されます。不変セグメントに新しいセグメントによって完全に時代遅れになったデータが含まれている場合、その時代遅れのセグメントはクラスターから削除されます。 コーディネータノードはリーダー選出プロセスを経て、コーディネータ機能を実行する1つのノードを決定します。残りのコーディネータノードは冗長バックアップとして機能します。

コーディネータノードは定期的に実行され、クラスタの現在の状態を判断します。コーディネータノードは、予想されるクラスタの状態と、実行時のクラスタの実際の状態を比較して判断します。他の Druid ノードと同様に、コーディネータノードは現在のクラスタ情報のために Zookeeper 接続を維持します。コーディネータ・ノードはまた、追加の運用パラメータや設定を含む MySQL データベースへの接続を維持します。MySQL データベースにある重要な情報の 1 つは、履歴ノードがサービスを提供すべきすべてのセグメントのリストを含むテーブルです。このテーブルは、リアルタイムノードなど、セグメントを作成するサービスによって更新することができます。MySQL データベースには、セグメントがクラスタ内でどのように作成、破棄、複製されるかを管理するルールテーブルも含まれています。

3.4.1 Rules

ヒストリカルセグメントをどのようにクラスタからロード、ドロップするかのルールが与えられる。ルールは、セグメントを異なるヒストリカルノード層にどのように割り当てるか、また、各層にセグメントのレプリケートが何個存在するかを示します。ルールはまた、セグメントをクラスタから完全に削除するタイミングを示すこともできます。ルールは通常、一定期間設定されます。例えば、ユーザーはルールを使用して、最新の1ヶ月分のセグメントを「ホット」クラスタにロードし、最新の1年分のセグメントを「コールド」クラスタにロードし、古いセグメントはすべて削除することができます。

コーディネータノードは、MySQLデータベースのルールテーブルからルールのセットをロードします。ルールは、特定のデータソースに固有のものであってもよいし、デフォルトのルールセットが設定されていてもよい。コーディネータ・ノードは、利用可能なすべてのセグメントを循環させ、各セグメントをそのセグメントに適用される最初のルールと一致させます。

3.4.2 LOAD BALANCING

典型的な本番環境では、クエリは数十、あるいは数百のセグメントにヒットすることがよくあります。各ヒストリカルノードのリソースは限られているため、クラスタの負荷が過度に不均衡にならないようにセグメントをクラスタ間で分散させる必要があります。最適な負荷分散を決定するには、クエリのパターンと速度に関する知識が必要です。一般的に、クエリは単一のデータソースの連続する時間間隔にまたがる最近のセグメントをカバーしています。平均的には、より小さなセグメントにアクセスするクエリの方が高速です。

これらのクエリパターンは、最近(直近1周間など)のヒストリカルセグメントをより高い割合で複製し、異なるヒストリカルノードにある時間的に近い大きなセグメントを分散させ、異なるデータソースからセグメントを同じ場所に配置するということを示している。クラスタ間でセグメントを最適に分配し、バランスをとるために、セグメントデータのソース、再帰性、サイズを考慮に入れたコストベースの最適化手順を開発した。アルゴリズムの正確な詳細は本論文の範囲を超えており、将来の文献で議論される可能性がある。

3.4.3 REPLICATION

コーディネータノードは、異なるヒストリカルノードに同じセグメントのコピーをロードするように指示することができます。 ヒストリカルコンピュートクラスタの各層のレプリカの数は、完全に設定可能です。高いレベルの耐障害性を必要とするセットアップでは、レプリカの数を多く設定することができます。複製されたセグメントはオリジナルと同じように扱われ、同じ負荷分散アルゴリズムに従います。セグメントを複製することで、単一の履歴ノードの障害はDruidクラスタ内で透過的になります。私たちはこの特性をソフトウェアのアップグレードに利用しています。シームレスにヒストリカルノードをオフラインにして、それを更新して、それを復活させ、クラスタ内のすべてのヒストリカルノードに対してこのプロセスを繰り返すことができます。過去2年間、ソフトウェア・アップグレードのためにDruidクラスタでダウンタイムを取ったことは一度もありません。

3.4.4 AVAILABILITY

Druid コーディネータノードは、Zookeeper と MySQL を外部依存関係として持っています。コーディネータ ノードは Zookeeper に依存して、クラスタ内にどのような履歴ノードが既に存在しているかを判断します。Zookeeper が利用できなくなると、コーディネーターはセグメントの割り当て、バランス、およびドロップの指示を送信できなくなります。ただし、これらの操作はデータの可用性には全く影響しません。

MySQLとZookeeperの障害に対応するための設計原理は同じ。連携を担当する外部依存関係が失敗した場合、クラスタは現状を維持します。DruidはMySQLを使用して、運用管理情報と、クラスタ内に存在すべきセグメントに関するセグメントメタデータ情報を保存します。MySQLがダウンした場合、この情報はコーディネータノードから利用できなくなります。しかし、これはデータ自体が利用できないという意味ではありません。コーディネータノードがMySQLと通信できなくなると、新しいセグメントの割り当てを停止し、古くなったセグメントを削除します。ブローカーノード、ヒストリカルノード、およびリアルタイムノードは、MySQL が停止している間も照会可能です。

4. STORAGE FORMAT

Druidのデータテーブル(データソースと呼ばれる)は、タイムスタンプの付いたイベントの集合体であり、セグメントのセットに分割されており、各セグメントは通常500万~1,000万行になります。正式には、セグメントをある期間に渡るデータの行の集合体と定義します。セグメントはDruidの基本的なストレージユニットを表し、レプリケーションと配布はセグメントレベルで行われます。

Druidは、データ分配ポリシー、データ保持ポリシー、および第一レベルのクエリ・プルーニングを簡素化する方法として、常にタイムスタンプ・カラムを必要としています。Druidは、データソースを明確に定義された時間間隔(一般的には1時間または1日)に分割し、さらに他の列の値を分割して、希望のセグメ ントサイズを達成することができます。セグメントを分割する時間の粒度は、データ量と時間範囲の関数です。1年に渡るタイムスタンプを持つデータセットは1日単位で、1日に渡るタイムスタンプを持つデータセットは1時間単位で分割するのが良いでしょう。

セグメントは、データソースの識別子、データの時間間隔、新しいセグメントが作成されるたびに増加するバージョン文字列によって一意に定まる。バージョン文字列は、セグメントデータの新しさを示します。バージョンが古いセグメントよりも、バージョンが後のセグメントの方が(ある時間範囲内で)新しいデータを表示します。読み込み操作は常に、特定の時間範囲のデータに、その時間範囲の最新のバージョン識別子を持つセグメントからアクセスします。

Druidのセグメントは、カラム志向で保存されます。Druidがイベントストリームの集約に最適であることを考えると(Druidに入るすべてのデータはタイムスタンプを持っていなければなりません)、集約情報を行ではなく列として保存することの利点は十分に文書化されています[1]。列ストレージでは、必要なものだけを実際にロードしてスキャンするため、CPUの使用効率が向上します。行指向のデータストアでは、行に関連付けられたすべての列が集約の一部としてスキャンされなければなりません。追加のスキャン時間は、以下のような重大なパフォーマンスの低下をもたらす可能性があります。

Druidには、様々なデータ形式を表現するための複数のカラムタイプがあります。カラムの種類に応じて、メモリやディスクにカラムを格納するコストを削減するために、さまざまな圧縮方法が使用されます。表1の例では、ページ、ユーザー、性別、および都市の列は文字列のみを含んでいます。文字列を直接格納することは不必要にコストがかかるため、文字列の列は代わりに辞書エンコードすることができます。辞書エンコーディングはデータを圧縮するための一般的な方法であり、PowerDrill [17]のような他のデータストアでも使用されています。

このマッピングにより、ページ列を整数の配列として表現することができ、配列のインデックスは元のデータセットの行に対応します。ページ列については、以下のように一意なページを表現することができます。結果として得られる整数配列は,それ自体が圧縮方式に非常に適しています.エンコーディングの上にある一般的な圧縮アルゴリズムは,カラムストアでは非常に一般的です.Druid は LZF [24] 圧縮アルゴリズムを使用しています。同様の圧縮方法を数値列にも適用することができます。例えば、表1の追加された文字と削除された文字の列は、個々の配列として表現することもできます。この場合、辞書表現とは対照的に生の値を圧縮します。

4.1 Indices for Filtering Data

多くの現実のOLAPワークフローでは、クエリは、ディメンジョン指定のセットが満たされているメトリクスセットの集約された結果に対して発行されます。例えば、”サンフランシスコの男性ユーザーがウィキペディアの編集を行った数は?” このクエリは、ディメンション値のブール式に基づいて、表1のWikipediaデータセットをフィルタリングする。多くの実世界のデータセットでは、ディメンジョンの列には文字列が含まれ、メトリックの列には数値が含まれています。Druidは、特定のクエリ・フィルタに関連する行のみがスキャンされるように、文字列列用の追加のルックアップ・インデックスを作成します。

表1のページ欄を考えてみましょう。表1の各ユニークなページについて、特定のページがどの表の行で見られるかを示す何らかの表現を形成することができます。この情報は,配列のインデックスが行を表すバイナリ配列に格納することができます.特定のページが特定の行にある場合、その配列インデックスは1とマークされます。(Table1 の例で)ジャスティン・ビーバーは0行目と1行目に見られる。 この列の値と行のインデックスの対応付けは、転置インデックスを形成する[39]。Justin BieberとKe$haのどちらの行が含まれているかを知るには、2つの配列をORすることができます。

大きなビットマップに対してブール演算を実行するアプローチ セットは検索エンジンでよく使われています。OLAPのためのビットマップインデックス ワークロードについては、[32]で詳しく説明しています。ビットマップ圧縮アルゴリズムは、よく定義された研究分野であり[2, 44, 42]、多くの場合、実行長エンコーディングを利用します。DruidはConciseアルゴリズム[10]を使用することを選択しました。図7は、整数配列を使用した場合とConcise圧縮を使用した場合のバイト数を示しています。結果はcc2.8xlargeシステム上で生成され、シングルスレッド、2Gヒープ、512mのヤングジェン、各実行間に強制GCを使用しました。データセットは、Twitter garden hose [41]のデータストリームから収集した1日分のデータです。データセットには、2,272,295行、様々なカーディナリティの12次元が含まれています。追加の比較として、我々はまた、圧縮を最大化するためにデータセットの行をリゾートしました

図7:Integer配列のサイズとConcise集合のサイズの比較

ソートされていないケースでは、Conciseの合計サイズは53,451,144バイト、整数配列の合計サイズは127,248,520バイトでした。全体として、Concise圧縮セットは整数配列よりも約42%小さくなっています。ソートされたケースでは、Concise圧縮の合計サイズは43,832,884バイト、整数配列の合計サイズは127,248,520バイトでした。興味深いのは、ソート後、グローバル圧縮は最小限にしか増加しなかったことです。

4.2 Storage Engine

Druidのパーシステンスコンポーネントでは、Dynamo [12]と同様に、異なるストレージエンジンを接続することができます。これらのストレージエンジンは、JVMヒープのような完全なインメモリ構造でデータを保存したり、メモリマップされた構造でデータを保存したりします。ストレージエンジンをスワップする機能により、特定のアプリケーションの仕様に応じて Druid を構成することができます。インメモリ・ストレージ・エンジンは、メモリ・マップド・ストレージ・エンジンよりも運用上高価ですが、パフォーマン スが重要な場合には、より良い代替手段となります。デフォルトでは、メモリマップされたストレージエンジンが使用されます。

メモリマップド・ストレージ・エンジンを使用する場合、Druid はセグメントをメモリの中に入れたり出したりする際にオペレーテ ィング・システムに依存します。セグメントはメモリにロードされていないとスキャンできないことを考えると、メモリマップされたストレージエンジンを使用することで、最近のセグメントはメモリに保持され、クエリーされなかったセグメントはページアウトされることになります。メモリマップ型ストレージエンジンを使用した場合の主な欠点は、クエリの実行時に、あるノードの容量を超えて多くのセグメントをメモリにページする必要がある場合です。この場合、クエリのパフォーマンスは、セグメントをメモリ内でページングしたり、メモリ外でページングしたりするためのコストに悩まされます。

5. QUERY API

Druidは独自のクエリ言語を持ち、POSTリクエストとしてクエリを受け付けます。ブローカー、ヒストリカル、リアルタイムノードはすべて同じクエリAPIを共有しています。

POSTリクエストの本文は、様々なクエリパラメータを指定したkey-valueペアを含むJSONオブジェクトです。典型的なクエリには、データソース名、結果データの粒度、関心のある時間範囲、リクエストのタイプ、集約するメトリクスが含まれます。結果は、期間にわたって集約されたメトリクスを含む JSON オブジェクトにもなります。

ほとんどの問い合わせタイプはフィルタセットもサポートしています。フィルタ・セットは、ディメンジョン名と値のペアのブール演算式です。ディメンジョンと値の任意の数および組み合わせを指定できます。フィルタ・セットが提供されると、フィルタ・セットに関連するデータのサブセットのみがスキャンされます。複雑な入れ子になったフィルタセットを扱うことができるため、Druidは任意の深さのデータを掘り下げることができます。

正確なクエリ構文は、クエリの種類と要求された情報によって異なります。1週間のデータを対象としたサンプルカウントクエリは以下のようになります。

{
 "queryType" : "timeseries",
 "dataSource" : "wikipedia",
 "intervals" : "2013-01-01/2013-01-08",
 "filter" : {
              "type" : "selector",
              "dimension" : "page",
              "value" : "Ke$ha"
            },
 "granularity" : "day",
 "aggregations" : [{"type":"count", "name":"rows"}] 
} 

上記のクエリは、2013-01-01から2013-01-08までのWikipediaデータソース内の行数のカウントを返しますが、”page “ディメンジョンの値が “Ke$ha “に等しい行のみにフィルタリングされています。結果は日ごとにバケット化され、以下の形式のJSON配列となる。

 [ {
     "timestamp": "2012-01-01T00:00:00.000Z",
     "result": {"rows":393298}
 },
 {
     "timestamp": "2012-01-02T00:00:00.000Z",
     "result": {"rows":382932}
 },
 ...
 {
     "timestamp": "2012-01-07T00:00:00.000Z",
     "result": {"rows": 1337}
 } ] 

Druidは、浮動小数点型や整数型の和、最小値、最大値、そしてカーディナリティ推定や近似分位推定などの複雑な集計を含む多くのタイプの集計をサポートしています。集約の結果は,他の集計を形成するために数式で組み合わせることができる.クエリAPIについて完全に説明することはこの論文の範囲を超えていますが、より多くの情報はオンラインで見ることができます。

この記事を書いている時点では、Druid用の結合クエリはまだ実装されていません。これは、エンジニアリングリソースの割り当てと 技術的なメリットに基づいて決定されるよりも、ケースの決定に基づいて決定されます。確かに、Druidのストレージ形式では、結合の実装が可能になります(ディメンジョンとして含まれる列の忠実度が損なわれることはありません)し、その実装は数ヶ月に一度の会話になっています。これまでのところ、私たちの組織にとって実装コストは投資に見合うものではないという選択をしてきました。この決定の理由は、一般的には2つあります。

  1. ジョインクエリのスケーリングは、私たちの専門的な経験では、分散データベースでの作業のボトルネックとなっています。
  2. 機能の増分は、同時進行性の高い、結合負荷の高いワークロードを管理する際に予想される問題よりも価値が低いと考えられます。

結合クエリとは、基本的には、2つ以上の データに基づいて、共有されたキーのセットに基づいて結合クエリを実行します。私たちが知っている結合クエリの主な高レベル戦略は、ハッシュベースの戦略か ソートされたマージ戦略です。ハッシュベースの戦略では、1つのデータセット以外のすべてのデータセットがハッシュテーブルのように見えるものとして利用可能であることが必要で、検索操作は「プライマリ」ストリームのすべての行に対してこのハッシュテーブルに対して実行されます。ソートマージ戦略は、各ストリームが結合キーでソートされていることを前提としているため、ストリームのインクリメンタルな結合が可能です。しかし、これらの戦略のそれぞれは、ソートされた順番かハッシュテーブル形式のいずれかで、いくつかのストリームを実体化する必要があります。

結合のすべての側面が非常に大きなテーブル(10億レコード以上)である場合、事前結合ストリームを実現するには、複雑な分散メモリ管理が必要になります。メモリ管理の複雑さは、高度に同時進行するマルチテナントのワークロードを対象としているという事実によってのみ増幅されます。これは、私たちが知る限りでは、学術的に活発な研究課題であり、スケーラブルな方法で解決することに貢献したいと考えています。

6. PERFORMANCE

Druidはいくつかの組織で運用されていますが、そのパフォーマンスを実証するために、2014年初頭の時点でMetamarketsで運用されているメインの運用クラスタの実世界の数値を共有することにしました。他のデータベースとの比較のために、TPC-Hデータ上での合成ワークロードの結果も掲載しています。

6.1 Query Performance in Production

Druidクエリのパフォーマンスは、発行されるクエリによって大きく変化します。例えば、与えられたメトリックに基づいて高カーディナリティディメンジョンの値をソートすることは、時間範囲内の単純なカウントよりもはるかに高価です。実運用のDruidクラスタにおけるクエリの平均レイテンシを示すために、表2に記載されているように、最もクエリされたデータソースの中から8つを選択しました。

Table2: データソースの特徴

クエリの約30%は異なるタイプのメトリクスやフィルタを含む標準的な集約であり、クエリの60%は集約された1つ以上のディメンション上でグループごとに順序付けられており、クエリの10%は検索クエリとメタデータ検索クエリである。アグリゲートクエリでスキャンされるカラム数は、ほぼ指数関数的な分布に従います。単一のカラムを含むクエリは非常に頻繁に発生し、すべてのカラムを含むクエリは非常にまれです。

結果についてのいくつかの注意事項。

  • 結果は、私たちのプロダクション・クラスタの「ホット」ティアからのものです。この結果は、当社の生産クラスタ内の「ホット」層での結果です。階層には約 50 のデータソースがあり、数百人のユーザーがクエリを発行していました。
  • ホット」な ティアと約10TBのセグメントをロードしています。合わせて。このティアには約500億のDruidの列があります。すべての データソースは表示されません。
  • ホットティアはインテル® Xeon® E5-2670 プロセッサーを使用しており、以下の構成になっています。1302個の処理スレッドと672個の総コア(ハイパースレッド)で構成されています。
  • メモリマップされたストレージエンジンが使用されました(マシンは にデータをロードするのではなく、メモリマップするように設定されています。Javaヒープ)。

クエリのレイテンシを図 8 に、1 分間あたりのクエリを図 9 に示す。さまざまなデータソースすべてにおいて、クエリの平均レイテンシは約550ミリ秒で、90%のクエリが1秒未満、95%のクエリが2秒未満、99%のクエリが10秒未満で返されています。2月19日に観測されたように、Memcachedインスタンスのネットワークの問題が、最大のデータソースの1つであるクエリの負荷が非常に高くなることで複合的に発生したように、レイテンシが急増することもあります。

図8: Query latencies of production data sources.
図9: Queries per minute of production data sources.

6.2 Query Benchmarks on TPC-H Data

また、TPC-Hのデータでドルイドのベンチマークを紹介しています。ほとんどのTPC-HクエリはDruidに直接適用されないため、クエリのパフォーマンスを示すために、Druidのワークロードに典型的なクエリを選択しました。比較として、MyISAMエンジンを使用してMySQLを使用した同じクエリの結果も提供しています(我々の実験ではInnoDBの方が遅かった)。

ベンチマークの対象として MySQL を選択したのは、その普遍的な人気の高さからです。他のオープンソースのカラムストアを選択しないことにしたのは、最適なパフォーマンスを得るために正しくチューニングできる自信がなかったからです。

Druidのセットアップでは、ヒストリカルノードにAmazon EC2のm3.2xlargeインスタンスタイプ(Intel® Xeon® E5-2680 v2 @ 2.80GHz)を、ブローカーノードにはc3.2xlargeインスタンス(Intel® Xeon® E5-2670 v2 @ 2.50GHz)を使用しました。MySQL のセットアップは、同じ m3.2xlarge インスタンスタイプで動作する Amazon RDS インスタンスでした。

1GBのTPC-Hデータセットの結果を図10に、100GBのデータセットの結果を図11に示す。Druidのスキャンレートを、与えられた時間間隔でのselect count(*)等価クエリの場合は53,539,211行/秒/コア、select sum(float)型クエリの場合は36,246,530行/秒/コアでベンチマークしました。

最後に、TPC-H 100GBのデータセットを用いて、増加するデータ量に対応するためにDruidをスケーリングした結果を紹介します。図12に示すように、コア数を8コアから48コアに増やした場合、すべてのタイプのクエリが線形スケーリングを達成するわけではありませんが、よりシンプルなアグリゲーションクエリでは線形スケーリングが達成されていることがわかります。

並列コンピューティングシステムの高速化は、システムの逐次処理に必要な時間によって制限されることが多い。この場合、ブローカレベルで相当量の作業を必要とするクエリは、同様に並列化されません。

6.3 Data Ingestion Performance

Druidのデータインジェストのレイテンシを示すために、ディメンション、メトリクス、イベントボリュームが異なる複数の本番用データソースを選択しました。本番環境でのインジェスト・セットアップは、6 ノード、合計 360GB の RAM、96 コア(12 x Intel®Xeon®E5-2670)で構成されています。

このセットアップでは、複数の他のデータソースがインジェストされ、他の多くのDruid関連のインジェストタスクがマシン上で同時に実行されていたことに注意してください。

Druidのデータインジェストのレイテンシは、インジェストされるデータセットの複雑さに大きく依存します。データの複雑さは、各イベントに含まれるディメンジョンの数、各イベントに含まれるメトリクスの数、およびそれらのメトリクスで実行したい集計の種類によって決まります。最も基本的なデータセット(タイムスタンプ列のみを持つデータセット)では、私たちのセットアップでは800,000イベント/秒/コアの速度でデータをインジェストできますが、これはイベントをデシリアライズする速度の測定に過ぎません。現実のデータセットは決してこれほど単純ではありません。表3は、データソースの選択とその特性を示しています。

表3の記述に基づいて、レイテンシは大きく変化し、インジェストレイテンシは必ずしもディメンションとメトリクスの数の要因ではないことがわかります。単純なデータ・セットでは、データ・プロデューサーがデータを配信していたレートであったため、いくつかの低いレイテンシが見られます。その結果を図13に示す。

スループットを、リアルタイムノードがインジェストし、クエリ可能なイベントの数と定義します。あまりにも多くのイベントがリアルタイムノードに送られた場合、リアルタイムノードがそれらのイベントを受け入れる容量があるまで、それらのイベントはブロックされます。本番環境で測定したピークインジェストレイテンシは、Amazon cc2.8xlarge インスタンスを実行している30のディメンションと19のメトリクスを持つデータソースで22914.43イベント/秒/コアでした。

我々が提示した待ち時間の測定値は、インタラクティブ性の問題を解決するのに十分なものです。我々は、待ち時間の変動が少ない方が良いと考えています。ハードウェアを追加することでレイテンシを減らすことはまだ可能ですが、インフラのコストがまだ考慮されているため、そうすることを選択していません。

7. DRUID IN PRODUCTION

ここ数年の間に、私たちはドルイドでの生産ワークロードの処理について多大な知識を得ており、いくつかの興味深い観察を行ってきました。

QueryPattaen

Druidは、データの探索やデータのレポート生成によく使われます。探索的なユースケースでは、1人のユーザーが発行するクエリの数は、報告的なユースケースよりもはるかに多くなります。探索的なクエリでは、結果を絞り込むために同じ時間範囲のフィルタを段階的に追加することがよくあります。ユーザーは、最近のデータの短い時間間隔を探索する傾向があります。レポートを生成するユースケースでは、ユーザーはより長いデータ間隔でクエリを行いますが、これらのクエリは一般的に数が少なく、事前に決定されています。

Multitenancy

負荷の高い同時実行クエリは、マルチテナントでは問題となりえる。大規模なデータソースに対するクエリは、クラスタ内のすべてのヒストリカルノードにヒットしてしまい、すべてのクラスタリソースを消費してしまう可能性があります。このような場合、より小さくて安価なクエリは実行がブロックされてしまう可能性があります。このような問題に対処するために、クエリの優先順位付けを導入しました。各ヒストリカルノードは、スキャンする必要のあるセグメントに優先順位をつけることができます。適切なクエリのプランニングは、本番のワークロードにとって非常に重要です。ありがたいことに、かなりの量のデータに対するクエリは、レポーティングのユースケースのためのものである傾向があり、優先順位を下げることができます。ユーザーは、このユースケースでは、データを探索するときと同じレベルのインタラクティブ性を期待していません。

Node failures.

シングルノードの障害は分散環境では一般的ですが 一度に多くのノードが故障した場合はそうではありません。ヒストリカルノードが完全に 失敗して回復しない場合は、セグメントを再割り当てする必要があります。つまり、このデータをロードするためには過剰なクラスタ容量が必要です。いつでも追加できるキャパシティの量は、クラスタの運用コストに貢献します。私たちの経験から、2つ以上のノードが一度に完全に故障することは非常に稀であり、したがって、私たちは2つの履歴ノードからのデータを完全に再割り当てできるだけの十分な容量をクラスタに残しています。

Data Center Outages.

完全なクラスタ停止はありえるが、非常にまれです。Druidが単一のデータセンターにしか導入されていない場合、データセンター全体で障害が発生する可能性があります。そのような場合、新しいマシンをプロビジョニングする必要があります。ディープストレージがまだ利用可能である限り、過去のノードがディープストレージからすべてのセグメントを再ダウンロードするだけで済むため、クラスタのリカバリ時間はネットワークに依存します。過去にもこのような障害を経験しており、Amazon AWSのエコシステムでは数テラバイトのデータに対して数時間のリカバリータイムが発生していました。

7.1 Operational Monitoring

大規模な分散型クラスタを運用するためには、適切なモニタリングが重要です。各Druidノードは、定期的に一連の運用メトリクスを出力するように設計されています。これらのメトリクスには、CPU使用率、使用可能なメモリ、ディスク容量などのシステムレベルのデータ、ガベージコレクション時間、ヒープ使用率などのJVM統計、セグメントスキャン時間、キャッシュヒット率、データインジェストレイテンシなどのノード固有のメトリクスが含まれます。Druidはまた、クエリごとのメトリクスを出力します。

本番の Druid クラスタからメトリクスを放出し、それをメトリクス Druidクラスタにロードします。本番クラスタの性能と安定性を探るために、メトリクスDruidクラスタを使用しています。この専用のメトリクス・クラスタを使用することで、クエリ速度の段階的な低下、最適にチューニングされていないハードウェア、その他の様々なシステム・ボトルネックなど、多数の本番環境の問題を発見することができました。また、本番環境でどのようなクエリがなげられているのか、どのようなデータの側面にユーザが興味を持っているのかを分析するために、メトリクス・クラスタを使用しています。

7.2 Pairing Druid with a Stream Processor

現在のところ、Druidは完全に非正規化されたデータストリームしか理解できません。本番環境で完全なビジネスロジックを提供するために、DruidはApache Storm [27]のようなストリームプロセッサとペアにすることができます。

ストーム トポロジーは、データ ストリームからイベントを消費し、「オンタイム」のイベントのみを保持し、関連するビジネス ロジックを適用します。これには、id から名前のルックアップなどの単純な変換から、複数ストリームの結合などの複雑な操作までが含まれます。ストームのトポロジーは、処理されたイベントストリームをリアルタイムでドルイドに転送します。ストームはストリーミングデータの処理作業を処理し、Druidはリアルタイムデータとヒストリカルデータの両方のクエリへの応答に使用されます。

7.3 Multiple Data Center Distribution

大規模な生産停止は、単一ノードだけでなく、データセンター全体にも影響を及ぼす可能性があります。Druid コーディネーターノードのティア構成では、セグメントを複数のティアにまたがってレプリケートすることができます。そのため、セグメントは複数のデータセンターにあるヒストリカルノード間で正確に複製することができます。同様に、クエリの優先順位を異なる階層に割り当てることができます。あるデータセンターのノードをプライマリクラスタとして動作させ(すべてのクエリを受信)、別のデータセンターに冗長クラスタを設置することも可能です。このようなセットアップは、1つのデータセンターがユーザーにかなり近い場所にある場合に必要になるかもしれません。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です