Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
prozorro-sale
Prozorro Auth
Commits
65a797e0
Commit
65a797e0
authored
Dec 07, 2021
by
Pavel Kuzmenko
Browse files
feat(core): add use prozorro_sale.tools
parent
7bd4fe16
Changes
12
Hide whitespace changes
Inline
Side-by-side
.flake8
View file @
65a797e0
[flake8]
max-line-length = 120
ignore = D
\ No newline at end of file
Makefile
View file @
65a797e0
...
...
@@ -77,11 +77,7 @@ test-unit: $(REBUILD_IMAGES_FOR_TESTS)
@
docker
rm
-f
$(PROJECT_NAME)
-unit-
$(CI_COMMIT_SHORT_SHA)$(CI_PIPELINE_ID)
||
true
@
docker-compose
-p
$(COMPOSE_PROJECT_NAME)
-unit
\
run
--name
$(PROJECT_NAME)
-unit-
$(CI_COMMIT_SHORT_SHA)$(CI_PIPELINE_ID)
\
$(PROJECT_NAME)
-test-unit
nosetests
-v
\
$(TESTS_DEBUG_OPTS)
\
--with-doctest
\
--with-coverage
--cover-package
=
prozorro_sale.
$(PROJECT_NAME)
\
prozorro_sale test.unit
$(PROJECT_NAME)
-test-unit
pytest
-v
-q
--cov-report
=
--cov
=
prozorro_sale
test
/unit
@
docker
cp
$(PROJECT_NAME)
-unit-
$(CI_COMMIT_SHORT_SHA)$(CI_PIPELINE_ID)
:/
$(PROJECT_NAME)
/.coverage .coverage.unit
@
docker-compose
-p
$(COMPOSE_PROJECT_NAME)
-unit
stop
...
...
requirements/requirements.txt
View file @
65a797e0
...
...
@@ -2,10 +2,10 @@
#For include pre-release and development versions, uncomment line --pre. By default, pip only finds stable versions.
#--pre
PyYAML
aiohttp==3.7.4.post0
aiohttp
[speedups]
==3.7.4.post0
ipaddress
cryptography
pyjwt~=2.0.0
aiohttp
-
swagger
prozorro-tools==0.1
2
.0
prozorro_
aiohttp
_
swagger
prozorro-tools==0.1
4
.0
uvloop
requirements/test.txt
View file @
65a797e0
-r requirements.txt
nose
coverage
pytest
pytest-aiohttp
...
...
src/prozorro_sale/auth/__init__.py
View file @
65a797e0
from
dataclasses
import
dataclass
from
typing
import
List
,
Dict
,
Optional
,
Callable
import
os
from
aiohttp
import
web
import
yaml
from
yaml
import
load
try
:
from
yaml
import
CSafeLoader
as
Loader
# type: ignore
except
ImportError
:
# pragma: no cover
from
yaml
import
SafeLoader
as
Loader
# type: ignore
import
ipaddress
from
prozorro_sale.tools
import
logger
from
prozorro_sale.auth.environment
import
environment
from
prozorro_sale.auth.encryption
import
hash_token
AUTH_IP_BLOCK_STRICT
=
os
.
environ
.
get
(
'AUTH_IP_BLOCK_STRICT'
,
'1'
).
lower
()
not
in
(
'false'
,
'0'
,
'f'
)
LOG
=
logger
.
get_custom_logger
(
__name__
)
...
...
@@ -82,7 +87,7 @@ def load_auth(file_path: str) -> None:
users
=
{}
global
AUTH_FILE
with
open
(
file_path
)
as
auth_file
:
config
=
yaml
.
safe_
load
(
auth_file
)
config
=
load
(
auth_file
,
Loader
=
Loader
)
for
group_users
in
config
.
values
():
for
user
,
user_data
in
group_users
.
items
():
...
...
@@ -133,7 +138,7 @@ def check_access(func: Callable) -> Callable:
return
forbidden_response
()
if
not
user
.
has_access_by_ip
(
remote
):
LOG
.
warning
(
f
"User
{
user
.
name
}
forbidden access by ip
{
remote
}
"
)
if
AUTH_IP_BLOCK_STRICT
:
if
environment
[
'
AUTH_IP_BLOCK_STRICT
'
]
:
return
forbidden_response
()
return
await
func
(
request
,
*
args
,
**
kwargs
)
...
...
src/prozorro_sale/auth/api/api.py
View file @
65a797e0
import
os
from
aiohttp
import
web
import
prozorro_sale
from
prozorro_sale
import
tools
,
auth
from
prozorro_sale.auth
import
utils
from
prozorro_sale.tools
import
logger
from
prozorro_sale.auth.environment
import
environment
from
prozorro_sale.auth
import
utils
,
forbidden_response
LOG
=
tools
.
logger
.
get_custom_logger
(
__name__
)
SWAGGER_DOC_AVAILABLE
=
os
.
getenv
(
'SWAGGER_DOC'
,
False
)
AUTH_IP_BLOCK_STRICT
=
os
.
environ
.
get
(
'AUTH_IP_BLOCK_STRICT'
,
'1'
).
lower
()
not
in
(
'false'
,
'0'
,
'f'
)
LOG
=
logger
.
get_custom_logger
(
__name__
)
async
def
version
(
request
):
resp
=
{
'api_version'
:
prozorro_sale
.
version
}
if
SWAGGER_DOC
_AVAILABLE
:
if
environment
[
'
SWAGGER_DOC
'
]
:
resp
[
'doc'
]
=
'If you don`t know what to do you should probably try /api/doc'
return
web
.
json_response
(
resp
)
...
...
@@ -51,8 +49,8 @@ async def check_auth(request):
if
user
:
=
request
.
_context
.
user
:
if
not
user
.
has_access_by_ip
(
remote
):
LOG
.
warning
(
f
"User
{
user
.
name
}
forbidden access by ip
{
remote
}
"
)
if
AUTH_IP_BLOCK_STRICT
:
return
auth
.
forbidden_response
()
if
environment
[
'
AUTH_IP_BLOCK_STRICT
'
]
:
return
forbidden_response
()
return
web
.
json_response
({
'owner'
:
owner
,
'ip'
:
remote
,
'procedures'
:
procedures
})
...
...
src/prozorro_sale/auth/api/main.py
View file @
65a797e0
import
os
import
uvloop
from
typing
import
AsyncGenerator
from
aiohttp
import
web
from
aiohttp_swagger
import
setup_swagger
from
prozorro_
aiohttp_swagger
import
setup_swagger
import
prozorro_sale
from
prozorro_sale
import
tools
,
auth
import
prozorro_sale
# noqa
from
prozorro_sale.tools
import
logger
from
prozorro_sale.tools.errors
import
catch_error_middleware
from
prozorro_sale.tools.middlewares
import
request_id_middleware
from
prozorro_sale.auth
import
context_middleware
,
load_auth
from
prozorro_sale.auth.environment
import
environment
,
spec
from
prozorro_sale.auth.api.routes
import
init_routes
from
prozorro_sale.auth.errors
import
request_errors_middleware
from
prozorro_sale.auth.errors
import
ERROR_DICT
LOG
=
tools
.
logger
.
get_custom_logger
(
__name__
)
SWAGGER_DOC_AVAILABLE
=
os
.
getenv
(
'SWAGGER_DOC'
,
False
)
AUTH_FILE
=
os
.
environ
.
get
(
'AUTH_FILE'
,
'/secrets/auth.yml'
)
LOG
=
logger
.
get_custom_logger
(
__name__
)
SWAGGER_DOC_AVAILABLE
=
environment
[
'SWAGGER_DOC'
]
AUTH_FILE
=
environ
ment
[
'AUTH_FILE'
]
async
def
all_start_stop_log
(
app
:
web
.
Application
)
->
AsyncGenerator
[
None
,
None
]:
...
...
@@ -23,23 +25,28 @@ async def all_start_stop_log(app: web.Application) -> AsyncGenerator[None, None]
LOG
.
info
(
'Shutting down application'
)
def
create_app
():
tools
.
logger
.
configure_logging
()
auth
.
load_auth
(
AUTH_FILE
)
app
=
web
.
Application
(
middlewares
=
[
request_id_middleware
,
request_errors_middleware
,
auth
.
context_middleware
,
])
init_routes
(
app
)
async
def
on_startup
(
app
:
web
.
Application
)
->
None
:
if
SWAGGER_DOC_AVAILABLE
:
setup_swagger
(
LOG
.
info
(
'Setup Swagger...'
)
await
setup_swagger
(
app
,
title
=
'Prozorro Sale Notification API'
,
api_version
=
prozorro_sale
.
version
,
ui_version
=
3
,
security_definitions
=
{
'Bearer'
:
{
'type'
:
'apiKey'
,
'name'
:
'Authorization'
,
'in'
:
'header'
}},
)
def
create_app
():
logger
.
configure_logging
()
load_auth
(
AUTH_FILE
)
app
=
web
.
Application
(
middlewares
=
[
request_id_middleware
,
catch_error_middleware
(
ERROR_DICT
),
context_middleware
,
])
init_routes
(
app
)
app
.
on_startup
.
append
(
on_startup
)
app
.
cleanup_ctx
.
extend
([
all_start_stop_log
])
...
...
@@ -49,12 +56,12 @@ def create_app():
def
main
()
->
None
:
uvloop
.
install
()
environment
.
check_strict
(
spec
,
True
)
host
=
environment
[
'API_HOST'
]
port
=
environment
[
'API_PORT'
]
access_log_class
=
logger
.
CustomAccessLogger
app
=
create_app
()
web
.
run_app
(
app
,
port
=
80
,
access_log_class
=
tools
.
logger
.
CustomAccessLogger
)
web
.
run_app
(
app
,
host
=
host
,
port
=
port
,
access_log_class
=
access_log_class
)
if
__name__
==
'__main__'
:
...
...
src/prozorro_sale/auth/databridge/main.py
View file @
65a797e0
import
os
from
typing
import
AsyncGenerator
import
uvloop
...
...
@@ -6,12 +5,14 @@ from aiohttp import web
import
prozorro_sale
# noqa
from
prozorro_sale
import
tools
# type: ignore
from
prozorro_sale.tools
import
logger
from
prozorro_sale.tools.errors
import
catch_error_middleware
from
prozorro_sale.tools.middlewares
import
request_id_middleware
from
prozorro_sale.auth.environment
import
environment
,
spec
from
prozorro_sale.auth.databridge.routes
import
init_routes
from
prozorro_sale.auth.errors
import
request_errors_middleware
from
prozorro_sale.auth.errors
import
ERROR_DICT
LOG
=
tools
.
logger
.
get_custom_logger
(
__name__
)
LOG
=
logger
.
get_custom_logger
(
__name__
)
async
def
all_start_stop_log
(
app
:
web
.
Application
)
->
AsyncGenerator
[
None
,
None
]:
...
...
@@ -21,11 +22,11 @@ async def all_start_stop_log(app: web.Application) -> AsyncGenerator[None, None]
def
create_databridge
()
->
web
.
Application
:
tools
.
logger
.
configure_logging
()
logger
.
configure_logging
()
app
=
web
.
Application
(
middlewares
=
[
request_id_middleware
,
request
_error
s
_middleware
catch
_error_middleware
(
ERROR_DICT
),
]
)
init_routes
(
app
)
...
...
@@ -37,9 +38,12 @@ def create_databridge() -> web.Application:
def
main
()
->
None
:
uvloop
.
install
()
environment
.
check_strict
(
spec
,
True
)
host
=
environment
[
'DATABRIDGE_HOST'
]
port
=
environment
[
'DATABRIDGE_PORT'
]
access_log_class
=
logger
.
CustomAccessLogger
app
=
create_databridge
()
service_port
=
int
(
os
.
environ
.
get
(
'DATABRIDGE_SERVICE_PORT'
,
80
))
web
.
run_app
(
app
,
port
=
service_port
,
access_log_class
=
tools
.
logger
.
CustomAccessLogger
)
web
.
run_app
(
app
,
host
=
host
,
port
=
port
,
access_log_class
=
access_log_class
)
if
__name__
==
'__main__'
:
...
...
src/prozorro_sale/auth/environment.py
0 → 100644
View file @
65a797e0
from
prozorro_sale.tools.environment
import
Environment
,
booleans
__all__
=
[
'environment'
]
spec
=
{
'API_HOST'
:
str
,
'API_PORT'
:
int
,
'DATABRIDGE_HOST'
:
str
,
'DATABRIDGE_PORT'
:
int
,
'SWAGGER_DOC'
:
booleans
,
'AUTH_FILE'
:
str
,
'AUTH_IP_BLOCK_STRICT'
:
booleans
,
}
default
=
{
'API_HOST'
:
'0.0.0.0'
,
'API_PORT'
:
80
,
'DATABRIDGE_HOST'
:
'0.0.0.0'
,
'DATABRIDGE_PORT'
:
80
,
'SWAGGER_DOC'
:
False
,
'AUTH_FILE'
:
'/secrets/auth.yml'
,
'AUTH_IP_BLOCK_STRICT'
:
False
,
}
environment
=
Environment
(
spec
=
spec
,
default
=
default
)
test/integration/test_api.py
View file @
65a797e0
import
pytest
from
prozorro_sale.auth
import
User
from
prozorro_sale.auth.environment
import
environment
import
ipaddress
from
unittest
import
mock
@
pytest
.
mark
.
usefixtures
(
'create_auth_token'
,
'build_redirect_url'
,
'get_token'
)
...
...
@@ -47,7 +47,7 @@ class TestApi:
])
async
def
test_check_auth
(
self
,
client
,
_ip
,
auth_block
,
status
):
User
.
ips
=
[
ipaddress
.
ip_network
(
v
)
for
v
in
[
'255.255.255.255'
,
'0.0.0.0/24'
]]
with
mock
.
patch
(
'prozorro_sale.auth
.AUTH_IP_BLOCK_STRICT
'
,
return_value
=
auth_block
):
resp
=
await
client
.
get
(
'api/auth'
,
headers
=
{
"Authorization"
:
"test_broker_1_token"
,
"X-FORWARDED-FOR"
:
_ip
})
assert
resp
.
status
==
status
environment
.
AUTH_IP_BLOCK_STRICT
=
auth_block
resp
=
await
client
.
get
(
'api/auth'
,
headers
=
{
"Authorization"
:
"test_broker_1_token"
,
"X-FORWARDED-FOR"
:
_ip
})
assert
resp
.
status
==
status
test/unit/conftest.py
0 → 100644
View file @
65a797e0
import
pytest
import
prozorro_sale.auth
as
auth
@
pytest
.
fixture
def
user
():
def
_user
(
ips
=
None
,
procedures
=
None
):
if
ips
is
None
:
ips
=
[
'255.255.255.255'
,
'0.0.0.0/24'
]
if
procedures
is
None
:
procedures
=
{
'renewables'
:
[
'procedure'
,
'bids'
],
'timber'
:
[
'procedure'
,
'bids'
],
'subsoil'
:
[
'procedure'
,
'bids'
],
'railwayCargo'
:
[
'procedure'
]
}
return
auth
.
User
(
'test_name'
,
'test_token'
,
ips
=
ips
,
procedures
=
procedures
)
return
_user
test/unit/test_auth.py
View file @
65a797e0
import
unittest
import
pytest
import
prozorro_sale.auth
as
auth
class
User
TestCase
(
unittest
.
TestCase
)
:
def
setUp
(
self
):
test_
ips
=
[
'255.255.255.255'
,
'0.0.0.0/24'
]
test_
procedures
=
{
'renewables'
:
[
'procedure'
,
'bids'
],
'timber'
:
[
'procedure'
,
'bids'
],
'subsoil'
:
[
'procedure'
,
'bids'
],
'railwayCargo'
:
[
'procedure'
]
}
self
.
user
=
auth
.
User
(
'test_name'
,
'test_token'
,
ips
=
test_ips
,
procedures
=
test_procedures
)
class
Test
User
:
def
test_
user_access_by_ip
(
self
,
user
):
test_
user
=
user
()
assert
test_user
.
has_access_by_ip
(
'255.255.255.255'
)
assert
test_user
.
has_access_by_ip
(
'0.0.0.1'
)
assert
not
test_user
.
has_access_by_ip
(
'0.255.255.255'
)
with
pytest
.
raises
(
ValueError
):
test_user
.
has_access_by_ip
(
'test'
)
def
test_user_access_by_ip
(
self
):
self
.
assertTrue
(
self
.
user
.
has_access_by_ip
(
'255.255.255.255'
))
self
.
assertTrue
(
self
.
user
.
has_access_by_ip
(
'0.0.0.1'
))
self
.
assertFalse
(
self
.
user
.
has_access_by_ip
(
'0.255.255.255'
))
with
pytest
.
raises
(
ValueError
):
test_user
.
has_access_by_ip
(
'123.123.123'
)
self
.
assertR
aises
(
Valu
eError
,
self
.
user
.
has_access_by_ip
,
'test'
)
self
.
assertRaises
(
ValueError
,
self
.
user
.
has_access_by_ip
,
'123.123.123
'
)
with
pytest
.
r
aises
(
Typ
eError
)
:
test_user
.
__setattr__
(
'ips'
,
'test
'
)
self
.
assertR
aises
(
TypeError
,
self
.
user
.
__setattr__
,
'ips'
,
'test'
)
self
.
assertRaises
(
TypeError
,
self
.
user
.
__setattr__
,
'ips'
,
123
)
with
pytest
.
r
aises
(
TypeError
)
:
test_
user
.
__setattr__
(
'ips'
,
123
)
def
test_user_access_to_object
(
self
):
self
.
assertTrue
(
self
.
user
.
has_access_to_object
(
'renewables'
,
'procedure'
))
self
.
assertTrue
(
self
.
user
.
has_access_to_object
(
'timber'
,
'bids'
))
self
.
assertFalse
(
self
.
user
.
has_access_to_object
(
'railwayCargo'
,
'bids'
))
self
.
assertFalse
(
self
.
user
.
has_access_to_object
(
'test'
,
'test'
))
def
test_user_access_to_object
(
self
,
user
):
test_user
=
user
()
assert
test_user
.
has_access_to_object
(
'renewables'
,
'procedure'
)
assert
test_user
.
has_access_to_object
(
'timber'
,
'bids'
)
assert
not
test_user
.
has_access_to_object
(
'railwayCargo'
,
'bids'
)
assert
not
test_user
.
has_access_to_object
(
'test'
,
'test'
)
class
Auth
TestCase
(
unittest
.
TestCase
)
:
class
Test
Auth
:
auth_file
=
'test/test_secrets/auth.yml'
def
test_load_auth
(
self
):
auth
.
AUTH_FILE
=
None
auth
.
load_auth
(
self
.
auth_file
)
self
.
assert
IsNotNone
(
auth
.
AUTH_FILE
)
assert
auth
.
AUTH_FILE
is
not
None
def
test_get_user
(
self
):
auth
.
load_auth
(
self
.
auth_file
)
u
=
auth
.
_get_user_by_token
(
'test_broker_2_token'
)
self
.
assert
Equal
(
u
.
name
,
'test_broker_2'
)
self
.
assert
Equal
(
len
(
u
.
procedures
)
,
4
)
u
ser
=
auth
.
_get_user_by_token
(
'test_broker_2_token'
)
assert
user
.
name
==
'test_broker_2'
assert
len
(
u
ser
.
procedures
)
==
4
def
test_user_has_access_by_ips
(
self
):
auth
.
load_auth
(
self
.
auth_file
)
u
=
auth
.
_get_user_by_token
(
'test_ips'
)
self
.
assert
Equal
(
u
.
name
,
'test_broker_ips'
)
self
.
assert
Equal
(
len
(
u
.
procedures
)
,
4
)
u
ser
=
auth
.
_get_user_by_token
(
'test_ips'
)
assert
user
.
name
==
'test_broker_ips'
assert
len
(
u
ser
.
procedures
)
==
4
self
.
assert
True
(
u
.
has_access_by_ip
(
'255.255.255.255'
)
)
self
.
assert
False
(
u
.
has_access_by_ip
(
'255.255.255.254'
)
)
assert
user
.
has_access_by_ip
(
'255.255.255.255'
)
assert
not
user
.
has_access_by_ip
(
'255.255.255.254'
)
# check subnet mask
self
.
assert
True
(
u
.
has_access_by_ip
(
'0.0.0.0'
)
)
self
.
assert
True
(
u
.
has_access_by_ip
(
'0.0.0.1'
)
)
self
.
assert
True
(
u
.
has_access_by_ip
(
'0.0.0.2'
)
)
self
.
assert
True
(
u
.
has_access_by_ip
(
'0.0.0.3'
)
)
self
.
assert
False
(
u
.
has_access_by_ip
(
'0.0.0.4'
)
)
self
.
assert
True
(
u
.
has_access_by_ip
(
'255.255.255.128'
)
)
self
.
assert
True
(
u
.
has_access_by_ip
(
'255.255.255.129'
)
)
self
.
assert
True
(
u
.
has_access_by_ip
(
'255.255.255.130'
)
)
self
.
assert
True
(
u
.
has_access_by_ip
(
'255.255.255.131'
)
)
self
.
assert
False
(
u
.
has_access_by_ip
(
'255.255.255.132'
)
)
assert
user
.
has_access_by_ip
(
'0.0.0.0'
)
assert
user
.
has_access_by_ip
(
'0.0.0.1'
)
assert
user
.
has_access_by_ip
(
'0.0.0.2'
)
assert
user
.
has_access_by_ip
(
'0.0.0.3'
)
assert
not
user
.
has_access_by_ip
(
'0.0.0.4'
)
assert
user
.
has_access_by_ip
(
'255.255.255.128'
)
assert
user
.
has_access_by_ip
(
'255.255.255.129'
)
assert
user
.
has_access_by_ip
(
'255.255.255.130'
)
assert
user
.
has_access_by_ip
(
'255.255.255.131'
)
assert
not
user
.
has_access_by_ip
(
'255.255.255.132'
)
def
test_user_legal_name
(
self
):
auth
.
load_auth
(
self
.
auth_file
)
...
...
@@ -74,6 +72,6 @@ class AuthTestCase(unittest.TestCase):
broker_1
=
'test_broker_1'
broker_2
=
'test_broker_2'
self
.
assert
Equal
(
auth
.
get_user_legal_name_by_name
(
broker_1
)
,
'test_broker_1'
)
self
.
assert
Equal
(
auth
.
get_user_legal_name_by_name
(
broker_2
)
,
'test_broker_2'
)
self
.
assert
Equal
(
auth
.
get_user_legal_name_by_name
(
'test'
)
,
None
)
assert
auth
.
get_user_legal_name_by_name
(
broker_1
)
==
'test_broker_1'
assert
auth
.
get_user_legal_name_by_name
(
broker_2
)
==
'test_broker_2'
assert
auth
.
get_user_legal_name_by_name
(
'test'
)
is
None
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment