diff --git a/Makefile b/Makefile
index c12c28a0b52b236d66dc621403a2a53194aeb718..41ec060b10e677611b1bbe2cae46861a1242815b 100644
--- a/Makefile
+++ b/Makefile
@@ -6,7 +6,7 @@ GIT_STAMP ?= $(shell git describe || echo v0.1.0)
 CHART_MUSEUM_URL ?= "https://helm.prozorro.sale/api/charts"
 CI_PIPELINE_ID ?= 1
 COMPOSE_PROJECT_NAME ?= $(PROJECT_NAME)-$(CI_PIPELINE_ID)
-COMPOSE_LOCAL = $(PROJECT_NAME)-api-local
+COMPOSE_LOCAL = $(PROJECT_NAME)-api-local $(PROJECT_NAME)-databridge-local
 PROJECT_ID ?= $(COMPOSE_PROJECT_NAME)-$(CI_COMMIT_SHORT_SHA)
 PROJECT_ID_UNIT=$(PROJECT_ID)-unit
 PROJECT_ID_INTEGRATION ?= $(PROJECT_ID)-integration
diff --git a/docker-compose.yml b/docker-compose.yml
index 3a07791c4f080d82d18ff188d13afb65fb3076cd..c13f818326001bcc78a86c7a225fedbede29f5d4 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -39,6 +39,18 @@ services:
     depends_on:
       - mongo
 
+  document-service-databridge-local:
+    image: "${IMAGE}"
+    container_name: document-service-databridge
+    command: python -m prozorro_sale.document_service.databridge
+    environment:
+      MONGO_URL: mongodb://mongo:27017
+      MONGO_DATABASE: documents
+    ports:
+      - 8082:80
+    depends_on:
+      - mongo
+
   document-service-test-integration:
     image: "${IMAGE_TEST}"
     container_name: document-service-test-integration
diff --git a/src/prozorro_sale/document_service/databridge/api.py b/src/prozorro_sale/document_service/databridge/api.py
index 2d927ff0c2185f8d9a0017584f6b52c1a3d8615f..357f7a48b225e96317f8d5831c8394fe09826bf3 100644
--- a/src/prozorro_sale/document_service/databridge/api.py
+++ b/src/prozorro_sale/document_service/databridge/api.py
@@ -5,12 +5,12 @@ from prometheus_async.aio import time, count_exceptions, track_inprogress
 import cgi
 
 import prozorro_sale
+from prozorro_sale import tools
 from prozorro_sale.document_service import sign
 from prozorro_sale.document_service.errors import MultipartDataException, KeyAlreadyExists
 from prozorro_sale.document_service.utils import get_token, format_header_datetime
 from prozorro_sale.document_service.databridge import prometheus
-
 
 LOG = logger.get_custom_logger(__name__)
 
@@ -101,3 +101,35 @@ async def migrate_document(request):
     )
     return web.Response(status=201)
+
+
+@time(prometheus.move_document_request_time)
+@count_exceptions(prometheus.move_document_fails)
+@track_inprogress(prometheus.move_document_gauge)
+async def move_document(request):
+    """Coroutine for moving a document to a new scope
+
+    Args:
+        request (HttpRequest): request obj
+
+    Returns:
+        response obj
+    """
+    data = await request.json()
+    if field_errors := tools.check_required_fields({'doc_id', 'scope', 'new_scope'}, data):
+        return web.json_response(data=field_errors, status=422)
+
+    field_errors = {}
+    allowed_scopes = ['private', 'public']
+    if data['scope'] not in allowed_scopes:
+        field_errors['scope'] = f'Value must be in {allowed_scopes}'
+    if data['new_scope'] not in allowed_scopes:
+        field_errors['new_scope'] = f'Value must be in {allowed_scopes}'
+    if field_errors:
+        return web.json_response(data=field_errors, status=422)
+
+    document_data = await request.app.storage.move(data['scope'], data['doc_id'], data['new_scope'], request)
+    LOG.info(f'Moved document {data["doc_id"]} to {document_data["id"]}')
+    await request.app.db.documents.insert(document_data, request.app.storage.STORAGE_NAME)
+    encoded_jwt = sign.create_token(document_data)
+    return web.Response(body=encoded_jwt, status=201, content_type='application/jwt')
diff --git a/src/prozorro_sale/document_service/databridge/prometheus.py b/src/prozorro_sale/document_service/databridge/prometheus.py
index 0f285d87f75d08bf8912f6c754cd73e9d332fbea..629200d83087c9ffd0dff8cf631c0082bb6c0123 100644
--- a/src/prozorro_sale/document_service/databridge/prometheus.py
+++ b/src/prozorro_sale/document_service/databridge/prometheus.py
@@ -1,12 +1,17 @@
 from prometheus_client import Gauge, Counter, Histogram
-
 
 upload_document_gauge = Gauge('databridge_upload_document_current_connections',
                               'number of current databridge upload_document endpoint connections')
 upload_document_request_time = Histogram('databridge_upload_document_request_processing_seconds',
                                          'time of databridge upload_document request processing')
 upload_document_fails = Counter('databridge_upload_document_request_fails',
                                 'number of databridge upload_document request fails')
+move_document_gauge = Gauge('databridge_move_document_current_connections',
+                            'number of current databridge move_document endpoint connections')
+move_document_request_time = Histogram('databridge_move_document_request_processing_seconds',
+                                       'time of databridge move_document request processing')
+move_document_fails = Counter('databridge_move_document_request_fails',
+                              'number of databridge move_document request fails')
 delete_document_gauge = Gauge('databridge_delete_document_current_connections',
                               'number of current databridge delete_document endpoint connections')
 delete_document_request_time = Histogram('databridge_delete_document_request_processing_seconds',
diff --git a/src/prozorro_sale/document_service/databridge/routes.py b/src/prozorro_sale/document_service/databridge/routes.py
index 3f4862403658cd101354382c560c0f3f0980fea5..fdf1cadfe5489c950a93237efbf8cd28083f600d 100644
--- a/src/prozorro_sale/document_service/databridge/routes.py
+++ b/src/prozorro_sale/document_service/databridge/routes.py
@@ -1,6 +1,6 @@
 from aiohttp import web
 
-from .api import delete_document, get_document, migrate_document, ping, upload_document, version
+from .api import delete_document, get_document, migrate_document, ping, upload_document, version, move_document
 
 
 def init_routes(app: web.Application) -> None:
@@ -10,3 +10,4 @@ def init_routes(app: web.Application) -> None:
     app.router.add_put('/api/documents/{scope:(public|private)}', upload_document)
     app.router.add_delete('/api/documents/{scope:(public|private)}/{doc_id}', delete_document)
     app.router.add_post('/api/documents/migrate', migrate_document)
+    app.router.add_post('/api/documents/move', move_document)
diff --git a/src/prozorro_sale/document_service/storages/base_storage.py b/src/prozorro_sale/document_service/storages/base_storage.py
index eb05b86466b2a2c817587196053e700fb33dad77..43199f699cae3fd833f3dfe53198176617e45d06 100644
--- a/src/prozorro_sale/document_service/storages/base_storage.py
+++ b/src/prozorro_sale/document_service/storages/base_storage.py
@@ -13,7 +13,7 @@ class StorageFactory(type):
 
 
 class BaseStorage(metaclass=StorageFactory):
-    __slots__ = ('bucket', )
+    __slots__ = ('bucket',)
     STORAGES = {}
     STORAGE_NAME = 'base'
 
@@ -39,6 +39,9 @@ class BaseStorage(metaclass=StorageFactory):
     async def get(self, scope, uuid, request):
         raise NotImplementedError("Get method must be implemented in subclass")
 
+    async def move(self, scope, uuid, new_scope, request):
+        raise NotImplementedError("Move method must be implemented in subclass")
+
     async def get_metadata(self, scope, uuid, request, extra_data=False):
         raise NotImplementedError("Get_metadata method must be implemented in subclass")
 
diff --git a/src/prozorro_sale/document_service/storages/memory_storage.py b/src/prozorro_sale/document_service/storages/memory_storage.py
index 3946873c289e2b4868d15fd5c880fc7d6eb8ec44..36cc01d369d22b82c7435e94faf013dda792115c 100644
--- a/src/prozorro_sale/document_service/storages/memory_storage.py
+++ b/src/prozorro_sale/document_service/storages/memory_storage.py
@@ -5,9 +5,12 @@ from uuid import uuid4
 
 from aiohttp import web
 
+from prozorro_sale.tools import logger
 from prozorro_sale.document_service.errors import FileNotFound, KeyNotFound, HeaderNotExists
 from prozorro_sale.document_service.storages.base_storage import BaseStorage, DATETIME_FORMAT
 
+LOG = logger.get_custom_logger(__name__)
+
 
 class MemoryStorage(BaseStorage):
     STORAGE_NAME = 'memory'
@@ -17,48 +20,77 @@
         self.storage = {}
 
     async def upload(self, scope, doc_type, request):
-        uuid = uuid4().hex
-        data = {'scope': scope, 'documentType': doc_type}
-        file_name = None
+        """Method uploads file to storage, parsing data beforehand
+
+        Args:
+            scope (str): private|public
+            doc_type (str): document type
+            request (HttpRequest): request obj
+
+        Returns:
+            dict with metadata of uploaded file
+        """
+        meta = {'scope': scope, 'documentType': doc_type}
+        file_data = None
         async for field in await self._get_request_data(request):
-            if field.name in data:
-                data[field.name] = await field.text()
+            if field.name == 'documentType':
+                meta[field.name] = await field.text()
             if field.name == 'file':
                 try:
-                    file_name = cgi.parse_header(field.headers['Content-Disposition'])[1]['filename']
                     file_data = await field.read()
-                    data.update({
+                    meta.update({
                         'Content-Type': field.headers['Content-Type'],
                         'Content-Disposition': field.headers['Content-Disposition'],
-                        'body': file_data,
                         'sha': hashlib.sha256(file_data).hexdigest(),
                         'hash': 'md5:' + hashlib.md5(file_data).hexdigest(),
-                        'dateCreated': datetime.now().strftime(DATETIME_FORMAT),
-                        'Content-Length': len(file_data)
+                        'dateCreated': datetime.now().strftime(DATETIME_FORMAT)
                     })
                 except KeyError as ex:
                     raise HeaderNotExists(ex)
 
-        if 'body' not in data:
-            raise FileNotFound
-        self.storage[uuid] = data
+        return await self.__upload(file_data, meta, scope)
+
+    async def __upload(self, file_data, meta, scope):
+        """Method uploads file to storage
+
+        Args:
+            file_data (bytes): file data
+            meta (dict): file metadata
+            scope (str): private|public
+
+        Returns:
+            dict with metadata of uploaded file
+        """
+        if not file_data:
+            raise FileNotFound
+        uuid = uuid4().hex
+        file_name = cgi.parse_header(meta['Content-Disposition'])[1]['filename']
+        meta['body'] = file_data
+        self.storage[f'{scope}_{uuid}'] = meta
         return {
             'id': uuid,
-            'scope': data['scope'],
+            'scope': scope,
             'filename': file_name,
-            'documentType': data['documentType'],
-            'format': data['Content-Type'],
-            'sha': data['sha'],
-            'hash': data['hash'],
-            'dateCreated': data['dateCreated'],
-            'size': data['Content-Length']
+            'documentType': meta['documentType'],
+            'format': meta['Content-Type'],
+            'sha': meta['sha'],
+            'hash': meta['hash'],
+            'dateCreated': meta['dateCreated'],
+            'size': len(file_data)
         }
 
     async def get(self, scope, uuid, request):
-        try:
-            data = self.storage[uuid]
-        except KeyError:
-            raise KeyNotFound(uuid)
+        """Method gets document from storage, parsing data beforehand
+
+        Args:
+            scope (str): private|public
+            uuid (str): id of document
+            request (HttpRequest): request obj
+
+        Returns:
+            response obj
+        """
+        data = await self.__get(f'{scope}_{uuid}')
         stream_resp = web.StreamResponse(headers={
             'Content-Type': data['Content-Type'],
             'Content-Disposition': data['Content-Disposition']
@@ -68,11 +100,59 @@
         await stream_resp.write_eof()
         return stream_resp
 
+    async def __get(self, _id):
+        """Method gets document from storage
+
+        Args:
+            _id (str): id of document
+
+        Returns:
+            response obj
+        """
+        try:
+            data = self.storage[_id]
+        except KeyError:
+            raise KeyNotFound(_id)
+        return data
+
+    async def move(self, scope, uuid, new_scope, request):
+        """Method moves existing document to new scope
+
+        Args:
+            scope (str): private|public
+            uuid (str): id of document
+            new_scope (str): private|public
+            request (HttpRequest): request obj
+
+        Returns:
+            dict with metadata of uploaded file
+        """
+        metadata = await self.get_metadata(scope, uuid, request, True)
+        data = await self.__get(f'{scope}_{uuid}')
+        file_data = data['body']
+        meta = {
+            'scope': new_scope,
+            'documentType': metadata['X-Document-Type'],
+            'Content-Type': metadata['Content-Type'],
+            'Content-Disposition': metadata['Content-Disposition'],
+            'sha': hashlib.sha256(file_data).hexdigest(),
+            'hash': 'md5:' + hashlib.md5(file_data).hexdigest(),
+            'dateCreated': datetime.now().strftime(DATETIME_FORMAT)
+        }
+
+        res = await self.__upload(file_data, meta, new_scope)
+        try:
+            await self.delete(scope, uuid, request)
+        except KeyNotFound:
+            LOG.warning(f'Document {uuid} has already been deleted')
+        return res
+
     async def get_metadata(self, scope, uuid, request, extra_data=False):
+        _id = f'{scope}_{uuid}'
         try:
-            data = self.storage[uuid]
+            data = self.storage[_id]
         except KeyError:
-            raise KeyNotFound(uuid)
+            raise KeyNotFound(_id)
 
         headers = {
             'X-Scope': data['scope'],
@@ -81,7 +161,7 @@
             'X-SHA': data['sha'],
             'X-Date-Created': data['dateCreated'],
             'ETag': data['hash'],
-            'Content-Length': str(data['Content-Length'])
+            'Content-Length': str(len(data['body']))
         }
         if extra_data:
             headers.update({
@@ -90,8 +170,9 @@
         return headers
 
     async def delete(self, scope, uuid, request):
+        _id = f'{scope}_{uuid}'
         try:
-            del self.storage[uuid]
+            del self.storage[_id]
         except KeyError:
-            raise KeyNotFound(uuid)
+            raise KeyNotFound(_id)
         return web.Response(status=200)
diff --git a/src/prozorro_sale/document_service/storages/s3_storage.py b/src/prozorro_sale/document_service/storages/s3_storage.py
index 4682c07a40e4ef48045b81e60e3ca4e492fc90eb..cd9e271c08f8b474ebe3810b2ea6695f60fdc637 100644
--- a/src/prozorro_sale/document_service/storages/s3_storage.py
+++ b/src/prozorro_sale/document_service/storages/s3_storage.py
@@ -8,6 +8,7 @@ from aiobotocore.session import get_session
 from aiohttp import web
 from botocore.exceptions import ClientError
 
+from prozorro_sale.tools import logger
 from prozorro_sale.document_service.environment import environment
 from prozorro_sale.document_service.errors import (FileNotFound, KeyNotFound, HeaderNotExists,
                                                    critical_error, connection_error)
@@ -16,6 +17,7 @@ from prozorro_sale.document_service.utils import data_to_ascii, data_from_ascii
 
 # 2 ** 16 - default linux socket buffer size
 READ_BUFFER_SIZE = 2 ** 16 * 20
+LOG = logger.get_custom_logger(__name__)
 
 
 class S3Storage(BaseStorage):
@@ -44,20 +46,27 @@ class S3Storage(BaseStorage):
 
     @connection_error
     async def upload(self, scope, doc_type, request):
-        uuid = uuid4().hex
-        data = {'scope': scope, 'documentType': doc_type}
-        file_name = None
+        """Method uploads file to storage, parsing data beforehand
+
+        Args:
+            scope (str): private|public
+            doc_type (str): document type
+            request (HttpRequest): request obj
+
+        Returns:
+            dict with metadata of uploaded file
+        """
+        meta = {'scope': scope, 'documentType': doc_type}
+        file_data = None
         async for field in await self._get_request_data(request):
-            if field.name in data:
-                data[field.name] = await field.text()
+            if field.name == 'documentType':
+                meta[field.name] = await field.text()
             if field.name == 'file':
                 try:
-                    file_name = cgi.parse_header(field.headers['Content-Disposition'])[1]['filename']
                     file_data = await field.read()
-                    data.update({
+                    meta.update({
                         'Content-Type': field.headers['Content-Type'],
                         'Content-Disposition': field.headers['Content-Disposition'],
-                        'body': file_data,
                         'sha': hashlib.sha256(file_data).hexdigest(),
                         'hash': 'md5:' + hashlib.md5(file_data).hexdigest(),
                         'dateCreated': datetime.now().strftime(DATETIME_FORMAT)
@@ -65,55 +74,125 @@ class S3Storage(BaseStorage):
                 except KeyError as ex:
                     raise HeaderNotExists(ex)
 
-        if 'body' not in data:
-            raise FileNotFound
+        return await self.__upload(file_data, meta, scope)
+
+    async def __upload(self, file_data, meta, scope):
+        """Method uploads file to storage
 
-        content_type = data['Content-Type']
-        file_size = len(data['body'])
+        Args:
+            file_data (bytes): file data
+            meta (dict): file metadata
+            scope (str): private|public
+
+        Returns:
+            dict with metadata of uploaded file
+        """
+        if not file_data:
+            raise FileNotFound
+        uuid = uuid4().hex
+        content_disposition = meta.pop('Content-Disposition')
+        content_type = meta.pop('Content-Type')
+        file_name = cgi.parse_header(content_disposition)[1]['filename']
         async with self.client() as client:
             await client.put_object(
                 Bucket=self.bucket,
                 Key=f"{scope}/{uuid}",
-                Body=data.pop('body'),
-                ContentDisposition=data.pop('Content-Disposition'),
-                ContentType=data.pop('Content-Type'),
-                Metadata=data_to_ascii(copy(data))
+                Body=file_data,
+                ContentDisposition=content_disposition,
+                ContentType=content_type,
+                Metadata=data_to_ascii(copy(meta))
             )
-        data['id'] = uuid
+
         return {
             'id': uuid,
-            'scope': data['scope'],
+            'scope': scope,
             'filename': file_name,
-            'documentType': data['documentType'],
+            'documentType': meta['documentType'],
             'format': content_type,
-            'sha': data['sha'],
-            'hash': data['hash'],
-            'dateCreated': data['dateCreated'],
-            'size': file_size
+            'sha': meta['sha'],
+            'hash': meta['hash'],
+            'dateCreated': meta['dateCreated'],
+            'size': len(file_data)
         }
 
     @connection_error
     async def get(self, scope, uuid, request):
+        """Method gets document from storage, parsing data beforehand
+
+        Args:
+            scope (str): private|public
+            uuid (str): id of document
+            request (HttpRequest): request obj
+
+        Returns:
+            response obj
+        """
+        response = await self.__get(scope, uuid)
+        metadata_response = response.get('ResponseMetadata', {}).get('HTTPHeaders', {})
+        headers = {
+            'Content-Type': metadata_response['content-type'],
+            'Content-Disposition': metadata_response['content-disposition'],
+        }
+        stream_resp = web.StreamResponse(headers=headers)
+        await stream_resp.prepare(request)
+
+        async with response['Body'] as stream:
+            while chunk := await stream.read(READ_BUFFER_SIZE):
+                await stream_resp.write(chunk)
+
+        await stream_resp.write_eof()
+        return stream_resp
+
+    async def __get(self, scope, uuid):
+        """Method gets document from storage
+
+        Args:
+            scope (str): private|public
+            uuid (str): id of document
+
+        Returns:
+            response obj
+        """
         async with self.client() as client:
             try:
                 response = await client.get_object(Bucket=self.bucket, Key=f"{scope}/{uuid}")
             except ClientError:
                 raise KeyNotFound(uuid)
+        return response
 
-        metadata_response = response.get('ResponseMetadata', {}).get('HTTPHeaders', {})
-        headers = {
-            'Content-Type': metadata_response['content-type'],
-            'Content-Disposition': metadata_response['content-disposition'],
-        }
-        stream_resp = web.StreamResponse(headers=headers)
-        await stream_resp.prepare(request)
-
-        async with response["Body"] as stream:
-            while chunk := await stream.read(READ_BUFFER_SIZE):
-                await stream_resp.write(chunk)
-
-        await stream_resp.write_eof()
-        return stream_resp
+    @connection_error
+    async def move(self, scope, uuid, new_scope, request):
+        """Method moves existing document to new scope
+
+        Args:
+            scope (str): private|public
+            uuid (str): id of document
+            new_scope (str): private|public
+            request (HttpRequest): request obj
+
+        Returns:
+            dict with metadata of uploaded file
+        """
+        metadata = await self.get_metadata(scope, uuid, request, True)
+        response = await self.__get(scope, uuid)
+        async with response['Body'] as stream:
+            file_data = await stream.read()
+        meta = {
+            'scope': new_scope,
+            'documentType': metadata['X-Document-Type'],
+            'Content-Type': metadata['Content-Type'],
+            'Content-Disposition': metadata['Content-Disposition'],
+            'sha': hashlib.sha256(file_data).hexdigest(),
+            'hash': 'md5:' + hashlib.md5(file_data).hexdigest(),
+            'dateCreated': datetime.now().strftime(DATETIME_FORMAT)
+        }
+
+        res = await self.__upload(file_data, meta, new_scope)
+        try:
+            await self.delete(scope, uuid, request)
+        except KeyNotFound:
+            LOG.warning(f'Document {uuid} has already been deleted')
+        return res
 
     @connection_error
     async def get_metadata(self, scope, uuid, request, extra_data=False):
diff --git a/src/prozorro_sale/document_service/storages/swift_storage.py b/src/prozorro_sale/document_service/storages/swift_storage.py
index f19da6c8fca244d552df2cbdd8f0b0a4c8dfd1e0..0d024a42199855e355a8dc81946b571bd5c51c8a 100644
--- a/src/prozorro_sale/document_service/storages/swift_storage.py
+++ b/src/prozorro_sale/document_service/storages/swift_storage.py
@@ -1,13 +1,13 @@
 import asyncio
 import cgi
-import datetime
 import hashlib
 import logging
-from uuid import uuid4
-
 import aiohttp
 import prometheus_client
+
+from uuid import uuid4
 from aiohttp import web
+from datetime import datetime, timedelta
 
 from prozorro_sale.document_service.environment import environment
 from prozorro_sale.document_service.errors import (FileNotFound, KeyNotFound, HeaderNotExists,
@@ -59,7 +59,7 @@ class Auth:
     @connection_error
     async def refresh_token(self):
         if self._expires_at:
-            sleep_until = self._expires_at - datetime.datetime.now() - datetime.timedelta(seconds=120)
+            sleep_until = self._expires_at - datetime.now() - timedelta(seconds=120)
             await asyncio.sleep(sleep_until.seconds)
         auth_data = {
             "auth": {
@@ -92,7 +92,7 @@ class Auth:
         if self._token is None:
             raise RuntimeError(f'Object store auth failed. Header X-Subject-Token not exist."\
                                "\nAvailable headers: \n{dict(resp.headers)}')
-        self._expires_at = datetime.datetime.strptime(data['token']['expires_at'], '%Y-%m-%dT%H:%M:%S.%fZ')
+        self._expires_at = datetime.strptime(data['token']['expires_at'], '%Y-%m-%dT%H:%M:%S.%fZ')
 
         asyncio.create_task(self.refresh_token())
 
@@ -133,92 +133,163 @@ class SwiftStorage(BaseStorage):
             text = await response.text()
             msg = f"\nGot {response.status} from swift storage during upload document." \
                   f"\nUrl: {censored_url}" \
-                  f"\nReason: {reason}"\
+                  f"\nReason: {reason}" \
                   f"\nBody: {text}" \
                   f"\nHeaders: {' '.join(['{}:{}'.format(*header) for header in response.raw_headers])}"
             raise self.ERRORS.get(response.status, StorageException)(msg)
 
     @connection_error
     async def upload(self, scope, doc_type, request):
-        uuid = uuid4().hex
-        sha_hash = hashlib.sha256()
-        md5_hash = hashlib.md5()
-        data = {'scope': scope, 'documentType': doc_type, 'file_size': 0}
-        object_url = f"{self.url}/{self._container}/{scope}_{uuid}"
-        file_name = None
-        date_created = None
-        headers = {}
+        """Method uploads file to storage, parsing data beforehand
+
+        Args:
+            scope (str): private|public
+            doc_type (str): document type
+            request (HttpRequest): request obj
+
+        Returns:
+            dict with metadata of uploaded file
+        """
+        meta = {'scope': scope, 'documentType': doc_type}
+        file_data = None
         async for field in await self._get_request_data(request):
-            if field.name in data:
-                data[field.name] = await field.text()
+            if field.name == 'documentType':
+                meta[field.name] = await field.text()
             if field.name == 'file':
-                file_name = cgi.parse_header(field.headers['Content-Disposition'])[1]['filename']
+                try:
+                    file_data = await field.read()
+                    meta.update({
+                        'Content-Type': field.headers['Content-Type'],
+                        'Content-Disposition': field.headers['Content-Disposition'],
+                        'sha': hashlib.sha256(file_data).hexdigest(),
+                        'hash': 'md5:' + hashlib.md5(file_data).hexdigest(),
+                        'dateCreated': datetime.now().strftime(DATETIME_FORMAT)
+                    })
+                except KeyError as ex:
+                    raise HeaderNotExists(ex)
 
-                async def read_data(_field) -> bytes:
-                    while file_data := await _field.read_chunk(BUFF_SIZE):
-                        sha_hash.update(file_data)
-                        md5_hash.update(file_data)
-                        data['file_size'] += len(file_data)
-                        yield file_data
+        return await self.__upload(file_data, meta, scope)
 
-                headers = {
-                    'X-Auth-Token': self.auth.get_token()
-                }
-                with put_document_session_latency.time():
-                    async with client() as session:
-                        response = await session.put(object_url, ssl=False, data=read_data(field), headers=headers)
-                        await self.__response_validate(response)
-                date_created = datetime.datetime.now().strftime(DATETIME_FORMAT)
-                try:
-                    headers.update({
-                        'Content-Type': field.headers['Content-Type'],
-                        'Content-Disposition': field.headers['Content-Disposition'],
-                        'X-Object-Meta-scope': scope,
-                        'X-Object-Meta-documentType': data['documentType'],
-                        'X-Object-Meta-sha': sha_hash.hexdigest(),
-                        'X-Object-Meta-hash': 'md5:' + md5_hash.hexdigest(),
-                    })
-                except KeyError as ex:
-                    raise HeaderNotExists(ex)
-
-        if not file_name:
+    async def __upload(self, file_data, meta, scope):
+        """Method uploads file to storage
+
+        Args:
+            file_data (bytes): file data
+            meta (dict): file metadata
+            scope (str): private|public
+
+        Returns:
+            dict with metadata of uploaded file
+        """
+        if not file_data:
             raise FileNotFound
+        uuid = uuid4().hex
+        file_name = cgi.parse_header(meta['Content-Disposition'])[1]['filename']
+        object_url = f"{self.url}/{self._container}/{scope}_{uuid}"
         async with client() as session:
+            headers = {'X-Auth-Token': self.auth.get_token()}
+            response = await session.put(object_url, ssl=False, data=file_data, headers=headers)
+            await self.__response_validate(response)
+
+            headers.update({
+                'Content-Type': meta['Content-Type'],
+                'Content-Disposition': meta['Content-Disposition'],
+                'X-Object-Meta-scope': scope,
+                'X-Object-Meta-documentType': meta['documentType'],
+                'X-Object-Meta-sha': meta['sha'],
+                'X-Object-Meta-hash': meta['hash'],
+            })
             response = await session.post(object_url, ssl=False, headers=headers)
             await self.__response_validate(response, valid_status=202)
 
         return {
             'id': uuid,
-            'scope': headers['X-Object-Meta-scope'],
+            'scope': scope,
             'filename': file_name,
-            'documentType': headers['X-Object-Meta-documentType'],
-            'format': headers['Content-Type'],
-            'sha': headers['X-Object-Meta-sha'],
-            'hash': headers['X-Object-Meta-hash'],
-            'dateCreated': date_created,
-            'size': data['file_size']
+            'documentType': meta['documentType'],
+            'format': meta['Content-Type'],
+            'sha': meta['sha'],
+            'hash': meta['hash'],
+            'dateCreated': meta['dateCreated'],
+            'size': len(file_data)
         }
 
     @connection_error
     async def get(self, scope, uuid, request):
+        """Method gets document from storage, parsing data beforehand
+
+        Args:
+            scope (str): private|public
+            uuid (str): id of document
+            request (HttpRequest): request obj
+
+        Returns:
+            response obj
+        """
+        response = await self.__get(scope, uuid)
+        headers = {
+            'Content-Type': response.headers['Content-Type'],
+            'Content-Disposition': response.headers['Content-Disposition'],
+        }
+        stream = web.StreamResponse(headers=headers)
+        await stream.prepare(request)
+        while chunk := await response.content.read(BUFF_SIZE):
+            await stream.write(chunk)
+        await stream.write_eof()
+
+        return stream
+
+    async def __get(self, scope, uuid):
+        """Method gets document from storage
+
+        Args:
+            scope (str): private|public
+            uuid (str): id of document
+
+        Returns:
+            response obj
+        """
         headers = {'X-Auth-Token': self.auth.get_token()}
         object_url = f"{self.url}/{self._container}/{scope}_{uuid}"
         with get_document_session_latency.time():
             async with client() as session:
                 response = await session.get(object_url, ssl=False, headers=headers)
                 await self.__response_validate(response, valid_status=200)
-                headers = {
-                    'Content-Type': response.headers['Content-Type'],
-                    'Content-Disposition': response.headers['Content-Disposition'],
-                }
-                stream = web.StreamResponse(headers=headers)
-                await stream.prepare(request)
-                while chunk := await response.content.read(BUFF_SIZE):
-                    await stream.write(chunk)
-                await stream.write_eof()
+        return response
+
+    @connection_error
+    async def move(self, scope, uuid, new_scope, request):
+        """Method moves existing document to new scope
+
+        Args:
+            scope (str): private|public
+            uuid (str): id of document
+            new_scope (str): private|public
+            request (HttpRequest): request obj
+
+        Returns:
+            dict with metadata of uploaded file
+        """
+        metadata = await self.get_metadata(scope, uuid, request, True)
+        response = await self.__get(scope, uuid)
+        file_data = await response.content.read()
+        meta = {
+            'scope': new_scope,
+            'documentType': metadata['X-Document-Type'],
+            'Content-Type': metadata['Content-Type'],
+            'Content-Disposition': metadata['Content-Disposition'],
+            'sha': hashlib.sha256(file_data).hexdigest(),
+            'hash': 'md5:' + hashlib.md5(file_data).hexdigest(),
+            'dateCreated': datetime.now().strftime(DATETIME_FORMAT)
+        }
 
-        return stream
+        res = await self.__upload(file_data, meta, new_scope)
+        try:
+            await self.delete(scope, uuid, request)
+        except KeyNotFound:
+            LOG.warning(f'Document {uuid} has already been deleted')
+        return res
 
     @connection_error
     async def get_metadata(self, scope, uuid, request, extra_data=False):
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 677ad5af59234ebc21d0d0cf0a61ebaff4a44459..c25062b0fcf90c0516b6f3e696c0acce43afcb97 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -19,7 +19,7 @@ async def setup_environment():
 
 
 @pytest.fixture
-async def client(aiohttp_client, loop, setup_environment):
+async def client(aiohttp_client, setup_environment):
     async def wrap(name):
         environment.BUCKET_NAME = BUCKET
         environment.STORAGE_NAME = name
@@ -29,7 +29,7 @@ async def client(aiohttp_client, setup_environment):
 
 
 @pytest.fixture
-async def client_databridge(aiohttp_client, loop, setup_environment):
+async def client_databridge(aiohttp_client, setup_environment):
     async def wrap(name):
         environment.BUCKET_NAME = BUCKET
         environment.STORAGE_NAME = name
diff --git a/tests/integration/test_databridge.py b/tests/integration/test_databridge.py
index 19624fa60c1ac84e159607750213931626ba950a..c7f9001d1f8384f7f6364702676169a451f34c49 100644
--- a/tests/integration/test_databridge.py
+++ b/tests/integration/test_databridge.py
@@ -186,3 +186,49 @@
         response = await response.json()
         assert response['message'] == f'Item with the same id already exists: {doc_id}'
         assert response['id'] == doc_id
+
+    @pytest.mark.parametrize(('storage_type', 'init_scope', 'new_scope'), [
+        ('memory', 'public', 'private'),
+        ('memory', 'private', 'public'),
+        ('memory', 'public', 'public'),
+        ('memory', 'private', 'private'),
+        ('s3', 'public', 'private'),
+        ('s3', 'private', 'public'),
+        ('s3', 'public', 'public'),
+        ('s3', 'private', 'private'),
+        # ('swift', 'public', 'private'),
+        # ('swift', 'private', 'public'),
+        # ('swift', 'public', 'public'),
+        # ('swift', 'private', 'private'),
+    ])
+    async def test_move_document(self, client_databridge, multipart_obj, storage_type: str, init_scope: str,
+                                 new_scope: str):
+        cl = await client_databridge(storage_type)
+        response = await cl.put(f'/api/documents/{init_scope}', data=multipart_obj,
+                                headers={'Authorization': 'auction_token'})
+        assert response.status == 201
+        raw_token = await response.read()
+        raw_token = raw_token.decode()
+        token = sign._decode_token(raw_token)
+
+        response = await cl.get(f'/api/documents/{init_scope}/{token["id"]}?token={raw_token}')
+        assert response.status == 200
+        data = await response.read()
+        assert response.content_type == 'text/plain'
+        assert data == b'bar'
+
+        response = await cl.post('/api/documents/move',
+                                 json={'doc_id': token['id'], 'scope': init_scope, 'new_scope': new_scope})
+        assert response.status == 201
+        new_raw_token = await response.read()
+        new_raw_token = new_raw_token.decode()
+        new_token = sign._decode_token(new_raw_token)
+
+        response = await cl.get(f'/api/documents/{new_scope}/{new_token["id"]}?token={new_raw_token}')
+        assert response.status == 200
+        data = await response.read()
+        assert response.content_type == 'text/plain'
+        assert data == b'bar'
+
+        response = await cl.get(f'/api/documents/{init_scope}/{token["id"]}?token={raw_token}')
+        assert response.status == 404
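
For reviewers who want to exercise the new endpoint by hand once the document-service-databridge-local container from docker-compose.yml is up, here is a minimal sketch (not part of the change set). It assumes the databridge is published on localhost:8082 as configured above, and DOC_ID is a hypothetical placeholder for the "id" claim of a previously issued upload token:

    import asyncio

    import aiohttp

    DOC_ID = 'replace-with-a-real-document-id'  # hypothetical placeholder


    async def main():
        async with aiohttp.ClientSession() as session:
            payload = {'doc_id': DOC_ID, 'scope': 'public', 'new_scope': 'private'}
            # POST /api/documents/move answers 201 with an application/jwt body
            # on success, or 422 with per-field errors for missing/invalid fields
            async with session.post('http://localhost:8082/api/documents/move', json=payload) as resp:
                print(resp.status, await resp.text())


    asyncio.run(main())

On success the body is the signed token for the new document id, and the old id stops resolving, which is exactly what test_move_document's final 404 assertion checks.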