Source code for intranet.apps.polls.views

import csv
import json
import logging
from collections import OrderedDict

import pyrankvote
from django import http
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import login_required
from django.core.serializers import serialize
from django.db import transaction
from django.db.models import Q
from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone
from pyrankvote import Ballot

from ...utils.date import get_senior_graduation_year
from ...utils.html import safe_html
from ...utils.locking import lock_on
from ..auth.decorators import deny_restricted
from .forms import PollForm
from .models import Answer, Choice, Poll, Question

logger = logging.getLogger(__name__)


@login_required
@deny_restricted
def polls_view(request):
    is_polls_admin = request.user.has_admin_permission("polls")

    if is_polls_admin and "show_all" in request.GET:
        polls = Poll.objects.all()
    else:
        polls = Poll.objects.visible_to_user(request.user)

    if "show_all" not in request.GET:
        now = timezone.now()
        polls = polls.filter(start_time__lt=now, end_time__gt=now)

    if not is_polls_admin:
        polls = polls.filter(visible=True)

    context = {"polls": polls.order_by("-end_time"), "is_polls_admin": is_polls_admin}
    return render(request, "polls/home.html", context)


@login_required
@deny_restricted
def csv_results(request, poll_id):
    is_polls_admin = request.user.has_admin_permission("polls")
    if not is_polls_admin:
        return render(request, "error/403.html", {"reason": "You are not authorized to view this page."}, status=403)

    dict_list = []
    p = get_object_or_404(Poll, id=poll_id)

    if p.is_secret:
        messages.error(request, "CSV results cannot be generated for secret polls.")
        return redirect("poll_results", poll_id=poll_id)

    if len(p.get_users_voted()) == 0:
        messages.error(request, "CSV results cannot be generated because no votes have been cast for this poll.")
        return redirect("poll_results", poll_id=poll_id)

    for u in p.get_users_voted():
        answers = Answer.objects.filter(question__poll=p, user=u)
        answer_dict = OrderedDict()
        answer_dict["Username"] = u.username
        answer_dict["First"] = u.first_name
        answer_dict["Last"] = u.last_name
        for answer in answers:
            question = answer.question.question
            if answer.choice:
                answer_dict[question] = answer.choice
            elif answer.answer:
                answer_dict[question] = ("Other: " if answer.other_vote else "") + answer.answer
            elif answer.clear_vote:
                answer_dict[question] = "Cleared"
            else:
                answer_dict[question] = "None"
        dict_list.append(answer_dict)

    response = http.HttpResponse(content_type="text/csv")
    w = csv.DictWriter(response, dict_list[0].keys())
    w.writeheader()
    w.writerows(dict_list)
    return response


@login_required
@deny_restricted
def ranked_choice_results(request, poll_id):
    is_polls_admin = request.user.has_admin_permission("polls")
    if not is_polls_admin:
        return render(request, "error/403.html", {"reason": "You are not authorized to view this page."}, status=403)

    dict_list = []
    p = get_object_or_404(Poll, id=poll_id)

    if p.is_secret:
        messages.error(request, "CSV results cannot be generated for secret polls.")
        return redirect("poll_results", poll_id=poll_id)

    if len(p.get_users_voted()) == 0:
        messages.error(request, "CSV results cannot be generated because no votes have been cast for this poll.")
        return redirect("poll_results", poll_id=poll_id)

    for u in p.get_users_voted():
        answers = Answer.objects.filter(question__poll=p, user=u)
        for answer in answers:
            answer_dict = {}
            answer_dict["Username"] = u.username
            question = answer.question.question
            if answer.choice:
                answer_dict["Rank - " + question] = answer.rank
                answer_dict[question] = answer.choice.display_name()
            elif answer.answer:
                answer_dict[question] = answer.answer
            elif answer.clear_vote:
                answer_dict[question] = "Cleared"
            else:
                answer_dict[question] = "None"
            dict_list.append(answer_dict)
    response = http.HttpResponse(content_type="text/csv")

    headers = []
    headers.append("Username")
    for a in answers:
        if a.question.question not in headers:
            headers.append("Rank - " + a.question.question)
            headers.append(a.question.question)
    w = csv.DictWriter(response, headers)
    w.writeheader()
    w.writerows(dict_list)
    return response


@login_required
@deny_restricted
def poll_vote_view(request, poll_id):
    poll = get_object_or_404(Poll, id=poll_id)

    user = request.user
    is_polls_admin = user.has_admin_permission("polls")
    if is_polls_admin and "user" in request.GET:
        try:
            user = get_user_model().objects.get(id=request.GET.get("user"))
        except (get_user_model().DoesNotExist, ValueError):
            user = request.user

    if request.method == "POST" and poll.can_vote(user):
        questions = poll.question_set.all()
        entries = request.POST
        for name in entries:
            if name.startswith("question-"):
                question_num = name.split("-", 3)[1]
                try:
                    question_obj = questions.get(num=question_num)
                except Question.DoesNotExist:
                    messages.error(request, f"Invalid question passes with num {question_num}")
                    continue

                choice_num = entries[name]

                if question_obj.is_choice():
                    choices = question_obj.choice_set.all()
                    if question_obj.is_single_choice():
                        if choice_num and choice_num == "CLEAR":
                            Answer.objects.update_or_create(
                                user=user, question=question_obj, defaults={"clear_vote": True, "other_vote": False, "choice": None, "answer": None}
                            )
                            messages.success(request, f"Clear Vote for {question_obj}")
                        elif choice_num == "OTHER":  # Standard Other option
                            continue
                        elif name.endswith("-writing"):  # Standard Other answer
                            if entries["question-" + question_num] != "OTHER":
                                continue
                            Answer.objects.update_or_create(
                                user=user,
                                question=question_obj,
                                defaults={"clear_vote": False, "other_vote": True, "choice": None, "answer": choice_num},
                            )
                            messages.success(request, f"Vote saved for choice Other on question {question_obj}")
                        else:
                            try:
                                choice_obj = choices.get(num=choice_num)
                            except Choice.DoesNotExist:
                                messages.error(request, f"Invalid answer choice with num {choice_num}")
                                continue
                            else:
                                Answer.objects.update_or_create(
                                    user=user,
                                    question=question_obj,
                                    defaults={"clear_vote": False, "other_vote": False, "choice": choice_obj, "answer": None},
                                )
                                messages.success(request, f"Voted for {choice_obj} on {question_obj}")

                    elif question_obj.is_rank_choice():
                        updated_nums = request.POST.getlist(f"helper-{name}")  # helper-question-1
                        updated_choices = request.POST.getlist(name)

                        choice_value_map = {int(updated_nums[c]): [int(updated_choices[c]), c + 1] for c in range(0, len(updated_choices))}

                        with transaction.atomic():
                            lock_on(user.answer_set.all())

                            Answer.objects.filter(user=user, question=question_obj).delete()

                            # Saves the choices and answers
                            for v in choice_value_map:
                                choice = Choice.objects.get(question=question_obj, num=choice_value_map[v][0])
                                answer = Answer.objects.get_or_create(user=user, choice=choice, question=question_obj)[0]
                                answer.rank = choice_value_map[v][1]
                                answer.save()

                    elif question_obj.is_many_choice():
                        updated_choices = request.POST.getlist(name)
                        if len(updated_choices) == 1 and updated_choices[0] == "CLEAR":
                            with transaction.atomic():
                                # Lock on the user's answers to prevent duplicates.
                                lock_on(user.answer_set.all())
                                Answer.objects.filter(user=user, question=question_obj).delete()
                                Answer.objects.create(user=user, question=question_obj, clear_vote=True)
                                messages.success(request, f"Clear Vote for {question_obj}")
                        elif "CLEAR" in updated_choices:
                            messages.error(request, "Cannot select other options with Clear Vote.")
                        elif len(updated_choices) > question_obj.max_choices:
                            messages.error(request, f"You have voted on too many options for {question_obj}")
                        else:
                            with transaction.atomic():
                                # Lock on the question's answers to prevent duplicates.
                                lock_on(user.answer_set.all())

                                available_answers = [c.num for c in choices]
                                updated_answers = [int(c) for c in updated_choices if int(c) in available_answers]
                                current_answers = [c.choice.num for c in Answer.objects.filter(user=user, question=question_obj) if c.choice]

                                to_create = [choices.get(num=c) for c in updated_answers if c not in current_answers]
                                for c in to_create:
                                    Answer.objects.create(user=user, question=question_obj, choice=c)
                                    messages.success(request, f"Voted for {c} on {question_obj}")

                                to_delete = [choices.get(num=c) for c in current_answers if c not in updated_answers]
                                for c in to_delete:
                                    logger.info("Deleting choice for %s", c)
                                Answer.objects.filter(user=user, question=question_obj).filter(Q(clear_vote=True) | Q(choice__in=to_delete)).delete()

                elif question_obj.is_writing():
                    Answer.objects.update_or_create(user=user, question=question_obj, defaults={"answer": choice_num})
                    messages.success(request, f"Answer saved for {question_obj}")

        messages.success(request, "Thank you for voting!")
    if poll.can_vote(user):
        questions = []
        for q in poll.question_set.all():
            current_votes = Answer.objects.filter(user=user, question=q)

            if q.type == Question.ELECTION:
                choices = q.random_choice_set
            else:
                choices = q.choice_set.all()

            if q.type == Question.RANK:
                if current_votes.count() == q.max_choices:
                    choices_and_values = [(a.choice, a.rank) for a in current_votes]

                    for c in choices:
                        if c not in [a.choice for a in current_votes]:
                            choices_and_values.append((c, -1))

                else:
                    choices_and_values = [(c, -1) for c in choices]
            else:
                choices_and_values = []

            question = {
                "num": q.num,
                "type": q.type,
                "question": q.question,
                "choices": choices,
                "is_single_choice": q.is_single_choice(),
                "is_rank_choice": q.is_rank_choice(),
                "is_many_choice": q.is_many_choice(),
                "is_writing": q.is_writing(),
                "max_choices": q.max_choices,
                "num_choices": len(choices),
                "current_votes": current_votes,
                "current_vote": current_votes[0] if current_votes else None,
                "current_choices": [v.choice for v in current_votes],
                "current_vote_none": (len(current_votes) < 1),
                "current_vote_clear": (len(current_votes) == 1 and current_votes[0].clear_vote),
                "current_vote_other": (len(current_votes) == 1 and current_votes[0].other_vote),
                "choices_and_values": choices_and_values,
            }
            questions.append(question)

        can_vote = poll.can_vote(user)
        context = {
            "poll": poll,
            "can_vote": can_vote,
            "user": user,
            "questions": questions,
            "question_types": Question.get_question_types(),
        }
        return render(request, "polls/vote.html", context)

    else:
        messages.error(request, "You cannot view this poll.")
        return redirect("polls")


[docs]def fmt(num): return int(100 * num) / 100
[docs]def perc(num, den): if den == 0: return 0 return round(num / den * 100.0, 2)
[docs]def generate_choice(name, votes, total_count, show_answers=False): choice = { "choice": name, "votes": { "total": { "all": votes.count(), "all_percent": perc(votes.count(), total_count), "male": votes.filter(user__gender=True).count(), "female": votes.filter(user__gender__isnull=False, user__gender=False).count(), } }, "users": [v.user for v in votes] if show_answers else None, } for yr in range(9, 14): yr_votes = votes.filter(user__graduation_year=get_senior_graduation_year() + 12 - yr) choice["votes"][yr] = { "all": yr_votes.count(), "male": yr_votes.filter(user__gender=True).count(), "female": yr_votes.filter(user__gender__isnull=False, user__gender=False).count(), } return choice
[docs]def handle_sap(q): question_votes = votes = Answer.objects.filter(question=q) users = q.get_users_voted() num_users_votes = {u.id: votes.filter(user=u).count() for u in users} user_scale = {u.id: (1 / num_users_votes[u.id]) for u in users} choices = [] for c in q.choice_set.all().order_by("num"): votes = question_votes.filter(choice=c) vote_users = {v.user for v in votes} choice = { "choice": c, "votes": { "total": { "all": len(vote_users), "all_percent": perc(len(vote_users), users.count()), "male": fmt(sum(v.user.is_male * user_scale[v.user.id] for v in votes)), "female": fmt(sum(v.user.is_female * user_scale[v.user.id] for v in votes)), } }, "users": [v.user for v in votes], } for yr in range(9, 14): yr_votes = [v.user if v.user.grade and v.user.grade.number == yr else None for v in votes] yr_votes = list(filter(None, yr_votes)) choice["votes"][yr] = { "all": len(set(yr_votes)), "male": fmt(sum(u.is_male * user_scale[u.id] for u in yr_votes)), "female": fmt(sum(u.is_female * user_scale[u.id] for u in yr_votes)), } choices.append(choice) """ Clear vote """ votes = question_votes.filter(clear_vote=True) clr_users = {v.user for v in votes} choice = { "choice": "Clear vote", "votes": { "total": { "all": len(clr_users), "all_percent": perc(len(clr_users), users.count()), "male": fmt(sum(v.user.is_male * user_scale[v.user.id] for v in votes)), "female": fmt(sum(v.user.is_female * user_scale[v.user.id] for v in votes)), } }, "users": clr_users, } for yr in range(9, 14): yr_votes = [v.user if v.user.grade and v.user.grade.number == yr else None for v in votes] yr_votes = list(filter(None, yr_votes)) choice["votes"][yr] = { "all": len(yr_votes), "male": fmt(sum(u.is_male * user_scale[u.id] for u in yr_votes)), "female": fmt(sum(u.is_female * user_scale[u.id] for u in yr_votes)), } choices.append(choice) choice = { "choice": "Total", "votes": { "total": { "all": users.count(), "votes_all": question_votes.count(), "all_percent": perc(users.count(), users.count()), "male": users.filter(gender=True).count(), "female": users.filter(gender__isnull=False, gender=False).count(), } }, } for yr in range(9, 14): yr_votes = [u if u.grade and u.grade.number == yr else None for u in users] yr_votes = list(filter(None, yr_votes)) choice["votes"][yr] = { "all": len(set(yr_votes)), "male": fmt(sum(u.is_male * user_scale[u.id] for u in yr_votes)), "female": fmt(sum(u.is_female * user_scale[u.id] for u in yr_votes)), } choices.append(choice) return {"question": q, "choices": choices, "user_scale": user_scale}
[docs]def handle_rank_choice(q, show_answers=False): # For some reason, this seems to be faster than select_related("question", "user", "choice") question_votes = votes = Answer.objects.filter(question=q).select_related() choices = [] c_set = q.choice_set.all().order_by("num") votes_total = q.get_total_votes() for c in c_set: votes = question_votes.filter(choice=c) votes_sum = sum(v.display_votes() for v in votes) choice = { "choice": c, "votes": { "total": { "all": votes_sum, "all_percent": perc(votes_sum, votes_total), "votes_all": votes_total, # Gender statistics commented out because of large performance hit # Also, Ion doesn't really get gender information anymore so it isn't very helpful anyway # "male": sum(v.display_votes() if v.user and v.user.is_male else 0 for v in votes), # "female": sum(v.display_votes() if v.user and v.user.is_female else 0 for v in votes), } }, "users": [v.user for v in votes] if show_answers else None, } for yr in range(9, 14): # Year 13 is staff yr_votes = votes.filter(user__graduation_year=get_senior_graduation_year() + 12 - yr) choice["votes"][yr] = { "all": sum(v.display_votes() for v in yr_votes), # "male": sum(v.display_votes() if v.user and v.user.is_male else 0 for v in yr_votes), # "female": sum(v.display_votes() if v.user and v.user.is_female else 0 for v in yr_votes), } choices.append(choice) choice = { "choice": "Total", "votes": {}, } for yr in range(9, 14): yr_votes = question_votes.filter(user__graduation_year=get_senior_graduation_year() + 12 - yr) yr_votes = list(filter(None, yr_votes)) choice["votes"][yr] = { "all": sum(v.display_votes() for v in yr_votes), # "male": sum(v.display_votes() if v.user.is_male else 0 for v in yr_votes), # "female": sum(v.display_votes() if v.user.is_female else 0 for v in yr_votes), } # Sort choices.sort(key=lambda e: e["votes"]["total"]["all"], reverse=True) all_sum = sum(v.display_votes() for v in question_votes) choice["votes"]["total"] = { "all": all_sum, "all_percent": perc(all_sum, votes_total), "votes_all": votes_total, # "male": sum(v.display_votes() if v.user.is_male else 0 for v in question_votes), # "female": sum(v.display_votes() if v.user.is_female else 0 for v in question_votes), } choices.append(choice) return {"question": q, "choices": choices}
[docs]def handle_choice(q, show_answers=False): question_votes = votes = Answer.objects.filter(question=q) total_count = question_votes.count() users = q.get_users_voted() choices = [] # Choices for c in q.choice_set.all().order_by("num"): if c: votes = question_votes.filter(choice=c) choices.append(generate_choice(c, votes, total_count, show_answers)) if q.type == "STO": other_votes = question_votes.filter(other_vote=True) choices.append(generate_choice("Total Votes for Other", other_votes, total_count, show_answers)) answers = {ans.answer for ans in other_votes} for ans in answers: votes = other_votes.filter(answer=ans) choices.append(generate_choice(f"Other: {ans}", votes, total_count, show_answers)) # Clear vote votes = question_votes.filter(clear_vote=True) choices.append(generate_choice("Clear vote", votes, total_count, show_answers)) # Sort choices.sort(key=lambda e: e["votes"]["total"]["all"], reverse=True) # Total total_choice = generate_choice("Total", question_votes, total_count, show_answers) total_choice["votes"]["total"]["users_all"] = users.count() choices.append(total_choice) return {"question": q, "choices": choices}
@login_required @deny_restricted def poll_results_view(request, poll_id): if not request.user.has_admin_permission("polls"): return redirect("polls") poll = get_object_or_404(Poll, id=poll_id) if poll.in_time_range(): messages.info(request, "Results may not be final because the poll is still open.") show_answers = request.GET.get("show_answers", False) if show_answers and poll.is_secret: messages.error(request, "User selections cannot be viewed for secret polls.") return redirect("poll_results", poll_id) questions = [] for q in poll.question_set.all(): if q.type == "SAP": # Split-approval; each person splits their one vote questions.append(handle_sap(q)) elif q.is_rank_choice(): questions.append(handle_rank_choice(q, show_answers)) elif q.is_choice(): questions.append(handle_choice(q, show_answers)) elif q.is_writing(): answers = Answer.objects.filter(question=q) question = {"question": q, "answers": answers} questions.append(question) context = {"poll": poll, "grades": range(9, 13), "questions": questions, "show_answers": show_answers} return render(request, "polls/results.html", context) @login_required @deny_restricted def election_winners_view(request, poll_id): if not request.user.has_admin_permission("polls"): return redirect("polls") poll = get_object_or_404(Poll, id=poll_id) if poll.in_time_range(): messages.error(request, "Warning: results may not be final because the poll is still open.") context = {"poll": poll, "results": determine_ranked_choice_winners(poll)} return render(request, "polls/winners.html", context)
[docs]def determine_ranked_choice_winners(poll): # pyrankvote's Candidate class is broken, so use a custom class instead class Candidate: """A candidate in the election.""" def __init__(self, name): self.name = name def __str__(self): return self.name def __repr__(self): return "<Candidate('%s')>" % self.name def __hash__(self): return hash(self.name) results = [] for q in poll.question_set.all(): if q.is_rank_choice(): # pyrankvote breaks in these cases if q.choice_set.count() == 0: results.append([q, "No answer choices", "None"]) continue if q.choice_set.count() == 1: winner = q.choice_set.first().display_name() results.append([q, f"{winner} ({q.answer_set.count()} votes)\nNote: There was only one choice for this question.", winner]) continue candidates = [] ballots = [] for c in q.choice_set.all(): candidates.append(Candidate(c.display_name())) for u in poll.get_users_voted(): ranked_candidates = [] for a in u.answer_set.filter(question=q).order_by("rank"): ranked_candidates.append(candidates[a.choice.num - 1]) ballots.append(Ballot(ranked_candidates=ranked_candidates)) result = pyrankvote.instant_runoff_voting(candidates, ballots) winner = result.get_winners()[0] results.append([q, str(result).strip(), str(winner)]) else: max_votes = 0 winner = "" for c in q.choice_set.all(): num_votes = q.answer_set.filter(choice=c).count() if num_votes > max_votes: max_votes = num_votes winner = c.display_name() results.append([q, f"{winner} ({max_votes} votes)", winner]) return results
@login_required @deny_restricted def add_poll_view(request): if not request.user.has_admin_permission("polls"): return redirect("polls") if request.method == "POST": form = PollForm(data=request.POST) question_data = request.POST.get("question_data", None) flag = True if not question_data: messages.error(request, "No question information was sent with your request!") flag = False if flag and form.is_valid(): question_data = json.loads(question_data) instance = form.save() process_question_data(instance, question_data) messages.success(request, "The poll has been created.") return redirect("polls") else: form = PollForm() context = {"action": "add", "action_title": "Add", "poll_questions": "[]", "poll_choices": "[]", "form": form, "is_polls_admin": True} return render(request, "polls/add_modify.html", context) @login_required @deny_restricted def modify_poll_view(request, poll_id): if not request.user.has_admin_permission("polls"): return redirect("polls") poll = get_object_or_404(Poll, id=poll_id) if not poll.before_end_time(): messages.error(request, "Warning: you are editing a closed poll.") if request.method == "POST": form = PollForm(data=request.POST, instance=poll) question_data = request.POST.get("question_data", None) flag = True if not question_data: messages.error(request, "No question information was sent with your request!") flag = False question_data = json.loads(question_data) if flag and form.is_valid(): instance = form.save() process_question_data(instance, question_data) messages.success(request, "The poll has been modified.") return redirect("polls") else: form = PollForm(instance=poll) context = { "action": "modify", "action_title": "Modify", "poll": poll, "poll_questions": serialize("json", poll.question_set.all()), "poll_choices": serialize("json", Choice.objects.filter(question__in=poll.question_set.all())), "form": form, "is_polls_admin": True, } return render(request, "polls/add_modify.html", context) @login_required @deny_restricted def delete_poll_view(request, poll_id): if not request.user.has_admin_permission("polls"): return redirect("polls") poll = get_object_or_404(Poll, id=poll_id) if not poll.before_end_time(): return redirect("polls") if request.method == "POST": poll.delete() messages.success(request, "The poll has been deleted!") return redirect("polls") return render(request, "polls/delete.html", {"poll": poll})
[docs]def process_question_data(instance, question_data): # Remove all questions not returned by client instance.question_set.exclude(pk__in=[x["pk"] for x in question_data if "pk" in x]).delete() count = 1 for q in question_data: question = None if not q.get("question", None): # Don't add question if no question is entered continue if "pk" in q: # Question already exists question = instance.question_set.get(pk=q["pk"]) question.question = safe_html(q["question"]).strip() question.num = count question.type = q.get("type", "STD") question.max_choices = q.get("max_choices", 1) question.save() # Delete all choices not returned by client question.choice_set.exclude(pk__in=[x["pk"] for x in q["choices"] if "pk" in x]).delete() else: # Question does not exist question = Question.objects.create( poll=instance, question=safe_html(q["question"]).strip(), num=count, type=q.get("type", "STD"), max_choices=q.get("max_choices", 1) ) choice_count = 1 for c in q.get("choices", []): if not c.get("info", None): # Don't add choice if no text is entered continue if "pk" in c: # Choice already exists choice = question.choice_set.get(pk=c["pk"]) choice.num = choice_count choice.info = safe_html(c["info"]).strip() choice.save() else: # Choice does not exist choice = Choice.objects.create(question=question, num=choice_count, info=safe_html(c["info"]).strip()) choice_count += 1 count += 1