Skip to content

Recorder

RecorderAgent

Bases: Agent

An agent that records specific data from input streams based on configured queries, scanning JSON data. The recorded data is stored in session variables for later use by other agents or processes.

Properties (in addition to Agent properties):

Name Type Default Description
records list of dict [] List of record configurations, each containing: variable (name of the session variable to store the result), query (the jsonpath query to execute on the input data), and single (boolean indicating if a single result is expected).

Inputs: - DEFAULT: The JSON input stream to process and query records.

Outputs: None.

Source code in blue/agents/recorder.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class RecorderAgent(Agent):
    """An agent that records specific data from input streams based on configured queries, scanning JSON data.
    The recorded data is stored in session variables for later use by other agents or processes.

    Properties (in addition to Agent properties):
    ----------
    | Name           | Type                 | Default | Description |
    |----------------|--------------------|----------|---------|
    | `records`       | `list of dict`        | `[]`       | List of record configurations, each containing: `variable` (name of the session variable to store the result), `query` (the jsonpath query to execute on the input data), and `single` (boolean indicating if a single result is expected). |

    Inputs:
    - `DEFAULT`: The JSON input stream to process and query records.

    Outputs:
    None.

    """

    def __init__(self, **kwargs):
        if 'name' not in kwargs:
            kwargs['name'] = "RECORDER"
        super().__init__(**kwargs)

    def _initialize_properties(self):
        """Initialize default properties for the recorder agent, setting it as an aggregator with specific recording configurations."""
        super()._initialize_properties()

        # recorder is an aggregator agent
        self.properties['aggregator'] = True
        self.properties['aggregator.eos'] = 'NEVER'

        # recorder config
        records = []
        self.properties['records'] = records
        records.append({"variable": "all", "query": "$", "single": True})

    ####### inputs / outputs
    def _initialize_inputs(self):
        """Initialize input parameters for the recorder agent, listening to streams tagged as JSON."""
        self.add_input("DEFAULT", description="JSON input stream to process and query records", includes=["JSON"])

    def _initialize_outputs(self):
        """Initialize outputs for the recorder agent. No outputs by default."""
        # no output
        return

    def default_processor(self, message, input="DEFAULT", properties=None, worker=None):
        """Process messages for the recorder agent, executing configured queries on JSON input data and storing results in session variables.

        Parameters:
            message: The incoming message to process.
            input: The input stream name. Defaults to "DEFAULT".
            properties: Additional properties for processing.
            worker: The worker handling the processing.

        Returns:
            None or a list of variable names that were set in the session.
        """
        if message.isEOS():
            return None
        elif message.isBOS():
            pass
        elif message.isData():
            # store data value
            data = message.getData()

            # TODO: Record from other data types
            if message.getContentType() == ContentType.JSON:
                if 'records' in self.properties:
                    records = self.properties['records']
                    variables = []
                    for record in records:
                        variable = record['variable']
                        query = record['query']
                        single = False
                        if 'single' in record:
                            single = record['single']

                        # evaluate path on json_data
                        self.logger.info('Executing query {query}'.format(query=query))
                        result = None
                        try:
                            result = json_utils.json_query(data, query, single=single)
                        except:
                            pass

                        if result:
                            worker.set_session_data(variable, result)
                            variables.append(variable)

                    if len(variables) > 0:
                        return variables

        return None

default_processor(message, input='DEFAULT', properties=None, worker=None)

Process messages for the recorder agent, executing configured queries on JSON input data and storing results in session variables.

Parameters:

Name Type Description Default
message

The incoming message to process.

required
input

The input stream name. Defaults to "DEFAULT".

'DEFAULT'
properties

Additional properties for processing.

None
worker

The worker handling the processing.

None

Returns:

Type Description

None or a list of variable names that were set in the session.

Source code in blue/agents/recorder.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def default_processor(self, message, input="DEFAULT", properties=None, worker=None):
    """Process messages for the recorder agent, executing configured queries on JSON input data and storing results in session variables.

    Parameters:
        message: The incoming message to process.
        input: The input stream name. Defaults to "DEFAULT".
        properties: Additional properties for processing.
        worker: The worker handling the processing.

    Returns:
        None or a list of variable names that were set in the session.
    """
    if message.isEOS():
        return None
    elif message.isBOS():
        pass
    elif message.isData():
        # store data value
        data = message.getData()

        # TODO: Record from other data types
        if message.getContentType() == ContentType.JSON:
            if 'records' in self.properties:
                records = self.properties['records']
                variables = []
                for record in records:
                    variable = record['variable']
                    query = record['query']
                    single = False
                    if 'single' in record:
                        single = record['single']

                    # evaluate path on json_data
                    self.logger.info('Executing query {query}'.format(query=query))
                    result = None
                    try:
                        result = json_utils.json_query(data, query, single=single)
                    except:
                        pass

                    if result:
                        worker.set_session_data(variable, result)
                        variables.append(variable)

                if len(variables) > 0:
                    return variables

    return None
Last update: 2025-10-09