salina.agents.asynchronous
salina.agents.asynchronous.AsynchronousAgent
Implements an agent that is executed asynchronously in another process, and that returns its own workspace
Usage is: * agent(**kwargs) * while agent.is_running(): * ..... * workspace=agent.get_workspace()
__call__(self, **kwargs)
special
Executes the agent in non-blocking mode. A new workspace is created by the agent.
Source code in salina/agents/asynchronous.py
def __call__(self, **kwargs):
    """Execute the agent in non-blocking mode.

    When no worker process exists yet (first call, or first call after
    ``close``), the input/output queues and the worker process are created
    and started. The keyword arguments are then placed on the input queue
    that was handed to the worker process. A new workspace is created by
    the agent.

    Args:
        **kwargs: Keyword arguments placed, as one dict, on the input queue.

    Raises:
        AssertionError: If the previous execution is still flagged as running.
    """
    # Explicit raise instead of `assert` so the guard is not stripped by
    # `python -O`; AssertionError is kept so callers see the same exception.
    if self._is_running:
        raise AssertionError(
            "AsynchronousAgent is already running; "
            "wait until is_running() returns False"
        )
    if self.process is None:
        # cancel_join_thread(): do not block process exit on unflushed
        # queue data.
        self.o_queue = mp.Queue()
        self.o_queue.cancel_join_thread()
        self.i_queue = mp.Queue()
        self.i_queue.cancel_join_thread()
        self.process = mp.Process(
            target=f, args=(self.agent, self.i_queue, self.o_queue)
        )
        self.process.daemon = False
        self.process.start()
    self._is_running = True
    self.i_queue.put(kwargs)
__init__(self, agent)
special
Create the AsynchronousAgent
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent |
[salina.Agent] |
The agent to execute in another process |
required |
Source code in salina/agents/asynchronous.py
def __init__(self, agent):
    """Build an AsynchronousAgent around ``agent``.

    Only bookkeeping attributes are set here; no process is created.

    Args:
        agent ([salina.Agent]): The agent to execute in another process
    """
    self.agent = agent
    self.process = None
    self._workspace = None
    self._is_running = False
close(self)
Closes the agent and kills the corresponding process
Source code in salina/agents/asynchronous.py
def close(self):
    """Close the agent and kill the corresponding process.

    Does nothing when no process exists. Otherwise, if the process is still
    alive, the string ``"exit"`` is put on the input queue and one message
    is awaited on the output queue. The process is then terminated and
    joined, both queues are closed and deleted, and the agent returns to
    its "no process" state so that a later call can start a fresh worker.
    """
    if self.process is None:
        return
    print("[AsynchronousAgent] closing process")
    try:
        # Only handshake with a live worker: a blocking get() on the output
        # queue of a dead process would never return.
        if self.process.is_alive():
            self.i_queue.put("exit")
            self.o_queue.get()
    finally:
        # Release the process and the queues even if the handshake was
        # interrupted.
        self.process.terminate()
        self.process.join()
        self.i_queue.close()
        self.o_queue.close()
        del self.i_queue
        del self.o_queue
        self.process = None
        # A closed agent cannot be running; without this reset, closing
        # during an execution would leave the flag set and block the next
        # call.
        self._is_running = False
get_workspace(self)
Returns the built workspace if the agent has stopped its execution
Returns:
| Type | Description |
|---|---|
[salina.Workspace] |
The built workspace |
Source code in salina/agents/asynchronous.py
def get_workspace(self):
    """Give back the workspace built by the agent once it has stopped.

    Returns:
        [salina.Workspace]: The built workspace, or None while the agent is
        still running
    """
    still_running = self.is_running()
    return None if still_running else self._workspace
is_running(self)
Is the agent still running ?
Returns:
| Type | Description |
|---|---|
[bool] |
True if the agent is running |
Source code in salina/agents/asynchronous.py
def is_running(self):
    """Tell whether the agent is still running, collecting its result if not.

    When an execution is pending, the output queue is polled without
    blocking. An empty queue means the agent is still running. Otherwise
    the first message must be ``"ok"``; the following ``(key, value)``
    messages are written into a fresh ``Workspace`` with ``set_full`` until
    a closing ``"ok"`` is read, and the result of ``workspace.to("cpu")``
    is stored for ``get_workspace``.

    Returns:
        [bool]: True if the agent is running

    Raises:
        RuntimeError: If the first message on the output queue is not "ok".
    """
    import queue  # local import: only the Empty exception is needed

    if not self._is_running:
        return False
    try:
        head = self.o_queue.get(False)
    except queue.Empty:
        # Nothing has been produced yet: the execution is still in progress.
        return True
    # Explicit check instead of an `assert` hidden behind a bare `except`,
    # which silently dropped the message and kept reporting "running".
    if head != "ok":
        raise RuntimeError(
            "AsynchronousAgent: unexpected message on output queue: %r" % (head,)
        )
    workspace = Workspace()
    item = self.o_queue.get()
    while item != "ok":
        key, val = item
        workspace.set_full(key, val)
        item = self.o_queue.get()
    self._workspace = workspace.to("cpu")
    # Cleared only once the workspace is fully received and stored, so a
    # failure mid-transfer never exposes a stale or missing workspace.
    self._is_running = False
    return False