Skip to content

Platform

Source code in blue/platform.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
class Platform:
    """Platform object: holds ids and properties, a db connection, platform metadata, sessions and the platform stream."""

    def __init__(self, name="PLATFORM", id=None, sid=None, cid=None, prefix=None, suffix=None, properties={}):
        """Initializes a new Platform instance.

        This constructor constructs a canonical id (cid) given id, prefix and suffix, automatically creates a
        unique id if not given.

        Initializes a logger to use in platform.

        Starts platform, connecting to db to store data, metadata and starts platform stream for agent and other
        containers to listen to.

        Parameters:
            name (str): name of the platform, used to build the short id when sid is not given
            id (str): id of the platform,
            sid (str): short id of the platform
            cid (str): canonical id of the platform
            prefix (str): prefix to build a canonical id
            suffix (str): suffix to build a canonical id
            properties (dict): dictionary of key-value pairs that identify properties of the platform
        """
        # NOTE: the shared `{}` default of `properties` is only read (its items are copied in
        # _update_properties), never mutated, so it is kept for interface compatibility.
        self.connection = None
        self.name = name

        # fall back to a generated uuid / "<name>:<id>" when id / sid are not provided
        self.id = id if id else uuid_utils.create_uuid()
        self.sid = sid if sid else self.name + ":" + self.id

        self.prefix = prefix
        self.suffix = suffix
        self.cid = cid

        # build the canonical id as [prefix:]sid[:suffix] only when no cid is given
        if self.cid is None:
            self.cid = self.sid

            if self.prefix:
                self.cid = self.prefix + ":" + self.cid
            if self.suffix:
                self.cid = self.cid + ":" + self.suffix

        self._initialize(properties=properties)

        # platform stream
        self.producer = None

        self._start()

    ###### INITIALIZATION
    def _initialize(self, properties=None):
        """Initializes platform, overriding default properties with given properties.

        Initializes a logger to use in platform.

        Parameters:
            properties (dict): dictionary of key-value pairs that identify properties of the platform
        """
        self._initialize_properties()
        self._update_properties(properties=properties)

        self._initialize_logger()

    def _initialize_properties(self):
        """Initializes default properties.

        Sets db connection properties, and platform tracker properties.
        """
        self.properties = {}

        # db connectivity
        self.properties['db.host'] = 'localhost'
        self.properties['db.port'] = 6379

        # tracking for platform
        self.properties['tracker.perf.platform.outputs'] = ["log.INFO"]
        self.properties['tracker.perf.platform.period'] = 30

    def _update_properties(self, properties=None):
        """Overrides default properties with given properties.

        Parameters:
            properties (dict): dictionary of key-value pairs that identify properties of the platform
        """
        if properties is None:
            return

        # override
        self.properties.update(properties)

    def _initialize_logger(self):
        """Initializes platform logger to use in platform.

        Sets logger configuration to include calls stack, and platform id.
        """
        self.logger = log_utils.CustomLogger()
        # customize log
        self.logger.set_config_data(
            "stack",
            "%(call_stack)s",
        )
        self.logger.set_config_data("platform", self.sid, -1)

    ###### SESSION
    def _init_session_cleanup_scheduler(self, callback=None):
        """Initializes scheduler to clean up session data.

        Uses `default_session_expiration_duration` to determine session expiration.

        Parameters:
            callback (callable): function to execute on schedule.
        """
        key = 'default_session_expiration_duration'
        if key in self.properties:
            self.set_metadata('settings.session_expiration_duration', self.properties[key], nx=True)
        self.session_cleanup_scheduler = SessionCleanupScheduler(platform=self, callback=callback)

    def _start_session_cleanup_job(self):
        """Starts the session cleanup scheduler."""
        self.session_cleanup_scheduler.start()

    def _stop_session_cleanup_job(self):
        """Stops the session cleanup scheduler."""
        self.session_cleanup_scheduler.stop()

    def get_session_sids(self):
        """Get session sids on platform.

        Lists db keys matching `<cid>:SESSION:*:DATA`, then keeps only those where the part between
        `SESSION:` and `:DATA` contains no further `:`.

        Returns:
            (list[str]): List of session sids (short id), each of the form `SESSION:<id>`
        """
        keys = self.connection.keys(pattern=self.cid + ":SESSION:*:DATA")

        # further apply re to match
        matches = re.finditer(r"SESSION:[^:]*:DATA", "\n".join(keys))

        # strip the trailing ":DATA" to obtain the session sid
        return [match.group()[: -len(":DATA")] for match in matches]

    def get_sessions(self):
        """Get session data for all sessions on platform

        Returns:
            (list[dict]): List of session data as a dictionary.
        """
        result = []
        for session_sid in self.get_session_sids():
            # NOTE: get_session lists the session keys again on every call
            session = self.get_session(session_sid)
            if session is not None:
                result.append(session.to_dict())
        return result

    def get_session(self, session_sid):
        """Get session object for given session sid

        Parameters:
            session_sid (str): Session sid

        Returns:
            (Session): Session object for given session sid, or None if the sid is not among the platform's sessions.
        """
        if session_sid in self.get_session_sids():
            return Session(sid=session_sid, prefix=self.cid, properties=self.properties)
        return None

    def create_session(self, created_by=None):
        """Create a new Session object

        Update platform metadata for user, if created_by is provided, to store owned sessions by user.

        Parameters:
            created_by (str): User id

        Returns:
            (Session): Session object created.
        """
        session = Session(prefix=self.cid, properties=self.properties)
        if not pydash.is_empty(created_by):
            self.set_metadata(f'users.{created_by}.sessions.owner.{session.sid}', True)
        return session

    def delete_session(self, session_sid):
        """Deletes the session, for given session sid.

        Deletes session stream, data, and metadata from db.

        Parameters:
            session_sid (str): Session sid
        """
        session_cid = self.cid + ":" + session_sid

        # delete session stream
        self.connection.delete(session_cid + ":STREAM")

        # delete session data, metadata
        self.connection.delete(session_cid + ":DATA")
        self.connection.delete(session_cid + ":METADATA")

        # TODO: delete more

        # TODO: remove, stop all agents

    def _send_message(self, code, params):
        """Writes a json message with the given code and params to the platform stream, labeled INSTRUCTION.

        Parameters:
            code: value stored under the 'code' key of the message
            params: value stored under the 'params' key of the message
        """
        message = {'code': code, 'params': params}
        self.producer.write(data=message, dtype="json", label="INSTRUCTION")

    def join_session(self, session_sid, registry, agent, properties):
        """Instructs an agent to join a given session

        Writes a JOIN_SESSION control message to platform stream.

        Parameters:
            session_sid (str): Session sid
            registry (str): Name of the agent registry
            agent (str): Name of the agent
            properties(dict): dictionary of key-value pairs that identify properties of the agent

        """
        session_cid = self.cid + ":" + session_sid

        args = {}
        args["session"] = session_cid
        args["registry"] = registry
        args["agent"] = agent
        args["properties"] = properties
        self.producer.write_control(ControlCode.JOIN_SESSION, args)

    ###### METADATA RELATED
    def create_update_user(self, user):
        """Creates or updates user metadata.

        Writes a metadata to platform metadata, for given user.
        Metadata includes uid, email, name, picture, role, etc.

        Parameters:
            user(dict): User metadata; the keys 'uid', 'email', 'name' and 'picture' are read
        """
        uid = user['uid']
        default_user_role = self.get_metadata('settings.default_user_role')
        if pydash.is_empty(default_user_role):
            default_user_role = 'guest'
        default_user_settings = self.get_metadata('settings.default_user_settings')
        if pydash.is_empty(default_user_settings):
            default_user_settings = {}
        # create user profile if does not exist
        self.set_metadata(
            f'users.{uid}',
            {'uid': user['uid'], 'role': default_user_role, 'email': user['email'], 'name': user['name'], 'picture': user['picture']},
            nx=True,
        )
        self.set_metadata(f'users.{uid}.ui_visibility', {}, nx=True)
        self.set_metadata(f'users.{uid}.role', default_user_role, nx=True)
        self.set_metadata(f'users.{uid}.sessions', {"pinned": {}, "owner": {}, "member": {}}, nx=True)
        self.set_metadata(f'users.{uid}.settings', default_user_settings, nx=True)

        # these fields are always overwritten with the latest values
        self.set_metadata(f'users.{uid}.email', user['email'])
        self.set_metadata(f'users.{uid}.name', user['name'])
        self.set_metadata(f'users.{uid}.picture', user['picture'])

    def __get_json_value(self, value):
        """Unwraps a json query result.

        Parameters:
            value (Any): raw value returned by the json get call

        Returns:
            (Any): first element if value is a non-empty list, None if value is None or an empty list,
                otherwise value itself.
        """
        if isinstance(value, list):
            return value[0] if value else None
        return value

    def _init_metadata_namespace(self):
        """Initializes platform metadata namespace on db"""
        # create the metadata document with empty users and settings, passing nx=True
        self.connection.json().set(
            self._get_metadata_namespace(),
            "$",
            {'users': {}, "settings": {"allowed_emails": {}}},
            nx=True,
        )

    def _get_metadata_namespace(self):
        """Get metadata namespace

        Returns:
            metadata namespace string
        """
        return self.cid + ":METADATA"

    def set_metadata(self, key, value, nx=False):
        """Set platform metadata value at the given key

        Parameters:
            key (str): dotted key of the metadata, appended to the `$.` json path root
            value (Any): value to store
            nx (bool): forwarded as `nx` to the json set call; presumably "set only if the path does not
                exist yet" -- confirm against the db client in use
        """
        self.connection.json().set(self._get_metadata_namespace(), "$." + key, value, nx=nx)

    def get_metadata(self, key=""):
        """Get platform metadata, for key, or all metadata

        Parameters:
            key (str): key of the metadata

        Returns:
            (Any): metadata value for key, or all platform metadata if no key is given.
        """
        # build the path without concatenating an empty key, so key=None also reads the whole document
        path = "$" if pydash.is_empty(key) else "$." + key
        value = self.connection.json().get(self._get_metadata_namespace(), Path(path))
        return self.__get_json_value(value)

    ###### OPERATIONS
    def _start_producer(self):
        """Starts platform stream producer"""
        # start, if not started
        if self.producer is None:
            producer = Producer(sid="STREAM", prefix=self.cid, properties=self.properties, owner=self.sid)
            producer.start()
            self.producer = producer

    def perf_tracker_callback(self, data, tracker=None, properties=None):
        """Callback function for performance tracker"""
        pass

    def _init_tracker(self):
        """Initialize platform performance tracker"""
        # the lambda looks up perf_tracker_callback at call time rather than binding it now
        self._tracker = PlatformPerformanceTracker(self, properties=self.properties, callback=lambda *args, **kwargs: self.perf_tracker_callback(*args, **kwargs))

    def _start_tracker(self):
        """Starts platform performance tracker"""
        # start tracker
        self._tracker.start()

    def _stop_tracker(self):
        """Stops platform performance tracker"""
        self._tracker.stop()

    def _terminate_tracker(self):
        """Terminates platform performance tracker"""
        self._tracker.terminate()

    def _start(self):
        """Starts platform

        Initialize connection to db, initialize platform metadata.
        Initializes platform tracker.
        Starts platform stream producer.
        """
        self._start_connection()

        # initialize platform metadata
        self._init_metadata_namespace()

        # init tracker
        self._init_tracker()

        # start platform communication stream
        self._start_producer()

        self.logger.info(f'Started platform {self.sid}')

    def _start_connection(self):
        """Initialize connection to db

        Uses pooled connection factory to obtain a db connection
        """
        self.connection_factory = PooledConnectionFactory(properties=self.properties)
        self.connection = self.connection_factory.get_connection()

    def stop(self):
        """Stops platform

        Stops platform tracker.
        """
        # stop tracker
        self._stop_tracker()

__init__(name='PLATFORM', id=None, sid=None, cid=None, prefix=None, suffix=None, properties={})

Initializes a new Platform instance.

This constructor constructs a canonical id (cid) given id, prefix and suffix, automatically creates a unique id if not given.

Initializes a logger to use in platform.

Starts platform, connecting to db to store data, metadata and starts platform stream for agent and other containers to listen to.

Parameters:

Name Type Description Default
id str

id of the platform,

None
sid str

short id of the platform

None
cid str

canonical id of the platform

None
prefix str

prefix to build a canonical id

None
suffix str

suffix to build a canonical id

None
properties dict

dictionary of key-value pairs that identify properties of the platform

{}
Source code in blue/platform.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(self, name="PLATFORM", id=None, sid=None, cid=None, prefix=None, suffix=None, properties={}):
    """Initializes a new Platform instance.

    This constructor constructs a canonical id (cid) given id, prefix and suffix, automatically creates a
    unique id if not given.

    Initializes a logger to use in platform.

    Starts platform, connecting to db to store data, metadata and starts platform stream for agent and other
    containers to listen to.

    Parameters:
        name (str): name of the platform, used to build the short id when sid is not given
        id (str): id of the platform,
        sid (str): short id of the platform
        cid (str): canonical id of the platform
        prefix (str): prefix to build a canonical id
        suffix (str): suffix to build a canonical id
        properties (dict): dictionary of key-value pairs that identify properties of the platform
    """
    self.connection = None
    self.name = name

    # fall back to a generated uuid / "<name>:<id>" when id / sid are not provided
    self.id = id if id else uuid_utils.create_uuid()
    self.sid = sid if sid else self.name + ":" + self.id

    self.prefix = prefix
    self.suffix = suffix
    self.cid = cid

    # build the canonical id as [prefix:]sid[:suffix] only when no cid is given
    if self.cid is None:
        self.cid = self.sid

        if self.prefix:
            self.cid = self.prefix + ":" + self.cid
        if self.suffix:
            self.cid = self.cid + ":" + self.suffix

    self._initialize(properties=properties)

    # platform stream
    self.producer = None

    self._start()

create_session(created_by=None)

Create a new Session object

Update platform metadata for user, if created_by is provided, to store owned sessions by user.

Parameters:

Name Type Description Default
created_by str

User id

None

Returns:

Type Description
Session

Session object created.

Source code in blue/platform.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def create_session(self, created_by=None):
    """Creates a new Session under this platform.

    When created_by is non-empty, also records the new session as owned by that user in platform metadata.

    Parameters:
        created_by (str): User id

    Returns:
        (Session): Session object created.
    """
    new_session = Session(prefix=self.cid, properties=self.properties)

    # nothing to record when no creator is given
    if pydash.is_empty(created_by):
        return new_session

    owner_key = f'users.{created_by}.sessions.owner.{new_session.sid}'
    self.set_metadata(owner_key, True)
    return new_session

create_update_user(user)

Creates or updates user metadata.

Writes a metadata to platform metadata, for given user. Metadata includes uid, email, name, picture, role, etc.

Parameters:

Name Type Description Default
user dict

User metadata

required
Source code in blue/platform.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def create_update_user(self, user):
    """Creates or updates user metadata.

    Writes user metadata (uid, email, name, picture, role, etc.) to platform metadata.
    Profile, ui_visibility, role, sessions and settings are written with nx=True;
    email, name and picture are always written.

    Parameters:
        user(dict): User metadata; the keys 'uid', 'email', 'name' and 'picture' are read
    """
    uid = user['uid']

    # resolve defaults from platform settings, falling back to built-in values
    role = self.get_metadata('settings.default_user_role')
    if pydash.is_empty(role):
        role = 'guest'
    settings = self.get_metadata('settings.default_user_settings')
    if pydash.is_empty(settings):
        settings = {}

    user_key = f'users.{uid}'
    profile = {'uid': user['uid'], 'role': role, 'email': user['email'], 'name': user['name'], 'picture': user['picture']}

    # entries written with nx=True, profile first
    create_only = [
        (user_key, profile),
        (f'{user_key}.ui_visibility', {}),
        (f'{user_key}.role', role),
        (f'{user_key}.sessions', {"pinned": {}, "owner": {}, "member": {}}),
        (f'{user_key}.settings', settings),
    ]
    for key, value in create_only:
        self.set_metadata(key, value, nx=True)

    # entries always overwritten with the latest values
    for field in ('email', 'name', 'picture'):
        self.set_metadata(f'{user_key}.{field}', user[field])

delete_session(session_sid)

Deletes the session, for given session sid.

Deletes session stream, data, and metadata from db.

Parameters:

Name Type Description Default
session_sid str

Session sid

required
Source code in blue/platform.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def delete_session(self, session_sid):
    """Deletes the session, for given session sid.

    Removes the session's STREAM, DATA and METADATA keys from db, in that order.

    Parameters:
        session_sid (str): Session sid
    """
    key_prefix = self.cid + ":" + session_sid

    # stream first, then data and metadata
    for suffix in (":STREAM", ":DATA", ":METADATA"):
        self.connection.delete(key_prefix + suffix)

get_metadata(key='')

Get platform metadata, for key, or all metadata

Parameters:

Name Type Description Default
key str

key of the metadata

''

Returns:

Type Description
Any

metadata value for key, or all platform metadata if no key is given.

Source code in blue/platform.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
def get_metadata(self, key=""):
    """Get platform metadata, for key, or all metadata

    Parameters:
        key (str): key of the metadata

    Returns:
        (Any): metadata value for key, or all platform metadata if no key is given.
    """
    # build the path without concatenating an empty key, so key=None also reads the whole document
    path = "$" if pydash.is_empty(key) else "$." + key
    value = self.connection.json().get(self._get_metadata_namespace(), Path(path))
    return self.__get_json_value(value)

get_session(session_sid)

Get session object for given session sid

Parameters:

Name Type Description Default
session_sid str

Session sid

required

Returns:

Type Description
Session

Session object for given session sid.

Source code in blue/platform.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def get_session(self, session_sid):
    """Get session object for given session sid

    Parameters:
        session_sid (str): Session sid

    Returns:
        (Session): Session object for given session sid, or None if the sid is not among the platform's sessions.
    """
    # a single membership test does not need a throwaway set
    if session_sid in self.get_session_sids():
        return Session(sid=session_sid, prefix=self.cid, properties=self.properties)
    return None

get_session_sids()

Get session sids on platform.

Returns:

Type Description
list[str]

List of session sids (short id)

Source code in blue/platform.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def get_session_sids(self):
    """Get session sids on platform.

    Lists db keys matching `<cid>:SESSION:*:DATA`, then keeps only those where the part between
    `SESSION:` and `:DATA` contains no further `:`.

    Returns:
        (list[str]): List of session sids (short id), each of the form `SESSION:<id>`
    """
    keys = self.connection.keys(pattern=self.cid + ":SESSION:*:DATA")

    # further apply re to match
    matches = re.finditer(r"SESSION:[^:]*:DATA", "\n".join(keys))

    # strip the trailing ":DATA" to obtain the session sid
    return [match.group()[: -len(":DATA")] for match in matches]

get_sessions()

Get session data for all sessions on platform

Returns:

Type Description
list[dict]

List of session data as a dictionary.

Source code in blue/platform.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def get_sessions(self):
    """Collects the dictionary form of every session on the platform.

    Each listed sid is resolved through get_session; sids that resolve to None are skipped.

    Returns:
        (list[dict]): List of session data as a dictionary.
    """
    # lazy generator keeps lookup and to_dict() interleaved per session
    resolved = (self.get_session(sid) for sid in self.get_session_sids())
    return [found.to_dict() for found in resolved if found is not None]

join_session(session_sid, registry, agent, properties)

Instructs an agent to join a given session

Writes a JOIN_SESSION control message to platform stream.

Parameters:

Name Type Description Default
session_sid str

Session sid

required
registry str

Name of the agent registry

required
agent str

Name of the agent

required
properties dict

dictionary of key-value pairs that identify properties of the agent

required
Source code in blue/platform.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def join_session(self, session_sid, registry, agent, properties):
    """Asks an agent to join the given session.

    Writes a JOIN_SESSION control message to the platform stream, carrying the session's
    canonical id (`<platform cid>:<session_sid>`), the registry, the agent and its properties.

    Parameters:
        session_sid (str): Session sid
        registry (str): Name of the agent registry
        agent (str): Name of the agent
        properties(dict): dictionary of key-value pairs that identify properties of the agent

    """
    payload = {
        "session": self.cid + ":" + session_sid,
        "registry": registry,
        "agent": agent,
        "properties": properties,
    }
    self.producer.write_control(ControlCode.JOIN_SESSION, payload)

perf_tracker_callback(data, tracker=None, properties=None)

Callback function for performance tracker

Source code in blue/platform.py
350
351
352
def perf_tracker_callback(self, data, tracker=None, properties=None):
    """Callback function for performance tracker; this implementation does nothing."""
    return None

stop()

Stops platform

Stops platform tracker.

Source code in blue/platform.py
400
401
402
403
404
405
406
def stop(self):
    """Stops platform

    Currently this only stops the platform performance tracker.
    """
    self._stop_tracker()
Last update: 2025-10-03