multiprocessing - pool allocation

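I'm noticing this behaviour in pool allocation. Even though I have 20 processes in my pool, when I do a map_async over 8 items, only 4 of them start executing. When those 4 finish, it dispatches 2 more, and then another 2.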

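When I throw more than 20 items at it, it runs all 20 processes until fewer than 20 items are left in the queue, at which point the behaviour above repeats.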

I assume this is done on purpose, but it looks weird. My goal is to have requests processed as soon as they come in, and this behaviour clearly doesn't fit.

I'm using Python 2.6 with billiard for maxtasksperchild support.

How can I improve this?

Code:

mypool = pool.Pool(processes=settings['num-processes'], initializer=StartChild, maxtasksperchild=10)

while True:
    lines = DbData.GetAll()
    if len(lines) > 0:
        print 'Starting to process: ', len(lines), ' urls'
        Res = mypool.map_async(RunChild, lines)
        Returns = Res.get(None)
        print 'Pool returns: ', idx, Returns
    else:
        time.sleep(0.5)

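
Note that Res.get(None) in the loop above blocks until the whole batch has finished, so rows that arrive in the meantime have to wait for the next pass. One possible way to get closer to "process requests as soon as they come in" is to submit each item as its own task with apply_async and to pick up finished results without blocking. The sketch below is not from the original post: run_child is a hypothetical stand-in for RunChild, submit_each and harvest are made-up helper names, and a thread-backed ThreadPool is used only to keep the demo lightweight. multiprocessing.Pool offers the same apply_async method, and billiard's pool is assumed to as well.

```python
from multiprocessing.pool import ThreadPool  # thread-backed pool with the Pool API


def run_child(line):
    # Hypothetical stand-in for the question's RunChild
    return line * 2


def submit_each(pool, func, items):
    """Queue every item as its own task and return the AsyncResult handles."""
    return [pool.apply_async(func, (item,)) for item in items]


def harvest(pending):
    """Return (results of finished tasks, handles that are still running)."""
    finished, running = [], []
    for handle in pending:
        if handle.ready():
            # get() returns immediately here; it re-raises if the task failed
            finished.append(handle.get())
        else:
            running.append(handle)
    return finished, running


if __name__ == "__main__":
    pool = ThreadPool(4)
    pending = submit_each(pool, run_child, range(8))
    pool.close()
    pool.join()  # demo only: wait for everything so harvest() sees all results
    results, pending = harvest(pending)
    print(results)
```

In a long-running loop, harvest would be called on every pass instead of joining the pool, so new rows can be submitted while earlier ones are still being processed.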
Best answer

One way I handle multiprocessing in Python is the following:

I have data to which I want to apply a function, function().
First I create a subclass of multiprocessing.Process:

import multiprocessing
from Queue import Empty  # Python 2; on Python 3 use: from queue import Empty

class ProcessThread(multiprocessing.Process):
    def __init__(self, id_t, inputqueue, idqueue, function, resultqueue):
        multiprocessing.Process.__init__(self)
        self.id_t = id_t
        self.inputqueue = inputqueue
        self.idqueue = idqueue
        self.function = function
        self.resultqueue = resultqueue

    def run(self):
        print("process number: " + str(self.id_t) + " starting")

        while True:
            try:
                # qsize() is only approximate, so use a timed get instead
                inp = self.inputqueue.get(timeout=1)
            except Empty:
                break  # no input left: this worker is done
            result = self.function(inp)
            # store the worker id together with the result
            self.resultqueue.put([self.id_t, result])

        # tell the main process that this worker has finished
        self.idqueue.put(self.id_t)

And the main function:

inputqueue = multiprocessing.Queue()
resultqueue = multiprocessing.Queue()
idqueue = multiprocessing.Queue()

def function(datum):
    print(datum)  # or whatever you want to do with each datum

# data (the inputs) and nbprocess (the number of workers) are defined elsewhere
for datum in data:
    inputqueue.put(datum)

for i in range(nbprocess):
    ProcessThread(i, inputqueue, idqueue, function, resultqueue).start()

Finally, collect the results:

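results = []
# wait until every worker has reported that it is done
# (a blocking get avoids spinning on qsize())
for _ in range(nbprocess):
    idqueue.get()
# then drain the result queue
while not resultqueue.empty():
    results.append(resultqueue.get())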
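
Instead of inspecting the state of the queues, the main process can also simply count: if it knows how many inputs were queued, it can block on resultqueue.get() exactly that many times. This is a minimal sketch of that alternative, not the answer's own code. collect_results and its expected and timeout parameters are names made up for illustration, and the demo fills the queue from the same process just to stay self-contained.

```python
import multiprocessing


def collect_results(resultqueue, expected, timeout=None):
    """Block until `expected` results have arrived and return them as a list."""
    results = []
    for _ in range(expected):
        # get() sleeps until an item is available (or raises Empty after
        # `timeout` seconds), so no busy-wait loop is needed
        results.append(resultqueue.get(True, timeout))
    return results


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    for value in (1, 2, 3):
        queue.put([0, value])  # same [worker id, result] shape as above
    print(collect_results(queue, 3, timeout=5))
```

With the classes above, expected would be the number of items put on inputqueue.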

In this way you have full control over what each process does. Using a multiprocessing input queue is an efficient technique only if the computation for each datum is quite slow (more than 1 to 2 seconds), because the different processes access the queues concurrently (that is why I catch exceptions). If your function computes very quickly, consider splitting your data only once at the beginning and giving each process its own chunk of the dataset.
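
The "split once at the beginning" idea can be sketched as follows. The helper name split_into_chunks is an assumption made for illustration; it only shows one simple way to cut the data into nbprocess contiguous pieces of nearly equal size.

```python
def split_into_chunks(data, nbprocess):
    """Split data into nbprocess contiguous chunks whose sizes differ by at most one."""
    data = list(data)
    size, extra = divmod(len(data), nbprocess)
    chunks = []
    start = 0
    for i in range(nbprocess):
        # the first `extra` chunks each take one additional item
        end = start + size + (1 if i < extra else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


if __name__ == "__main__":
    print(split_into_chunks(range(10), 3))
    # prints [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

Each chunk could then be handed to one process as a plain list, so the workers no longer compete for a shared input queue.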
