カイワレの大冒険 Third

技術的なことや他愛もないことをたまに書いてます

GCPのインスタンスで、pysparkからGCSに繋いでみた

お仕事で大量のデータをピボットテーブルのように集計処理する必要がでてきまして、色々触ってみたので、まとめてみます。

やることはこちら。

  • GCP(Google Cloud Platform)でgcloudとbdutilの設定を行う
  • GCS(Google Cloud Storage)にCSVを置く
  • pysparkをダウンロードする
  • 初期設定をいくつかし、プログラムを書く
  • 実行

GCPでgcloudとbdutilの設定を行う

まずgcould環境の整備をします。GCPのアカウントは取得済という前提で進みます。それすらできてない人はググッてここまでがんばってください。gcloudをローカルPCでセットアップしましょう。

$ curl https://sdk.cloud.google.com | bash

次に、bdutilもローカルPCにインストールします。

展開したら、そのディレクトリに行き、bdutilコマンドを使って、クラスタを作成します。

./bdutil --bucket masuda-cluster -n 1 -P masuda-cluster -z asia-east1-a deploy

yes/no聞かれるので、yesと答えましょう。ちなみに、引数で指定したdeployをdeleteに変えるとクラスタを削除してくれます。

できたクラスタはこんな感じ。

f:id:masudaK:20160330132829p:plain

インスタンスにSSH後、以下をやっていきます。

まずGCSに以下のようなCSVを置いておきましょう。

aaa,111,1212,55555
vvv,121,45555,41441
aaa,144,1414141,515111
ddd,155,51333,2254555
aaa,4411,11111,66777

アップロードをしておきます。 たとえば、/masuda-cluster/var/tmp/user.csvとかでもよいでしょう。

次に、hadoopコマンド経由でgs://が使えることを確認します。bdutils経由でVM作る理由はこれですね。hadoop含めて環境整備してくれるので、非常に楽です。

$ hadoop fs -ls gs://masuda-cluster
15/10/19 09:43:34 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.4.2-hadoop1
15/10/19 09:43:38 WARN gcsio.GoogleCloudStorage: Repairing batch of 1 missing directories.
15/10/19 09:43:39 WARN gcsio.GoogleCloudStorage: Successfully repaired 1/1 implicit directories.
Found 2 items
drwx------   - masudak masudak          0 2015-10-19 09:12 /hadoop
drwx------   - masudak masudak          0 2015-10-19 09:43 /bdutil-staging
 

そして、hadoopのバージョン確認します。

$ hadoop version
Hadoop 1.2.1
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.2 -r 1503152
Compiled by mattf on Mon Jul 22 15:23:09 PDT 2013
From source with checksum 6923c86528809c4e7e6f493b6b413a9a
This command was run using /home/hadoop/hadoop-install/hadoop-core-1.2.1.jar

バージョン確認できたら、そのバージョンに合わせたsparkをインストールします。 ログもかなり多いので、WARNに絞って、ログを減らしておきましょう。

$ wget http://ftp.riken.jp/net/apache/spark/spark-1.5.1/spark-1.5.1-bin-hadoop1.tgz
$ tar xvzf spark-1.5.1-bin-hadoop1.tgz
$ cd spark-1.5.1-bin-hadoop1

$ cp conf/log4j.properties.template conf/log4j.properties
$ vim conf/log4j.properties
 
#log4j.rootCategory=INFO, console
log4j.rootCategory=WARN, console # WARNにする

また、sparkのconfにHADOOP_CONF_DIRの設定を追加します。これを忘れると動かないので気をつけましょう。

$ sudo su - hadoop
$ vi /home/hadoop/hadoop-install/conf/spark-env.sh
HADOOP_CONF_DIR=/home/hadoop/hadoop-install/conf

次に、pandasをインストールします。

$ pip install python-dev
$ pip install pandas

そして、解析用サンプルスクリプトを作成します。

$ vim /var/tmp/sample.py
# -*- coding: utf-8 -*-
import pandas as pd
  
pd.options.display.max_rows = 10
header = ['index1','index2','index3',  'index4']
pdf = pd.read_csv('gs://var/tmp/user.csv', header=None, names=header)
pdf.show()
 
print(pdf.pivot_table(index=['index1', 'index2'], \
                      columns='index3', \
                      values='index4'))

そして実行します。このとき、driver-class-pathでjarを指定しないと、gs://が使えないので要注意。

$ cd spark-1.5.1-bin-hadoop1
$ ./bin/spark-submit --driver-class-path /home/hadoop/hadoop-install/lib/gcs-connector-1.4.2-hadoop1.jar /var/tmp/sample.py 



index3         1212     11111    45555    51333    1414141
index1 index2
aaa    111       55555      NaN      NaN      NaN      NaN
       144         NaN      NaN      NaN      NaN   515111
       4411        NaN    66777      NaN      NaN      NaN
ddd    155         NaN      NaN      NaN  2254555      NaN
vvv    121         NaN      NaN    41441      NaN      NaN

ということで、無事にピボットテーブルが動きました。見事ですね。

終わりに

僕がビッグデータを使って解析を始めようと思った頃、そのセットアップに何日もかけて本当に苦労した思い出があります。

今回見せたようにちょっとお金をかけただけで、すぐセットアップできる時代となりました。すごいですね。

Excelとかですとデータ量がちょっと増えただけでもフリーズしちゃいますし、この環境であれば数億レコードレベルでもそんな苦労しないんじゃないかなーと(期待レベルですが)。

今回pandasを使いましたが、pysparkのメソッドを使っていけば分散処理もできるようなので、クラスタもより有効活用ができるはずです。

そういった意味で今回示した内容はそこまで厳密にイケてるものではないですが、サーバのメモリに載るレベルであればまぁそこまで問題にならないでしょう。

多少試行錯誤は必要ですが、それを乗り越えればかなり快適な環境がありますので、是非試してみてください!ではでは!