Source code for intranet.apps.printing.views

import logging
import math
import os
import re
import subprocess
import tempfile
from io import BytesIO
from typing import Dict, Optional

import magic
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.core.cache import cache
from django.shortcuts import redirect, render
from django.template.loader import get_template
from django.utils import timezone
from django.utils.text import slugify
from sentry_sdk import add_breadcrumb, capture_exception
from xhtml2pdf import pisa

from ..auth.decorators import deny_restricted
from ..context_processors import _get_current_ip
from .forms import PrintJobForm
from .models import PrintJob

logger = logging.getLogger(__name__)


class InvalidInputPrintingError(Exception):
    """Raised when a print job fails because of what the user submitted.

    Since the cause is invalid user input rather than a fault in the printing
    system, this error is not worthy of a ``CRITICAL`` log message.
    """
class RatelimitCacheError(Exception):
    """Raised when the cache could not be used to rate limit a user."""
class RatelimitExceededError(Exception):
    """Raised when the user has exceeded the printing rate limit."""
def get_user_ratelimit_status(username: str) -> bool:
    """Report whether a user has reached the printing rate limit.

    The request counter is read from the cache under
    ``printing_ratelimit:<username>`` and compared against
    ``settings.PRINT_RATELIMIT_FREQUENCY``.

    Args:
        username: The username whose counter should be checked.

    Returns:
        ``False`` if there is no counter or it is below the limit, ``True`` if
        it is at or above the limit.

    Raises:
        RatelimitCacheError: If the cached value is neither below nor at/above
            the limit.
    """
    limit_key = f"printing_ratelimit:{username}"
    request_count = cache.get(limit_key, None)

    # No counter yet: the user did not go over the rate limit.
    if request_count is None:
        return False
    # Counter below the limit: the user did not go over the rate limit.
    if request_count < settings.PRINT_RATELIMIT_FREQUENCY:
        return False
    if request_count >= settings.PRINT_RATELIMIT_FREQUENCY:
        return True

    raise RatelimitCacheError("An error occurred while trying to get your rate limit status")
def set_user_ratelimit_status(username: str) -> None:
    """Record one more print request for a user in the rate-limit counter.

    If the cache holds no counter for the user, one is created with the value
    1 and a timeout of ``settings.PRINT_RATELIMIT_MINUTES`` minutes. If a
    counter with a value of at least 1 exists, it is incremented.

    Args:
        username: The username whose counter should be updated.
    """
    limit_key = f"printing_ratelimit:{username}"
    request_count = cache.get(limit_key, None)

    if request_count is None:
        # First request: start the counter at 1 and let it expire after the
        # time specified by settings (minutes converted to seconds).
        window_seconds = settings.PRINT_RATELIMIT_MINUTES * 60
        cache.set(limit_key, 1, window_seconds)
        return

    if request_count >= 1:
        cache.incr(limit_key)
def get_printers() -> Dict[str, str]:
    """Returns a dictionary mapping name:description for available printers.

    The printers are found by parsing the output of ``lpstat -l -p``, so this
    requires that a CUPS client be configured on the server. A successfully
    parsed result is stored in the cache under ``printing:printers``.

    Returns:
        A dictionary mapping name:description for available printers. If
        ``lpstat`` is not installed, fails, or times out, an empty dictionary
        is returned (and nothing is cached).
    """
    key = "printing:printers"
    cached = cache.get(key)
    # An empty cached dictionary is falsy, so it is treated as a cache miss.
    if cached and isinstance(cached, dict):
        return cached

    try:
        output = subprocess.check_output(["lpstat", "-l", "-p"], universal_newlines=True, timeout=10)
    except FileNotFoundError:
        # Don't die if cups isn't installed. Return a dict (not a list) so the
        # result always matches the declared return type.
        return {}
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
        # Don't die if lpstat fails
        return {}

    # Matches "printer <name> ..." lines, except for printers reported as disabled.
    PRINTER_LINE_RE = re.compile(r"^printer\s+(\w+)\s+(?!disabled)", re.ASCII)
    # Matches the indented "Description: ..." line of the extended listing.
    DESCRIPTION_LINE_RE = re.compile(r"^\s+Description:\s+(.*)\s*$", re.ASCII)

    printers = {}
    last_name = None
    for line in output.splitlines():
        match = PRINTER_LINE_RE.match(line)
        if match is not None:
            # Pull out the name of the printer
            name = match.group(1)
            if name == "Please_Select_a_Printer":
                # Placeholder entry: skip it, and forget the previous printer so
                # the placeholder's description is not attributed to it.
                last_name = None
                continue

            # By default, use the name of the printer instead of the description
            printers[name] = name
            # Record the name of the printer so when we parse the rest of the
            # extended description we know which printer it's referring to.
            last_name = name
        elif last_name is not None:
            match = DESCRIPTION_LINE_RE.match(line)
            if match is not None:
                # Pull out the description
                description = match.group(1)
                # And make sure we don't set an empty description
                if description:
                    printers[last_name] = description
                last_name = None

    cache.set(key, printers, timeout=settings.CACHE_AGE["printers_list"])
    return printers
def convert_soffice(tmpfile_name: str) -> Optional[str]:
    """Converts a doc or docx to a PDF with soffice.

    The converted file is written to ``/tmp`` and its path is extracted from
    the ``<input> -> <output> using ...`` line that soffice prints.

    Args:
        tmpfile_name: The path to the file to print.

    Returns:
        The path to the converted file, or ``None`` if the command failed,
        timed out, or its output could not be parsed.
    """
    try:
        output = subprocess.check_output(
            ["soffice", "--headless", "--convert-to", "pdf", tmpfile_name, "--outdir", "/tmp"],
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            timeout=60,
        )
    except subprocess.CalledProcessError as e:
        logger.error("Could not run soffice command (returned %d): %s", e.returncode, e.output)
        return None
    except subprocess.TimeoutExpired as e:
        # A timeout is passed above, so a hung conversion must be reported as a
        # failed conversion like any other soffice error.
        logger.error("soffice command timed out after %s seconds: %s", e.timeout, e.output)
        return None

    if " -> " in output and " using " in output:  # pylint: disable=unsupported-membership-test; Pylint is wrong
        # The output path sits between " -> " and " using ".
        fileout = output.split(" -> ", 2)[1]
        fileout = fileout.split(" using ", 1)[0]
        return fileout

    logger.error("soffice command succeeded, but we couldn't find the file name in the output: %r", output)
    return None
def convert_pdf(tmpfile_name: str, cmdname: str = "ps2pdf") -> Optional[str]:
    """Run a converter command that writes ``<tmpfile_name>.pdf``.

    The command is invoked as ``<cmdname> <tmpfile_name> <tmpfile_name>.pdf``.

    Args:
        tmpfile_name: The path to the file to convert.
        cmdname: The name of the conversion command to run.

    Returns:
        The path to the created file, or ``None`` if the command exited with
        an error or the expected output file does not exist afterwards.
    """
    target_path = f"{tmpfile_name}.pdf"
    command = [cmdname, tmpfile_name, target_path]

    try:
        command_output = subprocess.check_output(command, stderr=subprocess.STDOUT, universal_newlines=True)
    except subprocess.CalledProcessError as err:
        logger.error("Could not run %s command (returned %d): %s", cmdname, err.returncode, err.output)
        return None

    if not os.path.isfile(target_path):
        # The command reported success but did not produce the file.
        logger.error(
            "%s command succeeded, but the file it was supposed to create (%s) does not exist: %r",
            cmdname,
            target_path,
            command_output,
        )
        return None

    return target_path
def get_numpages(tmpfile_name: str) -> int:
    """Read the page count of a file from the output of ``pdfinfo``.

    Args:
        tmpfile_name: The path to the file to inspect.

    Returns:
        The number parsed from the ``Pages:`` line of the ``pdfinfo`` output,
        or -1 if the command exited with an error, no such line was found, or
        its value was not an integer.
    """
    try:
        info = subprocess.check_output(["pdfinfo", tmpfile_name], stderr=subprocess.STDOUT, universal_newlines=True)
    except subprocess.CalledProcessError as err:
        logger.error("Could not run pdfinfo command (returned %d): %s", err.returncode, err.output)
        return -1

    marker = "Pages:"
    page_count = -1
    # No early exit: if several lines carry the prefix, the last one decides.
    for row in info.splitlines():
        if not row.startswith(marker):
            continue
        try:
            page_count = int(row[len(marker) :].strip())
        except ValueError:
            page_count = -1

    return page_count
# If a file is identified as a mimetype that is a key in this dictionary, the magic files (in the "magic_files" # director) from the corresponding list will be used to re-examine the file and attempt to find a better match. Why # not just always use those files? Well, if you give libmagic a list of files, it will check *only* the files you # tell it to, excluding the system-wide magic database. Worse, there is no reliable method of getting the system-wide # database path (which is distro-specific, so we can't just hardcode it). This really is the best solution. EXTRA_MAGIC_FILES = {"application/zip": ["msooxml"]} # If the re-examination of a file with EXTRA_MAGIC_FILES yields one of these mimetypes, the original mimetype (the # one that prompted re-examining based on EXTRA_MAGIC_FILES) will be used instead. GENERIC_MIMETYPES = {"application/octet-stream"}
def get_mimetype(tmpfile_name: str) -> str:
    """Detect the mimetype of a file with libmagic.

    If the first detection yields a mimetype listed in ``EXTRA_MAGIC_FILES``,
    the file is examined a second time using only the corresponding magic
    files from the ``magic_files`` directory next to this module. The second
    result replaces the first unless it is one of ``GENERIC_MIMETYPES``.

    Args:
        tmpfile_name: The path to the file to examine.

    Returns:
        The detected mimetype.
    """
    detected = magic.Magic(mime=True).from_file(tmpfile_name)
    if detected not in EXTRA_MAGIC_FILES:
        return detected

    # libmagic takes several magic files as one colon-separated string.
    magic_dir = os.path.join(os.path.dirname(__file__), "magic_files")
    extra_paths = [os.path.join(magic_dir, fname) for fname in EXTRA_MAGIC_FILES[detected]]
    refined_detector = magic.Magic(mime=True, magic_file=":".join(extra_paths))
    refined = refined_detector.from_file(tmpfile_name)

    # A generic second answer is no improvement; keep the first detection.
    if refined in GENERIC_MIMETYPES:
        return detected
    return refined
def convert_file(tmpfile_name: str, orig_fname: str) -> Optional[str]:
    """Convert an uploaded file to a printable format based on its detected mimetype.

    PDF and plain-text files are passed through unchanged, word-processor
    documents are converted with ``convert_soffice``, and PostScript files are
    converted to PDF with ``convert_pdf``.

    Args:
        tmpfile_name: The path to the uploaded file.
        orig_fname: The original name of the uploaded file. Only used to give
            a more helpful error for unrecognized ``.doc``/``.docx`` files.

    Returns:
        The path to the printable file, or ``None`` if the conversion helper
        failed.

    Raises:
        InvalidInputPrintingError: If the detected file type is not supported.
    """
    detected = get_mimetype(tmpfile_name)

    add_breadcrumb(category="printing", message=f"Detected file type {detected}", level="debug")

    no_conversion = ["application/pdf", "text/plain"]
    soffice_convert = [
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/msword",
        "application/vnd.oasis.opendocument.text",
    ]

    if detected in no_conversion:
        return tmpfile_name

    # .docx
    if detected in soffice_convert:
        return convert_soffice(tmpfile_name)

    if detected == "application/postscript":
        # PostScript must go through ps2pdf; pdf2ps converts in the opposite
        # direction and would leave a non-PDF file behind a ".pdf" name.
        return convert_pdf(tmpfile_name, "ps2pdf")

    # Not detected

    if orig_fname.endswith((".doc", ".docx")):
        raise InvalidInputPrintingError(
            f"Invalid file type {detected}<br>Note: It looks like you are trying to print a Word document. "
            "Word documents don't always print correctly, so we recommend that you convert to a PDF before printing."
        )

    raise InvalidInputPrintingError(f"Invalid file type {detected}")
def check_page_range(page_range: str, max_pages: int) -> Optional[int]:
    """Count the pages selected by a page range string.

    Args:
        page_range: The page range as a string, such as "1-5" or "1,2,3".
            Comma-separated entries are either a single page number or a
            "low-high" pair.
        max_pages: The number of pages in the submitted document. Any page
            number below 1 or above this value makes the range invalid.

    Returns:
        The number of pages in the range (entries are summed individually), or
        None if it is an invalid range.
    """

    def _on_document(page: int) -> bool:
        # Page numbers are 1-based and may not run past the document's end.
        return 0 < page <= max_pages

    total = 0
    for entry in page_range.split(","):
        low_text, dash, high_text = entry.partition("-")
        try:
            low = int(low_text)
            # A lone page number is a one-page span. With a dash, anything
            # after it must be one integer, so a second dash fails to parse.
            high = int(high_text) if dash else low
        except ValueError:
            return None

        if not (_on_document(low) and _on_document(high)):
            return None
        if low > high:
            return None

        total += high - low + 1

    return total
def html_to_pdf(template_src, filename, context=None):
    """Render a template to a PDF stored in a new temporary file.

    Args:
        template_src: The name of the template to render.
        filename: A file name; its base name without the extension, followed
            by ".pdf", becomes part of the temporary file's prefix.
        context: The context to render the template with. Defaults to an
            empty dictionary.

    Returns:
        The path to the temporary file holding the PDF, or ``None`` if the PDF
        conversion reported an error.
    """
    render_context = {} if context is None else context
    markup = get_template(template_src).render(render_context)

    pdf_buffer = BytesIO()
    status = pisa.pisaDocument(BytesIO(markup.encode("ISO-8859-1")), pdf_buffer)
    if status.err:
        return None

    stem, _extension = os.path.splitext(filename)
    filename = os.path.basename(stem) + ".pdf"

    handle, tmp_path = tempfile.mkstemp(prefix=f"ion_title_print_{filename}")
    # Opening the descriptor returned by mkstemp also closes it when done.
    with open(handle, "wb") as out:
        out.write(pdf_buffer.getvalue())
    return tmp_path
@login_required
@deny_restricted
def print_view(request):
    """Display the print form and submit a print job when it is posted.

    Users whose IP is not in ``settings.TJ_IPS`` and who lack the "printing"
    admin permission are redirected to the index page with an error message.

    Args:
        request: The incoming request.

    Returns:
        A redirect to "index" when access is denied, otherwise the rendered
        ``printing/print.html`` page containing the form.
    """
    if _get_current_ip(request) not in settings.TJ_IPS and not request.user.has_admin_permission("printing"):
        messages.error(request, "You don't have printer access outside of the TJ network.")
        return redirect("index")

    printers = get_printers()
    if request.method == "POST":
        form = PrintJobForm(request.POST, request.FILES, printers=printers)
        if form.is_valid():
            obj = form.save(commit=True)
            obj.user = request.user
            obj.save()
            try:
                # NOTE(review): print_job is not visible in this part of the
                # file; presumably it is defined elsewhere in the module.
                print_job(obj)
            except InvalidInputPrintingError as e:
                # The user's input was at fault: report it to them, nothing more.
                messages.error(request, str(e))
            except Exception as e:
                messages.error(request, str(e))
                # Use the module-level logger, like the rest of this module,
                # so the record is attributed to this module, not the root logger.
                logger.error("Printing failed: %s", e)
                capture_exception(e)
            else:
                messages.success(
                    request,
                    "Your file was submitted to the printer. "
                    "Do not re-print this job if it does not come out of the printer - "
                    "in nearly all cases, the job has been received and re-printing "
                    "will cause multiple copies to be printed. "
                    "Ask for help instead by contacting the "
                    "Student Systems Administrators by filling out the feedback form.",
                )
    else:
        form = PrintJobForm(printers=printers)
    context = {"form": form}
    return render(request, "printing/print.html", context)