Commit 52de711a authored by dmitry.mashoshin's avatar dmitry.mashoshin
Browse files

Merge branch 'mashony/mirror-user-context' into 'master'

feat: add acl context to serialize document method

See merge request !190
parents df429e05 05a7958c
......@@ -46,8 +46,9 @@ class MirrorClient:
async def upsert(self, ts, data=None):
"""Insert data, if data exists, update it."""
self._ts = ts
LOG.info(f'upsert {data}')
if data is not None:
self._ts = ts
LOG.info(f'upsert {data}')
async def update(self, ts, data=None):
"""Update data."""
......
......@@ -139,7 +139,8 @@ class MongoDBMirror:
elif entry['ns'] != client.namespace:
continue
elif op == MongoOperation.INSERT:
data = await self._get_serialized_document(data, collection, entry['ts'])
data = await self._get_serialized_document(data, collection, entry['ts'],
client.ws.acl_context)
await client.upsert(ts, data)
elif op == MongoOperation.UPDATE:
if '$v' in data:
......@@ -149,7 +150,8 @@ class MongoDBMirror:
if data is None:
LOG.warning(f'Document not found: {collection} with {entry["o2"]["_id"]}')
continue
data = await self._get_serialized_document(data, collection, entry['ts'])
data = await self._get_serialized_document(data, collection, entry['ts'],
client.ws.acl_context)
await client.upsert(ts, data)
elif op == MongoOperation.DELETE:
data = self._serialize_data(data)
......@@ -197,7 +199,7 @@ class MongoDBMirror:
await oplog_cursor.close()
raise ex
async def _get_serialized_document(self, data, collection, timestamp):
async def _get_serialized_document(self, data, collection, timestamp, acl_context=None):
if CACHE_ENABLED:
_obj_id = data.get('_id', '')
cache_key = f"{_obj_id}_{timestamp.time}_{timestamp.inc}"
......@@ -216,7 +218,7 @@ class MongoDBMirror:
except Exception as ex:
LOG.warning(f'Failed to set object data to cache. {str(ex)}')
else:
obj_data = self._serialize_document(data, collection)
obj_data = self._serialize_document(data, collection, acl_context)
return obj_data
......@@ -238,7 +240,7 @@ class MongoDBMirror:
doc[key] = str(value)
return doc
def _serialize_document(self, doc, collection):
def _serialize_document(self, doc, collection, acl_context=None):
"""Deserialize MongoDB object to Python object.
Args:
......@@ -269,7 +271,7 @@ class MongoDBMirror:
ts = [entry.time, entry.inc]
async for doc in self._mongo[db][collection].find(collection_filter, batch_size=10).sort('dateModified', 1):
try:
send_data = self._serialize_document(doc, collection)
send_data = self._serialize_document(doc, collection, client.ws.acl_context)
except (TypeError, ValueError, AttributeError, KeyError) as ex:
msg = f'Error while trying to serialize record {doc.get("_id")} during initial sync'
LOG.warning(f"{msg} - {ex}", exc_info=True)
......@@ -383,6 +385,14 @@ class WebSocketsWatcher:
self.__current_job = None
super().__init__()
@property
def request(self):
return self.__request
@property
def acl_context(self):
return getattr(self.request, "_context", None)
async def __aenter__(self):
"""Initialize the context to use in the with statement.
......
......@@ -128,6 +128,8 @@ class TestMongoDBMirror:
'ns': self.client_namespace
}
mock_client = Mock()
mocked_acl_context = Mock()
mock_client.ws.acl_context = mocked_acl_context
mock_client.get_last_timestamp = AsyncMock(return_value=self.current_ts)
mock_client.upsert = AsyncMock()
mock_get_oldest_oplog_ts.return_value = self.oldest_ts
......@@ -136,7 +138,8 @@ class TestMongoDBMirror:
await self.mongo_db_mirror.sync_client(mock_client)
mock_get_oldest_oplog_ts.assert_called_once()
mock_get_available_cursor.assert_called_once_with(self.current_ts)
mock_serialize_document.assert_called_once_with(self.test_data, 'procedures', self.current_ts)
mock_serialize_document.assert_called_once_with(self.test_data, 'procedures', self.current_ts,
mocked_acl_context)
mock_client.upsert.assert_called_once_with([self.current_ts.time, self.current_ts.inc], self.test_data)
@patch('mongodb_mirror.mirror.MongoDBMirror._serialize_data', return_value={'test': 'test'})
......@@ -224,6 +227,8 @@ class TestMongoDBMirror:
get_sync_init_point=AsyncMock(return_value=init_point),
upsert=AsyncMock()
)
mocked_acl_context = Mock()
mock_client.ws.acl_context = mocked_acl_context
mock_client.namespace = self.client_namespace
mock_get_newest_oplog_ts.return_value = self.current_ts
mock_sort.return_value = AsyncIter([self.test_data])
......@@ -233,7 +238,7 @@ class TestMongoDBMirror:
mock_namespace_to_db_collection.assert_called_once_with(self.client_namespace)
mock_get_newest_oplog_ts.assert_called_once()
mock_find.assert_called_once_with(collection_filter, batch_size=10)
mock_serialize_document.assert_called_once_with(self.test_data, 'collection_name')
mock_serialize_document.assert_called_once_with(self.test_data, 'collection_name', mocked_acl_context)
mock_client.upsert.assert_called_once_with([self.current_ts.time, self.current_ts.inc], self.test_data)
@pytest.mark.parametrize('exception_type', (TypeError, ValueError, AttributeError, KeyError))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment