Files
ibapp-backend/core/apps/notifications/utils/send_notification.py
behruz-dev bbdfd9abcf fix
2025-10-30 17:40:04 +05:00

60 lines
1.8 KiB
Python

import requests
from firebase_admin import messaging
from core.apps.notifications.models import Notification
from core.apps.notifications.tasks.create_notification_history import create_history
# Expo push endpoint; send_notification POSTs batches of messages to it.
EXPO_API_URL = "https://exp.host/--/api/v2/push/send"
def send_notification(token, title, body, data=None):
    """Send an Expo push notification to every stored token except ``token``.

    A history task is queued with ``create_history.delay`` for the users of
    the targeted rows before anything is sent, including when no tokens are
    found.

    Args:
        token: Token to leave out of the broadcast; ``Notification`` rows
            whose ``token`` equals this value are excluded.
        title: Notification title.
        body: Notification body text.
        data: Optional payload sent as each message's ``data``; a falsy
            value is sent as an empty dict.

    Returns:
        ``{"error": "No tokens found"}`` when there are no recipients.
        Otherwise a list with one entry per request (up to 100 messages
        each): the decoded JSON response, or an ``{"error": ...}`` dict when
        the request failed or the response body was not valid JSON.
    """
    # One query for both columns so tokens and users come from the same rows.
    recipients = list(
        Notification.objects.exclude(token=token).values_list("token", "user")
    )
    tokens = [recipient_token for recipient_token, _ in recipients]
    users = [user for _, user in recipients]

    create_history.delay(users, title, body, data)

    if not tokens:
        return {"error": "No tokens found"}

    payload = data or {}
    messages = [
        {
            "to": recipient_token,
            "sound": "default",
            "title": title,
            "body": body,
            "data": payload,
        }
        for recipient_token in tokens
    ]

    chunk_size = 100
    # Without a timeout a stalled connection would block the caller forever.
    request_timeout = 10
    results = []
    for start in range(0, len(messages), chunk_size):
        chunk = messages[start:start + chunk_size]
        try:
            response = requests.post(
                EXPO_API_URL,
                json=chunk,
                headers={"Content-Type": "application/json"},
                timeout=request_timeout,
            )
        except requests.RequestException as exc:
            # Record the failure and keep going so the remaining chunks
            # are still delivered.
            results.append({"error": str(exc)})
            continue

        try:
            results.append(response.json())
        except ValueError:
            # JSON decode errors raised by response.json() are ValueErrors.
            results.append({"error": "Invalid JSON response"})
    return results
def send_web_notification(token, title, body, data=None):
    """Send a Firebase multicast notification to every token except ``token``.

    A history task is queued with ``create_history.delay`` for the users of
    the targeted rows before anything is sent, including when no tokens are
    found.

    Args:
        token: Token to leave out of the broadcast; ``Notification`` rows
            whose ``token`` equals this value are excluded.
        title: Notification title.
        body: Notification body text.
        data: Optional payload passed as the message's ``data``; a falsy
            value is sent as an empty dict.

    Returns:
        None. The responses from ``send_each_for_multicast`` are not
        inspected, so per-token delivery failures are not reported.
    """
    # values_list on the "user" field yields primary-key values, not model
    # instances, so they are used as user ids directly.
    recipients = list(
        Notification.objects.exclude(token=token).values_list("token", "user")
    )
    tokens = [recipient_token for recipient_token, _ in recipients]
    user_ids = [user_id for _, user_id in recipients]

    create_history.delay(user_ids, title, body, data)

    if not tokens:
        return

    notification = messaging.Notification(title=title, body=body)
    payload = data or {}

    # MulticastMessage accepts at most 500 tokens, so send in batches.
    max_tokens = 500
    for start in range(0, len(tokens), max_tokens):
        message = messaging.MulticastMessage(
            notification=notification,
            data=payload,
            tokens=tokens[start:start + max_tokens],
        )
        messaging.send_each_for_multicast(message)