Commit 6b607b49 authored by Volodymyr Kovalenko's avatar Volodymyr Kovalenko Committed by mashony
Browse files

feat: add new message type error to handle serialize errors

parent 5fb282d4
......@@ -122,6 +122,20 @@ example answer
'ts': ts
}
```
* error
```
{
'type': 'error',
'data': {
"_id": 542c2b97bac0595474108b48,
"error": {
"code": 4000,
"message": "Critical error on trying to serialize entry 542c2b97bac0595474108b48 - Failed to load 'title' field"
}
},
'ts': ts
}
```
### Available mirror clients
* [python](https://gitlab.prozorro.sale/public-projects/mirror-clients) (official CBD3 mirror client)
......
......@@ -43,6 +43,9 @@ class MirrorClient:
async def get_ids_since_timestamp(self, ts, data):
LOG.info(f'get_ids_since_timestamp {data}')
async def handle_error(self, ts, object_id, error_message, error_code=4000):
LOG.info(f'Error has been caught: {error_message}')
class BaseSocketMirrorClient(MirrorClient):
......@@ -118,6 +121,19 @@ class SocketMirrorClientSimpleProtocol(BaseSocketMirrorClient):
Not available for the simple protocol
"""
async def handle_error(self, ts, object_id, error_message, error_code=4000):
await self.ws.send_json({
"type": "error",
"data": {
"_id": str(object_id),
"error": {
"code": error_code,
"message": error_message
}
},
'ts': ts
}, dumps=ujson.dumps)
class SocketMirrorClientFullProtocol(SocketMirrorClientSimpleProtocol):
"""
......
......@@ -113,10 +113,14 @@ class MongoDBMirror:
data = self._serialize_data(data)
await client.delete(ts, data)
except (TypeError, ValueError, AttributeError, KeyError) as ex:
msg = f'Critical error on trying to serialize entry {data.get("_id")} - {ex}'
LOG.exception(msg)
msg = f'Critical error on trying to serialize entry {data.get("_id")}'
LOG.exception(f"{msg} - {ex}")
mirror_fail_migration.inc()
raise CriticalError(msg)
await client.handle_error(
ts=ts,
object_id=data.get("_id"),
error_message=msg
)
except PyMongoError as ex:
msg = f'Critical Mongo error - {ex}'
LOG.exception(msg)
......@@ -168,9 +172,14 @@ class MongoDBMirror:
try:
send_data = self._serialize_document(doc, collection)
except (TypeError, ValueError, AttributeError, KeyError) as ex:
msg = f'Error while trying to serialize record {doc.get("_id")} during initial sync - {ex}'
LOG.warning(msg, exc_info=True)
msg = f'Error while trying to serialize record {doc.get("_id")} during initial sync'
LOG.warning(f"{msg} - {ex}", exc_info=True)
mirror_fail_migration.inc()
await client.handle_error(
ts=ts,
object_id=doc.get("_id"),
error_message=msg
)
else:
await client.upsert(ts, send_data)
......
......@@ -5,6 +5,7 @@ import uuid
from bson import timestamp, ObjectId
from datetime import datetime
from contextlib import nullcontext as does_not_raise
from mongodb_mirror.errors import ClientTimestampException, CriticalError, IncorrectTimestamp
from mongodb_mirror.mirror import MongoDBMirror
from mongodb_mirror.utils import MongoOperation
......@@ -168,29 +169,39 @@ class TestMongoDBMirror:
mock_gauge_remove.assert_called_once()
@pytest.mark.parametrize('exception_type', (TypeError, ValueError, AttributeError, KeyError))
@patch('mongodb_mirror.mirror.prometheus_client.Gauge.remove')
@patch('mongodb_mirror.mirror.MongoDBMirror._serialize_data', return_value={'test': 'test'})
@patch('mongodb_mirror.mirror.MongoDBMirror.get_available_cursor')
@patch('mongodb_mirror.mirror.MongoDBMirror._get_oldest_oplog_ts', new_callable=AsyncMock)
async def test_sync_client_with_operations_fail(self, mock_get_oldest_oplog_ts, mock_get_available_cursor,
mock_serialize_data, exception_type):
mock_serialize_data, mock_gauge_remove, exception_type):
data_with_obj_id = {**self.test_data, **{"_id": "test_id"}}
entry_data = {
'op': MongoOperation.NOOP,
'o': self.test_data,
'o': data_with_obj_id,
'ts': self.current_ts,
'ns': None
}
mock_client = Mock()
mock_client.addr = "some_test"
mock_client.get_last_timestamp = AsyncMock(return_value=self.current_ts)
mock_client.noop = AsyncMock()
mock_client.handle_error = AsyncMock()
mock_serialize_data.side_effect = exception_type
mock_get_oldest_oplog_ts.return_value = self.oldest_ts
mock_get_available_cursor.return_value = AsyncIter([entry_data])
mock_client.namespace = self.client_namespace
with pytest.raises(CriticalError):
with does_not_raise():
await self.mongo_db_mirror.sync_client(mock_client)
mock_get_oldest_oplog_ts.assert_called_once()
mock_get_available_cursor.assert_called_once_with(self.current_ts)
mock_serialize_data.assert_called_once_with(self.test_data)
mock_client.handle_error.assert_called_once_with(
ts=[self.current_ts.time, self.current_ts.inc],
object_id=data_with_obj_id.get("_id"),
error_message=f'Critical error on trying to serialize entry {data_with_obj_id.get("_id")}')
mock_serialize_data.assert_called_once_with(data_with_obj_id)
mock_gauge_remove.assert_called_once()
def test_serialize_data(self):
test_id = ObjectId('507f1f77bcf86cd799439011')
......@@ -244,13 +255,20 @@ class TestMongoDBMirror:
mock_namespace_to_db_collection, mock_serialize_document, exception_type):
mock_client = Mock(get_sync_init_point=AsyncMock(return_value=''))
mock_client.namespace = self.client_namespace
mock_client.handle_error = AsyncMock()
mock_get_newest_oplog_ts.return_value = self.current_ts
mock_sort.return_value = AsyncIter([self.test_data])
data_with_obj_id = {**self.test_data, **{"_id": "test_id"}}
mock_sort.return_value = AsyncIter([data_with_obj_id])
mock_find.return_value = Mock(sort=mock_sort)
self.mongo_db_mirror._mongo = {'db_name': {'collection_name': Mock(find=mock_find)}}
mock_serialize_document.side_effect = exception_type
await self.mongo_db_mirror._initial_sync(mock_client)
mock_counter_inc.assert_called_once()
mock_client.handle_error.assert_called_once_with(
ts=[self.current_ts.time, self.current_ts.inc],
object_id=data_with_obj_id.get("_id"),
error_message=f'Error while trying to serialize record {data_with_obj_id.get("_id")} during initial sync'
)
async def test_get_newest_oplog_ts(self):
self.mongo_db_mirror._oplog = Mock(find_one=AsyncMock(return_value={'ts': self.current_ts}))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment