Skip to content

Session

Bases: Entity

Session to provide context for managing agents and streams. Session data is shared among all agents in the session.

Source code in blue/session.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
class Session(Entity):
    """
    Session to provide context for managing agents and streams.
    Session data is shared among all agents in the session.
    """

    def __init__(self, id=None, sid=None, cid=None, prefix=None, suffix=None, properties=None):
        """Create the session, configure it, and start it immediately.

        Parameters:
            id: Forwarded unchanged to the Entity constructor.
            sid: Forwarded unchanged to the Entity constructor.
            cid: Forwarded unchanged to the Entity constructor.
            prefix: Forwarded unchanged to the Entity constructor.
            suffix: Forwarded unchanged to the Entity constructor.
            properties: Optional dictionary of property overrides applied on top of the defaults.
        """
        super().__init__(name="SESSION", id=id, sid=sid, cid=cid, prefix=prefix, suffix=suffix)

        # database connection; assigned by _start_connection()
        self.connection = None

        # session stream
        self.producer = None

        # agents registered through add_agent(), keyed by agent name
        self.agents = {}

        self._initialize(properties=properties)

        # constructing a Session also opens the connection and starts the producer
        self._start()

    ###### INITIALIZATION
    def _initialize(self, properties=None):
        """Initialize session properties and logger.

        Default properties are installed first, then overridden by ``properties``;
        the logger is set up last.

        Parameters:
            properties: Dictionary of properties to configure the session. Defaults to None.
        """
        self._initialize_properties()
        self._update_properties(properties=properties)

        self._initialize_logger()

    def _initialize_properties(self):
        """Initialize default session properties."""
        self.properties = {}

        # db connectivity
        self.properties['db.host'] = 'localhost'
        self.properties['db.port'] = 6379

    def _update_properties(self, properties=None):
        """Update session properties with provided values.

        Parameters:
            properties: Dictionary of properties to update. Defaults to None.
        """
        if properties is None:
            return

        # override
        for p in properties:
            self.properties[p] = properties[p]

    def get_stream(self):
        """Get the session's stream identifier.

        Returns:
            The session's stream identifier.
        """
        return self.producer.get_stream()

    def _initialize_logger(self):
        """Create the session logger and register its "stack" and "session" config entries."""
        logger = self.logger = log_utils.CustomLogger()
        # customize log
        logger.set_config_data("stack", "%(call_stack)s")
        logger.set_config_data("session", self.sid, -1)

    ###### AGENTS, NOTIFICATION
    def add_agent(self, agent):
        """
        Register an agent with the session.
        Creates the agent's data namespace, records the agent under its name, and
        writes an ADD_AGENT control message to the session stream.

        Parameters:
            agent: Agent object to be added to the session.
        """
        self._init_agent_data_namespace(agent)
        self.agents[agent.name] = agent

        # add join message
        join_args = {
            "agent": agent.name,
            "session": self.cid,
            "sid": agent.sid,
            "cid": agent.cid,
        }
        self.producer.write_control(ControlCode.ADD_AGENT, join_args)

    def remove_agent(self, agent):
        """
        Deregister an agent from the session.
        Drops the agent from the registry (if present) and writes a REMOVE_AGENT
        control message to the session stream.

        Parameters:
            agent: Agent object to be removed from the session.
        """
        ### TODO: Purge agent memory, probably not..

        # unknown agents are ignored; the leave message is still emitted
        self.agents.pop(agent.name, None)

        # add leave message
        leave_args = {
            "agent": agent.name,
            "session": self.cid,
            "sid": agent.sid,
            "cid": agent.cid,
        }
        self.producer.write_control(ControlCode.REMOVE_AGENT, leave_args)

    def list_agents(self):
        """
        List the agents currently in the session.
        The list is rebuilt by replaying the ADD_AGENT / REMOVE_AGENT control
        messages read from the session stream; ``self.agents`` is not consulted.

        Returns:
            List of dictionaries with the keys "name", "sid" and "cid", one per agent.
        """
        ## read stream in producer, scan join/leave events
        active = {}

        for message in self.producer.read_all():
            code = message.getCode()
            if code == ControlCode.ADD_AGENT:
                name = message.getArg('agent')
                sid = message.getArg('sid')
                cid = message.getArg('cid')
                active[sid] = {"name": name, "sid": sid, "cid": cid}
            elif code == ControlCode.REMOVE_AGENT:
                # a leave event for an agent that never joined is ignored
                active.pop(message.getArg('sid'), None)

        return list(active.values())

    def notify(self, agent, output_stream, tags):
        """
        Announce a new output stream created by an agent.
        Records the stream metadata, then writes an ADD_STREAM control message
        to the session stream.

        Parameters:
            agent: Agent object that created the output stream.
            output_stream: Identifier of the output stream.
            tags: List of tags associated with the output stream.
        """
        self._update_stream_metadata(output_stream, agent, tags)

        # add to stream to notify others, unless it exists
        stream_args = {
            "session": self.cid,
            "agent": agent.cid,
            "stream": output_stream,
            "tags": tags,
        }
        self.producer.write_control(ControlCode.ADD_STREAM, stream_args)

    ###### DATA/METADATA RELATED
    def __get_json_value(self, value):
        if value is None:
            return None
        if type(value) is list:
            if len(value) == 0:
                return None
            else:
                return value[0]
        else:
            return value

    ## session metadata
    def _init_metadata_namespace(self):
        """Initialize the metadata namespace for the session."""
        # create namespaces for any session common data, and stream-specific data
        self.connection.json().set(
            self._get_metadata_namespace(),
            "$",
            {"members": {}, 'pinned': {}, 'debugger': {}},
            nx=True,
        )

        # add created_date
        self.set_metadata("created_date", int(time.time()), nx=True)

        # init budget
        self._init_budget()

    def _get_metadata_namespace(self):
        """Get the metadata namespace for the session."""
        return self.cid + ":METADATA"

    def set_metadata(self, key, value, nx=False):
        """
        Set metadata for the session.

        Parameters:
            key: Metadata key.
            value: Metadata value.
            nx: If True, set the value only if the key does not already exist. Defaults to False.
        """
        self.connection.json().set(self._get_metadata_namespace(), "$." + key, value, nx=nx)

    def get_metadata(self, key=""):
        """
        Read session metadata.

        Parameters:
            key: Metadata key. If empty, returns all metadata. Defaults to "".

        Returns:
            Metadata value or all metadata if key is empty.
        """
        # an empty key addresses the document root, so no "." separator is added
        separator = "" if pydash.is_empty(key) else "."
        json_path = Path("$" + separator + key)
        matches = self.connection.json().get(self._get_metadata_namespace(), json_path)
        return self.__get_json_value(matches)

    ## budget
    def _init_budget(self):
        """Initialize budget metadata for the session, such as cost, accuracy, and latency for allocation and usage tracking."""
        self.set_metadata('budget', {}, nx=True)
        self.set_metadata('budget.allocation', {}, nx=True)
        self.set_metadata('budget.use', {}, nx=True)
        self.set_budget_allocation(cost=-1, accuracy=-1, latency=-1, nx=True)

    def get_budget(self):
        """Get the overall budget metadata for the session.

        Returns:
            (dict): Dictionary containing overall budget metadata.
        """
        return self.get_metadata('budget')

    def set_budget_allocation(self, cost=None, accuracy=None, latency=None, nx=False):
        """
        Set budget allocation metadata for the session.

        Parameters:
            cost: Cost allocation value.
            accuracy: Accuracy allocation value.
            latency: Latency allocation value.
            nx (bool): If True, set the value only if the key does not already exist. Defaults to False.
        """
        if cost is not None:
            self.set_metadata('budget.allocation.cost', cost, nx)
        if accuracy is not None:
            self.set_metadata('budget.allocation.accuracy', accuracy, nx)
        if latency is not None:
            self.set_metadata('budget.allocation.latency', latency, nx)

    def get_budget_allocation(self):
        """Get the budget allocation metadata for the session.

        Returns:
            Dictionary containing budget allocation metadata.
        """
        return self.get_metadata(key='budget.allocation')

    def _set_budget_use(self, cost=None, accuracy=None, latency=None):
        """
        Set budget usage metadata for the session.

        Parameters:
            cost: Cost usage value.
            accuracy: Accuracy usage value.
            latency: Latency usage value.
        """
        if cost:
            self.set_metadata('budget.use.cost', cost)
        if accuracy:
            self.set_metadata('budget.use.accuracy', accuracy)
        if latency:
            self.set_metadata('budget.use.latency', latency)

    def update_budget_use(self, cost=None, accuracy=None, latency=None):
        """
        !!! warning "Not Implemented"

        Intended to update budget usage metadata for the session by incrementing existing values.
        The current body is a placeholder: the arguments are ignored and nothing is written.

        Parameters:
            cost: Cost usage value to increment.
            accuracy: Accuracy usage value to increment.
            latency: Latency usage value to increment.
        """
        # TODO
        pass

    def get_budget_use(self):
        """Get the budget usage metadata for the session.

        Returns:
            (dict): Dictionary containing budget usage metadata.
        """
        return self.get_metadata(key='budget.use')

    ## session data (shared by all agents)
    def _init_data_namespace(self):
        """Initialize session data namespace."""
        # create namespaces for any session common data, and stream-specific data
        self.connection.json().set(
            self._get_data_namespace(),
            "$",
            {},
            nx=True,
        )

    def _get_data_namespace(self):
        """Get the data namespace for the session.

        Returns:
            The data namespace for the session.
        """

        return self.cid + ":DATA"

    def set_data(self, key, value):
        """Set session data for a specific key.

        Parameters:
            key: Data key.
            value (Any): Data value.
        """
        self.connection.json().set(self._get_data_namespace(), "$." + key, value)

    def delete_data(self, key):
        """Delete session data for a specific key.

        Parameters:
            key (str): Data key to delete.
        """
        self.connection.json().delete(self._get_data_namespace(), "$." + key)

    def get_data(self, key):
        """Read one value from the session data document.

        Parameters:
            key (str): Data key.

        Returns:
            Data value for the specified key.
        """
        json_path = Path("$." + key)
        matches = self.connection.json().get(self._get_data_namespace(), json_path)
        return self.__get_json_value(matches)

    def get_all_data(self):
        """Read the whole session data document.

        Returns:
            (dict): Dictionary containing all session data.
        """
        root = Path("$")
        matches = self.connection.json().get(self._get_data_namespace(), root)
        return self.__get_json_value(matches)

    def append_data(self, key, value):
        """Append a value to a list in session data for a specific key.

        Parameters:
            key: Data key.
            value: Value to append to the list.
        """
        self.connection.json().arrappend(self._get_data_namespace(), "$." + key, value)

    def get_data_len(self, key):
        """Get the length of a list in session data for a specific key.

        Parameters:
            key: Data key.

        Returns:
            (int): Length of the list for the specified key.
        """
        return self.connection.json().arrlen(self._get_data_namespace(), "$." + key)

    ## session agent data (shared by all workers of an agent)
    def _get_agent_data_namespace(self, agent):
        """Get the data namespace for a specific agent in the session.

        Parameters:
            agent: Agent object.

        Returns:
            The data namespace for the specified agent.
        """
        return agent.cid + ":DATA"

    def _init_agent_data_namespace(self, agent):
        """Initialize data namespace for a specific agent in the session.

        Parameters:
            agent: Agent object.
        """
        # create namespaces for stream-specific data
        return self.connection.json().set(
            self._get_agent_data_namespace(agent),
            "$",
            {},
            nx=True,
        )

    def set_agent_data(self, agent, key, value):
        """Set data for a specific key in an agent's data namespace.

        Parameters:
            agent: Agent object.
            key: Data key.
            value: Data value.
        """
        self.connection.json().set(
            self._get_agent_data_namespace(agent),
            "$." + key,
            value,
        )

    def get_agent_data(self, agent, key):
        """Read one value from an agent's data document.

        Parameters:
            agent: Agent object.
            key: Data key.

        Returns:
            (Any): Data value for the specified key.
        """
        json_path = Path("$." + key)
        namespace = self._get_agent_data_namespace(agent)
        matches = self.connection.json().get(namespace, json_path)
        return self.__get_json_value(matches)

    def get_all_agent_data(self, agent):
        """Read an agent's whole data document.

        Parameters:
            agent: Agent object.

        Returns:
            (dict): Dictionary containing all data for the specified agent.
        """
        root = Path("$")
        namespace = self._get_agent_data_namespace(agent)
        matches = self.connection.json().get(namespace, root)
        return self.__get_json_value(matches)

    def append_agent_data(self, agent, key, value):
        """Append a value to a list in an agent's data namespace for a specific key.

        Parameters:
            agent: Agent object.
            key: Data key.
            value: Value to append to the list.
        """
        self.connection.json().arrappend(
            self._get_agent_data_namespace(agent),
            "$." + key,
            value,
        )

    def get_agent_data_len(self, agent, key):
        """Query the length of the list stored under a key of an agent's data document.

        Parameters:
            agent: Agent object.
            key: Data key.

        Returns:
            Length of the list for the specified key.
        """
        json_path = Path("$." + key)
        namespace = self._get_agent_data_namespace(agent)
        return self.connection.json().arrlen(namespace, json_path)

    def _get_stream_metadata_namespace(self, stream):
        """
        Get the metadata namespace for a specific stream in the session.
        """
        return stream + ":METADATA"

    def _update_stream_metadata(self, stream, agent, tags):
        """
        Update metadata for a specific stream in the session with agent and tags information.

        Parameters:
            stream: Stream identifier.
            agent: Agent object that created the stream.
            tags: List of tags associated with the stream.
        """
        metadata_tags = {}
        for tag in tags:
            metadata_tags.update({tag: True})

        self.connection.json().set(self._get_stream_metadata_namespace(stream), "$." + 'created_by', agent.name)
        self.connection.json().set(self._get_stream_metadata_namespace(stream), "$." + 'id', agent.id)
        self.connection.json().set(self._get_stream_metadata_namespace(stream), "$." + 'tags', metadata_tags)

    ## session stream data
    def _get_stream_data_namespace(self, stream):
        """Get the data namespace for a specific stream in the session.

        Parameters:
            stream: Stream identifier.

        Returns:
            The data namespace for the specified stream.
        """
        return stream + ":DATA"

    def set_stream_data(self, stream, key, value):
        """Set data for a specific key in a stream's data namespace.

        Parameters:
            stream: Stream identifier.
            key: Data key.
            value: Data value to set.
        """
        self.connection.json().set(
            self._get_stream_data_namespace(stream),
            "$." + key,
            value,
        )

    def get_stream_data(self, stream, key):
        """Read one value from a stream's data document.

        Parameters:
            stream: Stream identifier.
            key: Data key.

        Returns:
            Data value for the specified key.
        """
        json_path = Path("$." + key)
        namespace = self._get_stream_data_namespace(stream)
        matches = self.connection.json().get(namespace, json_path)
        return self.__get_json_value(matches)

    def get_all_stream_data(self, stream):
        """Read a stream's whole data document.

        Parameters:
            stream: Stream identifier.

        Returns:
            All data in the stream's data namespace.
        """
        root = Path("$")
        namespace = self._get_stream_data_namespace(stream)
        matches = self.connection.json().get(namespace, root)
        return self.__get_json_value(matches)

    def append_stream_data(self, stream, key, value):
        """Append a value to a list in a stream's data namespace for a specific key.

        Parameters:
            stream: Stream identifier.
            key: Data key.
            value: Value to append to the list.
        """
        self.connection.json().arrappend(
            self._get_stream_data_namespace(stream),
            "$." + key,
            value,
        )

    def get_stream_data_len(self, stream, key):
        """Query the length of the list stored under a key of a stream's data document.

        Parameters:
            stream: Stream identifier.
            key: Data key.

        Returns:
            Length of the list for the specified key.
        """
        json_path = Path("$." + key)
        namespace = self._get_stream_data_namespace(stream)
        return self.connection.json().arrlen(namespace, json_path)

    def to_dict(self):
        """Get a dictionary representation of the session, including its metadata.

        Returns:
            Dictionary containing session metadata and identifier.
        """
        return {**self.get_metadata(), "id": self.sid}

    def get_stream_debug_info(self):
        """Get debug information for all streams in the session.

        Returns:
            Dictionary containing debug information for all streams.
        """
        streams_metadata_ids = self.connection.keys("*" + self.sid + "*:STREAM:METADATA")
        debug_info = {}
        for streams_metadata_id in streams_metadata_ids:
            stream_id = streams_metadata_id[: -len(":METADATA")]
            stream_metadata = self.connection.json().get(streams_metadata_id)
            debug_info[stream_id] = stream_metadata

        return debug_info

    ###### OPERATIONS
    def _start(self):
        """Start the session by establishing a database connection, initializing metadata and data namespaces, and starting the session producer.

        The connection is opened first because both namespace initializers write
        through ``self.connection``.
        """
        self._start_connection()

        # initialize session metadata
        self._init_metadata_namespace()

        # initialize session data
        self._init_data_namespace()

        # start  producer to emit session events
        self._start_producer()

    def _start_connection(self):
        """Establish a database connection using the provided properties.

        Builds a PooledConnectionFactory from ``self.properties``, keeps it on
        ``self.connection_factory``, and stores the connection it hands out on
        ``self.connection``.
        """
        self.connection_factory = PooledConnectionFactory(properties=self.properties)
        self.connection = self.connection_factory.get_connection()

    def _start_producer(self):
        """Start the session producer."""
        # start, if not started
        if self.producer == None:

            producer = Producer(sid="STREAM", prefix=self.cid, properties=self.properties, owner=self.sid)
            producer.start()
            self.producer = producer

    def stop(self):
        """Stop the session by stopping all agents and writing an end-of-stream message to the session stream."""
        # stop agents
        for agent_name in self.agents:
            self.agents[agent_name].stop()

        # put EOS to stream
        self.producer.write_eos()

    def wait(self):
        """Wait for all agents in the session to complete, then block indefinitely.

        NOTE(review): once every agent's wait() has returned, the loop below
        sleeps in one-second steps with no exit condition, so this method never
        returns normally. Confirm that this keep-alive behaviour is intended.
        """
        for agent_name in self.agents:
            self.agents[agent_name].wait()

        # keep the calling thread alive; nothing ever breaks out of this loop
        while True:
            time.sleep(1)

add_agent(agent)

Add an agent to the session and initialize its data namespace. Announces agent addition via control message to the session stream.

Parameters:

Name Type Description Default
agent

Agent object to be added to the session.

required
Source code in blue/session.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def add_agent(self, agent):
    """
    Register an agent with the session.
    Creates the agent's data namespace, records the agent under its name, and
    writes an ADD_AGENT control message to the session stream.

    Parameters:
        agent: Agent object to be added to the session.
    """
    self._init_agent_data_namespace(agent)
    self.agents[agent.name] = agent

    # add join message
    join_args = {
        "agent": agent.name,
        "session": self.cid,
        "sid": agent.sid,
        "cid": agent.cid,
    }
    self.producer.write_control(ControlCode.ADD_AGENT, join_args)

append_agent_data(agent, key, value)

Append a value to a list in an agent's data namespace for a specific key.

Parameters:

Name Type Description Default
agent

Agent object.

required
key

Data key.

required
value

Value to append to the list.

required
Source code in blue/session.py
467
468
469
470
471
472
473
474
475
476
477
478
479
def append_agent_data(self, agent, key, value):
    """Append a value to the list stored under a key of an agent's data document.

    Parameters:
        agent: Agent object.
        key: Data key.
        value: Value to append to the list.
    """
    json_path = "$." + key
    namespace = self._get_agent_data_namespace(agent)
    self.connection.json().arrappend(namespace, json_path, value)

append_data(key, value)

Append a value to a list in session data for a specific key.

Parameters:

Name Type Description Default
key

Data key.

required
value

Value to append to the list.

required
Source code in blue/session.py
376
377
378
379
380
381
382
383
def append_data(self, key, value):
    """Append a value to the list stored under a session data key.

    Parameters:
        key: Data key.
        value: Value to append to the list.
    """
    json_path = "$." + key
    namespace = self._get_data_namespace()
    self.connection.json().arrappend(namespace, json_path, value)

append_stream_data(stream, key, value)

Append a value to a list in a stream's data namespace for a specific key.

Parameters:

Name Type Description Default
stream

Stream identifier.

required
key

Data key.

required
value

Value to append to the list.

required
Source code in blue/session.py
576
577
578
579
580
581
582
583
584
585
586
587
588
def append_stream_data(self, stream, key, value):
    """Append a value to the list stored under a key of a stream's data document.

    Parameters:
        stream: Stream identifier.
        key: Data key.
        value: Value to append to the list.
    """
    json_path = "$." + key
    namespace = self._get_stream_data_namespace(stream)
    self.connection.json().arrappend(namespace, json_path, value)

delete_data(key)

Delete session data for a specific key.

Parameters:

Name Type Description Default
key str

Data key to delete.

required
Source code in blue/session.py
347
348
349
350
351
352
353
def delete_data(self, key):
    """Remove a key from the session data document.

    Parameters:
        key (str): Data key to delete.
    """
    json_path = "$." + key
    namespace = self._get_data_namespace()
    self.connection.json().delete(namespace, json_path)

get_agent_data(agent, key)

Get data for a specific key in an agent's data namespace.

Parameters:

Name Type Description Default
agent

Agent object.

required
key

Data key.

required

Returns:

Type Description
Any

Data value for the specified key.

Source code in blue/session.py
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def get_agent_data(self, agent, key):
    """Read one value from an agent's data document.

    Parameters:
        agent: Agent object.
        key: Data key.

    Returns:
        (Any): Data value for the specified key.
    """
    json_path = Path("$." + key)
    namespace = self._get_agent_data_namespace(agent)
    matches = self.connection.json().get(namespace, json_path)
    return self.__get_json_value(matches)

get_agent_data_len(agent, key)

Get the length of a list in an agent's data namespace for a specific key.

Parameters:

Name Type Description Default
agent

Agent object.

required
key

Data key.

required

Returns:

Type Description

Length of the list for the specified key.

Source code in blue/session.py
481
482
483
484
485
486
487
488
489
490
491
492
493
494
def get_agent_data_len(self, agent, key):
    """Query the length of the list stored under a key of an agent's data document.

    Parameters:
        agent: Agent object.
        key: Data key.

    Returns:
        Length of the list for the specified key.
    """
    json_path = Path("$." + key)
    namespace = self._get_agent_data_namespace(agent)
    return self.connection.json().arrlen(namespace, json_path)

get_all_agent_data(agent)

Get all data in an agent's data namespace.

Parameters:

Name Type Description Default
agent

Agent object.

required

Returns:

Type Description
dict

Dictionary containing all data for the specified agent.

Source code in blue/session.py
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def get_all_agent_data(self, agent):
    """Read an agent's whole data document.

    Parameters:
        agent: Agent object.

    Returns:
        (dict): Dictionary containing all data for the specified agent.
    """
    root = Path("$")
    namespace = self._get_agent_data_namespace(agent)
    matches = self.connection.json().get(namespace, root)
    return self.__get_json_value(matches)

get_all_data()

Get all session data.

Returns:

Type Description
dict

Dictionary containing all session data.

Source code in blue/session.py
367
368
369
370
371
372
373
374
def get_all_data(self):
    """Read the whole session data document.

    Returns:
        (dict): Dictionary containing all session data.
    """
    root = Path("$")
    matches = self.connection.json().get(self._get_data_namespace(), root)
    return self.__get_json_value(matches)

get_all_stream_data(stream)

Get all data in a stream's data namespace.

Parameters:

Name Type Description Default
stream

Stream identifier.

required

Returns:

Type Description

All data in the stream's data namespace.

Source code in blue/session.py
561
562
563
564
565
566
567
568
569
570
571
572
573
574
def get_all_stream_data(self, stream):
    """Read a stream's whole data document.

    Parameters:
        stream: Stream identifier.

    Returns:
        All data in the stream's data namespace.
    """
    root = Path("$")
    namespace = self._get_stream_data_namespace(stream)
    matches = self.connection.json().get(namespace, root)
    return self.__get_json_value(matches)

get_budget()

Get the overall budget metadata for the session.

Returns:

Type Description
dict

Dictionary containing overall budget metadata.

Source code in blue/session.py
247
248
249
250
251
252
253
def get_budget(self):
    """Return the session's complete budget metadata.

    Returns:
        (dict): Dictionary containing overall budget metadata.
    """
    budget = self.get_metadata('budget')
    return budget

get_budget_allocation()

Get the budget allocation metadata for the session.

Returns:

Type Description

Dictionary containing budget allocation metadata.

Source code in blue/session.py
272
273
274
275
276
277
278
def get_budget_allocation(self):
    """Return the allocation part of the session's budget metadata.

    Returns:
        Dictionary containing budget allocation metadata.
    """
    allocation = self.get_metadata(key='budget.allocation')
    return allocation

get_budget_use()

Get the budget usage metadata for the session.

Returns:

Type Description
dict

Dictionary containing budget usage metadata.

Source code in blue/session.py
310
311
312
313
314
315
316
def get_budget_use(self):
    """Return the usage part of the session's budget metadata.

    Returns:
        (dict): Dictionary containing budget usage metadata.
    """
    usage = self.get_metadata(key='budget.use')
    return usage

get_data(key)

Get session data for a specific key.

Parameters:

Name Type Description Default
key str

Data key.

required

Returns:

Type Description

Data value for the specified key.

Source code in blue/session.py
355
356
357
358
359
360
361
362
363
364
365
def get_data(self, key):
    """Look up a single entry in the session's data namespace.

    Parameters:
        key (str): Data key; it is appended to ``$.`` to form the JSON path.

    Returns:
        Data value for the specified key, after it has been passed through
        the session's JSON-value helper.
    """
    json_client = self.connection.json()
    namespace = self._get_data_namespace()
    json_path = Path("$." + key)
    raw = json_client.get(namespace, json_path)
    return self.__get_json_value(raw)

get_data_len(key)

Get the length of a list in session data for a specific key.

Parameters:

Name Type Description Default
key

Data key.

required

Returns:

Type Description
int

Length of the list for the specified key.

Source code in blue/session.py
385
386
387
388
389
390
391
392
393
394
def get_data_len(self, key):
    """Report the length of a list stored in the session's data namespace.

    Parameters:
        key: Data key; it is appended to ``$.`` to form the JSON path.

    Returns:
        (int): Length of the list for the specified key, as reported by the
            JSON client's ``arrlen`` call.
    """
    json_client = self.connection.json()
    namespace = self._get_data_namespace()
    json_path = "$." + key
    return json_client.arrlen(namespace, json_path)

get_metadata(key='')

Get metadata for the session.

Parameters:

Name Type Description Default
key

Metadata key. If empty, returns all metadata. Defaults to "".

''

Returns:

Type Description

Metadata value or all metadata if key is empty.

Source code in blue/session.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def get_metadata(self, key=""):
    """
    Read session metadata, either one entry or the whole document.

    Parameters:
        key: Metadata key. If empty, returns all metadata. Defaults to "".

    Returns:
        Metadata value or all metadata if key is empty, after it has been
        passed through the session's JSON-value helper.
    """
    json_client = self.connection.json()
    namespace = self._get_metadata_namespace()
    # An empty key addresses the document root ("$"); otherwise "$.<key>".
    separator = "" if pydash.is_empty(key) else "."
    json_path = Path("$" + separator + key)
    raw = json_client.get(namespace, json_path)
    return self.__get_json_value(raw)

get_stream()

Get the session's stream identifier.

Returns:

Type Description

The session's stream identifier.

Source code in blue/session.py
75
76
77
78
79
80
81
def get_stream(self):
    """Return the identifier of the stream owned by the session's producer.

    Returns:
        The session's stream identifier.
    """
    session_stream = self.producer.get_stream()
    return session_stream

get_stream_data(stream, key)

Get data for a specific key in a stream's data namespace.

Parameters:

Name Type Description Default
stream

Stream identifier.

required
key

Data key.

required

Returns:

Type Description

Data value for the specified key.

Source code in blue/session.py
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
def get_stream_data(self, stream, key):
    """Look up a single entry in a stream's data namespace.

    Parameters:
        stream: Stream identifier.
        key: Data key; it is appended to ``$.`` to form the JSON path.

    Returns:
        Data value for the specified key, after it has been passed through
        the session's JSON-value helper.
    """
    json_client = self.connection.json()
    namespace = self._get_stream_data_namespace(stream)
    json_path = Path("$." + key)
    raw = json_client.get(namespace, json_path)
    return self.__get_json_value(raw)

get_stream_data_len(stream, key)

Get the length of a list in a stream's data namespace for a specific key.

Parameters:

Name Type Description Default
stream

Stream identifier.

required
key

Data key.

required

Returns:

Type Description

Length of the list for the specified key.

Source code in blue/session.py
590
591
592
593
594
595
596
597
598
599
600
601
602
603
def get_stream_data_len(self, stream, key):
    """Report the length of a list stored in a stream's data namespace.

    Parameters:
        stream: Stream identifier.
        key: Data key; it is appended to ``$.`` to form the JSON path.

    Returns:
        Length of the list for the specified key, as reported by the JSON
        client's ``arrlen`` call.
    """
    json_client = self.connection.json()
    namespace = self._get_stream_data_namespace(stream)
    json_path = Path("$." + key)
    return json_client.arrlen(namespace, json_path)

get_stream_debug_info()

Get debug information for all streams in the session.

Returns:

Type Description

Dictionary containing debug information for all streams.

Source code in blue/session.py
613
614
615
616
617
618
619
620
621
622
623
624
625
626
def get_stream_debug_info(self):
    """Collect the metadata documents of every stream tied to this session.

    Stream metadata keys are discovered with a wildcard key lookup on the
    session id. NOTE(review): if the connection is a Redis client, ``keys``
    walks the entire keyspace, so this is best kept to debugging use.

    Returns:
        Dictionary mapping each stream id (the metadata key with its
        trailing ``:METADATA`` removed) to that stream's metadata document.
    """
    suffix_length = len(":METADATA")
    metadata_keys = self.connection.keys("*" + self.sid + "*:STREAM:METADATA")
    return {
        metadata_key[:-suffix_length]: self.connection.json().get(metadata_key)
        for metadata_key in metadata_keys
    }

list_agents()

List all agents currently in the session.

Returns:

Type Description

List of agents in the session.

Source code in blue/session.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def list_agents(self):
    """
    Work out which agents are currently part of the session.

    The membership is rebuilt by replaying every message on the session
    stream: an ADD_AGENT control message registers an agent under its
    ``sid`` and a REMOVE_AGENT control message drops that ``sid`` again.

    Returns:
        List of agents in the session, each a dict with ``name``, ``sid``
        and ``cid`` entries.
    """
    active = {}

    for message in self.producer.read_all():
        code = message.getCode()
        if code == ControlCode.ADD_AGENT:
            entry = {
                "name": message.getArg('agent'),
                "sid": message.getArg('sid'),
                "cid": message.getArg('cid'),
            }
            # A later join with the same sid replaces the earlier record.
            active[entry["sid"]] = entry
        elif code == ControlCode.REMOVE_AGENT:
            # Leaves for agents that were never seen joining are ignored.
            active.pop(message.getArg('sid'), None)

    return list(active.values())

notify(agent, output_stream, tags)

Notify the session about a new output stream created by an agent. Updates stream metadata and announces the new stream via control message to the session stream.

Parameters:

Name Type Description Default
agent

Agent object that created the output stream.

required
output_stream

Identifier of the output stream.

required
tags

List of tags associated with the output stream.

required
Source code in blue/session.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def notify(self, agent, output_stream, tags):
    """
    Register a newly created agent output stream with the session.

    The stream's metadata is updated first, then an ADD_STREAM control
    message is written to the session stream.

    Parameters:
        agent: Agent object that created the output stream.
        output_stream: Identifier of the output stream.
        tags: List of tags associated with the output stream.
    """
    self._update_stream_metadata(output_stream, agent, tags)

    # Announce the stream on the session stream so that other participants
    # reading it learn about the new output.
    announcement = {
        "session": self.cid,
        "agent": agent.cid,
        "stream": output_stream,
        "tags": tags,
    }
    self.producer.write_control(ControlCode.ADD_STREAM, announcement)

remove_agent(agent)

Remove an agent from the session and announce its removal via control message to the session stream.

Parameters:

Name Type Description Default
agent

Agent object to be removed from the session.

required
Source code in blue/session.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def remove_agent(self, agent):
    """
    Drop an agent from the session and announce that it left.

    The agent is removed from the session's agent registry (if it is
    registered) and a REMOVE_AGENT control message is written to the
    session stream.

    Parameters:
        agent: Agent object to be removed from the session.
    """
    ### TODO: Purge agent memory, probably not..

    # Unregistering is a no-op when the agent was never registered.
    self.agents.pop(agent.name, None)

    # Leave message describing which agent left which session.
    leave_args = {
        "agent": agent.name,
        "session": self.cid,
        "sid": agent.sid,
        "cid": agent.cid,
    }

    self.producer.write_control(ControlCode.REMOVE_AGENT, leave_args)

set_agent_data(agent, key, value)

Set data for a specific key in an agent's data namespace.

Parameters:

Name Type Description Default
agent

Agent object.

required
key

Data key.

required
value

Data value.

required
Source code in blue/session.py
422
423
424
425
426
427
428
429
430
431
432
433
434
def set_agent_data(self, agent, key, value):
    """Store a value under a key in an agent's data namespace.

    Parameters:
        agent: Agent object.
        key: Data key; it is appended to ``$.`` to form the JSON path.
        value: Data value.
    """
    json_client = self.connection.json()
    namespace = self._get_agent_data_namespace(agent)
    json_path = "$." + key
    json_client.set(namespace, json_path, value)

set_budget_allocation(cost=None, accuracy=None, latency=None, nx=False)

Set budget allocation metadata for the session.

Parameters:

Name Type Description Default
cost

Cost allocation value.

None
accuracy

Accuracy allocation value.

None
latency

Latency allocation value.

None
nx bool

If True, set the value only if the key does not already exist. Defaults to False.

False
Source code in blue/session.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def set_budget_allocation(self, cost=None, accuracy=None, latency=None, nx=False):
    """
    Record budget allocation values in the session metadata.

    Only the arguments that are not None are written; each one goes to its
    own ``budget.allocation.<name>`` metadata key.

    Parameters:
        cost: Cost allocation value.
        accuracy: Accuracy allocation value.
        latency: Latency allocation value.
        nx (bool): If True, set the value only if the key does not already exist. Defaults to False.
    """
    allocations = (
        ('budget.allocation.cost', cost),
        ('budget.allocation.accuracy', accuracy),
        ('budget.allocation.latency', latency),
    )
    for metadata_key, amount in allocations:
        # Falsy values such as 0 are still written; only None is skipped.
        if amount is None:
            continue
        self.set_metadata(metadata_key, amount, nx)

set_data(key, value)

Set session data for a specific key.

Parameters:

Name Type Description Default
key

Data key.

required
value Any

Data value.

required
Source code in blue/session.py
338
339
340
341
342
343
344
345
def set_data(self, key, value):
    """Store a value under a key in the session's data namespace.

    Parameters:
        key: Data key; it is appended to ``$.`` to form the JSON path.
        value (Any): Data value.
    """
    json_client = self.connection.json()
    namespace = self._get_data_namespace()
    json_path = "$." + key
    json_client.set(namespace, json_path, value)

set_metadata(key, value, nx=False)

Set metadata for the session.

Parameters:

Name Type Description Default
key

Metadata key.

required
value

Metadata value.

required
nx

If True, set the value only if the key does not already exist. Defaults to False.

False
Source code in blue/session.py
212
213
214
215
216
217
218
219
220
221
def set_metadata(self, key, value, nx=False):
    """
    Store a value under a key in the session's metadata namespace.

    Parameters:
        key: Metadata key; it is appended to ``$.`` to form the JSON path.
        value: Metadata value.
        nx: If True, set the value only if the key does not already exist. Defaults to False.
    """
    json_client = self.connection.json()
    namespace = self._get_metadata_namespace()
    json_path = "$." + key
    json_client.set(namespace, json_path, value, nx=nx)

set_stream_data(stream, key, value)

Set data for a specific key in a stream's data namespace.

Parameters:

Name Type Description Default
stream

Stream identifier.

required
key

Data key.

required
value

Data value to set.

required
Source code in blue/session.py
531
532
533
534
535
536
537
538
539
540
541
542
543
def set_stream_data(self, stream, key, value):
    """Store a value under a key in a stream's data namespace.

    Parameters:
        stream: Stream identifier.
        key: Data key; it is appended to ``$.`` to form the JSON path.
        value: Data value to set.
    """
    json_client = self.connection.json()
    namespace = self._get_stream_data_namespace(stream)
    json_path = "$." + key
    json_client.set(namespace, json_path, value)

stop()

Stop the session by stopping all agents and writing an end-of-stream message to the session stream.

Source code in blue/session.py
656
657
658
659
660
661
662
663
def stop(self):
    """Stop the session by stopping all agents and writing an end-of-stream message to the session stream.

    Agents are stopped in registration order. The loop runs over a snapshot
    of the registry, so an agent that deregisters itself while stopping
    (for example through ``remove_agent``, which deletes from
    ``self.agents``) cannot break the iteration and the end-of-stream
    message is still written.
    """
    # Snapshot first: mutating a dict while iterating it raises RuntimeError.
    for agent in list(self.agents.values()):
        agent.stop()

    # put EOS to stream
    self.producer.write_eos()

to_dict()

Get a dictionary representation of the session, including its metadata.

Returns:

Type Description

Dictionary containing session metadata and identifier.

Source code in blue/session.py
605
606
607
608
609
610
611
def to_dict(self):
    """Build a dictionary describing the session.

    Returns:
        Dictionary containing all session metadata plus an ``id`` entry set
        to the session's ``sid`` (overriding any ``id`` in the metadata).
    """
    session_info = {**self.get_metadata()}
    session_info["id"] = self.sid
    return session_info

update_budget_use(cost=None, accuracy=None, latency=None)

Not Implemented

Update budget usage metadata for the session by incrementing existing values.

Parameters:

Name Type Description Default
cost

Cost usage value to increment.

None
accuracy

Accuracy usage value to increment.

None
latency

Latency usage value to increment.

None
Source code in blue/session.py
296
297
298
299
300
301
302
303
304
305
306
307
308
def update_budget_use(self, cost=None, accuracy=None, latency=None):
    """
    !!! warning "Not Implemented"

    Placeholder for incrementing the session's budget usage metadata.
    The current body does nothing: all arguments are accepted and ignored.

    Parameters:
        cost: Cost usage value to increment.
        accuracy: Accuracy usage value to increment.
        latency: Latency usage value to increment.
    """
    # TODO: implement the increments; until then this is a deliberate no-op.
    return None

wait()

Wait for all agents in the session to complete, then keep the calling thread alive indefinitely by sleeping in a loop (this call does not return).

Source code in blue/session.py
665
666
667
668
669
670
671
def wait(self):
    """Wait for all agents in the session to complete, then idle forever.

    Each registered agent's ``wait()`` is called in turn. After that the
    method sleeps in an endless loop, so it never returns to the caller.
    """
    for agent in self.agents.values():
        agent.wait()

    # Park the calling thread indefinitely once the agents are done.
    while True:
        time.sleep(1)
Last update: 2025-10-08