查看: 2617|回复: 2

[Python] 分布式并行库Parallel Python手记(一):局域网并行计算PI

[复制链接]
发表于 2013-12-1 04:12 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有帐号?我要加入

x
本帖最后由 Rainyboy 于 2013-12-1 21:06 编辑

Parallel Python


简介
这里要介绍的是一个名叫Parallel Python的开源库,戳这里,正如它的名字所言,这是一个纯粹用PYTHON实现的,可以实现分布式并行计算的库,可以用于下列计算资源的并行:
1)有多个处理器的本地机器;
2)通过互联网连接的多台计算机。
我恰好有两台笔记本电脑,一台是4核8线程的i7-2670QM,另一台是2核2线程的酷睿T6670,虽不是必须,但长久以来总是希望能够有一种方式将这些计算能力全都发挥出来,直到我遇到了PP。

安装
1) 说不定它已在你的电脑中
pythonxy中以及包含了pp,所以……
检查python**/Lib/site-packages 下面是否存在一个叫pp 的文件夹就可以了
或者
在python命令行中尝试 import pp
如果没有报错那么恭喜你,你已经安装了pp

2) 如果你确定没有安装这个库
在PP的网站上下载安装文件即可(约100K左右……);

文档和例子
如果你真的访问过PP的官网就可以发现,其实它的文档……几乎没有,当然由于它的简洁,也不需要什么文档。它几乎是用例子来展示如何使用PP的,但不用担心,网站上一共也就5,6个简短的例子……

关于本帖
由于可能会在日后的工作中用到这个库,因此我打算从简到繁,看看能不能将我的动力学计算的程序改装成并行计算;有时间的话可能会拜读部分代码,看看其中的巧妙实现,如果有些心得,会写在算法区。

但作为第一步,我们总是用简单的例子来让整个框架运行起来体验体验。

所以就有了——

第一个例子:局域网并行计算PI

准备工作
如前所述,我有两台电脑,所以事先需要在两台电脑上都安装python和pp,这个可以通过在它们上面装上pythonxy套装来搞定。

配置服务器
其中一台(i7的那一台)作为服务器,只需要在它上面运行ppserver.py这个脚本就可以了,可以将调用命令写成一个.cmd文件:
  1. python "c:/Python27/Scripts/ppserver.py" -w 6 -d
  2. pause
复制代码


其中
    -w 6 设置使用6个线程参与计算
    -d 将它设置为debug模式,这样输出会详细一些

跑起来之后是这个样子的:


QQ截图20131201210959.png


在另一台电脑提交计算任务

为此,我写了一个用投点法(这里提到的第一种)计算圆周率的代码:

Python 代码,双击复制代码
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 28 21:12:56 2013

@author: fanyu
"""
import pp
import thread

class Result:
    def __init__(self):
        self.lock = thread.allocate_lock()
        self.iscount = 0.0

    #the callback function
    def add(self, value):
        self.lock.acquire()
        self.iscount += value
        self.lock.release()


def randTest(n):
    import numpy as np
    x = np.random.rand(n)
    y = np.random.rand(n)
    r = np.power(x*x+y*y,0.5)
    r.sort()
    pos = np.searchsorted(r,(1,),)
    # pos represents the count of points that satisfies
    #   r<1.0
    return pos

ppservers = ('FanYu-HP',)
#ncpus controls the number of local cpus will be in use
job_server = pp.Server(ppservers = ppservers, ncpus=0)

re = Result()
jobCounts = 100
PCjob = 1000000

for i in range(0,jobCounts):
    job_server.submit(randTest,(PCjob,),modules = ('numpy',),callback=re.add)

#this script wil be blocked here so as to wait all the jobs to be done
job_server.wait()

print 'PI=%f\n'%(re.iscount*4.0/jobCounts/PCjob,)
print job_server.get_active_nodes()
job_server.print_stats()




代码很短,也很容易看懂,其实在这台电脑上要做的就是不停的调用
Python 代码,双击复制代码
job_server.submit()

将计算任务提交出去,再调用
Python 代码,双击复制代码
job_server.wait()

等待计算完毕就可以了。

运行这个脚本的计算机可以参加计算,也可以不参加计算,这取决于:
Python 代码,双击复制代码
job_server = pp.Server(ppservers = ppservers, ncpus=0)

中ncpus参数的设置,这里设置0意味着本机没有线程参与计算。


运行后,在本机控制台上输出:
  1. PI=3.141605

  2. {'FanYu-HP:60000': 6, 'local': 0}
  3. Job execution statistics:
  4. job count | % of all jobs | job time sum | time per job | job server
  5.        100 |        100.00 |      30.6440 |     0.306440 | FanYu-HP:60000
  6. Time elapsed since server creation 15.0699999332
  7. 0 active tasks, 0 cores
复制代码


当然这个过程总i7那台计算机的CPU必然是疯狂工作了一阵子。

如果将上述ncpus设置为1,即本机也生成一个线程参与计算,结果就是:
  1. PI=3.141984

  2. {'FanYu-HP:60000': 6, 'local': 1}
  3. Job execution statistics:
  4. job count | % of all jobs | job time sum | time per job | job server
  5.         71 |         71.00 |      20.1020 |     0.283127 | FanYu-HP:60000
  6.         29 |         29.00 |       9.4670 |     0.326448 | local
  7. Time elapsed since server creation 9.90100002289
  8. 0 active tasks, 1 cores
复制代码


可以看出本机也参与了一些计算。


小结
总的来说,PP库给人的感觉还是很轻盈、高效的,不需要使用者去作分配线程之类的事情。
但实际情况总是比较复杂的,比如在本机依赖库类较多、返回值教复杂的情况下,PP库是否还能依旧好用。

那么只能且听下回分解了,题目暂定为:

分布式并行库Parallel Python手记(二):计算频响曲线

谢谢关注!




回复
分享到:

使用道具 举报

 楼主| 发表于 2013-12-1 23:40 | 显示全部楼层
将上面那个程序作了些改动,可以实时刷新计算进度了,通常这也是必要的。

Python 代码,双击复制代码
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 28 21:12:56 2013

@author: fanyu
"""
import pp
import thread
import os
import time

class Result:
    def __init__(self):
        self.lock = thread.allocate_lock()
        self.iscount = 0.0
        self.jobcount = 0

    #the callback function
    def add(self, value):
        self.lock.acquire()
        self.iscount += value
        self.jobcount += 1
        self.lock.release()


def randTest(n):
    import numpy as np
    x = np.random.rand(n)
    y = np.random.rand(n)
    r = np.power(x*x+y*y,0.5)
    r.sort()
    pos = np.searchsorted(r,(1,),)
    # pos represents the count of points that satisfies
    #   r<1.0
    return pos

ppservers = ('FanYu-HP',)
#ncpus controls the number of local cpus will be in use
job_server = pp.Server(ppservers = ppservers, ncpus=0)

re = Result()
jobCounts = 100
PCjob = 1000000

for i in range(0,jobCounts):
    job_server.submit(randTest,(PCjob,),modules = ('numpy',),callback=re.add)

while(re.jobcount < jobCounts-1 ):
    os.system('cls')
    print job_server.get_active_nodes()
    job_server.print_stats()    
    time.sleep(1)

#this script wil be blocked here so as to wait all the jobs to be done
job_server.wait()

print 'PI=%f\n'%(re.iscount*4.0/jobCounts/PCjob,)

发表于 2013-12-13 16:40 | 显示全部楼层
嗯,pp很好用,而且文档也比较充分。不过超大型的算例目前没有见过,有谁见过用pp作千核量级的计算的?
您需要登录后才可以回帖 登录 | 我要加入

本版积分规则

快速回复 返回顶部 返回列表