Original title: python os.mkfifo() for Windows

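Short version (if you can answer the short version, that does the job for me; the rest is mainly for other people with a similar task):

In Python on Windows, I want to create 2 file objects attached to the same file (it doesn't have to be an actual file on the hard drive), one for reading and one for writing, such that if the reading end tries to read, it never gets EOF (it just blocks until something is written). I think on Linux os.mkfifo() would do the job, but on Windows it doesn't exist. What can be done? (I must use file objects.)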

Some extra details: I have a python module (not written by me) that plays a certain game through stdin and stdout (using raw_input() and print). I also have a Windows executable playing the same game, through stdin and stdout as well. I want to make them play one against the other, and log all their communication.

Here is the code I have so far (the get_fifo() function is not implemented, because that is the part I don't know how to do on Windows):

class Pusher(Thread):
        def __init__(self, source, dest, p1, name):
                Thread.__init__(self)
                self.source = source
                self.dest = dest
                self.name = name
                self.p1 = p1

        def run(self):
                # Relay lines until the child process exits or either end is closed.
                while (self.p1.poll() is None and
                       not self.source.closed and not self.dest.closed):
                        line = self.source.readline()
                        logging.info("%s: %s" % (self.name, line[:-1]))
                        self.dest.write(line)
                        self.dest.flush()


exe_to_pythonmodule_reader, exe_to_pythonmodule_writer = get_fifo()
pythonmodule_to_exe_reader, pythonmodule_to_exe_writer = get_fifo()

p1 = subprocess.Popen(exe, shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

old_stdin = sys.stdin
old_stdout = sys.stdout

sys.stdin = exe_to_pythonmodule_reader
sys.stdout = pythonmodule_to_exe_writer

push1 = Pusher(p1.stdout, exe_to_pythonmodule_writer, p1, "1")
push2 = Pusher(pythonmodule_to_exe_reader, p1.stdin, p1, "2")

push1.start()
push2.start()
ret = pythonmodule.play()
sys.stdin = old_stdin
sys.stdout = old_stdout
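Best answer

Following the two answers above, I stumbled on the solution: os.pipe() does the job. Thank you for your answers.

I'm posting the complete code in case anyone else needs it: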

import subprocess
from threading import Thread
import time
import sys
import logging
import tempfile
import os

import game_playing_module

# Path of the log file. The original post does not define this name,
# so the value here is an assumption.
LOG_FILE_NAME = "connect.log"

class Pusher(Thread):
    def __init__(self, source, dest, proc, name):
        Thread.__init__(self)
        self.source = source
        self.dest = dest
        self.name = name
        self.proc = proc

    def run(self):
        # Relay lines until the child process exits or either end is closed.
        while (self.proc.poll() is None and
               not self.source.closed and not self.dest.closed):
            line = self.source.readline()
            logging.info("%s: %s" % (self.name, line[:-1]))
            self.dest.write(line)
            self.dest.flush()

def get_reader_writer():
    # os.pipe() returns two raw file descriptors; wrap them in file objects.
    fd_read, fd_write = os.pipe()
    return os.fdopen(fd_read, "r"), os.fdopen(fd_write, "w")

def connect(exe):
    logging.basicConfig(level=logging.DEBUG,
                        format="%(message)s",
                        filename=LOG_FILE_NAME,
                        filemode="w")

    program_to_grader_reader, program_to_grader_writer = get_reader_writer()
    grader_to_program_reader, grader_to_program_writer = get_reader_writer()

    p1 = subprocess.Popen(exe, shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    old_stdin = sys.stdin
    old_stdout = sys.stdout

    sys.stdin = program_to_grader_reader
    sys.stdout = grader_to_program_writer

    push1 = Pusher(p1.stdout, program_to_grader_writer, p1, "1")
    push2 = Pusher(grader_to_program_reader, p1.stdin, p1, "2")

    push1.start()
    push2.start()

    game_playing_module.play()

    sys.stdin = old_stdin
    sys.stdout = old_stdout

    # Return the logged conversation (same file that logging wrote to).
    with open(LOG_FILE_NAME, "r") as fil:
        data = fil.read()
    return data

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: connect.py exe")
        print(sys.argv)
        sys.exit()
    print(sys.argv)
    print(connect(sys.argv[1]))
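Other answers

For a cross-platform solution, I'd recommend building the file-like object on top of a socket on localhost (127.0.0.1). That is what IDLE does by default to solve a problem quite similar to yours.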

os.pipe() returns an anonymous pipe, or a named pipe on Windows, which is very lightweight and efficient.
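A minimal sketch of the blocking behaviour the question asks for, assuming Python 3 and reusing the get_reader_writer() helper from the answer above. The consumer thread, the variable names and the sample line are illustrative only:

```python
import os
import threading
import time


def get_reader_writer():
    # Wrap both ends of an anonymous pipe in ordinary file objects.
    fd_read, fd_write = os.pipe()
    return os.fdopen(fd_read, "r"), os.fdopen(fd_write, "w")


reader, writer = get_reader_writer()
received = []


def consume():
    # readline() blocks until a full line arrives. It returns "" (EOF)
    # only once the write end has been closed.
    received.append(reader.readline())
    received.append(reader.readline())


consumer = threading.Thread(target=consume)
consumer.start()

time.sleep(0.2)  # nothing written yet, so the consumer is blocked in readline()
still_waiting = consumer.is_alive() and not received

writer.write("move e2e4\n")
writer.flush()
writer.close()   # closing the write end is what finally produces EOF
consumer.join()
reader.close()
```

As long as the write end stays open, the reader never sees EOF; it simply waits, which is the property the question wants from a FIFO.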

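TCP sockets (as suggested by user1495323) are more heavyweight: for example, you can see them with netstat, each one requires a port number, and the number of available ports is limited to 64k per peer (e.g. 64k from localhost to localhost).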

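On the other hand, named pipes (on Windows) are limited because:

Sockets can be wrapped in a Python file object using makefile(), which allows them to be used wherever a file object is expected. So for some use cases, such as sending stdout from one subprocess to another, they are an attractive option.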
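As a rough illustration of the makefile() wrapping, here is a sketch assuming Python 3.5 or later (where socket.socketpair() is also available on Windows). It uses a connected socket pair instead of an explicit TCP port, and the variable names and message are made up for the example:

```python
import socket

# Two already-connected sockets; no port number has to be chosen.
left, right = socket.socketpair()

# Wrap each end in a text-mode file object.
writer = left.makefile("w")
reader = right.makefile("r")

writer.write("hello from the other end\n")
writer.flush()              # push the buffered text onto the socket
line = reader.readline()    # blocks until a complete line is available

# The underlying socket is really closed only after both the file object
# and the socket itself are closed; the reader then sees EOF.
writer.close()
left.close()
eof = reader.readline()

reader.close()
right.close()
```

The reader and writer objects behave like ordinary file objects, so they could be assigned to sys.stdin or sys.stdout in the same way as the pipe-based objects in the accepted answer.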

Sockets can be created with automatically assigned port numbers like this (based on the excellent socket HOWTO):

import socket
from contextlib import closing

with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as input_socket:
    # Avoid socket exhaustion by setting SO_REUSEADDR <https://stackoverflow.com/a/12362623/648162>:
    input_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # localhost doesn't work if the definition is missing from the hosts file,
    # and 127.0.0.1 only works with IPv4 loopback, but socket.gethostname()
    # should always work:
    input_socket.bind((socket.gethostname(), 0))
    random_port_number = input_socket.getsockname()[1]
    input_socket.listen(1)

    # Do something with input_socket, for example pass it to another thread.

    output_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # close() should not strictly be necessary here, but since connect() could fail, it avoids leaking fds
    # in that case. "If a file descriptor is given, it is closed when the returned I/O object is closed".
    with output_socket:
        output_socket.connect((socket.gethostname(), random_port_number))

The user of input_socket (for example, another thread) can then do:

import select

with input_socket:
    # Wait for the peer to connect, polling once per second.
    while True:
        readables, _, _ = select.select([input_socket], [], [input_socket], 1.0)

        if len(readables) > 0:
            input_conn, addr = input_socket.accept()
            break

    with input_conn:
        while True:
            readables, _, errored = select.select([input_conn], [], [input_conn], 1.0)
            if len(errored) > 0:
                print("connection errored, stopping")
                break

            if len(readables) > 0:
                read_data = input_conn.recv(1024)
                if len(read_data) == 0:
                    print("connection closed, stopping")
                    break
                else:
                    print(f"read data: {read_data!r}")



