"""Integration tests for mirroring of ``lease_request`` documents.

Each test writes to the ``lease_request`` collection of the database named by
``REPLICA_DB`` (reached through ``MONGO_URL``) and then reads the collection of
the same name in ``CLIENT_DB`` (reached through ``CLIENT_MONGO_URL``) to check
that the change is visible there.
"""
import asyncio
import os
import json
from copy import deepcopy

from aiohttp import test_utils
from motor import motor_asyncio

from registry_mirror.main import App
from prozorro_sale.registry.migration import _migrate
from prozorro_sale.registry.model_generator import load_lease_request_object

# Connection settings are mandatory: a missing variable raises KeyError at
# import time.
REPLICA_DB = os.environ['REPLICA_DB']
CLIENT_DB = os.environ['CLIENT_DB']
CLIENT_MONGO_URL = os.environ['CLIENT_MONGO_URL']
MONGO_URL = os.environ['MONGO_URL']

# Pause between a write to the source database and the read from the client
# database.
# NOTE(review): presumably gives the application under test time to copy the
# change across - confirm that 2 seconds is sufficient in CI.
SYNC_DELAY_SECONDS = 2

fixtures_path = 'fixtures'
fixture_name = 'lease.json'
# The path is relative to the current working directory.  JSON is read as
# UTF-8 explicitly so the result does not depend on the platform's default
# encoding.
with open(os.path.join(fixtures_path, fixture_name), encoding='utf-8') as f:
    LEASE_OBJECT = json.load(f)


def _contact_name(document):
    """Return the ``uk_UA`` contact-point name of the selling entity."""
    return document['sellingEntity']['contactPoint']['name']['uk_UA']


class WebSocketLeaseTest(test_utils.AioHTTPTestCase):
    """Checks that ``lease_request`` writes show up in the client database."""

    def setUp(self):
        """Start the aiohttp test fixture and connect to both databases."""
        super().setUp()
        self.mongo = motor_asyncio.AsyncIOMotorClient(MONGO_URL)
        self.db = self.mongo[REPLICA_DB]
        self.client_mongo = motor_asyncio.AsyncIOMotorClient(CLIENT_MONGO_URL)
        self.db_copy = self.client_mongo[CLIENT_DB]

    async def tearDownAsync(self):
        """Empty both ``lease_request`` collections and close the clients."""
        try:
            await self.db.lease_request.delete_many({})
            await self.db_copy.lease_request.delete_many({})
        finally:
            # Close the clients opened in setUp so connections are not leaked
            # from one test to the next, even when the cleanup above fails.
            self.mongo.close()
            self.client_mongo.close()
            await super().tearDownAsync()

    async def get_application(self):
        """Return the application object served by the aiohttp test server."""
        return App()._init_app()

    async def _insert_lease(self):
        """Insert a fresh copy of the fixture into the source collection.

        The fixture is deep-copied, passed through ``_migrate`` and
        ``load_lease_request_object(...).to_native()``, and stripped of any
        ``_id`` before insertion.

        Returns:
            A ``(document, inserted_id)`` tuple: the dict that was inserted
            and the ``_id`` reported by ``insert_one``.
        """
        document = deepcopy(LEASE_OBJECT)
        document = _migrate(document, 'lease_request')
        document = load_lease_request_object(document).to_native()
        document.pop('_id', None)
        result = await self.db.lease_request.insert_one(document)
        return document, result.inserted_id

    async def _fetch_mirrored(self, inserted_id):
        """Wait ``SYNC_DELAY_SECONDS`` and read the client database's copy.

        Returns:
            The matching document from the client database, or ``None`` when
            no document with that ``_id`` exists there.
        """
        await asyncio.sleep(SYNC_DELAY_SECONDS)
        return await self.db_copy.lease_request.find_one({"_id": inserted_id})

    @test_utils.unittest_run_loop
    async def test_ping(self):
        """Smoke test: passes whenever the fixture set-up itself succeeds."""
        self.assertTrue(True)

    @test_utils.unittest_run_loop
    async def test_insert_one(self):
        """An inserted document appears in the client database."""
        document, inserted_id = await self._insert_lease()

        client_obj = await self._fetch_mirrored(inserted_id)

        self.assertIsNotNone(client_obj)
        self.assertEqual(_contact_name(document), _contact_name(client_obj))

    @test_utils.unittest_run_loop
    async def test_update(self):
        """A ``$set`` update of ``sellingEntity`` reaches the client database."""
        document, inserted_id = await self._insert_lease()
        client_obj = await self._fetch_mirrored(inserted_id)
        # Fail with a clear assertion rather than a TypeError on subscripting.
        self.assertIsNotNone(client_obj)
        self.assertEqual(_contact_name(document), _contact_name(client_obj))

        await self.db.lease_request.update_one(
            {"_id": inserted_id},
            {'$set': {'sellingEntity': {'contactPoint': {'name': {'uk_UA': 'updated_value'}}}}}
        )

        client_obj = await self._fetch_mirrored(inserted_id)
        self.assertIsNotNone(client_obj)
        self.assertEqual(_contact_name(client_obj), 'updated_value')

    @test_utils.unittest_run_loop
    async def test_update_with_invalid_value(self):
        """Setting the name to ``None`` leaves the client copy untouched."""
        _, inserted_id = await self._insert_lease()
        client_obj = await self._fetch_mirrored(inserted_id)
        self.assertIsNotNone(client_obj)
        self.assertEqual(_contact_name(client_obj), 'magna commodo officia')

        await self.db.lease_request.update_one(
            {"_id": inserted_id},
            {'$set': {'sellingEntity.contactPoint.name.uk_UA': None}}
        )

        client_obj = await self._fetch_mirrored(inserted_id)
        self.assertIsNotNone(client_obj)
        # Should not be updated because the value changes to invalid
        self.assertEqual(_contact_name(client_obj), 'magna commodo officia')

    @test_utils.unittest_run_loop
    async def test_replace(self):
        """A ``replace_one`` of the source document reaches the client database."""
        _, inserted_id = await self._insert_lease()
        client_obj = await self._fetch_mirrored(inserted_id)
        self.assertIsNotNone(client_obj)
        self.assertEqual(_contact_name(client_obj), 'magna commodo officia')

        # Re-read the stored source document so the replacement carries every
        # stored field, then change only the contact name.
        document = await self.db.lease_request.find_one({"_id": inserted_id})
        document['sellingEntity']['contactPoint']['name']['uk_UA'] = 'replaced_value'
        await self.db.lease_request.replace_one({"_id": inserted_id}, document, upsert=True)

        client_obj = await self._fetch_mirrored(inserted_id)
        self.assertIsNotNone(client_obj)
        self.assertEqual(_contact_name(client_obj), 'replaced_value')

    @test_utils.unittest_run_loop
    async def test_update_upsert(self):
        """An ``update_one`` issued with ``upsert=True`` reaches the client database."""
        _, inserted_id = await self._insert_lease()
        client_obj = await self._fetch_mirrored(inserted_id)
        self.assertIsNotNone(client_obj)
        self.assertEqual(_contact_name(client_obj), 'magna commodo officia')

        await self.db.lease_request.update_one(
            {"_id": inserted_id},
            {'$set': {'sellingEntity': {'contactPoint': {'name': {'uk_UA': 'updated_value_upsert'}}}}},
            upsert=True
        )

        client_obj = await self._fetch_mirrored(inserted_id)
        self.assertIsNotNone(client_obj)
        self.assertEqual(_contact_name(client_obj), 'updated_value_upsert')

    @test_utils.unittest_run_loop
    async def test_delete(self):
        """Deleting the source document removes it from the client database."""
        _, inserted_id = await self._insert_lease()
        client_obj = await self._fetch_mirrored(inserted_id)
        self.assertIsNotNone(client_obj)
        self.assertEqual(_contact_name(client_obj), 'magna commodo officia')

        await self.db.lease_request.delete_one({"_id": inserted_id})

        client_obj = await self._fetch_mirrored(inserted_id)
        self.assertIsNone(client_obj)