subprocess - Issue with sharing data between Python processes with multiprocessing


I've seen several posts on this, so I know it is fairly straightforward to do, but I seem to be coming up short. I'm not sure if I need to create a worker pool or use the Queue class. Basically, I want to be able to create several processes that each act autonomously (which is why they inherit from the Agent superclass).

At random ticks of my main loop I want to update each agent. I'm using time.sleep with different values in the main loop and in the agent's run loop to simulate different processor speeds.

Here is my Agent superclass:

    import multiprocessing as mpc  # assumed alias; the imports are not shown in the original

    # generic class to handle the mpc of each agent
    class Agent(mpc.Process):
        # initialize agent parameters
        def __init__(self,):
            # init mpc
            mpc.Process.__init__(self)
            self.exit = mpc.Event()

        # agent's main loop...generally should be overridden
        def run(self):
            while not self.exit.is_set():
                pass
            print "you exited!"

        # safely shutdown the agent
        def shutdown(self):
            print "shutdown initiated"
            self.exit.set()

        # safely communicate values to this agent
        def communicate(self, value):
            print value

A specific agent subclass (simulating an HVAC system):

    import time  # needed for time.sleep; the import is not shown in the original

    class HVAC(Agent):
        def __init__(self, dt=70, dh=50.0):
            super(Agent, self).__init__()
            self.exit = mpc.Event()

            self.__pref_heating = True
            self.__pref_cooling = True
            self.__desired_temperature = dt
            self.__desired_humidity = dh

            self.__meas_temperature = 0
            self.__meas_humidity = 0.0
            self.__hvac_status = ""  # heating, cooling, off

            self.start()

        def run(self):  # handle AC or heater on
            while not self.exit.is_set():
                ctemp = self.measuretemp()
                chum = self.measurehumidity()

                if (ctemp < self.__desired_temperature):
                    self.__hvac_status = 'heating'
                    self.__meas_temperature += 1

                elif (ctemp > self.__desired_temperature):
                    self.__hvac_status = 'cooling'
                    self.__meas_temperature += 1

                else:
                    self.__hvac_status = 'off'
                print self.__hvac_status, self.__meas_temperature

                time.sleep(0.5)

            print "hvac exited"

        def measuretemp(self):
            return self.__meas_temperature

        def measurehumidity(self):
            return self.__meas_humidity

        def communicate(self, updates):
            self.__meas_temperature = updates['temp']
            self.__meas_humidity = updates['humidity']
            print "measured [%d] [%f]" % (self.__meas_temperature, self.__meas_humidity)

And my main loop:

    import random
    import time

    if __name__ == "__main__":
        print "initializing subsystems"
        agents = {}
        agents['hvac'] = HVAC()

        # run simulation
        timestep = 0
        # args holds the parsed command-line options (parsing not shown in the original)
        while timestep < args.timesteps:
            print "timestep %d" % timestep

            if timestep % 10 == 0:
                curr_temp = random.randrange(68, 72)
                curr_humidity = random.uniform(40.0, 60.0)
                agents['hvac'].communicate({'temp': curr_temp, 'humidity': curr_humidity})

            time.sleep(1)
            timestep += 1

        agents['hvac'].shutdown()
        print "hvac process state: %d" % agents['hvac'].is_alive()

So my issue is that, whenever I run agents['hvac'].communicate(x) within the main loop, I can see the value being passed into the HVAC subclass (it prints the received value correctly). However, the value is never stored where the run loop can see it.

So typical output looks like this:

    initializing subsystems
    timestep 0
    measured [68] [56.948675]
    heating 1
    heating 2
    timestep 1
    heating 3
    heating 4
    timestep 2
    heating 5
    heating 6

When in reality, as soon as measured [68] appears, the internal stored value should be updated so that the output shows 68 (not heating 1, heating 2, etc.). So effectively, the HVAC's self.__meas_temperature is not being updated.


Edit: After a bit of research, I realized that I didn't understand what was happening behind the scenes. Each subprocess operates with its own virtual chunk of memory and is completely abstracted away from any data being shared this way, so passing the value in isn't going to work. My new issue is that I'm not sure how to share a global value with multiple processes.
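The isolation described in this edit can be reproduced in a few lines. The following is only a minimal sketch, assuming Python 3; the Probe class and the demo function are hypothetical names made up for the illustration and are not part of the original code. The parent changes an attribute after start(), and the child reports what it actually sees.

```python
import multiprocessing as mp


class Probe(mp.Process):
    """Hypothetical stand-in for an agent; not part of the original code."""

    def __init__(self, parent_done, report):
        super().__init__()
        self.value = 0                  # plain attribute, copied into the child
        self.parent_done = parent_done  # Event: parent has finished writing
        self.report = report            # Queue: child -> parent

    def run(self):
        # Executes in the child, on the child's own copy of this object.
        self.parent_done.wait(timeout=10)
        self.report.put(self.value)


def demo():
    parent_done = mp.Event()
    report = mp.Queue()
    probe = Probe(parent_done, report)
    probe.start()

    probe.value = 68                    # changes the parent's copy only
    parent_done.set()

    seen_by_child = report.get(timeout=10)
    probe.join()
    return probe.value, seen_by_child


if __name__ == "__main__":
    print(demo())  # the parent sees 68, the child still sees 0
```

The Event and the Queue do cross the process boundary because multiprocessing builds them on operating-system primitives; the plain attribute does not.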

I was looking at the Queue or JoinableQueue classes, but I'm not sure how to pass a Queue into the type of superclass setup that I have (especially with the mpc.Process.__init__(self) call).
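One way this could be wired up, sketched under assumptions rather than taken from the original code: create the queues in the main process and hand them to the constructor, which stores them before the process is started. This assumes Python 3; the inbox and outbox parameters and the demo function are illustrative names, and the HVAC class is reduced to the temperature field.

```python
import multiprocessing as mp
import queue  # only for the queue.Empty exception


class Agent(mp.Process):
    def __init__(self, inbox, outbox):
        super().__init__()
        self.inbox = inbox      # parent -> child updates (assumed name)
        self.outbox = outbox    # child -> parent reports (assumed name)
        self.exit = mp.Event()

    def communicate(self, updates):
        # Called in the parent: send the data instead of assigning it.
        self.inbox.put(updates)

    def shutdown(self):
        self.exit.set()


class HVAC(Agent):
    def __init__(self, inbox, outbox):
        super().__init__(inbox, outbox)
        self.meas_temperature = 0

    def run(self):
        # Runs in the child: apply updates as they arrive.
        while not self.exit.is_set():
            try:
                updates = self.inbox.get(timeout=0.1)
            except queue.Empty:
                continue
            self.meas_temperature = updates["temp"]
            self.outbox.put(self.meas_temperature)


def demo():
    inbox, outbox = mp.Queue(), mp.Queue()
    hvac = HVAC(inbox, outbox)
    hvac.start()
    hvac.communicate({"temp": 68, "humidity": 50.0})
    stored = outbox.get(timeout=10)  # value as stored inside the child
    hvac.shutdown()
    hvac.join()
    return stored


if __name__ == "__main__":
    print(demo())
```

The short timeout on get() keeps the run loop responsive to the exit event; a blocking get() would need a sentinel message to shut down cleanly.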

A side concern is whether I can have multiple agents reading values out of the queue without pulling them out of the queue. For instance, if I wanted to share a temperature value with multiple agents, would a Queue work for this?
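For context on this side concern: each item put on a multiprocessing Queue is delivered to exactly one get() call, so a queue on its own does not broadcast. A possible alternative, sketched here under the assumption of Python 3 and a single numeric reading, is multiprocessing.Value, which several processes can read without consuming it. The reader and demo functions and the agent names are hypothetical.

```python
import multiprocessing as mp


def reader(name, temperature, updated, results):
    # Wait until the parent has written, then read without consuming.
    updated.wait(timeout=10)
    results.put((name, temperature.value))


def demo():
    temperature = mp.Value("d", 0.0)  # "d" is a C double in shared memory
    updated = mp.Event()
    results = mp.Queue()

    readers = [
        mp.Process(target=reader,
                   args=("agent-%d" % i, temperature, updated, results))
        for i in range(3)
    ]
    for proc in readers:
        proc.start()

    temperature.value = 68.0          # written after the readers started
    updated.set()

    seen = dict(results.get(timeout=10) for _ in readers)
    for proc in readers:
        proc.join()
    return seen


if __name__ == "__main__":
    print(demo())
```

All three readers report the same reading, because they look at one shared memory slot rather than taking items off a queue.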

pipe v queue

Here's a suggested solution, assuming that you want the following:

  • a centralized manager / main process which controls the lifetimes of the workers
  • worker processes that do something self-contained and then report results to the manager and other processes

Before I show it though, for the record I want to say that in general, unless you are CPU bound, multiprocessing is not really the right fit, mainly because of the added complexity, and you'd probably be better off using a different high-level asynchronous framework. Also, you should use Python 3, it's so much better!

That said, multiprocessing.Manager makes this pretty easy to do using multiprocessing. I've done this in Python 3, and I don't see why it shouldn't "just work" in Python 2, but I haven't checked.

    from ctypes import c_bool
    from multiprocessing import Manager, Process, Value
    from pprint import pprint
    from time import sleep, time


    class Agent(Process):

        def __init__(self, name, shared_dictionary, delay=0.5):
            """My take on your Agent.

            The key difference is that I've commonized the run-loop and used
            a shared value to signal when to stop, to demonstrate it.
            """
            super(Agent, self).__init__()
            self.name = name

            # This is going to be how we communicate between processes.
            self.shared_dictionary = shared_dictionary

            # Create a silo for us to use.
            shared_dictionary[name] = []
            self.should_stop = Value(c_bool, False)

            # For testing purposes, and simulating
            # slower agents.
            self.delay = delay

        def get_next_results(self):
            # In the real world I'd use abc.ABCMeta as a metaclass
            # to do this properly.
            raise RuntimeError('subclasses must implement this')

        def run(self):
            ii = 0
            while not self.should_stop.value:
                ii += 1
                # debugging / monitoring
                print('%s %s run loop execution %d' % (
                    type(self).__name__, self.name, ii))

                next_results = self.get_next_results()

                # Add the results, along with a timestamp.
                self.shared_dictionary[self.name] += [(time(), next_results)]
                sleep(self.delay)

        def stop(self):
            self.should_stop.value = True
            print('%s %s stopped' % (type(self).__name__, self.name))


    class HVACAgent(Agent):
        def get_next_results(self):
            # This is where you do your work, but for the sake of this
            # example we just return a constant dictionary.
            return {'temperature': 5, 'pressure': 7, 'humidity': 9}


    class DumbReadingAgent(Agent):
        """A dumb agent to demonstrate workers reading other worker values."""

        def get_next_results(self):
            # Get the hvac 1 results:
            hvac1_results = self.shared_dictionary.get('hvac 1')
            if not hvac1_results:
                # Key missing, or the silo is still empty: nothing to read yet.
                return None

            return hvac1_results[-1][1]['temperature']


    # Script starts. The guard keeps child processes from re-running this
    # part on platforms that start them by re-importing the script.
    if __name__ == '__main__':
        results = {}

        # "with" ensures we terminate the manager at the end.
        with Manager() as manager:

            # The manager is a subprocess in its own right. We can ask it
            # to manage a dictionary (or other Python types) for us, to be
            # shared among the other children.
            shared_info = manager.dict()

            hvac_agent1 = HVACAgent('hvac 1', shared_info)
            hvac_agent2 = HVACAgent('hvac 2', shared_info, delay=0.1)
            dumb_agent = DumbReadingAgent('dumb hvac1 reader', shared_info)

            agents = (hvac_agent1, hvac_agent2, dumb_agent)

            for agent in agents:
                agent.start()

            sleep(1)

            for agent in agents:
                agent.stop()
            for agent in agents:
                agent.join()

            # Not quite sure what happens to the shared dictionary after
            # the manager dies, so for safety we make a local copy.
            results = dict(shared_info)

        pprint(results)
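Applied to the HVAC agent from the question, the shared-dictionary idea might look roughly like the sketch below. It is an adaptation under assumptions, not the code from either post: it assumes Python 3, keeps only the temperature handling, and introduces a hypothetical hvac_saw key plus a demo function so that the parent can confirm the child really received the update.

```python
import multiprocessing as mp
import time


class HVAC(mp.Process):
    """Sketch of the question's HVAC agent on top of a manager dict."""

    def __init__(self, shared):
        super().__init__()
        self.shared = shared    # proxy returned by manager.dict()
        self.exit = mp.Event()

    def communicate(self, updates):
        # Runs in the parent; the write is stored in the manager process.
        self.shared.update(updates)

    def shutdown(self):
        self.exit.set()

    def run(self):
        # Runs in the child; reads go to the same manager process.
        while not self.exit.is_set():
            temp = self.shared.get("temp")
            if temp is not None:
                self.shared["hvac_saw"] = temp  # hypothetical confirmation key
            time.sleep(0.05)


def demo():
    with mp.Manager() as manager:
        shared = manager.dict()
        hvac = HVAC(shared)
        hvac.start()
        hvac.communicate({"temp": 68, "humidity": 50.0})

        # Poll until the child has echoed the value back, or give up.
        deadline = time.time() + 10
        while "hvac_saw" not in shared and time.time() < deadline:
            time.sleep(0.05)

        hvac.shutdown()
        hvac.join()
        return dict(shared)


if __name__ == "__main__":
    print(demo())
```

Unlike a queue, the dictionary keeps the latest reading in place, so any number of agents holding the same proxy can read it.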
