Source code for intranet.apps.printing.views

import logging
import math
import os
import re
import subprocess
import tempfile
from io import BytesIO
from typing import Dict, Optional

import magic
from sentry_sdk import add_breadcrumb, capture_exception
from xhtml2pdf import pisa

from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.core.cache import cache
from django.shortcuts import redirect, render
from django.template.loader import get_template
from django.utils import timezone
from django.utils.text import slugify

from ..auth.decorators import deny_restricted
from ..context_processors import _get_current_ip
from .forms import PrintJobForm
from .models import PrintJob

logger = logging.getLogger(__name__)


class InvalidInputPrintingError(Exception):
    """Raised when a print job fails because of what the user submitted.

    Such failures are caused by invalid input rather than by a server-side
    problem, so they are not worthy of a ``CRITICAL`` log message.

    """
def get_printers() -> Dict[str, str]:
    """Returns a dictionary mapping name:description for available printers.

    This requires that a CUPS client be configured on the server.
    Otherwise, this returns an empty dictionary.

    The parsed result is stored in the cache under ``printing:printers``
    using the timeout from ``settings.CACHE_AGE["printers_list"]``. Results
    of a failed ``lpstat`` invocation are never cached, and an empty cached
    value is treated as a cache miss.

    Returns:
        A dictionary mapping name:description for available printers.

    """
    key = "printing:printers"
    cached = cache.get(key)
    if cached and isinstance(cached, dict):
        return cached

    try:
        output = subprocess.check_output(["lpstat", "-l", "-p"], universal_newlines=True, timeout=10)
    except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
        # Don't die if cups isn't installed (FileNotFoundError) or if lpstat
        # fails or hangs. Return an empty *dict* so the declared return type
        # holds on every path.
        return {}

    # Matches the header line of a printer that is not disabled and captures its name.
    printer_line_re = re.compile(r"^printer\s+(\w+)\s+(?!disabled)", re.ASCII)
    # Matches the indented "Description:" line of the extended listing.
    description_line_re = re.compile(r"^\s+Description:\s+(.*)\s*$", re.ASCII)

    printers = {}
    last_name = None
    for line in output.splitlines():
        match = printer_line_re.match(line)
        if match is not None:
            # Pull out the name of the printer
            name = match.group(1)
            if name != "Please_Select_a_Printer":
                # By default, use the name of the printer instead of the description
                printers[name] = name
                # Record the name of the printer so when we parse the rest of the
                # extended description we know which printer it's referring to.
                last_name = name
        elif last_name is not None:
            # We've seen a line with the name of a printer before
            match = description_line_re.match(line)
            if match is not None:
                # Pull out the description
                description = match.group(1)
                # And make sure we don't set an empty description
                if description:
                    printers[last_name] = description
                    last_name = None

    cache.set(key, printers, timeout=settings.CACHE_AGE["printers_list"])
    return printers
def convert_soffice(tmpfile_name: str) -> Optional[str]:
    """Converts a doc or docx to a PDF with soffice.

    Args:
        tmpfile_name: The path to the file to print.

    Returns:
        The path to the converted file, as reported in soffice's output.
        ``None`` if soffice exits with an error, does not finish within the
        60 second timeout, or produces output from which the converted file
        name cannot be extracted.

    """
    try:
        output = subprocess.check_output(
            ["soffice", "--headless", "--convert-to", "pdf", tmpfile_name, "--outdir", "/tmp"],
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            timeout=60,
        )
    except subprocess.CalledProcessError as e:
        logger.error("Could not run soffice command (returned %d): %s", e.returncode, e.output)
        return None
    except subprocess.TimeoutExpired as e:
        # A timeout is passed above, so its expiry must be handled like any
        # other conversion failure instead of escaping to the caller.
        logger.error("soffice command timed out after %s seconds: %s", e.timeout, e.output)
        return None

    # The converted path is taken from between " -> " and " using " in the output.
    if " -> " in output and " using " in output:  # pylint: disable=unsupported-membership-test; Pylint is wrong
        fileout = output.split(" -> ", 2)[1]
        fileout = fileout.split(" using ", 1)[0]
        return fileout

    logger.error("soffice command succeeded, but we couldn't find the file name in the output: %r", output)
    return None
def convert_pdf(tmpfile_name: str, cmdname: str = "ps2pdf") -> Optional[str]:
    """Runs a converter command that writes ``<tmpfile_name>.pdf``.

    The command is invoked as ``cmdname tmpfile_name tmpfile_name.pdf``.

    Args:
        tmpfile_name: The path to the file to convert.
        cmdname: The converter executable to run.

    Returns:
        The path of the created file, or ``None`` if the command exited
        with an error or did not create the expected file.

    """
    target = f"{tmpfile_name}.pdf"
    command = [cmdname, tmpfile_name, target]

    try:
        cmd_output = subprocess.check_output(command, stderr=subprocess.STDOUT, universal_newlines=True)
    except subprocess.CalledProcessError as exc:
        logger.error("Could not run %s command (returned %d): %s", cmdname, exc.returncode, exc.output)
        return None

    if not os.path.isfile(target):
        logger.error(
            "%s command succeeded, but the file it was supposed to create (%s) does not exist: %r", cmdname, target, cmd_output
        )
        return None

    return target
def get_numpages(tmpfile_name: str) -> int:
    """Reads the page count of a file from the output of ``pdfinfo``.

    Args:
        tmpfile_name: The path to the file to inspect.

    Returns:
        The number parsed from the ``Pages:`` line of the ``pdfinfo``
        output, or -1 if the command exited with an error, no such line was
        found, or its value could not be parsed as an integer.

    """
    try:
        pdfinfo_output = subprocess.check_output(["pdfinfo", tmpfile_name], stderr=subprocess.STDOUT, universal_newlines=True)
    except subprocess.CalledProcessError as exc:
        logger.error("Could not run pdfinfo command (returned %d): %s", exc.returncode, exc.output)
        return -1

    marker = "Pages:"
    page_count = -1
    # No early exit: if several lines start with the marker, the last one decides.
    for entry in pdfinfo_output.splitlines():
        if not entry.startswith(marker):
            continue
        try:
            page_count = int(entry[len(marker):].strip())
        except ValueError:
            page_count = -1

    return page_count
# If a file is identified as a mimetype that is a key in this dictionary, the magic files (in the "magic_files" director) from the corresponding list # will be used to re-examine the file and attempt to find a better match. # Why not just always use those files? Well, if you give libmagic a list of files, it will check *only* the files you tell it to, excluding the # system-wide magic database. Worse, there is no reliable method of getting the system-wide database path (which is distro-specific, so we can't just # hardcode it). This really is the best solution. EXTRA_MAGIC_FILES = {"application/zip": ["msooxml"]} # If the re-examination of a file with EXTRA_MAGIC_FILES yields one of these mimetypes, the original mimetype (the one that prompted re-examining # based on EXTRA_MAGIC_FILES) will be used instead. GENERIC_MIMETYPES = {"application/octet-stream"}
def get_mimetype(tmpfile_name: str) -> str:
    """Detects the mimetype of a file with libmagic.

    If the first detection yields a mimetype listed in ``EXTRA_MAGIC_FILES``,
    the file is examined a second time using only the corresponding magic
    files from the ``magic_files`` directory next to this module. The second
    result replaces the first unless it is one of ``GENERIC_MIMETYPES``.

    Args:
        tmpfile_name: The path to the file to examine.

    Returns:
        The detected mimetype.

    """
    detected = magic.Magic(mime=True).from_file(tmpfile_name)
    if detected not in EXTRA_MAGIC_FILES:
        return detected

    magic_dir = os.path.join(os.path.dirname(__file__), "magic_files")
    extra_paths = [os.path.join(magic_dir, fname) for fname in EXTRA_MAGIC_FILES[detected]]

    # Multiple magic files are handed over as one colon-separated string.
    refined = magic.Magic(mime=True, magic_file=":".join(extra_paths)).from_file(tmpfile_name)

    # A generic answer from the second pass is no improvement; keep the first one.
    return detected if refined in GENERIC_MIMETYPES else refined
def convert_file(tmpfile_name: str, orig_fname: str) -> Optional[str]:
    """Converts an uploaded file into something printable, based on its detected mimetype.

    PDF and plain text files are returned unchanged. Word/OpenDocument text
    documents are converted with ``convert_soffice`` and PostScript files
    with ``convert_pdf``.

    Args:
        tmpfile_name: The path to the uploaded file.
        orig_fname: The original name of the uploaded file. Only used to
            give a more helpful error message for ``.doc``/``.docx`` files
            whose type was not recognized.

    Returns:
        The path to the file to print, or ``None`` if the conversion helper
        that was used failed.

    Raises:
        InvalidInputPrintingError: If the detected mimetype is not supported.

    """
    detected = get_mimetype(tmpfile_name)

    add_breadcrumb(category="printing", message="Detected file type {}".format(detected), level="debug")

    no_conversion = ["application/pdf", "text/plain"]
    soffice_convert = [
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/msword",
        "application/vnd.oasis.opendocument.text",
    ]

    if detected in no_conversion:
        return tmpfile_name

    # .docx
    if detected in soffice_convert:
        return convert_soffice(tmpfile_name)

    if detected == "application/postscript":
        # PostScript has to be turned *into* a PDF, which is what ps2pdf does;
        # pdf2ps converts in the opposite direction.
        return convert_pdf(tmpfile_name, "ps2pdf")

    # Not detected
    if orig_fname.endswith((".doc", ".docx")):
        raise InvalidInputPrintingError(
            "Invalid file type {}<br>Note: It looks like you are trying to print a Word document. Word documents don't always print correctly, so we "
            "recommend that you convert to a PDF before printing.".format(detected)
        )

    raise InvalidInputPrintingError("Invalid file type {}".format(detected))
def check_page_range(page_range: str, max_pages: int) -> Optional[int]:
    """Counts the pages selected by a page range string.

    Args:
        page_range: Comma-separated entries, each either a single page
            number or a ``low-high`` span, such as "1-5" or "1,2,3".
        max_pages: The number of pages in the submitted document. Any page
            number greater than this makes the range invalid.

    Returns:
        The number of pages in the range (pages named more than once are
        counted each time), or None if it is an invalid range.

    """
    total = 0
    for part in page_range.split(","):
        bounds = part.split("-")
        # More than one dash in a single entry is never valid.
        if len(bounds) > 2:
            return None

        try:
            numbers = [int(bound) for bound in bounds]
        except ValueError:
            # Something in this entry is not an integer.
            return None

        # A single page is treated as a span whose ends coincide.
        first, last = numbers[0], numbers[-1]
        if not 0 < first <= last <= max_pages:
            return None

        total += last - first + 1

    return total
def html_to_pdf(template_src, filename, context=None):
    """Renders a template to a PDF stored in a new temporary file.

    Args:
        template_src: The name of the template to render.
        filename: A file name or path; its base name without the extension
            becomes part of the temporary file's name.
        context: The context to render the template with. Defaults to an
            empty context.

    Returns:
        The path to the temporary file containing the PDF, or ``None`` if
        pisa reported an error. The temporary file is not removed by this
        function.

    """
    if context is None:
        context = {}

    template = get_template(template_src)
    html = template.render(context)

    # Characters that ISO-8859-1 cannot represent would make a strict encode
    # raise UnicodeEncodeError; emit them as HTML character references instead.
    source = BytesIO(html.encode("ISO-8859-1", errors="xmlcharrefreplace"))
    result = BytesIO()
    pdf = pisa.pisaDocument(source, result)
    if pdf.err:
        return None

    filename_without_extension = os.path.basename(os.path.splitext(filename)[0])
    filename = filename_without_extension + ".pdf"
    title_tmpfile_fd, title_tmpfile_name = tempfile.mkstemp(prefix=f"ion_title_print_{filename}")

    # Opening the descriptor (instead of the path) makes the ``with`` block close it.
    with open(title_tmpfile_fd, "wb") as f:
        f.write(result.getvalue())

    return title_tmpfile_name
@login_required
@deny_restricted
def print_view(request):
    """Displays the print form and submits a print job when the form is posted.

    Users whose IP is not in ``settings.TJ_IPS`` and who lack the
    ``printing`` admin permission are redirected to the index page with an
    error message.

    Args:
        request: The request object.

    Returns:
        A redirect to ``index`` if access is denied, otherwise the rendered
        ``printing/print.html`` page.

    """
    if _get_current_ip(request) not in settings.TJ_IPS and not request.user.has_admin_permission("printing"):
        messages.error(request, "You don't have printer access outside of the TJ network.")
        return redirect("index")

    printers = get_printers()
    if request.method == "POST":
        form = PrintJobForm(request.POST, request.FILES, printers=printers)
        if form.is_valid():
            obj = form.save(commit=True)
            obj.user = request.user
            obj.save()
            try:
                print_job(obj)
            except InvalidInputPrintingError as e:
                # The user's input was at fault: tell them, but don't log or report it.
                messages.error(request, str(e))
            except Exception as e:
                messages.error(request, str(e))
                # Use the module-level logger (not the root logger) so the
                # record is attributed to this module like every other
                # message logged in this file.
                logger.error("Printing failed: %s", e)
                capture_exception(e)
            else:
                messages.success(
                    request,
                    "Your file was submitted to the printer. "
                    "Do not re-print this job if it does not come out of the printer - "
                    "in nearly all cases, the job has been received and re-printing "
                    "will cause multiple copies to be printed. "
                    "Ask for help instead by contacting the "
                    "Student Systems Administrators by filling out the feedback form.",
                )
    else:
        form = PrintJobForm(printers=printers)

    context = {"form": form}
    return render(request, "printing/print.html", context)