# test_registry.py
import asyncio
import os
import json
from copy import deepcopy

from aiohttp import test_utils
from motor import motor_asyncio

from registry_mirror.main import App
from prozorro_sale.registry.migration import _migrate
from prozorro_sale.registry.model_generator import load_registry_object

# Connection settings are mandatory: a missing variable raises KeyError at
# import time instead of letting the tests run against an unintended database.
REPLICA_DB = os.environ['REPLICA_DB']
CLIENT_DB = os.environ['CLIENT_DB']
CLIENT_MONGO_URL = os.environ['CLIENT_MONGO_URL']
MONGO_URL = os.environ['MONGO_URL']

# Template registry document shared by the tests; it is loaded once here.
# The path is relative to the current working directory.
fixtures_path = 'fixtures'
fixture_name = 'realEstate.json'
# The fixture contains non-ASCII (Cyrillic) text, so decode it explicitly as
# UTF-8 rather than relying on the platform's default encoding.
with open(os.path.join(fixtures_path, fixture_name), encoding='utf-8') as f:
    REGISTRY_OBJECT = json.load(f)


class WebSocketRegiatryTest(test_utils.AioHTTPTestCase):
    """Integration tests for replication of the ``registry`` collection.

    Every test writes to ``registry`` in the replica database
    (``MONGO_URL`` / ``REPLICA_DB``), waits ``SYNC_DELAY`` seconds and then
    reads the document with the same ``_id`` from ``registry`` in the client
    database (``CLIENT_MONGO_URL`` / ``CLIENT_DB``).

    The application under test is built by ``App()._init_app()``; presumably
    it is what copies changes between the two databases -- confirm against
    ``registry_mirror``.
    """

    # Seconds to wait after a write before reading the client database.
    SYNC_DELAY = 2

    def setUp(self):
        """Start the application and open one Motor client per database."""
        super().setUp()
        self.mongo = motor_asyncio.AsyncIOMotorClient(MONGO_URL)
        self.db = self.mongo[REPLICA_DB]
        self.client_mongo = motor_asyncio.AsyncIOMotorClient(CLIENT_MONGO_URL)
        self.db_copy = self.client_mongo[CLIENT_DB]

    async def tearDownAsync(self):
        """Empty both ``registry`` collections and close the Motor clients."""
        try:
            await self.db.registry.delete_many({})
            await self.db_copy.registry.delete_many({})
        finally:
            # Close the clients opened in setUp even when cleanup fails, so
            # connections do not accumulate from one test to the next.
            self.mongo.close()
            self.client_mongo.close()
            await super().tearDownAsync()

    async def get_application(self):
        """Return the application instance that the test case runs."""
        return App()._init_app()

    @staticmethod
    def _build_document():
        """Return a fresh, migrated registry document without an ``_id``."""
        document = _migrate(deepcopy(REGISTRY_OBJECT), 'registry')
        document = load_registry_object(document).to_native()
        # Drop any preset id so that insert_one generates a new one.
        document.pop('_id', None)
        return document

    @staticmethod
    def _representative_info(document):
        """Return ``relatedOrganizations.propertyOwner.representativeInfo``."""
        return document['relatedOrganizations']['propertyOwner']['representativeInfo']

    async def _fetch_mirrored(self, object_id):
        """Wait ``SYNC_DELAY`` seconds, then read *object_id* from the client DB.

        Returns the stored document, or ``None`` when it does not exist.
        """
        await asyncio.sleep(self.SYNC_DELAY)
        return await self.db_copy.registry.find_one({"_id": object_id})

    async def _insert_document(self):
        """Insert a fresh document into the replica DB and wait for its copy.

        Returns a ``(document, object_id, mirrored)`` tuple where *document*
        is the inserted dict, *object_id* its ``_id`` and *mirrored* the copy
        read from the client database. Fails the test if no copy exists.
        """
        document = self._build_document()
        result = await self.db.registry.insert_one(document)
        mirrored = await self._fetch_mirrored(result.inserted_id)
        self.assertIsNotNone(mirrored)
        return document, result.inserted_id, mirrored

    def _assert_representative_info(self, mirrored, expected):
        """Assert that *mirrored* exists and holds *expected* representativeInfo."""
        # Check for None first so a missing document fails with a readable
        # assertion instead of a TypeError raised by subscripting None.
        self.assertIsNotNone(mirrored)
        self.assertEqual(self._representative_info(mirrored), expected)

    @test_utils.unittest_run_loop
    async def test_ping(self):
        """Smoke test: the test case can be set up and torn down."""
        self.assertTrue(True)

    @test_utils.unittest_run_loop
    async def test_insert_one(self):
        """An inserted document appears in the client database."""
        document, _, mirrored = await self._insert_document()
        self._assert_representative_info(mirrored, self._representative_info(document))

    @test_utils.unittest_run_loop
    async def test_update(self):
        """An ``update_one`` on the replica is reflected in the client database."""
        document, object_id, mirrored = await self._insert_document()
        self._assert_representative_info(mirrored, self._representative_info(document))

        # NOTE: ``$set`` with an embedded document replaces the whole
        # ``relatedOrganizations`` sub-document, not just the nested field.
        await self.db.registry.update_one(
            {"_id": object_id},
            {'$set': {'relatedOrganizations': {'propertyOwner': {'representativeInfo': 'updated_value'}}}}
        )
        mirrored = await self._fetch_mirrored(object_id)
        self._assert_representative_info(mirrored, 'updated_value')

    @test_utils.unittest_run_loop
    async def test_update_with_invalid_value(self):
        """An update to an invalid value is not applied to the client database."""
        _, object_id, mirrored = await self._insert_document()
        self._assert_representative_info(mirrored, 'Довіреність')

        await self.db.registry.update_one({"_id": object_id},
                                          {'$set': {'relatedOrganizations.propertyOwner.representativeInfo': None}})
        mirrored = await self._fetch_mirrored(object_id)

        # Should not be updated because the value changes to invalid
        self._assert_representative_info(mirrored, 'Довіреність')

    @test_utils.unittest_run_loop
    async def test_replace(self):
        """A ``replace_one`` on the replica is reflected in the client database."""
        _, object_id, mirrored = await self._insert_document()
        self._assert_representative_info(mirrored, 'Довіреність')

        document = await self.db.registry.find_one({"_id": object_id})
        document['relatedOrganizations']['propertyOwner']['representativeInfo'] = 'replaced_value'
        await self.db.registry.replace_one({"_id": object_id}, document, upsert=True)
        mirrored = await self._fetch_mirrored(object_id)
        self._assert_representative_info(mirrored, 'replaced_value')

    @test_utils.unittest_run_loop
    async def test_update_upsert(self):
        """An ``update_one`` with ``upsert=True`` is reflected in the client database."""
        _, object_id, mirrored = await self._insert_document()
        self._assert_representative_info(mirrored, 'Довіреність')

        await self.db.registry.update_one(
            {"_id": object_id},
            {'$set': {'relatedOrganizations': {'propertyOwner': {'representativeInfo': 'updated_value_upsert'}}}},
            upsert=True
        )
        mirrored = await self._fetch_mirrored(object_id)
        self._assert_representative_info(mirrored, 'updated_value_upsert')

    @test_utils.unittest_run_loop
    async def test_delete(self):
        """Deleting from the replica removes the document from the client database."""
        _, object_id, mirrored = await self._insert_document()
        self._assert_representative_info(mirrored, 'Довіреність')

        await self.db.registry.delete_one({"_id": object_id})
        mirrored = await self._fetch_mirrored(object_id)
        self.assertIsNone(mirrored)