Source code for pymantic.primitives

__all__ = ['Triple', 'Quad', 'q_as_t', 't_as_q', 'Literal', 'NamedNode',
           'Prefix', 'BlankNode', 'Graph', 'Dataset', 'PrefixMap', 'TermMap',
           'parse_curie', 'is_language', 'lang_match', 'to_curie', 'Profile',
           ]

import collections
from collections import defaultdict

import datetime
from operator import itemgetter
from .compat import (
    text_type,
    string_types,
    iteritems,
    itervalues,
    iterkeys,
)

import pymantic.uri_schemes as uri_schemes

from pymantic.util import quote_normalized_iri
from pymantic.serializers import nt_escape


def is_language(lang):
    """Report whether ``lang`` can be treated as a language tag.

    The only check performed is that ``lang`` is not a :class:`NamedNode`
    (which would indicate a datatype IRI rather than a language); every
    other value is accepted.
    """
    return not isinstance(lang, NamedNode)
def lang_match(lang1, lang2):
    """Determine whether two language tags denote the same language.

    A tag without a subtag matches any tag sharing its primary language,
    e.g. ``en`` matches both ``en-us`` and ``en-gb``; two tags that both
    carry a subtag must agree on it.  The comparison is case-insensitive,
    so ``EN-us`` matches ``en-US``.

    lang1, lang2 - language tag strings, or None for "no language".

    Returns True when both are None or the tags match, False otherwise
    (including when exactly one of them is None).
    """
    if lang1 is None or lang2 is None:
        return lang1 is None and lang2 is None
    # Language tags are case-insensitive, so normalise before comparing.
    primary1, _, subtag1 = lang1.lower().partition('-')
    primary2, _, subtag2 = lang2.lower().partition('-')
    if primary1 != primary2:
        return False
    # A missing subtag on either side acts as a wildcard.
    return not subtag1 or not subtag2 or subtag1 == subtag2
def parse_curie(curie, prefixes):
    """Parse a CURIE within the context of the given prefixes.

    Explicit IRIs are also accepted and returned wrapped in a NamedNode.
    Specifically:

    1) If the text is not of the form ``[stuff]`` and the part before the
       first colon is a known URI scheme (``uri_schemes.schemes``), the
       whole text is wrapped in a NamedNode and returned unchanged.

    2) Otherwise it is parsed following CURIE Syntax 1.0:
       http://www.w3.org/TR/2007/WD-curie-20070307/
       Text with no colon at all is resolved against the default
       namespace, which is the one keyed by the empty string in
       ``prefixes``.

    3) If the prefix cannot be resolved, a ValueError is raised.

    curie - the CURIE (optionally wrapped in ``[]``) or IRI text.
    prefixes - a mapping of prefix -> namespace IRI.

    Returns a NamedNode.
    """
    # startswith/endswith rather than indexing, so that an empty string is
    # reported through the ValueError path below instead of an IndexError.
    definitely_curie = curie.startswith('[') and curie.endswith(']')
    if definitely_curie:
        curie = curie[1:-1]
    prefix, sep, reference = curie.partition(':')
    if not definitely_curie and prefix in uri_schemes.schemes:
        return NamedNode(curie)
    if not sep and '' in prefixes:
        # No colon present: the whole text is a reference in the default
        # namespace.  Testing `sep` (not `reference`) keeps "rdfs:" -- an
        # explicit prefix with an empty reference -- out of this branch.
        return Prefix(prefixes[''])(prefix)
    if prefix in prefixes:
        return Prefix(prefixes[prefix])(reference)
    raise ValueError(
        'Could not parse CURIE prefix {} from prefixes {}'.format(
            prefix, prefixes))
def parse_curies(curies, namespaces):
    """Lazily resolve every CURIE in ``curies`` against ``namespaces``.

    This is a generator: each item is handed to ``parse_curie`` only when
    the consumer asks for the next result.
    """
    for item in curies:
        resolved = parse_curie(item, namespaces)
        yield resolved
def to_curie(uri, namespaces, seperator=":", explicit=False):
    """Convert a URI to a CURIE using the prefixes defined in namespaces.

    If there is no matching prefix, the URI is returned unchanged.  When
    several namespaces match, the longest one wins.

    uri - the URI string to abbreviate.
    namespaces - a dictionary of prefix -> namespace mappings.
    seperator - the character to use as the separator between the prefix
        and the local name (the misspelt parameter name is kept for
        backward compatibility with keyword callers).
    explicit - if True and the URI can be abbreviated, wrap the
        abbreviated form in []s to indicate that it is definitely a CURIE.
    """
    candidates = [
        (prefix, namespace)
        for prefix, namespace in namespaces.items()
        if uri.startswith(namespace)
    ]
    if not candidates:
        return uri
    prefix, namespace = max(candidates, key=lambda pair: len(pair[1]))
    # Strip only the leading namespace.  str.replace would also rewrite
    # later occurrences of the namespace text inside the local name.
    curie = prefix + seperator + uri[len(namespace):]
    if explicit:
        return '[' + curie + ']'
    return curie
class Triple(tuple):
    """Triple(subject, predicate, object)

    An RDF triple stored as an immutable 3-tuple with named accessors.
    Converting a Triple to a string yields its N-Triples line, built from
    the ``toNT()`` form of each member.
    """

    __slots__ = ()

    _fields = ('subject', 'predicate', 'object')

    def __new__(_cls, subject, predicate, object):
        members = (subject, predicate, object)
        return tuple.__new__(_cls, members)

    @classmethod
    def _make(cls, iterable, new=tuple.__new__, len=len):
        'Make a new Triple object from a sequence or iterable'
        made = new(cls, iterable)
        size = len(made)
        if size != 3:
            raise TypeError('Expected 3 arguments, got %d' % size)
        return made

    def __repr__(self):
        return 'Triple(subject=%r, predicate=%r, object=%r)' % self

    def _asdict(t):
        'Return a new dict which maps field names to their values'
        return dict(zip(('subject', 'predicate', 'object'), t))

    def _replace(_self, **kwds):
        'Return a new Triple object replacing specified fields with new values'
        # Each field takes its override from kwds when present, otherwise
        # the current value; anything left in kwds is an unknown field.
        updated = [
            kwds.pop(name, current)
            for name, current in zip(('subject', 'predicate', 'object'),
                                     _self)
        ]
        result = _self._make(updated)
        if kwds:
            raise ValueError('Got unexpected field names: %r' % kwds.keys())
        return result

    def __getnewargs__(self):
        return tuple(self)

    subject = property(itemgetter(0))
    predicate = property(itemgetter(1))
    object = property(itemgetter(2))

    def __str__(self):
        terms = (self.subject.toNT(),
                 self.predicate.toNT(),
                 self.object.toNT())
        return ' '.join(terms) + ' .\n'

    def toString(self):
        """Return the same text as ``str(self)``."""
        return str(self)
class Quad(tuple):
    """Quad(subject, predicate, object, graph)

    A triple plus the name of the graph it belongs to, stored as an
    immutable 4-tuple with named accessors.
    """

    __slots__ = ()

    _fields = ('subject', 'predicate', 'object', 'graph')

    def __new__(_cls, subject, predicate, object, graph):
        members = (subject, predicate, object, graph)
        return tuple.__new__(_cls, members)

    @classmethod
    def _make(cls, iterable, new=tuple.__new__, len=len):
        'Make a new Quad object from a sequence or iterable'
        made = new(cls, iterable)
        size = len(made)
        if size != 4:
            raise TypeError('Expected 4 arguments, got %d' % size)
        return made

    def __repr__(self):
        return 'Quad(subject=%r, predicate=%r, object=%r, graph=%r)' % self

    def _asdict(t):
        'Return a new dict which maps field names to their values'
        return dict(zip(('subject', 'predicate', 'object', 'graph'), t))

    def _replace(_self, **kwds):
        'Return a new Quad object replacing specified fields with new values'
        # Each field takes its override from kwds when present, otherwise
        # the current value; anything left in kwds is an unknown field.
        updated = [
            kwds.pop(name, current)
            for name, current in zip(
                ('subject', 'predicate', 'object', 'graph'), _self)
        ]
        result = _self._make(updated)
        if kwds:
            raise ValueError('Got unexpected field names: %r' % kwds.keys())
        return result

    def __getnewargs__(self):
        return tuple(self)

    subject = property(itemgetter(0))
    predicate = property(itemgetter(1))
    object = property(itemgetter(2))
    graph = property(itemgetter(3))

    def __str__(self):
        parts = [str(self.subject), str(self.predicate),
                 str(self.object), str(self.graph)]
        return ' '.join(parts) + ' .\n'
def q_as_t(quad):
    """Return the Triple part of ``quad``, dropping its graph name."""
    return Triple(subject=quad.subject,
                  predicate=quad.predicate,
                  object=quad.object)


def t_as_q(graph_name, triple):
    """Return a Quad made of ``triple``'s members plus ``graph_name``."""
    return Quad(subject=triple.subject,
                predicate=triple.predicate,
                object=triple.object,
                graph=graph_name)
class Literal(tuple):
    """Literal(`value`, `language`, `datatype`)

    Literals represent values such as numbers, dates and strings in RDF
    data.  A Literal is an immutable 3-tuple of:

    * value - the lexical (string) form of the value
    * language - an optional language tag, stored as given
    * datatype - an optional datatype, expected to offer ``toNT()``

    Non-string values are converted through the ``types`` table, which
    maps a Python type to a function returning ``(lexical form, datatype)``;
    a type missing from the table raises KeyError.
    """

    __slots__ = ()

    _fields = ('value', 'language', 'datatype')

    # Python type -> converter producing (lexical form, default datatype).
    types = {
        int: lambda v: (str(v), XSD('integer')),
        datetime.datetime: lambda v: (v.isoformat(), XSD('dateTime')),
    }

    def __new__(_cls, value, language=None, datatype=None):
        if isinstance(value, string_types):
            return tuple.__new__(_cls, (value, language, datatype))
        converter = _cls.types[type(value)]
        lexical, inferred = converter(value)
        # An explicitly supplied datatype wins over the inferred one.
        chosen = inferred if datatype is None else datatype
        return tuple.__new__(_cls, (lexical, language, chosen))

    @classmethod
    def _make(cls, iterable, new=tuple.__new__, len=len):
        'Make a new Literal object from a sequence or iterable'
        made = new(cls, iterable)
        size = len(made)
        if size != 3:
            raise TypeError('Expected 3 arguments, got %d' % size)
        return made

    def __repr__(self):
        return 'Literal(value=%r, language=%r, datatype=%r)' % self

    def _asdict(t):
        'Return a new dict which maps field names to their values'
        return dict(zip(('value', 'language', 'datatype'), t))

    def _replace(_self, **kwds):
        'Return a new Literal object replacing specified fields with new value'
        updated = [
            kwds.pop(name, current)
            for name, current in zip(('value', 'language', 'datatype'),
                                     _self)
        ]
        result = _self._make(updated)
        if kwds:
            raise ValueError('Got unexpected field names: %r' % kwds.keys())
        return result

    def __getnewargs__(self):
        return tuple(self)

    value = property(itemgetter(0))
    language = property(itemgetter(1))
    datatype = property(itemgetter(2))

    interfaceName = "Literal"

    def __str__(self):
        return text_type(self.value)

    def toNT(self):
        """Return the N-Triples form: the escaped, quoted value followed by
        ``@language`` if a language is set, else ``^^datatype`` if a
        datatype is set."""
        quoted = '"' + nt_escape(self.value) + '"'
        if self.language:
            return quoted + '@' + self.language
        if self.datatype:
            return quoted + '^^' + self.datatype.toNT()
        return quoted
class NamedNode(text_type):
    """A node identified by an IRI; the node object is the IRI text itself."""

    interfaceName = "NamedNode"

    @property
    def value(self):
        """The IRI, i.e. this very string object."""
        return self

    def __repr__(self):
        nt_form = self.toNT()
        return 'NamedNode(' + nt_form + ')'

    def __str__(self):
        return self.value

    def toNT(self):
        """Return the IRI normalised, escaped and wrapped in ``<>``."""
        normalized = quote_normalized_iri(self.value)
        escaped = nt_escape(normalized)
        return '<' + escaped + '>'
class Prefix(NamedNode):
    """A namespace IRI that, when called with a local name, returns the
    NamedNode for the namespace concatenated with that name."""

    def __call__(self, name):
        full_iri = self + name
        return NamedNode(full_iri)


# Namespace of the XML Schema datatypes.
XSD = Prefix("http://www.w3.org/2001/XMLSchema#")
class BlankNode(object):
    """A reference to an unnamed resource (one for which no IRI is known),
    usable in a Triple as a unique reference to that resource.

    The stringified form is ``_:`` followed by the node's ``value``; this
    is its "blank node identifier".
    """

    interfaceName = "BlankNode"

    @property
    def value(self):
        """Label derived from ``id(self)``: its hex digits, each shifted up
        by 17 code points."""
        hex_digits = hex(id(self))[2:]
        shifted = [chr(ord(digit) + 17) for digit in hex_digits]
        return ''.join(shifted)

    def __repr__(self):
        return 'BlankNode()'

    def __str__(self):
        label = self.value
        return '_:' + label

    def toNT(self):
        """Return the blank node identifier (same text as ``str(self)``)."""
        return str(self)
def Index():
    """Create an arbitrarily deep nested mapping: a defaultdict whose
    missing keys are filled with further ``Index()`` mappings."""
    nested = defaultdict(Index)
    return nested
class Graph(object):
    """A `Graph` holds a set of `Triple`.

    Implements the Python set/sequence API for `in`, `for`, and `len`.
    Alongside the set of triples, three nested indexes (subject ->
    predicate -> object and its two rotations) are kept for pattern
    matching.
    """

    def __init__(self, graph_uri=None):
        # An unnamed graph keeps uri None.  Wrapping None in NamedNode
        # would give every unnamed graph the truthy name "None".
        if graph_uri is not None and not isinstance(graph_uri, NamedNode):
            graph_uri = NamedNode(graph_uri)
        self._uri = graph_uri
        self._triples = set()
        self._spo = Index()
        self._pos = Index()
        self._osp = Index()
        self._actions = set()

    @property
    def uri(self):
        """URI name of the graph, or None if it has not been given a name"""
        return self._uri

    def addAction(self, action):
        """Record ``action`` on the graph.  Returns the graph instance."""
        self._actions.add(action)
        return self

    def add(self, triple):
        """Adds the specified Triple to the graph.

        This method returns the graph instance it was called on."""
        self._triples.add(triple)
        self._spo[triple.subject][triple.predicate][triple.object] = triple
        self._pos[triple.predicate][triple.object][triple.subject] = triple
        self._osp[triple.object][triple.subject][triple.predicate] = triple
        return self

    @staticmethod
    def _unindex(index, first, second, third):
        """Delete index[first][second][third] and prune emptied levels."""
        by_second = index[first]
        by_third = by_second[second]
        del by_third[third]
        if not by_third:
            del by_second[second]
        if not by_second:
            del index[first]

    @staticmethod
    def _lookup(index, *path):
        """Walk ``path`` through a nested index without creating entries.

        Returns the mapping found at the end of the path, or an empty dict
        if any key along the way is absent."""
        node = index
        for key in path:
            node = node.get(key)
            if node is None:
                return {}
        return node

    def remove(self, triple):
        """Removes the specified Triple from the graph.

        Raises KeyError if the triple is not in the graph.
        This method returns the graph instance it was called on."""
        self._triples.remove(triple)
        # Prune emptied index levels so subjects()/predicates()/objects()
        # stop reporting terms that no longer occur in any triple.
        self._unindex(self._spo,
                      triple.subject, triple.predicate, triple.object)
        self._unindex(self._pos,
                      triple.predicate, triple.object, triple.subject)
        self._unindex(self._osp,
                      triple.object, triple.subject, triple.predicate)
        return self

    def match(self, subject=None, predicate=None, object=None):
        """Yield the triples in this graph that match the given pattern.

        A triple is included when each of subject, predicate and object
        either equals the corresponding argument or that argument is None
        (or otherwise falsy), which acts as a wildcard.  All given
        arguments must match (AND semantics).

        This is a lazy generator over the graph's own containers; do not
        add or remove triples while consuming it.
        """
        if not (subject or predicate or object):
            for triple in self._triples:
                yield triple
            return
        if subject and predicate and object:
            wanted = Triple(subject, predicate, object)
            if wanted in self:
                yield wanted
            return

        # _lookup (rather than plain indexing) keeps the defaultdict-based
        # indexes from growing phantom keys for terms that are not present.
        if subject and predicate:
            groups = [self._lookup(self._spo, subject, predicate)]
        elif subject and object:
            groups = [self._lookup(self._osp, object, subject)]
        elif predicate and object:
            groups = [self._lookup(self._pos, predicate, object)]
        elif subject:
            groups = itervalues(self._lookup(self._spo, subject))
        elif predicate:
            groups = itervalues(self._lookup(self._pos, predicate))
        else:
            groups = itervalues(self._lookup(self._osp, object))
        for group in groups:
            for triple in itervalues(group):
                yield triple

    def removeMatches(self, subject=None, predicate=None, object=None):
        """This method removes those triples in the current graph which
        match the given arguments.  Returns the graph instance."""
        # Snapshot first: remove() mutates the containers match() iterates.
        for triple in list(self.match(subject, predicate, object)):
            self.remove(triple)
        return self

    def addAll(self, graph_or_triples):
        """Imports the graph or set of triples in to this graph.

        This method returns the graph instance it was called on."""
        for triple in graph_or_triples:
            self.add(triple)
        return self

    def merge(self, graph):
        """Returns a new, unnamed Graph which is a concatenation of this
        graph and the graph given as an argument."""
        new_graph = Graph()
        for triple in graph:
            new_graph.add(triple)
        for triple in self:
            new_graph.add(triple)
        return new_graph

    def __contains__(self, item):
        return item in self._triples

    def __len__(self):
        return len(self._triples)

    def __iter__(self):
        return iter(self._triples)

    def toArray(self):
        """Return the set of :py:class:`Triple` within the :py:class:`Graph`"""
        return frozenset(self._triples)

    def subjects(self):
        """Returns an iterator over subjects in the graph."""
        return iterkeys(self._spo)

    def predicates(self):
        """Returns an iterator over predicates in the graph."""
        return iterkeys(self._pos)

    def objects(self):
        """Returns an iterator over objects in the graph."""
        return iterkeys(self._osp)
class Dataset(object):
    """A collection of graphs keyed by graph name, addressed through quads.

    Iterating, ``len`` and ``in`` operate over the quads of all graphs.
    """

    def __init__(self):
        # Graphs are created on demand the first time a quad names them.
        self._graphs = defaultdict(Graph)

    def add(self, quad):
        """Add ``quad`` to the graph it names, creating that graph if
        needed."""
        graph = self._graphs[quad.graph]
        graph._uri = quad.graph
        graph.add(q_as_t(quad))

    def remove(self, quad):
        """Remove ``quad`` from the graph it names.

        Raises KeyError if the graph or the quad is not present."""
        # .get() so that a miss does not leave a new empty graph behind.
        graph = self._graphs.get(quad.graph)
        if graph is None:
            raise KeyError(quad)
        graph.remove(q_as_t(quad))

    def add_graph(self, graph, named=None):
        """Register ``graph`` under ``named``, or under its own uri.

        Raises ValueError if neither provides a name."""
        name = named or graph.uri
        if not name:
            raise ValueError("Graph must be named")
        graph._uri = name
        self._graphs[graph.uri] = graph

    def remove_graph(self, graph_or_uri):
        """Drop a graph from the dataset.

        ``graph_or_uri`` is either a graph (anything with a ``uri``
        attribute) or the graph's name.  Unknown graphs are ignored."""
        uri = getattr(graph_or_uri, 'uri', graph_or_uri)
        self._graphs.pop(uri, None)

    @property
    def graphs(self):
        """The graphs currently held by the dataset."""
        return self._graphs.values()

    def match(self, subject=None, predicate=None, object=None, graph=None):
        """Yield the quads matching the pattern.

        subject, predicate and object are passed to each graph's own
        ``match``; ``graph`` restricts the search to the graph of that
        name, and a falsy ``graph`` searches every graph."""
        if graph:
            # .get() so that asking about an unknown graph does not
            # create it.
            selected = self._graphs.get(graph)
            pairs = [] if selected is None else [(graph, selected)]
        else:
            pairs = list(self._graphs.items())
        for graph_uri, member in pairs:
            for found in member.match(subject, predicate, object):
                yield t_as_q(graph_uri, found)

    def removeMatches(self, subject=None, predicate=None, object=None,
                      graph=None):
        """This method removes those quads in the dataset which match the
        given arguments.  Returns the dataset instance."""
        # Snapshot first: remove() mutates the graphs match() iterates.
        for quad in list(self.match(subject, predicate, object, graph)):
            self.remove(quad)
        return self

    def addAll(self, dataset_or_quads):
        """Imports the dataset or set of quads in to this dataset.

        This method returns the dataset instance it was called on."""
        for quad in dataset_or_quads:
            self.add(quad)
        return self

    def __len__(self):
        return sum(len(g) for g in self.graphs)

    def __contains__(self, item):
        """True if ``item`` is held: an item with a ``graph`` attribute is
        looked up in that graph only, anything else in every graph."""
        if hasattr(item, "graph"):
            graph = self._graphs.get(item.graph)
            return graph is not None and q_as_t(item) in graph
        return any(item in graph for graph in self.graphs)

    def __iter__(self):
        for graph in self.graphs:
            for triple in graph:
                yield t_as_q(graph.uri, triple)

    def toArray(self):
        """Return the quads of the dataset as a frozenset."""
        return frozenset(self)
# RDF Environment Interfaces
class PrefixMap(collections.OrderedDict):
    """An ordered map of prefixes to IRIs, with helpers to convert between
    CURIEs and IRIs.

    Example usage:

    >>> prefixes = PrefixMap()

    Create a new prefix mapping for the prefix "rdfs"

    >>> prefixes['rdfs'] = "http://www.w3.org/2000/01/rdf-schema#"

    ``prefixes.resolve("rdfs:label")`` then gives the IRI
    http://www.w3.org/2000/01/rdf-schema#label, and
    ``prefixes.shrink(...)`` of that IRI gives "rdfs:label".

    The default prefix, used for CURIEs such as ":me", is the entry keyed
    by the empty string; set it with ``setDefault``.
    """

    def resolve(self, curie):
        """Return the IRI for ``curie`` (for example "rdfs:label").

        Resolution is delegated to ``parse_curie`` using this map as the
        prefixes, so an unknown prefix raises ValueError."""
        return parse_curie(curie, self)

    def shrink(self, iri):
        """Return the CURIE for ``iri`` (for example "rdfs:label"), or the
        original IRI when no prefix in this map matches.

        Delegates to ``to_curie`` using this map as the namespaces."""
        return to_curie(iri, self)

    def addAll(self, other, override=False):
        """Copy the entries of ``other`` into this map.

        With ``override`` true, existing prefixes are overwritten;
        otherwise only prefixes not yet present are added.  Returns this
        map."""
        if override:
            self.update(other)
            return self
        for prefix, iri in iteritems(other):
            if prefix in self:
                continue
            self[prefix] = iri
        return self

    def setDefault(self, iri):
        """Set the iri to be used when resolving CURIEs without a prefix,
        for example ":this"."""
        default_key = ''
        self[default_key] = iri
class TermMap(dict):
    """A map of simple string terms to IRIs, with helpers to convert
    between the two.

    Example usage:

    >>> terms = TermMap()

    Create a new term mapping for the term "member"

    >>> terms['member'] = "http://www.w3.org/ns/org#member"

    ``terms.resolve("member")`` then gives that IRI and ``terms.shrink``
    of the IRI gives "member".  An unknown term resolves to None until a
    default vocabulary is set:

    >>> terms.setDefault("http://www.w3.org/2000/01/rdf-schema#")

    after which ``terms.resolve("label")`` gives
    http://www.w3.org/2000/01/rdf-schema#label.
    """

    def addAll(self, other, override=False):
        """Copy the entries of ``other`` into this map.

        With ``override`` true, existing terms are overwritten; otherwise
        only terms not yet present are added.  Returns this map."""
        if override:
            self.update(other)
            return self
        for term, iri in iteritems(other):
            if term in self:
                continue
            self[term] = iri
        return self

    def resolve(self, term):
        """Return the IRI known for ``term``.

        If the term is unknown and a default has been set, the result is
        the default iri concatenated with the term; if no default is set,
        the result is None."""
        if not hasattr(self, 'default'):
            return self.get(term)
        fallback = self.default + term
        return self.get(term, fallback)

    def setDefault(self, iri):
        """Set the default iri, which ``resolve`` prepends to a term that
        has no mapping of its own."""
        self.default = iri

    def shrink(self, iri):
        """Return a term mapped to ``iri`` (for example "label"), or the
        original IRI when no term maps to it."""
        matching_terms = (term for term, known in iteritems(self)
                          if known == iri)
        return next(matching_terms, iri)
class Profile(object):
    """Profiles provide an easy to use context for negotiating between
    CURIEs, Terms and IRIs.

    A profile pairs a prefix map with a term map.  A falsy ``prefixes`` or
    ``terms`` argument (None or an empty map) is replaced by a fresh
    PrefixMap / TermMap, and the "rdf" and "xsd" prefixes are added when
    the prefix map does not already define them.
    """

    def __init__(self, prefixes=None, terms=None):
        self.prefixes = prefixes if prefixes else PrefixMap()
        self.terms = terms if terms else TermMap()
        builtin_prefixes = (
            ('rdf', 'http://www.w3.org/1999/02/22-rdf-syntax-ns#'),
            ('xsd', 'http://www.w3.org/2001/XMLSchema#'),
        )
        for prefix, iri in builtin_prefixes:
            if prefix not in self.prefixes:
                self.prefixes[prefix] = iri

    def resolve(self, toresolve):
        """Resolve a Term or CURIE to an IRI.

        Text containing a : (colon) is handed to ``prefixes.resolve``;
        anything else is handed to ``terms.resolve``."""
        resolver = self.prefixes if ':' in toresolve else self.terms
        return resolver.resolve(toresolve)

    def setDefaultVocabulary(self, iri):
        """Set the default vocabulary used when resolving unknown terms;
        identical to calling ``setDefault`` on ``terms``."""
        self.terms.setDefault(iri)

    def setDefaultPrefix(self, iri):
        """Set the default prefix used when resolving CURIEs without a
        prefix, for example ":me"; identical to calling ``setDefault`` on
        ``prefixes``."""
        self.prefixes.setDefault(iri)

    def setTerm(self, term, iri):
        """Associate ``iri`` with ``term`` in the term map."""
        self.terms[term] = iri

    def setPrefix(self, prefix, iri):
        """Associate ``iri`` with ``prefix`` in the prefix map."""
        self.prefixes[prefix] = iri

    def importProfile(self, profile, override=False):
        """Merge another profile into this one.

        Equivalent to ``prefixes.addAll(profile.prefixes, override)``
        followed by ``terms.addAll(profile.terms, override)``.  Returns the
        instance on which it was called."""
        self.prefixes.addAll(profile.prefixes, override)
        self.terms.addAll(profile.terms, override)
        return self
class RDFEnvironment(Profile):
    """The RDF Environment is an interface which exposes a high level API
    for working with RDF in a programming environment.

    It is a Profile (prefix and term maps) extended with factory methods
    for the primitive types defined in this module.
    """

    def createBlankNode(self):
        """Creates a new :py:class:`BlankNode`."""
        return BlankNode()

    def createNamedNode(self, value):
        """Creates a new :py:class:`NamedNode`."""
        return NamedNode(value)

    def createLiteral(self, value, language=None, datatype=None):
        """Creates a :py:class:`Literal` given a value, an optional
        language and/or an optional datatype."""
        return Literal(value, language, datatype)

    def createTriple(self, subject, predicate, object):
        """Creates a :py:class:`Triple` given a subject, predicate and
        object."""
        return Triple(subject, predicate, object)

    def createGraph(self, triples=tuple()):
        """Creates a new :py:class:`Graph`, an optional sequence of
        :py:class:`Triple` to include within the graph may be specified,
        this allows easy transition between native sequences and Graphs
        and is the counterpart for :py:meth:`Graph.toArray`."""
        g = Graph()
        g.addAll(triples)
        return g

    def createAction(self, test, action):
        """Not implemented; always raises NotImplementedError."""
        # NotImplemented is a comparison sentinel, not an exception, so
        # raising it would produce a TypeError instead.
        raise NotImplementedError("createAction is not implemented")

    def createProfile(self, empty=False):
        """Creates a :py:class:`Profile`.

        With ``empty`` true the profile starts from fresh maps; otherwise
        it is constructed from this environment's own prefix and term map
        objects (they are passed as-is, not copied)."""
        if empty:
            return Profile()
        else:
            return Profile(self.prefixes, self.terms)

    def createTermMap(self, empty=False):
        """Creates a :py:class:`TermMap`, either empty or initialised with
        the entries of this environment's terms."""
        if empty:
            return TermMap()
        else:
            return TermMap(self.terms)

    def createPrefixMap(self, empty=False):
        """Creates a :py:class:`PrefixMap`, either empty or initialised
        with the entries of this environment's prefixes."""
        if empty:
            return PrefixMap()
        else:
            return PrefixMap(self.prefixes)

    # Pymantic DataSet Extensions

    def createQuad(self, subject, predicate, object, graph):
        """Creates a :py:class:`Quad` given a subject, predicate, object
        and graph name."""
        return Quad(subject, predicate, object, graph)

    def createDataset(self, quads=tuple()):
        """Creates a new :py:class:`Dataset`, optionally populated with the
        given sequence of :py:class:`Quad`."""
        ds = Dataset()
        ds.addAll(quads)
        return ds