photo26/photologue/views.py
krek0 b0027be96c
All checks were successful
Docker / build (release) Successful in 9s
fix: seek uploaded file to start before saving to avoid empty image on commit
2026-05-16 16:16:26 +02:00

401 lines
14 KiB
Python

# This file is part of photo21
# Copyright (C) 2021-2022 Amicale des élèves de l'ENS Paris-Saclay
# SPDX-License-Identifier: GPL-3.0-or-later
import os
import zipfile
from pathlib import Path
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin
from django.core.mail import mail_admins
from django.conf import settings
from django.db import transaction
from django.http import HttpResponse
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect
from django.urls import reverse, reverse_lazy
from django.utils.text import slugify
from django.views import View
from django.views.generic.dates import ArchiveIndexView, YearArchiveView
from django.views.generic.detail import DetailView
from django.views.generic.edit import DeleteView, FormView
from django.contrib.auth import get_user_model
from django.db.models import F, Q
from .forms import UploadForm
from .models import Gallery, Photo, Tag, Video
from .utils import generate_video_thumbnail, is_photo, is_video
# Registry mapping the <model_name> URL kwarg to the media model it designates.
MEDIA_MODELS = {"photo": Photo, "video": Video}
class ModelFromUrlMixin:
    """Resolve ``self.model`` from the ``model_name`` URL kwarg.

    The kwarg is looked up in the ``MEDIA_MODELS`` registry. A missing or
    unknown name raises ``Http404``.
    """

    def setup(self, request, *args, **kwargs):
        """Run the parent ``setup`` and bind the model class for this request.

        Raises:
            Http404: if ``model_name`` is absent or not a key of
                ``MEDIA_MODELS``.
        """
        from django.http import Http404

        super().setup(request, *args, **kwargs)
        resolved = MEDIA_MODELS.get(kwargs.get("model_name"))
        self.model = resolved
        if resolved is None:
            raise Http404
def unique_slug(model, title):
    """Return a slug derived from ``title`` that no ``model`` row uses yet.

    The first candidate is ``slugify(title)``. While a row with the candidate
    slug exists, ``-2``, ``-3``, ... is appended to the base until a free
    value is found.

    ``slugify`` yields an empty string for titles that contain no ASCII
    letters or digits (for example a file named only with CJK characters).
    The original code then produced slugs such as ``""`` and ``"-2"``; the
    model name is used as the base instead in that case.

    Args:
        model: model class whose ``slug`` column must stay unique.
        title: human-readable title to derive the slug from.

    Returns:
        A slug string not currently present in ``model``'s table.

    NOTE(review): the existence check and the caller's later insert are not
    atomic, so two concurrent uploads can still be handed the same slug.
    """
    # Fall back to e.g. "photo" / "video" when the title slugifies to "".
    base = slugify(title) or model._meta.model_name
    slug = base
    counter = 2
    while model.objects.filter(slug=slug).exists():
        slug = f"{base}-{counter}"
        counter += 1
    return slug
# Active user model for this project (Django's built-in one or a custom replacement).
User = get_user_model()
class GalleryDateView(LoginRequiredMixin):
    """Shared configuration for the date-based gallery archive views.

    Meant to be combined with one of Django's date archive views, which
    supply the ``get_queryset`` / ``get_context_data`` implementations that
    the methods below extend.
    """

    model = Gallery
    date_field = "date_start"
    # An archive without galleries renders an empty page instead of a 404.
    allow_empty = True

    def get_queryset(self):
        """Return all galleries for staff, otherwise only those with public media."""
        galleries = super().get_queryset()
        if not self.request.user.is_staff:
            has_public_media = Q(photos__is_public=True) | Q(videos__is_public=True)
            # The joins can return a gallery once per matching media item.
            galleries = galleries.filter(has_public_media).distinct()
        return galleries

    def get_context_data(self, **kwargs):
        """Expose every year with a visible gallery as ``date_list``, newest first."""
        context = super().get_context_data(**kwargs)
        years = self.get_queryset().dates(self.date_field, "year", "DESC")
        context["date_list"] = years
        return context
class GalleryArchiveIndexView(GalleryDateView, ArchiveIndexView):
    """Archive index of galleries; all behaviour comes from the base classes."""
class GalleryYearArchiveView(GalleryDateView, YearArchiveView):
    """Archive of the galleries belonging to a single year."""

    # Ask YearArchiveView to pass the year's galleries to the template.
    make_object_list = True
class PhotoDetailView(LoginRequiredMixin, DetailView):
    """Detail page for a single photo, restricted to logged-in users."""

    model = Photo

    def get_queryset(self):
        """Return every photo for staff and only public photos for other users."""
        photos = super().get_queryset()
        return photos if self.request.user.is_staff else photos.filter(is_public=True)
class MediaDeleteView(ModelFromUrlMixin, LoginRequiredMixin, DeleteView):
    """Confirm and delete a photo or video; the model is chosen by the URL."""

    template_name = "photologue/media_confirm_delete.html"

    def get_object(self, queryset=None):
        """Fetch the media object, restricted to its owner or permitted users.

        Raises:
            PermissionDenied: if the requesting user neither owns the object
                nor holds the ``photologue.delete_<model>`` permission.
        """
        from django.core.exceptions import PermissionDenied

        media = super().get_object(queryset)
        user = self.request.user
        if media.owner == user:
            return media
        if user.has_perm(f"photologue.delete_{self.model._meta.model_name}"):
            return media
        raise PermissionDenied

    def get_success_url(self):
        """Return the URL of the media's first gallery, or the archive if it has none."""
        galleries = list(self.object.galleries.all())
        if galleries:
            return reverse_lazy("photologue:pl-gallery", args=[galleries[0].slug])
        return reverse_lazy("photologue:pl-gallery-archive")
class MediaReportView(ModelFromUrlMixin, DetailView):
    """Let a visitor report a photo or video as abusive.

    POST marks the media as non-public, e-mails the site admins and
    redirects back to the gallery.
    """

    template_name = "photologue/photo_confirm_report.html"

    def dispatch(self, request, *args, **kwargs):
        """Send anonymous visitors to login unless the media is in a public gallery."""
        if not request.user.is_authenticated:
            media = self.get_object()
            in_public_gallery = media.galleries.filter(is_public=True).exists()
            if not in_public_gallery:
                from django.contrib.auth.views import redirect_to_login

                return redirect_to_login(request.get_full_path())
        return super().dispatch(request, *args, **kwargs)

    def post(self, request, *args, **kwargs):
        """Hide the reported media, notify the admins and redirect to its gallery."""
        # Hide the media right away; the report is reviewed afterwards.
        media = self.get_object()
        media.is_public = False
        media.save()

        # Redirect target: the media's first gallery, or the archive if it has none.
        galleries = list(media.galleries.all())
        slug = galleries[0].slug if galleries else ""
        if slug:
            target = reverse_lazy("photologue:pl-gallery", args=[slug])
        else:
            target = reverse_lazy("photologue:pl-gallery-archive")
        url = request.build_absolute_uri(target)

        # Anonymous reporters can only get here through a public gallery link.
        if request.user.is_authenticated:
            reporter = request.user.username
        else:
            reporter = "Anonymous (public link)"
        mail_admins(
            subject=f"Abuse report for {self.model._meta.verbose_name} id {media.pk}",
            message=f"{reporter} reported an abuse for `{media.title}`: {url}#lg=1&slide={media.pk}",
        )

        return redirect(url)
class MediaUncensorView(ModelFromUrlMixin, LoginRequiredMixin, View):
    """Make a photo or video public again; only a POST handler is defined."""

    # Placeholder; ModelFromUrlMixin.setup() assigns the real class per request.
    model = None

    def post(self, request, pk, **kwargs):
        """Set ``is_public`` on the media with primary key ``pk`` and answer in JSON.

        Raises:
            PermissionDenied: if the user lacks
                ``photologue.can_resolve_censorship``.
        """
        from django.core.exceptions import PermissionDenied

        allowed = request.user.has_perm("photologue.can_resolve_censorship")
        if not allowed:
            raise PermissionDenied

        media = get_object_or_404(self.model, pk=pk)
        media.is_public = True
        media.save()
        return JsonResponse({"ok": True})
class TagDetail(LoginRequiredMixin, DetailView):
    """Detail page for a tag, listing the galleries that carry it."""

    model = Tag

    def get_context_data(self, **kwargs):
        """Add the tag's galleries, newest first, to the context as ``galleries``."""
        context = super().get_context_data(**kwargs)
        # The parent get_context_data() already relies on self.object, so
        # reuse it instead of running a second lookup through get_object().
        context["galleries"] = Gallery.objects.filter(
            tags__slug=self.object.slug
        ).order_by("-date_start")
        return context
class GalleryDetailView(DetailView):
    """Gallery page listing its photos and videos, optionally filtered by owner.

    Anonymous visitors may only open galleries whose ``is_public`` flag is
    set; otherwise they are redirected to the login page.
    """

    model = Gallery

    def dispatch(self, request, *args, **kwargs):
        """Redirect anonymous visitors to login when the gallery is not public."""
        if not request.user.is_authenticated:
            self.object = self.get_object()
            if not self.object.is_public:
                from django.contrib.auth.views import redirect_to_login

                return redirect_to_login(request.get_full_path())
        return super().dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Build the template context for the gallery page.

        Adds ``can_resolve_censorship``, ``photos``, ``videos``, ``owners``
        and ``media_items`` (photos and videos merged, sorted by title), then
        increments ``view_count`` on the photos being shown.
        """
        context = super().get_context_data(**kwargs)
        user = self.request.user
        can_resolve = user.has_perm("photologue.can_resolve_censorship")
        context["can_resolve_censorship"] = can_resolve

        # Staff and censorship resolvers also see non-public media.
        sees_hidden = user.is_staff or can_resolve

        def visible(relation):
            items = relation.select_related("owner")
            if sees_hidden:
                return items.all()
            return items.filter(is_public=True)

        photos = visible(self.object.photos)
        videos = visible(self.object.videos)
        context["photos"] = photos
        context["videos"] = videos

        # Distinct owners in order of first appearance, collected before the
        # owner filter below so the full owner list stays available.
        owners = []
        context["owners"] = owners
        for item in [*photos, *videos]:
            if item.owner not in owners:
                owners.append(item.owner)

        # Optional restriction to a single owner taken from the URL.
        if "owner" in self.kwargs:
            owner_id = self.kwargs["owner"]
            photos = photos.filter(owner__id=owner_id)
            videos = videos.filter(owner__id=owner_id)
            context["photos"] = photos
            context["videos"] = videos

        # Single list for the template, ordered by title.
        context["media_items"] = sorted([*photos, *videos], key=lambda item: item.title)

        # Count one view for every photo shown on this page.
        photos.update(view_count=F("view_count") + 1)
        return context
class GalleryDownload(DetailView):
    """Build a zip archive of a gallery's public media and redirect to it.

    Anonymous visitors may only download galleries whose ``is_public`` flag
    is set; otherwise they are redirected to the login page.
    """

    # TODO: move archive generation to a background task.
    model = Gallery

    def dispatch(self, request, *args, **kwargs):
        """Redirect anonymous visitors to login when the gallery is not public."""
        if not request.user.is_authenticated:
            self.object = self.get_object()
            if not self.object.is_public:
                from django.contrib.auth.views import redirect_to_login

                return redirect_to_login(request.get_full_path())
        return super().dispatch(request, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        """Write ``photos/<year>/<slug>.zip`` under MEDIA_ROOT and redirect to it.

        The archive is rewritten on every request and only contains media
        whose ``is_public`` flag is set, whoever the requesting user is.
        """
        gallery = self.get_object()
        year = str(gallery.date_start.year)
        zip_name = gallery.slug + ".zip"

        # os.path.join accepts MEDIA_ROOT as either a str or a pathlib.Path,
        # which plain string concatenation did not.
        zip_dir = os.path.join(settings.MEDIA_ROOT, "photos", year)
        zip_path = os.path.join(zip_dir, zip_name)
        # The year directory may not exist yet for this gallery.
        os.makedirs(zip_dir, exist_ok=True)

        media = [
            *gallery.photos.filter(is_public=True),
            *gallery.videos.filter(is_public=True),
        ]
        # The context manager finalises the archive even when a write fails.
        with zipfile.ZipFile(zip_path, "w") as archive:
            for item in media:
                filename = os.path.basename(os.path.normpath(item.file_path))
                archive.write(item.file_path, filename)

        # MEDIA_URL ends with a slash, so the relative part is built with
        # forward slashes and no leading one (avoids "//" and backslashes).
        return redirect(settings.MEDIA_URL + "/".join(("photos", year, zip_name)))
class GalleryPublicToggleView(LoginRequiredMixin, View):
    """Flip a gallery's ``is_public`` flag; staff only, POST handler only."""

    def post(self, request, slug):
        """Toggle the flag on the gallery named by ``slug`` and return to its page.

        Raises:
            PermissionDenied: if the requesting user is not staff.
        """
        from django.core.exceptions import PermissionDenied

        if not request.user.is_staff:
            raise PermissionDenied

        gallery = get_object_or_404(Gallery, slug=slug)
        gallery.is_public = not gallery.is_public
        gallery.save()

        gallery_url = reverse("photologue:pl-gallery", args=[slug])
        return redirect(gallery_url)
class GalleryUpload(PermissionRequiredMixin, FormView):
    """Upload form that adds photos and videos to a gallery.

    Answers with the regular HTML flow, or with JSON when the client accepts
    ``application/json`` but not ``text/html``. A JSON client marks the last
    request of a multi-request upload by posting ``end=end``.
    """

    form_class = UploadForm
    template_name = "photologue/upload.html"
    success_url = reverse_lazy("photologue:pl-gallery-upload")
    permission_required = "photologue.add_gallery"

    def _wants_json(self):
        """Return whether the client accepts JSON and does not accept HTML."""
        request = self.request
        return not request.accepts("text/html") and request.accepts("application/json")

    def _upload_media(self, model, file_field, file_obj, gallery, gallery_dir, post_save=None):
        """Create one media row and schedule its file to be stored on commit.

        Args:
            model: media model class to instantiate.
            file_field: name of the file field on ``model`` receiving the upload.
            file_obj: the uploaded file.
            gallery: gallery the new object is attached to.
            gallery_dir: relative directory the file is stored under.
            post_save: optional callable invoked with the object once its
                file has been stored.

        Returns:
            True if the media was created, False if the current user already
            has media with the same title in this gallery.
        """
        owner = self.request.user
        title = Path(file_obj.name).stem
        duplicates = model.objects.filter(title=title, owner=owner, galleries=gallery)
        if duplicates.exists():
            return False

        media = model(title=title, slug=unique_slug(model, title), owner=owner)
        target_path = str(gallery_dir / file_obj.name)

        def store_file():
            # Rewind first: saving from a non-zero position produced empty images.
            file_obj.seek(0)
            getattr(media, file_field).save(target_path, file_obj)
            if post_save:
                post_save(media)

        with transaction.atomic():
            media.save()
            media.galleries.set([gallery])
            transaction.on_commit(store_file)
        return True

    def form_invalid(self, form):
        """Report form errors as JSON (HTTP 400) to JSON clients, as HTML otherwise."""
        if not self._wants_json():
            return super().form_invalid(form)
        errors = {field: list(errs) for field, errs in form.errors.items()}
        return JsonResponse({"code": 400, "error": errors}, status=400)

    def form_valid(self, form):
        """Store the uploaded files, notify the user and admins, and respond."""
        request = self.request
        response_json = bool(self._wants_json())
        finish_json = response_json and form.data.get("end", "") == "end"

        # Get or create the gallery and derive its storage directory.
        gallery = form.get_or_create_gallery()
        jsondata = {"galleryID": gallery.id, "code": 200}
        gallery_dir = Path(str(gallery.date_start.year)) / gallery.slug

        # Upload pictures and videos.
        uploaded_photo_name = []
        already_exists = 0
        for upload in form.cleaned_data["file_field"]:
            if is_photo(upload):
                stored = self._upload_media(Photo, "image", upload, gallery, gallery_dir)
            elif is_video(upload):
                stored = self._upload_media(
                    Video,
                    "file",
                    upload,
                    gallery,
                    gallery_dir,
                    post_save=generate_video_thumbnail,
                )
            else:
                error = f"{upload.name} is not a recognized image or video"
                messages.error(request, error)
                jsondata["code"] = 400
                jsondata["error"] = error
                continue

            if stored:
                uploaded_photo_name.append(upload.name)
            else:
                already_exists += 1

        # Notify the user (HTML flow only).
        n_success = len(uploaded_photo_name)
        if not response_json:
            if already_exists:
                messages.success(
                    request,
                    f"{n_success} photo(s) uploaded, {already_exists} photo(s) skipped as they already exist in this gallery.",
                )
            else:
                messages.success(request, f"{n_success} photo(s) uploaded.")

        # Notify administrators: per request for HTML uploads, and once at
        # the end of a continuous JSON upload.
        gallery_url = request.build_absolute_uri(
            reverse_lazy("photologue:pl-gallery", args=[gallery.slug])
        )
        username = request.user.username
        if uploaded_photo_name and not response_json:
            photos = ", ".join(uploaded_photo_name)
            mail_admins(
                subject=f"New upload from {username}",
                message=f"{username} has uploaded in <{gallery_url}>:\n{photos}",
            )
        elif response_json and finish_json:
            mail_admins(
                subject=f"New continious upload from {username}",
                message=f"{username} has uploaded multiples photo with continious upload in <{gallery_url}>",
            )

        if response_json:
            jsondata["uploadeds"] = uploaded_photo_name
            return JsonResponse(jsondata)
        return super().form_valid(form)