Commit 18337a51 authored by apeshkov's avatar apeshkov

[BACK][VLN] Project init

parent 63114e67
*.pyc
/.idea/
/venv/
/vevn/
/dev/data/
/dev/logs/
/dev/share/
/docs/
/app/main/settings_local.py
# Editor configuration, see https://editorconfig.org
root = true
[*.yml]
indent_size = 2
[*.py]
indent_size = 4
[flake8]
ignore = E203, E266, E501, W503, F403, F401
max-line-length = 79
max-complexity = 18
select = B,C,E,F,W,T4,B9
*.pyc
/.idea/
/venv/
/vevn/
/static/
# Compiled messages
*.mo
#IDE stuff
*.iml
#local env
*.env
#local settings
/app/main/settings_local.py
#data for local compose env
/dev/data/
/dev/logs/
/dev/share/
# Generated documentation
/docs/api/exec-api.html
# pre-built metadata files
computed_fields.map
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/ambv/black
rev: 20.8b1
hooks:
- id: black
language_version: python3.9
- repo: https://gitlab.com/pycqa/flake8
rev: 3.9.0
hooks:
- id: flake8
default_language_version:
python: python3.9
# Deploy
- Скопировать содержимое папки app в папке /opt/back/app, предварительно очистив ее
- Скопировать содержимое папки locale в папку /opt/back/locale
- Скопировать файлы Dockerfile и requirements.txt в папку /opt/back
- Из папки /opt вызвать команду sudo docker-compose -f docker-compose.dev.yml up -d --build
FROM python:3.9.2-slim-buster AS development_build
LABEL maintainer="sobolevn@wemake.services"
LABEL vendor="wemake.services"
ARG DJANGO_ENV
ENV DJANGO_ENV=${DJANGO_ENV} \
# build:
BUILD_ONLY_PACKAGES='wget' \
# python:
PYTHONFAULTHANDLER=1 \
PYTHONUNBUFFERED=1 \
PYTHONHASHSEED=random \
PYTHONDONTWRITEBYTECODE=1 \
# pip:
PIP_NO_CACHE_DIR=off \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
# dockerize:
DOCKERIZE_VERSION=v0.6.1 \
# tini:
TINI_VERSION=v0.19.0 \
# poetry:
POETRY_VERSION=1.1.4 \
POETRY_NO_INTERACTION=1 \
POETRY_VIRTUALENVS_CREATE=false \
POETRY_CACHE_DIR='/var/cache/pypoetry' \
PATH="$PATH:/root/.poetry/bin"
# System deps:
RUN apt-get update \
&& apt-get install --no-install-recommends -y \
bash \
build-essential \
curl \
gettext \
git \
libpq-dev \
# Defining build-time-only dependencies:
$BUILD_ONLY_PACKAGES \
# Installing `dockerize` utility:
# https://github.com/jwilder/dockerize
&& wget "https://github.com/jwilder/dockerize/releases/download/${DOCKERIZE_VERSION}/dockerize-linux-amd64-${DOCKERIZE_VERSION}.tar.gz" \
&& tar -C /usr/local/bin -xzvf "dockerize-linux-amd64-${DOCKERIZE_VERSION}.tar.gz" \
&& rm "dockerize-linux-amd64-${DOCKERIZE_VERSION}.tar.gz" && dockerize --version \
# Installing `tini` utility:
# https://github.com/krallin/tini
&& wget -O /usr/local/bin/tini "https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini" \
&& chmod +x /usr/local/bin/tini && tini --version \
# Installing `poetry` package manager:
# https://github.com/python-poetry/poetry
&& curl -sSL 'https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py' | python \
&& poetry --version \
# Removing build-time-only dependencies:
&& apt-get remove -y $BUILD_ONLY_PACKAGES \
# Cleaning cache:
&& apt-get purge -y --auto-remove -o APT::AutoRemove::RecommendsImportant=false \
&& apt-get clean -y && rm -rf /var/lib/apt/lists/*
WORKDIR /code
# This is a special case. We need to run this script as an entry point:
COPY ./docker/django/entrypoint.sh /docker-entrypoint.sh
# Setting up proper permissions:
RUN chmod +x '/docker-entrypoint.sh' \
&& groupadd -r web && useradd -d /code -r -g web web \
&& chown web:web -R /code \
&& mkdir -p /var/www/django/static /var/www/django/media \
&& chown web:web /var/www/django/static /var/www/django/media
# Copy only requirements, to cache them in docker layer
COPY --chown=web:web ./poetry.lock ./pyproject.toml /code/
# Project initialization:
RUN echo "$DJANGO_ENV" \
&& poetry install \
$(if [ "$DJANGO_ENV" = 'production' ]; then echo '--no-dev'; fi) \
--no-interaction --no-ansi \
# Cleaning poetry installation's cache for production:
&& if [ "$DJANGO_ENV" = 'production' ]; then rm -rf "$POETRY_CACHE_DIR"; fi
# Running as non-root user:
USER web
# We customize how our app is loaded with the custom entrypoint:
ENTRYPOINT ["tini", "--", "/docker-entrypoint.sh"]
# The following stage is only for Prod:
# https://wemake-django-template.readthedocs.io/en/latest/pages/template/production.html
FROM development_build AS production_build
COPY --chown=web:web . /code
# voxlane-back
# README #
### Как поднять проект локально? ###
Можно использовать прилагаемый docker-compose файл:
```
curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python3.9
cd dev
docker-compose --project-name voxlane-back -f workspace/voxlane-back/dev/docker-compose.local.yml up```
Для запуска приложения требуется чтобы папка `app` в корне репозитория была добавлена в `PYTHONPATH`.
В Pycharm для этого достаточно отметить её как `Sources Root`.
Затем создайте файл `app/main/settings_local.py` со следующим содержимым:
Прочтите файл, некоторые настройки потребуется заполнить вручную.
```python
import datetime
DEBUG = True
# убираем настройки безопасности
ALLOWED_HOSTS = ['*']
SIMPLE_JWT = {
'ACCESS_TOKEN_LIFETIME': datetime.timedelta(days=30),
'REFRESH_TOKEN_LIFETIME': datetime.timedelta(hours=3),
'AUTH_HEADER_TYPES': ('Bearer',),
'ROTATE_REFRESH_TOKENS': True
}
AUTH_PASSWORD_VALIDATORS = []
USE_HTTPS = False
SESSION_COOKIE_SECURE = False
CSRF_COOKIE_SECURE = False
# сначала пытаемся аутентифицироваться через Django
AUTHENTICATION_BACKENDS = [
'django.contrib.auth.backends.ModelBackend',
]
# БД
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'HOST': 'localhost',
'PORT': '35432',
'NAME': 'voxlane',
'USER': 'thirdlane',
'PASSWORD': 'thirdlane',
}
}
# исходящая почта
EMAIL_HOST = 'localhost'
EMAIL_PORT = '1025'
# redis и celery
REDIS_HOST = 'localhost'
REDIS_PORT = '36379'
REDIS_DB_CELERY = 0
CELERY_BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB_CELERY}"
CELERY_RESULT_BACKEND = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB_CELERY}"
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": f"redis://{REDIS_HOST}:{REDIS_PORT}/1",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
}
}
}
# упрощает отладку вычисляемых полей в моделях
COMPUTEDFIELDS_MAP = None
COMPUTEDFIELDS_ADMIN = True
```
Переменные, добавленные в него, будут автоматически добавляться(перезаписывать существующие переменные) в базовых настройках в `app/main/settings.py`
В конфигурацию входит почтовый сервер, можно читать исходящие письма в веб интерфейсе на http://localhost:8025/
Также в конфигурацию входит сервер rabbitmq, админка доступна на http://localhost:8030/ логин/пароль guest/guest
### Фронтенд ###
Фронтенд поднимается отдельно, но по умолчанию настроен на работу с локальным бекендом.
Репозиторий фронтенда:
### Как запускать celery task'и без отдельного процесса? ###
Добавьте к вашему файлу настроек `CELERY_TASK_ALWAYS_EAGER = True`
### Как запускать celery на windows? ###
Celery на windows запустится, но "из коробки" не будет выполнять таски, нужно запускать следующим образом:
`celery -A main worker --loglevel=INFO --pool=solo`
from .celery import *
"""
ASGI config for voxlane-back project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/3.0/howto/deployment/asgi/
"""
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "main.settings")
application = get_asgi_application()
from django.contrib.auth import get_user_model
from rest_framework_simplejwt.serializers import (
TokenObtainPairSerializer,
TokenVerifySerializer,
TokenRefreshSerializer,
)
from rest_framework_simplejwt.state import token_backend
from rest_framework_simplejwt.tokens import RefreshToken
from users.serializers import UserSerializer
User = get_user_model()
class TokenObtainPairSerializerEx(TokenObtainPairSerializer):
def validate(self, attrs):
data = super().validate(attrs)
data["user"] = UserSerializer(self.user).data
return data
class TokenVerifySerializerEx(TokenVerifySerializer):
def validate(self, attrs):
data = super().validate(attrs)
payload = token_backend.decode(attrs["token"], verify=False)
user = User.objects.get(id=payload.get("user_id"))
data["user"] = UserSerializer(user).data
return data
class TokenRefreshSerializerEx(TokenRefreshSerializer):
def validate(self, attrs):
data = super().validate(attrs)
refresh = RefreshToken(attrs["refresh"])
user = User.objects.get(id=refresh.payload.get("user_id"))
data["user"] = UserSerializer(user).data
return data
from rest_framework_simplejwt.views import TokenObtainPairView, TokenVerifyView, TokenRefreshView
from main.auth.serializers import TokenObtainPairSerializerEx, TokenVerifySerializerEx, TokenRefreshSerializerEx
class TokenObtainPairViewEx(TokenObtainPairView):
serializer_class = TokenObtainPairSerializerEx
class TokenVerifyViewEx(TokenVerifyView):
serializer_class = TokenVerifySerializerEx
class TokenRefreshViewEx(TokenRefreshView):
serializer_class = TokenRefreshSerializerEx
import os
from celery import Celery
from celery.signals import task_failure
# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "main.settings")
app = Celery("voxlane")
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")
def periodic(schedule):
"""
Decorator to run a task on a schedule.
see https://docs.celeryproject.org/en/v4.4.7/userguide/periodic-tasks.html
:param schedule: - interval in seconds/cron/solar
"""
def decorator(task):
app.add_periodic_task(schedule, task)
return task
return decorator
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# you can add periodic tasks here
pass
@task_failure.connect
def celery_task_failure_email(sender, **kwargs):
from django.core.mail import mail_admins
from django.conf import settings
queue_name = sender.request.delivery_info.get("routing_key", "celery")
subject = f"[Task Failure][{queue_name}@{settings.PRIMARY_HOST}] {sender.name} error"
message = (
f"Task {sender.name} has failed. \n " f"Task id was: {kwargs['task_id']} \n " f"Traceback: \n {kwargs['einfo']}"
)
mail_admins(subject, message)
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
import logging
import time
from django.core.cache import cache
from django.core.mail import mail_admins
from django.utils.timezone import now
from main.core.exceptions import ParallelExecutionException
from main.core.utils import human_delta
logger = logging.getLogger(__name__)
def avoid_parallel_execution(fn_name):
def decorator(fn):
def wrapper(*args, **kwargs):
key = "double_execution_protection:" + fn_name
key_value = cache.get(key)
if key_value is None:
cache.set(key, now(), None)
try:
fn(*args, **kwargs)
except BaseException as ex:
raise ex
finally:
cache.delete(key)
else:
msg = "{0}: parallel execution detected. Other cron started at {1}".format(fn_name, key_value)
mail_admins("Cron parallel execution detected", msg)
logger.error(msg)
raise ParallelExecutionException()
return None
return wrapper
return decorator
def time_perf(desc="Processing", logger=None):
def decorator(fn):
def wrapper(*args, **kwargs):
logger.info(f"{desc}...")
ts = time.perf_counter()
result = fn(*args, **kwargs)
te = time.perf_counter()
logger.info(f"{desc} done in {human_delta(te - ts)}.")
return result
return wrapper
return decorator
from rest_framework.exceptions import APIException
class ParallelExecutionException(Exception):
pass
class BadGateway(APIException):
status_code = 502
default_detail = "Bad gateway"
default_code = "bad_gateway"
class ServiceUnavailable(APIException):
status_code = 503
default_detail = "Service temporarily unavailable"
default_code = "service_unavailable"
class GatewayTimeout(APIException):
status_code = 504
default_detail = "Gateway timeout"
default_code = "gateway_timeout"
from django.conf import settings
from main.core.utils.classes import readonly_class_property
class Feature:
"""
Feature-флаги,
добавляйте по мере необходимости
"""
@readonly_class_property
def is_qa(self):
return settings.QA
@readonly_class_property
def is_prod(self):
return settings.PRODUCTION
from model_utils import FieldTracker
class VLNFieldTracker(FieldTracker):
def patch_save(self, model):
from main.core.models import TrackerMixin
if not issubclass(model, TrackerMixin):
return super().patch_save(model)
from rest_framework.filters import OrderingFilter
class NestedOrderingFilter(OrderingFilter):
"""
Extends default ordering filter to allow sorting by related model attributes and queryset annotations
without explicitly specifying names for those attributes in ordering_fields
Desired nesting level can be specified on the view class via order_nesting_depth property
"""
default_nesting_depth = 2
def get_nesting_depth(self, view):
return getattr(view, "order_nesting_depth", self.default_nesting_depth) - 1
@staticmethod
def get_field_verbose_name(field, name: str) -> str:
return field.verbose_name if hasattr(field, "verbose_name") else name.replace("__", " ")
def add_nested_fields(self, sort_fields, fields, depth: int, parent_name=None):
for field in fields:
field_name = f"{parent_name}__{field.name}" if parent_name else field.name
if field.related_model and (field.many_to_one or field.one_to_one) and depth > 0:
self.add_nested_fields(sort_fields, field.related_model._meta.get_fields(), depth - 1, field_name)
elif parent_name:
sort_fields.append((field_name, self.get_field_verbose_name(field, field_name)))
def get_valid_fields(self, queryset, view, context=None):
fields = super(NestedOrderingFilter, self).get_valid_fields(queryset, view, context)
valid_field_names = {field[0] for field in fields}
max_depth = self.get_nesting_depth(view)
model_fields = (field for field in queryset.model._meta.get_fields() if field.name in valid_field_names)
self.add_nested_fields(fields, model_fields, max_depth)
# implicitly enable sorting for annotations
for annotation_name in queryset.query.annotations.keys():
fields.append((annotation_name, annotation_name))
return fields
from .api_client import ApiClient
import abc
import urllib.parse
from contextlib import contextmanager
import requests
from requests.exceptions import ConnectionError, HTTPError, Timeout, TooManyRedirects
from rest_framework.exceptions import ValidationError
from main.core.exceptions import BadGateway, GatewayTimeout
from main.core.utils.classes import Singleton
class ApiClient(Singleton, abc.ABC):
service_name = "Service"
def __init__(self):
self.session = requests.Session()
self.validate_config()
self.authorize()
def get_shared_kwargs(self):
return {}
def authorize(self):
pass
@abc.abstractmethod
def get_host(self) -> str:
pass
def get_endpoint(self, path) -> str:
return urllib.parse.urljoin(self.get_host(), path)
def perform_request(self, method: str, url: str, kwargs):
with self.handle_errors():
raise_for_status = kwargs.pop("raise_for_status", True)
url = self.get_endpoint(url)
kwargs = {**self.get_shared_kwargs(), **kwargs}
response = getattr(self.session, method)(url, **kwargs)
if response.status_code == 401:
response = self.unauthorized_hook(response)
if raise_for_status:
response.raise_for_status()
return response
def get(self, url, **kwargs):
return self.perform_request("get", url, kwargs)
def post(self, url, **kwargs):
return self.perform_request("post", url, kwargs)
def put(self, url, **kwargs):
return self.perform_request("put", url, kwargs)
def patch(self, url, **kwargs):
return self.perform_request("patch", url, kwargs)
def delete(self, url, **kwargs):
return self.perform_request("delete", url, kwargs)
def unauthorized_hook(self, response):
return response
@staticmethod
def validate_config():
pass
@classmethod
@contextmanager
def handle_errors(cls):
try:
yield
except ConnectionError:
raise BadGateway("Could not reach %s" % cls.service_name)
except TooManyRedirects:
raise BadGateway("Could not reach %s (too many redirects)" % cls.service_name)
except Timeout:
raise GatewayTimeout("Request to %s timed out" % cls.service_name)
except HTTPError as err:
raise ValidationError(err.response.json())
import os
import threading
from django.utils import timezone
from main.core.utils.classes import readonly_class_property
class CurrentRequest:
"""
Contains global state tied to request
threading.local is not necessary for production, but fixes some `manage.py runsever` inconsistencies
"""
state = threading.local()
@classmethod
def use_request(cls, request):
cls.state.request = request
cls.state.cache = {} if request else None
@readonly_class_property
def exists(cls):
return cls.request is not None
@readonly_class_property
def request(cls):
return getattr(cls.state, "request", None)
@readonly_class_property
def user(cls):
if cls.request:
return cls.request.user
return None
@classmethod
def cached(cls, func):
def wrapped(*args):
if getattr(cls.state, "cache", None) is None:
return func(*args)
if func not in cls.state.cache:
cls.state.cache[func] = func(*args)
return cls.state.cache[func]
return wrapped
class VLNMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def add_debug_headers(self, response):
response["X-SME-WORKER-PID"] = os.getpid()
response["X-SME-WORKER-TIME"] = timezone.now()
response["X-SME-TIMEZONE"] = timezone.get_default_timezone_name()
def __call__(self, request):
CurrentRequest.use_request(request)
response = self.get_response(request)
self.add_debug_headers(response)
CurrentRequest.use_request(None)
return response
from main.core.fields import VLNFieldTracker
class TrackerMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
tracker = getattr(self.__class__, "tracker", None)
if not tracker or not isinstance(tracker, VLNFieldTracker):
raise AttributeError("Classes with TrackerMixin must have tracker=VLNFieldTracker()")
self._before_save_hooks = []
def __flush_before_save_hooks(self):
for hook in self._before_save_hooks:
hook()
self._before_save_hooks.clear()
def enqueue_before_save_hook(self, hook):
self._before_save_hooks.append(hook)
def save(self, *args, **kwargs):
"""
This implementation was copy pasted from FieldTracker.patch_save
it correctly reports tracker changes during nested model.save() calls
"""
self.__flush_before_save_hooks()
ret = super().save(*args, **kwargs)
update_fields = kwargs.get("update_fields")
if not update_fields and update_fields is not None: # () or []
fields = update_fields
elif update_fields is None:
fields = None
else:
fields = (field for field in update_fields if field in self.tracker.fields)
self.tracker.set_saved_fields(fields=fields)
return ret
def save_if_changed(self, **kwargs):
if not self._state.adding:
if any(self.tracker.changed().keys()):
self.save(**kwargs)
else:
self.save(**kwargs)
from django.core.paginator import Paginator
from rest_framework.pagination import PageNumberPagination
class VLNPaginator(Paginator):
pass
class DefaultVLNPagination(PageNumberPagination):
django_paginator_class = VLNPaginator
pagination_disabled_query_param = None # 'unpaginated'
page_query_param = "page"
page_size_query_param = "size"
page_size = 50
max_page_size = 1000
def get_page_size(self, request):
# disable pagination if query param is present
if self.pagination_disabled_query_param and self.pagination_disabled_query_param in request.query_params:
return None
if self.page_size_query_param and request.query_params.get(self.page_size_query_param) == "max":
return self.max_page_size
return super().get_page_size(request)
from copy import deepcopy
from rest_framework.permissions import DjangoModelPermissions
def get_custom_method_map():
"""
By default DRF does not check permissions on GET, enable it here
"""
perms = deepcopy(DjangoModelPermissions.perms_map)
# Temporarily disabled, if not re-enabled before 7.09.2020, delete this check entirely
# perms['GET'].append('%(app_label)s.view_%(model_name)s')
return perms
class VLNModelPermissions(DjangoModelPermissions):
perms_map = get_custom_method_map()
def AutoPermissions(app_label, class_name):
class Permissions:
ADD: str = ".".join((app_label, f"add_{class_name}"))
CHANGE: str = ".".join((app_label, f"change_{class_name}"))
DELETE: str = ".".join((app_label, f"delete_{class_name}"))
VIEW: str = ".".join((app_label, f"view_{class_name}"))
return Permissions
from rest_framework.routers import Route, DefaultRouter
from rest_framework_nested.routers import NestedMixin
class VLNRelatedViewSetSupportRouterMixin:
__extra_routes = [
Route(
url=r"^{prefix}{trailing_slash}$",
mapping={
"get": "retrieve_related",
"put": "update_related",
"patch": "partial_update_related",
},
name="{basename}-related",
detail=False,
initkwargs={"suffix": "Related"},
)
]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.routes = self.__extra_routes + super().routes
class VLNRouter(VLNRelatedViewSetSupportRouterMixin, DefaultRouter):
pass
class VLNNestedRouter(NestedMixin, VLNRouter):
pass
from rest_framework.serializers import Serializer, ListSerializer
from main.core.serializers import CacheAwareSerializerMixin
class RecursiveField(Serializer): # noqa
def to_representation(self, value):
serializer = self.parent.parent.__class__(value, context=self.context)
return serializer.data
class CacheAwareListSerializer(CacheAwareSerializerMixin, ListSerializer): # noqa
pass
class CacheAwareRecursiveField(CacheAwareSerializerMixin, RecursiveField): # noqa
class Meta:
list_serializer_class = CacheAwareListSerializer
import dataclasses
from django.core.exceptions import ObjectDoesNotExist
from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from rest_framework.fields import get_attribute
from main.core.utils.models import get_model_manager
from main.core.utils.views import extract_id
class ForeignKeyDeserializerMixin:
queryset = None # mimics rest_framework.relations.RelatedField
def __init__(self, *args, **kwargs):
self.queryset = kwargs.pop("queryset", self.queryset)
self._get_custom_queryset = kwargs.pop("get_queryset", None)
super().__init__(*args, **kwargs)
def get_model_manager(self):
return get_model_manager(self.Meta.model)
def _get_queryset(self):
if self._get_custom_queryset:
return self._get_custom_queryset(self)
if self.queryset:
return self.queryset
else:
return self.get_model_manager().all()
def to_internal_value(self, data):
pk = extract_id(data)
try:
model = self._get_queryset().get(pk=pk)
except ObjectDoesNotExist:
raise ValidationError("No matching object found in database")
return model
class NestedRouterForeignKeyDeserializerMixin:
"""
This class only works for relations that are serialized as primary keys
It can be extended if needed or used in conjunction with ForeignKeyDeserializerMixin
Add your nested router lookups to router_lookups and they will be automatically added to serializer data
"""
router_lookups = ()
def _get_relation_pk(self, attr: str):
view = self.context.get("view")
return view.kwargs.get(f"{attr}_{view.lookup_field}")
def _inject_relations_from_router(self, data):
for attr in self.router_lookups:
if isinstance(attr, str):
data[attr] = self._get_relation_pk(attr)
else:
data[attr[0]] = self._get_relation_pk(attr[1])
return data
def to_internal_value(self, data):
extended_data = self._inject_relations_from_router(data)
return super().to_internal_value(extended_data)
class CacheAwareSerializerMixin:
cache_context_key = "cache"
cache_pk_attr = "id"
"""
Can grab model attribute values from context cache by model id instead of querying the model
Init kwargs:
consider_empty_on_miss: if true, never try to resolve model attribute on cache miss
"""
def __init__(self, *args, **kwargs):
self.consider_empty_on_miss = kwargs.pop("consider_empty_on_miss", False)
# ListSerializer gets its own set of kwargs, so we need to pluck the attr from child
if "child" in kwargs:
self.consider_empty_on_miss = getattr(kwargs["child"], "consider_empty_on_miss", False)
super().__init__(*args, **kwargs)
def get_attribute(self, instance):
try:
cache = get_attribute(self.context.get(self.cache_context_key), self.source_attrs)
except (KeyError, AttributeError):
cache = None
if cache:
pk = getattr(instance, self.cache_pk_attr)
if pk in cache:
return cache[pk]
if self.consider_empty_on_miss:
return [] if self.many else None
return super().get_attribute(instance)
class GenericDataclassSerializer(serializers.Serializer): # noqa
def to_representation(self, instance):
if not dataclasses.is_dataclass(instance):
raise ValueError("This serializer only supports dataclasses")
return dataclasses.asdict(instance)
This diff is collapsed.
class Singleton:
__singleton_instance = None
def __new__(cls, *args, **kwargs):
if cls.__singleton_instance is None:
cls.__singleton_instance = super().__new__(cls)
return cls.__singleton_instance
class readonly_class_property: # noqa
def __init__(self, getter):
self.getter = getter
def __get__(self, obj, cls):
return self.getter(cls)
def set_in(obj: dict, key: str, value):
tokens = key.split("__")
current_level = obj
if not isinstance(current_level, dict):
current_level = {}
for token in tokens[:-1]:
if token not in current_level or not isinstance(current_level[token], dict):
current_level[token] = {}
current_level = current_level[token]
current_level[tokens[-1]] = value
return obj
def get_in(obj: dict, key: str, default=None):
tokens = key.split("__")
current_level = obj
if not isinstance(current_level, dict):
return default
for token in tokens:
if token in current_level:
current_level = current_level[token]
else:
return default
return current_level
def decapitalize_keys(source, recursive=False):
"""
переводит первую букву всех строковых ключей в словаре в нижний регистр
:param source:
:param recursive:
:return:
"""
if recursive and isinstance(source, list):
for item in source:
decapitalize_keys(item, recursive=recursive)
if not isinstance(source, dict):
return source
for key in list(source.keys()):
if isinstance(key, str):
value = source.pop(key)
source[key[0].lower() + key[1:]] = value
else:
value = source[key]
if recursive:
decapitalize_keys(value, recursive=recursive)
return source
from django.conf import settings
def is_unit_testing():
return settings.TEST
from typing import List
from django.db import models
from django.db.models import prefetch_related_objects
from main.core.utils import is_prefetched, chunk_iterator
def purify_queryset(queryset: models.QuerySet) -> models.QuerySet:
queryset = queryset.prefetch_related(None).select_related(None)
queryset.query.annotations.clear()
return queryset
def get_model_manager(model):
return model._default_manager or model.objects # noqa
def patch_attributes(model, **kwargs):
for key, value in kwargs.items():
setattr(model, key, value)
def patch_attributes_diff(model, **kwargs) -> bool:
"""
Patch model attributes and return True if any were changed
:param model:
:param kwargs:
:return: bool
"""
has_diff = False
for key, value in kwargs.items():
has_attr = hasattr(model, key)
current = getattr(model, key, None)
if not has_attr or current != value:
has_diff = True
setattr(model, key, value)
return has_diff
def fake_prefetch_related(instance, attribute_name: str, value: List):
"""
Inserts a list of objects into the models prefetch cache as if they were fetched via prefetch_related()
:param instance:
:param attribute_name:
:param value:
:return: None
"""
cache_attr = "_prefetched_objects_cache"
if not hasattr(instance, cache_attr):
setattr(instance, cache_attr, {})
cache = getattr(instance, cache_attr)
field = getattr(instance, attribute_name)
prefetch_cache_name = getattr(field, "prefetch_cache_name", cache_attr)
cache[prefetch_cache_name] = value
def fake_select_related(instance, attribute_name: str, value):
state = getattr(instance, "_state")
state.fields_cache[attribute_name] = value
def prefetch_related_iterator(queryset, *related_lookups, chunk_size=100):
"""
Enables optimization similar to prefetch_related for datasets that may not fit in memory
:param queryset:
:param related_lookups:
:param chunk_size:
:return:
"""
qs_iterator = queryset.iterator(chunk_size=chunk_size)
chunks = chunk_iterator(qs_iterator, chunk_size)
if not related_lookups:
related_lookups = getattr(queryset, "_prefetch_related_lookups", [])
for chunk in chunks:
prefetch_related_objects(chunk, *related_lookups)
yield from chunk
class SmartRelatedFilter:
"""
Фильтр, который можно применять к загружаемым через prefetch_related Queryset'ам без дополнительного запроса к БД.
На выходе будет массив или queryset в зависимости от того подгружен queryset или нет
Поддерживаются следующие lookup'ы:
id=5
id__in=[5,6,7,8]
Не поддерживает Q
Для неподдерживаемых случаев можно использовать custom_object_filter для фильтрации кэшированных queryset'ов
"""
lookups = {"__in": lambda obj, key, collection: getattr(obj, key[:-4]) in collection}
@staticmethod
def default_equals(obj, key, value):
return getattr(obj, key) == value
@classmethod
def lookup_fn_from_key(cls, key: str):
for lookup_key, lookup_fn in cls.lookups.items():
if key.endswith(lookup_key):
return lookup_fn
return cls.default_equals
@classmethod
def apply(cls, model, qs_attribute, *args, custom_object_filter=None, **kwargs):
if is_prefetched(model, qs_attribute):
objs = getattr(model, qs_attribute).all()
if custom_object_filter is not None:
return [obj for obj in objs if custom_object_filter(obj)]
return [
obj
for obj in objs
if all(cls.lookup_fn_from_key(key)(obj, key, value) for key, value in kwargs.items())
]
else:
return getattr(model, qs_attribute).filter(*args, **kwargs)
def smart_exists(model, qs_attribute, **kwargs):
objs = SmartRelatedFilter.apply(model, qs_attribute, **kwargs)
if is_prefetched(model, qs_attribute):
return bool(len(objs))
else:
return objs.exists()
def smart_count(model, qs_attribute, **kwargs):
objs = SmartRelatedFilter.apply(model, qs_attribute, **kwargs)
if is_prefetched(model, qs_attribute):
return len(objs)
else:
return objs.count()
def BooleanExpression(**kwargs):
return models.Case(
models.When(**kwargs, then=models.Value(True)), default=models.Value(False), output_field=models.BooleanField()
)
import re
import bleach
# TODO: use a different sanitizer implementation
whitespace_line_start_re = re.compile(r"^\s+", flags=re.MULTILINE)
whitespace_inline_re = re.compile(r"[ \t]+")
style_re = re.compile(r"<style>(?:.|\n)*</style>", flags=re.MULTILINE)
def html2text(html: str):
text = bleach.clean(html, tags=["style"], attributes={}, styles=[], strip=True)
text = style_re.sub("", text)
text = whitespace_line_start_re.sub("", text)
text = whitespace_inline_re.sub(" ", text)
return text
def get_effective_attribute(attr_name, value, serializer):
instance = getattr(serializer, "instance")
return value.get(attr_name, getattr(instance, attr_name, None))
from rest_framework.exceptions import ValidationError
from rest_framework.generics import get_object_or_404
from main.core.utils.models import get_model_manager
def extract_id(data):
if isinstance(data, int):
return data
elif isinstance(data, dict) and "id" in data:
return data["id"]
raise ValidationError({"id": "This field is required"})
def find_object_for_data(data, model):
return get_object_or_404(get_model_manager(model).all(), id=extract_id(data))
def find_objects_for_data(data, model):
ids = [extract_id(item) for item in data]
return get_model_manager(model).filter(id__in=ids).all()
def with_headers(response, headers: dict):
"""
Adds all headers to response
"""
for key, value in headers.items():
response[key] = value
return response
class BodyParser:
def __init__(self, body: dict):
self.body = body
def _get_item(self, name, consume):
return self.body.pop(name, None) if consume else self.body.get(name, None)
@classmethod
def _validate_list(cls, obj, field_name):
if not isinstance(obj, list):
raise ValidationError({field_name: "Value must be a list"})
@classmethod
def _validate_dict(cls, obj, field_name):
if not isinstance(obj, dict):
raise ValidationError({field_name: "Value must be a dict"})
@classmethod
def _validate_not_empty(cls, obj, field_name):
if not obj:
raise ValidationError({field_name: "No valid items provided"})
def int_list(self, name: str, not_empty=True, consume=True):
items = self._get_item(name, consume)
self._validate_list(items, name)
items_ints = [item for item in items if isinstance(item, int)]
if not_empty:
self._validate_not_empty(items, name)
return items_ints
def object_list(self, name: str, not_empty=True, consume=True):
items = self._get_item(name, consume)
self._validate_list(items, name)
if not_empty:
self._validate_not_empty(items, name)
return items
def object_dict(self, name: str, not_empty=True, consume=True):
object_dict = self._get_item(name, consume)
self._validate_dict(object_dict, name)
if not_empty:
self._validate_not_empty(object_dict, name)
return object_dict
from django.db.models import Q
from rest_framework.generics import get_object_or_404
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet, GenericViewSet
class VLNViewMixin:
@classmethod
def __normalize_lookup_key(cls, key):
if not key.endswith("_pk"):
return key + "_pk"
return key
@classmethod
def __normalize_router_lookup(cls, lookup):
lookup_pk = lookup
filter_keys = lookup
second_filter_keys = lookup
if not isinstance(lookup, str):
lookup_pk = lookup[0]
filter_keys = lookup[1]
second_filter_keys = lookup[2] if len(lookup) > 2 else None
lookup_pk = cls.__normalize_lookup_key(lookup_pk)
return lookup_pk, filter_keys, second_filter_keys
def get_router_lookup(self, key):
return self.kwargs.get(self.__normalize_lookup_key(key))
def get_queryset(self):
"""
Automatically picks up nested router lookups from both serializer and self
If serializer has a method to set up prefetching, invokes it automatically with current query set
:return: queryset
"""
qs = super().get_queryset()
serializer_class = self.get_serializer_class()
if hasattr(serializer_class, "setup_eager_loading"):
qs = serializer_class.setup_eager_loading(qs)
router_lookups = getattr(self, "router_lookups", None) or getattr(serializer_class, "router_lookups", None)
if router_lookups:
normalized_lookups = (self.__normalize_router_lookup(lookup) for lookup in router_lookups)
filter_kwargs = {lookup[1]: self.kwargs[lookup[0]] for lookup in normalized_lookups}
second_filter_kwargs = None
if len(router_lookups[0]) > 2:
normalized_lookups = (self.__normalize_router_lookup(lookup) for lookup in router_lookups)
second_filter_kwargs = {lookup[2]: self.kwargs[lookup[0]] for lookup in normalized_lookups}
if second_filter_kwargs:
filter_kwargs = Q(**filter_kwargs)
filter_kwargs |= Q(**second_filter_kwargs)
qs = qs.filter(filter_kwargs)
else:
qs = qs.filter(**filter_kwargs)
return qs
class VLNModelViewSet(VLNViewMixin, ModelViewSet):
pass
class VLNRelatedModelViewSetMixin:
"""
This mixin relies on VLNViewMixin nested router lookup resolution feature.
You must specify router lookups for this to work (on serializer or view set).
Current implementation will only work for one-to-many or many-to-one fields.
"""
def get_object(self):
queryset = self.filter_queryset(self.get_queryset())
obj = get_object_or_404(queryset)
self.check_object_permissions(self.request, obj)
return obj
def retrieve_related(self, request, *args, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(instance)
return Response(serializer.data)
def update_related(self, request, *args, **kwargs):
partial = kwargs.pop("partial", False)
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
self.perform_update_related(serializer)
if getattr(instance, "_prefetched_objects_cache", None):
instance._prefetched_objects_cache = {}
return Response(serializer.data)
def perform_update_related(self, serializer):
serializer.save()
def partial_update_related(self, request, *args, **kwargs):
kwargs["partial"] = True
return self.update_related(request, *args, **kwargs)
class VLNRelatedModelViewSet(VLNRelatedModelViewSetMixin, VLNViewMixin, GenericViewSet):
pass
"""
Django settings for voxlane-back project.
Generated by 'django-admin startproject' using Django 3.0.7.
For more information on this file, see
https://docs.djangoproject.com/en/3.0/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/3.0/ref/settings/
"""
import datetime
import os
import sys
import uuid
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/3.0/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = "48#xjh0!2+k7uq%tb!lt7mz2!1)3wm@2u8q(*(6**45sx^6lcl"
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = os.environ.get("DEBUG", "").lower() == "true"
DOCKER = bool(os.environ.get("DOCKER", False))
QA = os.environ.get("QA", "").lower() == "true"
PRODUCTION = os.environ.get("PRODUCTION", "").lower() == "true"
TEST = "test" in sys.argv
ALLOWED_HOSTS = [host.strip() for host in os.environ.get("ALLOWED_HOSTS", "").split(",") if host.strip()]
ADMINS = [(email.strip(), email.strip()) for email in os.environ.get("ADMINS", "").strip().split(",") if email.strip()]
PRIMARY_HOST = os.environ.get("PRIMARY_HOST", "execution").lower()
# HTTPS
USE_HTTPS = os.environ.get("USE_HTTPS", "true").lower() == "true"
SESSION_COOKIE_SECURE = USE_HTTPS
CSRF_COOKIE_SECURE = USE_HTTPS
# TODO: set up HSTS
APPEND_SLASH = True
# Application definition
INSTALLED_APPS = [
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
"computedfields",
"rest_framework",
"django_filters",
"django_cron",
"corsheaders",
"mptt",
"users.apps.UsersConfig", # users should be last in this list, or permission update may fail
]
REST_FRAMEWORK = {
"DEFAULT_PERMISSION_CLASSES": ("rest_framework.permissions.IsAuthenticated",),
"DEFAULT_AUTHENTICATION_CLASSES": (
"rest_framework_simplejwt.authentication.JWTAuthentication",
"rest_framework.authentication.SessionAuthentication",
"rest_framework.authentication.BasicAuthentication",
),
"DEFAULT_FILTER_BACKENDS": (
"django_filters.rest_framework.DjangoFilterBackend",
"rest_framework.filters.SearchFilter",
"main.core.filters.NestedOrderingFilter",
),
"DEFAULT_PAGINATION_CLASS": "main.core.pagination.DefaultVLNPagination",
}
SIMPLE_JWT = {
"ACCESS_TOKEN_LIFETIME": datetime.timedelta(minutes=30),
"REFRESH_TOKEN_LIFETIME": datetime.timedelta(hours=3),
"AUTH_HEADER_TYPES": ("Bearer",),
"ROTATE_REFRESH_TOKENS": True,
}
MIDDLEWARE = [
"django.middleware.security.SecurityMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"corsheaders.middleware.CorsMiddleware",
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
"main.core.middleware.VLNMiddleware", # should be last
]
ROOT_URLCONF = "main.urls"
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [],
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.debug",
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
],
},
},
]
WSGI_APPLICATION = "main.wsgi.application"
CORS_ORIGIN_ALLOW_ALL = True # If this is used then `CORS_ORIGIN_WHITELIST` will not have any effect
CORS_ALLOW_CREDENTIALS = True
CORS_ORIGIN_WHITELIST = [
"http://localhost:4200",
] # If this is used, then not need to use `CORS_ORIGIN_ALLOW_ALL = True`
CORS_ORIGIN_REGEX_WHITELIST = [
"http://localhost:4200",
]
# Database
# https://docs.djangoproject.com/en/3.0/ref/settings/#databases
DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql_psycopg2",
"HOST": os.environ.get("DB_HOST", "127.0.0.1"),
"PORT": os.environ.get("DB_PORT", "5432"),
"NAME": os.environ.get("DB_NAME", "voxlane"),
"USER": os.environ.get("DB_USER", "thirdlane"),
"PASSWORD": os.environ.get("DB_PASSWORD", "thirdlane"),
}
}
AUTH_USER_MODEL = "users.User"
AUTHENTICATION_BACKENDS = [
"django.contrib.auth.backends.ModelBackend",
]
# Password validation
# https://docs.djangoproject.com/en/3.0/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator",
},
{
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
},
{
"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
},
{
"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",
},
]
# Internationalization
# https://docs.djangoproject.com/en/3.0/topics/i18n/
LANGUAGE_CODE = "en"
LANGUAGES = [
("en", "English"),
]
TIME_ZONE = "America/New_York"
USE_I18N = True
USE_L10N = True
USE_TZ = True
LOCALE_PATHS = (os.path.join(os.path.dirname(BASE_DIR), "locale"),)
# computed fields
COMPUTEDFIELDS_MAP = "/computed_fields.map" if DOCKER else None
COMPUTEDFIELDS_ADMIN = DEBUG
# Integrations
# UI_URL = os.environ.get('UI_URL', '')
# Cron Jobs
# CRON_CLASSES = [
# 'integration.crons.ClientCronJob',
# ]
DJANGO_CRON_DELETE_LOGS_OLDER_THAN = 14 # days
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/3.0/howto/static-files/
STATIC_URL = "/static/"
STATIC_ROOT = os.path.join(os.path.dirname(BASE_DIR), "static")
STATICFILES_DIRS = [
os.path.join(os.path.join(BASE_DIR), "static"),
]
# Mail
EMAIL_HOST = os.environ.get("EMAIL_HOST", "")
EMAIL_HOST_USER = os.environ.get("EMAIL_HOST_USER", "")
EMAIL_HOST_PASSWORD = os.environ.get("EMAIL_HOST_PASSWORD", "")
EMAIL_PORT = os.environ.get("EMAIL_PORT", "25")
EMAIL_USE_TLS = os.environ.get("EMAIL_USE_TLS", "").lower() == "true"
DEFAULT_FROM_EMAIL = os.environ.get("DEFAULT_FROM_EMAIL", f"noreply@{PRIMARY_HOST}")
EMAIL_SUBJECT_PREFIX = os.environ.get("EMAIL_SUBJECT_PREFIX", "[EXEC]")
EMAIL_REDIRECT_ALL_TO = [
email.strip() for email in os.environ.get("EMAIL_REDIRECT_ALL_TO", "").split(",") if email.strip()
]
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = os.environ.get("REDIS_PORT", "6379")
REDIS_DB_CELERY = "0"
REDIS_DB_CACHE = "1"
CELERY_BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB_CELERY}"
CELERY_RESULT_BACKEND = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB_CELERY}"
CELERY_RESULT_EXPIRES = 3600 # seconds
CELERY_TRACK_STARTED = True
# CELERY_TIMEZONE = TZ
COMMENT_EDITING_LIFETIME_MIN = os.environ.get("COMMENT_EDITING_LIFETIME_MIN", "30")
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB_CACHE}",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
},
}
}
LOGGING = {
"version": 1,
"disable_existing_loggers": False,
"filters": {
"require_debug_false": {
"()": "django.utils.log.RequireDebugFalse",
},
"require_debug_true": {
"()": "django.utils.log.RequireDebugTrue",
},
},
"handlers": {
"console_debug": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"formatter": "short",
"filters": ["require_debug_true"],
},
"stdout": {
"level": "WARNING",
"class": "logging.StreamHandler",
"formatter": "verbose",
"filters": ["require_debug_false"],
},
},
"formatters": {
"verbose": {"format": "[%(levelname)s] %(asctime)s %(module)s PID=%(process)d %(message)s"},
"short": {"format": "[%(levelname)s] %(message)s"},
},
"loggers": {
"django.request": {
"handlers": ["stdout", "console_debug"],
"level": "INFO",
"propagate": False,
},
"django.db.backends": {
"level": "DEBUG",
"handlers": ["console_debug"],
"propagate": False,
},
"": {
"handlers": ["stdout", "console_debug"],
"level": "INFO",
"propagate": False,
},
},
}
try:
from .settings_local import *
except ImportError:
pass
"""voxlane-back URL Configuration
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/3.0/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.contrib.auth import views as auth_views
from django.urls import path, include
from main.auth.views import TokenObtainPairViewEx, TokenVerifyViewEx, TokenRefreshViewEx
from main.core.routers import VLNNestedRouter, VLNRouter
from users.views import UserViewSet
router = VLNRouter()
router.register("users", UserViewSet, basename="users")
api_routes = [
path("", include(router.urls)),
path("token/auth", TokenObtainPairViewEx.as_view()),
path("token/refresh", TokenRefreshViewEx.as_view()),
path("token/verify", TokenVerifyViewEx.as_view()),
]
urlpatterns = [
path("api/v1/", include(api_routes)),
path("admin/", admin.site.urls),
path("accounts/login/", auth_views.LoginView.as_view(**{"template_name": "admin/login.html"})),
]
"""
WSGI config for voxlane-back project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/3.0/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "main.settings")
application = get_wsgi_application()
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "main.settings")
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == "__main__":
main()
from django import forms
from django.contrib import admin
from django.contrib.admin import ModelAdmin, widgets
from django.contrib.admin.sites import NotRegistered
from django.contrib.auth.models import Group
from users.models import User
class UserAdmin(ModelAdmin):
search_fields = (
"username",
"last_name",
"first_name",
)
list_display = (
"username",
"last_name",
"first_name",
)
list_filter = ("is_active",)
readonly_fields = ("username", "is_superuser")
fieldsets = (
(
None,
{
"fields": (
"username",
"last_name",
"first_name",
"email",
"groups",
"is_superuser",
"object_guid",
)
},
),
)
def has_add_permission(self, request):
return False
class PairAdmin(ModelAdmin):
search_fields = (
"user__username",
"user__last_name",
"user__first_name",
"master__username",
"master__last_name",
"master__first_name",
)
list_display = (
"user",
"master",
)
autocomplete_fields = ["user", "master"]
class GroupForm(forms.ModelForm):
name = forms.CharField(disabled=True)
users = forms.ModelMultipleChoiceField(
User.objects.all(), label="Users", required=False, widget=widgets.FilteredSelectMultiple("Users", False)
)
class Meta:
model = Group
exclude = ["permissions"]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.instance.pk:
self.fields["users"].initial = self.instance.user_set.all()
def save(self, *args, **kwargs):
instance = super().save(*args, **kwargs)
self.instance.user_set.set(self.cleaned_data["users"])
return instance
class GroupAdmin(admin.ModelAdmin):
form = GroupForm
search_fields = ("name",)
ordering = ("name",)
def has_delete_permission(self, request, obj=None):
return False
def has_add_permission(self, request):
return False
admin.site.register(User, UserAdmin)
try:
admin.site.unregister(Group)
admin.site.register(Group, GroupAdmin)
except NotRegistered:
pass
from django.apps import AppConfig
from users.constants import APP_LABEL
class UsersConfig(AppConfig):
name = APP_LABEL
verbose_name = "Users"
from main.core.permissions import AutoPermissions
APP_LABEL = "users"
DELETED_USER_NAME = "__deleted"
SYSTEM_USER_NAME = "vln-system"
class UserGroup:
ADMINISTRATOR = "administrator"
MANAGER = "manager"
OPERATOR = "operator"
class UserPermission(AutoPermissions(APP_LABEL, "user")):
class Raw:
IS_MANAGER = "is_manager"
IS_OPERATOR = "is_operator"
IS_MANAGER = ".".join((APP_LABEL, Raw.IS_MANAGER))
IS_OPERATOR = ".".join((APP_LABEL, Raw.IS_OPERATOR))
from django_filters.rest_framework import FilterSet
from django_filters.rest_framework.filters import BaseInFilter, BooleanFilter
from django_filters import NumberFilter
class UserFilter(FilterSet):
groups = BaseInFilter(field_name="groups__name", lookup_expr="in", distinct=True)
masters_of = NumberFilter(field_name="employee_pairs__user_id")
is_active = BooleanFilter(field_name="is_active")
# Generated by Django 3.1.7 on 2021-03-17 20:22
import django.contrib.auth.models
import django.contrib.auth.validators
from django.db import migrations, models
import django.utils.timezone
import uuid
class Migration(migrations.Migration):
initial = True
dependencies = [
("auth", "0012_alter_user_first_name_max_length"),
]
operations = [
migrations.CreateModel(
name="User",
fields=[
("password", models.CharField(max_length=128, verbose_name="password")),
("last_login", models.DateTimeField(blank=True, null=True, verbose_name="last login")),
(
"is_superuser",
models.BooleanField(
default=False,
help_text="Designates that this user has all permissions without explicitly assigning them.",
verbose_name="superuser status",
),
),
(
"username",
models.CharField(
error_messages={"unique": "A user with that username already exists."},
help_text="Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.",
max_length=150,
unique=True,
validators=[django.contrib.auth.validators.UnicodeUsernameValidator()],
verbose_name="username",
),
),
("first_name", models.CharField(blank=True, max_length=150, verbose_name="first name")),
("last_name", models.CharField(blank=True, max_length=150, verbose_name="last name")),
("email", models.EmailField(blank=True, max_length=254, verbose_name="email address")),
(
"is_staff",
models.BooleanField(
default=False,
help_text="Designates whether the user can log into this admin site.",
verbose_name="staff status",
),
),
(
"is_active",
models.BooleanField(
default=True,
help_text="Designates whether this user should be treated as active. Unselect this instead of deleting accounts.",
verbose_name="active",
),
),
("date_joined", models.DateTimeField(default=django.utils.timezone.now, verbose_name="date joined")),
("id", models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
(
"groups",
models.ManyToManyField(
blank=True,
help_text="The groups this user belongs to. A user will get all permissions granted to each of their groups.",
related_name="user_set",
related_query_name="user",
to="auth.Group",
verbose_name="groups",
),
),
(
"user_permissions",
models.ManyToManyField(
blank=True,
help_text="Specific permissions for this user.",
related_name="user_set",
related_query_name="user",
to="auth.Permission",
verbose_name="user permissions",
),
),
],
options={
"verbose_name": "User",
"verbose_name_plural": "Users",
"permissions": [("is_manager", "MANAGER"), ("is_operator", "OPERATOR")],
},
managers=[
("objects", django.contrib.auth.models.UserManager()),
],
),
]
import uuid
from django.contrib.auth.models import AbstractUser
from django.db import models
from users.constants import UserGroup, UserPermission
class User(AbstractUser):
Group = UserGroup
Permission = UserPermission
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
class Meta:
verbose_name = "User"
verbose_name_plural = "Users"
permissions = [
(UserPermission.Raw.IS_MANAGER, "MANAGER"),
(UserPermission.Raw.IS_OPERATOR, "OPERATOR"),
]
@property
def full_name(self) -> str:
if self.first_name or self.last_name:
return " ".join([self.last_name, self.first_name])
elif self.username:
return self.username
else:
return "Unknown user"
from rest_framework import serializers, fields
from rest_framework.exceptions import ValidationError
from rest_framework.fields import ReadOnlyField
from main.core.serializers import ForeignKeyDeserializerMixin
from users.models import User
class UserGroupsSerializer(fields.Field):
def to_representation(self, value):
return [group.name for group in value.all()]
def to_internal_value(self, data):
if not isinstance(data, list):
raise ValidationError("Groups must be a list of strings")
for name in data:
if not isinstance(name, str):
raise ValidationError("Group name must be a string")
return data
class UserShortSerializer(serializers.ModelSerializer):
class Meta:
model = User
fields = ("id", "username", "first_name", "last_name", "email")
class UserSerializer(serializers.ModelSerializer):
class Meta:
model = User
read_only_fields = ("object_guid",)
exclude = ("date_joined", "last_login", "password", "user_permissions")
groups = UserGroupsSerializer(read_only=True)
permissions = serializers.SerializerMethodField()
is_purchase_lead = ReadOnlyField()
is_ko_lead = ReadOnlyField()
is_do_lead = ReadOnlyField()
is_chief_commercial_officer = ReadOnlyField()
is_logistics_lead = ReadOnlyField()
is_service_lead = ReadOnlyField()
is_engineering_lead = ReadOnlyField()
@staticmethod
def get_permissions(user):
return user.get_all_permissions()
class UserShortSerializerWithFKDeserializer(ForeignKeyDeserializerMixin, UserShortSerializer):
pass
import random
import factory
from factory.django import DjangoModelFactory
from faker import Faker
from users.models import User, Pair
fake = Faker()
class UserFactory(DjangoModelFactory):
class Meta:
model = User
username = factory.LazyAttribute(lambda x: f"{fake.user_name()}_{random.randint(0, 99999)}")
first_name = factory.Faker("first_name")
last_name = factory.Faker("last_name")
email = factory.Faker("safe_email")
class PairFactory(DjangoModelFactory):
class Meta:
model = Pair
user = factory.SubFactory(UserFactory)
master = factory.SubFactory(UserFactory)
from django.contrib.auth.models import Group
from django.test import TestCase
from users.constants import UserGroup
from users.tests.factories import UserFactory
class UserTestCase(TestCase):
def test_group_admin_sync(self):
admin_group = Group.objects.get(name=UserGroup.ADMINISTRATOR)
user = UserFactory(username="test", email="test@example.com", password="secret")
self.assertFalse(user.is_superuser)
self.assertFalse(user.is_staff)
user.groups.add(admin_group)
user.refresh_from_db()
self.assertTrue(user.is_superuser)
self.assertTrue(user.is_staff)
user.groups.remove(admin_group)
user.refresh_from_db()
self.assertFalse(user.is_superuser)
self.assertFalse(user.is_staff)
import logging
from django.conf import settings
from main.core.middleware import CurrentRequest
from main.core.utils import get_request_from_stack
from users.constants import DELETED_USER_NAME, SYSTEM_USER_NAME
logger = logging.getLogger(__name__)
def get_permissions_as_dict():
from django.apps import apps
Permission = apps.get_model("auth", "Permission")
all_permissions = Permission.objects.select_related("content_type").all()
return {f"{p.content_type.app_label}.{p.codename}": p for p in all_permissions}
@CurrentRequest.cached
def get_placeholder_for_deleted_user():
from users.models import User
"""
This user should own all resources created by deleted users
:return: User
"""
return User.objects.get(username=DELETED_USER_NAME)
@CurrentRequest.cached
def get_system_user():
from users.models import User
return User.objects.get(username=SYSTEM_USER_NAME)
# @CurrentRequest.cached
# def get_crm_user():
# from users.models import User
# return User.objects.get(username=CRM_USER_NAME)
def get_actor_user():
if CurrentRequest.user:
return CurrentRequest.user
request = get_request_from_stack()
user = getattr(request, "user", None)
if user:
return user
return get_system_user()
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from main.core.permissions import VLNModelPermissions
from main.core.views import VLNModelViewSet
from users.filters import UserFilter
from users.models import User
from users.serializers import UserSerializer, UserShortSerializer
class UserViewSet(VLNModelViewSet):
permission_classes = (VLNModelPermissions,)
serializer_class = UserShortSerializer
filterset_class = UserFilter
queryset = User.objects.all()
ordering = ["-id"]
search_fields = (
"username",
"last_name",
"first_name",
)
@action(methods=("GET",), detail=False, permission_classes=(IsAuthenticated,))
def me(self, request):
serializer = UserSerializer(self.request.user)
return Response(serializer.data)
/app/logs/*.log {
daily
rotate 21
copytruncate
dateext
missingok
notifempty
minsize 1048576
compress
su root root
}
[uwsgi]
chdir=/app
module=main.wsgi:application
env DJANGO_SETTINGS_MODULE=main.settings
master=True
pidfile=/tmp/project-master.pid
processes=5
uid=1000
gid=2000
harakiri=60
max-requests=5000
vacuum=True
single-interpreter=True
enable-threads=True
socket=/sock/django.sock
#socket=0.0.0.0:8001 for a web port socket
chmod-socket = 666
logger = file:/app/logs/uwsgi.log
buffer-size=8192
*/5 * * * * cd /app && python manage.py runcrons
0 0 * * * /usr/sbin/logrotate /etc/logrotate/logrotate.conf -s /app/logs/logrotate.status --verbose
DB_HOST=db
DB_PORT=5432
DB_NAME=voxlane
DB_USER=thirdlane
DB_PASSWORD=thirdlane
#PRIMARY_HOST=execution-dev.evelopers.com
#EMAIL_HOST=192.168.63.227
EMAIL_HOST_USER=
EMAIL_HOST_PASSWORD=
EMAIL_PORT=1025
EMAIL_USE_TLS=False
EMAIL_SUBJECT_PREFIX=[EXEC DEV]
REDIS_HOST=redis
USE_HTTPS=False
ADMINS=apeshkov@thirdlane.com
#ALLOWED_HOSTS=192.168.99.100,execution-dev.evelopers.com,localhost,django,exec
ALLOWED_HOSTS=localhost,django
version: '3.6'
services:
db:
image: postgres:13.2
command: -c fsync=off -c full_page_writes=off -c logging_collector=on -c log_directory=/app/logs/ -c log_filename=postgres.log
volumes:
- pg_data:/var/lib/postgresql/data
- ./logs:/app/logs
environment:
- POSTGRES_USER=thirdlane
- POSTGRES_PASSWORD=thirdlane
- POSTGRES_DB=voxlane
expose:
- 5432/tcp
ports:
- 5432:5432
networks:
- back-net
redis:
image: redis:6.2.1
command: redis-server --logfile /app/logs/redis.log
volumes:
- redis_data:/data
- ./logs:/app/logs
expose:
- 6379/tcp
ports:
- 6379:6379
networks:
- back-net
django:
image: voxlane-back
command: uwsgi --ini /etc/uwsgi/apps-enabled/django.ini
depends_on:
- db
- redis
- migration
ports:
- 8000:8000
env_file:
- ./.env.dev
networks:
- back-net
volumes:
- sock:/sock
- ./logs:/app/logs/
- ./share:/app/share
celery:
image: voxlane-back
command: celery -A main worker --beat --loglevel=INFO --autoscale=3,1 -f /app/logs/celery-%i.log
depends_on:
- db
- redis
- migration
env_file:
- ./.env.dev
networks:
- back-net
volumes:
- ./logs:/app/logs/
- ./share:/app/share
cron:
image: voxlane-back
command: crond -f -L /app/logs/cron.log -l 1
depends_on:
- db
- redis
- migration
volumes:
- ./logs:/app/logs/
- ./share:/app/share
env_file:
- ./.env.dev
networks:
- back-net
migration:
image: voxlane-back
command: sh -c "python /app/manage.py collectstatic --noinput && python /app/manage.py migrate --noinput && python /app/manage.py updatedata"
depends_on:
- db
- redis
env_file:
- ./.env.dev
networks:
- back-net
volumes:
- django_static:/static
- ./logs:/app/logs/
volumes:
pg_data:
redis_data:
sock:
django_static:
networks:
back-net:
name: vln-back
version: '3.6'
services:
db:
image: postgres:13.2
volumes:
- pg_data:/var/lib/postgresql/data
environment:
- POSTGRES_USER=thirdlane
- POSTGRES_PASSWORD=thirdlane
- POSTGRES_DB=voxlane
expose:
- 5432/tcp
ports:
- 5432:5432
mail:
image: mailhog/mailhog
logging:
driver: 'none'
expose:
- 1025/tcp
ports:
- 127.0.0.1:1025:1025
- 8025:8025
redis:
image: redis:6.2.1
volumes:
- redis_data:/data
ports:
- 36379:6379
volumes:
pg_data:
redis_data:
Описываем
[RAML](https://github.com/raml-org/raml-spec/blob/master/versions/raml-10/raml-10.md/)
Компилируем
[raml2html](https://github.com/raml2html/raml2html)
```bash
raml2html -i .\exec-api.raml -o .\exec-api.html
```
DB_HOST=
DB_PORT=
DB_NAME=
DB_USER=
DB_PASSWORD=
This diff is collapsed.
[tool.poetry]
name = "voxlane-back"
version = "0.1.0"
description = ""
authors = ["apeshkov <apeshkov@evelopers.com>"]
repository = "https://gitlab.thirdlane.com/voxlane-app/voxlane-back"
[tool.poetry.dependencies]
python = "3.9"
Django = "3.1.7"
django-computedfields = "0.1.3"
django-cors-headers = "3.7.0"
django-cron = "0.5.1"
django-model-utils = "4.1.1"
django-extensions = "3.1.1"
django-redis = "4.12.1"
djangorestframework = "3.12.2"
djangorestframework-simplejwt = "4.6.0"
drf-nested-routers = "0.93.3"
django-filter = "2.4.0"
django-mptt = "0.12.0"
factory-boy = "3.2.0"
Faker = "6.6.1"
psycopg2 = "2.8.6"
python-dateutil = "2.8.1"
PyJWT = "2.0.1"
pytz = "2021.1"
requests = "2.25.1"
uWSGI = "2.0.19.1"
celery = "5.0.5"
redis = "3.5.3"
pre-commit = "^2.11.1"
[tool.poetry.dev-dependencies]
ipython = "^7.21.0"
[tool.black]
line-length = 120
include = '\.pyi?$'
exclude = '/( \.git | \.venv | docs | static )/'
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
# bleach==3.2.1
Django==3.1.7
django-computedfields==0.1.3
django-cors-headers==3.7.0
django-cron==0.5.1
django-model-utils==4.1.1
django-extensions==3.1.1
django-redis==4.12.1
djangorestframework==3.12.2
djangorestframework-simplejwt==4.6.0
drf-nested-routers==0.93.3
django-filter==2.4.0
django-mptt==0.12.0
factory_boy==3.2.0
# graphviz==0.16
Faker==6.6.1
# openpyxl==3.0.7
psycopg2==2.8.6
python-dateutil==2.8.1
PyJWT==2.0.1
# for windows venv/Scripts/python_ldap-3.2.0-cp38-cp38-win32.whl from https://www.lfd.uci.edu/~gohlke/pythonlibs/#python-ldap
pytz==2021.1
requests==2.25.1
# sqlparse==0.4.1
uwsgi==2.0.19.1
celery==5.0.5
redis==3.5.3
# pika==1.1.0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment