English 中文(简体)
多进程:如何在进程间共享一个大的只读对象?
原标题:
  • 时间:2009-03-18 19:58:46
  •  标签:

通过生成的子进程是否共享先前在程序中创建的对象?

我有以下设置:

do_some_processing(filename):
    for line in file(filename):
        if line.split( , )[0] in big_lookup_object:
            # something here

if __name__ ==  __main__ :
    big_lookup_object = marshal.load( file.bin )
    pool = Pool(processes=4)
    print pool.map(do_some_processing, glob.glob( *.data ))

我正在将一些大对象加载到内存中,然后创建一个工作池,这个工作池需要使用那个大对象。这个大对象是只读的,我不需要在进程之间传递它的修改。

我的问题是:如果我在unix/c中生成一个进程,那么是否将大对象加载到共享内存中,或者每个进程都加载自己的大对象副本?

更新:进一步澄清 - big_lookup_object是一个共享的查找对象。我不需要将它拆分并单独处理。我需要保留一个单独的副本。我需要拆分它的工作是读取许多其他大文件,并根据查找对象查找这些大文件中的项目。

进一步更新:数据库是一种很好的解决方案,但是memcached可能更好,而磁盘上的文件(shelve或dbm)可能更好。在这个问题中,我特别关注内存解决方案。对于最终的解决方案,我将使用hadoop,但我也想看看是否可以拥有本地的内存版本。

问题回答

通过多进程生成的子进程是否共享程序中之前创建的对象?

对于Python<3.8,不支持,对于Python ≥ 3.8,支持。

进程具有独立的内存空间。

解决方案1

为了最大限度地利用人员众多的大型结构,请这样做。

  1. 将每个工人视为“过滤器”-从 stdin 读取中间结果,执行工作,在 stdout 上写入中间结果。

  2. 将所有工人连接成一条管道:

    process1 <source | process2 | process3 | ... | processn >result
    

每个进程都会读取、进行工作和写入。

这是非常高效的,因为所有进程都同时运行。写入和读取直接通过进程之间的共享缓冲区进行。


解决方案2

在某些情况下,您可能会拥有一种更复杂的结构 - 通常是扇形结构。在这种情况下,您有一个父亲有多个孩子。

  1. 家长打开源数据。家长分叉了几个孩子。

  2. 家长阅读资源,将资源的部分并行地分配给每个运行的子进程。

  3. 当父进程到达尾部时,关闭管道。子进程得到文件末尾并正常结束。

写子部分很愉快,因为每个子部分只需读取sys.stdin

家长在大量孩子的孵化和保持管道的过程中有点儿花哨,但并不严重。

Fan-in 是相反的结构。许多独立运行的进程需要将它们的输入交错到一个常见的进程中。收集器不容易编写,因为它必须从许多来源读取。

使用select模块查看哪些管道有待处理输入,通常会从许多命名管道中读取数据。


解决方案3

共享查找是数据库的定义。

解决方案3A-加载数据库。让工人处理数据库中的数据。

解决方案3B-使用werkzeug(或类似工具)创建一个非常简单的服务器,以提供WSGI应用程序,以响应HTTP GET,以便工作人员可以查询服务器。


解决方案4 (jiě jué fāng àn 4)

共享文件系统对象。Unix操作系统提供了共享内存对象。这些对象只是被映射到内存的文件,以便进行交换I/O,而不是进行更常规的缓冲读取。

你可以用几种方式从Python环境中完成此操作。

  1. 编写一个启动程序,将您的原始巨大对象分解成较小的对象,然后启动工作程序,每个程序都有一个较小的对象。较小的对象可以是腌制的Python对象,以节省一点文件读取时间。 编写一个启动程序,(1)将您的原始巨大对象拆分成较小的对象,(2)启动每个带有较小对象的工作程序。较小的对象可以是腌制的Python对象,以节省一些文件读取时间。

  2. 编写一个启动程序,该程序 (1) 读取您的原始大型对象并使用seek操作编写一个分页结构化的字节编码文件,以确保可以使用简单的seek来轻松查找单独的部分。这就是数据库引擎所做的——将数据分解为页面,使每个页面易于定位。

产生能够访问此大页面结构文件的工人。每个工人可以寻找相关部分并在那里工作。

Do child processes spawned via multiprocessing share objects created earlier in the program?

这取决于情况。对于全局只读变量,通常可以考虑这样做(除了消耗的内存),否则不应该。

multiprocessing的文档说:

继承比腌制/解腌更好。

On Windows many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which need access to a shared resource created elsewhere can inherit it from an ancestor process.

明确地向子进程传递资源。

On Unix a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.

Apart from making the code (potentially) compatible with Windows this also ensures that as long as the child process is still alive the object will not be garbage collected in the parent process. This might be important if some resource is freed when the object is garbage collected in the parent process.

全局变量

Bear in mind that if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start() was called.

Example

在Windows上(单个CPU):

#!/usr/bin/env python
import os, sys, time
from multiprocessing import Pool

x = 23000 # replace `23` due to small integers share representation
z = []    # integers are immutable, let s try mutable object

def printx(y):
    global x
    if y == 3:
       x = -x
    z.append(y)
    print os.getpid(), x, id(x), z, id(z) 
    print y
    if len(sys.argv) == 2 and sys.argv[1] == "sleep":
       time.sleep(.1) # should make more apparant the effect

if __name__ ==  __main__ :
    pool = Pool(processes=4)
    pool.map(printx, (1,2,3,4))

带有sleep

$ python26 test_share.py sleep
2504 23000 11639492 [1] 10774408
1
2564 23000 11639492 [2] 10774408
2
2504 -23000 11639384 [1, 3] 10774408
3
4084 23000 11639492 [4] 10774408
4

没有 睡眠:

$ python26 test_share.py
1148 23000 11639492 [1] 10774408
1
1148 23000 11639492 [1, 2] 10774408
2
1148 -23000 11639324 [1, 2, 3] 10774408
3
1148 -23000 11639324 [1, 2, 3, 4] 10774408
4

"S.Lott先生是正确的。Python的多进程快捷方式会有效地为您提供一个独立的、重复的大块内存。"

在大多数*nix系统中,使用较低级别的调用os.fork()实际上会给您提供写时复制的内存,这可能是您所考虑的。据我所知,在理论上,即使是最简单的程序,您也可以从该数据中读取而不需要重复。

然而,在Python解释器中,情况并不是那么简单。对象数据和元数据存储在同一内存段中,因此,即使对象从未更改,例如对象的引用计数被增加,也会导致内存写入,从而导致副本。几乎任何执行比“打印hello”更多操作的Python程序都会导致引用计数增加,因此您可能永远无法意识到写时复制的好处。

即使有人成功地在Python中破解了共享内存解决方案,尝试在进程间协调垃圾回收也可能会非常痛苦。

如果你在Unix下运行,它们可能会共享同一对象,因为fork是如何工作的(即子进程有单独的内存,但它是写时复制的,因此只要没有人修改它,它就可以共享)。我尝试了以下内容:

import multiprocessing

x = 23

def printx(y):
    print x, id(x)
    print y

if __name__ ==  __main__ :
    pool = multiprocessing.Pool(processes=4)
    pool.map(printx, (1,2,3,4))

并获得以下输出:

$ ./mtest.py
23 22995656
1
23 22995656
2
23 22995656
3
23 22995656
4

当然,这并不能证明没有副本被制作,但您应该能够通过查看ps的输出来验证每个子进程使用多少实际内存,在您的情况下。

不同的进程有不同的地址空间,就像运行不同的解释器实例一样。这就是IPC(进程间通信)的作用。

你可以使用队列或管道来实现这个目的。如果你想在以后分布到网络上,你也可以使用TCP上的RPC。

将此翻译为中文:http://docs.python.org/dev/library/multiprocessing.html#exchanging-objects-between-processes http://docs.python.org/dev/library/multiprocessing.html#exchanging-objects-between-processes

不是直接与多处理相关,但从您的示例中,似乎可以只使用shelve模块或类似的东西。 “big_lookup_object”是否真的必须完全在内存中?

不行,但您可以将数据作为子进程加载,并允许其与其他子进程共享数据,请参见下文。

import time
import multiprocessing

def load_data( queue_load, n_processes )

    ... load data here into some_variable

    """
    Store multiple copies of the data into
    the data queue. There needs to be enough
    copies available for each process to access. 
    """

    for i in range(n_processes):
        queue_load.put(some_variable)


def work_with_data( queue_data, queue_load ):

    # Wait for load_data() to complete
    while queue_load.empty():
        time.sleep(1)

    some_variable = queue_load.get()

    """
    ! Tuples can also be used here
    if you have multiple data files
    you wish to keep seperate.  
    a,b = queue_load.get()
    """

    ... do some stuff, resulting in new_data

    # store it in the queue
    queue_data.put(new_data)


def start_multiprocess():

    n_processes = 5

    processes = []
    stored_data = []

    # Create two Queues
    queue_load = multiprocessing.Queue()
    queue_data = multiprocessing.Queue()

    for i in range(n_processes):

        if i == 0:
            # Your big data file will be loaded here...
            p = multiprocessing.Process(target = load_data,
            args=(queue_load, n_processes))

            processes.append(p)
            p.start()   

        # ... and then it will be used here with each process
        p = multiprocessing.Process(target = work_with_data,
        args=(queue_data, queue_load))

        processes.append(p)
        p.start()

    for i in range(n_processes)
        new_data = queue_data.get()
        stored_data.append(new_data)    

    for p in processes:
        p.join()
    print(processes)    




相关问题