Source code for KGE.models.translating_based.RotatE

"""An implementation of RotatE
"""

import logging
import numpy as np
import tensorflow as tf
from ..base_model.TranslatingModel import TranslatingModel
from ...score import LpDistance
from ...loss import SelfAdversarialNegativeSamplingLoss
from ...ns_strategy import UniformStrategy

logging.getLogger().setLevel(logging.INFO)

[docs]class RotatE(TranslatingModel): """An implementation of RotatE from `[sun 2019] <https://arxiv.org/abs/1902.10197v1>`_. RotatE represents both entities and relations as embedding vectors in the complex space, and models the relation as an element-wise **rotation** from the head to tail: .. math:: \\textbf{e}_h \circ \\textbf{r}_r \\approx \\textbf{e}_t where :math:`\\textbf{e}_i, \\textbf{r}_i \in \mathbb{C}^k` are vector representations of the entities and relations. and :math:`\circ` is the Hadmard (element-wise) product. The score of :math:`(h,r,t)` is: .. math:: f(h,r,t) = s(\\textbf{e}_h \circ \\textbf{r}_r, \\textbf{e}_t) where :math:`s` is a scoring function (:py:mod:`KGE.score`) that scores the plausibility of matching between :math:`(translation, predicate)`. \n By default, using :py:mod:`KGE.score.LpDistance`, negative L1-distance: .. math:: s(\\textbf{e}_h \circ \\textbf{r}_r, \\textbf{e}_t) = - \left\| \\textbf{e}_h \circ \\textbf{r}_r - \\textbf{e}_t \\right\|_1 You can change to L2-distance by giving :code:`score_fn=LpDistance(p=2)` in :py:func:`__init__`, or change any score function you like by specifying :code:`score_fn` in :py:func:`__init__`. RotatE constrains the modulus of each element of :math:`\\textbf{r} \in \mathbb{C}^k` to 1, i.e., :math:`r_i \in \mathbb{C}` to be :math:`\left| r_i \\right| = 1`. By doing this, :math:`r_i` is of the form :math:`e^{i\\theta_{r,i}}` """
[docs] def __init__(self, embedding_params, negative_ratio, corrupt_side, score_fn=LpDistance(p=1), loss_fn=SelfAdversarialNegativeSamplingLoss(margin=3, temperature=1), ns_strategy=UniformStrategy, n_workers=1): """Initialized RotatE Parameters ---------- embedding_params : dict embedding dimension parameters, should have key :code:`'embedding_size'` for embedding dimension :math:`k` negative_ratio : int number of negative sample corrupt_side : str corrupt from which side while trainging, can be :code:`'h'`, :code:`'t'`, or :code:`'h+t'` score_fn : function, optional scoring function, by default :py:mod:`KGE.score.LpDistance` loss_fn : class, optional loss function class :py:mod:`KGE.loss.Loss`, by default :py:mod:`KGE.loss.SelfAdversarialNegativeSamplingLoss` ns_strategy : function, optional negative sampling strategy, by default :py:func:`KGE.ns_strategy.uniform_strategy` n_workers : int, optional number of workers for negative sampling, by default 1 """ super(RotatE, self).__init__(embedding_params, negative_ratio, corrupt_side, score_fn, loss_fn, ns_strategy, n_workers)
def _init_embeddings(self, seed): """Initialized the RotatE embeddings. If :code:`model_weight_initial` not given in :py:func:`train`, initialized embeddings randomly, otherwise, initialized from :code:`model_weight_initial`. Parameters ---------- seed : int random seed """ if self._model_weights_initial is None: assert self.embedding_params.get("embedding_size") is not None, "'embedding_size' should be given in embedding_params when using TransE" if hasattr(self.loss_fn, "margin"): margin = self.loss_fn.margin else: margin = 6.0 self.limit = (margin + 2.0) / self.embedding_params["embedding_size"] uniform_initializer = tf.initializers.RandomUniform(minval=-self.limit, maxval=self.limit, seed=seed) ent_emb = tf.Variable( uniform_initializer([len(self.metadata["ind2ent"]), self.embedding_params["embedding_size"], 2]), name="entities_embedding", dtype=np.float32 ) uniform_initializer = tf.initializers.RandomUniform(minval=-self.limit, maxval=self.limit, seed=seed) rel_emb = tf.Variable( uniform_initializer([len(self.metadata["ind2rel"]), self.embedding_params["embedding_size"]]), name="relations_embedding", dtype=tf.float32 ) self.model_weights = {"ent_emb": ent_emb, "rel_emb": rel_emb} else: self._check_model_weights(self.__model_weights_initial) self.model_weights = self.__model_weights_initial def _check_model_weights(self, model_weights): """Check the model_weights have necessary keys and dimensions Parameters ---------- model_weights : dict model weights to check. """ assert model_weights.get("ent_emb") is not None, "entity embedding should be given in model_weights with key 'ent_emb'" assert model_weights.get("rel_emb") is not None, "relation embedding should be given in model_weights with key 'rel_emb'" assert list(model_weights["ent_emb"].shape) == [len(self.metadata["ind2ent"]), self.embedding_params["embedding_size"], 2], \ "shape of 'ent_emb' should be (len(metadata['ind2ent']), embedding_params['embedding_size'])" assert list(model_weights["rel_emb"].shape) == [len(self.metadata["ind2rel"]), self.embedding_params["embedding_size"]], \ "shape of 'rel_emb' should be (len(metadata['ind2rel']), embedding_params['embedding_size'])"
[docs] def score_hrt(self, h, r, t): """ Score the triplets :math:`(h,r,t)`. If :code:`h` is :code:`None`, score all entities: :math:`(h_i, r, t)`. \n If :code:`t` is :code:`None`, score all entities: :math:`(h, r, t_i)`. \n :code:`h` and :code:`t` should not be :code:`None` simultaneously. Parameters ---------- h : tf.Tensor or np.ndarray or None index of heads with shape :code:`(n,)` r : tf.Tensor or np.ndarray index of relations with shape :code:`(n,)` t : tf.Tensor or np.ndarray or None index of tails with shape :code:`(n,)` Returns ------- tf.Tensor triplets scores with shape :code:`(n,)` """ h,r,t = super(RotatE, self).score_hrt(h,r,t) h_emb = tf.nn.embedding_lookup(self.model_weights["ent_emb"], h) r_emb = tf.nn.embedding_lookup(self.model_weights["rel_emb"], r) t_emb = tf.nn.embedding_lookup(self.model_weights["ent_emb"], t) if len(h_emb.shape) == 2: h_emb = tf.expand_dims(h_emb, 0) if len(t_emb.shape) == 2: t_emb = tf.expand_dims(t_emb, 0) # normalize to [-pi, pi] to ensure sin & cos functions are one-to-one r_emb = r_emb / self.limit * np.pi hadamard = tf.multiply(tf.complex(h_emb[:,:,0], h_emb[:,:,1]), tf.complex(tf.math.cos(r_emb), tf.math.sin(r_emb))) return self.score_fn(hadamard, tf.complex(t_emb[:,:,0], t_emb[:,:,1]))
def _constraint_loss(self, X): """Perform constraint if necessary. Parameters ---------- X : batch_data batch data Returns ------- tf.Tensor regularization term with shape (1,) """ return 0