import csv
import logging
import re
from typing import List, Optional
from cacheops import invalidate_model, invalidate_obj
from django import http
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator
from django.db.models import Q
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from formtools.wizard.views import SessionWizardView
from ....auth.decorators import eighth_admin_required
from ....groups.models import Group
from ....search.views import get_search_results
from ....users.models import User
from ...forms.admin.activities import ActivitySelectionForm, ScheduledActivityMultiSelectForm
from ...forms.admin.blocks import BlockSelectionForm
from ...forms.admin.groups import GroupForm, QuickGroupForm, UploadGroupForm
from ...models import EighthActivity, EighthBlock, EighthScheduledActivity, EighthSignup
from ...tasks import eighth_admin_signup_group_task
from ...utils import get_start_date
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
@eighth_admin_required
def add_group_view(request):
    """Create a group from a submitted ``QuickGroupForm``.

    Only POST requests are accepted; any other method receives a 405
    response. An invalid form, or a name the requesting user may not manage
    (per ``request.user.can_manage_group``), sends the user back to the
    dashboard with an error message. On success the user is redirected to the
    edit page of the newly saved group.
    """
    if request.method != "POST":
        return http.HttpResponseNotAllowed(["POST"], "405: METHOD NOT ALLOWED")

    quick_form = QuickGroupForm(request.POST)
    if not quick_form.is_valid():
        messages.error(request, "Error adding group.")
        return redirect("eighth_admin_dashboard")

    # The permission check is made against the requested name because the
    # group object does not exist yet.
    requested_name = quick_form.cleaned_data["name"]
    if not request.user.can_manage_group(requested_name):
        messages.error(request, "You must be a superuser on Ion to manage administrative groups")
        return redirect("eighth_admin_dashboard")

    new_group = quick_form.save()
    messages.success(request, "Successfully added group.")
    return redirect("eighth_admin_edit_group", group_id=new_group.id)
@eighth_admin_required
def edit_group_view(request, group_id):
    """Show and process the edit page for a single group.

    POST handling:
        * ``remove_all`` present: remove every member from the group and
          redirect back to this page.
        * otherwise: validate ``GroupForm``; on success save it (and the
          ``student_visible`` property, when present) and redirect to the
          dashboard; on failure fall through and re-render with the bound form.

    GET parameters read while rendering:
        * ``q``: comma-separated usernames used to filter the member list.
        * ``p``: page number of the member list (100 members per page).
        * ``added``: ids of users that were just added.
        * ``possible_student``: ids of candidate users to offer for addition.

    Raises:
        Http404: if no group with ``group_id`` exists.
    """
    try:
        group = Group.objects.get(id=group_id)
    except Group.DoesNotExist as e:
        raise http.Http404 from e

    if not request.user.can_manage_group(group):
        messages.error(request, "You must be a superuser on Ion to manage administrative groups")
        return redirect("eighth_admin_dashboard")

    if request.method == "POST":
        invalidate_model(Group)
        if group.name.lower().startswith("all students"):
            cache.delete("users:students")

        if "remove_all" in request.POST:
            users = group.user_set.all()
            # Count before removing; afterwards the relation is empty.
            num = users.count()
            for u in users:
                group.user_set.remove(u)
            group.save()
            invalidate_obj(group)
            messages.success(request, f"Successfully deleted {num} members of the group.")
            return redirect("eighth_admin_edit_group", group.id)

        form = GroupForm(request.POST, instance=group)
        if form.is_valid():
            if "student_visible" in form.cleaned_data:
                props = group.properties
                props.student_visible = form.cleaned_data["student_visible"]
                props.save()
                invalidate_obj(props)

            form.save()
            messages.success(request, "Successfully edited group.")
            return redirect("eighth_admin_dashboard")
        else:
            messages.error(request, "Error modifying group.")
    else:
        form = GroupForm(instance=group, initial={"student_visible": group.properties.student_visible})

    student_query = None
    if request.method == "GET":
        student_query = request.GET.get("q", None)

    if not student_query:
        users = group.user_set.all()  # Order not strictly alphabetical
    else:
        ion_ids = [sid.strip() for sid in student_query.split(",")]
        users = group.user_set.filter(username__in=ion_ids)
    users = users.order_by("username", "first_name", "last_name", "student_id")

    p = Paginator(users, 100)  # Paginating to limit LDAP queries (slow)
    page_num = request.GET.get("p", 1)
    try:
        page = p.page(page_num)
    except PageNotAnInteger:
        page = p.page(1)
    except EmptyPage:
        page = p.page(p.num_pages)

    members = []
    for user in page:
        # Read the grade once per user and reuse it below.
        grade = user.grade
        members.append(
            {
                "id": user.id,
                "first_name": user.first_name,
                "last_name": user.last_name,
                "student_id": user.student_id,
                "email": user.tj_email,
                # Users with no grade, or with grade number 13, are shown as staff.
                "grade": grade.number if grade and grade.number != 13 else "Staff",
            }
        )
    members = sorted(members, key=lambda m: (m["last_name"], m["first_name"]))

    linked_activities = EighthActivity.objects.filter(groups_allowed=group)

    def parse_int(value):
        """Return ``int(value)``, or ``None`` when it is not a valid integer string."""
        try:
            return int(value)
        except ValueError:
            return None

    context = {
        "group": group,
        "members": members,
        "member_count": users.count(),
        "members_page": page,
        "edit_form": form,
        "added_ids": [parse_int(x) for x in request.GET.getlist("added")],
        "linked_activities": linked_activities,
        "admin_page_title": "Edit Group",
        "delete_url": reverse("eighth_admin_delete_group", args=[group_id]),
    }

    if "possible_student" in request.GET:
        student_ids = request.GET.getlist("possible_student")
        # filter(), not get(): several ids may be supplied, and get() raises
        # MultipleObjectsReturned / DoesNotExist unless exactly one row matches.
        possible_students = get_user_model().objects.filter(id__in=student_ids)
        context["possible_students"] = possible_students

    return render(request, "eighth/admin/edit_group.html", context)
def get_file_string(fileobj):
    """Return the full contents of an uploaded file as text.

    Args:
        fileobj: An object exposing ``chunks()``, which yields ``bytes``
            pieces of the file.

    Returns:
        The concatenation of all chunks, each decoded as ISO-8859-1. An
        upload that yields no chunks produces the empty string.
    """
    # ISO-8859-1 maps every byte to exactly one character, so decoding chunk by
    # chunk cannot split a character. join() avoids repeated string copying.
    return "".join(chunk.decode("ISO-8859-1") for chunk in fileobj.chunks())
def get_user_info(key: str, val) -> Optional[List[User]]:
    """Look up users by one kind of identifying value.

    Args:
        key: The kind of lookup: ``"username"``, ``"id"``, ``"student_id"``
            or ``"name"``.
        val: The value to look up.

    Returns:
        * ``"username"`` / ``"id"``: the result of ``filter(**{key: val})`` on
          the user manager (returned as-is, not converted to a list), or an
          empty list if building the filter raises ``ValueError``.
        * ``"student_id"``: a one-element list with the matching user, or an
          empty list.
        * ``"name"``: a one-element list with the first match, or an empty
          list. Only values made of ASCII letters and spaces are considered.
          Two words are tried as first/last name, three words as
          first/(ignored)/last, and a single word first as a last name and
          then as a first name.
        * any other key: ``None``.
    """
    if key in ("username", "id"):
        try:
            return get_user_model().objects.filter(**{key: val})
        except ValueError:
            return []

    if key == "student_id":
        match = get_user_model().objects.user_with_student_id(val)
        return [match] if match else []

    if key != "name":
        return None

    if not re.match("^[A-Za-z ]*$", val):
        return []

    words = val.split(" ")
    word_count = len(words)
    if word_count == 2:
        attempts = [(words[0], words[1])]
    elif word_count == 3:
        # The middle word is ignored.
        attempts = [(words[0], words[2])]
    elif word_count == 1:
        # Try the word as a last name first, then as a first name.
        attempts = [(None, words[0]), (words[0], None)]
    else:
        attempts = []

    for first_name, last_name in attempts:
        found = get_user_model().objects.user_with_name(first_name, last_name)
        if found:
            return [found]
    return []
@eighth_admin_required
def upload_group_members_view(request, group_id):
    """Add members to a group from an upload, pasted text, id list or another group.

    POST variants, checked in this order:
        * ``file`` in ``request.FILES``: the upload must have a ``text/``
          content type; its decoded contents are parsed for users.
        * ``filetext``: the posted text is parsed for users.
        * ``user_id``: each listed user id is added to the group, then the
          user is redirected to the group's edit page.
        * ``import_group``: without ``import_confirm`` a confirmation page is
          rendered; with it, every member of the other group is added and the
          user is redirected to the group's edit page.

    A GET request (or a POST matching none of the above) renders the upload
    form together with the list of all groups.

    Raises:
        Http404: if the target group or the group to import from does not exist.
    """
    try:
        group = Group.objects.get(id=group_id)
    except Group.DoesNotExist as e:
        raise http.Http404 from e

    if not request.user.can_manage_group(group):
        messages.error(request, "You must be a superuser on Ion to manage administrative groups")
        return redirect("eighth_admin_dashboard")

    stage = "upload"
    data = {}
    filetext = False
    if request.method == "POST":
        # Bind the submitted data and files; passing the request object itself
        # would leave the form without access to the uploaded file.
        form = UploadGroupForm(request.POST, request.FILES)
        if "file" in request.FILES:
            fileobj = request.FILES["file"]
            if "text/" not in fileobj.content_type:
                messages.error(request, "The uploaded file is not of the correct type, plain text.")
                return redirect("eighth_admin_edit_group", group.id)
            filetext = get_file_string(fileobj)
        elif "filetext" in request.POST:
            filetext = request.POST.get("filetext")
        elif "user_id" in request.POST:
            userids = request.POST.getlist("user_id")
            user_model = get_user_model()
            num_added = 0
            for uid in userids:
                # get() raises rather than returning None, so a missing user
                # (or a malformed id) must be caught to report it.
                try:
                    user = user_model.objects.get(id=uid)
                except (user_model.DoesNotExist, ValueError):
                    messages.error(request, f"User with ID {uid} does not exist")
                    continue

                if user.groups.filter(id=group.id).exists():
                    messages.warning(request, f"User {user.username} is already in group")
                else:
                    user.groups.add(group)
                    user.save()
                    num_added += 1
            invalidate_obj(group)
            messages.success(request, f"{num_added} added to group {group}")
            return redirect("eighth_admin_edit_group", group.id)
        elif "import_group" in request.POST:
            try:
                import_group = Group.objects.get(id=request.POST["import_group"])
            except Group.DoesNotExist as e:
                raise http.Http404 from e

            num_users = 0
            if "import_confirm" in request.POST:
                for member in import_group.user_set.all():
                    if member.groups.filter(id=group.id).exists():
                        messages.warning(request, f"User {member.username} is already in group")
                    else:
                        member.groups.add(group)
                        member.save()
                        num_users += 1
                invalidate_obj(group)
                messages.success(request, f"Added {num_users} users from {import_group} to {group}")
                return redirect("eighth_admin_edit_group", group.id)

            # Not confirmed yet: ask the user to confirm the import.
            return render(
                request,
                "eighth/admin/upload_group.html",
                {
                    "admin_page_title": f"Import Group Members: {group}",
                    "stage": "import_confirm",
                    "group": group,
                    "import_group": import_group,
                    "num_users": num_users,
                },
            )
    else:
        form = UploadGroupForm()

    all_groups = Group.objects.order_by("name")
    context = {
        "admin_page_title": f"Upload/Import Group Members: {group}",
        "form": form,
        "stage": stage,
        "data": data,
        "group": group,
        "all_groups": all_groups,
    }

    if filetext:
        context["stage"] = "parse"
        # NOTE(review): handle_group_input is not defined or imported in the
        # visible part of this module - confirm it is in scope. It is expected
        # to return a (sure_users, unsure_users) pair, as unpacked below.
        data = handle_group_input(filetext)
        context["sure_users"], context["unsure_users"] = data

    return render(request, "eighth/admin/upload_group.html", context)
@eighth_admin_required
def delete_group_view(request, group_id):
    """Confirm and perform the deletion of a group.

    A POST request deletes the group and redirects to the dashboard; any
    other method renders the generic delete-confirmation template.

    Raises:
        Http404: if no group with ``group_id`` exists.
    """
    try:
        target = Group.objects.get(id=group_id)
    except Group.DoesNotExist as exc:
        raise http.Http404 from exc

    if not request.user.can_manage_group(target):
        messages.error(request, "You must be a superuser on Ion to manage administrative groups")
        return redirect("eighth_admin_dashboard")

    if request.method != "POST":
        confirmation_context = {
            "admin_page_title": "Delete Group",
            "item_name": str(target),
            "help_text": "Deleting this group will remove all records of it related to eighth period.",
        }
        return render(request, "eighth/admin/delete_form.html", confirmation_context)

    target.delete()
    messages.success(request, "Successfully deleted group.")
    return redirect("eighth_admin_dashboard")
@eighth_admin_required
def download_group_csv_view(request, group_id):
    """Return the members of a group as a CSV attachment.

    The file is named after the group and has one header row followed by one
    row per member (last name, first name, student id, grade, email), ordered
    by last name and then first name. Members without a grade are listed as
    "Staff".

    Raises:
        Http404: if no group with ``group_id`` exists.
    """
    try:
        group = Group.objects.get(id=group_id)
    except Group.DoesNotExist as exc:
        raise http.Http404 from exc

    response = http.HttpResponse(content_type="text/csv")
    response["Content-Disposition"] = f'attachment; filename="{group.name}.csv"'

    # The response object is used as the file-like target for the CSV writer.
    csv_out = csv.writer(response)
    csv_out.writerow(["Last Name", "First Name", "Student ID", "Grade", "Email"])

    roster = list(group.user_set.all())
    roster.sort(key=lambda m: (m.last_name, m.first_name))
    for member in roster:
        grade = member.grade
        grade_cell = grade.number if grade else "Staff"
        csv_out.writerow([member.last_name, member.first_name, member.student_id, grade_cell, member.tj_email])

    return response
class EighthAdminSignUpGroupWizard(SessionWizardView):
    """Two-step wizard (block, then activity) for signing a whole group up.

    When both steps are complete the wizard redirects to the confirmation
    view for the chosen group and scheduled activity.
    """

    # Step name -> form class, in the order the steps are shown.
    FORMS = [("block", BlockSelectionForm), ("activity", ActivitySelectionForm)]
    # Step name -> template; both steps share one template.
    TEMPLATES = {"block": "eighth/admin/sign_up_group.html", "activity": "eighth/admin/sign_up_group.html"}

    def get_template_names(self):
        """Return the template for the step currently being displayed."""
        return [self.TEMPLATES[self.steps.current]]

    def get_context_data(self, form, **kwargs):
        """Add the page title and, once chosen, the selected block to the context."""
        context = super().get_context_data(form=form, **kwargs)

        block = self.get_cleaned_data_for_step("block")
        if block:
            context.update({"block_obj": block["block"]})

        context.update({"admin_page_title": "Sign Up Group"})
        return context

    def done(self, form_list, **kwargs):
        """Redirect to the group signup confirmation for the selected activity.

        Raises:
            Http404: if the selected activity is not scheduled for the selected
                block, or if the group given by ``kwargs["group_id"]`` does
                not exist.
        """
        form_list = list(form_list)
        block = form_list[0].cleaned_data["block"]
        activity = form_list[1].cleaned_data["activity"]

        # An activity that is not scheduled for this block is reported as a
        # 404 instead of an unhandled DoesNotExist.
        try:
            scheduled_activity = EighthScheduledActivity.objects.get(block=block, activity=activity)
        except EighthScheduledActivity.DoesNotExist as e:
            raise http.Http404 from e

        try:
            group = Group.objects.get(id=kwargs["group_id"])
        except Group.DoesNotExist as e:
            raise http.Http404 from e

        return redirect(reverse("eighth_admin_signup_group_action", args=[group.id, scheduled_activity.id]))


# The wizard view wrapped in the same access decorator as the function views.
eighth_admin_signup_group = eighth_admin_required(EighthAdminSignUpGroupWizard.as_view(EighthAdminSignUpGroupWizard.FORMS))
# Every other view in this module is wrapped in eighth_admin_required; this
# one performs signups on behalf of other users and needs the same guard.
@eighth_admin_required
def eighth_admin_signup_group_action(request, group_id, schact_id):
    """Confirm and perform signing up every member of a group for an activity.

    Without ``confirm`` in the POST data a confirmation page is rendered. It
    lists the group members who already have a sticky signup in the same
    block. With ``confirm``, those members are skipped unless their username
    is listed in ``remove_from_sticky``. The signup runs in the background
    when ``run_in_background`` is set, otherwise inline.

    Raises:
        Http404: if the scheduled activity or the group does not exist.
    """
    scheduled_activity = get_object_or_404(EighthScheduledActivity, id=schact_id)
    group = get_object_or_404(Group, id=group_id)

    users = group.user_set.all()

    # Nothing to do for an empty group; bail out before the per-user queries.
    if not users.exists():
        messages.info(request, "The group you have selected has no members.")
        return redirect("eighth_admin_dashboard")

    # Sticky check
    sticky_users_and_activities = {}
    for user in users:
        sticky_activity_signup = EighthSignup.objects.filter(user=user, scheduled_activity__block=scheduled_activity.block).filter(
            Q(scheduled_activity__activity__sticky=True) | Q(scheduled_activity__sticky=True)
        )
        if sticky_activity_signup.exists():
            sticky_users_and_activities[user] = sticky_activity_signup[0].scheduled_activity
    sticky_users_and_activities = dict(sorted(sticky_users_and_activities.items(), key=lambda x: x[0].last_name))

    if "confirm" in request.POST:
        skip_users = set(sticky_users_and_activities.keys())
        for username in request.POST.getlist("remove_from_sticky"):
            try:
                # discard(): a listed user who has no sticky signup is simply
                # not in the skip set; that must not raise KeyError.
                skip_users.discard(User.objects.get(username=username))
            except (User.DoesNotExist, ValueError):
                messages.warning(request, f"Error signing up user {username} for activity.")

        if request.POST.get("run_in_background"):
            eighth_admin_signup_group_task.delay(user_id=request.user.id, group_id=group_id, schact_id=schact_id, skip_users=skip_users)
            messages.success(request, "Group members are being signed up in the background.")
            return redirect("eighth_admin_dashboard")
        else:
            # NOTE(review): eighth_admin_perform_group_signup is not defined or
            # imported in the visible part of this module - confirm it is in scope.
            eighth_admin_perform_group_signup(group_id=group_id, schact_id=schact_id, request=request, skip_users=skip_users)
            messages.success(request, "Successfully signed up group for activity.")
            return redirect("eighth_admin_dashboard")

    return render(
        request,
        "eighth/admin/sign_up_group.html",
        {
            "admin_page_title": "Confirm Group Signup",
            "scheduled_activity": scheduled_activity,
            "group": group,
            "users_num": users.count(),
            "sticky_users_and_activities": sticky_users_and_activities,
        },
    )
class EighthAdminDistributeGroupWizard(SessionWizardView):
    """Wizard for choosing a block and the activities to distribute users across.

    The same class serves two routes: distributing the members of a group,
    and distributing the students who are unsigned for a block (URL name
    ``eighth_admin_distribute_unsigned``).
    """

    # Step name -> form class, in the order the steps are shown.
    FORMS = [("block", BlockSelectionForm), ("activity", ScheduledActivityMultiSelectForm)]
    # Every step is rendered with the same template.
    TEMPLATES = {
        "block": "eighth/admin/distribute_group.html",
        "activity": "eighth/admin/distribute_group.html",
        "choose": "eighth/admin/distribute_group.html",
    }

    def get_template_names(self):
        """Return the template for the step currently being displayed."""
        current_step = self.steps.current
        return [self.TEMPLATES[current_step]]

    def dispatch(self, request, *args, **kwargs):
        """Resolve the target group before normal dispatching.

        ``self.group`` is set to the group named by the ``group_id`` URL
        kwarg, or to ``False`` when no such group exists and the request came
        in on the "unsigned" route.

        Raises:
            Http404: if the group does not exist on any other route.
        """
        self.group_id = kwargs.get("group_id")  # pylint: disable=attribute-defined-outside-init
        try:
            self.group = Group.objects.get(id=self.group_id)  # pylint: disable=attribute-defined-outside-init
        except Group.DoesNotExist as exc:
            on_unsigned_route = self.request.resolver_match.url_name == "eighth_admin_distribute_unsigned"
            if not on_unsigned_route:
                raise http.Http404 from exc
            self.group = False  # pylint: disable=attribute-defined-outside-init

        return super().dispatch(request, *args, **kwargs)

    def get_context_data(self, form, **kwargs):
        """Build the template context for the current step."""
        context = super().get_context_data(form=form, **kwargs)

        block_step = self.get_cleaned_data_for_step("block")

        if self.group:
            context["group"] = self.group
        elif block_step:
            chosen_block = block_step["block"]
            context["users"] = chosen_block.get_unsigned_students()
            context["eighthblock"] = chosen_block

        if "block" in self.request.GET:
            context["redirect_block_id"] = self.request.GET["block"]

        if self.request.resolver_match.url_name == "eighth_admin_distribute_unsigned":
            # On the unsigned route there is no group to show.
            context["users_type"] = "unsigned"
            context["group"] = False

        context["admin_page_title"] = "Distribute Group Members Among Activities"
        return context

    def done(self, form_list, **kwargs):
        """Redirect to the distribute action with the chosen scheduled activities.

        Raises:
            Http404: if any selected activity is not scheduled for the
                selected block.
        """
        forms = list(form_list)
        block = forms[0].cleaned_data["block"]
        activities = forms[1].cleaned_data["activities"]

        try:
            schact_ids = [EighthScheduledActivity.objects.get(block=block, activity=act).id for act in activities]
        except EighthScheduledActivity.DoesNotExist as exc:
            raise http.Http404 from exc

        # Each parameter is appended with a leading "&".
        query_parts = [f"&schact={said}" for said in schact_ids]
        if "group_id" in kwargs:
            gid = kwargs["group_id"]
            query_parts.append(f"&group={gid}")

        if self.request.resolver_match.url_name == "eighth_admin_distribute_unsigned":
            query_parts.append(f"&unsigned=1&block={block.id}")

        args = "".join(query_parts)
        return redirect(f"/eighth/admin/groups/distribute_action?{args}")


# The same wizard is exposed under two names; it tells the routes apart by
# the URL name of the resolved request.
eighth_admin_distribute_group = eighth_admin_required(EighthAdminDistributeGroupWizard.as_view(EighthAdminDistributeGroupWizard.FORMS))
eighth_admin_distribute_unsigned = eighth_admin_required(EighthAdminDistributeGroupWizard.as_view(EighthAdminDistributeGroupWizard.FORMS))
@eighth_admin_required
def eighth_admin_distribute_action(request):
    """Show the user-to-activity distribution page, or apply a submitted one.

    POST with ``users``: every field named ``schact<id>`` lists the user ids
    to sign up for that scheduled activity. Users who already have a sticky
    signup in the activity's block are skipped unless their id is listed in
    ``remove_from_sticky``. Redirects to the dashboard afterwards.

    GET with ``schact``: renders the selection page for the given scheduled
    activities. Users come from ``group`` (a group id) or, with ``unsigned``,
    from the students with no signup in ``block``; ``limit`` truncates the list.

    Any other request is redirected to the dashboard.

    Raises:
        Http404: if a scheduled activity, group or block named in the query
            string does not exist, or if ``unsigned`` is given without ``block``.
    """
    block = None

    if "users" in request.POST:
        activity_user_map = {}
        for item in request.POST:
            if item[:6] != "schact":
                continue

            raw_id = item[6:]
            try:
                schact = EighthScheduledActivity.objects.get(id=int(raw_id))
            except (ValueError, EighthScheduledActivity.DoesNotExist):
                messages.error(request, f"ScheduledActivity does not exist with id {raw_id}")
                # Skip this field; there is no activity to map its users to.
                continue

            activity_user_map[schact] = request.POST.getlist(item)

        # Keys are the user ids exactly as posted (strings), so they compare
        # equal to the posted ids in ``remove_from_sticky`` and in the map.
        sticky_users_and_activities = {}
        for schact, userids in activity_user_map.items():
            for uid in userids:
                user = get_user_model().objects.get(id=uid)
                sticky_activity_signup = EighthSignup.objects.filter(user=user, scheduled_activity__block=schact.block).filter(
                    Q(scheduled_activity__activity__sticky=True) | Q(scheduled_activity__sticky=True)
                )
                if sticky_activity_signup.exists():
                    sticky_users_and_activities[uid] = sticky_activity_signup[0].scheduled_activity

        skip_users = set(sticky_users_and_activities.keys())
        for uid in request.POST.getlist("remove_from_sticky"):
            # discard(): an id without a sticky signup is not in the set and
            # must not raise KeyError.
            skip_users.discard(uid)

        changes = 0
        for schact, userids in activity_user_map.items():
            for uid in userids:
                if uid not in skip_users:
                    changes += 1
                    schact.add_user(get_user_model().objects.get(id=int(uid)), request=request, force=True, no_after_deadline=True)

        messages.success(request, f"Successfully completed {changes} activity signups.")
        return redirect("eighth_admin_dashboard")

    elif "schact" in request.GET:
        schactids = request.GET.getlist("schact")
        schacts = []
        for schact in schactids:
            try:
                sch = EighthScheduledActivity.objects.get(id=schact)
                schacts.append(sch)
            except EighthScheduledActivity.DoesNotExist as e:
                raise http.Http404 from e

        users = []
        users_type = ""
        group = None

        if "group" in request.GET:
            try:
                group = Group.objects.get(id=request.GET.get("group"))
            except (ValueError, Group.DoesNotExist) as e:
                raise http.Http404 from e
            users = group.user_set.all()
            users_type = "group"
        elif "unsigned" in request.GET:
            if "block" not in request.GET:
                raise http.Http404

            blockid = request.GET.get("block")
            try:
                block = EighthBlock.objects.get(id=blockid)
            except (ValueError, EighthBlock.DoesNotExist) as e:
                raise http.Http404 from e

            users = get_user_model().objects.get_students().exclude(eighthsignup__scheduled_activity__block__id=blockid)
            users_type = "unsigned"

        if "limit" in request.GET:
            users = users[0 : int(request.GET.get("limit"))]

        # Sort by last name
        users = sorted(users, key=lambda x: x.last_name)

        # Sticky check: for the unsigned listing use the requested block,
        # otherwise the block of the first scheduled activity.
        sticky_block = block if users_type == "unsigned" else schacts[0].block
        sticky_users_and_activities = {}
        for user in users:
            sticky_activity_signup = EighthSignup.objects.filter(
                user=user,
                scheduled_activity__block=sticky_block,
            ).filter(Q(scheduled_activity__activity__sticky=True) | Q(scheduled_activity__sticky=True))
            if sticky_activity_signup.exists():
                sticky_users_and_activities[user] = sticky_activity_signup[0].scheduled_activity
        sticky_users_and_activities = dict(sorted(sticky_users_and_activities.items(), key=lambda x: x[0].last_name))

        context = {
            "admin_page_title": "Distribute Group Members Across Activities",
            "users_type": users_type,
            "group": group if users_type == "group" else None,
            "eighthblock": block if users_type == "unsigned" else None,
            "schacts": schacts,
            "users": users,
            "show_selection": True,
            "sticky_users_and_activities": sticky_users_and_activities,
        }

        return render(request, "eighth/admin/distribute_group.html", context)
    else:
        return redirect("eighth_admin_dashboard")
@eighth_admin_required
def add_member_to_group_view(request, group_id):
    """Add one or more users to a group, or search for candidates to add.

    Accepts POST only (405 otherwise). Handling, in order:
        * ``user_id`` values: add every listed user who is not already a
          member and redirect to the edit page; when fewer than 25 users were
          added their ids are appended as ``added`` query parameters.
        * no ``query``: redirect to the edit page with ``?error=s``.
        * ``query`` matching a student id: add that user (if not already a
          member) and redirect with ``?added=<id>``.
        * otherwise: run a search; on errors or no results redirect with
          ``?error=n``, else render the list of possible students.

    Raises:
        Http404: if no group with ``group_id`` exists.
    """
    if request.method != "POST":
        return http.HttpResponseNotAllowed(["POST"], "HTTP 405: METHOD NOT ALLOWED")

    try:
        group = Group.objects.get(id=group_id)
    except Group.DoesNotExist as exc:
        raise http.Http404 from exc

    if not request.user.can_manage_group(group):
        messages.error(request, "You must be a superuser on Ion to manage administrative groups")
        return redirect("eighth_admin_dashboard")

    edit_url = reverse("eighth_admin_edit_group", kwargs={"group_id": group_id})

    if "user_id" in request.POST:
        requested_ids = request.POST.getlist("user_id")
        # Only users who are not yet members are selected.
        new_members = list(get_user_model().objects.filter(id__in=requested_ids).exclude(groups=group))
        added_total = len(new_members)
        list_ids_in_url = added_total < 25

        edit_url += "?"
        for member in new_members:
            member.groups.add(group)
            member.save()
            if list_ids_in_url:
                edit_url += f"added={member.id}&"

        invalidate_obj(group)
        plural_suffix = "s" if added_total != 1 else ""
        messages.success(request, "Successfully added {} user{} to the group.".format(added_total, plural_suffix))
        return redirect(edit_url)

    if "query" not in request.POST:
        return redirect(edit_url + "?error=s")

    query = request.POST["query"]

    # First try the query as an exact student id.
    sid_match = get_user_model().objects.user_with_student_id(query)
    if sid_match:
        already_member = sid_match.groups.filter(id=group.id).exists()
        if not already_member:
            sid_match.groups.add(group)
            sid_match.save()
        messages.success(request, f'Successfully added user "{sid_match.full_name}" to the group.')
        return redirect(edit_url + "?added=" + str(sid_match.id))

    # Otherwise fall back to a general search.
    errors, results = get_search_results(query)
    if errors:
        messages.error(request, "Could not process search query.")
        return redirect(edit_url + "?error=n")

    if not results:
        return redirect(edit_url + "?error=n")

    candidates = sorted(results, key=lambda x: (x.last_name, x.first_name))
    context = {"query": query, "users": candidates, "group": group, "admin_page_title": "Add Members to Group"}
    return render(request, "eighth/admin/possible_students_add_group.html", context)
@eighth_admin_required
def remove_member_from_group_view(request, group_id, user_id):
    """Remove a single user from a group.

    Accepts POST only (405 otherwise). After the removal the user is
    redirected to the profile named by the posted ``profile_id`` when one is
    given, otherwise back to the group's edit page. If the user to remove
    does not exist, an error message is shown and the same redirect happens.

    Raises:
        Http404: if no group with ``group_id`` exists.
    """
    if request.method != "POST":
        return http.HttpResponseNotAllowed(["POST"], "HTTP 405: METHOD NOT ALLOWED")

    try:
        group = Group.objects.get(id=group_id)
    except Group.DoesNotExist as e:
        raise http.Http404 from e

    if not request.user.can_manage_group(group):
        messages.error(request, "You must be a superuser on Ion to manage administrative groups")
        return redirect("eighth_admin_dashboard")

    uid = request.POST.get("profile_id", 0)
    if uid:
        next_url = reverse("user_profile", args=(uid,))
    else:
        next_url = reverse("eighth_admin_edit_group", kwargs={"group_id": group_id})

    try:
        user = get_user_model().objects.get(id=user_id)
    except get_user_model().DoesNotExist:
        messages.error(request, "There was an error removing this user.")
        # redirect() has no ``status`` parameter; extra keyword arguments are
        # treated as URL-reversing kwargs and were silently ignored for this
        # already-resolved path, so the bogus ``status=400`` is dropped.
        return redirect(next_url)

    group.user_set.remove(user)
    group.save()
    invalidate_obj(group)
    messages.success(request, f'Successfully removed user "{user.full_name}" from the group.')

    return redirect(next_url)
@eighth_admin_required
def delete_empty_groups_view(request):
    """List all groups that have no members and, on POST, delete them.

    A POST request deletes every empty group and redirects to the dashboard;
    any other method renders a page listing the groups that would be deleted.
    """
    # One count query per group decides whether it is empty.
    empty_groups = []
    for candidate in Group.objects.all():
        if candidate.user_set.all().count() == 0:
            empty_groups.append(candidate)

    if request.method != "POST":
        listing_context = {
            "admin_page_title": "Delete Empty Groups",
            "groups": empty_groups,
        }
        return render(request, "eighth/admin/delete_empty_groups.html", context=listing_context)

    for doomed in empty_groups:
        doomed.delete()

    messages.success(request, "Successfully removed empty groups.")
    return redirect(reverse("eighth_admin_dashboard"))