# This file is part of photo21 # Copyright (C) 2021-2022 Amicale des élèves de l'ENS Paris-Saclay # SPDX-License-Identifier: GPL-3.0-or-later import os import zipfile from pathlib import Path from django.contrib import messages from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin from django.core.mail import mail_admins from django.conf import settings from django.db import transaction from django.http import HttpResponse from django.http import JsonResponse from django.shortcuts import get_object_or_404, redirect from django.urls import reverse, reverse_lazy from django.utils.text import slugify from django.views import View from django.views.generic.dates import ArchiveIndexView, YearArchiveView from django.views.generic.detail import DetailView from django.views.generic.edit import DeleteView, FormView from django.contrib.auth import get_user_model from django.db.models import F, Q from .forms import UploadForm from .models import Gallery, Photo, Tag, Video from .utils import generate_video_thumbnail, is_photo, is_video MEDIA_MODELS = {"photo": Photo, "video": Video} class ModelFromUrlMixin: """Sets self.model from the URL kwarg using MEDIA_MODELS registry.""" def setup(self, request, *args, **kwargs): super().setup(request, *args, **kwargs) model_name = kwargs.get("model_name") self.model = MEDIA_MODELS.get(model_name) if self.model is None: from django.http import Http404 raise Http404 def unique_slug(model, title): base = slugify(title) slug, counter = base, 2 while model.objects.filter(slug=slug).exists(): slug = f"{base}-{counter}" counter += 1 return slug # Cette ligne renvoie le modèle d'utilisateur actif (le natif ou le vôtre) User = get_user_model() class GalleryDateView(LoginRequiredMixin): model = Gallery date_field = "date_start" allow_empty = True # Do not 404 if no galleries def get_queryset(self): """Hide galleries with no public media""" qs = super().get_queryset() if self.request.user.is_staff: return qs else: return qs.filter( Q(photos__is_public=True) | Q(videos__is_public=True) ).distinct() def get_context_data(self, **kwargs): """Always show all years in archive""" context = super().get_context_data(**kwargs) context["date_list"] = self.get_queryset().dates( self.date_field, "year", "DESC" ) return context class GalleryArchiveIndexView(GalleryDateView, ArchiveIndexView): pass class GalleryYearArchiveView(GalleryDateView, YearArchiveView): make_object_list = True class PhotoDetailView(LoginRequiredMixin, DetailView): model = Photo def get_queryset(self): """Non-staff members only see public photos""" qs = super().get_queryset() if self.request.user.is_staff: return qs else: return qs.filter(is_public=True) class MediaDeleteView(ModelFromUrlMixin, LoginRequiredMixin, DeleteView): template_name = "photologue/media_confirm_delete.html" def get_object(self, queryset=None): obj = super().get_object(queryset) delete_perm = f"photologue.delete_{self.model._meta.model_name}" if obj.owner != self.request.user and not self.request.user.has_perm(delete_perm): from django.core.exceptions import PermissionDenied raise PermissionDenied return obj def get_success_url(self): galleries = self.object.galleries.all() if not galleries: return reverse_lazy("photologue:pl-gallery-archive") slug = galleries[0].slug return reverse_lazy("photologue:pl-gallery", args=[slug]) class MediaReportView(ModelFromUrlMixin, DetailView): template_name = "photologue/photo_confirm_report.html" def dispatch(self, request, *args, **kwargs): if not request.user.is_authenticated: obj = self.get_object() if not obj.galleries.filter(is_public=True).exists(): from django.contrib.auth.views import redirect_to_login return redirect_to_login(request.get_full_path()) return super().dispatch(request, *args, **kwargs) def post(self, request, *args, **kwargs): # Mark media as private obj = self.get_object() obj.is_public = False obj.save() # Get gallery galleries = obj.galleries.all() gallery_slug = galleries[0].slug if galleries else "" if not gallery_slug: url = reverse_lazy("photologue:pl-gallery-archive") else: url = reverse_lazy("photologue:pl-gallery", args=[gallery_slug]) url = request.build_absolute_uri(url) # Send mail to managers reporter = request.user.username if request.user.is_authenticated else "Anonymous (public link)" mail_admins( subject=f"Abuse report for {self.model._meta.verbose_name} id {obj.pk}", message=f"{reporter} reported an abuse for `{obj.title}`: {url}#lg=1&slide={obj.pk}", ) # Redirect to gallery return redirect(url) class MediaUncensorView(ModelFromUrlMixin, LoginRequiredMixin, View): model = None def post(self, request, pk, **kwargs): if not request.user.has_perm("photologue.can_resolve_censorship"): from django.core.exceptions import PermissionDenied raise PermissionDenied obj = get_object_or_404(self.model, pk=pk) obj.is_public = True obj.save() return JsonResponse({"ok": True}) class TagDetail(LoginRequiredMixin, DetailView): model = Tag def get_context_data(self, **kwargs): """ Insert the single object into the context dict. """ current_tag = self.get_object().slug context = super().get_context_data(**kwargs) context["galleries"] = Gallery.objects.filter(tags__slug=current_tag).order_by( "-date_start" ) return context class GalleryDetailView(DetailView): """ Gallery detail view to filter on photo owner """ model = Gallery def dispatch(self, request, *args, **kwargs): if not request.user.is_authenticated: self.object = self.get_object() if not self.object.is_public: from django.contrib.auth.views import redirect_to_login return redirect_to_login(request.get_full_path()) return super().dispatch(request, *args, **kwargs) def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) can_resolve = self.request.user.has_perm("photologue.can_resolve_censorship") context["can_resolve_censorship"] = can_resolve # Non-staff members only see public media visible_only = not (self.request.user.is_staff or can_resolve) def media_qs(relation): qs = relation.select_related("owner") return qs.filter(is_public=True) if visible_only else qs.all() context["photos"] = media_qs(self.object.photos) context["videos"] = media_qs(self.object.videos) # List owners context["owners"] = [] # owners_pk_distinct = context["photos"].order_by('owner__pk').values_list('owner__pk', flat=True).distinct() # context["owners"] = User.objects.filter(pk__in=owners_pk_distinct) for media in [*context["photos"], *context["videos"]]: if media.owner not in context["owners"]: context["owners"].append(media.owner) # Filter on owner if "owner" in self.kwargs: context["photos"] = context["photos"].filter(owner__id=self.kwargs["owner"]) context["videos"] = context["videos"].filter(owner__id=self.kwargs["owner"]) # Combine photos and videos into a single sorted list for the template context["media_items"] = sorted( [*context["photos"], *context["videos"]], key=lambda x: x.title ) # Increment the photo view count context["photos"].update(view_count=F("view_count") + 1) return context class GalleryDownload(DetailView): ### IN FUTURE, PUT IT as Django Task model = Gallery def dispatch(self, request, *args, **kwargs): if not request.user.is_authenticated: self.object = self.get_object() if not self.object.is_public: from django.contrib.auth.views import redirect_to_login return redirect_to_login(request.get_full_path()) return super().dispatch(request, *args, **kwargs) def get(self, request, *args, **kwargs): """ Download a zip file of the gallery on GET request. """ # Create zip file with pictures gallery = self.get_object() gallery_year = os.path.join("/photos/", str(gallery.date_start.year)) gallery_zip = os.path.join(gallery_year, (gallery.slug + ".zip")) media = [*gallery.photos.filter(is_public=True), *gallery.videos.filter(is_public=True)] with open(settings.MEDIA_ROOT + gallery_zip, "wb") as zip_bytes: zip_file = zipfile.ZipFile(zip_bytes, "w") for item in media: filename = os.path.basename(os.path.normpath(item.file_path)) zip_file.write(item.file_path, filename) zip_file.close() # Return the path to it return redirect( (settings.MEDIA_URL + str(gallery_zip)).replace("\\", "/") ) # windows fix class GalleryPublicToggleView(LoginRequiredMixin, View): def post(self, request, slug): if not request.user.is_staff: from django.core.exceptions import PermissionDenied raise PermissionDenied gallery = get_object_or_404(Gallery, slug=slug) gallery.is_public = not gallery.is_public gallery.save() return redirect(reverse("photologue:pl-gallery", args=[slug])) class GalleryUpload(PermissionRequiredMixin, FormView): """ Form to upload new photos in a gallery """ form_class = UploadForm template_name = "photologue/upload.html" success_url = reverse_lazy("photologue:pl-gallery-upload") permission_required = "photologue.add_gallery" def _upload_media(self, model, file_field, file_obj, gallery, gallery_dir, post_save=None): """ Create a media object, save it to the DB, schedule file save on commit. Returns True if uploaded, False if already exists. """ title = Path(file_obj.name).stem if model.objects.filter(title=title, owner=self.request.user, galleries=gallery).exists(): return False obj = model(title=title, slug=unique_slug(model, title), owner=self.request.user) file_path = str(gallery_dir / file_obj.name) with transaction.atomic(): obj.save() obj.galleries.set([gallery]) def _save(o=obj, fp=file_path, f=file_obj, ff=file_field, ps=post_save): getattr(o, ff).save(fp, f) if ps: ps(o) transaction.on_commit(_save) return True def form_invalid(self, form): if not self.request.accepts("text/html") and self.request.accepts("application/json"): errors = {field: list(errs) for field, errs in form.errors.items()} return JsonResponse({"code": 400, "error": errors}, status=400) return super().form_invalid(form) def form_valid(self, form): # Get or create gallery if self.request.accepts("text/html") or not self.request.accepts( "application/json" ): response_json = False finish_json = False else: response_json = True finish_json = form.data.get("end", "") == "end" gallery = form.get_or_create_gallery() jsondata = {"galleryID": gallery.id, "code": 200} gallery_year = Path(str(gallery.date_start.year)) gallery_dir = gallery_year / gallery.slug # Upload pictures and videos uploaded_photo_name = [] already_exists = 0 files = form.cleaned_data["file_field"] for photo_file in files: if is_photo(photo_file): uploaded = self._upload_media(Photo, "image", photo_file, gallery, gallery_dir) elif is_video(photo_file): uploaded = self._upload_media(Video, "file", photo_file, gallery, gallery_dir, post_save=generate_video_thumbnail) else: messages.error(self.request, f"{photo_file.name} is not a recognized image or video") jsondata["code"] = 400 jsondata["error"] = f"{photo_file.name} is not a recognized image or video" continue if not uploaded: already_exists += 1 else: uploaded_photo_name.append(photo_file.name) # Notify user then managers n_success = len(uploaded_photo_name) if not response_json: if already_exists: messages.success( self.request, f"{n_success} photo(s) uploaded, {already_exists} photo(s) skipped as they already exist in this gallery.", ) else: messages.success(self.request, f"{n_success} photo(s) uploaded.") # Notify administrators on new uploads gallery_url = reverse_lazy("photologue:pl-gallery", args=[gallery.slug]) gallery_url = self.request.build_absolute_uri(gallery_url) if uploaded_photo_name and (not response_json): photos = ", ".join(uploaded_photo_name) mail_admins( subject=f"New upload from {self.request.user.username}", message=f"{self.request.user.username} has uploaded in <{gallery_url}>:\n{photos}", ) elif response_json and finish_json: photos = ", ".join(uploaded_photo_name) mail_admins( subject=f"New continious upload from {self.request.user.username}", message=f"{self.request.user.username} has uploaded multiples photo with continious upload in <{gallery_url}>", ) if response_json: jsondata["uploadeds"] = uploaded_photo_name return JsonResponse(jsondata) else: return super().form_valid(form)