How should I log while using multiprocessing in Python?
Asked: 2009-03-13 04:02:31

Right now I have a central module in a framework that spawns multiple processes using the Python 2.6 multiprocessing module. Because it uses multiprocessing, there is a module-level multiprocessing-aware log, LOG = multiprocessing.get_logger(). Per the docs, this logger (EDIT) does not have process-shared locks, so you can't have multiple processes writing to it simultaneously without garbling sys.stderr (or whatever file handle).

The issue I have now is that the other modules in the framework are not multiprocessing-aware. The way I see it, I need to make all dependencies on this central module use multiprocessing-aware logging. That's annoying within the framework, to say nothing of all the clients of the framework. Are there alternatives I'm not thinking of?

Best Answer

The only way to deal with this is to:

  1. Spawn each worker process such that its log goes to a different file descriptor (to disk or to pipe.) Ideally, all log entries should be timestamped.
  2. Your controller process can then do one of the following:
    • If using disk files: Coalesce the log files at the end of the run, sorted by timestamp (see the sketch after this list)
    • If using pipes (recommended): Coalesce log entries on-the-fly from all pipes, into a central log file. (E.g., Periodically select from the pipes file descriptors, perform merge-sort on the available log entries, and flush to centralized log. Repeat.)
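
For the disk-file variant, a minimal sketch of the merge step might look like this (the file-name pattern is hypothetical; it assumes every worker line begins with a sortable timestamp):

import glob
import heapq

def coalesce_logs(pattern='worker-*.log', out_path='combined.log'):
    # heapq.merge lazily merge-sorts inputs that are already sorted,
    # which holds as long as each line begins with a timestamp
    files = [open(p) for p in sorted(glob.glob(pattern))]
    try:
        with open(out_path, 'w') as out:
            for line in heapq.merge(*files):
                out.write(line)
    finally:
        for f in files:
            f.close()
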
Other Answers

I just wrote a log handler of my own that feeds everything to the parent process via a pipe. I've only been testing it for ten minutes, but it seems to work pretty well.

Note: this is hardcoded to a RotatingFileHandler, which is my specific use case.


Update: @javier now maintains this approach as a package available on PyPI - see multiprocessing-logging on PyPI, github at https://github.com/jruere/multiprocessing-logging


Update: Implementation!

It now uses a queue for correct handling of concurrency, and also recovers from errors correctly. I've been using this in production for several months, and the current version below works without issue.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)

The QueueHandler is native to Python 3.2+ and does exactly this; it is also easily replicated in earlier versions.

The Python docs have two complete examples: Logging to a single file from multiple processes.

Each process (including the parent process) puts its logging on the Queue, and then a listener thread or process (one example is provided for each) picks the records up and writes them all to a file, with no risk of corruption or garbling.

For those using Python < 3.2, import logutils, which contains the same code as Python 3.2's native version.
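
A version shim for that might look like the following (a sketch; logutils mirrors the stdlib names):

try:  # Python 3.2+
    from logging.handlers import QueueHandler, QueueListener
except ImportError:  # older interpreters: the logutils backport
    from logutils.queue import QueueHandler, QueueListener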

Here is another simple solution, focused on simplifying logging for anyone who, like me, gets here from Google. Works only on Python 3.2 or higher.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()

As of 2020, there seems to be a simpler way of logging with multiprocessing.

This function will create the logger. You can set the format here, as well as where you want the output to go (file, stdout):

def create_logger():
    import multiprocessing, logging
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
    handler = logging.FileHandler('logs/your_file_name.log')
    handler.setFormatter(formatter)

    # this bit will make sure you won't have
    # duplicated messages in the output
    if not len(logger.handlers): 
        logger.addHandler(handler)
    return logger

In the init, you instantiate the logger:

if __name__ == '__main__':
    from multiprocessing import Pool
    logger = create_logger()
    logger.info('Starting pooling')
    p = Pool()
    # rest of the code

Now, all you have to do is add this reference in each function that needs logging:

logger = create_logger()

And output messages:

logger.info(f'My message from {something}')

Hope this helps.

Yet another alternative might be the various non-file-based logging handlers in the logging package:

  • SocketHandler
  • DatagramHandler
  • SysLogHandler

English: "Hello, how are you?" Chinese: 你好,你好吗? French: Bonjour, comment allez-vous? Spanish: Hola, ¿cómo estás? German: Hallo, wie geht es Ihnen? Italian: Ciao, come stai? Japanese: こんにちは、お元気ですか? Korean: 안녕하세요, 어떻게 지내세요? Russian: Здравствуйте, как поживаете?

This way, you could easily have a logging daemon somewhere that you can write to safely and that handles the results correctly. (For example, a simple socket server that just unpickles each message and emits it to its own rotating file handler.)
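
As a sketch of such a daemon, assuming the workers attach a stdlib SocketHandler pointed at it (the framing below is SocketHandler's documented wire format: a 4-byte big-endian length prefix followed by a pickled record dict):

import logging
import logging.handlers
import pickle
import socketserver
import struct

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # read length-prefixed pickled records until the client disconnects
        while True:
            head = self.rfile.read(4)
            if len(head) < 4:
                break
            size = struct.unpack('>L', head)[0]
            record = logging.makeLogRecord(pickle.loads(self.rfile.read(size)))
            target.handle(record)  # one process, so the handler's own lock suffices

target = logging.handlers.RotatingFileHandler('central.log', maxBytes=1 << 20, backupCount=5)
server = socketserver.ThreadingTCPServer(
    ('localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT), LogRecordStreamHandler)
# server.serve_forever()  # run this in the dedicated logging process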

The SysLogHandler would take care of this for you as well. Of course, you could also use your own instance of syslog rather than the system one.
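
Attaching it is a one-liner per process; for example (assuming a local syslog daemon on the usual Unix socket):

import logging
from logging.handlers import SysLogHandler

# '/dev/log' is the typical Linux socket; ('localhost', 514) works for UDP
logging.getLogger().addHandler(SysLogHandler(address='/dev/log'))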

Here is another variant that keeps the logging and the queue thread separate.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import sys
import time
import traceback
import multiprocessing, threading, logging

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main
    # log's handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not reliable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)

All current solutions are too coupled to the logging configuration by using a handler. My solution has the following architecture and features:

  • You can use any logging configuration you want
  • Logging is done in a daemon thread
  • Safe shutdown of the daemon by using a context manager
  • Communication to the logging thread is done by multiprocessing.Queue
  • In subprocesses, logging.Logger (and already defined instances) are patched to send all records to the queue
  • New: format traceback and message before sending to queue to prevent pickling errors

Code with a usage example and output can be found in the following Gist: https://gist.github.com/schlamar/7003737
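
The subprocess-side patch at the heart of that design is roughly the following (an illustrative sketch, not the Gist verbatim):

import logging

def patch_logging_for_worker(queue):
    # Route every record from every logger in this process onto the
    # shared queue; a daemon thread on the other side runs the real
    # handlers. Message and traceback are flattened first so the
    # record pickles cleanly, as described above.
    def handle(self, record):
        record.msg = record.getMessage()
        record.args = None
        if record.exc_info:
            record.exc_text = logging.Formatter().formatException(record.exc_info)
            record.exc_info = None
        queue.put(record)
    logging.Logger.handle = handle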

Since we can represent multiprocess logging as many publishers and one subscriber (listener), using ZeroMQ to implement PUB-SUB messaging is indeed an option.

Moreover, the PyZMQ module, the Python bindings for ZMQ, implements PUBHandler, an object for publishing logging messages over a zmq.PUB socket.

There is a solution on the web for centralized logging from distributed applications using PyZMQ and PUBHandler that can easily be adapted for working locally with multiple publishing processes.

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

    @property
    def sub(self):
        return self._sub

    @property
    def logger(self):
        return self._logger

# And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

    def run(self):
        self._logger.info("{0} - Sending message".format(self.proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't lose the publishers' messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1', stop_event,))
sub_logger_process.start()

# Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()

I also liked zzzeek's answer, but Andre is correct that a queue is required to prevent garbling. I had some luck with the pipe, but did see garbling, which is somewhat expected. Implementing it turned out to be harder than I thought, particularly because of running on Windows, where there are some additional restrictions about global variables and such (see: How's Python Multiprocessing Implemented on Windows?).

But I finally got it working. This example probably isn't perfect, so comments and suggestions are welcome. It also does not support setting the formatter or anything other than the root logger. Basically, you have to reinitialize the logger in each of the pool processes with the queue and set up the other attributes on the logger.

Again, any suggestions on how to make the code better are welcome. I certainly don't know all the Python tricks yet :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger(  ).addHandler(handler)
    logging.getLogger(  ).setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when building the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()

I'd like to suggest using the logger_tt library: https://github.com/Dragon2fly/logger_tt

The multiprocessing_logging library does not work on my macOS, while logger_tt does.
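
Usage is essentially declarative. Here is a sketch based on the project's README (treat the exact keyword below as an assumption and verify against the current docs):

from logger_tt import setup_logging, logger

setup_logging(use_multiprocessing=True)
logger.info('logs from any process end up in one place')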

Just publish your instance of the logger somewhere. That way, the other modules and clients can use your API to get the logger without having to import multiprocessing.

I liked zzzeek's answer. I would just substitute a Queue for the Pipe, since if multiple threads/processes use the same pipe end to generate log messages, they will get garbled.

The concurrent-log-handler package seems to do the job perfectly. Tested on Windows; it also supports POSIX systems.

Main idea

  • Create a separate file with a function that returns a logger. The logger must have fresh instance of ConcurrentRotatingFileHandler for each process. Example function get_logger() given below.
  • Creating loggers is done at the initialization of the process. For a multiprocessing.Process subclass it would mean the beginning of the run() method.

Detailed instructions

In this example, I will use the following file structure:

.
│-- child.py        <-- For a child process
│-- logs.py         <-- For setting up the logs for the app
│-- main.py         <-- For a main process
│-- myapp.py        <-- For starting the app
│-- somemodule.py   <-- For an example, a "3rd party module using standard logging"

Code

Child process

# child.py 

import multiprocessing as mp
import time
from somemodule import do_something


class ChildProcess(mp.Process):
    def __init__(self):
        self.logger = None
        super().__init__()

    def run(self):
        from logs import get_logger
        self.logger = get_logger()


        while True:
            time.sleep(1)
            self.logger.info("Child process")
            do_something()

  • A simple child process that inherits from multiprocessing.Process and simply logs the text "Child process" to a file.
  • Important: get_logger() is called inside run(), or elsewhere inside the child process (not at module level or in __init__()). This is required because get_logger() creates a ConcurrentRotatingFileHandler instance, and a new instance is needed for each process.
  • The do_something is used just to demonstrate that this works with 3rd-party library code that has no clue you are using concurrent-log-handler.

Main Process

# main.py

import logging
import multiprocessing as mp
import time

from child import ChildProcess
from somemodule import do_something


class MainProcess(mp.Process):
    def __init__(self):
        self.logger = logging.getLogger()
        super().__init__()

    def run(self):
        from logs import get_logger

        self.logger = get_logger()
        self.child = ChildProcess()
        self.child.daemon = True
        self.child.start()

        while True:
            time.sleep(0.5)
            self.logger.critical("Main process")
            do_something()


  • The main process, which logs the text "Main process" to the file twice a second. It also inherits from multiprocessing.Process.
  • Same comments for get_logger() and do_something() apply as for the child process.

Logger setup

# logs.py

import logging
import os

from concurrent_log_handler import ConcurrentRotatingFileHandler

LOGLEVEL = logging.DEBUG


def get_logger():
    logger = logging.getLogger()

    if logger.handlers:
        return logger

    # Use an absolute path to prevent file rotation trouble.
    logfile = os.path.abspath("mylog.log")

    logger.setLevel(LOGLEVEL)

    # Rotate log after reaching 512K, keep 5 old copies.
    filehandler = ConcurrentRotatingFileHandler(
        logfile, mode="a", maxBytes=512 * 1024, backupCount=5, encoding="utf-8"
    )
    filehandler.setLevel(LOGLEVEL)

    # create also handler for displaying output in the stdout
    ch = logging.StreamHandler()
    ch.setLevel(LOGLEVEL)

    formatter = logging.Formatter(
        "%(asctime)s - %(module)s - %(levelname)s - %(message)s [Process: %(process)d, %(filename)s:%(funcName)s(%(lineno)d)]"
    )

    # add formatter to ch
    ch.setFormatter(formatter)
    filehandler.setFormatter(formatter)

    logger.addHandler(ch)
    logger.addHandler(filehandler)

    return logger

  • This uses the ConcurrentRotatingFileHandler from the concurrent-log-handler package. Each process needs a fresh ConcurrentRotatingFileHandler instance.
  • Note that all the arguments for the ConcurrentRotatingFileHandler should be the same in every process.

Example app

# myapp.py 

if __name__ == "__main__":
    from main import MainProcess

    p = MainProcess()
    p.start()

  • Just a simple example of how to start the multiprocess application.

Example of 3rd party module using standard logging

# somemodule.py 

import logging

logger = logging.getLogger("somemodule")

def do_something():
    logging.info("doing something")

  • Just a simple example to test if loggers from 3rd party code will work normally.

Example output

2021-04-19 19:02:29,425 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,427 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:29,929 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,931 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,133 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:30,137 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:30,436 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,439 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,944 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,946 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:31,142 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:31,145 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:31,449 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:31,451 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]

How about delegating all the logging to another process that reads log entries off a Queue?

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logging.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

Simply share LOG_QUEUE via any of the multiprocess mechanisms, or even by inheritance, and it all works out fine!
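
Worker-side usage is then just tuple puts, with a (None, ...) sentinel to stop the logging process; for example:

import logging

def worker(queue):
    # anything holding a reference to LOG_QUEUE can log this way
    queue.put((logging.INFO, 'work started'))
    queue.put((logging.ERROR, 'something went wrong'))

# when everything is done:
# LOG_QUEUE.put((None, None))  # makes CentralLogger.run() break out of its loop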

Below is a class that can be used in a Windows environment; it requires ActivePython. You can also inherit from it for other logging handlers (StreamHandler, etc.).

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self, *args, **kwargs):
        self.mutex = win32event.CreateMutex(None, False, self.MUTEX_NAME)
        return super(SyncronizedFileHandler, self).__init__(*args, **kwargs)

    def emit(self, *args, **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex, win32event.INFINITE)
            ret = super(SyncronizedFileHandler, self).emit(*args, **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

And here is an example that demonstrates usage:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    """initialize the loggers"""
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f, range(30))
    for i , _ in enumerate(imap_result):
        pass

I have a solution similar to Ironhacker's, except that in some of my code I use logging.exception, and I found that I needed to format the exception before passing it back over the Queue, since tracebacks aren't pickleable:

import cStringIO
import logging
import traceback

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s

If you have deadlocks occurring in a combination of locks, threads and forks in the logging module, that is reported in bug report 6721 (see also related SO question).

There is a small fixup solution posted here.

However, that will only fix the potential deadlocks in logging. It will not fix the fact that output may be garbled. See the other answers presented here.

Here's my simple hack/workaround... not the most comprehensive, but easily modifiable and simpler to read and understand, I think, than any other answer I found before writing this:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name gets discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)

There is this great package

Package: https://pypi.python.org/pypi/multiprocessing-logging/

code: https://github.com/jruere/multiprocessing-logging

Install:

pip install multiprocessing-logging

Then add:

import multiprocessing_logging

# This enables logging from inside worker processes
multiprocessing_logging.install_mp_handler()
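
A minimal end-to-end use might look like this (the file name and pool size are arbitrary):

import logging
import multiprocessing
import multiprocessing_logging

def work(i):
    logging.info('processing item %s', i)
    return i

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, filename='app.log')
    multiprocessing_logging.install_mp_handler()  # once, before creating the pool
    pool = multiprocessing.Pool(4)
    pool.map(work, range(8))
    pool.close()
    pool.join()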

For whoever might need this, I wrote a decorator for the multiprocessing_logging package that adds the current process name to logs, so it becomes clear who logs what.

It also runs install_mp_handler(), so there is no need to run it yourself before creating a pool.

This allows me to see which worker creates which log messages.

Here's the blueprint, with an example:

import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging

# Set up a basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)


# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
    class MultiProcessLogFilter(logging.Filter):
        def filter(self, record):
            try:
                process_name = multiprocessing.current_process().name
            except BaseException:
                process_name = __name__
            record.msg = f'{process_name} :: {record.msg}'
            return True

    multiprocessing_logging.install_mp_handler()
    f = MultiProcessLogFilter()

    # Wraps is needed here so apply / apply_async know the function name
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.removeFilter(f)
        logger.addFilter(f)
        return fn(*args, **kwargs)

    return wrapper


# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
    logger.info(f'test function called via: {argument}')


# You can also wrap previously undecorated functions
def undecorated_function():
    logger.info('I am not decorated')


@logs_mp_process_names
def redecorated(*args, **kwargs):
    return undecorated_function(*args, **kwargs)


# Enjoy
if __name__ == '__main__':
    with multiprocessing.Pool() as mp_pool:
        # Also works with apply_async
        mp_pool.apply(test, ('mp pool',))
        mp_pool.apply(redecorated)
        logger.info('some main logs')
        test('main program')

One alternative is to write the multiprocessing logging to a known file and register an atexit handler to join on those processes and read it back on stderr; however, you won't get a real-time flow of the output messages on stderr that way.

Simplest idea as mentioned:

  • Grab the filename and the process id of the current process.
  • Set up a WatchedFileHandler. The reasons for this handler are discussed in detail here, but in short, there are certain worse race conditions with the other logging handlers, and this one has the shortest window for the race condition.
    • Choose a path to save the logs to such as /var/log/...
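
A sketch of that setup, using the pid in the file name (the /var/log path is just the location suggested above):

import logging
import logging.handlers
import os

def setup_process_logging(base='/var/log/myapp'):
    # one log file per process: no two processes ever write to the same fd
    path = '{0}.{1}.log'.format(base, os.getpid())
    handler = logging.handlers.WatchedFileHandler(path)
    handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(handler)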



