English 中文(简体)
原标题:Yet another producer/consumer problem in Twisted Python

I am building a server which stores key/value data on top of Redis using Twisted Python. The server receives a JSON dictionary via HTTP, which is converted into a Python dictionary and put in a buffer. Everytime new data is stored, the server schedules a task which pops one dictionary from the buffer and writes every tuple into a Redis instance, using a txredis client.

class Datastore(Resource):

isLeaf = True

def __init__(self):
    self.clientCreator = protocol.ClientCreator(reactor, Redis)
    d = self.clientCreator.connectTCP(...)
    self.redis = None
    self.buffer = deque()

def render_POST(self, request):
        task_id = request.requestHeaders.getRawHeaders( x-task-id )[0]
    except IndexError:
        return  <html><body>Error reading task_id</body></html>   

    data = json.loads(request.content.read())
    self.buffer.append((task_id, data))
    reactor.callLater(0, self.write_on_redis)

def write_on_redis(self):
        task_id, dic = self.buffer.pop()
        log.msg( Buffer: %s  % len(self.buffer))
    except IndexError:
        log.msg( buffer empty )

    m = yield self.redis.sismember( DONE , task_id)
    # Simple check
    if m ==  1 :
        log.msg( %s already stored  % task_id)
        log.msg( %s unpacking  % task_id)
        s = yield self.redis.sadd( DONE , task_id)

        d = defer.Deferred()
        for k, v in dic.iteritems():
            k = k.encode()
            d.addCallback(self.redis.push, k, v)


Basically, I am facing a Producer/Consumer problem between two different connections, but I am not sure that the current implementation works well in the Twisted paradygm. I have read the small documentation about producer/consumer interfaces in Twisted, but I am not sure if I can use them in my case. Any critics is welcome: I am trying to get a grasp of event-driven programming, after too many years of thread concurrency.


Twisted、IProducerIConsumer的生产者和消费者预报系统均涉及流动控制。 你们似乎没有任何流动控制,你只是把一个议定书的信息传递给另一个议定书。

由于没有流动控制,缓冲区就更加复杂。 仅通过将数据直接通过<代码>write_on_redis方法,你就可以删除。 http://code>write_on_redis 没有必要处理空洞的缓冲案件,你不需要额外的资源,你甚至可以删除<代码>的。 (即使你保持缓冲,你也可以这样做)。

不过,我不知道这是否回答了你的问题。 就这一方法是否“良好”而言,我刚才通过阅读该守则注意到的情况是:

  • If data arrives faster than redis accepts it, your list of outstanding jobs may become arbitrarily large, causing you to run out of memory. This is what flow control would help with.
  • With no error handling around the sismember call or the sadd call, you may lose tasks if either of these fail, since you ve already popped them from the work buffer.
  • Doing a push as a callback on that Deferred d also means that any failed push will prevent the rest of the data from being pushed. It also passes the result of the Deferred returned by push (I m assuming it returns a Deferred) as the first argument to the next call, so unless push more or less ignores its first argument, you won t be pushing the right data to redis.

如果你想实施流动控制,那么您的吉大港山区服务器需要检查<条码>的长度。 您仍在使用<代码>IConsumer和_IProducer,但也有类似之处。



Can Django models use MySQL functions?

Is there a way to force Django models to pass a field to a MySQL function every time the model data is read or loaded? To clarify what I mean in SQL, I want the Django model to produce something like ...

An enterprise scheduler for python (like quartz)

I am looking for an enterprise tasks scheduler for python, like quartz is for Java. Requirements: Persistent: if the process restarts or the machine restarts, then all the jobs must stay there and ...

How to remove unique, then duplicate dictionaries in a list?

Given the following list that contains some duplicate and some unique dictionaries, what is the best method to remove unique dictionaries first, then reduce the duplicate dictionaries to single ...

What is suggested seed value to use with random.seed()?

Simple enough question: I m using python random module to generate random integers. I want to know what is the suggested value to use with the random.seed() function? Currently I am letting this ...

How can I make the PyDev editor selectively ignore errors?

I m using PyDev under Eclipse to write some Jython code. I ve got numerous instances where I need to do something like this: import com.work.project.component.client.Interface.ISubInterface as ...

How do I profile `paster serve` s startup time?

Python s paster serve app.ini is taking longer than I would like to be ready for the first request. I know how to profile requests with middleware, but how do I profile the initialization time? I ...

Pragmatically adding give-aways/freebies to an online store

Our business currently has an online store and recently we ve been offering free specials to our customers. Right now, we simply display the special and give the buyer a notice stating we will add the ...

Converting Dictionary to List? [duplicate]

I m trying to convert a Python dictionary into a Python list, in order to perform some calculations. #My dictionary dict = {} dict[ Capital ]="London" dict[ Food ]="Fish&Chips" dict[ 2012 ]="...
