
Memory store

MemoryStore

Bases: ABC

Abstract Base Class (interface) for memory storage backends.

This interface defines the contract that all concrete storage implementations (e.g., RedisMemoryStore, InMemoryStore) must adhere to. It supports two primary data access patterns:

  1. List-based storage: Appending items to a simple sequence.
  2. Indexed storage: Storing items with support for lookups by unique ID, secondary custom keys, and time ranges.

The base_key parameter used in many methods serves as the unique namespace or scope identifier (e.g., specific to a user session or agent).
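To make the two access patterns and the role of base_key concrete, here is a minimal sketch built on plain dictionaries. `ToyStore` is a hypothetical name used only for illustration and is not part of the documented package; its indexed method is deliberately simplified to ID lookup and omits the custom key and timestamp arguments of the real interface.

```python
from typing import Dict, List, Optional


class ToyStore:
    """Hypothetical dict-backed store; not part of the documented package."""

    def __init__(self) -> None:
        self._lists: Dict[str, List[Dict]] = {}
        # base_key -> {entry_id -> entry}; each base_key is its own namespace
        self._entries: Dict[str, Dict[str, Dict]] = {}

    # Pattern 1: list-based storage
    def append_to_list(self, key: str, entry: Dict) -> bool:
        self._lists.setdefault(key, []).append(entry)
        return True

    def get_list(self, key: str) -> List[Dict]:
        # An unknown key yields an empty list, as the interface describes
        return list(self._lists.get(key, []))

    # Pattern 2: indexed storage (simplified here to ID lookup only)
    def store_indexed_entry(self, base_key: str, entry_id: str, entry: Dict) -> bool:
        self._entries.setdefault(base_key, {})[entry_id] = entry
        return True

    def get_entry_by_id(self, base_key: str, entry_id: str) -> Optional[Dict]:
        return self._entries.get(base_key, {}).get(entry_id)


store = ToyStore()
store.append_to_list("session:42:log", {"msg": "hello"})
store.store_indexed_entry("session:42", "e1", {"summary": "first"})
store.store_indexed_entry("session:43", "e1", {"summary": "other"})
```

The same entry ID ("e1") is stored under two different base keys without conflict, which is the scoping behaviour the base_key parameter is meant to provide.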

Source code in blue/memories/memory_store.py
class MemoryStore(ABC):
    """
    Abstract Base Class (interface) for memory storage backends.

    This interface defines the contract that all concrete storage implementations
    (e.g., `RedisMemoryStore`, `InMemoryStore`) must adhere to. It supports two primary
    data access patterns:
    1.  **List-based storage**: Appending items to a simple sequence.
    2.  **Indexed storage**: Storing items with support for lookups by unique ID,
        secondary custom keys, and time ranges.

    The `base_key` parameter used in many methods serves as the unique namespace
    or scope identifier (e.g., specific to a user session or agent).
    """

    @abstractmethod
    def initialize(self):
        """
        Initialize the storage backend.

        This method should handle setting up connections, creating necessary
        tables or data structures, and ensuring the store is ready for operations.
        """
        pass

    # list methods
    @abstractmethod
    def append_to_list(self, key: str, entry: Dict) -> bool:
        """
        Appends a dictionary entry to a sequential list stored at the given key.

        Parameters:
            key (str): The specific storage key for the list.
            entry (Dict): The data object to append.

        Returns:
            bool: True if the operation succeeded, False otherwise.
        """
        pass

    @abstractmethod
    def get_list(self, key: str) -> List[Dict]:
        """
        Retrieves the full list of entries stored at the given key.

        Parameters:
            key (str): The specific storage key to retrieve.

        Returns:
            List[Dict]: A list of all stored entries. Returns an empty list if
            the key does not exist.
        """
        pass

    @abstractmethod
    def exists(self, key: str) -> bool:
        """
        Checks if a specific key exists in the storage.

        Parameters:
            key (str): The key to check.

        Returns:
            bool: True if the key exists, False otherwise.
        """
        pass

    # indexed methods
    @abstractmethod
    def store_indexed_entry(self, base_key: str, entry_id: str, entry: Dict, custom_key: Optional[str], timestamp: float) -> bool:
        """
        Atomically stores an entry and updates all associated indices.

        Implementations should ensure that the data is retrievable by:
        1. Its `entry_id` (via `get_entry_by_id`).
        2. Its `custom_key` (via `get_entry_by_key_index`), if provided.
        3. Its `timestamp` (via `get_entries_by_time_range`).

        Parameters:
            base_key (str): The base namespace/scope identifier.
            entry_id (str): The unique ID for this specific entry.
            entry (Dict): The actual data payload to store.
            custom_key (Optional[str]): A secondary unique key for direct lookup
                (e.g., 'latest_summary').
            timestamp (float): The Unix timestamp associated with the entry.

        Returns:
            bool: True if the operation succeeded, False otherwise.
        """
        pass

    @abstractmethod
    def get_entry_by_id(self, base_key: str, entry_id: str) -> Optional[Dict]:
        """
        Retrieves a specific entry directly by its unique ID.

        Parameters:
            base_key (str): The base namespace/scope identifier.
            entry_id (str): The unique ID of the entry to retrieve.

        Returns:
            Optional[Dict]: The entry data if found, None otherwise.
        """
        pass

    @abstractmethod
    def get_entry_by_key_index(self, base_key: str, custom_key: str) -> Optional[Dict]:
        """
        Retrieves a specific entry using a secondary custom key (index).

        Parameters:
            base_key (str): The base namespace/scope identifier.
            custom_key (str): The secondary lookup key used during storage.

        Returns:
            Optional[Dict]: The entry data if found, None otherwise.
        """
        pass

    @abstractmethod
    def get_entries_by_time_range(self, base_key: str, start: float, end: float) -> List[Dict]:
        """
        Retrieves a list of entries falling within a specific time range.

        Parameters:
            base_key (str): The base namespace/scope identifier.
            start (float): The start timestamp (inclusive).
            end (float): The end timestamp (inclusive).

        Returns:
            List[Dict]: A list of entries found within the specified time window.
        """
        pass

append_to_list(key, entry) abstractmethod

Appends a dictionary entry to a sequential list stored at the given key.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | The specific storage key for the list. | required |
| entry | Dict | The data object to append. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the operation succeeded, False otherwise. |

Source code in blue/memories/memory_store.py
@abstractmethod
def append_to_list(self, key: str, entry: Dict) -> bool:
    """
    Appends a dictionary entry to a sequential list stored at the given key.

    Parameters:
        key (str): The specific storage key for the list.
        entry (Dict): The data object to append.

    Returns:
        bool: True if the operation succeeded, False otherwise.
    """
    pass

exists(key) abstractmethod

Checks if a specific key exists in the storage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | The key to check. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the key exists, False otherwise. |

Source code in blue/memories/memory_store.py
@abstractmethod
def exists(self, key: str) -> bool:
    """
    Checks if a specific key exists in the storage.

    Parameters:
        key (str): The key to check.

    Returns:
        bool: True if the key exists, False otherwise.
    """
    pass

get_entries_by_time_range(base_key, start, end) abstractmethod

Retrieves a list of entries falling within a specific time range.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base_key | str | The base namespace/scope identifier. | required |
| start | float | The start timestamp (inclusive). | required |
| end | float | The end timestamp (inclusive). | required |

Returns:

| Type | Description |
| --- | --- |
| List[Dict] | A list of entries found within the specified time window. |

Source code in blue/memories/memory_store.py
@abstractmethod
def get_entries_by_time_range(self, base_key: str, start: float, end: float) -> List[Dict]:
    """
    Retrieves a list of entries falling within a specific time range.

    Parameters:
        base_key (str): The base namespace/scope identifier.
        start (float): The start timestamp (inclusive).
        end (float): The end timestamp (inclusive).

    Returns:
        List[Dict]: A list of entries found within the specified time window.
    """
    pass
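Because both bounds are documented as inclusive, an entry stamped exactly at start or end belongs in the result. The sketch below illustrates that rule on a plain list of (timestamp, entry) pairs. The function name and the choice to return results in ascending time order are assumptions for illustration; the interface itself does not state an ordering.

```python
from typing import Dict, List, Tuple


def entries_in_range(
    timed_entries: List[Tuple[float, Dict]], start: float, end: float
) -> List[Dict]:
    """Return entries whose timestamp t satisfies start <= t <= end."""
    ordered = sorted(timed_entries, key=lambda pair: pair[0])
    return [entry for ts, entry in ordered if start <= ts <= end]


timed = [(300.0, {"id": "c"}), (100.0, {"id": "a"}), (200.0, {"id": "b"})]
window = entries_in_range(timed, 100.0, 200.0)
```

Here `window` contains the entries stamped 100.0 and 200.0, since both boundary values are included.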

get_entry_by_id(base_key, entry_id) abstractmethod

Retrieves a specific entry directly by its unique ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base_key | str | The base namespace/scope identifier. | required |
| entry_id | str | The unique ID of the entry to retrieve. | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[Dict] | The entry data if found, None otherwise. |

Source code in blue/memories/memory_store.py
@abstractmethod
def get_entry_by_id(self, base_key: str, entry_id: str) -> Optional[Dict]:
    """
    Retrieves a specific entry directly by its unique ID.

    Parameters:
        base_key (str): The base namespace/scope identifier.
        entry_id (str): The unique ID of the entry to retrieve.

    Returns:
        Optional[Dict]: The entry data if found, None otherwise.
    """
    pass

get_entry_by_key_index(base_key, custom_key) abstractmethod

Retrieves a specific entry using a secondary custom key (index).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base_key | str | The base namespace/scope identifier. | required |
| custom_key | str | The secondary lookup key used during storage. | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[Dict] | The entry data if found, None otherwise. |

Source code in blue/memories/memory_store.py
@abstractmethod
def get_entry_by_key_index(self, base_key: str, custom_key: str) -> Optional[Dict]:
    """
    Retrieves a specific entry using a secondary custom key (index).

    Parameters:
        base_key (str): The base namespace/scope identifier.
        custom_key (str): The secondary lookup key used during storage.

    Returns:
        Optional[Dict]: The entry data if found, None otherwise.
    """
    pass

get_list(key) abstractmethod

Retrieves the full list of entries stored at the given key.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | The specific storage key to retrieve. | required |

Returns:

| Type | Description |
| --- | --- |
| List[Dict] | A list of all stored entries. Returns an empty list if the key does not exist. |

Source code in blue/memories/memory_store.py
@abstractmethod
def get_list(self, key: str) -> List[Dict]:
    """
    Retrieves the full list of entries stored at the given key.

    Parameters:
        key (str): The specific storage key to retrieve.

    Returns:
        List[Dict]: A list of all stored entries. Returns an empty list if
        the key does not exist.
    """
    pass

initialize() abstractmethod

Initialize the storage backend.

This method should handle setting up connections, creating necessary tables or data structures, and ensuring the store is ready for operations.

Source code in blue/memories/memory_store.py
@abstractmethod
def initialize(self):
    """
    Initialize the storage backend.

    This method should handle setting up connections, creating necessary
    tables or data structures, and ensuring the store is ready for operations.
    """
    pass

store_indexed_entry(base_key, entry_id, entry, custom_key, timestamp) abstractmethod

Atomically stores an entry and updates all associated indices.

Implementations should ensure that the data is retrievable by:

  1. Its entry_id (via get_entry_by_id).
  2. Its custom_key (via get_entry_by_key_index), if provided.
  3. Its timestamp (via get_entries_by_time_range).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base_key | str | The base namespace/scope identifier. | required |
| entry_id | str | The unique ID for this specific entry. | required |
| entry | Dict | The actual data payload to store. | required |
| custom_key | Optional[str] | A secondary unique key for direct lookup (e.g., 'latest_summary'). | required |
| timestamp | float | The Unix timestamp associated with the entry. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the operation succeeded, False otherwise. |

Source code in blue/memories/memory_store.py
@abstractmethod
def store_indexed_entry(self, base_key: str, entry_id: str, entry: Dict, custom_key: Optional[str], timestamp: float) -> bool:
    """
    Atomically stores an entry and updates all associated indices.

    Implementations should ensure that the data is retrievable by:
    1. Its `entry_id` (via `get_entry_by_id`).
    2. Its `custom_key` (via `get_entry_by_key_index`), if provided.
    3. Its `timestamp` (via `get_entries_by_time_range`).

    Parameters:
        base_key (str): The base namespace/scope identifier.
        entry_id (str): The unique ID for this specific entry.
        entry (Dict): The actual data payload to store.
        custom_key (Optional[str]): A secondary unique key for direct lookup
            (e.g., 'latest_summary').
        timestamp (float): The Unix timestamp associated with the entry.

    Returns:
        bool: True if the operation succeeded, False otherwise.
    """
    pass
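The three retrieval paths required of store_indexed_entry can be pictured as one data map plus two indices that are updated together. The following is a sketch under that assumption for a single base_key; `IndexedScope` and its method names are hypothetical and do not come from the package.

```python
from typing import Dict, List, Optional


class IndexedScope:
    """Hypothetical holder for one base_key: data plus two indices."""

    def __init__(self) -> None:
        self.data: Dict[str, Dict] = {}         # entry_id -> entry
        self.key_index: Dict[str, str] = {}     # custom_key -> entry_id
        self.time_index: Dict[str, float] = {}  # entry_id -> timestamp

    def store(self, entry_id: str, entry: Dict,
              custom_key: Optional[str], timestamp: float) -> bool:
        self.data[entry_id] = entry
        if custom_key is not None:
            # Reusing a custom key re-points it at the newest entry
            self.key_index[custom_key] = entry_id
        self.time_index[entry_id] = timestamp
        return True

    def by_id(self, entry_id: str) -> Optional[Dict]:
        return self.data.get(entry_id)

    def by_key(self, custom_key: str) -> Optional[Dict]:
        entry_id = self.key_index.get(custom_key)
        return self.data.get(entry_id) if entry_id is not None else None

    def by_time(self, start: float, end: float) -> List[Dict]:
        ids = [eid for eid, ts in self.time_index.items() if start <= ts <= end]
        ids.sort(key=lambda eid: self.time_index[eid])
        return [self.data[eid] for eid in ids]


scope = IndexedScope()
scope.store("e1", {"text": "v1"}, "latest_summary", 10.0)
scope.store("e2", {"text": "v2"}, "latest_summary", 20.0)
scope.store("e3", {"text": "note"}, None, 30.0)
```

Storing a second entry under 'latest_summary' leaves the first entry reachable by ID and by time, while the custom key now resolves to the newer one.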

RedisMemoryStore

Bases: MemoryStore

Redis implementation of the MemoryStore interface using PooledConnectionFactory.

This class provides optimized storage mechanisms for retrieving data by direct ID, secondary key index, or time ranges. It utilizes specific Redis data structures for performance:

  • Hashes: For storing the actual data payload and the secondary key index.
  • Sorted Sets (ZSET): For time-series indexing.
  • RedisJSON: For list-based storage operations.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| properties | Dict | Configuration properties for the Redis connection. |
| connection_factory | PooledConnectionFactory | Factory instance to manage Redis connections. |
| connection | | The active Redis client instance. |
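Each base_key maps to three Redis keys, one per data structure used for indexed storage. The sketch below collects the key-naming scheme from the class's private helpers (`_get_data_key`, `_get_key_index_key`, `_get_time_index_key`) into one function. The function name `redis_keys` and the dictionary labels are illustrative assumptions; the key suffixes are taken from the source.

```python
from typing import Dict


def redis_keys(base_key: str) -> Dict[str, str]:
    """Return the Redis key names derived from one base_key."""
    return {
        "data": f"{base_key}:DATA",               # hash: entry_id -> JSON string
        "key_index": f"{base_key}:INDEX:KEYS",    # hash: custom_key -> entry_id
        "time_index": f"{base_key}:INDEX:TIME",   # sorted set: entry_id scored by timestamp
    }


layout = redis_keys("agent:7")
```

Keeping all three names under a common prefix makes it straightforward to inspect or delete everything that belongs to one scope.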

Source code in blue/memories/redis_memory_store.py
class RedisMemoryStore(MemoryStore):
    """
    Redis implementation of the MemoryStore interface using PooledConnectionFactory.

    This class provides optimized storage mechanisms for retrieving data by direct ID,
    secondary key index, or time ranges. It utilizes specific Redis data structures
    for performance:
    - **Hashes**: For storing the actual data payload and the secondary key index.
    - **Sorted Sets (ZSET)**: For time-series indexing.
    - **RedisJSON**: For list-based storage operations.

    Attributes:
        properties (Dict): Configuration properties for the Redis connection.
        connection_factory (PooledConnectionFactory): Factory instance to manage Redis connections.
        connection: The active Redis client instance.
    """

    def __init__(self, properties: Dict):
        """
        Initialize the RedisMemoryStore.

        Parameters:
            properties (Dict): Dictionary containing connection details (host, port, etc.)
                passed to the `PooledConnectionFactory`.
        """
        self.properties = properties
        self.connection_factory = None
        self.connection = None

    def initialize(self):
        """
        Sets up the connection factory and establishes the Redis connection.
        """
        self.connection_factory = PooledConnectionFactory(properties=self.properties)
        self.connection = self.connection_factory.get_connection()

    def append_to_list(self, key: str, entry: Dict) -> bool:
        """
        Appends a dictionary entry to a JSON list stored at the specified key.
        Uses RedisJSON (`JSON.ARRAPPEND`).

        Parameters:
            key (str): The Redis key where the list is stored.
            entry (Dict): The data to append.

        Returns:
            bool: True on success. Failures raise an exception rather than returning False.

        Raises:
            Exception: If the Redis operation fails.
        """
        try:
            # create an empty JSON array only if the key does not exist yet (NX)
            self.connection.json().set(key, "$", [], nx=True)
            self.connection.json().arrappend(key, "$", entry)
            return True
        except Exception as ex:
            raise ex

    def get_list(self, key: str) -> List[Dict]:
        """
        Retrieves the full list of dictionaries stored at the specified key.
        Uses RedisJSON (`JSON.GET`).

        Parameters:
            key (str): The Redis key to retrieve.

        Returns:
            List[Dict]: The list of stored entries, or an empty list if the key
            does not exist or contains no data.

        Raises:
            Exception: If the Redis operation fails.
        """
        if not self.connection.exists(key):
            return []
        try:
            data = self.connection.json().get(key)
            return data if data else []
        except Exception as ex:
            raise ex

    def exists(self, key: str) -> bool:
        """
        Checks if a specific key exists in Redis.

        Parameters:
            key (str): The key to check.

        Returns:
            bool: True if the key exists, False otherwise.
        """
        # redis-py's exists() returns the number of matching keys, so coerce to bool
        return bool(self.connection.exists(key))

    # key helpers
    def _get_data_key(self, base_key: str) -> str:
        """Helper to generate the Redis key for the data hash."""
        return f"{base_key}:DATA"

    def _get_key_index_key(self, base_key: str) -> str:
        """Helper to generate the Redis key for the secondary index hash."""
        return f"{base_key}:INDEX:KEYS"

    def _get_time_index_key(self, base_key: str) -> str:
        """Helper to generate the Redis key for the time-sorted ZSET."""
        return f"{base_key}:INDEX:TIME"

    # optimized implementation
    def store_indexed_entry(self, base_key: str, entry_id: str, entry: Dict, custom_key: Optional[str], timestamp: float) -> bool:
        """
        Stores an entry and updates the secondary and time indices.
        The writes are issued as separate Redis commands, not as a single transaction.

        Storage Structure:
        1. **Data Hash**: Stores `entry_id` -> `json_string` in `{base_key}:DATA`.
        2. **Key Index**: Stores `custom_key` -> `entry_id` in `{base_key}:INDEX:KEYS` (optional).
        3. **Time Index**: Stores `entry_id` with score `timestamp` in `{base_key}:INDEX:TIME`.

        Parameters:
            base_key (str): The base identifier for the collection.
            entry_id (str): Unique ID for the specific entry.
            entry (Dict): The data payload to store.
            custom_key (Optional[str]): A secondary lookup key (e.g., a tag).
            timestamp (float): The timestamp associated with the entry.

        Returns:
            bool: True on success. Failures raise an exception rather than returning False.

        Raises:
            Exception: If any Redis command fails.
        """
        try:
            # store data (hash)
            self.connection.hset(self._get_data_key(base_key), entry_id, json.dumps(entry))
            # update key index (hash)
            if custom_key:
                self.connection.hset(self._get_key_index_key(base_key), custom_key, entry_id)
            # update time index (sorted set)
            self.connection.zadd(self._get_time_index_key(base_key), {entry_id: timestamp})
            return True
        except Exception as ex:
            raise ex

    def get_entry_by_id(self, base_key: str, entry_id: str) -> Optional[Dict]:
        """
        Retrieves a specific entry directly by its unique ID from the data hash.

        Parameters:
            base_key (str): The base identifier for the collection.
            entry_id (str): The unique ID of the entry.

        Returns:
            Optional[Dict]: The deserialized entry data, or None if not found.

        Raises:
            Exception: If the Redis operation fails.
        """
        try:
            result = self.connection.hget(self._get_data_key(base_key), entry_id)
            return json.loads(result) if result else None
        except Exception as ex:
            raise ex

    def get_entry_by_key_index(self, base_key: str, custom_key: str) -> Optional[Dict]:
        """
        Retrieves an entry using the secondary custom key index.
        This performs two lookups: one to resolve the ID, and one to get the data.

        Parameters:
            base_key (str): The base identifier for the collection.
            custom_key (str): The secondary lookup key.

        Returns:
            Optional[Dict]: The entry data if found, None otherwise.

        Raises:
            Exception: If the Redis operation fails.
        """
        try:
            # get ID from index
            entry_id = self.connection.hget(self._get_key_index_key(base_key), custom_key)
            if not entry_id:
                return None
            # get data using ID
            return self.get_entry_by_id(base_key, entry_id)
        except Exception as ex:
            raise ex

    def get_entries_by_time_range(self, base_key: str, start: float, end: float) -> List[Dict]:
        """
        Retrieves entries falling within a specific time range using the ZSET index.
        This is optimized to use `HMGET` for batch retrieval after identifying IDs.

        Parameters:
            base_key (str): The base identifier for the collection.
            start (float): The start timestamp (score).
            end (float): The end timestamp (score).

        Returns:
            List[Dict]: A list of deserialized entries found in the range.

        Raises:
            Exception: If the Redis operation fails.
        """
        try:
            # get IDs from sorted set
            entry_ids = self.connection.zrangebyscore(self._get_time_index_key(base_key), start, end)
            if not entry_ids:
                return []
            # batch get data (HMGET)
            result = self.connection.hmget(self._get_data_key(base_key), entry_ids)
            return [json.loads(v) for v in result if v]
        except Exception as ex:
            raise ex

__init__(properties)

Initialize the RedisMemoryStore.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| properties | Dict | Dictionary containing connection details (host, port, etc.) passed to the PooledConnectionFactory. | required |

Source code in blue/memories/redis_memory_store.py
def __init__(self, properties: Dict):
    """
    Initialize the RedisMemoryStore.

    Parameters:
        properties (Dict): Dictionary containing connection details (host, port, etc.)
            passed to the `PooledConnectionFactory`.
    """
    self.properties = properties
    self.connection_factory = None
    self.connection = None

append_to_list(key, entry)

Appends a dictionary entry to a JSON list stored at the specified key. Uses RedisJSON (JSON.ARRAPPEND).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | The Redis key where the list is stored. | required |
| entry | Dict | The data to append. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | Failures are signalled by raising an exception rather than by returning False. |

Raises:

| Type | Description |
| --- | --- |
| Exception | If the Redis operation fails. |

Source code in blue/memories/redis_memory_store.py
def append_to_list(self, key: str, entry: Dict) -> bool:
    """
    Appends a dictionary entry to a JSON list stored at the specified key.
    Uses RedisJSON (`JSON.ARRAPPEND`).

    Parameters:
        key (str): The Redis key where the list is stored.
        entry (Dict): The data to append.

    Returns:
        bool: True on success. Failures raise an exception rather than returning False.

    Raises:
        Exception: If the Redis operation fails.
    """
    try:
        # create an empty JSON array only if the key does not exist yet (NX)
        self.connection.json().set(key, "$", [], nx=True)
        self.connection.json().arrappend(key, "$", entry)
        return True
    except Exception as ex:
        raise ex
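The method relies on a two-step idiom: create an empty array only when the key is absent (the NX flag on JSON.SET), then append. The sketch below mimics that idiom with a plain dictionary so the control flow can be followed without a Redis server. The function and variable names are hypothetical, and returning the new list length is an illustrative choice rather than what the method above returns.

```python
from typing import Dict, List


def append_with_create(store: Dict[str, List[dict]], key: str, entry: dict) -> int:
    """Emulate 'set empty list if absent, then append' on a dict."""
    # Step 1 (analogous to JSON.SET key $ [] NX): create only when absent,
    # so an existing list is never overwritten.
    if key not in store:
        store[key] = []
    # Step 2 (analogous to JSON.ARRAPPEND key $ entry): append to the list.
    store[key].append(entry)
    return len(store[key])


fake_redis: Dict[str, List[dict]] = {}
first = append_with_create(fake_redis, "chat:1", {"role": "user"})
second = append_with_create(fake_redis, "chat:1", {"role": "assistant"})
```

The second call finds the key already present, skips the creation step, and appends to the existing list.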

exists(key)

Checks if a specific key exists in Redis.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | The key to check. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the key exists, False otherwise. |

Source code in blue/memories/redis_memory_store.py
def exists(self, key: str) -> bool:
    """
    Checks if a specific key exists in Redis.

    Parameters:
        key (str): The key to check.

    Returns:
        bool: True if the key exists, False otherwise.
    """
    # redis-py's exists() returns the number of matching keys, so coerce to bool
    return bool(self.connection.exists(key))

get_entries_by_time_range(base_key, start, end)

Retrieves entries falling within a specific time range using the ZSET index. This is optimized to use HMGET for batch retrieval after identifying IDs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base_key | str | The base identifier for the collection. | required |
| start | float | The start timestamp (minimum score, inclusive). | required |
| end | float | The end timestamp (maximum score, inclusive). | required |

Returns:

| Type | Description |
| --- | --- |
| List[Dict] | A list of deserialized entries found in the range. |

Raises:

| Type | Description |
| --- | --- |
| Exception | If the Redis operation fails. |

Source code in blue/memories/redis_memory_store.py
def get_entries_by_time_range(self, base_key: str, start: float, end: float) -> List[Dict]:
    """
    Retrieves entries falling within a specific time range using the ZSET index.
    This is optimized to use `HMGET` for batch retrieval after identifying IDs.

    Parameters:
        base_key (str): The base identifier for the collection.
        start (float): The start timestamp (score).
        end (float): The end timestamp (score).

    Returns:
        List[Dict]: A list of deserialized entries found in the range.

    Raises:
        Exception: If the Redis operation fails.
    """
    try:
        # get IDs from sorted set
        entry_ids = self.connection.zrangebyscore(self._get_time_index_key(base_key), start, end)
        if not entry_ids:
            return []
        # batch get data (HMGET)
        result = self.connection.hmget(self._get_data_key(base_key), entry_ids)
        return [json.loads(v) for v in result if v]
    except Exception as ex:
        raise ex
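The range query is a two-phase read: the sorted set yields the matching IDs, and a single batch read fetches their payloads, skipping any ID whose payload is missing. The sketch below reproduces that flow with dictionaries standing in for the ZSET and the hash. All names are hypothetical; the ordering (by score, then by member) follows how Redis orders sorted-set members.

```python
import json
from typing import Dict, List, Optional


def range_then_batch_get(
    time_index: Dict[str, float], data: Dict[str, str], start: float, end: float
) -> List[Dict]:
    """Emulate ZRANGEBYSCORE followed by HMGET on in-memory dicts."""
    # Phase 1 (like ZRANGEBYSCORE): IDs with start <= score <= end, ordered by score
    ordered = sorted(time_index.items(), key=lambda kv: (kv[1], kv[0]))
    entry_ids = [eid for eid, score in ordered if start <= score <= end]
    if not entry_ids:
        return []
    # Phase 2 (like HMGET): one value per requested ID, None when the field is missing
    raw: List[Optional[str]] = [data.get(eid) for eid in entry_ids]
    # Drop missing values, then deserialize the rest
    return [json.loads(value) for value in raw if value]


time_index = {"a": 1.0, "b": 2.0, "ghost": 2.5, "c": 9.0}
data = {"a": '{"n": 1}', "b": '{"n": 2}', "c": '{"n": 3}'}
found = range_then_batch_get(time_index, data, 1.0, 3.0)
```

The ID "ghost" appears in the time index but has no payload, so it is silently dropped, which mirrors the `if v` filter in the method above.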

get_entry_by_id(base_key, entry_id)

Retrieves a specific entry directly by its unique ID from the data hash.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base_key | str | The base identifier for the collection. | required |
| entry_id | str | The unique ID of the entry. | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[Dict] | The deserialized entry data, or None if not found. |

Raises:

| Type | Description |
| --- | --- |
| Exception | If the Redis operation fails. |

Source code in blue/memories/redis_memory_store.py
def get_entry_by_id(self, base_key: str, entry_id: str) -> Optional[Dict]:
    """
    Retrieves a specific entry directly by its unique ID from the data hash.

    Parameters:
        base_key (str): The base identifier for the collection.
        entry_id (str): The unique ID of the entry.

    Returns:
        Optional[Dict]: The deserialized entry data, or None if not found.

    Raises:
        Exception: If the Redis operation fails.
    """
    try:
        result = self.connection.hget(self._get_data_key(base_key), entry_id)
        return json.loads(result) if result else None
    except Exception as ex:
        raise ex

get_entry_by_key_index(base_key, custom_key)

Retrieves an entry using the secondary custom key index. This performs two lookups: one to resolve the ID, and one to get the data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base_key | str | The base identifier for the collection. | required |
| custom_key | str | The secondary lookup key. | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[Dict] | The entry data if found, None otherwise. |

Raises:

| Type | Description |
| --- | --- |
| Exception | If the Redis operation fails. |

Source code in blue/memories/redis_memory_store.py
def get_entry_by_key_index(self, base_key: str, custom_key: str) -> Optional[Dict]:
    """
    Retrieves an entry using the secondary custom key index.
    This performs two lookups: one to resolve the ID, and one to get the data.

    Parameters:
        base_key (str): The base identifier for the collection.
        custom_key (str): The secondary lookup key.

    Returns:
        Optional[Dict]: The entry data if found, None otherwise.

    Raises:
        Exception: If the Redis operation fails.
    """
    try:
        # get ID from index
        entry_id = self.connection.hget(self._get_key_index_key(base_key), custom_key)
        if not entry_id:
            return None
        # get data using ID
        return self.get_entry_by_id(base_key, entry_id)
    except Exception as ex:
        raise ex
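The two lookups can be read as an indirection: the key index resolves a custom key to an entry ID, and the data hash resolves that ID to the payload. A minimal sketch with dictionaries in place of the two Redis hashes follows; the function and variable names are hypothetical.

```python
import json
from typing import Dict, Optional


def lookup_by_custom_key(
    key_index: Dict[str, str], data: Dict[str, str], custom_key: str
) -> Optional[Dict]:
    """Resolve custom_key -> entry_id -> deserialized entry."""
    # Lookup 1: resolve the ID from the secondary index
    entry_id = key_index.get(custom_key)
    if not entry_id:
        return None
    # Lookup 2: fetch and deserialize the payload stored under that ID
    raw = data.get(entry_id)
    return json.loads(raw) if raw else None


key_index = {"latest_summary": "e2", "stale": "e9"}
data = {"e1": '{"text": "v1"}', "e2": '{"text": "v2"}'}
hit = lookup_by_custom_key(key_index, data, "latest_summary")
```

A custom key that points at an ID with no payload ("stale" here) yields None, the same result as a custom key that was never indexed.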

get_list(key)

Retrieves the full list of dictionaries stored at the specified key. Uses RedisJSON (JSON.GET).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | The Redis key to retrieve. | required |

Returns:

| Type | Description |
| --- | --- |
| List[Dict] | The list of stored entries, or an empty list if the key does not exist or contains no data. |

Raises:

| Type | Description |
| --- | --- |
| Exception | If the Redis operation fails. |

Source code in blue/memories/redis_memory_store.py
def get_list(self, key: str) -> List[Dict]:
    """
    Retrieves the full list of dictionaries stored at the specified key.
    Uses RedisJSON (`JSON.GET`).

    Parameters:
        key (str): The Redis key to retrieve.

    Returns:
        List[Dict]: The list of stored entries, or an empty list if the key
        does not exist or contains no data.

    Raises:
        Exception: If the Redis operation fails.
    """
    if not self.connection.exists(key):
        return []
    try:
        data = self.connection.json().get(key)
        return data if data else []
    except Exception as ex:
        raise ex

initialize()

Sets up the connection factory and establishes the Redis connection.

Source code in blue/memories/redis_memory_store.py
def initialize(self):
    """
    Sets up the connection factory and establishes the Redis connection.
    """
    self.connection_factory = PooledConnectionFactory(properties=self.properties)
    self.connection = self.connection_factory.get_connection()

store_indexed_entry(base_key, entry_id, entry, custom_key, timestamp)

Stores an entry and updates the secondary and time indices. The writes are issued as separate Redis commands, not as a single transaction.

Storage Structure:

  1. Data Hash: Stores entry_id -> json_string in {base_key}:DATA.
  2. Key Index: Stores custom_key -> entry_id in {base_key}:INDEX:KEYS (optional).
  3. Time Index: Stores entry_id with score timestamp in {base_key}:INDEX:TIME.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base_key | str | The base identifier for the collection. | required |
| entry_id | str | Unique ID for the specific entry. | required |
| entry | Dict | The data payload to store. | required |
| custom_key | Optional[str] | A secondary lookup key (e.g., a tag). | required |
| timestamp | float | The timestamp associated with the entry. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | Failures are signalled by raising an exception rather than by returning False. |

Raises:

| Type | Description |
| --- | --- |
| Exception | If any Redis command fails. |

Source code in blue/memories/redis_memory_store.py
def store_indexed_entry(self, base_key: str, entry_id: str, entry: Dict, custom_key: Optional[str], timestamp: float) -> bool:
    """
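    Stores an entry and updates the secondary and time indices.

    The writes are issued as three separate Redis commands, not inside a
    MULTI/EXEC transaction, so the operation is not atomic.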

    Storage Structure:
    1. **Data Hash**: Stores `entry_id` -> `json_string` in `{base_key}:DATA`.
    2. **Key Index**: Stores `custom_key` -> `entry_id` in `{base_key}:INDEX:KEYS` (optional).
    3. **Time Index**: Stores `entry_id` with score `timestamp` in `{base_key}:INDEX:TIME`.

    Parameters:
        base_key (str): The base identifier for the collection.
        entry_id (str): Unique ID for the specific entry.
        entry (Dict): The data payload to store.
        custom_key (Optional[str]): A secondary lookup key (e.g., a tag).
        timestamp (float): The timestamp associated with the entry.

    Returns:
        bool: Annotated return type; the method has no return statement, so it
        actually returns None. Failures raise an exception.

    Raises:
        Exception: If any Redis command fails.
    """
    try:
        # store data (hash)
        self.connection.hset(self._get_data_key(base_key), entry_id, json.dumps(entry))
        # update key index (hash)
        if custom_key:
            self.connection.hset(self._get_key_index_key(base_key), custom_key, entry_id)
        # update time index (sorted set)
        self.connection.zadd(self._get_time_index_key(base_key), {entry_id: timestamp})
    except Exception as ex:
        raise ex
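
Because the method above sends its three writes one at a time, a failure between them can leave the data hash and the indices out of sync. One possible way to group them is a redis-py pipeline created with `transaction=True`, which wraps the queued commands in MULTI/EXEC. The sketch below is an illustration under assumptions, not the library's implementation: `store_indexed_entry_tx` is a hypothetical name, `connection` is assumed to be a redis-py client, and `FakeConnection`/`FakePipeline` are stand-ins that only record calls so the example runs without a server.

```python
import json
from typing import Dict, Optional


def store_indexed_entry_tx(connection, base_key: str, entry_id: str, entry: Dict,
                           custom_key: Optional[str], timestamp: float) -> None:
    """Queue all three writes on one pipeline and send them as one MULTI/EXEC."""
    # transaction=True asks redis-py to wrap the queued commands in MULTI/EXEC.
    with connection.pipeline(transaction=True) as pipe:
        pipe.hset(f"{base_key}:DATA", entry_id, json.dumps(entry))
        if custom_key:
            pipe.hset(f"{base_key}:INDEX:KEYS", custom_key, entry_id)
        pipe.zadd(f"{base_key}:INDEX:TIME", {entry_id: timestamp})
        pipe.execute()


class FakePipeline:
    """Hypothetical stand-in that records queued commands; not part of redis-py."""

    def __init__(self, log):
        self.queued, self.log = [], log

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False

    def hset(self, name, key, value):
        self.queued.append(("HSET", name, key))

    def zadd(self, name, mapping):
        self.queued.append(("ZADD", name, tuple(mapping)))

    def execute(self):
        self.log.append(list(self.queued))  # one recorded batch per EXEC
        self.queued.clear()


class FakeConnection:
    """Hypothetical stand-in exposing only pipeline()."""

    def __init__(self):
        self.batches = []

    def pipeline(self, transaction=True):
        return FakePipeline(self.batches)


conn = FakeConnection()
store_indexed_entry_tx(conn, "session:1", "e1", {"text": "hi"}, "greeting", 1700000000.0)
```

With a real client, Redis runs the queued commands back to back without interleaving other clients, but it does not roll back earlier commands if a later one fails.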

InMemoryStore

Bases: MemoryStore

In-memory implementation of the MemoryStore interface using standard Python dictionaries.

This class serves as a volatile storage backend, primarily useful for testing, development, or non-persistent runtime memory. It mimics the structure of the RedisMemoryStore by separating data storage from indexing:

  • Data Storage: Uses a dictionary at {base_key}:DATA to map entry IDs to data.
  • Key Index: Uses a dictionary at {base_key}:INDEX:KEYS to map custom keys to entry IDs.
  • Time Index: Uses a list of tuples (timestamp, entry_id) at {base_key}:INDEX:TIME, sorted by timestamp, to facilitate range queries.

Attributes:

  • _store (Dict): The internal dictionary acting as the database.
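
The layout described above can be pictured as one flat dictionary holding three suffixed keys per base_key. The following is a minimal sketch, assuming a hypothetical base key `session:1` and made-up entries; it uses a plain dict rather than the class itself.

```python
base_key = "session:1"

# Assumed state after storing two entries, the first with custom_key="greeting".
store = {
    f"{base_key}:DATA": {
        "e1": {"text": "hello"},
        "e2": {"text": "bye"},
    },
    f"{base_key}:INDEX:KEYS": {"greeting": "e1"},
    f"{base_key}:INDEX:TIME": [(10.0, "e1"), (20.0, "e2")],
}

# Lookup by custom key: resolve the ID in the key index, then read the data dict.
entry_id = store[f"{base_key}:INDEX:KEYS"]["greeting"]
by_key = store[f"{base_key}:DATA"][entry_id]

# Lookup by time range: filter the sorted (timestamp, entry_id) tuples.
in_range = [
    store[f"{base_key}:DATA"][eid]
    for ts, eid in store[f"{base_key}:INDEX:TIME"]
    if 15.0 <= ts <= 25.0
]
```

Note that only the suffixed keys are present in this layout. Since `exists(base_key)` checks the bare key, it would report False for a scope that holds indexed entries but no appended list.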

Source code in blue/memories/in_memory_store.py
class InMemoryStore(MemoryStore):
    """
    In-memory implementation of the MemoryStore interface using standard Python dictionaries.

    This class serves as a volatile storage backend, primarily useful for testing,
    development, or non-persistent runtime memory. It mimics the structure of the
    `RedisMemoryStore` by separating data storage from indexing:

    - **Data Storage**: Uses a dictionary at `{base_key}:DATA` to map entry IDs to data.
    - **Key Index**: Uses a dictionary at `{base_key}:INDEX:KEYS` to map custom keys to entry IDs.
    - **Time Index**: Uses a list of tuples `(timestamp, entry_id)` at `{base_key}:INDEX:TIME`,
      sorted by timestamp, to facilitate range queries.

    Attributes:
        _store (Dict): The internal dictionary acting as the database.
    """

    def __init__(self):
        """
        Initialize the InMemoryStore with an empty storage dictionary.
        """
        self._store = {}

    def initialize(self):
        """
        No-op initialization for in-memory storage.
        """
        pass

    # key helpers
    def _get_data_key(self, base_key: str) -> str:
        """Helper to generate the internal key for the data dictionary."""
        return f"{base_key}:DATA"

    def _get_key_index_key(self, base_key: str) -> str:
        """Helper to generate the internal key for the secondary index dictionary."""
        return f"{base_key}:INDEX:KEYS"

    def _get_time_index_key(self, base_key: str) -> str:
        """Helper to generate the internal key for the time-sorted list."""
        return f"{base_key}:INDEX:TIME"

    def append_to_list(self, base_key: str, entry: Dict) -> bool:
        """
        Appends a dictionary entry to a list stored at the given key.

        Parameters:
            base_key (str): The storage key.
            entry (Dict): The data to append.

        Returns:
            bool: Annotated return type; the method has no return statement and always returns None.
        """
        if base_key not in self._store:
            self._store[base_key] = []
        if not isinstance(self._store[base_key], list):
            self._store[base_key] = []
        self._store[base_key].append(entry)

    def get_list(self, base_key: str) -> List[Dict]:
        """
        Retrieves the full list of entries stored at the given key.

        Parameters:
            base_key (str): The storage key.

        Returns:
            List[Dict]: The stored list, or an empty list if the key is missing or invalid.
        """
        result = self._store.get(base_key, [])
        return result if isinstance(result, list) else []

    def exists(self, base_key: str) -> bool:
        """
        Checks if the base key exists in the internal store.

        Parameters:
            base_key (str): The key to check.

        Returns:
            bool: True if key is present, False otherwise.
        """
        return base_key in self._store

    def store_indexed_entry(self, base_key: str, entry_id: str, entry: Dict, custom_key: Optional[str], timestamp: float) -> bool:
        """
        Stores data and updates the in-memory indices. The three updates are not guarded by a lock.

        1. **Data**: Stored in a nested dict under `{base_key}:DATA`.
        2. **Key Index**: Mapped in a nested dict under `{base_key}:INDEX:KEYS`.
        3. **Time Index**: Appended as `(timestamp, id)` to a list under `{base_key}:INDEX:TIME`
           and re-sorted.

        Parameters:
            base_key (str): The scope identifier.
            entry_id (str): Unique ID for the entry.
            entry (Dict): The data payload.
            custom_key (Optional[str]): Secondary lookup key.
            timestamp (float): The timestamp for sorting.

        Returns:
            bool: Annotated return type; the method has no return statement and returns None.
        """
        # store data
        data_key = self._get_data_key(base_key)
        if data_key not in self._store:
            self._store[data_key] = {}
        self._store[data_key][entry_id] = entry
        # update key index
        if custom_key:
            index_key = self._get_key_index_key(base_key)
            if index_key not in self._store:
                self._store[index_key] = {}
            self._store[index_key][custom_key] = entry_id
        # update time index
        time_key = self._get_time_index_key(base_key)
        if time_key not in self._store:
            self._store[time_key] = []
        # append tuple: (timestamp, id)
        self._store[time_key].append((timestamp, entry_id))
        # sort by timestamp
        self._store[time_key].sort(key=lambda x: x[0])

    def get_entry_by_id(self, base_key: str, entry_id: str) -> Optional[Dict]:
        """
        Retrieves an entry directly from the data dictionary.

        Parameters:
            base_key (str): The scope identifier.
            entry_id (str): The unique entry ID.

        Returns:
            Optional[Dict]: The entry data if found, None otherwise.
        """
        data_key = self._get_data_key(base_key)
        data_hash = self._store.get(data_key)
        if data_hash and isinstance(data_hash, dict):
            return data_hash.get(entry_id)
        return None

    def get_entry_by_key_index(self, base_key: str, custom_key: str) -> Optional[Dict]:
        """
        Retrieves an entry by looking up the ID in the key index, then fetching the data.

        Parameters:
            base_key (str): The scope identifier.
            custom_key (str): The secondary lookup key.

        Returns:
            Optional[Dict]: The entry data if found, None otherwise.
        """
        # get ID from index
        index_key = self._get_key_index_key(base_key)
        index_hash = self._store.get(index_key)
        if not index_hash or not isinstance(index_hash, dict):
            return None
        entry_id = index_hash.get(custom_key)
        if not entry_id:
            return None
        # get data using ID
        return self.get_entry_by_id(base_key, entry_id)

    def get_entries_by_time_range(self, base_key: str, start: float, end: float) -> List[Dict]:
        """
        Retrieves entries within a time range by filtering the sorted time index list.

        Parameters:
            base_key (str): The scope identifier.
            start (float): Start timestamp (inclusive).
            end (float): End timestamp (inclusive).

        Returns:
            List[Dict]: A list of matching entries.
        """
        # get IDs from time index
        time_key = self._get_time_index_key(base_key)
        time_list = self._store.get(time_key, [])
        if not time_list or not isinstance(time_list, list):
            return []
        # filter tuples where start <= timestamp <= end
        entry_ids = [eid for ts, eid in time_list if start <= ts <= end]
        if not entry_ids:
            return []
        # batch get data
        results = []
        for eid in entry_ids:
            entry = self.get_entry_by_id(base_key, eid)
            if entry:
                results.append(entry)
        return results

__init__()

Initialize the InMemoryStore with an empty storage dictionary.

Source code in blue/memories/in_memory_store.py
def __init__(self):
    """
    Initialize the InMemoryStore with an empty storage dictionary.
    """
    self._store = {}

append_to_list(base_key, entry)

Appends a dictionary entry to a list stored at the given key.

Parameters:

  • base_key (str, required): The storage key.
  • entry (Dict, required): The data to append.

Returns:

  • bool: Annotated return type; the method has no return statement and always returns None.

Source code in blue/memories/in_memory_store.py
def append_to_list(self, base_key: str, entry: Dict) -> bool:
    """
    Appends a dictionary entry to a list stored at the given key.

    Parameters:
        base_key (str): The storage key.
        entry (Dict): The data to append.

    Returns:
        bool: Annotated return type; the method has no return statement and always returns None.
    """
    if base_key not in self._store:
        self._store[base_key] = []
    if not isinstance(self._store[base_key], list):
        self._store[base_key] = []
    self._store[base_key].append(entry)

exists(base_key)

Checks if the base key exists in the internal store.

Parameters:

  • base_key (str, required): The key to check.

Returns:

  • bool: True if key is present, False otherwise.

Source code in blue/memories/in_memory_store.py
def exists(self, base_key: str) -> bool:
    """
    Checks if the base key exists in the internal store.

    Parameters:
        base_key (str): The key to check.

    Returns:
        bool: True if key is present, False otherwise.
    """
    return base_key in self._store

get_entries_by_time_range(base_key, start, end)

Retrieves entries within a time range by filtering the sorted time index list.

Parameters:

  • base_key (str, required): The scope identifier.
  • start (float, required): Start timestamp (inclusive).
  • end (float, required): End timestamp (inclusive).

Returns:

  • List[Dict]: A list of matching entries.

Source code in blue/memories/in_memory_store.py
def get_entries_by_time_range(self, base_key: str, start: float, end: float) -> List[Dict]:
    """
    Retrieves entries within a time range by filtering the sorted time index list.

    Parameters:
        base_key (str): The scope identifier.
        start (float): Start timestamp (inclusive).
        end (float): End timestamp (inclusive).

    Returns:
        List[Dict]: A list of matching entries.
    """
    # get IDs from time index
    time_key = self._get_time_index_key(base_key)
    time_list = self._store.get(time_key, [])
    if not time_list or not isinstance(time_list, list):
        return []
    # filter tuples where start <= timestamp <= end
    entry_ids = [eid for ts, eid in time_list if start <= ts <= end]
    if not entry_ids:
        return []
    # batch get data
    results = []
    for eid in entry_ids:
        entry = self.get_entry_by_id(base_key, eid)
        if entry:
            results.append(entry)
    return results
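
The method above scans every tuple in the time index. Because that list is kept sorted by timestamp, the inclusive bounds could also be located by binary search. The sketch below illustrates the boundary logic with the standard `bisect` module; `ids_in_range` is a hypothetical helper, not part of the class, and it rebuilds the timestamp list on each call, so it demonstrates the idea rather than a tuned implementation.

```python
from bisect import bisect_left, bisect_right
from typing import List, Tuple


def ids_in_range(time_list: List[Tuple[float, str]], start: float, end: float) -> List[str]:
    """Return entry IDs with start <= timestamp <= end from a timestamp-sorted list."""
    timestamps = [ts for ts, _ in time_list]  # sorted because time_list is sorted
    lo = bisect_left(timestamps, start)       # first index with ts >= start
    hi = bisect_right(timestamps, end)        # first index with ts > end
    return [eid for _, eid in time_list[lo:hi]]


time_index = [(10.0, "a"), (20.0, "b"), (20.0, "c"), (30.0, "d")]
matches = ids_in_range(time_index, 20.0, 30.0)
```

Using `bisect_left` for the lower bound and `bisect_right` for the upper bound keeps both ends inclusive, matching the `start <= ts <= end` filter in the method.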

get_entry_by_id(base_key, entry_id)

Retrieves an entry directly from the data dictionary.

Parameters:

  • base_key (str, required): The scope identifier.
  • entry_id (str, required): The unique entry ID.

Returns:

  • Optional[Dict]: The entry data if found, None otherwise.

Source code in blue/memories/in_memory_store.py
def get_entry_by_id(self, base_key: str, entry_id: str) -> Optional[Dict]:
    """
    Retrieves an entry directly from the data dictionary.

    Parameters:
        base_key (str): The scope identifier.
        entry_id (str): The unique entry ID.

    Returns:
        Optional[Dict]: The entry data if found, None otherwise.
    """
    data_key = self._get_data_key(base_key)
    data_hash = self._store.get(data_key)
    if data_hash and isinstance(data_hash, dict):
        return data_hash.get(entry_id)
    return None

get_entry_by_key_index(base_key, custom_key)

Retrieves an entry by looking up the ID in the key index, then fetching the data.

Parameters:

  • base_key (str, required): The scope identifier.
  • custom_key (str, required): The secondary lookup key.

Returns:

  • Optional[Dict]: The entry data if found, None otherwise.

Source code in blue/memories/in_memory_store.py
def get_entry_by_key_index(self, base_key: str, custom_key: str) -> Optional[Dict]:
    """
    Retrieves an entry by looking up the ID in the key index, then fetching the data.

    Parameters:
        base_key (str): The scope identifier.
        custom_key (str): The secondary lookup key.

    Returns:
        Optional[Dict]: The entry data if found, None otherwise.
    """
    # get ID from index
    index_key = self._get_key_index_key(base_key)
    index_hash = self._store.get(index_key)
    if not index_hash or not isinstance(index_hash, dict):
        return None
    entry_id = index_hash.get(custom_key)
    if not entry_id:
        return None
    # get data using ID
    return self.get_entry_by_id(base_key, entry_id)

get_list(base_key)

Retrieves the full list of entries stored at the given key.

Parameters:

  • base_key (str, required): The storage key.

Returns:

  • List[Dict]: The stored list, or an empty list if the key is missing or invalid.

Source code in blue/memories/in_memory_store.py
def get_list(self, base_key: str) -> List[Dict]:
    """
    Retrieves the full list of entries stored at the given key.

    Parameters:
        base_key (str): The storage key.

    Returns:
        List[Dict]: The stored list, or an empty list if the key is missing or invalid.
    """
    result = self._store.get(base_key, [])
    return result if isinstance(result, list) else []
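
One consequence of the code above is that `get_list` hands back the stored list object itself, not a copy, so changes made by the caller are visible in the store. The sketch below reproduces the documented `append_to_list` and `get_list` logic as standalone functions over a plain dict (an assumption made to keep the example self-contained) and shows a shallow copy as one way to avoid the aliasing.

```python
from typing import Any, Dict, List


def append_to_list(store: Dict[str, Any], key: str, entry: Dict) -> None:
    # Mirrors the documented logic: create or reset the slot unless it holds a list.
    if not isinstance(store.get(key), list):
        store[key] = []
    store[key].append(entry)


def get_list(store: Dict[str, Any], key: str) -> List[Dict]:
    result = store.get(key, [])
    return result if isinstance(result, list) else []


store: Dict[str, Any] = {}
append_to_list(store, "chat", {"role": "user"})

view = get_list(store, "chat")
view.append({"role": "injected"})         # also changes the stored list

snapshot = list(get_list(store, "chat"))  # shallow copy, safe to modify
snapshot.clear()                          # the stored list is unaffected
```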

initialize()

No-op initialization for in-memory storage.

Source code in blue/memories/in_memory_store.py
def initialize(self):
    """
    No-op initialization for in-memory storage.
    """
    pass

store_indexed_entry(base_key, entry_id, entry, custom_key, timestamp)

Stores data and updates the in-memory indices. The three updates are not guarded by a lock.

  1. Data: Stored in a nested dict under {base_key}:DATA.
  2. Key Index: Mapped in a nested dict under {base_key}:INDEX:KEYS.
  3. Time Index: Appended as (timestamp, id) to a list under {base_key}:INDEX:TIME and re-sorted.

Parameters:

  • base_key (str, required): The scope identifier.
  • entry_id (str, required): Unique ID for the entry.
  • entry (Dict, required): The data payload.
  • custom_key (Optional[str], required): Secondary lookup key.
  • timestamp (float, required): The timestamp for sorting.

Returns:

  • bool: Annotated return type; the method has no return statement and returns None.

Source code in blue/memories/in_memory_store.py
def store_indexed_entry(self, base_key: str, entry_id: str, entry: Dict, custom_key: Optional[str], timestamp: float) -> bool:
    """
    Stores data and updates the in-memory indices. The three updates are not guarded by a lock.

    1. **Data**: Stored in a nested dict under `{base_key}:DATA`.
    2. **Key Index**: Mapped in a nested dict under `{base_key}:INDEX:KEYS`.
    3. **Time Index**: Appended as `(timestamp, id)` to a list under `{base_key}:INDEX:TIME`
       and re-sorted.

    Parameters:
        base_key (str): The scope identifier.
        entry_id (str): Unique ID for the entry.
        entry (Dict): The data payload.
        custom_key (Optional[str]): Secondary lookup key.
        timestamp (float): The timestamp for sorting.

    Returns:
        bool: Annotated return type; the method has no return statement and returns None.
    """
    # store data
    data_key = self._get_data_key(base_key)
    if data_key not in self._store:
        self._store[data_key] = {}
    self._store[data_key][entry_id] = entry
    # update key index
    if custom_key:
        index_key = self._get_key_index_key(base_key)
        if index_key not in self._store:
            self._store[index_key] = {}
        self._store[index_key][custom_key] = entry_id
    # update time index
    time_key = self._get_time_index_key(base_key)
    if time_key not in self._store:
        self._store[time_key] = []
    # append tuple: (timestamp, id)
    self._store[time_key].append((timestamp, entry_id))
    # sort by timestamp
    self._store[time_key].sort(key=lambda x: x[0])
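
As written above, storing the same entry_id a second time appends another tuple to the time index, so a range query that covers both timestamps returns that entry twice. The Redis backend behaves differently, because ZADD updates the score of an existing member instead of adding a duplicate. The sketch below shows one possible way to mirror that behavior in memory; `upsert_time_index` is a hypothetical helper, not part of the class.

```python
from bisect import insort
from typing import List, Tuple


def upsert_time_index(time_list: List[Tuple[float, str]], entry_id: str, timestamp: float) -> None:
    """Keep at most one (timestamp, entry_id) tuple per entry_id, in sorted order."""
    # Drop any earlier tuple for this ID (linear scan; acceptable for small stores).
    time_list[:] = [(ts, eid) for ts, eid in time_list if eid != entry_id]
    # insort places the new tuple at its sorted position without a full re-sort.
    insort(time_list, (timestamp, entry_id))


index: List[Tuple[float, str]] = []
upsert_time_index(index, "e1", 10.0)
upsert_time_index(index, "e2", 20.0)
upsert_time_index(index, "e1", 30.0)  # re-store e1 with a newer timestamp
```

Tuples compare by timestamp first and entry ID second, so entries with equal timestamps end up ordered by ID here, whereas the original `sort(key=lambda x: x[0])` keeps them in insertion order.
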
Last update: 2026-02-11