Source code for sycamore.transforms.sketcher

import sys
import re
import functools
import unicodedata

from sycamore.data import Document
from sycamore.data.document import DocumentPropertyTypes
from sycamore.functions.simhash import shinglesCalc, shinglesDist
from sycamore.plan_nodes import Node, SingleThreadUser, NonGPUUser
from sycamore.transforms.map import Map, FlatMap
from sycamore.utils.time_trace import timetrace

# NOTE: A larger test of ndd is present at examples/ndd_debug.py

unwantedRe = re.compile(r"\W+")


def normalizeString(s: str) -> str:
    """
    Removes all non-word-constituent characters, converts Unicode
    to composed form, and changes to lowercase.
    """

    s = unwantedRe.sub("", s)
    s = unicodedata.normalize("NFKC", s)
    return s.lower()


[docs] class Sketcher(SingleThreadUser, NonGPUUser, Map): """ For each Document, uses shingling to hash sliding windows of the text_representation. The set of shingles is called the sketch. Documents' sketches can be compared to determine if they have near-duplicate content. The SketchUniquify transform can be used to de-duplicate small docsets in Sycamore. De-duplicating at retrieval-time is more scalable and avoids some relevance problems. Args: child: The source node or component that provides the documents window: Number of bytes in the sliding window that is hashed (17) number: Count of hashes comprising a shingle (16) Example: .. code-block:: python node = ... # source node or component that provides hierarchical documents. xform = Sketcher(child=node) dataset = xform.execute() """ def __init__(self, child: Node, window: int = 17, number: int = 16, **kwargs): super().__init__(child, f=Sketcher.sketcher, args=[window, number], **kwargs) @staticmethod @timetrace("sketcher") def sketcher(doc: Document, window: int, number: int): txt = doc.text_representation if txt: utf = normalizeString(txt).encode("utf-8") doc.shingles = shinglesCalc(utf, window, number) return doc
[docs] class SketchUniquify(SingleThreadUser, NonGPUUser, FlatMap): """ Removes each Document which is a near-duplicate of a Document seen before. Uses the shingles calculated by the Sketcher transform. This approach requires full materialization of the entire docset on a single node. It will store all sketches in memory. It is not suitable for large docsets. Args: child: The source node or component that provides the documents threshold: Largest distance to be considered a duplicate (0.4) Example: .. code-block:: python node = ... # source node xform = SketchUniquify(child=node) dataset = xform.execute() """ def __init__(self, child: Node, threshold: float = 0.4, **kwargs) -> None: # must run on 1 instance to get a global view kwargs["parallelism"] = 1 super().__init__(child, f=SketchUniquify.Predicate, constructor_args=[threshold], **kwargs) class Predicate: def __init__(self, threshold: float) -> None: self.threshold = threshold self.total = 0 self.drops = 0 # This is a significant amount of memory... self.seenSketches: list[list[int]] = [] def good(self, doc: Document) -> bool: self.total += 1 docSketch = doc.shingles if docSketch: for prevSketch in self.seenSketches: dist = shinglesDist(docSketch, prevSketch) if dist <= self.threshold: self.drops += 1 print(f"SketchUniquify dropped {self.drops} of {self.total}", file=sys.stderr) return False self.seenSketches.append(docSketch) return True def __call__(self, doc: Document) -> list[Document]: if self.good(doc): return [doc] else: return []
[docs] class SketchDebug(SingleThreadUser, NonGPUUser, FlatMap): """ Removes each Document which is a near-duplicate of a Document seen before. Prints out duplicate pairs and a histogram of distances. Uses the shingles calculated by the Sketcher transform. This approach requires full materialization of the entire docset on a single node. It will store all sketches in memory. It is not suitable for large docsets. Args: child: The source node or component that provides the documents threshold: Largest distance to be considered a duplicate (0.4) Example: .. code-block:: python node = ... # source node xform = SketchUniquify(child=node) dataset = xform.execute() """ def __init__(self, child: Node, threshold: float = 0.4, **kwargs) -> None: kwargs["parallelism"] = 1 super().__init__(child, f=SketchDebug.Predicate, constructor_args=[threshold], **kwargs) self.threshold = threshold class Predicate: def __init__(self, threshold: float) -> None: self.threshold = threshold self.total = 0 self.drops = 0 # This is a significant amount of memory... self.seenSketches: list[list[int]] = [] self.seenText: list[str] = [] self.seenLoc: list[str] = [] self.hist: dict[int, int] = {} def good(self, doc: Document) -> bool: self.total += 1 docSketch = doc.shingles docText = str(doc.text_representation) docLoc = "%s:%s" % ( doc.properties.get("path", "NoPath"), doc.properties.get(DocumentPropertyTypes.PAGE_NUMBER, "NoPage"), ) if docSketch: for ii, prevSketch in enumerate(self.seenSketches): dist = shinglesDist(docSketch, prevSketch) dpct = int(dist * 100.0) self.hist[dpct] = 1 + self.hist.get(dpct, 0) if dist <= self.threshold: self.drops += 1 print(f"SketchDebug dropped {self.drops} of {self.total}", file=sys.stderr) seenText = self.seenText[ii] seenLoc = self.seenLoc[ii] print("DIST", dist, file=sys.stderr) print("PREV", seenLoc, seenText, file=sys.stderr) print("CURR", docLoc, docText, file=sys.stderr) print( functools.reduce( lambda s, k: s + "%d=%d " % (k, self.hist[k]), sorted(self.hist.keys()), "" ), file=sys.stderr, ) return False self.seenSketches.append(docSketch) self.seenText.append(docText) self.seenLoc.append(docLoc) return True def __call__(self, doc: Document) -> list[Document]: if self.good(doc): return [doc] else: return []