使用aiomultiprocess实现Python中的多进程管理


我对Python中的多进程有一个问题。我需要创建异步进程,它运行的时间不确定,进程的数量也不确定。一旦有新的请求到达,就必须使用请求中的规范创建一个新流程。我们使用ZeroMQ进行消息传递。还有一个进程,它从头开始,只有在整个脚本终止时才结束。

现在我正在寻找一个解决方案,我可以等待所有进程,同时能够添加额外的进程。

asyncio.gather()

是我的第一个想法,但它需要进程列表才能被调用。

class Object:
  def __init__(self, var):
     self.var = var

  async def run(self):
      *do async things*

class object_controller:
  
  def __init__(self):
     self.ctx = zmq.Context()
     self.socket = self.ctx.socket(zmq.PULL)
     self.socket.connect("tcp://127.0.0.1:5558")

     self.static_process = AStaticProcess()
     self.sp = aiomultiprocess.Process(target=self.static_process.run)
     self.sp.start()
     #here I need a good way to await this process


  def process(self, var):
    object = Object(var)
    process = aiomultiprocess.Process(target=object.run)
    process.start()
  
  def listener(self)
    while True:
      msg = self.socket.recv_pyobj()
      # here I need to find a way how I can start and await this process while beeing able to 
      # receive additional request, which result in additional processes which need to be awaited

这是一些代码,希望能解释我的问题。我需要一种等待进程的收集器。

初始化后,对象和控制器之间没有交互,只通过zeroMQ (静态进程和变量进程之间)进行交互。也没有退路。

转载请注明出处:http://www.56zzk.com/article/20230526/1670689.html