"""
Document class
Combines best of Gemini and my original document class.
renamer moved into utilities.
v 1.0 2025-12-06
"""
from datetime import datetime
import logging
import re
import subprocess
import unicodedata
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from pathlib import Path
from typing import Dict, Optional, Tuple, Any, List
from nameparser import HumanName
import pymupdf # type: ignore[import-untyped] # fitz
from rapidfuzz import fuzz
from tqdm import tqdm
from .arxiv import lookup_arxiv
from .crossref import lookup_doi, search as lookup_xref_search
from .utilities import sanitize_windows_component
from .bibtex import dict_to_bibtex_crossref, format_mendeley_file
from .hasher import hash_many3
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Mapping Crossref types to BibTeX types.
# Keys are Crossref work "type" strings; values are BibTeX entry types.
CROSSREF_TO_BIBTEX = {
    # Periodicals
    "journal-article": "article",
    "preprint": "article",  # Often best fit for arXiv
    # Parts of a larger work
    "book-chapter": "incollection",
    "proceedings-article": "inproceedings",
    # Whole books
    "monograph": "book",
    "book": "book",
    # Grey literature
    "report": "techreport",
    "dissertation": "phdthesis",
}
[docs]
class Document:
    """
    Manages a physical PDF document.

    Uses a Gather -> Rank -> Verify strategy to reconcile Metadata, Filenames, and OCR.

    After :meth:`process` has run, ``bib`` holds the chosen metadata,
    ``status`` is one of NEW / SUCCESS / REVIEW_NEEDED / FAILED,
    ``confidence_score`` is a 0-100 number and ``log_messages`` records
    every decision taken along the way.
    """

    # Filename fragments that carry no bibliographic information.
    # The publisher patterns use [^()]* rather than .* so that they stay
    # inside a single pair of parentheses and cannot swallow a neighbouring
    # group such as "(2019)".
    _FILENAME_NOISE = (
        r"\(z-lib\.org\)",
        r"\(Z-Library\)",
        r"libgen\.li",
        r"\(auth\.\)",
        r"\(eds\.\)",
        r"\(ed\.\)",
        r"_crc",
        r"\([^()]*Springer[^()]*\)",
        r"\([^()]*Cambridge[^()]*\)",
        r"\([^()]*Wiley[^()]*\)",
    )

    # "(YYYY)" anywhere, or a bare "YYYY " at the start of the name.
    _FILENAME_YEAR = r"\((\d{4})\)|^\s*(\d{4})\s"

    def __init__(self, doc_path: Path, book_mode: bool = False):
        """
        Create an unprocessed document record; no file I/O happens here.

        :param doc_path: location of the document on disk.
        :param book_mode: when True the default BibTeX type is ``book``
            (otherwise ``article``) and the flag is passed on to the
            Crossref search.
        """
        self.doc_path = Path(doc_path)
        self._new_doc_path: Optional[Path] = None
        self._text: str = ""
        self.book_mode = book_mode
        self.hash: str = ""

        # Operational Status
        self.status = "NEW"  # NEW, SUCCESS, REVIEW_NEEDED, FAILED
        self.confidence_score = 0  # 0 to 100
        self.log_messages: List[str] = []

        # The final chosen metadata
        self.bib: Dict[str, str] = {
            "type": "book" if book_mode else "article",
            "title": "",
            "author": "",
            "year": "",
            "month": "",
            "day": "",
            "doi": "",
            "arxiv_id": "",
            "journal": "",  # For articles
            "booktitle": "",  # For chapters/proceedings
            "publisher": "",
            "volume": "",
            "number": "",
            "pages": "",
        }

        # Candidate data from different sources
        self.candidates: Dict[str, Any] = {
            "filename": {},
            "metadata": {},
            "visual": {},
            "api": {},
        }

    def __repr__(self):
        return f"Document({self.doc_path.name}) [{self.status}]"

    def key(self):
        """A reasonable default key to make reviewing easy. Filename based."""
        alnum = "".join(ch for ch in self.doc_path.stem if ch.isalnum())
        return "f" + alnum[:20]

    def process(self):
        """
        Orchestrates the discovery pipeline by prioritizing evidence:

        1. Gather: Collect raw info from Filename, PDF Metadata, and Visual OCR.
        2. Prioritized Enhance: Attempt lookup using a found DOI or ArXiv ID. If successful,
           accept the result as definitive.
        3. Fallback Enhance: If no ID was found, determine the best local 'Anchor',
           search external APIs, and validate the results.

        Results are written to ``self.bib``, ``self.status`` and
        ``self.confidence_score``; nothing is returned.
        """
        self.log_messages.append(f"Starting processing for {self.doc_path.name}")

        # 1. Gather
        self._parse_filename()
        self._step_metadata()
        self._step_visual()

        # 2. Prioritized Enhancement via found ID
        visual = self.candidates["visual"]
        if visual.get("doi") or visual.get("arxiv_id"):
            self.log_messages.append(
                "Found potential DOI/ArXiv ID from visual scan. Attempting direct lookup."
            )
            self._step_id_lookup(visual)
            if self.candidates.get("api"):
                self.log_messages.append(
                    f"Direct lookup successful. API result: {self.candidates['api']}"
                )
                # High confidence - this is the best source of truth.
                self.bib.update(self.candidates["api"])
                self.status = "SUCCESS"
                self.confidence_score = 100  # 100% confidence in a direct ID lookup
                self.log_messages.append(
                    "Setting status to SUCCESS with 100% confidence based on ID lookup."
                )
                return  # End of processing

        # 3. Fallback to Anchor-based Search
        self.log_messages.append(
            "No definitive ID found or lookup failed. Falling back to anchor-based search."
        )
        anchor_source, anchor_data = self._determine_anchor()
        self.log_messages.append(
            f"Selected anchor source: '{anchor_source}' with data: {anchor_data}"
        )
        if not anchor_data.get("title"):
            self.log_messages.append("Anchor has no title. Cannot proceed with search.")
            self.bib["title"] = self.doc_path.stem.replace("_", " ")
            self.status = "FAILED"
            return

        self._step_search_api(anchor_data)

        # 4. Verify & Merge for anchor-based search
        self._validate_and_merge(anchor_data)

        # 5. Final cleanup
        if not self.bib.get("title"):
            self.log_messages.append("Processing failed to find a title.")
            self.bib["title"] = self.doc_path.stem.replace("_", " ")
            self.status = "FAILED"

    # ----------------------------------------------------------------------
    # 1. GATHERING STEPS
    # ----------------------------------------------------------------------
    def _parse_filename(self):
        """
        Heuristic parsing of filenames based on common ebook patterns.

        Stores a dict with ``source`` and, when found, ``title``, ``author``
        and ``year`` in ``self.candidates["filename"]``.
        """
        candidate = {"source": "filename"}

        # --- A. Clean Noise ---
        # Remove common "pirate" tags and publisher tags from the stem.
        clean_name = self.doc_path.stem
        for pat in self._FILENAME_NOISE:
            clean_name = re.sub(pat, "", clean_name, flags=re.IGNORECASE)
        clean_name = clean_name.replace("_", " ").strip()
        # Turn intra-word hyphens into spaces but keep " - " separators:
        # Strategy 2 below depends on them, and replacing every hyphen made
        # that branch unreachable.
        clean_name = re.sub(r"(?<! )-|-(?! )", " ", clean_name).strip()

        # --- B. Extract Year ---
        # Look for (YYYY) or start of string YYYY
        year_match = re.search(self._FILENAME_YEAR, clean_name)
        if year_match:
            candidate["year"] = year_match.group(1) or year_match.group(2)
            # Remove year from name to clean up title parsing. Strip again so
            # a trailing "(YYYY)" does not leave whitespace that would defeat
            # the endswith(")") test in Strategy 3.
            clean_name = re.sub(self._FILENAME_YEAR, "", clean_name).strip()

        # --- C. Structure Detection ---
        if " by " in clean_name:
            # Strategy 1: "Title by Author"
            title, author = clean_name.split(" by ", 1)
            candidate["title"] = title.strip()
            candidate["author"] = author.strip()
        elif " - " in clean_name:
            # Strategy 2: "Author - Title" (hyphen separated).
            # Many files have "Series - Author - Title"; splitting on
            # " - " (space hyphen space) avoids hyphenated words.
            segments = [seg.strip() for seg in clean_name.split(" - ")]
            # Heuristic: Title is usually the longest segment
            longest = max(segments, key=len)
            candidate["title"] = longest
            if longest == segments[1]:
                # Usual case: Author - Title
                candidate["author"] = segments[0]
            elif longest == segments[0]:
                # Less common: Title - Author
                candidate["author"] = segments[1]
        elif "(" in clean_name and clean_name.endswith(")"):
            # Strategy 3: "Title (Author)" - last parentheses often hold the author
            match = re.search(r"(.*)\((.*)\)$", clean_name)
            if match:
                candidate["title"] = match.group(1).strip()
                candidate["author"] = match.group(2).strip()
        else:
            # Fallback: Treat whole cleaned string as Title
            candidate["title"] = clean_name.strip()

        # Clean up authors (remove " et al", etc)
        if candidate.get("author"):
            candidate["author"] = re.sub(
                r" et al\.?", "", candidate["author"], flags=re.IGNORECASE
            )

        self.candidates["filename"] = candidate
        self.log_messages.append(f"Parsed Filename: {candidate}")

    def _step_metadata(self):
        """
        Extract embedded PDF metadata.

        Stores a dict with ``source`` and, when usable, ``title``, ``author``
        and ``year`` in ``self.candidates["metadata"]``. If the file cannot
        be opened the dict carries an ``error`` entry instead.
        """
        c = {"source": "metadata"}
        try:
            with pymupdf.open(self.doc_path) as doc:
                # metadata itself may be None (see the PyMuPDF docs)
                meta = doc.metadata or {}
        except Exception as e:
            c["error"] = f"extracting md: {e}"
            self.candidates["metadata"] = c
            self.log_messages.append(f"PDF Metadata Error: {c['error']}")
            return

        def field(name):
            # A key can be present with value None, in which case
            # meta.get(name, "") returns None and .strip() would raise.
            return str(meta.get(name) or "").strip()

        title = field("title")
        author = field("author")
        subject = field("subject")

        # Filter Garbage Metadata
        bad_titles = ["Microsoft Word", "Untitled", "Presentation", "Document"]
        if title and len(title) > 3 and not any(b in title for b in bad_titles):
            c["title"] = title
        if author and len(author) > 2 and "@" not in author:
            c["author"] = author

        # Attempt to find year in subject or creation date
        year_match = re.search(r"\b(19|20)\d{2}\b", subject)
        if year_match:
            c["year"] = year_match.group(0)
        else:
            # Fallback to file creation date from metadata ("D:YYYY...")
            cdate = field("creationDate")
            year = cdate[2:6]
            if cdate.startswith("D:") and len(year) == 4 and year.isdigit():
                c["year"] = year

        self.candidates["metadata"] = c
        self.log_messages.append(f"Extracted Metadata: {c}")

    def _step_visual(self):
        """
        Visual scraping for Largest Text (Title) and IDs.

        Reads the first page only. Stores a dict with ``source`` and, when
        found, ``arxiv_id``, ``doi`` and ``title`` in
        ``self.candidates["visual"]``; on failure the dict carries ``error``.
        """
        c = {"source": "visual"}
        try:
            with pymupdf.open(self.doc_path) as doc:
                page = doc[0]
                text_dict = page.get_text("dict")
                raw_text = page.get_text("text")
        except Exception as e:
            c["error"] = f"visual extraction: {e}"
            self.candidates["visual"] = c
            self.log_messages.append(f"Visual Scan Error: {c['error']}")
            return

        # ID Scraping
        # Handles new (YYMM.NNNNN) and old (subject/YYMMNNN) formats, with versions
        arxiv_match = re.search(
            r"arXiv:((?:\d{4}\.\d{4,5}(?:v\d+)?)|(?:[a-z-]+(?:\.[A-Z]{2})?\/\d{7}(?:v\d+)?))",
            raw_text,
            re.IGNORECASE,
        )
        if arxiv_match:
            c["arxiv_id"] = arxiv_match.group(1)
        doi_match = re.search(r"10\.\d{4,9}/[-._;()/:A-Z0-9]+", raw_text, re.IGNORECASE)
        if doi_match:
            c["doi"] = doi_match.group(0).rstrip(".")  # clean trailing dots

        # Title Scraping (Largest Font)
        visual_title = self._find_largest_text(text_dict)
        if visual_title:
            # the all caps trick: if the block contains a plausible number of
            # long ALL-CAPS words, treat just those words as the title
            all_caps = [w for w in visual_title.split(' ') if w.isupper() and len(w) > 3]
            if 5 <= len(all_caps) <= 15:
                visual_title = (' '.join(all_caps)).title()
            # if too long probably includes the abstract too...
            # but who knows where to truncate!
            c["title"] = visual_title

        self.candidates["visual"] = c
        self.log_messages.append(f"Visual Scan Results: {c}")

    # ----------------------------------------------------------------------
    # 2. RANKING / ANCHOR
    # ----------------------------------------------------------------------
    def _determine_anchor(self) -> Tuple[str, Dict]:
        """
        Compare candidates and pick the 'Anchor' - the most trustworthy local source.

        Each of the visual, filename and metadata candidates is scored; the
        highest score wins and ties go to the earlier source in that order.

        :return: ``(source_name, candidate_dict)``.
        """
        def score(cand):
            title = cand.get("title")
            if not title:
                return -100
            s = 50
            n_words = len(title.split(" "))
            if n_words == 1:
                s -= 50
            elif n_words <= 5:
                s -= 25
            # penalize junk non-alpha chars
            s -= sum(1 for ch in title if ch != ' ' and not ch.isalpha())
            if "Microsoft" in title:
                s -= 50
            if cand.get("author"):
                s += 25
            return s

        scores = {
            source: score(self.candidates[source])
            for source in ("visual", "filename", "metadata")
        }
        best_source = max(scores, key=scores.get)
        self.log_messages.append(f"Anchor scores: {scores}")
        return best_source, self.candidates[best_source]

    # ----------------------------------------------------------------------
    # 3. EXTERNAL ENHANCEMENT
    # ----------------------------------------------------------------------
    def _step_id_lookup(self, source_data):
        """
        Lookup by DOI or Arxiv ID.

        The arXiv ID is tried first; the DOI is only used when there is no
        arXiv ID or its lookup returned nothing. A successful lookup is
        stored in ``self.candidates["api"]``.
        """
        if source_data.get("arxiv_id"):
            arxiv_id = source_data["arxiv_id"]
            self.log_messages.append(f"Looking up arXiv ID: {arxiv_id}")
            res = lookup_arxiv(arxiv_id)
            if res:
                self.candidates["api"] = self._normalize_arxiv(res)
                return
        if source_data.get("doi"):
            doi = source_data["doi"]
            self.log_messages.append(f"Looking up DOI: {doi}")
            res = lookup_doi(doi)
            if res:
                self.candidates["api"] = self._normalize_crossref(res)
                self.candidates['crossref-bib'] = dict_to_bibtex_crossref(res)
                return

    def _step_search_api(self, anchor):
        """
        Search Crossref using Title/Author from anchor.

        The top hit, if any, is stored tentatively in
        ``self.candidates["api"]`` for later validation.
        """
        if not anchor.get("title"):
            return
        query = anchor["title"]
        if anchor.get("author"):
            query += f" {anchor['author']}"
        self.log_messages.append(f"Searching Crossref with query: '{query}'")
        results = lookup_xref_search(query, book_mode=self.book_mode)
        if results:
            # We take the top result tentatively
            self.log_messages.append(f"Crossref search found {len(results)} results.")
            self.candidates["api"] = self._normalize_crossref(results[0])
            self.candidates['crossref-bib'] = dict_to_bibtex_crossref(results[0])
        else:
            self.log_messages.append("Crossref search returned no results.")

    # ----------------------------------------------------------------------
    # 4. VERIFY & MERGE
    # ----------------------------------------------------------------------
    def _validate_and_merge(self, anchor):
        """
        Decide whether to trust the API result or fallback to the Anchor.

        Similarity > 80 accepts the API result, > 50 accepts it but flags
        for review, otherwise the anchor is used and flagged for review.
        """
        # NOTE(review): anchor dicts also carry their "source" bookkeeping
        # key, so bib.update(anchor) copies it into the BibTeX fields -
        # confirm this is intended.
        api = self.candidates.get("api", {})
        if not api:
            # No API result found. Use Anchor.
            self.log_messages.append("No API results to merge. Using local anchor.")
            self.bib.update(anchor)
            # Downgrade status if anchor is weak (e.g., no author)
            self.status = "REVIEW_NEEDED" if not anchor.get("author") else "SUCCESS"
            self.confidence_score = 40  # Low confidence score for anchor-only
            return

        # Validation: Compare API Title vs Anchor Title
        # We use Token Sort Ratio to handle word reordering
        anchor_title = str(anchor.get("title", "")).lower()
        api_title = str(api.get("title", "")).lower()
        similarity = fuzz.token_sort_ratio(anchor_title, api_title)
        self.confidence_score = similarity
        self.log_messages.append(
            f"Validating API title '{api_title}' against anchor title "
            f"'{anchor_title}' (Similarity: {similarity}%)"
        )
        if similarity > 80:
            # High Confidence: Accept API
            self.log_messages.append("High confidence match. Accepting API results.")
            self.bib.update(api)
            self.status = "SUCCESS"
        elif similarity > 50:
            # Medium Confidence: Accept API but flag for review
            self.log_messages.append(
                "Medium confidence match. Accepting API results but flagging for review."
            )
            self.bib.update(api)
            self.status = "REVIEW_NEEDED"
        else:
            # Low Confidence: Reject API, use Anchor and flag for review
            self.log_messages.append(
                "Low confidence match. Rejecting API result and using local anchor."
            )
            self.bib.update(anchor)
            self.status = "REVIEW_NEEDED"

    # ----------------------------------------------------------------------
    # UTILITIES & NORMALIZERS
    # ----------------------------------------------------------------------
    def _normalize_crossref(self, data):
        """
        Map Crossref JSON to internal dict.

        :param data: one Crossref work record (a dict).
        :return: dict with ``title``, ``doi``, ``publisher``, ``journal``,
            ``author`` (when the record's author field is a list) and
            ``year`` (when a usable date is present).
        """
        out = {}

        # Title is normally a list; an empty list must give "" rather than
        # the literal string "[]".
        t = data.get("title", "")
        if isinstance(t, list):
            out["title"] = t[0] if t else ""
        else:
            out["title"] = "" if t is None else str(t)

        # Authors
        authors = data.get("author", [])
        if isinstance(authors, list):
            out["author"] = " and ".join(
                f"{a.get('family')}, {a.get('given', '')}".strip(", ")
                for a in authors
                if "family" in a
            )

        # Date: first source (print, online, created) that yields a year.
        for date_key in ("published-print", "published-online", "created"):
            pub = data.get(date_key)
            parts = pub.get("date-parts") if isinstance(pub, dict) else None
            if parts and parts[0] and parts[0][0] is not None:
                out["year"] = str(parts[0][0])
                break

        out["doi"] = data.get("DOI", "")
        out["publisher"] = data.get("publisher", "")

        # Container: usually a list of names, but tolerate a bare string.
        j = data.get("container-title") or []
        if isinstance(j, str):
            container = j
        else:
            container = j[0] if j else ""
        out["journal"] = container  # Simplified mapping
        return out

    def _normalize_arxiv(self, data):
        """
        Map Arxiv JSON to internal dict.

        :param data: a record dict, or a non-empty list whose first element
            is used.
        :return: dict with ``title``, ``author``, ``year``, ``arxiv_id`` and,
            when present or derivable from the journal reference, ``doi``,
            ``journal``, ``pages``, ``volume`` and ``number``.
        """
        # Handle list return from lookup_arxiv
        if isinstance(data, list) and data:
            data = data[0]

        year = data.get("year", "")
        ans = {
            "title": data.get("title", ""),
            "author": data.get("author", ""),  # Assuming already stringified
            # A None year must not become "None": that would block the
            # journal-ref fallback below.
            "year": "" if year is None else str(year),
            "arxiv_id": data.get("arxiv", ""),  # Adjust based on your arxiv module
        }
        if not ans['arxiv_id'] and 'eprint' in data:
            ans['arxiv_id'] = data['eprint']
        if 'doi' in data:
            ans['doi'] = data['doi']

        if 'journal' in data:
            journal_ref = str(data['journal'] or "")
            # Working copy with already-identified parts blanked out, so the
            # volume search cannot pick up the year, pages or issue number.
            scratch = journal_ref

            # Pages: e.g., 871-904 or 871--904
            pages_match = re.search(r'(\d+[a-zA-Z]?--?\d+[a-zA-Z]?)', journal_ref)
            if pages_match:
                ans['pages'] = pages_match.group(1)
                scratch = scratch.replace(ans['pages'], ' ')

            # Year: e.g., (1999) - careful not to overwrite more reliable year
            year_match = re.search(r'\((\d{4})\)', journal_ref)
            if year_match and not ans.get('year'):
                ans['year'] = year_match.group(1)
            scratch = re.sub(r'\(\d{4}\)', ' ', scratch)

            # Number: "no. 3" / "No. 3"
            number_match = re.search(r'\bno\.\s*(\d+)', journal_ref, re.IGNORECASE)
            if number_match:
                ans['number'] = number_match.group(1)
            scratch = re.sub(r'\bno\.\s*\d+', ' ', scratch, flags=re.IGNORECASE)

            # Volume: an explicit "Vol. X" wins, otherwise the first
            # remaining run of digits.
            volume_match = (
                re.search(r'\bvol\.\s*(\d+)', scratch, re.IGNORECASE)
                or re.search(r'(\d+)', scratch)
            )
            if volume_match:
                ans['volume'] = volume_match.group(1)

            # Attempt to clean journal title by removing extracted parts.
            # The volume is deliberately left in place: it is often a bare
            # number and cannot be removed safely.
            cleaned = journal_ref
            if ans.get('pages'):
                cleaned = cleaned.replace(ans['pages'], '').strip(' ,')
            if ans.get('year'):
                cleaned = re.sub(
                    r'\(\s*' + re.escape(ans['year']) + r'\s*\)', '', cleaned
                ).strip(' ,')
            if ans.get('number'):
                cleaned = re.sub(
                    r'\bno\.\s*' + re.escape(ans['number']), '', cleaned,
                    flags=re.IGNORECASE,
                ).strip(' ,')
            ans['journal'] = cleaned.strip(' ,')
        return ans

    def _find_largest_text(self, text_dict: Dict) -> str:
        """
        Return text with largest font size from pymupdf dict.

        All spans within 0.5 of the largest size are joined with spaces in
        the order they occur in ``text_dict``, so multi-line titles keep
        their word order even when span sizes differ slightly. Spans of one
        character or less are ignored.
        """
        spans = []
        for block in text_dict.get("blocks", []):
            for line in block.get("lines", []):
                for span in line.get("spans", []):
                    text = span["text"].strip()
                    if len(text) > 1:
                        spans.append((span["size"], text))
        if not spans:
            return ""
        largest_size = max(size for size, _ in spans)
        return " ".join(
            text for size, text in spans if abs(size - largest_size) < 0.5
        )

    # ----------------------------------------------------------------------
    # OUTPUTS
    # ----------------------------------------------------------------------
    def text_path(self, text_dir_path: Path, extractor: str) -> Path:
        """
        Return Path to where text is or will be stored.

        Mirrors the sharded structure: text_dir / first_2_of_fn / fn.md
        where fn starts with first 10 chars of hash.
        """
        if getattr(self, 'hash', ''):
            h10 = self.hash[:10]
        else:
            # Try to extract hash from current path if it looks sharded
            match = re.search(r'([A-F0-9]{10,})', self.doc_path.name)
            h10 = match.group(1)[:10] if match else "Unknown"

        stem = self.doc_path.stem
        if not stem.startswith(h10):
            stem = f"{h10}_{stem}"
        return text_dir_path / h10[:2] / f"{stem}.{extractor}.md"

    def text_exists(self, text_dir_path: Path, extractor: str) -> bool:
        """Check if text file exists."""
        return self.text_path(text_dir_path, extractor).exists()

    def report(self, print_fn=print):
        """
        Prints a comprehensive report of the discovery process, including the
        steps taken, final status, and the resulting BibTeX entry.

        :param print_fn: callable used to emit each line (default ``print``).
        """
        print_fn(f"--- Report for: {self.doc_path.name} ---")
        print_fn("\n[Discovery Log]")
        for msg in self.log_messages:
            print_fn(f"- {msg}")
        print_fn("\n[Result]")
        print_fn(f"- Final Status: {self.status}")
        print_fn(f"- Confidence Score: {self.confidence_score}%")
        print_fn("\n[BibTeX Entry]")
        print_fn(self.bibtex())
        print_fn("--- End of Report ---\n")

    def bibtex(self) -> str:
        """
        Render ``self.bib`` as a BibTeX entry.

        Returns an empty string when there is no title. Empty fields are
        skipped; ``arxiv_id`` is emitted as ``eprint`` / ``archivePrefix``
        and a ``file`` field points at the document on disk.
        """
        if not self.bib["title"]:
            return ""
        # Key - will be over-ridden, but this helps the review process
        cite_key = self.key()  # "Author2099"
        lines = [f"@{self.bib['type']}{{{cite_key},"]
        for k, v in self.bib.items():
            if not v or k in ("type", "arxiv_id"):
                continue
            val = str(v).replace("{", "\\{").replace("}", "\\}")
            if k == 'title':
                # extra pair of braces around the title
                lines.append(f" {k} = {{{{{val}}}}},")
                continue
            if k == "author":
                val = Document._sort_authors(val)
            lines.append(f" {k} = {{{val}}},")
        if self.bib.get("arxiv_id"):
            lines.append(f" eprint = {{{self.bib['arxiv_id']}}},")
            lines.append(" archivePrefix = {arXiv},")
        p = (self._new_doc_path or self.doc_path).absolute()
        # Windows Mendeley style path
        mendeley_file = format_mendeley_file(p)
        lines.append(f" file = {{{mendeley_file}}},")
        lines.append("}")
        return "\n".join(lines)

    def show_log(self, print_fn=print):
        """Show the process log information."""
        print_fn(f'{self.doc_path.name}\n'
                 + f'{"-" * len(self.doc_path.name)}\n'
                 + '\n'.join(self.log_messages))

    @staticmethod
    def _sort_authors(authors):
        """
        Make last, first and ...

        :param authors: names joined by ``' and '``.
        :return: the same names as ``Last, First Middle`` joined by
            ``' and '``; a name with only one part is emitted bare instead
            of with a dangling comma.
        """
        if not authors:
            return ""
        out = []
        for a in authors.split(' and '):
            hn = HumanName(a)
            given = ' '.join(part for part in (hn.first, hn.middle) if part)
            if hn.last and given:
                name_out = f'{hn.last}, {given}'
            else:
                name_out = hn.last or given or a.strip()
            if name_out:
                out.append(name_out)
        return ' and '.join(out)
[docs]
def discover_docs(doc_path: Path, lib):
    """
    Discover documents in doc_path if a directory or about
    doc_path if it is a file.

    Every candidate file is hashed; files whose hash is already present in
    ``lib.doc_df.hash`` are reported as duplicates, the rest are run through
    ``Document.process``.

    :param doc_path: a directory (searched with ``lib.find_docs``) or a
        single file.
    :param lib: library object; this function uses ``find_docs``,
        ``config.hash_workers`` and ``doc_df``.
    :return: ``(bib_str, docs, duplicates)`` - the BibTeX entries joined by
        newlines (preceded by an ``% import from`` comment line), the
        successfully processed ``Document`` objects, and a ``path -> hash``
        dict of files already in the library.
    """
    # find the files(s)
    if doc_path.is_dir():
        doc_paths = lib.find_docs(doc_path)
        logger.info("Found %s files", len(doc_paths))
    else:
        doc_paths = [doc_path]
    logger.info('Found %s potential docs for import.', len(doc_paths))
    print(f'Found {len(doc_paths)} potential docs for import.')

    # path -> hash
    doc_hashes = hash_many3(doc_paths, lib.config.hash_workers)
    existing_hashes = set(lib.doc_df.hash)
    duplicates = {k: v for k, v in doc_hashes.items() if v in existing_hashes}
    new_doc_hashes = {k: v for k, v in doc_hashes.items() if v not in existing_hashes}
    print(f'{len(new_doc_hashes) = } and {len(duplicates) = }.')

    # process the docs
    docs = []
    bibs = [f"% import from {doc_path.absolute()}"]
    for p, h in new_doc_hashes.items():
        if p.suffix.lower() != '.pdf':
            # warn only; the file is still attempted below
            logger.warning('WARNING non-pdf %s', p.name)
        try:
            logger.info('gathering import info for %s', p)
            doc = Document(p)
            doc.hash = h
            doc.process()
            # Render before recording so docs and bibs stay in step if
            # rendering fails.
            entry = doc.bibtex()
        except Exception as e:
            # Best effort: one bad file must not abort the batch, but keep
            # the traceback so the failure can be diagnosed.
            logger.exception("Error for %s: %s", p.name, e)
        else:
            docs.append(doc)
            bibs.append(entry)

    # for the time being...
    bib_str = "\n".join(bibs)
    return bib_str, docs, duplicates
[docs]
def elaborate_duplicates(lib, duplicates, trim=True):
    """
    Resolve duplicate hashes from discover_docs to library references.

    :param lib: library object providing the ``doc_df``, ``ref_doc_df`` and
        ``ref_df`` DataFrames.
    :param duplicates: mapping whose values are the duplicate hashes.
    :param trim: when True, keep only the tag, year, author, title and
        path columns of the matched references.
    :return: ``(refs, missing_refs)``. ``refs`` holds the references linked
        to the duplicate documents, with a ``path`` column added.
        ``missing_refs`` holds the rows of ``doc_df`` that matched a hash
        but have no link row, i.e. documents already in the Library that
        are orphans with no associated reference record.
    """
    doc_df, link_df, ref_df = lib.doc_df, lib.ref_doc_df, lib.ref_df

    # documents already in the library with one of the duplicate hashes
    known_docs = doc_df.loc[doc_df.hash.isin(duplicates.values())]

    # doc <-> ref link rows for those documents
    links = link_df.loc[link_df.path.isin(known_docs.path)]
    has_link = known_docs.path.isin(links.path)

    # the references themselves, annotated with the linked path
    path_by_tag = links.set_index('tag').path
    refs = ref_df.loc[ref_df.tag.isin(links.tag)].copy()
    refs['path'] = refs.tag.map(path_by_tag.get)
    if trim:
        refs = refs[['tag', 'year', 'author', 'title', 'path']]

    # documents that matched a hash but have no link row
    missing_refs = known_docs.loc[~has_link]
    return refs, missing_refs