# Copyright © 2024 Battelle Memorial Institute
# All rights reserved.
from __future__ import annotations
import pandas as pd
from scipy.sparse import csr_matrix
from collections import defaultdict
import numpy as np
__all__ = ["IncidenceStore"]
[docs]
class IncidenceStore:
"""
Incidence store object that stores and accesses (multi) incidences with standard methods.
Parameters
----------
data : Two column pandas dataframe of edges and nodes, respectively.
"""
def __init__(self, data):
"""
Initiate data in self as the two column pandas dataframe provided through factory method.
"""
# initiate self with data (pandas dataframe) with duplicate incidence pairs removed.
self._data = data
self._elements = data.groupby("edges").agg(list).to_dict()["nodes"]
self._memberships = data.groupby("nodes").agg(list).to_dict()["edges"]
@property
def data(self):
return self._data.copy(deep=True)
@property
def elements(self):
return self._elements
@property
def memberships(self):
return self._memberships
@property
def dimensions(self):
"""
The number of distinct edges and nodes in that order
Returns
-------
tuple of ints
Tuple of size two of (number of unique edges, number of unique nodes).
"""
return (len(self._elements), len(self._memberships))
@property
def edges(self):
"""
Returns an array of edge names from the incidence pairs
Returns
-------
array
Returns an array of edge names
"""
return list(self._data["edges"].unique())
@property
def nodes(self):
"""
Returns an array of node names from the incidence pairs
Returns
-------
array
Returns an array of node names
"""
return list(self._data["nodes"].unique())
def __iter__(self):
"""
Iterator over the incidence pairs of the hypergraph
Returns
-------
iter of tuples
Iterator over incidence pairs (tuples) in the hypergraph.
"""
# itertuples provides iterator over rows in a dataframe
# with index as false to not return index
# and name as None to return a standard tuple.
return self._data.itertuples(index=False, name=None)
def __len__(self):
"""
Total number of incidences
Returns
-------
int
Number of incidence pairs in the hypergraph.
"""
return len(self._data)
def __contains__(self, incidence_pair):
"""
Checks if an incidence pair exists in the incidence pairs dataframe.
First, this checks if the incidence pair is of length two.
Then, it checks if it exists in the incidence pairs.
Parameters
----------
incidence_pair : tuple
Incidence pair that is a tuple (or array-like object; e.g., list or array) of length two.
Returns
-------
bool
True if incidence pair exists in incidence store.
"""
# df = self._data
# #verify the incidence pair is of length two. Otherwise, pair does not exist.
# if len(incidence_pair) == 2:
# node, edge = incidence_pair[0], incidence_pair[1]
# # check if first element in pair (node) exists in 'nodes' column anywhere
# # and check if second element of pair (edge) exists in 'edges' column anywhere.
# does_contain = ((df['nodes'] == node) & (df['edges'] == edge)).any()
# return does_contain
# else:
# return False
# Numpy's __contains__ method does not work on non-scalars
# see https://github.com/numpy/numpy/issues/3016
# This implementation is workaround on numpy's __contains__ issue until it is resolved
store = [tuple(pair) for pair in self._data.values.tolist()]
return any(incidence_pair == pair for pair in store)
[docs]
def neighbors(self, level, key):
"""
Returns elements or memberships depending on level.
Parameters
----------
level : int
Level indicator for finding either elements or memberships.
For level 0 (elements), returns nodes in the edge.
For level 1 (memberships), returns edges containing the node.
key : int or str
Name of node or edge depending on level.
Returns
-------
list
Elements or memberships (depending on level) of a given edge or node, respectively.
"""
if level == 0:
return self._elements.get(key, [])
elif level == 1:
return self._memberships.get(key, [])
else:
return []
[docs]
def restrict_to(self, level, items, inplace=False):
### TODO if inplace == True the constructor's attributes need to be
### adjusted.
"""
returns IncidenceStore of subset of incidence store restricted
to pairs with items in the given level
Will return with same data or deepcopy depending on inplace
Parameters
----------
level : int
Level indicator for finding either elements or memberships.
For level 0 (elements), returns nodes in the edge.
For level 1 (memberships), returns edges containing the node.
items : list
List of uids to be removed from level
inplace : bool, optional
whether to replace self, by default False
Returns
-------
list
subset of incidence store given a restriction.
"""
if level == 0:
column = "edges"
elif level == 1:
column = "nodes"
else:
raise ValueError("Invalid level provided. Must be 0 or 1.")
if inplace:
self._data.drop(
self._data[~self._data[column].isin(items)].index, inplace=True
)
return self._data
else: # return a subset without editing the original dataframe.
df = self._data
return df[df[column].isin(items)]
[docs]
def equivalence_classes(self, level=0):
if level == 0:
old_dict = self._elements
elif level == 1:
old_dict = self._memberships
else:
return None
temp = defaultdict(list)
for k, v in old_dict.items():
temp[frozenset(v)] += [k]
return list(temp.values())
[docs]
def collapse_identical_elements(self, level, use_keys=None):
if level == 0:
col = "edges"
elif level == 1:
col = "nodes"
else:
return None
eclasses = self.equivalence_classes(level=level)
if use_keys == None:
edict = {ec[0]: ec for ec in eclasses}
else:
edict = dict()
for ec in eclasses:
klist = list(set(use_keys).intersection(ec))
if len(klist) > 0:
k = klist[0]
else:
k = ec[0]
ec.remove(k)
edict[k] = [k] + ec
df = self._data.loc[self._data[col].isin(edict.keys())]
return df, edict