詩と創作・思索のひろば

ドキドキギュンギュンダイアリーです!!!

Fork me on GitHub

Apache BeamでSlackのエクスポートデータをBigQueryに投入する

同じことをEmbulkでやったネタはこちらです:

SlackのログをBigQueryにインポートする(手動) - 詩と創作・思索のひろば

Slackワークスペースの管理者であれば、ワークスペースのデータをエクスポートできる。このデータをBigQueryに投入したら面白いかもしれないので、Apache Beamの素振りがてらやってみた。

Apache Beamはデータ処理のパイプラインを記述するためのフレームワークで、Google Cloud DataflowApache Sparkなどのランタイムに対応しているとのこと(Copilotくん調べ)。SDKもいつかの言語に対応していて、おそらく機能が豊富な順にJava、Python、Go、TypeScriptなどがあるみたい。TypeScriptが使えると個人的には書き味がいちばんいいのだけど、使ってみたところあまり機能が揃っていないようだったので、結局Pythonに落ち着いた。

GitHub - motemen/example-apache-beam-slack-export-to-bigquery

使い方はREADMEにある通り。もろもろのセットアップをしたうえで以下のようにして実行する。

python main.py --project=<project> --output_dataset=<dataset> --temp_location=gs://... --input='./*.zip' --extract_location='./tmp'

わりと素朴に、zipを展開→格納されているJSONファイルを読み込んで構造体を作る→BigQueryに投入、という流れをPythonで実装するだけだったけど、いくつか調べたり試したりしたことがあったので書いておく。

抽象的なファイルの読み書き

Beamでは、ローカルファイルシステムやGoogle Cloud Storageのファイルを透過的に扱えるしくみがある。apache_beam.io.fileioというやつで、これを利用して MatchFiles(input) | ReadMatches | ... のようなパイプラインを書くと、inputに応じたファイルを読み込んでくれて、かつinputには ./*.zip とか gs://bucket/*.zip のようにローカルでもGCSのようなリモートのファイルでも指定できるようになっている。

ReadMatchesからの出力はio.fileio.ReadableFileとして与えられて、これを起点にファイルの内容を読み込める。展開した一時的なファイルの書き込みも同様にapache_beam.io.filesystemsで透過的に実装できた。

としてみたものの、今回のタスクにおいてはローカルに十分展開できるサイズになる(と思う)のでワンショットの実行ならローカルファイルで十分だった……。

ストリームの分割

Slackから提供されるzipには、チャンネル内のメッセージとメタ情報とが含まれている。メッセージはmessagesテーブルに、メタ情報のうちユーザ情報はusersテーブルに、と振り分けることを考えると、これらは別々のストリームに分割したい。これはpvalue.TaggedOutputで実現できる(Beam Programming Guide » Additional Outputs)。これは気持ちがわかってしまえばストレートに読み書きできる。Beam Programming Guideはかなり参考になる資料だった。

BigQueryへの書き込み

BigQueryへの書き込みの実装自体は難しくないのだけど、でかいデータを投入しようとすると異様に時間がかかってしまっていた。試していたところ、additional_bq_parametersでtimePartitioningを設定し、パーティション分割テーブルとしてやると十分速くなった……と思う。まあどのみちパーティションにはするのでこうしといて問題はなさそう。

はてなで一緒に働きませんか?