Limitando o comprimento da fila com o PyZMQ

Eu quero limitar a quantidade de memory consumida pelas minhas filas de mensagens ZeroMQ em um aplicativo Python. Eu sei que definir a marca d’água máxima limitará a quantidade que será enfileirada no lado do remetente, mas existe uma maneira de controlar quanto será enfileirada no lado do receptor? A binding Python ZeroMQ parece ter definido como ilimitado.

Meu cenário de teste: Eu tenho dois terminais python que estou usando para teste. Um é o receptor:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) [GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> import zmq >>> context = zmq.Context() >>> socket = context.socket(zmq.PULL) >>> socket.setsockopt(zmq.RCVBUF, 256) >>> socket.bind("tcp://127.0.0.1:12345") 

O outro é o remetente:

 Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) [GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> import zmq >>> context=zmq.Context() >>> socket = context.socket(zmq.PUSH) >>> socket.setsockopt(zmq.SNDBUF, 2048) >>> socket.setsockopt(zmq.HWM, 1) >>> socket.connect("tcp://127.0.0.1:12345") >>> num = 0 >>> while True: ... print num ... socket.send(str(num)) ... num = num + 1 ... 

Eu corro o socket.recv() no lado do receptor algumas vezes para ter certeza de que a fila funciona, mas além disso, deixe os dois terminais ficarem lá. O loop de envio parece nunca ser bloqueado e o prompt de recebimento parece ter uma pegada de memory crescente.

Em contradição com a documentação do ZeroMQ, a marca d’água alta precisa ser definida no lado PUSH e no lado PULL . Depois que eu mudei o PULL , funcionou melhor. O novo código PULL é:

 Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) [GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> import zmq >>> context=zmq.Context() >>> socket = context.socket(zmq.PULL) >>> socket.setsockopt(zmq.RCVBUF, 256) >>> socket.setsockopt(zmq.HWM, 1) >>> socket.bind("tcp://127.0.0.1:12345") 

Na verdade, a documentação diz isso:

“Quando um soquete ZMQ_PUSH entra em um estado excepcional devido a ter atingido a marca d’água alta para todos os nós de recebimento de dados, ou se não houver nenhum nó downstream, qualquer operação zmq_send (3) no soquete deverá ser bloqueada até que o estado excepcional termine ou pelo menos um nó downstream fica disponível para envio; as mensagens não são descartadas. ”

http://api.zeromq.org/2-1:zmq-socket

Que afirma abertamente que você pode (e deve) definir o limite máximo para nós descendentes (também conhecido como pull), e talvez implique que configurá-lo no lado do push não terá efeito (embora eu suspeite que isso não seja verdade, porque ainda há caso em que os nós de recebimento de dados estão disponíveis, mas as mensagens estão chegando mais rapidamente do que podem ser enviadas.)

Com as opções zmq.RCVBUF e zmq.RCVBUF , você pode definir um limite para o tamanho do buffer .


Além disso, estou usando a opção zmq.CONFLATE no lado do receptor para limitar o tamanho da fila de ZeroMQ a um:

Aqui está um exemplo com o ZMQ PUSH/PULL :

Remetente ( zmq.PUSH ):

 def create_pub_socket(ip, port): try: context = zmq.Context() socket = context.socket(zmq.PUSH) socket.setsockopt(zmq.SNDHWM, 1) zmq_address = "tcp://{}:{}".format(ip, port) socket.connect(zmq_address) return socket except zmq.ZMQError as exp: print(exp) return False sock = create_push_socket('127.0.0.1', 5558) if sock: sock.send_json({'a': 1}) 

Getter ( zmq.PULL ):

 def listen(self): sock = None try: context = zmq.Context() sock = context.socket(zmq.PULL) sock.setsockopt(zmq.RCVHWM, 1) sock.setsockopt(zmq.CONFLATE, 1) # last msg only. sock.bind("tcp://*:5558") except zmq.ZMQError: logger.captureException() configs = None while configs is None: if sock: configs = sock.recv_json() time.sleep(1e-1) else: time.sleep(5) listen() # Recursive. listen()