subprocess - Issue with sharing data between Python processes with multiprocessing -
i've seen several posts this, know straightforward do, seem coming short. i'm not sure if need create worker pool, or use queue class. basically, want able create several processes each act autonomously (which why inherit agent superclass).
at random ticks of main loop want update each agent. i'm using time.sleep
different values in main loop , agent's run loop simulate different processor speeds.
here agent superclass:
# generic class handle mpc of each agent class agent(mpc.process): # initialize agent parameters def __init__(self,): # init mpc mpc.process.__init__(self) self.exit = mpc.event() # agent's main loop...generally should overridden def run(self): while not self.exit.is_set(): pass print "you exited!" # safely shutdown agent def shutdown(self): print "shutdown initiated" self.exit.set() # safely communicate values agent def communicate(self,value): print value
a specific agent's subclass (simulating hvac system):
class hvac(agent): def __init__(self, dt=70, dh=50.0): super(agent, self).__init__() self.exit = mpc.event() self.__pref_heating = true self.__pref_cooling = true self.__desired_temperature = dt self.__desired_humidity = dh self.__meas_temperature = 0 self.__meas_humidity = 0.0 self.__hvac_status = "" # heating, cooling, off self.start() def run(self): # handle ac or heater on while not self.exit.is_set(): ctemp = self.measuretemp() chum = self.measurehumidity() if (ctemp < self.__desired_temperature): self.__hvac_status = 'heating' self.__meas_temperature += 1 elif (ctemp > self.__desired_temperature): self.__hvac_status = 'cooling' self.__meas_temperature += 1 else: self.__hvac_status = 'off' print self.__hvac_status, self.__meas_temperature time.sleep(0.5) print "hvac exited" def measuretemp(self): return self.__meas_temperature def measurehumidity(self): return self.__meas_humidity def communicate(self,updates): self.__meas_temperature = updates['temp'] self.__meas_humidity = updates['humidity'] print "measured [%d] [%f]" % (self.__meas_temperature,self.__meas_humidity)
and main loop:
if __name__ == "__main__": print "initializing subsystems" agents = {} agents['hvac'] = hvac() # run simulation timestep = 0 while timestep < args.timesteps: print "timestep %d" % timestep if timestep % 10 == 0: curr_temp = random.randrange(68,72) curr_humidity = random.uniform(40.0,60.0) agents['hvac'].communicate({'temp':curr_temp, 'humidity':curr_humidity}) time.sleep(1) timestep += 1 agents['hvac'].shutdown() print "hvac process state: %d" % agents['hvac'].is_alive()
so issue that, whenever run agents['hvac'].communicate(x)
within main loop, can see value being passed hvac
subclass in run
loop (so prints received value correctly). however, value never stored.
so typical output looks this:
initializing subsystems timestep 0 measured [68] [56.948675] heating 1 heating 2 timestep 1 heating 3 heating 4 timestep 2 heating 5 heating 6
when in reality, measured [68] appears, internal stored value should updated output 68 (not heating 1, heating 2, etc.). effectively, hvac's self.__meas_temperature not being updated.
edit: after bit of research, realized didn't understand happening behind scenes. each subprocess operates own virtual chunk of memory , abstracted away data being shared way, passing value in isn't going work. new issue i'm not sure how share global value multiple processes.
i looking @ queue or joinablequeue packages, i'm not sure how pass queue type of superclass setup have (especially mpc.process.__init__(self)
call).
a side concern if can have multiple agents reading values out of queue without pulling out of queue? instance, if wanted share temperature
value multiple agents, queue work this?
here's suggested solution assuming want following:
- a centralized manager / main process controls lifetimes of workers
- worker processes self-contained , report results manager , other processes
before show though, record want in general unless cpu bound multiprocessing
not right fit, because of added complexity, , you'd better of using different high-level asynchronous framework. also, should use python 3, it's better!
that said, multiprocessing.manager
, makes pretty easy using multiprocessing
. i've done in python 3 don't think shouldn't "just work" in python 2, haven't checked.
from ctypes import c_bool multiprocessing import manager, process, array, value pprint import pprint time import sleep, time class agent(process): def __init__(self, name, shared_dictionary, delay=0.5): """my take on agent. key difference i've commonized run-loop , used shared value signal when stop, demonstrate it. """ super(agent, self).__init__() self.name = name # going how communicate between processes. self.shared_dictionary = shared_dictionary # create silo use. shared_dictionary[name] = [] self.should_stop = value(c_bool, false) # testing purposes, , simulating # slower agents. self.delay = delay def get_next_results(self): # in real world i'd use abc.abcmeta metaclass # properly. raise runtimeerror('subclasses must implement this') def run(self): ii = 0 while not self.should_stop.value: ii += 1 # debugging / monitoring print('%s %s run loop execution %d' % ( type(self).__name__, self.name, ii)) next_results = self.get_next_results() # add results, along timestamp. self.shared_dictionary[self.name] += [(time(), next_results)] sleep(self.delay) def stop(self): self.should_stop.value = true print('%s %s stopped' % (type(self).__name__, self.name)) class hvacagent(agent): def get_next_results(self): # work, sake of # example return constant dictionary. return {'temperature': 5, 'pressure': 7, 'humidity': 9} class dumbreadingagent(agent): """a dumb agent demonstrate workers reading other worker values.""" def get_next_results(self): # hvac 1 results: hvac1_results = self.shared_dictionary.get('hvac 1') if hvac1_results none: return none return hvac1_results[-1][1]['temperature'] # script starts. results = {} # "with" ensures terminate manager @ end. manager() manager: # manager subprocess in own right. can ask # manage dictionary (or other python types) # shared among other children. shared_info = manager.dict() hvac_agent1 = hvacagent('hvac 1', shared_info) hvac_agent2 = hvacagent('hvac 2', shared_info, delay=0.1) dumb_agent = dumbreadingagent('dumb hvac1 reader', shared_info) agents = (hvac_agent1, hvac_agent2, dumb_agent) list(map(lambda a: a.start(), agents)) sleep(1) list(map(lambda a: a.stop(), agents)) list(map(lambda a: a.join(), agents)) # not quite sure happens shared dictionary after # manager dies, safety make local copy. results = dict(shared_info) pprint(results)
Comments
Post a Comment