Here is one of the ways I handle multiprocessing.
I have data to which I want to apply a function, function().
First I create a multiprocessing subclass:
import multiprocessing

class ProcessThread(multiprocessing.Process):
    def __init__(self, id_t, inputqueue, idqueue, function, resultqueue):
        multiprocessing.Process.__init__(self)
        self.id_t = id_t
        self.inputqueue = inputqueue
        self.idqueue = idqueue
        self.function = function
        self.resultqueue = resultqueue

    def run(self):
        print("process number: " + str(self.id_t) + " starting")
        # Note: qsize() is approximate and is not implemented on some platforms (e.g. macOS).
        while self.inputqueue.qsize() > 0:
            try:
                # Time out rather than block forever if another process took the last item.
                inp = self.inputqueue.get(timeout=1)
            except Exception:
                continue
            result = self.function(inp)
            while 1:
                try:
                    # Tag each result with the ID of the process that produced it.
                    self.resultqueue.put([self.id_t, result])
                except Exception:
                    pass
                else:
                    break
        # Signal that this process has finished.
        self.idqueue.put(self.id_t)
        return
And the main part:
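inputqueue = multiprocessing.Queue()
resultqueue = multiprocessing.Queue()
idqueue = multiprocessing.Queue()

def function(data):
    print(data)  # or whatever you want

# data (an iterable of inputs) and nbprocess (the number of processes) are yours to define.
# On platforms that spawn processes (e.g. Windows), put the code below under
# if __name__ == "__main__":
for datum in data:
    inputqueue.put(datum)

for i in range(nbprocess):
    ProcessThread(i, inputqueue, idqueue, function, resultqueue).start()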
Finally, collect the results:
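import time

results = []
# Wait until every process has reported that it is done.
while idqueue.qsize() < nbprocess:
    time.sleep(0.01)  # sleep briefly instead of spinning at full CPU
# Drain whatever the processes produced.
while resultqueue.qsize() > 0:
    results.append(resultqueue.get())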
This way you control exactly what gets appended to the results, which process it came from, and so on.
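As a small illustration of that control, here is a sketch that groups the collected results by the process that produced them. It assumes each entry in results is an [id_t, result] pair; group_by_process and the sample data are hypothetical names of mine, not part of the code above.

```python
from collections import defaultdict


def group_by_process(results):
    """Map each process ID to the list of results that process produced."""
    grouped = defaultdict(list)
    for id_t, result in results:
        grouped[id_t].append(result)
    return dict(grouped)


# Hypothetical sample of what the drained result queue might contain.
sample = [[0, "a"], [1, "b"], [0, "c"]]
by_process = group_by_process(sample)
```

Within one process the results keep the order in which they were put on the queue, but the order across processes is not guaranteed.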
Using a multiprocessing input queue is efficient only if the computation for each datum is fairly slow (roughly 1 to 2 seconds or more), because the processes access the queues concurrently (that is why I catch exceptions around the queue calls). If your function computes very quickly, consider splitting your data only once, at the beginning, and giving each process its own chunk of the dataset when it starts.
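The chunking idea could look roughly like the sketch below. It is only one possible way to do it: split_into_chunks, process_chunk and run_chunked are names I made up for illustration, and it assumes function is defined at module level so that it can be passed to another process.

```python
import multiprocessing


def split_into_chunks(data, nbprocess):
    """Split data into nbprocess contiguous chunks of nearly equal size."""
    data = list(data)
    size, extra = divmod(len(data), nbprocess)
    chunks = []
    start = 0
    for i in range(nbprocess):
        # The first `extra` chunks each take one additional item.
        end = start + size + (1 if i < extra else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


def process_chunk(id_t, chunk, function, resultqueue):
    """Apply function to a whole chunk, then touch the queue only once."""
    resultqueue.put([id_t, [function(datum) for datum in chunk]])


def run_chunked(function, data, nbprocess):
    """Start one process per chunk and return the results in input order."""
    resultqueue = multiprocessing.Queue()
    chunks = split_into_chunks(data, nbprocess)
    processes = [
        multiprocessing.Process(target=process_chunk,
                                args=(i, chunk, function, resultqueue))
        for i, chunk in enumerate(chunks)
    ]
    for p in processes:
        p.start()
    # Read one message per process before joining, so that no process
    # blocks while flushing its results to the queue.
    collected = dict(resultqueue.get() for _ in processes)
    for p in processes:
        p.join()
    return [r for i in range(nbprocess) for r in collected[i]]
```

Called from under if __name__ == "__main__":, run_chunked(abs, [-3, 1, -2], 2) returns [3, 1, 2]. Each process uses the queue exactly once, so contention no longer depends on how fast function is.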