Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/internals/ |
import queue import time from defence360agent.model.instance import db from defence360agent.model.messages_to_send import MessageToSend class PersistentMessagesQueue: """ The queue to store messages sent to the server if it is unavailable. - stores more recent data; if a limit is exceeded, older messages are deleted. - no duplicate messages are sent NOTE: it is worth remembering that when writing a large number of messages, the amount of memory used may increase by the size of the sqlite cache (this may not be immediately obvious). https://www.sqlite.org/pragma.html#pragma_cache_size """ def __init__(self, buffer_limit=20, storage_limit=1000, model=None): self._buffer_limit = buffer_limit self._storage_limit = storage_limit self._buffer = [] # [(timestamp, message),...] self._model = model or MessageToSend def push_buffer_to_storage(self) -> None: if self._buffer: with db.atomic(): # buffer may contain older messages than db, # so remove oldest items after insert self._model.insert_many(self._buffer) need_to_remove = self.storage_size - self._storage_limit if need_to_remove > 0: # keep only the most recent messages self._model.delete_old(need_to_remove) self._buffer = [] def pop_all(self) -> list: items = [] with db.atomic(): items += list( self._model.select( self._model.timestamp, self._model.message ).tuples() ) self._model.delete().execute() items += self._buffer self._buffer = [] return sorted(items) # older first def empty(self) -> bool: return self.qsize() == 0 def qsize(self) -> int: return self.storage_size + len(self._buffer) @property def buffer_size(self) -> int: return len(self._buffer) @property def storage_size(self) -> int: return self._model.select().count() def put(self, message: bytes, timestamp=None): if timestamp is None: timestamp = time.time() self._buffer.append((timestamp, message)) if self.buffer_size >= self._buffer_limit: self.push_buffer_to_storage() def get(self) -> bytes: if not self._buffer and self.storage_size != 0: # reduce the number of database calls items = self._model.get_oldest(limit=self._buffer_limit) # not save id value self._buffer += [item[1:] for item in items.tuples()] self._model.delete_in(items) if self._buffer: _, message = self._buffer.pop(0) return message raise queue.Empty()