import logging

import requests
from firebase_admin import messaging

from core.apps.notifications.models import Notification
from core.apps.notifications.tasks.create_notification_history import create_history

logger = logging.getLogger(__name__)

EXPO_API_URL = "https://exp.host/--/api/v2/push/send"

# Number of push messages posted to the Expo API in a single HTTP request.
EXPO_CHUNK_SIZE = 100

# (connect, read) timeout in seconds for each Expo request. Without a timeout
# `requests` waits forever on a stalled connection.
EXPO_REQUEST_TIMEOUT = (5, 30)

# firebase_admin raises ValueError for a MulticastMessage with more tokens
# than this, so larger audiences have to be split into several messages.
FCM_MAX_TOKENS_PER_MULTICAST = 500


def _recipients_excluding(token):
    """Return ``(tokens, users)`` for every Notification row except *token*.

    Both lists come from one query so that ``tokens[i]`` and ``users[i]``
    always describe the same row.
    """
    # NOTE(review): assumes `user` is a single-valued relation (FK/OneToOne),
    # so selecting it alongside `token` does not multiply rows -- confirm
    # against the Notification model.
    rows = list(
        Notification.objects.exclude(token=token).values_list("token", "user")
    )
    tokens = [row_token for row_token, _ in rows]
    users = [row_user for _, row_user in rows]
    return tokens, users


def send_notification(token, title, body, data=None):
    """Push a notification through the Expo API to every stored token except *token*.

    A history-creation task is queued for the matching users before anything
    is sent, including when there are no recipients.

    NOTE(review): *token* is the token that is *excluded* from delivery, not
    the target -- confirm this is the intended meaning at the call sites.

    Args:
        token: Device token to leave out of the recipient list.
        title: Notification title.
        body: Notification body text.
        data: Optional extra payload attached to every message; a falsy value
            is sent as an empty dict.

    Returns:
        ``{"error": "No tokens found"}`` when there is no recipient, otherwise
        a list with one entry per HTTP request: the decoded JSON response, or
        ``{"error": "Invalid JSON response"}`` when the body is not JSON.

    Raises:
        requests.RequestException: If a request to Expo fails or times out;
            chunks after the failing one are not sent.
    """
    tokens, users = _recipients_excluding(token)
    create_history.delay(users, title, body, data)

    if not tokens:
        return {"error": "No tokens found"}

    payload = data or {}
    messages = [
        {
            "to": recipient,
            "sound": "default",
            "title": title,
            "body": body,
            "data": payload,
        }
        for recipient in tokens
    ]

    results = []
    for start in range(0, len(messages), EXPO_CHUNK_SIZE):
        chunk = messages[start:start + EXPO_CHUNK_SIZE]
        response = requests.post(
            EXPO_API_URL,
            json=chunk,
            headers={"Content-Type": "application/json"},
            timeout=EXPO_REQUEST_TIMEOUT,
        )
        try:
            results.append(response.json())
        except ValueError:
            # JSON decode errors raised by `Response.json()` subclass ValueError.
            results.append({"error": "Invalid JSON response"})
    return results


def send_web_notification(token, title, body, data=None):
    """Push a notification through Firebase Cloud Messaging to every stored token except *token*.

    A history-creation task is queued for the matching users before anything
    is sent, including when there are no recipients. Recipients are split into
    batches of at most ``FCM_MAX_TOKENS_PER_MULTICAST`` tokens.

    NOTE(review): *token* is the token that is *excluded* from delivery, not
    the target -- confirm this is the intended meaning at the call sites.

    Args:
        token: Device token to leave out of the recipient list.
        title: Notification title.
        body: Notification body text.
        data: Optional extra payload; a falsy value is sent as an empty dict.
            NOTE(review): FCM data payloads are expected to map str to str --
            confirm callers only pass string values.

    Returns:
        None. Per-batch delivery failures are logged as warnings.
    """
    tokens, users = _recipients_excluding(token)
    create_history.delay(users, title, body, data)

    if not tokens:
        return

    notification = messaging.Notification(title=title, body=body)
    payload = data or {}

    for start in range(0, len(tokens), FCM_MAX_TOKENS_PER_MULTICAST):
        batch = tokens[start:start + FCM_MAX_TOKENS_PER_MULTICAST]
        message = messaging.MulticastMessage(
            notification=notification,
            data=payload,
            tokens=batch,
        )
        response = messaging.send_each_for_multicast(message)
        if response.failure_count:
            logger.warning(
                "FCM multicast: %d of %d messages failed",
                response.failure_count,
                len(batch),
            )