English 中文(简体)
Python:如何在多线程中发送数据包,然后线程自行终止
原标题:
  • 时间:2009-03-03 03:37:31
  •  标签:

我有一个问题。我想使用Python在一定的时间内(比如1分钟),向某个主机发送连续的字节流。

这是我目前的代码:

#! /usr/bin/env python                                                          

import socket
import thread
import time

IP = "192.168.0.2"
PADDING = "a" * 1000 #assume the MTU is slighly above 1000
DATA = PADDING + "this is sentence number = "
PORT = 14444
killed = False
test_time = 60 #60 seconds of testing

def send_data():
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect((IP, PORT))
  count = 1
  starttime = time.clock()
  while elapsed < test_time:
    sent = s.send(DATA + str(count) + "
")
    if sent == 0: break # assume that if nothing is sent -> connection died
    count = count+1
    elapsed = time.clock() - starttime
    if killed:
      break
  s.close()
  print str(count) + " has been sent"

print "to quit type quit"
thread.start_new_thread(send_data, ())

while True:
  var = raw_input("Enter something: ")
  if var == "quit":
    killed = True

Few question, is there a better way to let a thread die after 60 seconds other than polling the time.clock every time? When I run this program, it sends the bytes correctly but when I typed quit the other thread won t die, even though I set the var killed = True. I wonder why is that? the scope of var Killed should reach the other thread right?

谢谢 (Xièxiè)

问题回答

我推荐使用线程模块。更好的方法是使用InterruptableThread来终止线程。您不必使用标志来终止线程,但是如果您从父线程调用terminate(),则会出现异常。您可以处理或不处理异常。

import threading, ctypes

class InterruptableThread(threading.Thread):
@classmethod
def _async_raise(cls, tid, excobj):
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")

def raise_exc(self, excobj):
    assert self.isAlive(), "thread must be started"
    for tid, tobj in threading._active.items():
        if tobj is self:
            self._async_raise(tid, excobj)
            return

def terminate(self):
    self.raise_exc(SystemExit)

EDIT: You can rewrite your code like this using another thread that is waiting 1 minute and then killing your other thread

def send_data:
    IP = ...
    # other vars

    ...
    s = socket.socket(.....)

    # no killed checking
    # no time checking
    # just do your work here
    ...
    s.close()


my_thread = InterruptableThread(target=send_data)
my_thread.start()

def one_minute_kill(who):
   time.sleep(60)
   who.terminate()

killer_thread = InterruptableThread(target=one_minute_kill, args=[my_thread])
killer.start()

print "to quit type quit"
while my_thread.isAlive():
  if raw_input("Enter something: ") == "quit":
    my_thread.terminate()

我不知道如何使用“thread”模块来实现这个,但我可以使用“threading”模块来做到。我认为这段代码可以达到你想要的效果。

For documentation on the threading module: http://docs.python.org/library/threading.html

#!/usr/bin/python

import time
from threading import Thread
import threading
import sys

test_time = 10
killed = False

class SillyThread( threading.Thread ):
    def run(self):
        global killed
        starttime = time.time()
        counter = 0
        while (time.time() - starttime) < test_time:
            if killed:
                break
            counter = counter + 1
            time.sleep(0.1)
        print "I did %d loops" % counter

class ManageThread( threading.Thread ):
    def run(self):
        global killed
        while True:
            var = raw_input("Enter something: ")
            if var == "quit":
                killed = True
                break
        print "Got var [%s]" % var

silly = SillyThread()
silly.start()
ManageThread().start()
Thread.join(silly)
print "bye bye"
sys.exit(0)

请注意我使用的是time.time() 而不是time.clock()。time.clock()在Unix系统上提供了经过的处理器时间(请参见http:// docs.python.org/library/time.html)。我认为time.clock()应该在任何地方都可以使用。我把我的test_time设定为10秒,因为我没有等待一分钟的耐心。

如果我让它完整运行10秒钟,会发生以下情况:

leif@peacock:~/tmp$ ./test.py
Enter something: I did 100 loops
bye bye

如果我输入 quit,就会发生以下事情:

leif@peacock:~/tmp$ ./test.py
Enter something: quit
Got var [quit]
I did 10 loops
bye bye

希望这能帮到你。

如上所述,使用threading模块更容易使用,并提供几个同步原语。它还提供了一个计时器类,可以在指定的时间后运行。

如果您只想让程序退出,您可以简单地把发送线程设置为守护线程。在调用 start() 之前,您可以调用 setDaemon(True) 来实现这一点(2.6 可能会使用一个守护属性)。只要有一个非守护线程正在运行,Python 就不会退出。

你可以很容易地完成这件事情而无需使用线程。例如,使用Twisted,你只需要设置一个定时调用和一个生产者:

from twisted.internet.protocol import ClientFactory, Protocol
from twisted.internet import reactor

class Noisy(Protocol):
    def __init__(self, delay, data):
        self.delay = delay
        self.data = data

    def stop(self):
        self.transport.unregisterProducer()
        self.transport.loseConnection()
        reactor.stop()

    def resumeProducing(self):
        self.transport.write(self.data)

    def connectionMade(self):
        self.transport.registerProducer(self, False)
        reactor.callLater(self.delay, self.stop)

factory = ClientFactory()
factory.protocol = lambda: Noisy(60, "hello server")
reactor.connectTCP(host, port, factory)
reactor.run()

这种方法相对于线程方法具有各种优点。它不依赖于守护线程,因此你可以实际清理网络连接(例如,发送关闭消息,如果需要的话),而不是依靠平台去销毁它。它为你处理所有实际的底层网络代码(你的原始例子在socket.send返回0的情况下做的是错误的,而这段代码将正确处理这种情况)。你也不必依靠ctypes或者晦涩难懂的CPython API来在另一个线程中触发异常(因此它可以在更多版本的Python中使用,并且可以立即中断阻塞的发送,而不像其他建议的方法那样)。

确保“退出”功能正常工作,并添加一个小的打印来测试输入是否工作。

if var == "quit":
 print "Hey we got quit"

变量elapsed未被初始化。在while循环之前将其设置为零。

测试 killed 的范围很容易。

>>> import thread
>>> killed = False
>>> import time
>>> def test():
...  while True:
...   time.sleep(1)
...   if killed:
...     print  Dead. 
...     break
... 
>>> thread.start_new_thread(test, ())
25479680
>>> time.sleep(3)
>>> killed = True
>>> Dead.




相关问题
热门标签