Commit 6fbce954 authored by Viacheslav Sukhovieiev's avatar Viacheslav Sukhovieiev
Browse files

Merge branch 'slava/add_serialization_caching' into 'master'

refactor(serialization): add serialized data caching

See merge request !178
parents a850d31b ffe630db
......@@ -40,7 +40,7 @@ version:
## Build python package
build-wheel:
@python3 setup.py $(EGG_INFO) sdist bdist_wheel
@python3.9 setup.py $(EGG_INFO) sdist bdist_wheel
## Publish python package
publish-wheel:
......
......@@ -5,4 +5,5 @@ motor==2.4.0
aiotask_context
prozorro-metrics~=1.4
prozorro-tools~=0.9
ujson
\ No newline at end of file
ujson
diskcache
\ No newline at end of file
......@@ -8,6 +8,7 @@ import os
import iso8601
from aiohttp import web, WSCloseCode
from aiotask_context import task_factory
from diskcache import JSONDisk
from bson import ObjectId
from motor import motor_asyncio
......@@ -18,6 +19,7 @@ from prozorro_sale.tools import logger
from prozorro_sale.tools.middlewares import request_id_middleware
from mongodb_mirror.utils import MongoOperation
from mongodb_mirror.errors import ClientTimestampException, IncorrectTimestamp, CriticalError, MirrorClosed
from diskcache import Cache
import prometheus_client
......@@ -38,9 +40,13 @@ CLIENT_PROTOCOL = {
'simple': client.SocketMirrorClientSimpleProtocol
}
# Directory used for the on-disk serialization cache (diskcache sqlite store);
# overridable via the CACHE_FILE_PATH environment variable.
CACHE_FILE_PATH = os.environ.get('CACHE_FILE_PATH', 'serialize_cache')
# NOTE(review): CACHE_FILE appears unused in the visible code — confirm before removing.
CACHE_FILE = None
CACHE_SIZE_LIMIT = 1024 ** 2 * 100  # 100 MiB cap for the diskcache store
class MongoDBMirror:
__slots__ = ['_mongo_url', 'cursors', '_mongo', '_oplog']
__slots__ = ['_mongo_url', 'cursors', '_mongo', '_oplog', '_cache_file']
def __init__(self, mongo_url, config):
self._mongo_url = mongo_url
......@@ -49,6 +55,7 @@ class MongoDBMirror:
async def run(self, app):
self._mongo = motor_asyncio.AsyncIOMotorClient(self._mongo_url, w='majority')
self._oplog = self._mongo.local.oplog.rs
self._cache_file = app.cache_file
oldest_ts = await self._get_oldest_oplog_ts()
newest_ts = await self._get_newest_oplog_ts()
LOG.info('Oldest timestamp available in oplog %s' % oldest_ts.as_datetime())
......@@ -97,7 +104,7 @@ class MongoDBMirror:
elif entry['ns'] != client.namespace:
continue
elif op == MongoOperation.INSERT:
data = self._serialize_document(data, collection)
data = await self._get_serialized_document(data, collection, timestamp)
await client.upsert(ts, data)
elif op == MongoOperation.UPDATE:
if '$v' in data:
......@@ -107,7 +114,7 @@ class MongoDBMirror:
if data is None:
LOG.warning(f'Document not found: {collection} with {entry["o2"]["_id"]}')
continue
data = self._serialize_document(data, collection)
data = await self._get_serialized_document(data, collection, timestamp)
await client.upsert(ts, data)
elif op == MongoOperation.DELETE:
data = self._serialize_data(data)
......@@ -144,6 +151,26 @@ class MongoDBMirror:
await oplog_cursor.close()
raise ex
async def _get_serialized_document(self, data, collection, timestamp):
_obj_id = data.get('_id', '')
cache_key = f"{timestamp.time}_{_obj_id}"
loop = asyncio.get_running_loop()
try:
obj_data = await loop.run_in_executor(None, self._cache_file.get, cache_key)
except Exception as ex:
LOG.warning(f'Failed to get object data from cache. {str(ex)}')
obj_data = None
if not obj_data:
obj_data = self._serialize_document(data, collection)
try:
await loop.run_in_executor(None, self._cache_file.set, cache_key, obj_data)
except Exception as ex:
LOG.warning(f'Failed to set object data to cache. {str(ex)}')
return obj_data
def _serialize_data(self, doc):
_doc = doc.copy()
for key, value in _doc.items():
......@@ -199,7 +226,23 @@ async def ping(request):
return web.json_response({'text': 'pong'})
async def on_startup(app: web.Application):
    """Open the shared on-disk serialization cache and attach it to the app.

    The cache backs ``MongoDBMirror._get_serialized_document``; sqlite pragma
    options trade durability for write speed (journal/synchronous OFF), which
    is acceptable because the cache can always be rebuilt by re-serializing.
    """
    LOG.info('Application is starting...')
    cache_options = {
        'size_limit': CACHE_SIZE_LIMIT,
        'disk': JSONDisk,
        'sqlite_foreign_keys': 'ON',
        'sqlite_journal_mode': 'OFF',
        'sqlite_synchronous': 'OFF',
        'sqlite_count_changes': 'OFF',
        'sqlite_read_uncommitted': 1,
    }
    app.cache_file = Cache(CACHE_FILE_PATH, **cache_options)
async def on_shutdown(app: web.Application):
    """Release app resources: close the on-disk serialization cache."""
    cache = app.cache_file
    cache.close()
    LOG.info('Shutdown app')
......@@ -359,6 +402,7 @@ class BaseApp:
mirror = self.mirror_class(MONGO_URL, None)
app.mirror = mirror
app.websockets = WebSocketsStorage()
app.on_startup.append(on_startup)
app.on_startup.append(mirror.run)
app.on_startup.append(app.websockets.init_storage)
app.on_shutdown.append(app.websockets.close_storage)
......
......@@ -117,7 +117,7 @@ class TestMongoDBMirror:
mock_gauge_remove.assert_called_once()
@pytest.mark.parametrize('operation', [MongoOperation.INSERT, MongoOperation.UPDATE])
@patch('mongodb_mirror.mirror.MongoDBMirror._serialize_document', return_value={'test': 'test'})
@patch('mongodb_mirror.mirror.MongoDBMirror._get_serialized_document', return_value={'test': 'test'})
@patch('mongodb_mirror.mirror.MongoDBMirror.get_available_cursor')
@patch('mongodb_mirror.mirror.MongoDBMirror._get_oldest_oplog_ts', new_callable=AsyncMock)
@patch('mongodb_mirror.mirror.prometheus_client.Gauge.remove')
......@@ -139,7 +139,7 @@ class TestMongoDBMirror:
await self.mongo_db_mirror.sync_client(mock_client)
mock_get_oldest_oplog_ts.assert_called_once()
mock_get_available_cursor.assert_called_once_with(self.current_ts)
mock_serialize_document.assert_called_once_with(self.test_data, 'procedures')
mock_serialize_document.assert_called_once_with(self.test_data, 'procedures', self.current_ts)
mock_client.upsert.assert_called_once_with([self.current_ts.time, self.current_ts.inc], self.test_data)
mock_gauge_remove.assert_called_once()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment