同じことをEmbulkでやったネタはこちらです:
SlackのログをBigQueryにインポートする(手動) - 詩と創作・思索のひろば
Slackワークスペースの管理者であれば、ワークスペースのデータをエクスポートできる。このデータをBigQueryに投入したら面白いかもしれないので、Apache Beamの素振りがてらやってみた。
Apache Beamはデータ処理のパイプラインを記述するためのフレームワークで、Google Cloud DataflowやApache Sparkなどのランタイムに対応しているとのこと(Copilotくん調べ)。SDKもいつかの言語に対応していて、おそらく機能が豊富な順にJava、Python、Go、TypeScriptなどがあるみたい。TypeScriptが使えると個人的には書き味がいちばんいいのだけど、使ってみたところあまり機能が揃っていないようだったので、結局Pythonに落ち着いた。
GitHub - motemen/example-apache-beam-slack-export-to-bigquery
使い方はREADMEにある通り。もろもろのセットアップをしたうえで以下のようにして実行する。
python main.py --project=<project> --output_dataset=<dataset> --temp_location=gs://... --input='./*.zip' --extract_location='./tmp'
わりと素朴に、zipを展開→格納されているJSONファイルを読み込んで構造体を作る→BigQueryに投入、という流れをPythonで実装するだけだったけど、いくつか調べたり試したりしたことがあったので書いておく。
抽象的なファイルの読み書き
Beamでは、ローカルファイルシステムやGoogle Cloud Storageのファイルを透過的に扱えるしくみがある。apache_beam.io.fileioというやつで、これを利用して MatchFiles(input) | ReadMatches | ...
のようなパイプラインを書くと、inputに応じたファイルを読み込んでくれて、かつinputには ./*.zip
とか gs://bucket/*.zip
のようにローカルでもGCSのようなリモートのファイルでも指定できるようになっている。
ReadMatchesからの出力はio.fileio.ReadableFileとして与えられて、これを起点にファイルの内容を読み込める。展開した一時的なファイルの書き込みも同様にapache_beam.io.filesystemsで透過的に実装できた。
としてみたものの、今回のタスクにおいてはローカルに十分展開できるサイズになる(と思う)のでワンショットの実行ならローカルファイルで十分だった……。
ストリームの分割
Slackから提供されるzipには、チャンネル内のメッセージとメタ情報とが含まれている。メッセージはmessagesテーブルに、メタ情報のうちユーザ情報はusersテーブルに、と振り分けることを考えると、これらは別々のストリームに分割したい。これはpvalue.TaggedOutputで実現できる(Beam Programming Guide » Additional Outputs)。これは気持ちがわかってしまえばストレートに読み書きできる。Beam Programming Guideはかなり参考になる資料だった。
BigQueryへの書き込み
BigQueryへの書き込みの実装自体は難しくないのだけど、でかいデータを投入しようとすると異様に時間がかかってしまっていた。試していたところ、additional_bq_parametersでtimePartitioningを設定し、パーティション分割テーブルとしてやると十分速くなった……と思う。まあどのみちパーティションにはするのでこうしといて問題はなさそう。