# coding: utf-8
"""
BibTeX import helpers for archivum.
This module uses a Bib2df_Incremental, derived from the Mendeley porting
logic to incrementally import new references from a BibTeX file into an
existing Library.
Each import run is recorded under a timestamped directory so that
the original .bib and a copy of the PDFs are preserved and the
ETL is, in principle, replayable.
"""
import json
import logging
from functools import partial
from pathlib import Path
import re
from difflib import SequenceMatcher
import datetime as dt
import latexcodec # noqa
# import Levenshtein # per Gemini prefer to use rapidfuzz
from rapidfuzz import distance
import numpy as np
import pandas as pd
from . import EMPTY_LIBRARY
from .utilities import (
remove_accents,
accent_mapper_dict,
safe_int,
TagAllocator,
)
from .trie import Trie
from .library_base import LibraryBase
from .hasher import hash_many3 as hash_many
from .enhancements import save_from_row, path_from_row
# set to True to override where audit files are stored to tmp
# False is default
logger = logging.getLogger(__name__)
## ==================================================================
[docs]
class Bib2df_Incremental(LibraryBase):
"""
Bibtex file to dataframe - library aware.

Code started as a copy of mendeley_port.Bib2df and was adjusted for
incremental, library-aware use.
"""
# regexes for de-texing a single character wrapped in one ({x}) or two ({{x}}) pairs of braces
_r_brace1 = re.compile(r"{(.)}")
_r_brace2 = re.compile(r"{{(.)}}")
# base columns used by the app for quick output displays
_base_cols = ["tag", "type", "author", "title", "year", "journal", "file"]
# base columns expected by import_bibtex_file; any that are missing from an
# import are added there as empty-string columns
_base_fields = [
"title",
"journal",
"publisher",
"institution",
"booktitle",
"address",
"editor",
"mendeley-tags",
"edition",
]
# =====================================================================================================
# user defined mappers: these can be customized
# _char_unicode_dict / _char_map are less likely to be changed: the translation
# table is applied to the raw text read from the bibtex file. Typographic quotes,
# primes and angle quotes become plain ASCII quotes, the non-breaking space
# becomes a space, and the zero-width space and BOM are removed.
_char_unicode_dict = {
"“": '"', # left double quote
"”": '"', # right double quote
"„": '"', # low double quote
"«": '"', # left double angle quote
"»": '"', # right double angle quote
"‘": "'", # left single quote
"’": "'", # right single quote
"‚": "'", # low single quote
"′": "'", # prime
"‵": "'", # reversed prime
"‹": "'", # left single angle quote
"›": "'", # right single angle quote
"\u00a0": " ", # non-breaking space
"\u200b": "", # zero-width space
"\ufeff": "", # BOM
}
_char_map = str.maketrans(_char_unicode_dict)
# _re_subs is also applied to the raw text, but only when remap_dashes is set;
# it rewrites en and em dashes as runs of hyphens.
_re_subs = {
"–": "--", # en dash → two hyphens
"—": "---", # em dash → three hyphens
}
# one alternation matching any key of _re_subs (keys are escaped)
_re_subs_compiled = re.compile("|".join(map(re.escape, _re_subs)))
# for mapping the edition bibtex field, used in import_bibtex_file
# (applied with Series.replace, so values not listed here are left unchanged)
_edition_mapper = {
"First": "First",
"2": "Second",
"2nd": "Second",
"Second": "Second",
"Second Edi": "Second",
"3": "Third",
"3rd": "Third",
"Third": "Third",
"4": "Fourth",
"4th": "Fourth",
"Fourth": "Fourth",
"fourth": "Fourth",
"5": "Fifth",
"5th": "Fifth",
"Fifth": "Fifth",
"Sixth": "Sixth",
"Seventh": "Seventh",
"Ninth": "Ninth",
"10": "Tenth",
"2nd Editio": "Second",
"Enlarged": "Enlarged",
}
# used by import_bibtex_file to drop fields from input bibtex file;
# the commented-out names below are currently kept in the import
_omitted_bibtex_fields = [
"abstract",
"annote",
"issn",
# "isbn",
# "archivePrefix",
# 'arxivId',
# "eprint",
"pmid",
"primaryClass",
"series",
"chapter",
"school",
# "organization",
"howpublished",
"keywords",
]
# end customizable mappers
# =====================================================================================================
def __init__(
    self,
    *,
    bibtex_file_path,
    doc_dir,
    reference_library,
    fillna=True,
    errors_mapper=None,
    remap_dashes=False,
    add_hashes=False,
    incremental=False,
    qd=None,
    write_audit=True,
):
    """
    Set up an importer for one BibTeX file; all arguments are keyword-only.

    This class is very property driven: dataframes are created when they
    are first needed, so nothing is read or scanned here.

    The one-time Mendeley porting version of this finds all
    documents in doc_dir. We don't need to do that here, we
    know the docs will exist in the right place. However,
    for the time being will keep the code, but only transfer
    over files as needed.

    afile = an actual file
    vfile = a named reference in the bibtex file that may not correspond to an afile

    Note: this class is "bibtex" file based and creates a dataframe, whereas
    the Library class is dataframe based and creates a bibtex file.

    Parameters
    ----------
    bibtex_file_path
        Path of the BibTeX file to import; it must exist.
    doc_dir
        Folder where the afile documents live, or a falsy value for none.
    reference_library
        Existing library used as the reference; a falsy value selects
        EMPTY_LIBRARY.
    fillna
        When true, raw_df replaces missing values with "". Use
        fillna=False to use the contents functions (see missing fields).
    errors_mapper
        Optional dict used by tex_to_unicode to repair strings that fail
        to decode. It exists to allow this class to do the initial
        porting. Pass something like:

        # special unicode errors used by tex_to_unicode
        errors_mapper = {'Caicedo, Andr´es Eduardo': 'Caicedo, Andrés Eduardo',
        'Cerreia‐Vioglio, Simone': 'Cerreia‐Vioglio, Simone',
        'Cerreia–Vioglio, S.': 'Cerreia–Vioglio, S.',
        'Cireşan, Dan': 'Cireșan, Dan',
        'J.B., SEOANE-SEP´ULVEDA': 'J.B., Seoane-Sepúlveda',
        'JIM´ENEZ-RODR´IGUEZ, P.': 'Jiménez-Rodríguez, P.',
        'Joldeş, Mioara': 'Joldeș, Mioara',
        'Lesne, Jean‐Philippe ‐P': 'Lesne, Jean‐Philippe ‐P',
        'MU˜NOZ-FERN´ANDEZ, G.A.': 'Muñoz-Fernández, G.A.',
        'Naneş, Ana Maria': 'Naneș, Ana Maria',
        'Paradıs, J': 'Paradís, J',
        "P{\\'{a}}stor, Ľ": 'Pástor, Ľ',
        'Uludağ, Muhammed': 'Uludağ, Muhammed',
        'Ulug{\\"{u}}lyaǧci, Abdurrahman': 'Ulugülyaǧci, Abdurrahman',
        'Zitikis, Riċardas': 'Zitikis, Riċardas',
        'de la Pen̄a, Victor H.': 'de la Peña, Victor H.',
        "{L{\\'{o}}pez\xa0de\xa0Vergara}, Jorge E.": 'López\xa0de\xa0Vergara, Jorge E.'}
    remap_dashes
        When true, raw_df rewrites en/em dashes using _re_subs.
    add_hashes
        Request hashing of documents; forced on when incremental is true.
    incremental
        Incremental (duplicate-filtering) import mode.
    qd
        Display callable; defaults to print.
    write_audit
        Stored on the instance as given.
        NOTE(review): the original notes say audit mode is always on, with
        files saved to /tmp and committed to the library import folder on
        update; how this flag is consumed is outside this block.

    Raises
    ------
    AssertionError
        If the BibTeX file does not exist.
    """
    self.bibtex_file_path = Path(bibtex_file_path)
    self.name = self.bibtex_file_path.stem
    self.doc_dir = Path(doc_dir) if doc_dir else None
    self.reference_library = reference_library or EMPTY_LIBRARY
    self.fillna = fillna
    self.errors_mapper = errors_mapper or {}
    self.remap_dashes = remap_dashes
    self._add_hashes = add_hashes or incremental
    self.incremental = incremental
    self.write_audit = write_audit

    # explicit raise (same exception type as the former assert) so the check
    # is not stripped when Python runs with -O
    if not self.bibtex_file_path.exists():
        raise AssertionError("Bibtex file must exist")
    # log in both cases the message names: no folder given, or folder missing
    if self.doc_dir is None or not self.doc_dir.exists():
        logger.info("PDF directory is None or does not exist")

    # if you write audits, also save - this is a flag
    self._errors_mapper_saved = False

    # backing stores for the lazily built properties
    self._raw_df = pd.DataFrame()
    self._author_map_df = pd.DataFrame()
    self._vfile_df = pd.DataFrame()
    self._doc_df = pd.DataFrame()
    self._ref_doc_df = pd.DataFrame()
    self._ref_df = pd.DataFrame()
    self._best_match_df = pd.DataFrame()
    self._ref_no_doc = pd.DataFrame()
    # the "raw" ported df, includes file column, but otherwise like ref_df
    self._ported_df = pd.DataFrame()
    self._database = pd.DataFrame()

    # for audit and debugging
    self._last_missing_vfiles = None
    self._last_decode = None
    self.__audit_dir_path = None
    self._all_unicode_errors = None

    # for duplicate detection: normalized titles and doi
    self._existing_title_norm = None
    self._dois = None
    self._self_test = False

    # timestamp for audit files and arc-source for imports
    self.timestamp = dt.datetime.now().strftime("%Y-%m-%d_at_%H-%M-%S")
    self.qd = qd or print
@property
def raw_df(self):
    """
    DataFrame of raw(ish) information read directly from the bibtex file.

    Built on first access (and again whenever the cached frame is empty).
    The file text is kept on ``self.txt`` and the per-entry chunks on
    ``self.stxt``.
    """
    if not self._raw_df.empty:
        return self._raw_df

    logger.info("===>> creating raw_df property <<====")
    file_text = self.bibtex_file_path.read_text(encoding="utf-8")
    self.txt = file_text.translate(self._char_map)
    if self.remap_dashes:
        self.txt, n = self._re_subs_compiled.subn(
            lambda m: self._re_subs[m.group()], self.txt
        )
        logger.info(f"remap dashes regex sub found {n = } replacements")

    # Split at start-of-line + optional whitespace + "@" ((?m) = multiline);
    # the indent and the "@" are consumed, so each chunk starts at the entry type.
    self.stxt = re.split(r"(?m)^\s*@", self.txt)

    # Every chunk goes through parse_line; chunks it rejects (None) are dropped.
    records = []
    for chunk in self.stxt:
        parsed = self.parse_line(chunk)
        if parsed is not None:
            records.append(parsed)

    frame = pd.DataFrame(records)
    # 1-based index
    frame.index = range(1, len(frame) + 1)
    if self.fillna:
        frame = frame.fillna("")
    self._raw_df = frame
    return self._raw_df
@property
def ported_df(self):
    """Return the ported dataframe, running import_bibtex_file first while it is still empty."""
    needs_import = self._ported_df.empty
    if needs_import:
        logger.info("===>> creating ported_df property <<====")
        self.import_bibtex_file()
    return self._ported_df
@property
def ref_df(self):
    """
    The reference df: ported_df without the file column, tag NOT the index.

    An ``arc-source`` column recording the bibtex file name and the import
    timestamp is added. The result is always a separate frame, so adding
    that column never alters ported_df (previously, when ported_df had no
    ``file`` column, the two were the same object).
    """
    if self._ref_df.empty:
        logger.info("===>> creating ref_df property <<====")
        ported = self.ported_df
        if "file" in ported:
            refs = ported.drop(columns="file")
        else:
            # copy, otherwise the assignment below would write into ported_df
            refs = ported.copy()
        refs["arc-source"] = f"bibtex {self.name} at {self.timestamp}"
        self._ref_df = refs
    return self._ref_df
@property
def doc_df(self):
"""
Read file information for the current library's pdf store.

Returns dataframe describing **actual files** (afiles). These may or may not
be referenced in library.database.
Currently only PDFs.

Built on first access. When ``doc_dir`` is None or missing, an empty frame
with typed columns is returned; otherwise every path returned by
``reference_library.find_docs(doc_dir)`` is stat-ed into one row. With
``_add_hashes`` set the files are hashed and a ``version`` column is
assigned per hash.
"""
# Rebuild only while the cache has neither rows nor columns, so the typed
# empty frame created below is kept rather than rebuilt on every access.
if self._doc_df.empty and len(self._doc_df.columns) == 0:
logger.info("===>> creating doc_df property <<====")
if self.doc_dir is None or not self.doc_dir.exists():
dt_type = f'datetime64[ns, {self.reference_library.config.timezone}]'
# NOTE(review): this schema has no "version" column, although the
# scanned branch below can add one - confirm downstream readers cope.
column_dtypes = {
"name": "object",
"path": "object",
"mod": dt_type,
"create": dt_type,
"access": dt_type,
"node": "int64",
"links": "int64",
"size": "int64",
"suffix": "object",
"hash": "object",
}
# Create an empty DataFrame using the defined dtypes
self._doc_df = pd.DataFrame(columns=column_dtypes.keys()).astype(
column_dtypes
)
logger.info(
f"pdf folder is None or does not exist; and created empty doc_df with {len(self._doc_df.columns)} columns"
)
else:
# actually have documents
docs = self.reference_library.find_docs(self.doc_dir)
logger.info("Found %s afiles (actual document files).", len(docs))
ans = []
for p in docs:
p = p.absolute()
stat = p.stat(follow_symlinks=True)
# times are stored as nanosecond integers here and converted below
ans.append(
{
"name": p.name,
"path": str(p.as_posix()),
"mod": stat.st_mtime_ns,
"create": stat.st_ctime_ns,
"access": stat.st_atime_ns,
"node": stat.st_ino,
"links": stat.st_nlink,
"size": stat.st_size,
"suffix": p.suffix,
"hash": "",
}
)
df = pd.DataFrame(ans)
tz = self.reference_library.config.timezone # "Europe/London"
# nanosecond integers -> UTC timestamps -> library timezone
df["create"] = (
pd.to_datetime(df["create"], unit="ns")
.dt.tz_localize("UTC")
.dt.tz_convert(tz)
)
df["mod"] = (
pd.to_datetime(df["mod"], unit="ns")
.dt.tz_localize("UTC")
.dt.tz_convert(tz)
)
df["access"] = (
pd.to_datetime(df["access"], unit="ns")
.dt.tz_localize("UTC")
.dt.tz_convert(tz)
)
if self._add_hashes:
logger.info("Adding hashes and versions")
# despite the name, this is every scanned path, not a "missing" subset
missing_docs = df.path.values
hashes = hash_many(
[Path(p) for p in missing_docs],
workers=self.reference_library.config.hash_workers
)
# hashes returns dict Path->hash, so lookup on Path(x)
df.hash = df.path.map(lambda x: hashes.get(Path(x), ""))
# Assign versions based on existing library docs
# We need to be careful not to duplicate (hash, path) pairs if they already exist
lib_docs = self.reference_library.doc_df
# NOTE(review): assign_version is defined but never called in the visible
# remainder of this property; the groupby loop below sets the versions.
def assign_version(row):
h = row.hash
p = row.path
if h == "": return -1
# 1. Check if this exact (hash, path) already exists in lib
if not lib_docs.empty:
match = lib_docs[(lib_docs.hash == h) & (lib_docs.path == p)]
if not match.empty:
return match.version.iloc[0]
# 2. If not, get next available version for this hash
existing_versions = lib_docs[lib_docs.hash == h].version
if not existing_versions.empty:
return existing_versions.max() + 1
return 0 # Default for new hash
# This is a bit slow for large imports, but safe.
# We also need to account for duplicates WITHIN the import itself.
df = df.sort_values(['hash', 'path'])
df['version'] = 0 # Placeholder
# Group by hash and assign incremental versions,
# but offset by whatever is in the library
for h, group in df.groupby('hash'):
offset = 0
if not lib_docs.empty:
existing = lib_docs[lib_docs.hash == h]
if not existing.empty:
# continue numbering after the highest version the library holds for this hash
offset = existing.version.max() + 1
df.loc[group.index, 'version'] = range(offset, offset + len(group))
# set variable
self._doc_df = df
logger.info(
f"Scanned documents folder and created doc_df with {len(ans)} files"
)
return self._doc_df
@property
def vfile_df(self):
    """
    Information about virtual files (vfiles) found in the file field
    in the Mendeley bibtex file.

    Parses the file field Mendeley adds to bibtex files. It looks like
    a semicolon separated list of the form::

        :C\\:/S/new-papers/Blackwell/1953_Equivalent Comparisons of Experiments.pdf:pdf

    Oddly, empty vfiles are represented as ``::``.

    Returns a frame with columns tag, drive, vfile and type, one row per
    well-formed reference. References that do not split into four parts
    at ":" are recorded in ``self._file_errs``. An import with no ``file``
    column gives an empty frame. The result is cached even when it has no
    rows; resetting ``_vfile_df`` to a bare ``pd.DataFrame()`` forces a rebuild.
    """

    def proc_vfile(vf_drive, vf_name):
        """Return the posix-style path string for a drive/name pair from the file field."""
        # the drive part arrives oddly, as ":c\:" - keep only its first character;
        # an empty drive contributes no prefix instead of raising IndexError
        prefix = f"{vf_drive[0]}:" if vf_drive else ""
        p = Path(prefix + vf_name)
        if not p.is_absolute():
            # not absolute on this platform: resolve against the bibtex file's folder
            p = self.bibtex_file_path.parent / vf_name
        return str(p.as_posix())

    if self._vfile_df.empty and len(self._vfile_df.columns) == 0:
        logger.info("===>> creating vfile_df property <<====")
        ans = []
        self._file_errs = []
        df = self.ported_df.set_index("tag")
        if "file" in df.columns:
            for tag, raw in df["file"].items():
                # each value is a ";" separated list of :drive\\:file:file_type
                # items; missing (non-string) values contribute nothing
                refs = raw.split(";") if isinstance(raw, str) else []
                for ref in refs:
                    # some empty refs come through as "::"
                    # these should be ignored - they are not afiles
                    if ref == "::":
                        continue
                    # on splitting at ":" a good ref has four parts:
                    # before drive (empty), drive, path, file_type
                    x = ref.split(":")
                    if len(x) == 4:
                        d, f, t = x[1:]
                        ans.append([tag, d, proc_vfile(d, f), t])
                    else:
                        self._file_errs.append([tag, *x[1:]])
        else:
            logger.info("ported_df has no file column; vfile_df is empty")
        self._vfile_df = pd.DataFrame(
            ans, columns=["tag", "drive", "vfile", "type"]
        )
        logger.info(f"Created vfile_df with {len(self._vfile_df)} rows.")
    return self._vfile_df
@property
def ref_doc_df(self):
    """
    Make the reference/document dataframe by matching vfiles to afiles.

    vfiles (virtual files) are references within the file field in the
    mendeley bibtex file.
    afiles are actual files described by doc_df.

    Steps: (1) vfiles whose path is not in doc_df but which exist on disk
    are stat-ed, hashed and appended to ``_doc_df`` with version 0;
    (2) vfiles still unmatched are paired with the closest afile path by
    Levenshtein distance (details kept in ``_best_match_df``); (3) the
    result is joined to doc_df to pick up hash and version.

    Returns a frame with columns tag, hash and version; vfiles for which
    no hash could be found are dropped.

    When there are no actual files at all the fuzzy matching step is
    skipped, and a doc_df without a ``version`` column is treated as
    version 0, so an import without documents yields an empty frame
    rather than an exception.
    """
    # columns are ref_id=tag plus the hash and version of the matched afile
    if self._ref_doc_df.empty and len(self._ref_doc_df.columns) == 0:
        logger.info("===>> creating ref_doc_df property <<====")
        vfiles = self.vfile_df
        # Identify which vfiles aren't in our doc_df scan
        actual_files = set(self.doc_df.path)
        missing_vfiles_initial = [
            r.vfile for _, r in vfiles.iterrows() if r.vfile not in actual_files
        ]
        # Check if any of these "missing" vfiles actually exist on disk.
        # dict.fromkeys removes repeats (keeping order) so a file referenced
        # by several entries is added to doc_df only once.
        found_externally = []
        for vfile in dict.fromkeys(missing_vfiles_initial):
            p = Path(vfile)
            if p.exists() and p.is_file():
                found_externally.append(p)
        if found_externally:
            logger.info(f"Found {len(found_externally)} files at absolute paths outside scan directory.")
            # We need to add these to doc_df so they get hashed and versioned
            new_rows = []
            for p in found_externally:
                p = p.absolute()
                stat = p.stat(follow_symlinks=True)
                new_rows.append({
                    "name": p.name,
                    "path": str(p.as_posix()),
                    "mod": stat.st_mtime_ns,
                    "create": stat.st_ctime_ns,
                    "access": stat.st_atime_ns,
                    "node": stat.st_ino,
                    "links": stat.st_nlink,
                    "size": stat.st_size,
                    "suffix": p.suffix,
                    "hash": "",
                })
            df_ext = pd.DataFrame(new_rows)
            tz = self.reference_library.config.timezone
            for col in ["create", "mod", "access"]:
                df_ext[col] = (
                    pd.to_datetime(df_ext[col], unit="ns")
                    .dt.tz_localize("UTC")
                    .dt.tz_convert(tz)
                )
            # Hash them immediately
            logger.info(f"Hashing {len(df_ext)} external files...")
            ext_hashes = hash_many(
                [Path(p) for p in df_ext.path],
                workers=self.reference_library.config.hash_workers
            )
            df_ext["hash"] = df_ext.path.map(lambda x: ext_hashes.get(Path(x), ""))
            # Assign versions (0 for these new external files)
            df_ext['version'] = 0
            # Append to our doc_df
            self._doc_df = pd.concat([self._doc_df, df_ext], ignore_index=True)
            # Refresh actual_files set
            actual_files = set(self.doc_df.path)

        # Now proceed with matching, much fewer should be "missing" now.
        # Each item is (row index in vfile_df, tag, vfile).
        still_missing = [
            (i, r.tag, r.vfile)
            for i, r in vfiles.iterrows()
            if r.vfile not in actual_files
        ]
        missing_vfiles = [[i, vfile] for i, _, vfile in still_missing]
        matcher = {}
        if len(still_missing) == 0:
            logger.info('GOOD NEWS: No missing vfiles!!')
        elif not actual_files:
            # nothing to match against: min() over an empty set and the
            # percentage below would both fail
            logger.info(
                "\tFound %s missing vfiles and no actual files to match against",
                len(still_missing),
            )
        else:
            # Proceed with Levenshtein for truly missing files
            logger.info("\tFound %s missing vfiles (%s of actual files)", len(still_missing),
                        f'{len(still_missing) / len(actual_files):.1%}')
            logger.info("\tLevenshtein (rapidfuzz) matching in ref_doc...")
            ans = []
            for _, tag, m_vfile in still_missing:
                # score every candidate once and keep the closest
                best_distance, best_match = min(
                    ((distance.Levenshtein.distance(m_vfile, alt), alt) for alt in actual_files),
                    key=lambda pair: pair[0],
                )
                ans.append([tag, m_vfile, best_match, best_distance])
                logger.debug('\tmatching for %s -> %s', m_vfile, best_match)
            # for reference; the tag column holds the entry tag
            self._best_match_df = pd.DataFrame(
                ans, columns=["tag", "missing_vfile", "match_afile", "distance"]
            )
            logger.info("\t...Levenshtein matching completed")
            matcher = {
                vfile: afile
                for vfile, afile in self._best_match_df[
                    ["missing_vfile", "match_afile"]
                ].values
            }

        # identity model: ref_doc_df uses hash and version
        # join with this import's doc_df to get the hash and assigned version
        docs = self.doc_df
        if "version" not in docs.columns:
            # doc_df built without versions (e.g. no document folder)
            docs = docs.assign(version=0)
        merged = vfiles.merge(
            docs[['path', 'hash', 'version']],
            left_on='vfile', right_on='path', how='left'
        )
        # Apply matcher if needed
        if matcher:
            # For files that were matched by Levenshtein, we need to lookup their hash/version
            # in doc_df using the matched afile path
            for i, r in merged[merged["hash"].isna()].iterrows():
                match_path = matcher.get(r.vfile)
                if match_path:
                    doc_match = docs[docs.path == match_path]
                    if not doc_match.empty:
                        merged.loc[i, 'hash'] = doc_match["hash"].iloc[0]
                        merged.loc[i, 'version'] = doc_match["version"].iloc[0]
        self._ref_doc_df = pd.DataFrame(
            {
                "tag": merged["tag"],
                "hash": merged["hash"],
                "version": merged["version"],
            }
        ).dropna(subset=['hash'])
        self._ref_doc_df['version'] = self._ref_doc_df['version'].astype(int)
        # for ref.: [row index in vfile_df, vfile] pairs
        self._last_missing_vfiles = missing_vfiles
    return self._ref_doc_df
@property
def author_map_df(self):
    """
    DataFrame of author names showing the transition to a normalized form.

    One row per distinct author in raw_df, with a column for each step:
    original -> unicoded (de-texed, periods removed) -> spaced (runs of two
    or three initials after ", " separated by spaces) -> longest (longest
    unique completion from a Trie) -> accents (guess work!) -> proposed
    (periods put back after single initials).

    The Trie is built from the reference library's authors when it has
    any; for a new import into an empty library the authors in raw_df are
    used to prime the pump.

    Also stores ``trie``, ``mapping`` and ``accent_mapper`` on the instance
    for debugging.
    """
    if self._author_map_df.empty:
        # distinct returns a sorted list; compute the import's authors once
        import_authors = self.distinct("author", self.raw_df)
        df = pd.DataFrame({"original": import_authors})
        self._last_decode = []
        # regex=False: "." must be a literal period; as a regex it would match
        # every character (the default for regex differs between pandas versions)
        df["unicoded"] = df.original.map(self.tex_to_unicode).str.replace(
            ".", "", regex=False
        )
        # space out initials Mild, SJM -> Mild, S J M; works for two or three consecutive initials
        df["spaced"] = df.unicoded.str.replace(
            r"(?<=, )([A-Z]{2,3})\b", lambda m: " ".join(m.group(1)), regex=True
        )
        # diverge from Bib2df: use the reference library
        t = Trie()
        if (
            self.reference_library != EMPTY_LIBRARY
            and not self.reference_library.ref_df.empty
        ):
            ref_authors = self.distinct("author", self.reference_library.ref_df)
            logger.info(
                "Building author Trie from reference library, "
                f"{len(ref_authors)} distinct authors"
            )
        else:
            # no reference authors in the reference library
            # e.g., it could be a start from scratch library
            # prime the pump with the author names we have
            ref_authors = import_authors
            logger.info(
                "Building author Trie from self.raw_df - no reference library authors; "
                f"{len(ref_authors) = } distinct authors"
            )
        for name in ref_authors:
            t.insert(name.strip(". "))
        # mapping will go from name to longest completion
        mapping = {}
        # authors in self.raw_df
        a = import_authors
        logger.info(f"Import contains {len(a)} distinct authors -> remapping")
        for name in a:
            try:
                m = t.longest_unique_completion(name.strip("."), strict=False)
            except ValueError:
                # in strict mode means prefix not found -> no change
                pass
            else:
                if m != name:
                    # have found a better version
                    mapping[name] = m
        df["longest"] = df.spaced.replace(mapping)
        accent_mapper = accent_mapper_dict(df.longest)
        df["accents"] = df.longest.replace(accent_mapper)
        # initial periods
        df["proposed"] = df.accents.str.replace(
            r"(\b)([A-Z])( |$)", r"\1\2.\3", case=True, regex=True
        )
        logger.debug(f"Field: authors\nDecode errors: {len(self._last_decode) = }")
        self._author_map_df = df
        # debug
        self.trie = t
        self.mapping = mapping
        self.accent_mapper = accent_mapper
    return self._author_map_df
@property
def database(self):
    """
    Merged database, with exploded authors.

    ref_df is split into one row per author (on " and "), right-joined to
    ref_doc_df on tag so references without documents are kept, then
    left-joined to doc_df on hash and version. Missing node/links/size
    become 0 and every other missing value becomes "".

    The final fill is now actually applied: ``fillna`` returns a new frame
    and the original code discarded it.
    """
    if self._database.empty:
        refs = self.ref_df
        exploded_authors = refs.assign(
            author=refs.author.str.split(" and ")
        ).explode("author", ignore_index=True)
        merged = (
            self.ref_doc_df.merge(exploded_authors, on="tag", how="right")
        ).merge(self.doc_df, on=["hash", "version"], how="left")
        # numeric document columns get 0 rather than ""
        for c in ["node", "links", "size"]:
            if c in merged.columns:
                merged[c] = merged[c].fillna(0)
        # fillna is not in-place, so keep its result
        self._database = merged.fillna("")
    return self._database
[docs]
def raw_no_file(self):
    """
    Raw entries with no files listed.

    Returns the rows of raw_df whose ``file`` value is "" or missing,
    restricted to those ``_base_cols`` that exist in raw_df. If raw_df has
    no ``file`` column at all, every entry counts as having no file.
    """
    df = self.raw_df
    # small imports may lack some display columns; select only those present
    cols = [c for c in self._base_cols if c in df.columns]
    if "file" not in df.columns:
        return df.loc[:, cols]
    # with fillna=False a missing file is NaN rather than ""
    no_file = df["file"].isna() | (df["file"] == "")
    return df.loc[no_file, cols]
@staticmethod
def parse_line_original(entry):
    """
    Parse one bibtex entry (earlier version of parse_line).

    Returns a dict holding ``type``, ``tag`` and one item per
    ``key = {value}`` field, or None when the text does not start with an
    entry header.

    Windows line endings are now really normalised: the pattern
    ``\\r\\n *`` was previously handed to ``str.replace``, which looks for
    those literal characters and so never matched.
    """
    logger.debug('working on entry = %s', entry)
    # windows GS bibtex pastes come in with \r\n; fold CRLF plus any
    # following spaces into a single newline
    entry = re.sub(r"\r\n *", "\n", entry)
    logger.debug('working on adjusted entry = %s', entry)

    # Step 1: Extract type and tag
    header_match = re.match(r"@?(\w+)\{([^,]+),", entry)
    if not header_match:
        # this is expected, but note it
        logger.info("Unable to parse entry header (generally expected).")
        return None
    result = {}
    result["type"], result["tag"] = header_match.groups()

    # Step 2: Remove header and final trailing '}'
    body = entry[header_match.end():].strip()
    if body.endswith("}"):
        body = body[:-1].strip()
    logger.debug('working on body = %s', body)

    # Step 3: fields. This is a bit of an art...
    # 1. \s* handles indentation and trailing spaces (fixes eprint)
    # 2. (?:\n|\Z) handles end of line OR end of string (fixes missing file)
    # The pattern has exactly two groups, so unpacking cannot fail and the
    # former ValueError fallback to parse_line_slow was unreachable.
    field_pattern = r"\s*([a-zA-Z\-]+)\s*=\s*\{(.*?)\}\s*,?\s*(?:\n|\Z)"
    for m in re.finditer(field_pattern, body, flags=re.DOTALL):
        k, v = m.groups()
        result[k] = v
        logger.debug("key = %s and value = %s from m = %s", k, v, m)
    return result
@staticmethod
def parse_line(entry):
    """
    Parse one chunk of a bibtex file into a dict.

    ``entry`` is the text of a single entry, with or without the leading
    "@". Returns a dict holding ``type``, ``tag`` (whitespace stripped) and
    one item per ``key = {value}`` field, values stripped. Returns None for
    blank or very short chunks and for text with no entry header.

    Only brace-delimited values are recognised; other fields are skipped.
    """
    # 1. Early Exit and Normalization
    if not entry or len(entry.strip()) < 5:
        return None
    entry = entry.replace('\r\n', '\n').replace('\r', '\n')

    # 2. Extract Header
    # Matches type{tag,
    # We allow leading whitespace and a broader range of characters for the entry type
    # to handle artifacts like @temp\.ipynb...
    header_match = re.match(r"\s*@?([a-zA-Z0-9\.\\\-_]+)\s*\{\s*([^,]+),", entry)
    if not header_match:
        logger.debug("Skipping header")
        return None
    entry_type, tag = header_match.groups()
    # [^,]+ also captures any whitespace before the comma; drop it
    result = {"type": entry_type, "tag": tag.strip()}

    # 3. Extract Body
    body = entry[header_match.end():].strip()
    if body.endswith('}'):
        body = body[:-1]

    # 4. Parse Fields
    # Keys: letters and hyphens (e.g. mendeley-tags)
    # End anchor: (?:\n|\Z) matches newline OR end of string (fixing the missing 'file' issue)
    # The pattern has exactly two groups, so unpacking cannot fail; the
    # former ValueError fallback to parse_line_slow could never run.
    field_pattern = r"\s*([a-zA-Z\-]+)\s*=\s*\{(.*?)\}\s*,?\s*(?:\n|\Z)"
    for m in re.finditer(field_pattern, body, flags=re.DOTALL):
        k, v = m.groups()
        result[k] = v.strip()
        logger.debug("key = %s and value = %s from m = %s", k, v, m)
    return result
@staticmethod
def parse_line_slow(entry):
    """
    Position-based parser for one bibtex entry.

    Finds every ``key = {`` opener in the body and takes the text up to the
    next opener (or the end of the body) as that key's value, trimming the
    trailing comma and closing brace. Returns a dict with ``type``, ``tag``
    and the fields, or None when the entry header cannot be matched.
    """
    header = re.match(r"(\w+)\{([^,]+),", entry)
    if header is None:
        logger.error("Skipping entry header.")
        return None
    entry_type, tag = header.groups()
    fields = {"type": entry_type, "tag": tag}

    # body = everything after the header, minus the entry's closing brace
    body = entry[header.end():].strip()
    if body.endswith("}"):
        body = body[:-1].strip()

    # each value runs from the end of its opener to the start of the next one
    openers = list(re.finditer(r"([a-zA-Z\-]+) = \{", body))
    stops = [nxt.start() for nxt in openers[1:]] + [len(body)]
    for opener, stop in zip(openers, stops):
        # drop the trailing "}," (assumes ",\n" always follows a value)
        text = body[opener.end():stop].rstrip().rstrip(",")
        if text.endswith("}"):
            text = text[:-1].rstrip()
        fields[opener.group(1)] = text
    return fields
[docs]
@staticmethod
def distinct(column_name, df):
    """
    Return the sorted distinct non-empty values of column ``column_name`` in ``df``.

    For ``author`` each value is first split on " and " and the individual
    names are stripped. Missing values and empty strings are ignored for
    every column. Returns [] when ``df`` is None or empty.
    """
    # signature changed from mendeley version
    if df is None or df.empty:
        return []
    # drop missing values first: NaN cannot be sorted together with strings
    values = df[column_name].dropna()
    if column_name == "author":
        found = {name.strip() for s in values for name in s.split(" and ")}
    else:
        found = set(values)
    # an empty field is not a value (and "" is not an author)
    found.discard("")
    return sorted(found)
[docs]
def tex_to_unicode(self, s_in: str) -> str:
    """
    Tex codes to Unicode for a string, removing braces around single characters.

    Missing values are returned unchanged. Text containing a comma (not in
    first position) that is entirely upper case is title-cased, on the
    basis that it is a name.

    Strings that cannot be decoded are looked up in ``self.errors_mapper``;
    those not found there are returned unchanged and appended to
    ``self._last_decode`` (created here if it is still None). Work
    iteratively: run, look at errors, add or update entries in
    ``self.errors_mapper``.
    """
    if pd.isna(s_in):
        return s_in
    try:
        decoded = s_in.encode("latin1").decode("latex")
    except ValueError:
        # UnicodeEncodeError / UnicodeDecodeError are ValueError subclasses
        if s_in in self.errors_mapper:
            return self.errors_mapper[s_in]
        # _last_decode starts life as None; make sure there is a list to add to
        if getattr(self, "_last_decode", None) is None:
            self._last_decode = []
        self._last_decode.append(s_in)
        return s_in
    # {{x}} first, then {x}
    s = self._r_brace2.sub(r"\1", decoded)
    s = self._r_brace1.sub(r"\1", s)
    if s.find(",") > 0 and s == s.upper():
        # title case what appear to be names (comma) that are all caps
        s = s.title()
    return s
[docs]
def author_mapper(self):
    """Return a dict mapping each original author name to its proposed form."""
    # dropped manual fixes
    pairs = self.author_map_df[["original", "proposed"]]
    return dict(zip(pairs["original"], pairs["proposed"]))
[docs]
def map_authors(self, df_name):
    """
    Apply the author mapper to the author column of a dataframe attribute.

    ``df_name`` is the name of the attribute holding the dataframe (for
    example "_ported_df"); its ``author`` column is rewritten in place.
    Each " and "-separated name is replaced by its mapped form when the
    mapper has one. Missing (non-string) author values are left as they
    are. The mapping is also written out through ``save_audit_file``.
    """
    df = getattr(self, df_name)
    am = self.author_mapper()

    def remap(names):
        # with fillna=False a missing author is NaN, which cannot be split
        if not isinstance(names, str):
            return names
        return " and ".join(am.get(name, name) for name in names.split(" and "))

    df["author"] = df["author"].map(remap)
    # audit
    amdf = pd.DataFrame(list(am.items()), columns=["key", "value"])
    self.save_audit_file(amdf, ".author-mapping")
[docs]
def import_bibtex_file(self):
    """
    The work happens here! Do the actual import and normalize each
    text-based field, building ``_ported_df`` from ``raw_df``.

    Runs through each task in turn (see comments): drop omitted fields,
    map authors, add missing base fields, de-tex text fields, normalize
    the edition, map tags, analyse duplicates and, in incremental mode,
    remove entries the analysis marks "SKIP (Dupe)". Audit files are
    written along the way through ``save_audit_file``.

    Ad hoc adjustments and extraction of citations from the abstract have
    been dropped; tags use the library.

    Called automatically by the ported_df property if needed.

    Returns a two-column (key, value) dataframe summarising the run. In
    incremental mode ``raw_entries`` is the number of entries read,
    ``duplicates`` the number removed and ``net_entries`` the number kept.
    """
    logger.info("Running import_bibtex_file to create ported_df")
    raw = self.raw_df
    kept_fields = [
        i for i in raw.columns if i not in self._omitted_bibtex_fields
    ]
    self._ported_df = raw[kept_fields].copy()
    # count before any duplicate filtering so the summary can report removals
    num_raw = len(raw)
    # ============================================================================================
    # author: initials, extend, accents - either from reference library or self.raw_df
    # if a new import
    self.map_authors("_ported_df")
    # ensure other edited fields are present
    # this may not be the case for small imports
    for f in self._base_fields:
        if f not in self._ported_df:
            logger.debug("Imported df missing %s - adding", f)
            # probably a string?
            self._ported_df[f] = ""
    # ============================================================================================
    # de-tex other text fields
    self._all_unicode_errors = {}
    for f in [
        "title",
        "journal",
        "publisher",
        "institution",
        "booktitle",
        "address",
        "editor",
        "mendeley-tags",
    ]:
        # tex_to_unicode appends failures to _last_decode; collect them per field
        self._last_decode = []
        self._ported_df[f] = self._ported_df[f].map(self.tex_to_unicode)
        if len(self._last_decode):
            logger.debug(f"\tField: {f}\t{len(self._last_decode) = }")
            self._all_unicode_errors[f] = self._last_decode.copy()
        logger.debug(f"Fixed {f}")
    # audit unicode errors
    ans = [
        [k, mc] for k, v in self._all_unicode_errors.items() for mc in v
    ]
    temp = pd.DataFrame(ans, columns=["field", "miscode"])
    self.save_audit_file(temp, ".tex-unicode-errors")
    # ============================================================================================
    # keywords
    # paper's key words - never used these, they are included in _omitted_bibtex_fields
    # add code here for alternative treatment
    # ============================================================================================
    # mendeley-tags: these are things like my WangR or Delbaen or PMM
    # nothing to do here --- just carry over
    # ============================================================================================
    # citations: figure number of citations from my notes in the abstract - DROPPED
    # ============================================================================================
    # edition: normalize edition field
    self._ported_df["edition"] = self._ported_df["edition"].replace(self._edition_mapper)
    # ============================================================================================
    # tags: normalize and resolve duplicate TAGS
    self.map_tags()
    # ============================================================================================
    # files: files are entirely separately managed, field just pulled over
    # tag is deliberately not set as the index
    # ============================================================================================
    # 1. Run Analysis on FULL unfiltered data
    self._analysis_cache = self.import_analysis(lib_test=True)
    # 2. Filter duplicates in Incremental Mode
    if self.incremental:
        logger.info("Incremental Mode: Identifying and removing duplicates...")
        if not self._analysis_cache.empty:
            # ONLY kick out if it is a Skip action (Hash + Metadata match)
            dupes_df = self._analysis_cache[self._analysis_cache.action == "SKIP (Dupe)"]
            if not dupes_df.empty:
                dupe_tags = dupes_df.tag.unique()
                logger.warning(f"Kicking out {len(dupe_tags)} metadata+hash duplicates from import.")
                # We remove from ported_df based on the remapped tag
                self._ported_df = self._ported_df[~self._ported_df.tag.isin(dupe_tags)]
                # Update raw_df to match ported_df for reporting purposes.
                # Rows are aligned by index: raw tags are pre-remap, so
                # comparing them with ported tags would lose remapped entries.
                self._raw_df = self._raw_df.loc[
                    self._raw_df.index.isin(self._ported_df.index)
                ]
        # Force properties to re-evaluate for the actual import
        self._ref_df = pd.DataFrame()
        self._vfile_df = pd.DataFrame()
        self._ref_doc_df = pd.DataFrame()
    # ============================================================================================
    # final checks and balances, and write out info
    self.save_audit_file(self._raw_df, ".raw-df")
    self.save_audit_file(self._ported_df, ".ported-df")
    num_ported = len(self._ported_df)
    num_dupes = num_raw - num_ported
    import_info_dict = {
        "created": str(self.timestamp),
        "bibtex_file": str(self.bibtex_file_path.absolute()),
        "raw_entries": num_raw,
    }
    if self.incremental:
        import_info_dict["duplicates"] = num_dupes
        import_info_dict["net_entries"] = num_ported
    else:
        import_info_dict["ported_entries"] = num_ported
    import_info = pd.DataFrame(
        list(import_info_dict.items()),
        columns=["key", "value"],
    )
    self.save_audit_file(import_info, ".audit-info")
    return import_info
[docs]
def import_analysis(self, lib_test=True):
    """
    Prepare a detailed analysis of the import.

    Returns a DataFrame with one row per raw entry and columns:
    tag | author | title | hash match | doi match | title match | action

    ``action`` is "SKIP (Dupe)" for a hash match together with a doi or
    title match, "Link Existing" for a hash match alone, "Import (Fill)"
    or "Merge/Warn" for a doi/title match without a hash match (depending
    on whether the matched reference has a document), otherwise "Import".

    If ``self._analysis_cache`` is already set it is returned as is and
    ``lib_test`` is ignored. Missing author or title values are shown as "".
    """
    # Return cached analysis if available (ensures we see filtered records)
    if hasattr(self, '_analysis_cache') and self._analysis_cache is not None:
        return self._analysis_cache

    def clip(value, width):
        """First ``width`` characters of a string; "" for anything else (e.g. NaN)."""
        return value[:width] if isinstance(value, str) else ""

    def fmt_match(m):
        """Format a doi/title match for display."""
        if not m:
            return "N"
        status = "doc" if m['has_doc'] else "no doc"
        return f"Y ({status})"

    rows = []
    # We MUST use the internal ported_df BEFORE it is filtered
    # or reconstructed from raw_df. It is re-read on every pass because the
    # duplicate check may rebuild it.
    for idx, raw in self.raw_df.iterrows():
        # Get the remapped tag if it exists in ported_df
        if idx in self._ported_df.index:
            tag = self._ported_df.loc[idx].tag
        else:
            tag = raw.tag
        # Get detailed duplicate info
        dup_info = self._check_all_duplicates_v3(raw, tag, idx, lib_test=lib_test)
        # doi takes precedence over title when both matched
        meta_match = dup_info['doi'] or dup_info['title']
        # Determine Action
        if dup_info['hash'] and meta_match:
            action = "SKIP (Dupe)"
        elif dup_info['hash']:
            action = "Link Existing"
        elif meta_match:
            action = "Merge/Warn" if meta_match['has_doc'] else "Import (Fill)"
        else:
            action = "Import"
        rows.append({
            "tag": tag,
            "author": clip(raw.get("author"), 25),
            "title": clip(raw.get("title"), 50),
            "hash match": "Y" if dup_info['hash'] else "N",
            "doi match": fmt_match(dup_info['doi']),
            "title match": fmt_match(dup_info['title']),
            "action": action
        })
    return pd.DataFrame(rows)
def _check_all_duplicates_v3(self, raw_row, ported_tag, idx, lib_test=True):
    """
    Standardized duplicate checker.

    Returns a dict with keys ``hash``, ``doi`` and ``title``. ``hash`` is
    the document hash when this entry's file is already in the reference
    library's ``doc_df`` (else None). ``doi``/``title`` are None or a dict
    ``{'tag': <matched tag>, 'has_doc': <bool>}`` built from
    ``_possible_duplicate_v2``.
    """

    def library_hash():
        # Hash lookups only apply when hashing is enabled and the row
        # (by index) is present in ported_df.
        if not self._add_hashes or idx not in self.ported_df.index:
            return None
        tag_hits = self.vfile_df.tag == ported_tag
        if not tag_hits.any():
            return None
        wanted = Path(self.vfile_df[tag_hits].vfile.iloc[0])
        # Compare as Path objects so differing string spellings still match.
        path_hits = self.doc_df.path.map(lambda x: Path(x)) == wanted
        if not path_hits.any():
            return None
        digest = self.doc_df[path_hits].hash.iloc[0]
        if digest and digest in self.reference_library.doc_df.hash.values:
            return digest
        return None

    found = {'hash': library_hash(), 'doi': None, 'title': None}

    # DOI and title matches share the same lookup helper.
    for kind in ('doi', 'title'):
        hit = self._possible_duplicate_v2(raw_row, idx, kind, lib_test)
        if not hit:
            continue
        matched = hit['match_tag']
        found[kind] = {
            'tag': matched,
            'has_doc': matched in self.reference_library.ref_doc_df.tag.values,
        }
    return found
[docs]
def import_analysis_full(self, lib_test=True, strict=False):
    """
    Original detailed diagnostic analysis.

    Shows scores, field changes, and raw vs ported comparison.

    Rows of ``self.raw_df`` and ``self.ported_df`` are paired by position
    (``zip``), so the two frames are expected to be in the same order.

    ``lib_test`` is forwarded to ``_possible_duplicate``. With
    ``strict=True``, rows whose shared fields are all equal but whose
    ported version has extra fields list those fields as ``idx: ...`` in
    ``change_cols``.

    Returns a DataFrame sorted by kind, change (descending) and
    change_cols, with columns: tag_in, tag_ported, title, kind,
    score_title, score_tag, match_title, match_tag, author_in,
    author_ported, change, change_cols.
    """
    # Defaults for "no duplicate found", in the same order as the tuple
    # returned by _possible_duplicate:
    # (kind, score_title, score_tag, match_title, match_tag)
    no_match = ("", 0, 0, "", "")

    def differs(a, b):
        # Treat two missing values as equal, matching _compare's
        # NaN-equals-NaN semantics (plain `!=` says NaN != NaN).
        try:
            if pd.isna(a) and pd.isna(b):
                return False
        except (TypeError, ValueError):
            # non-scalar values: fall back to the plain comparison
            pass
        return a != b

    results = []
    for (_, raw_input), (right, revised) in zip(
        self.raw_df.iterrows(), self.ported_df.iterrows()
    ):
        (
            kind,
            score_title,
            score_tag,
            match_title,
            match_tag,
        ) = self._possible_duplicate(revised, right, lib_test=lib_test) or no_match

        if self._compare(raw_input, revised):
            change = "-"
            change_cols = " no chg "
        else:
            changed = [
                c
                for c in revised.index
                if c in raw_input.index and differs(revised[c], raw_input[c])
            ]
            if changed:
                change = "CHANGED"
                change_cols = ",".join(changed)
            else:
                # Only the set of fields differs, not any shared value.
                change = "-"
                if strict:
                    index_change = set(revised.index) - set(raw_input.index)
                    # sorted: deterministic output regardless of set order
                    change_cols = "idx: " + ",".join(sorted(index_change))
                else:
                    change_cols = ""

        results.append(
            [
                raw_input.tag,
                revised.tag,
                revised.title,
                kind,
                score_title,
                score_tag,
                match_title,
                match_tag,
                raw_input.author[:20],
                revised.author[:20],
                change,
                change_cols,
            ]
        )

    result_df = pd.DataFrame(
        results,
        columns=[
            "tag_in",
            "tag_ported",
            "title",
            "kind",
            "score_title",
            "score_tag",
            "match_title",
            "match_tag",
            "author_in",
            "author_ported",
            "change",
            "change_cols",
        ],
    ).sort_values(["kind", "change", "change_cols"], ascending=[True, False, True])
    return result_df
def _check_all_duplicates(self, ref_row, ref_row_idx, lib_test=True):
    """
    Comprehensive check for all types of duplicates.

    The row's own ``tag`` is used to locate its file. Returns a dict with
    keys ``hash`` (matching document hash or None) and ``doi``/``title``
    (None, or ``{'tag': <matched tag>, 'has_doc': <bool>}``).
    """
    outcome = dict.fromkeys(('hash', 'doi', 'title'))
    row_tag = ref_row.get('tag', '')

    # 1. Hash check: tag -> vfile path -> hash in this import's doc_df ->
    #    membership in the reference library's doc_df.
    if self._add_hashes:
        by_tag = self.vfile_df.tag == row_tag
        if by_tag.any():
            file_path = Path(self.vfile_df[by_tag].vfile.iloc[0])
            # Compare Path objects rather than raw strings.
            by_path = self.doc_df.path.map(lambda x: Path(x)) == file_path
            if by_path.any():
                file_hash = self.doc_df[by_path].hash.iloc[0]
                if file_hash and file_hash in self.reference_library.doc_df.hash.values:
                    outcome['hash'] = file_hash

    # 2. DOI / title check, recording whether the matched tag has a document.
    for kind in ('doi', 'title'):
        match = self._possible_duplicate_v2(ref_row, ref_row_idx, kind, lib_test)
        if match:
            tag_with_doc = (
                match['match_tag'] in self.reference_library.ref_doc_df.tag.values
            )
            outcome[kind] = {'tag': match['match_tag'], 'has_doc': tag_with_doc}
    return outcome
def _possible_duplicate_v2(self, ref_row, ref_row_idx, kind, lib_test):
    """
    Helper for exact DOI/Title matching against the cached comparison data.

    ``kind`` is ``'doi'`` for a DOI comparison; any other value compares
    normalized titles. Returns ``{'match_tag': <tag of first match>}`` or
    None when the row has no usable value or nothing matches.
    """
    # The comparison caches are populated as a side effect of
    # _possible_duplicate; only pay for that call while they are missing.
    if self._existing_title_norm is None or self._dois is None:
        self._possible_duplicate(ref_row, ref_row_idx, lib_test)

    if kind == 'doi':
        val = str(ref_row.get("doi", "") or "").strip().lower()
        # A missing DOI stringifies to "nan"/"none"; the cached DOIs are
        # built with astype(str), so without this guard a missing DOI
        # would "match" every library row that also lacks one.
        if val in ("", "nan", "none", "<na>"):
            return None
        existing = self._dois
    else:
        val = self._normalize_title(ref_row.get("title", ""))
        if not val:
            return None
        existing = self._existing_title_norm

    # The cache may still be unset (the initialising call can return early)
    # or be a plain list fallback; neither supports mask operations.
    if not isinstance(existing, pd.Series):
        return None

    mask = existing == val
    # Self test: do not match the row against itself. Only touch labels
    # that exist, otherwise the assignment would enlarge the mask.
    if self._self_test and ref_row_idx in mask.index:
        mask[ref_row_idx] = False
    if mask.any():
        return {'match_tag': self._possible_duplicate_tags.loc[mask].iloc[0]}
    return None
[docs]
def update_library(self, save=True):
    """
    Update self.library underlying files and save.

    If self.incremental is True, also shards the new documents into the
    library's document store and rewrites this importer's ``_doc_df`` so
    its paths point at the sharded locations. Afterwards the reference
    library is updated from this importer and every file in the audit
    directory is linked (or copied) into
    ``<config_path>/import-audit/<timestamp>``.

    NOTE(review): the ``save`` argument is not referenced by this method.
    """
    import shutil  # local import: only needed for the link fallback below

    if self.incremental:
        logger.info("Incremental Import: Sharding new documents...")
        # Merge necessary metadata for sharding
        # We use the newly created ref_df, ref_doc_df, and doc_df from THIS import
        to_shard = (
            self.ref_doc_df.merge(self.ref_df, on='tag', how='inner')
            .merge(self.doc_df, on=['hash', 'version'], how='inner')
        )
        if not to_shard.empty:
            base_path = self.reference_library.doc_store_path
            # Perform hardlinking
            hardlink_maker = partial(save_from_row, base_path=base_path)
            results = to_shard.apply(hardlink_maker, axis=1)
            logger.info("Sharding complete: %s rich links created.", len(results))
            # Update paths in the importer so the library update uses the sharded paths
            to_shard['new_path'] = to_shard.apply(
                lambda r: path_from_row(r, base_path), axis=1
            )
            # Update doc_df with new paths; the importer's own columns get
            # the '_old' suffix where names collide.
            new_doc_df = to_shard.merge(
                self.doc_df,
                on=['hash', 'version'],
                how='inner',
                suffixes=('', '_old'),
            )
            new_doc_df['path'] = new_doc_df['new_path']
            new_doc_df['name'] = new_doc_df['path'].apply(lambda x: Path(x).name)
            # Keep original columns of doc_df
            self._doc_df = new_doc_df[self.doc_df.columns].drop_duplicates(
                subset=['hash', 'version']
            )
        else:
            logger.info("Nothing to shard.")

    self.reference_library.update(self)

    # create audit trail
    import_path = (
        self.reference_library.config_path / "import-audit" / self.timestamp
    )
    import_path.mkdir(parents=True, exist_ok=True)
    count = 0
    for f in self._audit_dir_path.glob("*.*"):
        # "*.*" also matches directories with a dot in their name.
        if not f.is_file():
            continue
        newf = import_path / f.name
        if newf.exists():
            newf.unlink()
        try:
            newf.hardlink_to(f)
        except (OSError, NotImplementedError):
            # Hard links cannot cross filesystems and are not available
            # everywhere; the library is already updated at this point, so
            # fall back to a real copy rather than losing the audit trail.
            shutil.copy2(f, newf)
        count += 1
    logger.info("UPDATE AUDIT: %s files copied to %s", count, import_path)
[docs]
def save_audit_file(self, df, suffix):
    """
    Save df audit file with a standard filename.

    The file is written as ``<bibtex stem><suffix>.csv`` in the audit
    directory. Does nothing when ``self.write_audit`` is falsy. The first
    successful call with a non-empty ``self.errors_mapper`` also writes
    ``errors_mapper.json`` next to it.
    """
    if not self.write_audit:
        return

    csv_name = "".join((self.bibtex_file_path.stem, suffix, ".csv"))
    p = self._audit_dir_path / csv_name
    df.to_csv(p, encoding="utf-8")
    logger.debug(f"Audit: dataFrame, {len(df) = }, saved to {p.name}.")

    # check about errors mapper: dump it once per importer instance
    if not self.errors_mapper or self._errors_mapper_saved:
        return
    mapper_path = self._audit_dir_path / "errors_mapper.json"
    with mapper_path.open("w", encoding="utf-8") as handle:
        json.dump(self.errors_mapper, handle, indent=4)
    self._errors_mapper_saved = True
[docs]
def show_audit_files(self, top=5, trim=100, bib=False):
    """
    Display all the audit files.

    With ``bib=True`` each ``*.bib`` file is printed with every line cut to
    ``trim`` characters. ``*.json`` files are always printed in full. The
    first ``top`` rows of each ``*.csv`` file are shown through ``self.qd``;
    if ``self.qd`` is None an error is logged and the CSVs are skipped.
    """
    audit_dir = self._audit_dir_path

    def heading(path):
        # file name underlined with '=' of the same width
        print(path.name)
        print("=" * len(path.name))

    if bib:
        for bib_path in audit_dir.glob("*.bib"):
            heading(bib_path)
            print(f"Lines trimmed to {trim} characters.")
            shortened = [line[:trim] for line in bib_path.read_text().split("\n")]
            print("\n".join(shortened))
            print()

    for json_path in audit_dir.glob("*.json"):
        heading(json_path)
        print(json_path.read_text())
        print()

    if self.qd is None:
        logger.error("Must provide qd to use show_ functions")
        return

    for csv_path in audit_dir.glob("*.csv"):
        preview = pd.read_csv(csv_path, encoding="utf-8-sig").head(top)
        self.qd(preview, caption=csv_path.stem, tikz=False)
[docs]
def show_generated_dfs(self):
    """
    Use self.qd to display the main generated dfs.

    Shows, in order, the attributes ``df``, ``ported_df``, ``ref_df``,
    ``doc_df`` and ``ref_doc_df``, skipping any that are missing or None.
    Logs an error and returns when ``self.qd`` is None.
    """
    if self.qd is None:
        logger.error("Must provide qd to use show_ functions")
        return

    names = ("df", "ported_df", "ref_df", "doc_df", "ref_doc_df")
    # Lazy pairs keep each attribute lookup right before its display call.
    for caption, frame in ((nm, getattr(self, nm, None)) for nm in names):
        if frame is None:
            continue
        self.qd(frame, caption=caption)
[docs]
def show_unicode_errors(self):
    """
    Accumulated Unicode errors.

    Returns the set of characters, across every recorded line, whose UTF-8
    encoding is longer than one byte, or None when nothing was recorded
    (``self._all_unicode_errors`` is None).
    """
    recorded = self._all_unicode_errors
    if recorded is None:
        return None
    return {
        ch
        for lines in recorded.values()
        for line in lines
        for ch in line
        if len(ch.encode("utf-8")) > 1
    }
@property
def _audit_dir_path(self):
    """
    Time-stamped location to save audit data.

    On first access the directory
    ``<reference_library.debug_dir_path>/imports/<timestamp>`` is created
    and the input bibtex file is hard linked into it. If a hard link
    cannot be made, the file is copied instead. Later accesses return the
    remembered path.
    """
    if self.__audit_dir_path is None:
        import shutil  # local import: only needed for the link fallback

        self.__audit_dir_path = (
            self.reference_library.debug_dir_path / "imports" / self.timestamp
        )
        # ensure it exists
        self.__audit_dir_path.mkdir(parents=True, exist_ok=True)
        logger.info("Created audit path at %s", str(self.__audit_dir_path))
        # audit the bibtex input file
        p_ = self.__audit_dir_path / self.bibtex_file_path.name
        if p_.exists():
            logger.warning("REALLY WEIRD - audit of input bibtex already exists.")
            p_.unlink()
        try:
            p_.hardlink_to(self.bibtex_file_path)
        except (OSError, NotImplementedError):
            # Hard links cannot cross filesystems and are not available
            # everywhere; a plain copy still preserves the input.
            shutil.copy2(self.bibtex_file_path, p_)
    return self.__audit_dir_path
# GEMINI CODE for interactive update library
@staticmethod
def _normalize_title(s: str) -> str:
"""Simple normalization for title comparison."""
if not isinstance(s, str):
return ""
s = s.lower()
s = re.sub(r"[^a-z0-9]+", " ", s)
return " ".join(s.split())
def _possible_duplicate(
self, ref_row, ref_row_idx, lib_test: bool = True
) -> str | None:
"""
Heuristic duplicate check: by DOI (if present) and by normalized title.
Returns a short message if something looks like a duplicate.
lib_test = test against the library, else self test (dups within the import).
"""
# basic checks
doi = str(ref_row.get("doi", "") or "").strip().lower()
title = ref_row.get("title", "")
tag = ref_row.get("tag", "")
title_norm = self._normalize_title(title)
# Hash Check (The "Guardian" logic)
if self.incremental and self._add_hashes:
# Check if this file hash already exists in the library
# Note: doc_df is the library's document database
# We need to find the hash of the file currently being imported
# Use self.vfile_df to find the actual path for this tag
# (ref_row index is matching ported_df index which is 1-based,
# but tag is always reliable)
mask = self.vfile_df.tag == tag
if mask.any():
current_path = self.vfile_df[mask].vfile.iloc[0]
# lookup in doc_df which contains hashes of files in this import
doc_mask = self.doc_df.path == current_path
if doc_mask.any():
current_hash = self.doc_df[doc_mask].hash.iloc[0]
if current_hash:
lib_docs = self.reference_library.doc_df
if current_hash in lib_docs.hash.values:
# Find which tags in the library use this hash
match_tags = self.reference_library.ref_doc_df[
self.reference_library.ref_doc_df.hash == current_hash
].tag.tolist()
if match_tags:
return "HASH", 100, 100, title, match_tags[0]
# what are we checking against?
if self._existing_title_norm is None or self._dois is None:
if (
lib_test
and self.reference_library != EMPTY_LIBRARY
and not self.reference_library.ref_df.empty
):
logger.info("Checking duplicates relative to reference library.")
self._existing_title_norm = self.reference_library.ref_df.title.map(
self._normalize_title
)
self._dois = (
self.reference_library.ref_df.doi.astype(str)
.str.lower()
.str.strip()
if "doi" in self.reference_library.ref_df
else []
)
self._self_test = False
# temporary storage - note titles are non-normalized
self._possible_duplicate_tags = self.reference_library.ref_df.tag
self._possible_duplicate_titles = self.reference_library.ref_df.title
else:
# check against self - new import with no library or lib_test == False
logger.info(
"No reference library...checking duplicates relative to import."
)
self._existing_title_norm = self.ref_df.title.map(self._normalize_title)
self._dois = (
self.ref_df.doi.astype(str).str.lower().str.strip()
if "doi" in self.ref_df
else []
)
self._self_test = True
# temporary storage
self._possible_duplicate_tags = self.ref_df.tag
self._possible_duplicate_titles = self.ref_df.title
# assert len(self._existing_title_norm) == len(self._dois), "WRONG SIZES"
def create_message(mask, kind):
tags = self._possible_duplicate_tags.loc[mask].tolist()
titles = self._possible_duplicate_titles.loc[mask].tolist()
score_title = 0 # similarity score
score_tag = 0 # similarity score
match_tag = ""
match_title = ""
for tg, tl in zip(tags, titles):
#
# title_similarity = fuzz.ratio(title, tl)
# tag_similarity = fuzz.ratio(tag, tg)
# gemini recommends
title_similarity = SequenceMatcher(None, title, tl).ratio() * 100
tag_similarity = SequenceMatcher(None, tag, tg).ratio() * 100
if title_similarity > score_title:
score_title = title_similarity
score_tag = tag_similarity
match_tag = tg
match_title = title
if score_title > 66 and score_tag > 80:
return kind, score_title, score_tag, match_title, match_tag
else:
return
# DOI check
if doi:
mask = self._dois == doi
# when run against itself (import into empty library), not match yourself!
if self._self_test:
mask[ref_row_idx] = False
if mask.any():
if (t := sum(mask)) > 1:
logger.debug("MULTI TITLE: %s, %s", ref_row.tag, t)
return create_message(mask, "DOI")
# Title check
if title_norm:
mask = self._existing_title_norm == title_norm
if self._self_test:
mask[ref_row_idx] = False
if mask.any():
if (t := sum(mask)) > 1:
logger.debug("MULTI TITLE: %s, %s", ref_row.tag, t)
return create_message(mask, "title")
return None
@staticmethod
def _compare(orig: pd.Series, revised: pd.Series) -> bool:
"""
Returns True if revised.index is a subset of orig.index and
corresponding values match (treating NaNs as equal).
"""
if not revised.index.isin(orig.index).all():
return False
return revised.equals(orig.loc[revised.index])