Tutorial

Installation

サンプルアプリケーションはgithubレポジトリにありますので、まだcloneをしていない場合はcloneをしてください。

$ git clone https://github.com/laysakura/shellstreaming

shellstreamingはPythonのパッケージであり、使うのにインストールする必要があります。 ソースからインストールする場合は、cloneしたレポジトリのroot directoryから下記のコマンドを 実行してください(Pythonのインストール、virtualenvの有無によりsudoが必要な場合があります)。

$ python setup.py install

PyPIというPythonのパッケージ・レポジトリからダウンロードしてインストールすることもできます。

$ pip install shellstreaming

Warning

shellstreamingはPython 2.7以上を必要としています。Python 3をサポートしていません。

Run first example in localhost

shellstreamingで最初のアプリケーションを動かしてみます。 shellstreamingは分散ストリーム処理系ですが、まずは設定の容易なlocalhost-modeを使ってみましょう。

shellstreamingの動作には設定ファイルが必要です。 設定ファイルは $HOME/.shellstreaming.cnf に配置します。

$ cd shellstreaming
$ cp support-files/sample-shellstreaming.cnf $HOME/.shellstreaming.cnf

では、早速アプリケーションを動かします。

$ shellstreaming example/01_RandInt.py
...
[2014-03-04 10:56:37,464] master.py main():122
Finished all job execution.
Execution time: 4.766653 sec.

[2014-03-04 10:56:38,257] master.py _run_test():247     01_RandInt.test finished without any exception
[2014-03-04 10:56:38,257] master.py main():129  passed test()!

正しく動作していれば、このような出力が得られるはずです。

example/01_RandInt.py アプリケーションは、0から100までの整数値をランダムに生成し、 それを /tmp/01_RandInt.txt というファイルに書き出します。 ファイルを開いて内容を確認してみてください。

また、その他の example/*.py もサンプルアプリケーションです。 動作を試してみてください。

Run shellstreaming in parallel mode

shellstreamingは分散ストリーム処理系です。 複数のノードを用いてアプリケーションを並列動作させてみましょう。

まずは設定ファイルを編集します(変更箇所のみ抜粋)。

$HOME/.shellstreaming.cnf

[shellstreaming]

worker_hosts = a.example.com:10000,b.example.com:10000

localhost_debug = no

send_latest_codes_on_start = yes

ssh_private_key = /home/yourname/.ssh/id_rsa

この設定では、 a.example.com , b.example.com の2台のノードでの並列処理が行われます。 10000 はTCPポート番号で、shellstreaming のワーカプロセスがこのポートでマスターや他ワーカからの接続を待ち受けます。 ファイアーウォール等の設定にご注意ください。 また、 shellstreaming コマンドを発行するノードにマスタープロセスが立ち上がります。

ワーカプロセスが立ち上がるそれぞれのノードには、マスタープロセスが立ち上がるノードからSSHログインが 可能である必要があります。 ログイン時には、ssh_private_key で指定した秘密鍵が用いられます。

ここでは、ストリーム処理アプリケーションとして、 example/02_FilterSplit.py を使用します。 マシン環境に合わせるために、ソースを以下のように変更してください。

example/02_FilterSplit.py

...
api.OStream(lo_stream, LocalFile, LOW_OUTPUT_FILE,  output_format='json', fixed_to=['a.example.com:10000'])
api.OStream(hi_stream, LocalFile, HIGH_OUTPUT_FILE, output_format='json', fixed_to=['a.example.com:10000'])
...

こうすることで、処理結果は a.example.com に集約されるようになります。

では、実際に簡単なアプリケーションを並列動作させてみましょう。

$ shellstreaming example/02_FilterSplit.py
[2014-03-04 15:36:47,632] master.py main():50   Used config file: /home/nakatani/.shellstreaming.cnf
[2014-03-04 15:36:47,632] master.py _launch_workers():210       Auto-deploy starts with this command:
fab -f /home/nakatani/git/shellstreaming/shellstreaming/master/../autodeploy/auto_deploy.py -H localhost,cloko020,cloko021 pack deploy_codes deploy_config:cnfpath=/home/nakatani/.shellstreaming.cnf start_worker:cnfpath=/home/nakatani/.shellstreaming.cnf,logpath=/tmp/shellstreaming-worker-HOSTNAME-PORT.log -P -i /home/nakatani/.ssh/id_rsa_lab_nopass

...

[2014-03-04 15:42:37,434] master_main.py sched_loop():174       All jobs are finished!
[2014-03-04 15:42:37,434] master.py main():107  Finished all job execution. Killing worker servers...
[2014-03-04 15:42:37,438] comm.py kill_worker_server():34       requested close worker server on localhost:18871 to close
[2014-03-04 15:42:37,441] comm.py kill_worker_server():34       requested close worker server on a.example.com:10000 to close
[2014-03-04 15:42:37,445] comm.py kill_worker_server():34       requested close worker server on b.example.com:10000 to close
[2014-03-04 15:42:37,446] master.py main():122
Finished all job execution.
Execution time: 6.883384 sec.

[2014-03-04 15:42:38,317] master.py _run_test():247     02_FilterSplit.test finished without any exception
[2014-03-04 15:42:38,317] master.py main():129  passed test()!

最初のうちは、コマンドを走らせたノードから、ワーカを立ち上げるノードへSSH経由でコードを送る処理が続きます。 これが一段落すると、example/02_FilterSplit.py に記述された実際の処理が走ります。

一度SSH経由でコードを送られたノードは、次回以降はそのコードを再利用することができます。 設定ファイルを send_latest_codes_on_start=no とすることで、起動時間を抑えることができます。

example/02_FilterSplit.py は、最初に0から100までの乱数列を生成し、それが50より小さいかどうかで出力先を /tmp/02_FilterSplit_lo.txt, /tmp/02_FilterSplit_hi.txt の2つに振り分けています。

Run more realistic applications

ここでは、ストリーム処理系で実際に動作させるようなアプリケーションを取り上げます。 特に、シェルコマンドを活用した shellstreaming の強みが発揮されるようなアプリケーションを紹介しています。

Log analysis for Apache HTTP Server

Apache HTTP Server, 通称 apache2 のアクセスログを解析するストリームアプリケーションを動作させてみましょう。 アプリケーションの動作の詳細は、 http://www.logos.ic.i.u-tokyo.ac.jp/~nakatani/pdf/master_thesis.pdf の3.1節をご覧ください。

ここでは、a.example.com, b.example.com の2台が /tmp/access.log にアクセスログを所有するものとし、 c.example.com も加えた3台でログ解析を分散処理することとします。

まず、設定ファイルでワーカの設定を確認してください。

[shellstreaming]

worker_hosts = a.example.com:10000,b.example.com:10000,c.example.com:10000

次に、ログ解析のためのストリーム処理アプリケーションを作成します。 といっても、example/51_apache_log_analysis.py でほとんどできているので、あとは多少のパラメータを変更するだけです。

変更箇所は以下のとおりです。

example/51_apache_log_analysis.py

...

APACHE_LOG   = '/tmp/access.log'
DAILY_ACCESS = '/tmp/51_apache_log_analysis_daily.txt'
STATUS_CODES = '/tmp/51_apache_log_analysis_statuscode.txt'

workers_with_access_log   = ['a.example.com:10000', 'b.example.com:10000']
worker_to_collect_results = ['c.example.com:10000']

...

では、 localhost においてこのアプリケーションを開始します。 ただし、 a.example.com, b.example.com/tmp/access.log ファイルを予め作成しておいてください(空でも大丈夫です)。

$ (a.example.com) touch /tmp/access.log
$ (b.example.com) touch /tmp/access.log
$ shellstreaming example/51_apache_log_analysis.py

このとき、shellstreaming は a.example.comb.example.com/tmp/access.log ファイルを監視し、 それに対して追記があった場合に、ログの解析処理をします。

試しに、 a.example.com においてログの追記を行なってみましょう。

$ (a.example.com) echo '192.168.100.3 - - [03/01/2014:16:09:00 +0900] "GET / HTTP/1.1" 400 265 "-" "-"' >> /tmp/access.log

この追記されたログの集計結果は、 c.example.com/tmp/51_apache_log_analysis_daily.txt/tmp/51_apache_log_analysis_statuscode.txt にあります。 集約結果は追記毎に追加されていくので、 tail コマンドで確認しましょう。

$ (c.example.com) tail -f /tmp/51_apache_log_analysis_daily.txt
$ (c.example.com) tail -f /tmp/51_apache_log_analysis_statuscode.txt

更に追記を続けると、集約結果が更新されていくことが分かります。

$ (a.example.com) echo '192.168.100.3 - - [03/01/2014:16:09:00 +0900] "GET / HTTP/1.1" 400 265 "-" "-"' >> /tmp/access.log
$ (a.example.com) echo '192.168.100.3 - - [01/01/2014:16:09:00 +0900] "GET / HTTP/1.1" 400 265 "-" "-"' >> /tmp/access.log
$ (a.example.com) echo '192.168.100.3 - - [02/01/2014:16:09:00 +0900] "GET / HTTP/1.1" 200 265 "-" "-"' >> /tmp/access.log

b.example.com からも追記をしてみて、複数の(仮想の)Webサーバのログを集計できていることを確認してください。