services papkasi o'chirildi
This commit is contained in:
0
auth/core/__init__.py
Normal file
0
auth/core/__init__.py
Normal file
0
auth/core/apps/__init__.py
Normal file
0
auth/core/apps/__init__.py
Normal file
0
auth/core/apps/accounts/__init__.py
Normal file
0
auth/core/apps/accounts/__init__.py
Normal file
2
auth/core/apps/accounts/admin/__init__.py
Normal file
2
auth/core/apps/accounts/admin/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
from .core import * # noqa
|
||||
from .user import * # noqa
|
||||
18
auth/core/apps/accounts/admin/core.py
Normal file
18
auth/core/apps/accounts/admin/core.py
Normal file
@@ -0,0 +1,18 @@
|
||||
"""
|
||||
Admin panel register
|
||||
"""
|
||||
|
||||
from django.contrib import admin
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth import models as db_models
|
||||
from django_core.models import SmsConfirm
|
||||
|
||||
from ..admin import user
|
||||
from .user import SmsConfirmAdmin
|
||||
|
||||
admin.site.unregister(db_models.Group)
|
||||
admin.site.register(db_models.Group, user.GroupAdmin)
|
||||
admin.site.register(db_models.Permission, user.PermissionAdmin)
|
||||
|
||||
admin.site.register(get_user_model(), user.CustomUserAdmin)
|
||||
admin.site.register(SmsConfirm, SmsConfirmAdmin)
|
||||
52
auth/core/apps/accounts/admin/user.py
Normal file
52
auth/core/apps/accounts/admin/user.py
Normal file
@@ -0,0 +1,52 @@
|
||||
from django.contrib.auth import admin
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from unfold.admin import ModelAdmin
|
||||
from unfold.forms import AdminPasswordChangeForm # UserCreationForm,
|
||||
from unfold.forms import UserChangeForm
|
||||
|
||||
|
||||
class CustomUserAdmin(admin.UserAdmin, ModelAdmin):
|
||||
change_password_form = AdminPasswordChangeForm
|
||||
# add_form = UserCreationForm
|
||||
form = UserChangeForm
|
||||
list_display = (
|
||||
"first_name",
|
||||
"last_name",
|
||||
"phone",
|
||||
"role",
|
||||
)
|
||||
autocomplete_fields = ["groups", "user_permissions"]
|
||||
fieldsets = ((None, {"fields": ("phone",)}),) + (
|
||||
(None, {"fields": ("username", "password")}),
|
||||
(_("Personal info"), {"fields": ("first_name", "last_name", "email")}),
|
||||
(
|
||||
_("Permissions"),
|
||||
{
|
||||
"fields": (
|
||||
"is_active",
|
||||
"is_staff",
|
||||
"is_superuser",
|
||||
"groups",
|
||||
"user_permissions",
|
||||
"role",
|
||||
),
|
||||
},
|
||||
),
|
||||
(_("Important dates"), {"fields": ("last_login", "date_joined")}),
|
||||
)
|
||||
|
||||
|
||||
class PermissionAdmin(ModelAdmin):
|
||||
list_display = ("name",)
|
||||
search_fields = ("name",)
|
||||
|
||||
|
||||
class GroupAdmin(ModelAdmin):
|
||||
list_display = ["name"]
|
||||
search_fields = ["name"]
|
||||
autocomplete_fields = ("permissions",)
|
||||
|
||||
|
||||
class SmsConfirmAdmin(ModelAdmin):
|
||||
list_display = ["phone", "code", "resend_count", "try_count"]
|
||||
search_fields = ["phone", "code"]
|
||||
9
auth/core/apps/accounts/apps.py
Normal file
9
auth/core/apps/accounts/apps.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class AccountsConfig(AppConfig):
|
||||
default_auto_field = "django.db.models.BigAutoField"
|
||||
name = "core.apps.accounts"
|
||||
|
||||
def ready(self):
|
||||
from core.apps.accounts import signals # noqa
|
||||
1
auth/core/apps/accounts/choices/__init__.py
Normal file
1
auth/core/apps/accounts/choices/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .user import * # noqa
|
||||
12
auth/core/apps/accounts/choices/user.py
Normal file
12
auth/core/apps/accounts/choices/user.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from django.db import models
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
|
||||
class RoleChoice(models.TextChoices):
|
||||
"""
|
||||
User Role Choice
|
||||
"""
|
||||
|
||||
SUPERUSER = "superuser", _("Superuser")
|
||||
ADMIN = "admin", _("Admin")
|
||||
USER = "user", _("User")
|
||||
1
auth/core/apps/accounts/managers/__init__.py
Normal file
1
auth/core/apps/accounts/managers/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .user import * # noqa
|
||||
23
auth/core/apps/accounts/managers/user.py
Normal file
23
auth/core/apps/accounts/managers/user.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from django.contrib.auth import base_user
|
||||
|
||||
|
||||
class UserManager(base_user.BaseUserManager):
|
||||
def create_user(self, phone, password=None, **extra_fields):
|
||||
if not phone:
|
||||
raise ValueError("The phone number must be set")
|
||||
|
||||
user = self.model(phone=phone, **extra_fields)
|
||||
user.set_password(password)
|
||||
user.save(using=self._db)
|
||||
return user
|
||||
|
||||
def create_superuser(self, phone, password=None, **extra_fields):
|
||||
extra_fields.setdefault("is_staff", True)
|
||||
extra_fields.setdefault("is_superuser", True)
|
||||
|
||||
if extra_fields.get("is_staff") is not True:
|
||||
raise ValueError("Superuser must have is_staff=True.")
|
||||
if extra_fields.get("is_superuser") is not True:
|
||||
raise ValueError("Superuser must have is_superuser=True.")
|
||||
|
||||
return self.create_user(phone, password, **extra_fields)
|
||||
60
auth/core/apps/accounts/migrations/0001_initial.py
Normal file
60
auth/core/apps/accounts/migrations/0001_initial.py
Normal file
@@ -0,0 +1,60 @@
|
||||
# Generated by Django 5.1.3 on 2024-12-13 19:04
|
||||
|
||||
import django.db.models.deletion
|
||||
import django.utils.timezone
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('auth', '0012_alter_user_first_name_max_length'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='User',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('password', models.CharField(max_length=128, verbose_name='password')),
|
||||
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
|
||||
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
|
||||
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
|
||||
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
|
||||
('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
|
||||
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
|
||||
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
|
||||
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
|
||||
('phone', models.CharField(max_length=255, unique=True)),
|
||||
('username', models.CharField(blank=True, max_length=255, null=True)),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('updated_at', models.DateTimeField(auto_now=True)),
|
||||
('validated_at', models.DateTimeField(blank=True, null=True)),
|
||||
('role', models.CharField(choices=[('superuser', 'Superuser'), ('admin', 'Admin'), ('user', 'User')], default='user', max_length=255)),
|
||||
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
|
||||
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'user',
|
||||
'verbose_name_plural': 'users',
|
||||
'abstract': False,
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='ResetToken',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('updated_at', models.DateTimeField(auto_now=True)),
|
||||
('token', models.CharField(max_length=255, unique=True)),
|
||||
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
options={
|
||||
'verbose_name': 'Reset Token',
|
||||
'verbose_name_plural': 'Reset Tokens',
|
||||
},
|
||||
),
|
||||
]
|
||||
0
auth/core/apps/accounts/migrations/__init__.py
Normal file
0
auth/core/apps/accounts/migrations/__init__.py
Normal file
3
auth/core/apps/accounts/models/__init__.py
Normal file
3
auth/core/apps/accounts/models/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
# isort: skip_file
|
||||
from .user import * # noqa
|
||||
from .reset_token import * # noqa
|
||||
15
auth/core/apps/accounts/models/reset_token.py
Normal file
15
auth/core/apps/accounts/models/reset_token.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.db import models
|
||||
from django_core.models import AbstractBaseModel
|
||||
|
||||
|
||||
class ResetToken(AbstractBaseModel):
|
||||
token = models.CharField(max_length=255, unique=True)
|
||||
user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
|
||||
|
||||
def __str__(self):
|
||||
return self.token
|
||||
|
||||
class Meta:
|
||||
verbose_name = "Reset Token"
|
||||
verbose_name_plural = "Reset Tokens"
|
||||
24
auth/core/apps/accounts/models/user.py
Normal file
24
auth/core/apps/accounts/models/user.py
Normal file
@@ -0,0 +1,24 @@
|
||||
from django.contrib.auth import models as auth_models
|
||||
from django.db import models
|
||||
|
||||
from ..choices import RoleChoice
|
||||
from ..managers import UserManager
|
||||
|
||||
|
||||
class User(auth_models.AbstractUser):
|
||||
phone = models.CharField(max_length=255, unique=True)
|
||||
username = models.CharField(max_length=255, null=True, blank=True)
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
updated_at = models.DateTimeField(auto_now=True)
|
||||
validated_at = models.DateTimeField(null=True, blank=True)
|
||||
role = models.CharField(
|
||||
max_length=255,
|
||||
choices=RoleChoice,
|
||||
default=RoleChoice.USER,
|
||||
)
|
||||
|
||||
USERNAME_FIELD = "phone"
|
||||
objects = UserManager()
|
||||
|
||||
def __str__(self):
|
||||
return self.phone
|
||||
1
auth/core/apps/accounts/seeder/__init__.py
Normal file
1
auth/core/apps/accounts/seeder/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .core import * # noqa
|
||||
10
auth/core/apps/accounts/seeder/core.py
Normal file
10
auth/core/apps/accounts/seeder/core.py
Normal file
@@ -0,0 +1,10 @@
|
||||
"""
|
||||
Create a new user/superuser
|
||||
"""
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
|
||||
|
||||
class UserSeeder:
|
||||
def run(self):
|
||||
get_user_model().objects.create_superuser("998888112309", "2309")
|
||||
4
auth/core/apps/accounts/serializers/__init__.py
Normal file
4
auth/core/apps/accounts/serializers/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from .auth import * # noqa
|
||||
from .change_password import * # noqa
|
||||
from .set_password import * # noqa
|
||||
from .user import * # noqa
|
||||
59
auth/core/apps/accounts/serializers/auth.py
Normal file
59
auth/core/apps/accounts/serializers/auth.py
Normal file
@@ -0,0 +1,59 @@
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.utils.translation import gettext as _
|
||||
from rest_framework import exceptions, serializers
|
||||
|
||||
|
||||
class LoginSerializer(serializers.Serializer):
|
||||
username = serializers.CharField(max_length=255)
|
||||
password = serializers.CharField(max_length=255)
|
||||
|
||||
|
||||
class RegisterSerializer(serializers.ModelSerializer):
|
||||
phone = serializers.CharField(max_length=255)
|
||||
|
||||
def validate_phone(self, value):
|
||||
user = get_user_model().objects.filter(phone=value, validated_at__isnull=False)
|
||||
if user.exists():
|
||||
raise exceptions.ValidationError(_("Phone number already registered."), code="unique")
|
||||
return value
|
||||
|
||||
class Meta:
|
||||
model = get_user_model()
|
||||
fields = ["first_name", "last_name", "phone", "password"]
|
||||
extra_kwargs = {
|
||||
"first_name": {
|
||||
"required": True,
|
||||
},
|
||||
"last_name": {"required": True},
|
||||
}
|
||||
|
||||
|
||||
class ConfirmSerializer(serializers.Serializer):
|
||||
code = serializers.IntegerField(min_value=1000, max_value=9999)
|
||||
phone = serializers.CharField(max_length=255)
|
||||
|
||||
|
||||
class ResetPasswordSerializer(serializers.Serializer):
|
||||
phone = serializers.CharField(max_length=255)
|
||||
|
||||
def validate_phone(self, value):
|
||||
user = get_user_model().objects.filter(phone=value)
|
||||
if user.exists():
|
||||
return value
|
||||
|
||||
raise serializers.ValidationError(_("User does not exist"))
|
||||
|
||||
|
||||
class ResetConfirmationSerializer(serializers.Serializer):
|
||||
code = serializers.IntegerField(min_value=1000, max_value=9999)
|
||||
phone = serializers.CharField(max_length=255)
|
||||
|
||||
def validate_phone(self, value):
|
||||
user = get_user_model().objects.filter(phone=value)
|
||||
if user.exists():
|
||||
return value
|
||||
raise serializers.ValidationError(_("User does not exist"))
|
||||
|
||||
|
||||
class ResendSerializer(serializers.Serializer):
|
||||
phone = serializers.CharField(max_length=255)
|
||||
6
auth/core/apps/accounts/serializers/change_password.py
Normal file
6
auth/core/apps/accounts/serializers/change_password.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from rest_framework import serializers
|
||||
|
||||
|
||||
class ChangePasswordSerializer(serializers.Serializer):
|
||||
old_password = serializers.CharField(required=True)
|
||||
new_password = serializers.CharField(required=True, min_length=8)
|
||||
6
auth/core/apps/accounts/serializers/set_password.py
Normal file
6
auth/core/apps/accounts/serializers/set_password.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from rest_framework import serializers
|
||||
|
||||
|
||||
class SetPasswordSerializer(serializers.Serializer):
|
||||
password = serializers.CharField()
|
||||
token = serializers.CharField(max_length=255)
|
||||
23
auth/core/apps/accounts/serializers/user.py
Normal file
23
auth/core/apps/accounts/serializers/user.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from django.contrib.auth import get_user_model
|
||||
from rest_framework import serializers
|
||||
|
||||
|
||||
class UserSerializer(serializers.ModelSerializer):
|
||||
class Meta:
|
||||
exclude = [
|
||||
"created_at",
|
||||
"updated_at",
|
||||
"password",
|
||||
"groups",
|
||||
"user_permissions"
|
||||
]
|
||||
model = get_user_model()
|
||||
|
||||
|
||||
class UserUpdateSerializer(serializers.ModelSerializer):
|
||||
class Meta:
|
||||
model = get_user_model()
|
||||
fields = [
|
||||
"first_name",
|
||||
"last_name"
|
||||
]
|
||||
1
auth/core/apps/accounts/signals/__init__.py
Normal file
1
auth/core/apps/accounts/signals/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .user import * # noqa
|
||||
10
auth/core/apps/accounts/signals/user.py
Normal file
10
auth/core/apps/accounts/signals/user.py
Normal file
@@ -0,0 +1,10 @@
|
||||
from django.db.models.signals import post_save
|
||||
from django.dispatch import receiver
|
||||
from django.contrib.auth import get_user_model
|
||||
|
||||
|
||||
@receiver(post_save, sender=get_user_model())
|
||||
def user_signal(sender, created, instance, **kwargs):
|
||||
if created and instance.username is None:
|
||||
instance.username = "U%(id)s" % {"id": 1000 + instance.id}
|
||||
instance.save()
|
||||
0
auth/core/apps/accounts/test/__init__.py
Normal file
0
auth/core/apps/accounts/test/__init__.py
Normal file
116
auth/core/apps/accounts/test/test_auth.py
Normal file
116
auth/core/apps/accounts/test/test_auth.py
Normal file
@@ -0,0 +1,116 @@
|
||||
import logging
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.test import TestCase
|
||||
from django.urls import reverse
|
||||
from pydantic import BaseModel
|
||||
from rest_framework import status
|
||||
from rest_framework.test import APIClient
|
||||
|
||||
from core.apps.accounts.models import ResetToken
|
||||
from django_core.models import SmsConfirm
|
||||
from core.services import SmsService
|
||||
from django.contrib.auth import get_user_model
|
||||
|
||||
|
||||
class TokenModel(BaseModel):
|
||||
access: str
|
||||
refresh: str
|
||||
|
||||
|
||||
class SmsViewTest(TestCase):
|
||||
def setUp(self):
|
||||
self.client = APIClient()
|
||||
self.phone = "998999999999"
|
||||
self.password = "password"
|
||||
self.code = "1111"
|
||||
self.token = "token"
|
||||
self.user = get_user_model().objects.create_user(
|
||||
phone=self.phone, first_name="John", last_name="Doe", password=self.password
|
||||
)
|
||||
SmsConfirm.objects.create(phone=self.phone, code=self.code)
|
||||
|
||||
def test_reg_view(self):
|
||||
"""Test register view."""
|
||||
data = {
|
||||
"phone": "998999999991",
|
||||
"first_name": "John",
|
||||
"last_name": "Doe",
|
||||
"password": "password",
|
||||
}
|
||||
with patch.object(SmsService, "send_confirm", return_value=True):
|
||||
response = self.client.post(reverse("auth-register"), data=data)
|
||||
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
|
||||
self.assertEqual(
|
||||
response.data["data"]["detail"],
|
||||
"Sms %(phone)s raqamiga yuborildi" % {"phone": data["phone"]},
|
||||
)
|
||||
|
||||
def test_confirm_view(self):
|
||||
"""Test confirm view."""
|
||||
data = {"phone": self.phone, "code": self.code}
|
||||
response = self.client.post(reverse("auth-confirm"), data=data)
|
||||
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
|
||||
|
||||
def test_invalid_confirm_view(self):
|
||||
"""Test confirm view."""
|
||||
data = {"phone": self.phone, "code": "1112"}
|
||||
response = self.client.post(reverse("auth-confirm"), data=data)
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_reset_confirmation_code_view(self):
|
||||
"""Test reset confirmation code view."""
|
||||
data = {"phone": self.phone, "code": self.code}
|
||||
response = self.client.post(reverse("auth-confirm"), data=data)
|
||||
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
|
||||
self.assertIn("token", response.data["data"])
|
||||
|
||||
def test_reset_confirmation_code_view_invalid_code(self):
|
||||
"""Test reset confirmation code view with invalid code."""
|
||||
data = {"phone": self.phone, "code": "123456"}
|
||||
response = self.client.post(reverse("auth-confirm"), data=data)
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
def test_reset_set_password_view(self):
|
||||
"""Test reset set password view."""
|
||||
token = ResetToken.objects.create(user=self.user, token=self.token)
|
||||
data = {"token": token.token, "password": "new_password"}
|
||||
response = self.client.post(reverse("reset-password-reset-password-set"), data=data)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_reset_set_password_view_invalid_token(self):
|
||||
"""Test reset set password view with invalid token."""
|
||||
token = "test_token"
|
||||
data = {"token": token, "password": "new_password"}
|
||||
with patch.object(get_user_model().objects, "filter", return_value=get_user_model().objects.none()):
|
||||
response = self.client.post(reverse("reset-password-reset-password-set"), data=data)
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
self.assertEqual(response.data["data"]["detail"], "Invalid token")
|
||||
|
||||
def test_resend_view(self):
|
||||
"""Test resend view."""
|
||||
data = {"phone": self.phone}
|
||||
response = self.client.post(reverse("auth-resend"), data=data)
|
||||
logging.error(response.json())
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_reset_password_view(self):
|
||||
"""Test reset password view."""
|
||||
data = {"phone": self.phone}
|
||||
response = self.client.post(reverse("reset-password-reset-password"), data=data)
|
||||
logging.error(response.json())
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_me_view(self):
|
||||
"""Test me view."""
|
||||
self.client.force_authenticate(user=self.user)
|
||||
response = self.client.get(reverse("me-me"))
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_me_update_view(self):
|
||||
"""Test me update view."""
|
||||
self.client.force_authenticate(user=self.user)
|
||||
data = {"first_name": "Updated"}
|
||||
response = self.client.patch(reverse("me-user-update"), data=data)
|
||||
logging.error(response.json())
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
58
auth/core/apps/accounts/test/test_change_password.py
Normal file
58
auth/core/apps/accounts/test/test_change_password.py
Normal file
@@ -0,0 +1,58 @@
|
||||
from core.apps.accounts.serializers import ChangePasswordSerializer
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.test import TestCase
|
||||
from django.urls import reverse
|
||||
from rest_framework import status
|
||||
from rest_framework.test import APIClient
|
||||
|
||||
|
||||
class ChangePasswordViewTest(TestCase):
|
||||
def setUp(self):
|
||||
self.client = APIClient()
|
||||
self.phone = "9981111111"
|
||||
self.password = "12345670"
|
||||
self.path = reverse("change-password-change-password")
|
||||
|
||||
self.user = get_user_model().objects.create_user(
|
||||
phone=self.phone, password=self.password, email="test@example.com"
|
||||
)
|
||||
self.client.force_authenticate(user=self.user)
|
||||
|
||||
def test_change_password_success(self):
|
||||
data = {
|
||||
"old_password": self.password,
|
||||
"new_password": "newpassword",
|
||||
}
|
||||
response = self.client.post(self.path, data=data, format="json")
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data['data']["detail"], "password changed successfully")
|
||||
self.assertTrue(self.user.check_password("newpassword"))
|
||||
|
||||
def test_change_password_invalid_old_password(self):
|
||||
data = {
|
||||
"old_password": "wrongpassword",
|
||||
"new_password": "newpassword",
|
||||
}
|
||||
response = self.client.post(self.path, data=data, format="json")
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
self.assertEqual(response.data['data']["detail"], "invalida password")
|
||||
|
||||
def test_change_password_serializer_validation(self):
|
||||
data = {
|
||||
"old_password": self.password,
|
||||
"new_password": "newpassword",
|
||||
}
|
||||
serializer = ChangePasswordSerializer(data=data)
|
||||
self.assertTrue(serializer.is_valid())
|
||||
|
||||
data = {
|
||||
"old_password": self.password,
|
||||
"new_password": "123",
|
||||
}
|
||||
serializer = ChangePasswordSerializer(data=data)
|
||||
self.assertFalse(serializer.is_valid())
|
||||
|
||||
def test_change_password_view_permissions(self):
|
||||
self.client.force_authenticate(user=None)
|
||||
response = self.client.post(self.path, data={}, format="json")
|
||||
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
|
||||
26
auth/core/apps/accounts/urls.py
Normal file
26
auth/core/apps/accounts/urls.py
Normal file
@@ -0,0 +1,26 @@
|
||||
"""
|
||||
Accounts app urls
|
||||
"""
|
||||
|
||||
from django.urls import path, include
|
||||
from rest_framework_simplejwt import views as jwt_views
|
||||
from .views import RegisterView, ResetPasswordView, MeView, ChangePasswordView
|
||||
from rest_framework.routers import DefaultRouter
|
||||
|
||||
router = DefaultRouter()
|
||||
router.register("", RegisterView, basename="auth")
|
||||
router.register("", ResetPasswordView, basename="reset-password")
|
||||
router.register("", MeView, basename="me")
|
||||
router.register("", ChangePasswordView, basename="change-password")
|
||||
|
||||
|
||||
urlpatterns = [
|
||||
path("", include(router.urls)),
|
||||
path("token/", jwt_views.TokenObtainPairView.as_view(), name="token_obtain_pair"),
|
||||
path("token/verify/", jwt_views.TokenVerifyView.as_view(), name="token_verify"),
|
||||
path(
|
||||
"token/refresh/",
|
||||
jwt_views.TokenRefreshView.as_view(),
|
||||
name="token_refresh",
|
||||
),
|
||||
]
|
||||
1
auth/core/apps/accounts/views/__init__.py
Normal file
1
auth/core/apps/accounts/views/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .auth import * # noqa
|
||||
209
auth/core/apps/accounts/views/auth.py
Normal file
209
auth/core/apps/accounts/views/auth.py
Normal file
@@ -0,0 +1,209 @@
|
||||
import uuid
|
||||
from typing import Type
|
||||
|
||||
from core.services import UserService, SmsService
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django_core import exceptions
|
||||
from drf_spectacular.utils import extend_schema
|
||||
from rest_framework import status, throttling, request
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
from rest_framework.viewsets import GenericViewSet
|
||||
from django_core.mixins import BaseViewSetMixin
|
||||
from rest_framework.decorators import action
|
||||
from ..serializers import (
|
||||
RegisterSerializer,
|
||||
ConfirmSerializer,
|
||||
ResendSerializer,
|
||||
ResetPasswordSerializer,
|
||||
ResetConfirmationSerializer,
|
||||
SetPasswordSerializer,
|
||||
UserSerializer,
|
||||
UserUpdateSerializer,
|
||||
)
|
||||
from rest_framework.permissions import AllowAny
|
||||
from django.contrib.auth.hashers import make_password
|
||||
from drf_spectacular.utils import OpenApiResponse
|
||||
from rest_framework.permissions import IsAuthenticated
|
||||
from ..serializers import ChangePasswordSerializer
|
||||
|
||||
from .. import models
|
||||
|
||||
|
||||
@extend_schema(tags=["register"])
|
||||
class RegisterView(BaseViewSetMixin, GenericViewSet, UserService):
|
||||
throttle_classes = [throttling.UserRateThrottle]
|
||||
permission_classes = [AllowAny]
|
||||
|
||||
def get_serializer_class(self):
|
||||
match self.action:
|
||||
case "register":
|
||||
return RegisterSerializer
|
||||
case "confirm":
|
||||
return ConfirmSerializer
|
||||
case "resend":
|
||||
return ResendSerializer
|
||||
case _:
|
||||
return RegisterSerializer
|
||||
|
||||
@action(methods=["POST"], detail=False, url_path="register")
|
||||
def register(self, request):
|
||||
ser = self.get_serializer(data=request.data)
|
||||
ser.is_valid(raise_exception=True)
|
||||
data = ser.data
|
||||
phone = data.get("phone")
|
||||
# Create pending user
|
||||
self.create_user(phone, data.get("first_name"), data.get("last_name"), data.get("password"))
|
||||
self.send_confirmation(phone) # Send confirmation code for sms eskiz.uz
|
||||
return Response(
|
||||
{"detail": _("Sms %(phone)s raqamiga yuborildi") % {"phone": phone}},
|
||||
status=status.HTTP_202_ACCEPTED,
|
||||
)
|
||||
|
||||
@extend_schema(summary="Auth confirm.", description="Auth confirm user.")
|
||||
@action(methods=["POST"], detail=False, url_path="confirm")
|
||||
def confirm(self, request):
|
||||
ser = self.get_serializer(data=request.data)
|
||||
ser.is_valid(raise_exception=True)
|
||||
data = ser.data
|
||||
phone, code = data.get("phone"), data.get("code")
|
||||
try:
|
||||
if SmsService.check_confirm(phone, code=code):
|
||||
token = self.validate_user(get_user_model().objects.filter(phone=phone).first())
|
||||
return Response(
|
||||
data={
|
||||
"detail": _("Tasdiqlash ko'di qabul qilindi"),
|
||||
"token": token,
|
||||
},
|
||||
status=status.HTTP_202_ACCEPTED,
|
||||
)
|
||||
except exceptions.SmsException as e:
|
||||
raise PermissionDenied(e) # Response exception for APIException
|
||||
except Exception as e:
|
||||
raise PermissionDenied(e) # Api exception for APIException
|
||||
|
||||
@action(methods=["POST"], detail=False, url_path="resend")
|
||||
def resend(self, rq: Type[request.Request]):
|
||||
ser = self.get_serializer(data=rq.data)
|
||||
ser.is_valid(raise_exception=True)
|
||||
phone = ser.data.get("phone")
|
||||
self.send_confirmation(phone)
|
||||
return Response({"detail": _("Sms %(phone)s raqamiga yuborildi") % {"phone": phone}})
|
||||
|
||||
|
||||
@extend_schema(tags=["reset-password"])
|
||||
class ResetPasswordView(BaseViewSetMixin, GenericViewSet, UserService):
|
||||
permission_classes = [AllowAny]
|
||||
|
||||
def get_serializer_class(self):
|
||||
match self.action:
|
||||
case "reset_password":
|
||||
return ResetPasswordSerializer
|
||||
case "reset_confirm":
|
||||
return ResetConfirmationSerializer
|
||||
case "reset_password_set":
|
||||
return SetPasswordSerializer
|
||||
case _:
|
||||
return None
|
||||
|
||||
@action(methods=["POST"], detail=False, url_path="reset-password")
|
||||
def reset_password(self, request):
|
||||
ser = self.get_serializer(data=request.data)
|
||||
ser.is_valid(raise_exception=True)
|
||||
phone = ser.data.get("phone")
|
||||
self.send_confirmation(phone)
|
||||
return Response({"detail": _("Sms %(phone)s raqamiga yuborildi") % {"phone": phone}})
|
||||
|
||||
@action(methods=["POST"], detail=False, url_path="reset-password-confirm")
|
||||
def reset_confirm(self, request):
|
||||
ser = self.get_serializer(data=request.data)
|
||||
ser.is_valid(raise_exception=True)
|
||||
|
||||
data = ser.data
|
||||
code, phone = data.get("code"), data.get("phone")
|
||||
try:
|
||||
SmsService.check_confirm(phone, code)
|
||||
token = models.ResetToken.objects.create(
|
||||
user=get_user_model().objects.filter(phone=phone).first(),
|
||||
token=str(uuid.uuid4()),
|
||||
)
|
||||
return Response(
|
||||
data={
|
||||
"token": token.token,
|
||||
"created_at": token.created_at,
|
||||
"updated_at": token.updated_at,
|
||||
},
|
||||
status=status.HTTP_200_OK,
|
||||
)
|
||||
except exceptions.SmsException as e:
|
||||
raise PermissionDenied(str(e))
|
||||
except Exception as e:
|
||||
raise PermissionDenied(str(e))
|
||||
|
||||
@action(methods=["POST"], detail=False, url_path="reset-password-set")
|
||||
def reset_password_set(self, request):
|
||||
ser = self.get_serializer(data=request.data)
|
||||
ser.is_valid(raise_exception=True)
|
||||
data = ser.data
|
||||
token = data.get("token")
|
||||
password = data.get("password")
|
||||
token = models.ResetToken.objects.filter(token=token)
|
||||
if not token.exists():
|
||||
raise PermissionDenied(_("Invalid token"))
|
||||
phone = token.first().user.phone
|
||||
token.delete()
|
||||
self.change_password(phone, password)
|
||||
return Response({"detail": _("password updated")}, status=status.HTTP_200_OK)
|
||||
|
||||
|
||||
@extend_schema(tags=["me"])
|
||||
class MeView(BaseViewSetMixin, GenericViewSet, UserService):
|
||||
permission_classes = [IsAuthenticated]
|
||||
|
||||
def get_serializer_class(self):
|
||||
match self.action:
|
||||
case "me":
|
||||
return UserSerializer
|
||||
case "user_update":
|
||||
return UserUpdateSerializer
|
||||
case _:
|
||||
return None
|
||||
|
||||
@action(methods=["GET", "OPTIONS"], detail=False, url_path="me")
|
||||
def me(self, request):
|
||||
return Response(self.get_serializer(request.user).data)
|
||||
|
||||
@action(methods=["PATCH", "PUT"], detail=False, url_path="user-update")
|
||||
def user_update(self, request):
|
||||
ser = self.get_serializer(instance=request.user, data=request.data, partial=True)
|
||||
ser.is_valid(raise_exception=True)
|
||||
ser.save()
|
||||
return Response({"detail": _("Malumotlar yangilandi")})
|
||||
|
||||
|
||||
@extend_schema(tags=["change-password"], description="Parolni o'zgartirish uchun")
|
||||
class ChangePasswordView(BaseViewSetMixin, GenericViewSet):
|
||||
serializer_class = ChangePasswordSerializer
|
||||
permission_classes = (IsAuthenticated,)
|
||||
|
||||
@extend_schema(
|
||||
request=serializer_class,
|
||||
responses={200: OpenApiResponse(ChangePasswordSerializer)},
|
||||
summary="Change user password.",
|
||||
description="Change password of the authenticated user.",
|
||||
)
|
||||
@action(methods=["POST"], detail=False, url_path="change-password")
|
||||
def change_password(self, request, *args, **kwargs):
|
||||
user = self.request.user
|
||||
serializer = self.get_serializer(data=request.data)
|
||||
serializer.is_valid(raise_exception=True)
|
||||
|
||||
if user.check_password(request.data["old_password"]):
|
||||
user.password = make_password(request.data["new_password"])
|
||||
user.save()
|
||||
return Response(
|
||||
data={"detail": "password changed successfully"},
|
||||
status=status.HTTP_200_OK,
|
||||
)
|
||||
raise PermissionDenied(_("invalida password"))
|
||||
2
auth/core/apps/logs/.gitignore
vendored
Normal file
2
auth/core/apps/logs/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
*
|
||||
!.gitignore
|
||||
3
auth/core/services/__init__.py
Normal file
3
auth/core/services/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .otp import * # noqa
|
||||
from .sms import * # noqa
|
||||
from .user import * # noqa
|
||||
135
auth/core/services/otp.py
Normal file
135
auth/core/services/otp.py
Normal file
@@ -0,0 +1,135 @@
|
||||
import requests
|
||||
from config.env import env
|
||||
|
||||
|
||||
class ConsoleService:
|
||||
|
||||
def __init__(self) -> None: ...
|
||||
|
||||
def send_sms(self, phone_number, message):
|
||||
print(phone_number, message)
|
||||
|
||||
|
||||
class EskizService:
|
||||
GET = "GET"
|
||||
POST = "POST"
|
||||
PATCH = "PATCH"
|
||||
CONTACT = "contact"
|
||||
|
||||
def __init__(self, api_url=None, email=None, password=None, callback_url=None):
|
||||
self.api_url = api_url or env("SMS_API_URL")
|
||||
self.email = email or env("SMS_LOGIN")
|
||||
self.password = password or env("SMS_PASSWORD")
|
||||
self.callback_url = callback_url
|
||||
self.headers = {}
|
||||
|
||||
self.methods = {
|
||||
"auth_user": "auth/user",
|
||||
"auth_login": "auth/login",
|
||||
"auth_refresh": "auth/refresh",
|
||||
"send_message": "message/sms/send",
|
||||
}
|
||||
|
||||
def request(self, api_path, data=None, method=None, headers=None):
|
||||
incoming_data = {"status": "error"}
|
||||
|
||||
try:
|
||||
response = requests.request(
|
||||
method,
|
||||
f"{self.api_url}/{api_path}",
|
||||
data=data,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
if api_path == self.methods["auth_refresh"]:
|
||||
if response.status_code == 200:
|
||||
incoming_data["status"] = "success"
|
||||
else:
|
||||
incoming_data = response.json()
|
||||
except requests.RequestException as error:
|
||||
raise Exception(str(error))
|
||||
|
||||
return incoming_data
|
||||
|
||||
def auth(self):
|
||||
data = {"email": self.email, "password": self.password}
|
||||
|
||||
return self.request(self.methods["auth_login"], data=data, method=self.POST)
|
||||
|
||||
def refresh_token(self):
|
||||
token = self.auth()["data"]["token"]
|
||||
self.headers["Authorization"] = "Bearer " + token
|
||||
|
||||
context = {
|
||||
"headers": self.headers,
|
||||
"method": self.PATCH,
|
||||
"api_path": self.methods["auth_refresh"],
|
||||
}
|
||||
|
||||
return self.request(
|
||||
context["api_path"],
|
||||
method=context["method"],
|
||||
headers=context["headers"],
|
||||
)
|
||||
|
||||
def get_my_user_info(self):
|
||||
token = self.auth()["data"]["token"]
|
||||
self.headers["Authorization"] = "Bearer " + token
|
||||
|
||||
data = {
|
||||
"headers": self.headers,
|
||||
"method": self.GET,
|
||||
"api_path": self.methods["auth_user"],
|
||||
}
|
||||
|
||||
return self.request(data["api_path"], method=data["method"], headers=data["headers"])
|
||||
|
||||
def add_sms_contact(self, first_name, phone_number, group):
|
||||
token = self.auth()["data"]["token"]
|
||||
self.headers["Authorization"] = "Bearer " + token
|
||||
|
||||
data = {
|
||||
"name": first_name,
|
||||
"email": self.email,
|
||||
"group": group,
|
||||
"mobile_phone": phone_number,
|
||||
}
|
||||
|
||||
context = {
|
||||
"headers": self.headers,
|
||||
"method": self.POST,
|
||||
"api_path": self.CONTACT,
|
||||
"data": data,
|
||||
}
|
||||
|
||||
return self.request(
|
||||
context["api_path"],
|
||||
data=context["data"],
|
||||
method=context["method"],
|
||||
headers=context["headers"],
|
||||
)
|
||||
|
||||
def send_sms(self, phone_number, message):
|
||||
token = self.auth()["data"]["token"]
|
||||
self.headers["Authorization"] = "Bearer " + token
|
||||
|
||||
data = {
|
||||
"from": 4546,
|
||||
"mobile_phone": phone_number,
|
||||
"callback_url": self.callback_url,
|
||||
"message": message,
|
||||
}
|
||||
|
||||
context = {
|
||||
"headers": self.headers,
|
||||
"method": self.POST,
|
||||
"api_path": self.methods["send_message"],
|
||||
"data": data,
|
||||
}
|
||||
|
||||
return self.request(
|
||||
context["api_path"],
|
||||
data=context["data"],
|
||||
method=context["method"],
|
||||
headers=context["headers"],
|
||||
)
|
||||
57
auth/core/services/sms.py
Normal file
57
auth/core/services/sms.py
Normal file
@@ -0,0 +1,57 @@
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from django_core import exceptions, models, tasks
|
||||
|
||||
|
||||
class SmsService:
|
||||
@staticmethod
|
||||
def send_confirm(phone):
|
||||
# TODO: Deploy this change when deploying -> code = random.randint(1000, 9999) # noqa
|
||||
code = 1111
|
||||
|
||||
sms_confirm, status = models.SmsConfirm.objects.get_or_create(phone=phone, defaults={"code": code})
|
||||
|
||||
sms_confirm.sync_limits()
|
||||
|
||||
if sms_confirm.resend_unlock_time is not None:
|
||||
expired = sms_confirm.interval(sms_confirm.resend_unlock_time)
|
||||
exception = exceptions.SmsException(f"Resend blocked, try again in {expired}", expired=expired)
|
||||
raise exception
|
||||
|
||||
sms_confirm.code = code
|
||||
sms_confirm.try_count = 0
|
||||
sms_confirm.resend_count += 1
|
||||
sms_confirm.phone = phone
|
||||
sms_confirm.expired_time = datetime.now() + timedelta(seconds=models.SmsConfirm.SMS_EXPIRY_SECONDS) # noqa
|
||||
sms_confirm.resend_unlock_time = datetime.now() + timedelta(
|
||||
seconds=models.SmsConfirm.SMS_EXPIRY_SECONDS
|
||||
) # noqa
|
||||
sms_confirm.save()
|
||||
|
||||
tasks.SendConfirm.delay(phone, code)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def check_confirm(phone, code):
|
||||
sms_confirm = models.SmsConfirm.objects.filter(phone=phone).first()
|
||||
|
||||
if sms_confirm is None:
|
||||
raise exceptions.SmsException("Invalid confirmation code")
|
||||
|
||||
sms_confirm.sync_limits()
|
||||
|
||||
if sms_confirm.is_expired():
|
||||
raise exceptions.SmsException("Time for confirmation has expired")
|
||||
|
||||
if sms_confirm.is_block():
|
||||
expired = sms_confirm.interval(sms_confirm.unlock_time)
|
||||
raise exceptions.SmsException(f"Try again in {expired}")
|
||||
|
||||
if sms_confirm.code == code:
|
||||
sms_confirm.delete()
|
||||
return True
|
||||
|
||||
sms_confirm.try_count += 1
|
||||
sms_confirm.save()
|
||||
|
||||
raise exceptions.SmsException("Invalid confirmation code")
|
||||
64
auth/core/services/user.py
Normal file
64
auth/core/services/user.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from datetime import datetime
|
||||
|
||||
from core.services import sms
|
||||
from django.contrib.auth import get_user_model, hashers
|
||||
from django.utils.translation import gettext as _
|
||||
from django_core import exceptions
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
from rest_framework_simplejwt import tokens
|
||||
|
||||
|
||||
class UserService(sms.SmsService):
|
||||
def get_token(self, user):
|
||||
refresh = tokens.RefreshToken.for_user(user)
|
||||
|
||||
return {
|
||||
"refresh": str(refresh),
|
||||
"access": str(refresh.access_token),
|
||||
}
|
||||
|
||||
def create_user(self, phone, first_name, last_name, password):
|
||||
get_user_model().objects.update_or_create(
|
||||
phone=phone,
|
||||
defaults={
|
||||
"phone": phone,
|
||||
"first_name": first_name,
|
||||
"last_name": last_name,
|
||||
"password": hashers.make_password(password),
|
||||
},
|
||||
)
|
||||
|
||||
def send_confirmation(self, phone) -> bool:
|
||||
try:
|
||||
self.send_confirm(phone)
|
||||
return True
|
||||
except exceptions.SmsException as e:
|
||||
raise PermissionDenied(_("Qayta sms yuborish uchun kuting: {}").format(e.kwargs.get("expired")))
|
||||
except Exception:
|
||||
raise PermissionDenied(_("Serverda xatolik yuz berdi"))
|
||||
|
||||
def validate_user(self, user) -> dict:
|
||||
"""
|
||||
Create user if user not found
|
||||
"""
|
||||
if user.validated_at is None:
|
||||
user.validated_at = datetime.now()
|
||||
user.save()
|
||||
token = self.get_token(user)
|
||||
return token
|
||||
|
||||
def is_validated(self, user) -> bool:
|
||||
"""
|
||||
User is validated check
|
||||
"""
|
||||
if user.validated_at is not None:
|
||||
return True
|
||||
return False
|
||||
|
||||
def change_password(self, phone, password):
|
||||
"""
|
||||
Change password
|
||||
"""
|
||||
user = get_user_model().objects.filter(phone=phone).first()
|
||||
user.set_password(password)
|
||||
user.save()
|
||||
3
auth/core/utils/__init__.py
Normal file
3
auth/core/utils/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .cache import * # noqa
|
||||
from .console import * # noqa
|
||||
from .core import * # noqa
|
||||
18
auth/core/utils/cache.py
Normal file
18
auth/core/utils/cache.py
Normal file
@@ -0,0 +1,18 @@
|
||||
import hashlib
|
||||
|
||||
from django.core.cache import cache
|
||||
|
||||
from config.env import env
|
||||
|
||||
|
||||
class Cache:
|
||||
def remember(self, func, key: str, timeout=None, *args, **kwargs):
|
||||
cache_enabled = env.bool("CACHE_ENABLED")
|
||||
key = hashlib.md5(key.encode("utf-8")).hexdigest()
|
||||
response = cache.get(key)
|
||||
if not cache_enabled:
|
||||
return func(*args, **kwargs)
|
||||
elif response is None:
|
||||
response = func(*args, **kwargs)
|
||||
cache.set(key, response, env.int("CACHE_TIME") if timeout is None else timeout)
|
||||
return response
|
||||
79
auth/core/utils/console.py
Normal file
79
auth/core/utils/console.py
Normal file
@@ -0,0 +1,79 @@
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Union
|
||||
|
||||
from django.conf import settings
|
||||
from django.core import management
|
||||
|
||||
|
||||
class Console(management.BaseCommand):
|
||||
"""
|
||||
Console logging class
|
||||
"""
|
||||
|
||||
def get_stdout(self):
|
||||
base_command = management.BaseCommand()
|
||||
return base_command.stdout
|
||||
|
||||
def get_style(self):
|
||||
base_command = management.BaseCommand()
|
||||
return base_command.style
|
||||
|
||||
def success(self, message):
|
||||
logging.debug(message)
|
||||
self.get_stdout().write(self.get_style().SUCCESS(message))
|
||||
|
||||
def error(self, message):
|
||||
logging.error(message)
|
||||
self.get_stdout().write(self.get_style().ERROR(message))
|
||||
|
||||
def log(self, message):
|
||||
self.get_stdout().write(
|
||||
self.get_style().ERROR(
|
||||
"\n{line}\n{message}\n{line}\n".format(
|
||||
message=message, line="=" * len(message)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class BaseMake(management.BaseCommand):
|
||||
path: str
|
||||
|
||||
def __init__(self, *args, **options):
|
||||
super().__init__(*args, **options)
|
||||
self.console = Console()
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument("name")
|
||||
|
||||
def handle(self, *args, **options):
|
||||
name = options.get("name")
|
||||
if name is None:
|
||||
name = ""
|
||||
|
||||
stub = open(os.path.join(settings.BASE_DIR, f"resources/stub/{self.path}.stub"))
|
||||
data: Union[Any] = stub.read()
|
||||
stub.close()
|
||||
|
||||
stub = data.replace("{{name}}", name or "")
|
||||
|
||||
|
||||
core_http_path = os.path.join(settings.BASE_DIR, "core/http")
|
||||
if os.path.exists(
|
||||
os.path.join(core_http_path, f"{self.path}/{name.lower()}.py")
|
||||
): # noqa
|
||||
self.console.error(f"{self.name} already exists")
|
||||
return
|
||||
|
||||
if not os.path.exists(os.path.join(core_http_path, self.path)):
|
||||
os.makedirs(os.path.join(core_http_path, self.path))
|
||||
|
||||
file = open(
|
||||
os.path.join(core_http_path, f"{self.path}/{name.lower()}.py"),
|
||||
"w+",
|
||||
)
|
||||
file.write(stub) # type: ignore
|
||||
file.close()
|
||||
|
||||
self.console.success(f"{self.name} created")
|
||||
6
auth/core/utils/core.py
Normal file
6
auth/core/utils/core.py
Normal file
@@ -0,0 +1,6 @@
|
||||
class Helper:
|
||||
"""
|
||||
Helper class to handle index
|
||||
"""
|
||||
|
||||
pass
|
||||
33
auth/core/utils/storage.py
Normal file
33
auth/core/utils/storage.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from typing import Optional, Union
|
||||
|
||||
from config.env import env
|
||||
|
||||
|
||||
class Storage:
|
||||
|
||||
storages = ["AWS", "MINIO", "FILE", "STATIC"]
|
||||
|
||||
def __init__(self, storage: Union[str], storage_type: Union[str] = "default") -> None:
|
||||
self.storage = storage
|
||||
self.sorage_type = storage_type
|
||||
if storage not in self.storages:
|
||||
raise ValueError(f"Invalid storage type: {storage}")
|
||||
|
||||
def get_backend(self) -> Optional[str]:
|
||||
match self.storage:
|
||||
case "AWS" | "MINIO":
|
||||
return "storages.backends.s3boto3.S3Boto3Storage"
|
||||
case "FILE":
|
||||
return "django.core.files.storage.FileSystemStorage"
|
||||
case "STATIC":
|
||||
return "django.contrib.staticfiles.storage.StaticFilesStorage"
|
||||
|
||||
def get_options(self) -> Optional[str]:
|
||||
match self.storage:
|
||||
case "AWS" | "MINIO":
|
||||
if self.sorage_type == "default":
|
||||
return {"bucket_name": env.str("STORAGE_BUCKET_MEDIA")}
|
||||
elif self.sorage_type == "static":
|
||||
return {"bucket_name": env.str("STORAGE_BUCKET_STATIC")}
|
||||
case _:
|
||||
return {}
|
||||
Reference in New Issue
Block a user