Source code for intranet.apps.polls.views

import csv
import json
import logging
from collections import OrderedDict

import pyrankvote
from pyrankvote import Ballot

from django import http
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import login_required
from django.core.serializers import serialize
from django.db import transaction
from django.db.models import Q
from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone

from ...utils.date import get_senior_graduation_year
from ...utils.html import safe_html
from ...utils.locking import lock_on
from ..auth.decorators import deny_restricted
from .forms import PollForm
from .models import Answer, Choice, Poll, Question

logger = logging.getLogger(__name__)


@login_required
@deny_restricted
def polls_view(request):
    """Render the polls home page.

    Polls administrators who pass ``show_all`` in the query string start from
    every poll; everyone else starts from the polls visible to them. Without
    ``show_all`` the list is narrowed to polls that are currently open, and
    non-administrators only ever see polls flagged ``visible``.
    """
    viewer = request.user
    is_polls_admin = viewer.has_admin_permission("polls")
    show_all = "show_all" in request.GET

    polls = Poll.objects.all() if (is_polls_admin and show_all) else Poll.objects.visible_to_user(viewer)

    if not show_all:
        # Keep only polls whose voting window contains the current moment.
        current_time = timezone.now()
        polls = polls.filter(start_time__lt=current_time, end_time__gt=current_time)

    if not is_polls_admin:
        polls = polls.filter(visible=True)

    return render(
        request,
        "polls/home.html",
        {"polls": polls.order_by("-end_time"), "is_polls_admin": is_polls_admin},
    )


@login_required
@deny_restricted
def csv_results(request, poll_id):
    """Return a CSV with one row per voter and one column per answered question.

    Only polls administrators may use this view (others get a 403 page).
    Secret polls and polls without any votes redirect back to the results
    page with an error message instead of producing a file.

    The header row is the union of the columns of *all* rows, in
    first-seen order, so a voter who skipped a question (or answered one the
    first voter skipped) no longer makes ``DictWriter.writerows`` raise
    ``ValueError``; missing cells are left empty.
    """
    if not request.user.has_admin_permission("polls"):
        return render(request, "error/403.html", {"reason": "You are not authorized to view this page."}, status=403)

    p = get_object_or_404(Poll, id=poll_id)

    if p.is_secret:
        messages.error(request, "CSV results cannot be generated for secret polls.")
        return redirect("poll_results", poll_id=poll_id)

    # Evaluate the voter list once; it is needed for the emptiness check and the loop.
    voters = list(p.get_users_voted())
    if not voters:
        messages.error(request, "CSV results cannot be generated because no votes have been cast for this poll.")
        return redirect("poll_results", poll_id=poll_id)

    headers = ["Username", "First", "Last"]
    rows = []
    for u in voters:
        row = OrderedDict()
        row["Username"] = u.username
        row["First"] = u.first_name
        row["Last"] = u.last_name

        # select_related avoids one extra query per answer for the question and choice.
        answers = Answer.objects.filter(question__poll=p, user=u).select_related("question", "choice")
        for answer in answers:
            question = answer.question.question
            if answer.choice:
                row[question] = answer.choice
            elif answer.answer:
                row[question] = ("Other: " if answer.other_vote else "") + answer.answer
            elif answer.clear_vote:
                row[question] = "Cleared"
            else:
                row[question] = "None"

            if question not in headers:
                headers.append(question)
        rows.append(row)

    response = http.HttpResponse(content_type="text/csv")
    w = csv.DictWriter(response, headers)
    w.writeheader()
    w.writerows(rows)
    return response


@login_required
@deny_restricted
def ranked_choice_results(request, poll_id):
    """Return a CSV with one row per stored answer, including its rank.

    Only polls administrators may use this view (others get a 403 page).
    Secret polls and polls without any votes redirect back to the results
    page with an error message.

    Each row holds the voter's username plus, for the answered question, a
    ``Rank - <question>`` column and a ``<question>`` column. The header is
    collected from every voter's answers (first-seen order); previously it was
    derived from the last voter's answers only, which made ``writerows`` raise
    ``ValueError`` whenever that voter had skipped a question.
    """
    if not request.user.has_admin_permission("polls"):
        return render(request, "error/403.html", {"reason": "You are not authorized to view this page."}, status=403)

    p = get_object_or_404(Poll, id=poll_id)

    if p.is_secret:
        messages.error(request, "CSV results cannot be generated for secret polls.")
        return redirect("poll_results", poll_id=poll_id)

    # Evaluate the voter list once; it is needed for the emptiness check and the loop.
    voters = list(p.get_users_voted())
    if not voters:
        messages.error(request, "CSV results cannot be generated because no votes have been cast for this poll.")
        return redirect("poll_results", poll_id=poll_id)

    headers = ["Username"]
    rows = []
    for u in voters:
        # select_related avoids one extra query per answer for the question and choice.
        answers = Answer.objects.filter(question__poll=p, user=u).select_related("question", "choice")
        for answer in answers:
            question = answer.question.question
            row = {"Username": u.username}
            if answer.choice:
                row["Rank - " + question] = answer.rank
                row[question] = answer.choice.display_name()
            elif answer.answer:
                row[question] = answer.answer
            elif answer.clear_vote:
                row[question] = "Cleared"
            else:
                row[question] = "None"
            rows.append(row)

            if question not in headers:
                headers.append("Rank - " + question)
                headers.append(question)

    response = http.HttpResponse(content_type="text/csv")
    w = csv.DictWriter(response, headers)
    w.writeheader()
    w.writerows(rows)
    return response


@login_required
@deny_restricted
def poll_vote_view(request, poll_id):
    """Record submitted votes for a poll and render its voting page.

    Polls administrators may pass ``?user=<id>`` to act for another user; an
    unknown or malformed id falls back to the requesting user.

    On POST, when ``poll.can_vote(user)`` allows it, every field whose name
    starts with ``question-`` is applied to the question with that number,
    according to the question's type (single choice, rank, many choice or
    writing). Malformed or unknown values are reported through the messages
    framework and skipped instead of raising.

    Returns the rendered ``polls/vote.html`` page, or redirects to ``polls``
    with an error message when the user may not vote.
    """
    poll = get_object_or_404(Poll, id=poll_id)

    user = request.user
    is_polls_admin = user.has_admin_permission("polls")
    if is_polls_admin and "user" in request.GET:
        try:
            user = get_user_model().objects.get(id=request.GET.get("user"))
        except (get_user_model().DoesNotExist, ValueError):
            user = request.user

    if request.method == "POST" and poll.can_vote(user):
        questions = poll.question_set.all()
        entries = request.POST
        for name in entries:
            if not name.startswith("question-"):
                continue

            question_num = name.split("-", 3)[1]
            try:
                question_obj = questions.get(num=question_num)
            except (Question.DoesNotExist, ValueError):
                # ValueError covers a non-numeric number in the field name
                # (assumes ``num`` is an integer field -- it is compared with ints below).
                messages.error(request, f"Invalid question passes with num {question_num}")
                continue

            choice_num = entries[name]

            if question_obj.is_choice():
                choices = question_obj.choice_set.all()
                if question_obj.is_single_choice():
                    if choice_num == "CLEAR":
                        Answer.objects.update_or_create(
                            user=user, question=question_obj, defaults={"clear_vote": True, "other_vote": False, "choice": None, "answer": None}
                        )
                        messages.success(request, f"Clear Vote for {question_obj}")
                    elif choice_num == "OTHER":  # Standard Other option
                        continue
                    elif name.endswith("-writing"):  # Standard Other answer
                        # .get(): the radio field may be absent when nothing was selected.
                        if entries.get("question-" + question_num) != "OTHER":
                            continue
                        Answer.objects.update_or_create(
                            user=user,
                            question=question_obj,
                            defaults={"clear_vote": False, "other_vote": True, "choice": None, "answer": choice_num},
                        )
                        messages.success(request, f"Vote saved for choice Other on question {question_obj}")
                    else:
                        try:
                            choice_obj = choices.get(num=choice_num)
                        except (Choice.DoesNotExist, ValueError):
                            messages.error(request, f"Invalid answer choice with num {choice_num}")
                            continue
                        Answer.objects.update_or_create(
                            user=user,
                            question=question_obj,
                            defaults={"clear_vote": False, "other_vote": False, "choice": choice_obj, "answer": None},
                        )
                        messages.success(request, f"Voted for {choice_obj} on {question_obj}")

                elif question_obj.is_rank_choice():
                    updated_nums = request.POST.getlist(f"helper-{name}")  # helper-question-1
                    updated_choices = request.POST.getlist(name)

                    # Maps helper number -> (choice number, 1-based rank); a repeated
                    # helper number keeps only its last occurrence.
                    try:
                        choice_value_map = {
                            int(updated_nums[position]): (int(raw_choice), position + 1)
                            for position, raw_choice in enumerate(updated_choices)
                        }
                    except (IndexError, ValueError):
                        messages.error(request, f"Invalid ranking submitted for {question_obj}")
                        continue

                    try:
                        with transaction.atomic():
                            lock_on(user.answer_set.all())

                            Answer.objects.filter(user=user, question=question_obj).delete()

                            # Saves the choices and answers
                            for ranked_choice_num, rank in choice_value_map.values():
                                choice = Choice.objects.get(question=question_obj, num=ranked_choice_num)
                                answer = Answer.objects.get_or_create(user=user, choice=choice, question=question_obj)[0]
                                answer.rank = rank
                                answer.save()
                    except Choice.DoesNotExist:
                        # Leaving the atomic block through an exception rolls the
                        # delete back, so the previous ballot is kept.
                        messages.error(request, f"Invalid answer choice submitted for {question_obj}")

                elif question_obj.is_many_choice():
                    updated_choices = request.POST.getlist(name)
                    if len(updated_choices) == 1 and updated_choices[0] == "CLEAR":
                        with transaction.atomic():
                            # Lock on the user's answers to prevent duplicates.
                            lock_on(user.answer_set.all())
                            Answer.objects.filter(user=user, question=question_obj).delete()
                            Answer.objects.create(user=user, question=question_obj, clear_vote=True)
                            messages.success(request, f"Clear Vote for {question_obj}")
                    elif "CLEAR" in updated_choices:
                        messages.error(request, "Cannot select other options with Clear Vote.")
                    elif len(updated_choices) > question_obj.max_choices:
                        messages.error(request, f"You have voted on too many options for {question_obj}")
                    else:
                        try:
                            # De-duplicate (keeping order) so a repeated value cannot
                            # create two answers for the same choice.
                            requested_nums = list(dict.fromkeys(int(c) for c in updated_choices))
                        except ValueError:
                            messages.error(request, f"Invalid answer choice submitted for {question_obj}")
                            continue

                        with transaction.atomic():
                            # Lock on the question's answers to prevent duplicates.
                            lock_on(user.answer_set.all())

                            available_answers = [c.num for c in choices]
                            updated_answers = [num for num in requested_nums if num in available_answers]
                            current_answers = [c.choice.num for c in Answer.objects.filter(user=user, question=question_obj) if c.choice]

                            to_create = [choices.get(num=c) for c in updated_answers if c not in current_answers]
                            for c in to_create:
                                Answer.objects.create(user=user, question=question_obj, choice=c)
                                messages.success(request, f"Voted for {c} on {question_obj}")

                            to_delete = [choices.get(num=c) for c in current_answers if c not in updated_answers]
                            for c in to_delete:
                                logger.info("Deleting choice for %s", c)
                            Answer.objects.filter(user=user, question=question_obj).filter(Q(clear_vote=True) | Q(choice__in=to_delete)).delete()

            elif question_obj.is_writing():
                Answer.objects.update_or_create(user=user, question=question_obj, defaults={"answer": choice_num})
                messages.success(request, f"Answer saved for {question_obj}")

        messages.success(request, "Thank you for voting!")

    # Re-check after processing the submission; the result also feeds the template.
    can_vote = poll.can_vote(user)
    if not can_vote:
        messages.error(request, "You cannot view this poll.")
        return redirect("polls")

    questions = []
    for q in poll.question_set.all():
        current_votes = Answer.objects.filter(user=user, question=q)

        if q.type == Question.ELECTION:
            choices = q.random_choice_set
        else:
            choices = q.choice_set.all()

        if q.type == Question.RANK:
            if current_votes.count() == q.max_choices:
                # Existing ranks first, then every not-yet-ranked choice marked with -1.
                choices_and_values = [(a.choice, a.rank) for a in current_votes]
                ranked_choices = [a.choice for a in current_votes]
                choices_and_values.extend((c, -1) for c in choices if c not in ranked_choices)
            else:
                choices_and_values = [(c, -1) for c in choices]
        else:
            choices_and_values = []

        question = {
            "num": q.num,
            "type": q.type,
            "question": q.question,
            "choices": choices,
            "is_single_choice": q.is_single_choice(),
            "is_rank_choice": q.is_rank_choice(),
            "is_many_choice": q.is_many_choice(),
            "is_writing": q.is_writing(),
            "max_choices": q.max_choices,
            "num_choices": len(choices),
            "current_votes": current_votes,
            "current_vote": current_votes[0] if current_votes else None,
            "current_choices": [v.choice for v in current_votes],
            "current_vote_none": (len(current_votes) < 1),
            "current_vote_clear": (len(current_votes) == 1 and current_votes[0].clear_vote),
            "current_vote_other": (len(current_votes) == 1 and current_votes[0].other_vote),
            "choices_and_values": choices_and_values,
        }
        questions.append(question)

    context = {
        "poll": poll,
        "can_vote": can_vote,
        "user": user,
        "questions": questions,
        "question_types": Question.get_question_types(),
    }
    return render(request, "polls/vote.html", context)


def fmt(num):
    """Truncate ``num`` to two decimal places and return it as a float.

    The scaled value is first rounded to 9 decimals so that binary
    floating-point noise does not drop a whole hundredth: ten votes worth
    ``0.1`` each sum to ``0.9999999999999999`` and must display as ``1.0``,
    not ``0.99``. Genuine extra digits (``1/3``) are still truncated.
    """
    return int(round(100 * num, 9)) / 100
def perc(num, den):
    """Return ``num / den`` as a percentage rounded to two decimals.

    A zero denominator yields ``0`` instead of raising ``ZeroDivisionError``.
    """
    if den == 0:
        return 0
    return round(num / den * 100.0, 2)
def generate_choice(name, votes, total_count, show_answers=False):
    """Build the result row for one choice of a question.

    Args:
        name: Value stored under ``"choice"`` (a choice object or a label string).
        votes: Answer queryset holding the votes for this row.
        total_count: Denominator used for the ``all_percent`` figure.
        show_answers: When true, the voting users are listed under ``"users"``;
            otherwise that entry is ``None``.

    Returns:
        A dict with ``"choice"``, ``"users"`` and ``"votes"``; ``"votes"`` has a
        ``"total"`` entry plus one entry per year number 9-13, each with
        ``all``/``male``/``female`` counts.
    """
    vote_count = votes.count()
    # Loop-invariant: look the senior graduation year up once, not once per grade.
    senior_graduation_year = get_senior_graduation_year()

    choice = {
        "choice": name,
        "votes": {
            "total": {
                "all": vote_count,
                "all_percent": perc(vote_count, total_count),
                "male": votes.filter(user__gender=True).count(),
                "female": votes.filter(user__gender__isnull=False, user__gender=False).count(),
            }
        },
        "users": [v.user for v in votes] if show_answers else None,
    }

    for yr in range(9, 14):
        yr_votes = votes.filter(user__graduation_year=senior_graduation_year + 12 - yr)
        choice["votes"][yr] = {
            "all": yr_votes.count(),
            "male": yr_votes.filter(user__gender=True).count(),
            "female": yr_votes.filter(user__gender__isnull=False, user__gender=False).count(),
        }

    return choice
def _sap_grade_breakdown(people, user_scale, distinct=True):
    """Return ``{grade: {"all", "male", "female"}}`` for grade numbers 9-13.

    Args:
        people: List of user objects (one entry per vote, or one per user).
        user_scale: Mapping of user id to the weight of one of that user's votes.
        distinct: Count ``"all"`` over distinct users when true, over list
            entries when false.
    """
    breakdown = {}
    for yr in range(9, 14):
        in_grade = [u for u in people if u.grade and u.grade.number == yr]
        breakdown[yr] = {
            "all": len(set(in_grade)) if distinct else len(in_grade),
            "male": fmt(sum(u.is_male * user_scale[u.id] for u in in_grade)),
            "female": fmt(sum(u.is_female * user_scale[u.id] for u in in_grade)),
        }
    return breakdown


def handle_sap(q):
    """Tally a split-approval question, where each user splits one vote.

    Every answer of a user is weighted by ``1 / (number of that user's answers
    to this question)``. The result lists one row per choice (ordered by
    ``num``), then a "Clear vote" row and a "Total" row.

    Returns:
        ``{"question": q, "choices": [...], "user_scale": {user id: weight}}``.
    """
    question_votes = Answer.objects.filter(question=q)
    users = q.get_users_voted()
    num_users_votes = {u.id: question_votes.filter(user=u).count() for u in users}
    user_scale = {u.id: (1 / num_users_votes[u.id]) for u in users}
    total_users = users.count()

    choices = []
    for c in q.choice_set.all().order_by("num"):
        vote_users = [v.user for v in question_votes.filter(choice=c)]
        distinct_users = set(vote_users)
        choice = {
            "choice": c,
            "votes": {
                "total": {
                    "all": len(distinct_users),
                    "all_percent": perc(len(distinct_users), total_users),
                    "male": fmt(sum(u.is_male * user_scale[u.id] for u in vote_users)),
                    "female": fmt(sum(u.is_female * user_scale[u.id] for u in vote_users)),
                }
            },
            "users": vote_users,
        }
        choice["votes"].update(_sap_grade_breakdown(vote_users, user_scale))
        choices.append(choice)

    # Clear vote
    clear_vote_users = [v.user for v in question_votes.filter(clear_vote=True)]
    clr_users = set(clear_vote_users)
    choice = {
        "choice": "Clear vote",
        "votes": {
            "total": {
                "all": len(clr_users),
                "all_percent": perc(len(clr_users), total_users),
                "male": fmt(sum(u.is_male * user_scale[u.id] for u in clear_vote_users)),
                "female": fmt(sum(u.is_female * user_scale[u.id] for u in clear_vote_users)),
            }
        },
        "users": clr_users,
    }
    # Per-grade "all" counts clear-vote answers, not distinct users, as before.
    choice["votes"].update(_sap_grade_breakdown(clear_vote_users, user_scale, distinct=False))
    choices.append(choice)

    # Total
    choice = {
        "choice": "Total",
        "votes": {
            "total": {
                "all": total_users,
                "votes_all": question_votes.count(),
                "all_percent": perc(total_users, total_users),
                "male": users.filter(gender=True).count(),
                "female": users.filter(gender__isnull=False, gender=False).count(),
            }
        },
    }
    # NOTE(review): the overall male/female totals above are plain user counts,
    # while the per-grade figures below are weighted by user_scale -- confirm
    # whether the per-grade totals were meant to be plain counts as well.
    choice["votes"].update(_sap_grade_breakdown(list(users), user_scale))
    choices.append(choice)

    return {"question": q, "choices": choices, "user_scale": user_scale}
def handle_rank_choice(q, show_answers=False):
    """Tally a rank-choice question using each answer's ``display_votes()``.

    Args:
        q: The question to tally.
        show_answers: When true, each choice row lists its voting users under
            ``"users"``; otherwise that entry is ``None``.

    Returns:
        ``{"question": q, "choices": [...]}`` where the choice rows are sorted
        by total votes (descending) and followed by a final "Total" row.

    Gender statistics are deliberately not computed here: they caused a large
    performance hit and gender information is mostly unavailable anyway.
    """
    # For some reason, this seems to be faster than select_related("question", "user", "choice")
    question_votes = Answer.objects.filter(question=q).select_related()
    votes_total = q.get_total_votes()

    # Graduation year for each year number, computed once instead of inside
    # the nested loops. Year 13 is staff.
    senior_graduation_year = get_senior_graduation_year()
    graduation_years = {yr: senior_graduation_year + 12 - yr for yr in range(9, 14)}

    choices = []
    for c in q.choice_set.all().order_by("num"):
        votes = question_votes.filter(choice=c)
        votes_sum = sum(v.display_votes() for v in votes)
        choice = {
            "choice": c,
            "votes": {
                "total": {
                    "all": votes_sum,
                    "all_percent": perc(votes_sum, votes_total),
                    "votes_all": votes_total,
                }
            },
            "users": [v.user for v in votes] if show_answers else None,
        }
        for yr, graduation_year in graduation_years.items():
            yr_votes = votes.filter(user__graduation_year=graduation_year)
            choice["votes"][yr] = {"all": sum(v.display_votes() for v in yr_votes)}
        choices.append(choice)

    # Sort the per-choice rows; the "Total" row is appended afterwards so it stays last.
    choices.sort(key=lambda e: e["votes"]["total"]["all"], reverse=True)

    total_row = {"choice": "Total", "votes": {}}
    for yr, graduation_year in graduation_years.items():
        yr_votes = question_votes.filter(user__graduation_year=graduation_year)
        total_row["votes"][yr] = {"all": sum(v.display_votes() for v in yr_votes)}

    all_sum = sum(v.display_votes() for v in question_votes)
    total_row["votes"]["total"] = {
        "all": all_sum,
        "all_percent": perc(all_sum, votes_total),
        "votes_all": votes_total,
    }
    choices.append(total_row)

    return {"question": q, "choices": choices}
def handle_choice(q, show_answers=False):
    """Tally a choice question into result rows built by ``generate_choice``.

    Rows are produced for every choice (ordered by ``num``), for write-in
    ("Other") answers when the question type is ``"STO"``, and for clear
    votes. Those rows are sorted by total votes, descending, and a "Total"
    row carrying the number of distinct voters (``users_all``) is appended.

    Args:
        q: The question to tally.
        show_answers: Forwarded to ``generate_choice`` to include voter lists.

    Returns:
        ``{"question": q, "choices": [...]}``.
    """
    question_votes = Answer.objects.filter(question=q)
    total_count = question_votes.count()
    users = q.get_users_voted()

    choices = []

    # Choices
    for c in q.choice_set.all().order_by("num"):
        choices.append(generate_choice(c, question_votes.filter(choice=c), total_count, show_answers))

    if q.type == "STO":
        other_votes = question_votes.filter(other_vote=True)
        choices.append(generate_choice("Total Votes for Other", other_votes, total_count, show_answers))
        # dict.fromkeys de-duplicates while keeping first-seen order, so rows
        # with equal vote counts no longer depend on set iteration order.
        for ans in dict.fromkeys(vote.answer for vote in other_votes):
            choices.append(generate_choice(f"Other: {ans}", other_votes.filter(answer=ans), total_count, show_answers))

    # Clear vote
    clear_votes = question_votes.filter(clear_vote=True)
    choices.append(generate_choice("Clear vote", clear_votes, total_count, show_answers))

    # Sort
    choices.sort(key=lambda e: e["votes"]["total"]["all"], reverse=True)

    # Total
    total_choice = generate_choice("Total", question_votes, total_count, show_answers)
    total_choice["votes"]["total"]["users_all"] = users.count()
    choices.append(total_choice)

    return {"question": q, "choices": choices}
@login_required
@deny_restricted
def poll_results_view(request, poll_id):
    """Render the results page of a poll for polls administrators.

    Non-administrators are redirected to ``polls``. While the poll is still
    in its time range an informational message warns that results may change.
    ``?show_answers`` requests per-user selections; for secret polls this is
    refused with an error and a redirect back to the plain results page.
    """
    if not request.user.has_admin_permission("polls"):
        return redirect("polls")

    poll = get_object_or_404(Poll, id=poll_id)

    if poll.in_time_range():
        messages.info(request, "Results may not be final because the poll is still open.")

    # Any non-empty query-string value counts as "show answers".
    show_answers = request.GET.get("show_answers", False)
    if show_answers and poll.is_secret:
        messages.error(request, "User selections cannot be viewed for secret polls.")
        return redirect("poll_results", poll_id)

    questions = []
    for q in poll.question_set.all():
        if q.type == "SAP":  # Split-approval; each person splits their one vote
            questions.append(handle_sap(q))
        elif q.is_rank_choice():
            questions.append(handle_rank_choice(q, show_answers))
        elif q.is_choice():
            questions.append(handle_choice(q, show_answers))
        elif q.is_writing():
            answers = Answer.objects.filter(question=q)
            questions.append({"question": q, "answers": answers})

    context = {"poll": poll, "grades": range(9, 13), "questions": questions, "show_answers": show_answers}
    return render(request, "polls/results.html", context)


@login_required
@deny_restricted
def election_winners_view(request, poll_id):
    """Render the winners page of a poll for polls administrators.

    Non-administrators are redirected to ``polls``. The winners come from
    ``determine_ranked_choice_winners``; a warning message is shown while the
    poll is still in its time range.
    """
    if not request.user.has_admin_permission("polls"):
        return redirect("polls")

    poll = get_object_or_404(Poll, id=poll_id)

    if poll.in_time_range():
        messages.error(request, "Warning: results may not be final because the poll is still open.")

    context = {"poll": poll, "results": determine_ranked_choice_winners(poll)}
    return render(request, "polls/winners.html", context)
def determine_ranked_choice_winners(poll):
    """Work out the winner of every question in ``poll``.

    Rank-choice questions with at least two choices are decided by
    ``pyrankvote.instant_runoff_voting``; every other question is won by the
    choice with the most answers (the first one found wins a tie, and the
    winner is ``""`` when nothing received a vote).

    Returns:
        A list of ``[question, result description, winner name]`` entries, one
        per question.
    """

    # pyrankvote's Candidate class is broken, so use a custom class instead
    class Candidate:
        """A candidate in the election.

        No ``__eq__`` is defined, so equality stays identity-based and two
        choices with the same display name remain distinct candidates.
        """

        def __init__(self, name):
            self.name = name

        def __str__(self):
            return self.name

        def __repr__(self):
            return f"<Candidate('{self.name}')>"

        def __hash__(self):
            return hash(self.name)

    results = []
    for q in poll.question_set.all():
        if q.is_rank_choice():
            question_choices = list(q.choice_set.all())

            # pyrankvote breaks in these cases
            if not question_choices:
                results.append([q, "No answer choices", "None"])
                continue
            if len(question_choices) == 1:
                winner = question_choices[0].display_name()
                results.append([q, f"{winner} ({q.answer_set.count()} votes)\nNote: There was only one choice for this question.", winner])
                continue

            # Key candidates by choice pk. Indexing a list with ``choice.num - 1``
            # only works when nums are contiguous from 1 *and* the (unordered)
            # choice queryset happens to come back in num order.
            candidate_by_choice = {c.pk: Candidate(c.display_name()) for c in question_choices}
            candidates = list(candidate_by_choice.values())

            ballots = []
            for u in poll.get_users_voted():
                ranked_candidates = [
                    candidate_by_choice[a.choice_id]
                    for a in u.answer_set.filter(question=q).order_by("rank")
                    # Skip answers without a choice of this question.
                    if a.choice_id in candidate_by_choice
                ]
                ballots.append(Ballot(ranked_candidates=ranked_candidates))

            result = pyrankvote.instant_runoff_voting(candidates, ballots)
            winner = result.get_winners()[0]
            results.append([q, str(result).strip(), str(winner)])
        else:
            max_votes = 0
            winner = ""
            for c in q.choice_set.all():
                num_votes = q.answer_set.filter(choice=c).count()
                if num_votes > max_votes:
                    max_votes = num_votes
                    winner = c.display_name()
            results.append([q, f"{winner} ({max_votes} votes)", winner])

    return results
def _load_question_data(request):
    """Return the decoded ``question_data`` POST field as a list, or ``None``.

    ``None`` means the field was missing/empty or did not contain a JSON
    list; in both cases an error message has already been queued.
    """
    raw_question_data = request.POST.get("question_data", None)
    if not raw_question_data:
        messages.error(request, "No question information was sent with your request!")
        return None

    try:
        question_data = json.loads(raw_question_data)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        question_data = None

    if not isinstance(question_data, list):
        messages.error(request, "The question information sent with your request was invalid!")
        return None
    return question_data


@login_required
@deny_restricted
def add_poll_view(request):
    """Create a poll (polls administrators only; others go back to ``polls``).

    A valid POST saves the form and its questions together and redirects to
    ``polls``; otherwise the add form is rendered (again).
    """
    if not request.user.has_admin_permission("polls"):
        return redirect("polls")

    if request.method == "POST":
        form = PollForm(data=request.POST)
        question_data = _load_question_data(request)

        # ``is not None``: an empty list is a valid (question-less) submission.
        if question_data is not None and form.is_valid():
            # Save the poll and its questions together so a failure while
            # processing the questions does not leave a half-built poll.
            with transaction.atomic():
                instance = form.save()
                process_question_data(instance, question_data)
            messages.success(request, "The poll has been created.")
            return redirect("polls")
    else:
        form = PollForm()

    context = {"action": "add", "action_title": "Add", "poll_questions": "[]", "poll_choices": "[]", "form": form, "is_polls_admin": True}
    return render(request, "polls/add_modify.html", context)


@login_required
@deny_restricted
def modify_poll_view(request, poll_id):
    """Edit an existing poll (polls administrators only).

    Editing a poll past its end time is allowed but flagged with a warning
    message. A valid POST saves the form and its questions together and
    redirects to ``polls``; otherwise the edit form is rendered (again).
    """
    if not request.user.has_admin_permission("polls"):
        return redirect("polls")

    poll = get_object_or_404(Poll, id=poll_id)

    if not poll.before_end_time():
        messages.error(request, "Warning: you are editing a closed poll.")

    if request.method == "POST":
        form = PollForm(data=request.POST, instance=poll)
        # The raw field used to be passed to json.loads unconditionally, which
        # raised TypeError when the field was missing.
        question_data = _load_question_data(request)

        if question_data is not None and form.is_valid():
            with transaction.atomic():
                instance = form.save()
                process_question_data(instance, question_data)
            messages.success(request, "The poll has been modified.")
            return redirect("polls")
    else:
        form = PollForm(instance=poll)

    context = {
        "action": "modify",
        "action_title": "Modify",
        "poll": poll,
        "poll_questions": serialize("json", poll.question_set.all()),
        "poll_choices": serialize("json", Choice.objects.filter(question__in=poll.question_set.all())),
        "form": form,
        "is_polls_admin": True,
    }
    return render(request, "polls/add_modify.html", context)


@login_required
@deny_restricted
def delete_poll_view(request, poll_id):
    """Delete a poll after confirmation (polls administrators only).

    Polls past their end time are not deleted: the request is redirected to
    ``polls``. GET renders the confirmation page; POST deletes the poll.
    """
    if not request.user.has_admin_permission("polls"):
        return redirect("polls")

    poll = get_object_or_404(Poll, id=poll_id)

    if not poll.before_end_time():
        return redirect("polls")

    if request.method == "POST":
        poll.delete()
        messages.success(request, "The poll has been deleted!")
        return redirect("polls")

    return render(request, "polls/delete.html", {"poll": poll})
def process_question_data(instance, question_data):
    """Synchronise the questions and choices of ``instance`` with client data.

    Args:
        instance: The poll whose questions are being replaced/updated.
        question_data: List of question dicts. Recognised keys: ``pk``
            (existing question), ``question`` (text), ``type`` (default
            ``"STD"``), ``max_choices`` (default ``1``) and ``choices``, a list
            of dicts with optional ``pk`` and ``info`` (text).

    Questions and choices whose ``pk`` is not sent back are deleted. Entries
    with empty text are skipped. Kept entries are numbered consecutively from
    1 in the order received, and their text is passed through ``safe_html``.
    """
    # Remove all questions not returned by client
    instance.question_set.exclude(pk__in=[x["pk"] for x in question_data if "pk" in x]).delete()

    count = 1
    for q in question_data:
        if not q.get("question", None):
            # Don't add question if no question is entered
            continue

        # A missing "choices" key is treated as "no choices" for existing and
        # new questions alike (existing ones used to raise KeyError).
        submitted_choices = q.get("choices", [])

        if "pk" in q:
            # Question already exists
            question = instance.question_set.get(pk=q["pk"])
            question.question = safe_html(q["question"]).strip()
            question.num = count
            question.type = q.get("type", "STD")
            question.max_choices = q.get("max_choices", 1)
            question.save()
            # Delete all choices not returned by client
            question.choice_set.exclude(pk__in=[x["pk"] for x in submitted_choices if "pk" in x]).delete()
        else:
            # Question does not exist
            question = Question.objects.create(
                poll=instance, question=safe_html(q["question"]).strip(), num=count, type=q.get("type", "STD"), max_choices=q.get("max_choices", 1)
            )

        choice_count = 1
        for c in submitted_choices:
            if not c.get("info", None):
                # Don't add choice if no text is entered
                continue

            if "pk" in c:
                # Choice already exists
                choice = question.choice_set.get(pk=c["pk"])
                choice.num = choice_count
                choice.info = safe_html(c["info"]).strip()
                choice.save()
            else:
                # Choice does not exist
                Choice.objects.create(question=question, num=choice_count, info=safe_html(c["info"]).strip())
            choice_count += 1

        count += 1