import collections
from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
from aiohttp import web
import yaml
AclContext = collections.namedtuple('AclContext', ['user', 'token'])
class User:
name: str
token: str
ips: Optional[List[str]] = None
procedures: Optional[Dict[str, List[str]]] = None
def has_access_to_procedure(self, procedure: str) -> bool:
return self.procedures and procedure in self.procedures
def load_auth(file_path):
def has_access_to_object(self, procedure: str, obj_name: str) -> bool:
return self.has_access_to_procedure(procedure) and obj_name in self.procedures[procedure]
def has_access_by_ip(self, ip: str) -> bool:
return not self.ips or ip in self.ips
def __str__(self):
class AclContext:
user: Optional[User]
token: Optional[str]
AUTH_FILE: Optional[Dict[str, User]] = None
def load_auth(file_path: str) -> None:
users = {}
global AUTH_FILE
with open(file_path) as auth_file:
config = yaml.safe_load(auth_file)
for group, group_users in config.items():
for user, token in group_users.items():
users[token] = user
for group_users in config.values():
for user, user_data in group_users.items():
user_data['name'] = user
users[user_data['token']] = User(**user_data)
AUTH_FILE = users
def _get_user_by_token(token: str) -> Optional[User]:
return AUTH_FILE.get(token)
async def token_middleware(request, handler):
request.token = request.query.get('acc_token') or request.headers.get('X-Access-Token')
async def auth_middleware(request: web.Request, handler: Callable) -> web.Response:
token = request.headers.get('Authorization')
request.auth_user = _get_user_by_token(token) if token else None
if request.method != 'GET' and not request.auth_user:
return forbidden_response()
return await handler(request)
async def request_type_middleware(request, handler):
token = request.headers.get('Authorization')
user = AUTH_FILE.get(token, None)
request.authenticated_user = user
if request.method != 'GET' and not user:
return web.json_response(data={'text': 'Forbidden'}, status=403)
async def context_middleware(request: web.Request, handler: Callable) -> web.Response:
token = request.query.get('acc_token') or request.headers.get('X-Access-Token')
request._context = AclContext(getattr(request, 'auth_user', None), token)
return await handler(request)
async def request_context_middleware(request, handler):
request._context = AclContext(request.authenticated_user, request.token)
async def ip_access_middleware(request: web.Request, handler: Callable) -> web.Response:
if request.method == 'GET':
return await handler(request)
remote = request.headers.get('X-Forwarded-For', request.remote)
if not request.auth_user.has_access_by_ip(remote):
return forbidden_response()
return await handler(request)
def forbidden_response(msg: Optional[str] = None) -> web.Response:
return web.json_response(data={'text': msg or 'Forbidden'}, status=403)
def check_procedure_access(func: Callable) -> Callable:
async def handler(request: web.Request, *args, **kwargs) -> web.Response:
data = await request.json()
if 'procurementMethod' not in data:
return await func(request, *args, **kwargs)
if not request.auth_user.has_access_to_procedure(data['procurementMethod']):
return forbidden_response()
return await func(request, *args, **kwargs)
return handler
