GCPのインスタンスで、pysparkからGCSに繋いでみた
お仕事で大量のデータをピボットテーブルのように集計処理する必要がでてきまして、色々触ってみたので、まとめてみます。
やることはこちら。
- GCP(Google Cloud Platform)でgcloudとbdutilの設定を行う
- GCS(Google Cloud Storage)にCSVを置く
- pysparkをダウンロードする
- 初期設定をいくつかし、プログラムを書く
- 実行
GCPでgcloudとbdutilの設定を行う
まずgcould環境の整備をします。GCPのアカウントは取得済という前提で進みます。それすらできてない人はググッてここまでがんばってください。gcloudをローカルPCでセットアップしましょう。
$ curl https://sdk.cloud.google.com | bash
次に、bdutilもローカルPCにインストールします。
展開したら、そのディレクトリに行き、bdutilコマンドを使って、クラスタを作成します。
./bdutil --bucket masuda-cluster -n 1 -P masuda-cluster -z asia-east1-a deploy
yes/no聞かれるので、yesと答えましょう。ちなみに、引数で指定したdeployをdeleteに変えるとクラスタを削除してくれます。
できたクラスタはこんな感じ。
インスタンスにSSH後、以下をやっていきます。
まずGCSに以下のようなCSVを置いておきましょう。
aaa,111,1212,55555 vvv,121,45555,41441 aaa,144,1414141,515111 ddd,155,51333,2254555 aaa,4411,11111,66777
アップロードをしておきます。 たとえば、/masuda-cluster/var/tmp/user.csvとかでもよいでしょう。
次に、hadoopコマンド経由でgs://が使えることを確認します。bdutils経由でVM作る理由はこれですね。hadoop含めて環境整備してくれるので、非常に楽です。
$ hadoop fs -ls gs://masuda-cluster 15/10/19 09:43:34 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.4.2-hadoop1 15/10/19 09:43:38 WARN gcsio.GoogleCloudStorage: Repairing batch of 1 missing directories. 15/10/19 09:43:39 WARN gcsio.GoogleCloudStorage: Successfully repaired 1/1 implicit directories. Found 2 items drwx------ - masudak masudak 0 2015-10-19 09:12 /hadoop drwx------ - masudak masudak 0 2015-10-19 09:43 /bdutil-staging
そして、hadoopのバージョン確認します。
$ hadoop version Hadoop 1.2.1 Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.2 -r 1503152 Compiled by mattf on Mon Jul 22 15:23:09 PDT 2013 From source with checksum 6923c86528809c4e7e6f493b6b413a9a This command was run using /home/hadoop/hadoop-install/hadoop-core-1.2.1.jar
バージョン確認できたら、そのバージョンに合わせたsparkをインストールします。 ログもかなり多いので、WARNに絞って、ログを減らしておきましょう。
$ wget http://ftp.riken.jp/net/apache/spark/spark-1.5.1/spark-1.5.1-bin-hadoop1.tgz $ tar xvzf spark-1.5.1-bin-hadoop1.tgz $ cd spark-1.5.1-bin-hadoop1 $ cp conf/log4j.properties.template conf/log4j.properties $ vim conf/log4j.properties #log4j.rootCategory=INFO, console log4j.rootCategory=WARN, console # WARNにする
また、sparkのconfにHADOOP_CONF_DIRの設定を追加します。これを忘れると動かないので気をつけましょう。
$ sudo su - hadoop $ vi /home/hadoop/hadoop-install/conf/spark-env.sh HADOOP_CONF_DIR=/home/hadoop/hadoop-install/conf
次に、pandasをインストールします。
$ pip install python-dev $ pip install pandas
そして、解析用サンプルスクリプトを作成します。
$ vim /var/tmp/sample.py # -*- coding: utf-8 -*- import pandas as pd pd.options.display.max_rows = 10 header = ['index1','index2','index3', 'index4'] pdf = pd.read_csv('gs://var/tmp/user.csv', header=None, names=header) pdf.show() print(pdf.pivot_table(index=['index1', 'index2'], \ columns='index3', \ values='index4'))
そして実行します。このとき、driver-class-pathでjarを指定しないと、gs://が使えないので要注意。
$ cd spark-1.5.1-bin-hadoop1 $ ./bin/spark-submit --driver-class-path /home/hadoop/hadoop-install/lib/gcs-connector-1.4.2-hadoop1.jar /var/tmp/sample.py index3 1212 11111 45555 51333 1414141 index1 index2 aaa 111 55555 NaN NaN NaN NaN 144 NaN NaN NaN NaN 515111 4411 NaN 66777 NaN NaN NaN ddd 155 NaN NaN NaN 2254555 NaN vvv 121 NaN NaN 41441 NaN NaN
ということで、無事にピボットテーブルが動きました。見事ですね。
終わりに
僕がビッグデータを使って解析を始めようと思った頃、そのセットアップに何日もかけて本当に苦労した思い出があります。
今回見せたようにちょっとお金をかけただけで、すぐセットアップできる時代となりました。すごいですね。
Excelとかですとデータ量がちょっと増えただけでもフリーズしちゃいますし、この環境であれば数億レコードレベルでもそんな苦労しないんじゃないかなーと(期待レベルですが)。
今回pandasを使いましたが、pysparkのメソッドを使っていけば分散処理もできるようなので、クラスタもより有効活用ができるはずです。
そういった意味で今回示した内容はそこまで厳密にイケてるものではないですが、サーバのメモリに載るレベルであればまぁそこまで問題にならないでしょう。
多少試行錯誤は必要ですが、それを乗り越えればかなり快適な環境がありますので、是非試してみてください!ではでは!