Can't pickle <type 'instancemethod'> when using multiprocessing Pool.map()

I'm trying to use multiprocessing's Pool.map() function to divide up work simultaneously. When I use the following code, it works fine:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__ == '__main__':
    go()

However, when I use it in a more object-oriented approach, it doesn't work. The error message it gives is:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

This occurs when the following is my main program:

import someClass

if __name__ == '__main__':
    sc = someClass.someClass()
    sc.go()

and the following is my someClass class:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

Does anyone know what the problem could be, or an easy way around it?

Best Answer

The problem is that multiprocessing must pickle things to sling them among processes, and bound methods are not picklable. The workaround (whether you consider it "easy" or not ;-) is to add the infrastructure to your program to allow such methods to be pickled, registering it with the copy_reg standard library module.

For example, Steven Bethard's contribution to this thread (towards the end of the thread) shows one perfectly workable approach to allow method pickling/unpickling via copy_reg.
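
For reference, a minimal sketch of that copy_reg approach (Python 2; a fuller version, which also walks the MRO, appears in an answer further down this page):

import copy_reg
import types

def _pickle_method(method):
    # reduce a bound method to (function name, instance, class)
    return _unpickle_method, (method.im_func.__name__, method.im_self, method.im_class)

def _unpickle_method(func_name, obj, cls):
    # look the function up on the class and re-bind it to the instance
    return cls.__dict__[func_name].__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)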

Other Answers

All of these solutions are ugly because multiprocessing and pickling are broken and limited unless you jump outside the standard library.

If you use a fork of multiprocessing called pathos.multiprocessing, you can directly use classes and class methods in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in Python.

pathos.multiprocessing also provides an asynchronous map function… and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6]))

See: What can multiprocessing and dill do together?

and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/
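
As a quick check of the dill claim, a bound method round-trips directly, which is the exact thing pickle chokes on:

>>> import dill
>>> class someClass(object):
...   def f(self, x):
...     return x*x
... 
>>> sc = someClass()
>>> dill.loads(dill.dumps(sc.f))(3)
9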

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
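>>> 
>>> # the asynchronous map mentioned above (assuming pathos's amap method,
>>> # which returns a handle you call get() on)
>>> r = p.amap(add, x, y)
>>> r.get()
[4, 6, 8, 10]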

And just to be explicit, you can do exactly what you wanted to do in the first place, and you can do it from the interpreter, if you want to.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Get the code here: https://github.com/uqfoundation/pathos


There are some limitations to Steven Bethard's solution, though:

When you register your class method as a function, the destructor of your class is surprisingly called every time the method's processing finishes. So if you have one instance of your class that calls its method n times, members may disappear between two runs and you may get the message malloc: *** error for object 0x...: pointer being freed was not allocated (e.g. an open member file) or pure virtual method called, terminate called without an active exception (which means that the lifetime of a member object I used was shorter than I thought). I got this when dealing with n greater than the pool size. Here is a short example:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Steven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    # reduce a bound method to (function name, instance, class)
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    # walk the MRO so the method is found even if defined on a base class
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Output:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

The __call__ method is not quite equivalent, because [None, ...] is read back from the results:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

So neither method is satisfactory. (In the second example, the empty results come from __call__ not returning the value of process_obj.)

There's another shortcut you can use, although it can be inefficient, depending on what's in your class instances.

As everyone has said, the problem is that the multiprocessing code has to pickle the things that it sends to the sub-processes it has started, and the pickler doesn't handle instance methods.

However, instead of sending the instance method, you can send the actual class instance, plus the name of the function to call, to an ordinary function that then uses getattr to call the instance method, thus creating the bound method in the Pool subprocess. This is similar to defining a __call__ method, except that you can call more than one member function.

Stealing @EricH.'s code from his answer and annotating it a bit (I retyped it, hence all the name changes and such; for some reason this seemed easier than cut-and-paste :-) ) for illustration of all the magic:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

The output shows that, indeed, the constructor is called once (in the original pid) and the destructor is called 9 times (once for each copy made, i.e. 2 or 3 times per pool worker process as needed, plus once in the original process). This is often OK, as in this case, since the default pickler makes a copy of the entire instance and (semi-)secretly re-populates it, in this case doing:

obj = object.__new__(Klass)
obj.__dict__.update({'count': 1})

That's why, even though the destructor is called eight times in the three worker processes, it counts down from 1 to 0 each time. Of course, you can still get into trouble this way. If necessary, you can provide your own __setstate__:

    def __setstate__(self, adict):
        self.count = adict['count']

in this case for instance.

You could also define a __call__() method on your someClass(), which delegates to someClass.f(), and then pass an instance of someClass() to the pool. This object is pickleable and it works fine (for me)...

from multiprocessing import Pool

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        p = Pool(4)
        sc = p.map(self, range(4))
        print sc

    def __call__(self, x):
        return self.f(x)

sc = someClass()
sc.go()

The solution from parisjohn above works fine for me. Plus, the code looks clean and easy to understand. In my case there are a few functions to call using Pool, so I modified parisjohn's code a bit below. I made __call__ able to call several functions, with the function names passed in the argument dicts from go():

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass
    
    def f(self, x):
        return x*x
    
    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()

In this simple case, where someClass.f doesn't inherit any data from the class and doesn't attach anything to it, a possible solution is to separate out f so it can be pickled:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))

A potentially trivial solution to this is to switch to using multiprocessing.dummy. This is a thread-based implementation of the multiprocessing interface that doesn't seem to have this problem in Python 2.7. I don't have a lot of experience here, but this quick import change allowed me to call apply_async on a class method.
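
A minimal sketch (threads share one address space, so arguments are never pickled and bound methods work as-is; note that the GIL means this mostly helps I/O-bound work):

from multiprocessing.dummy import Pool

class someClass(object):
    def f(self, x):
        return x*x

    def go(self):
        pool = Pool(4)
        print pool.map(self.f, range(10))

someClass().go()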

A few good resources on multiprocessing.dummy:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/

Why not use a separate function?

# inst is assumed to be a module-level instance of your class, created
# before the pool, so fork-based worker processes inherit it
def func(*args, **kwargs):
    return inst.method(*args, **kwargs)

print pool.map(func, arr)

I ran into this same issue but found out that there is a JSON encoder that can be used to move these objects between processes.

import json
from pyVmomi.VmomiSupport import VmomiJSONEncoder

Use this to create your list:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

Then in the mapped function, use this to recover the object:

pfVmomiObj = json.loads(jsonSerialized)

Update: as of the day of this writing, namedtuples are picklable (starting with Python 2.7).
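
A quick check of that claim (the namedtuple class must be defined at module level under the same name for pickle to find it):

import pickle
from collections import namedtuple

Point = namedtuple('Point', ['x', 'y'])
print pickle.loads(pickle.dumps(Point(1, 2)))   # Point(x=1, y=2)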

The issue here is that the child processes aren't able to import the class of the object (in this case, the class P). In a multi-module project, the class P should be importable anywhere the child process is used.

A quick workaround is to make it importable by assigning it to globals():

globals()["P"] = P
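
For illustration, a rough sketch of the failure this works around (Python 2 semantics, where pickle looks a class up by module and __name__; the class P here is hypothetical, created inside a function so it is not visible at module level until it is published to globals()):

import pickle

def make_class():
    class P(object):
        def f(self, x):
            return x*x
    # without this line, pickling an instance of P raises PicklingError,
    # because pickle cannot find P by name in any module
    globals()["P"] = P
    return P

obj = make_class()()
print len(pickle.dumps(obj))   # succeeds: P is now resolvable by name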

pathos.multiprocessing worked for me.

It has a Pool and, unlike multiprocessing, it can serialize almost anything.

import pathos.multiprocessing as mp
pool = mp.Pool(processes=2) 

You don't even need to install the full pathos package.

Actually, the only package needed is dill (pip install dill). Then override the multiprocessing Pickler with the dill one:

import dill
import multiprocessing.reduction

dill.Pickler.dumps, dill.Pickler.loads = dill.dumps, dill.loads
multiprocessing.reduction.ForkingPickler = dill.Pickler
multiprocessing.reduction.dump = dill.dump

This answer was borrowed from https://stackoverflow.com/a/69253561/10686785




