Skip to content

Requestor

RequestorAgent

Bases: Agent, ServiceClient

An agent that sends requests to an external service via a WebSocket API. The agent collects input data from the input stream, sends it to the specified service URL, and writes the response to the output stream.

Properties (in addition to Agent properties):

Name Type Default Description
service.url str ws://localhost:8000/request The WebSocket URL of the external service to send requests to.
input_json str None Optional JSON string to use as the entire input payload. If specified, this overrides other input fields.
input_context str None Optional session variable name to use as context data in the input payload.
input_context_field str None The field name in the input payload where the context data should be placed. Required if input_context is specified.
input_field str input The field name in the input payload where the main input data should be placed.
output_path str output The JSON path in the response payload where the output data can be found.

Inputs: - DEFAULT: The main input stream where the agent receives data to send to the external service.

Outputs: - DEFAULT: The output stream where the responses from the external service are sent.

Source code in blue/agents/requestor.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class RequestorAgent(Agent, ServiceClient):
    """An agent that sends requests to an external service via a WebSocket API.
    The agent collects input data from the input stream, sends it to the specified service URL,
    and writes the response to the output stream.

    Properties (in addition to Agent properties):
    ----------
    | Name           | Type                 | Default | Description |
    |----------------|--------------------|----------|---------|
    | `service.url`   | `str`                | `ws://localhost:8000/request` | The WebSocket URL of the external service to send requests to. |
    | `input_json`    | `str`                | `None`    | Optional JSON string to use as the entire input payload. If specified, this overrides other input fields. |
    | `input_context` | `str`                | `None`    | Optional session variable name to use as context data in the input payload. |
    | `input_context_field` | `str`                | `None`    | The field name in the input payload where the context data should be placed. Required if `input_context` is specified. |
    | `input_field`   | `str`                | `input`   | The field name in the input payload where the main input data should be placed. |
    | `output_path`   | `str`                | `output`  | The JSON path in the response payload where the output data can be found. |

    Inputs:
    - `DEFAULT`: The main input stream where the agent receives data to send to the external service.

    Outputs:
    - `DEFAULT`: The output stream where the responses from the external service are sent.
    """

    def __init__(self, **kwargs):
        if 'name' not in kwargs:
            kwargs['name'] = "REQUESTOR"
        super().__init__(**kwargs)

    def _initialize_properties(self):
        """Initialize default properties for the requestor agent, including service URL and input/output processing configurations."""
        super()._initialize_properties()

        self.properties['service_url'] = "ws://localhost:8001"

        # input / output processing properties
        self.properties['input_json'] = None
        self.properties['input_context'] = None
        self.properties['input_context_field'] = None
        self.properties['input_field'] = 'input'
        self.properties['output_path'] = 'output'

    def default_processor(self, message, input="DEFAULT", properties=None, worker=None):
        """Process messages for the requestor agent, sending input data to an external service via WebSocket and writing the response to the output stream.

        Parameters:
            message: The incoming message to process.
            input: The input stream name. Defaults to "DEFAULT".
            properties: Additional properties for processing.
            worker: The worker handling the processing.

        Returns:
            None or a response message.
        """
        if message.isEOS():
            # get all data received from stream
            stream_data = ""
            if worker:
                stream_data = worker.get_data('stream')

            #### call api to compute
            input_data = stream_data[0]
            self.logger.info(input_data)
            session_data = self.session.get_all_data()
            output = self.execute_api_call(input_data, properties=properties, additional_data=session_data)
            worker.write_data(output)
            worker.write_eos()

        elif message.isBOS():
            # init stream to empty array
            if worker:
                worker.set_data('stream', [])
            pass
        elif message.isData():
            # store data value
            data = message.getData()
            self.logger.info(data)

            if worker:
                worker.append_data('stream', str(data))

        return None

default_processor(message, input='DEFAULT', properties=None, worker=None)

Process messages for the requestor agent, sending input data to an external service via WebSocket and writing the response to the output stream.

Parameters:

Name Type Description Default
message

The incoming message to process.

required
input

The input stream name. Defaults to "DEFAULT".

'DEFAULT'
properties

Additional properties for processing.

None
worker

The worker handling the processing.

None

Returns:

Type Description

None or a response message.

Source code in blue/agents/requestor.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def default_processor(self, message, input="DEFAULT", properties=None, worker=None):
    """Process messages for the requestor agent, sending input data to an external service via WebSocket and writing the response to the output stream.

    Parameters:
        message: The incoming message to process.
        input: The input stream name. Defaults to "DEFAULT".
        properties: Additional properties for processing.
        worker: The worker handling the processing.

    Returns:
        None or a response message.
    """
    if message.isEOS():
        # get all data received from stream
        stream_data = ""
        if worker:
            stream_data = worker.get_data('stream')

        #### call api to compute
        input_data = stream_data[0]
        self.logger.info(input_data)
        session_data = self.session.get_all_data()
        output = self.execute_api_call(input_data, properties=properties, additional_data=session_data)
        worker.write_data(output)
        worker.write_eos()

    elif message.isBOS():
        # init stream to empty array
        if worker:
            worker.set_data('stream', [])
        pass
    elif message.isData():
        # store data value
        data = message.getData()
        self.logger.info(data)

        if worker:
            worker.append_data('stream', str(data))

    return None
Last update: 2025-10-09