Skip to content

salina.agents.asynchronous

salina.agents.asynchronous.AsynchronousAgent

Implements an agent that is executed aynchronously in another process, and that returns its own workspace

Usage is: * agent(workspace) * while agent.is_running(): * ..... * workspace=agent.get_workspace()

__call__(self, **kwargs) special

Executes the agent in non-blocking mode. A new workspace is created by the agent.

Source code in salina/agents/asynchronous.py
def __call__(self,**kwargs):
    """ Executes the agent in non-blocking mode. A new workspace is created by the agent.
    """
    assert not self._is_running
    if self.process is None:
        self.o_queue = mp.Queue()
        self.o_queue.cancel_join_thread()
        self.i_queue = mp.Queue()
        self.i_queue.cancel_join_thread()
        self.process = mp.Process(
            target=f, args=(self.agent, self.i_queue,self.o_queue)
        )
        self.process.daemon = False
        self.process.start()
    self._is_running=True
    self.i_queue.put(kwargs)

__init__(self, agent) special

Create the AsynchronousAgent

Parameters:

Name Type Description Default
agent [salina.Agent]

The agent to execute in another process

required
Source code in salina/agents/asynchronous.py
def __init__(self,agent):
    """ Create the AsynchronousAgent

    Args:
        agent ([salina.Agent]): The agent to execute in another process
    """
    self._is_running=False
    self.process=None
    self._workspace=None
    self.agent=agent

close(self)

Close the agent and kills the corresponding process

Source code in salina/agents/asynchronous.py
def close(self):
    """ Close the agent and kills the corresponding process
    """
    if self.process is None:
        return

    print("[AsynchronousAgent] closing process")
    self.i_queue.put("exit")
    self.o_queue.get()
    self.process.terminate()
    self.process.join()
    self.i_queue.close()
    self.o_queue.close()
    del self.i_queue
    del self.o_queue
    self.process = None

get_workspace(self)

Returns the built workspace is the agent has stopped its execution

Returns:

Type Description
[salina.Workspace]

The built workspace

Source code in salina/agents/asynchronous.py
def get_workspace(self):
    """ Returns the built workspace is the agent has stopped its execution

    Returns:
        [salina.Workspace]: The built workspace
    """
    if self.is_running():
        return None
    return self._workspace

is_running(self)

Is the agent still running ?

Returns:

Type Description
[bool]

True is the agent is running

Source code in salina/agents/asynchronous.py
def is_running(self):
    """ Is the agent still running ?

    Returns:
        [bool]: True is the agent is running
    """
    if self._is_running:
        try:
            r = self.o_queue.get(False)
            assert r == "ok"
            self._is_running = False
            r = self.o_queue.get()
            workspace=Workspace()
            while(r!="ok"):
                key,val=r
                workspace.set_full(key,val)
                r = self.o_queue.get()
            self._workspace=workspace.to("cpu")
        except:
            pass
    return self._is_running