# Copyright © 2018 Battelle Memorial Institute
# All rights reserved.
from __future__ import annotations
import warnings
from collections import defaultdict
from typing import TypeVar, Union
import networkx as nx
import numpy as np
import pandas as pd
from networkx.algorithms import bipartite
from scipy.sparse import coo_matrix, csr_matrix
from hypernetx.exception import HyperNetXError
from hypernetx.classes.factory import (
dataframe_factory_method,
dict_factory_method,
list_factory_method,
ndarray_factory_method,
)
from hypernetx.classes.incidence_store import IncidenceStore
from hypernetx.classes.property_store import PropertyStore
from hypernetx.classes.hyp_view import HypergraphView
warnings.filterwarnings("default", category=DeprecationWarning)
__all__ = ["Hypergraph"]
T = TypeVar("T", bound=Union[str, int])
[docs]
class Hypergraph:
"""
Parameters
----------
setsystem : pandas.DataFrame, dict of iterables, dict of dicts, list of iterables, numpy.ndarray, optional, default=None
See SetSystem below for additional setsystem requirements.
edge_col : str | int, optional, default=0
column index (or name) in pandas.DataFrame,
used for (hyper)edge ids. Only used when setsystem is a
pandas.DataFrame
node_col : str | int, optional, default=1
column index (or name) in pandas.DataFrame,
used for node ids. Only used when setsystem is a
pandas.DataFrame
cell_weight_col : str | int, optional, default="weight"
column index (or name) in pandas.DataFrame used for
referencing cell weights. For a dict of dicts, it will be
used as a key in the nested dictionary of properties.
These are the same as edge dependent node weights and
will populate the incidence matrix when `weights=True`.
default_cell_weight : int | float, optional, default=1
All incidence pairs in the Hypergraph are assigned a
default weight if weight is not specified in the setsystem.
misc_cell_properties_col : str | int, optional, default=None
Used for Pandas Dataframe with one column containing dictionaries of
properties. Useful if objects have diverse property sets.
Ignored for other setsystem types.
properties : pd.DataFrame | dict, optional, default=None
Concatenation/union of edge_properties and node_properties.
By default, the object id is used and should be the first column of the dataframe, or key in the dict.
If there are nodes and edges with the same ids but distinct properties then separate them and use the
edge_properties and node_properties keywords.
weight_prop_col : str, optional, default=None
Name of property in properties to use for weight
default_weight : int | float, optional, default=1
Used when weight property is missing or undefined
edge_properties : pd.DataFrame | dict, optional, default=None
Properties associated with edge ids.
If a dataframe, the first column must be the names of the edges.
First column of dataframe or keys of dict link to edge ids in
setsystem.
edge_weight_prop_col : str, optional, default=None
Name of property in edge_properties to use for weight.
default_edge_weight : int | float, optional, default=1
Used when edge weight property is missing or undefined.
node_properties : pd.DataFrame | dict, optional, default=None
Properties associated with node ids. If a dataframe, the first column must be the names of the nodes.
First column of dataframe or keys of dict link to nodes ids in setsystem.
node_weight_prop_col : str, optional, default=None
Name of property in node_properties to use for weight.
default_node_weight : int | float, optional, default=1
Used when node weight property is missing or undefined
misc_properties_col : str | int, optional, default=None
Used for properties, edge_properties, and node_properties
Pandas Dataframes with one column containing dictionaries of
properties. Useful if objects have diverse property sets.
Ignored for other setsystem types.
name : str, optional, default=None
Name assigned to hypergraph
======================
Hypergraphs in HNX 2.3
======================
An hnx.Hypergraph H = (V,E) references a pair of disjoint sets:
V = nodes (vertices) and E = (hyper)edges.
HNX allows for multi-edges by distinguishing edges by
their unique identifiers instead of their contents. For example, if
V = {1,2,3} and E = {e1,e2,e3},
where e1 = {1,2}, e2 = {1,2}, and e3 = {1,2,3},
the edges e1 and e2 contain the same set of nodes and yet
are distinct and are distinguishable within H = (V,E).
New as of version 2.3, HNX provides methods to easily store and
access additional metadata such as cell, edge, and node weights.
Metadata associated with all edges, nodes, and (edge,node) incidence
pairs stored in the hypergraph are viewable using: ::
>>> H.edges.to_dataframe
>>> H.nodes.to_dataframe
>>> H.incidences.to_dataframe
The fundamental object needed to create a hypergraph is a **setsystem**. The
setsystem defines the many-to-many relationships between edges and nodes in
the hypergraph. Properties for the incidence pairs are defined within
the setsystem. Properties for the edges and nodes are defined with separate
Pandas DataFrames or dictionaries.
A hypergraph is defined by its relationships. While the
nodes and edges are distinct objects with their own properties, it is only
when they are in a relationship (i.e. incidence pair) that nodes and edges are viewable
within the hypergraph structure. Consequently, hypergraph metrics and combinatorics do not
use "isolated" nodes or "empty" edges. For example, while `node_properties` could
contain any number of node identifiers, only nodes belonging to an edge
in the hypergraph are counted when computing the size and shape of the
hypergraph.
SetSystems
----------
There are five types of setsystems currently accepted by the library.
1. **iterable of iterables** : Barebones hypergraph, which uses Pandas default
indexing to generate hyperedge ids. Elements must be hashable: ::
>>> list_of_lists = [['book','candle','cat'],['book','coffee cup'],['coffee cup','radio']]
>>> H = Hypergraph(list_of_lists)
2. **dictionary of iterables** : The most basic way to express many-to-many
relationships providing edge ids. The elements of the iterables must be
hashable: ::
>>> scenes_dictionary = {
>>> 0: ('FN', 'TH'),
>>> 1: ('TH', 'JV'),
>>> 2: ('BM', 'FN', 'JA'),
>>> 3: ('JV', 'JU', 'CH', 'BM'),
>>> 4: ('JU', 'CH', 'BR', 'CN', 'CC', 'JV', 'BM'),
>>> 5: ('TH', 'GP'),
>>> 6: ('GP', 'MP'),
>>> 7: ('MA', 'GP'),
>>> 8: ('FN', 'TH')}
>>> H = hnx.Hypergraph(scenes_dictionary)
3. **dictionary of dictionaries** : allows cell properties to be assigned
to a specific (edge, node) incidence. This is particularly useful when
there are variable length dictionaries assigned to each pair: ::
>>> nested_dictionary = {
>>> 0: {'FN':{'time':'early', 'weight': 7}, 'TH':{'time':'late'}},
>>> 1: {'TH':{'subject':'war'}, 'JV':{'observed_by':'someone'}},
>>> 2: {'BM':{}, 'FN':{}, 'JA':{'role':'policeman'}},
>>> 3: {'JV':{'was_carrying':'stick'}, 'JU':{}, 'CH':{}, 'BM':{'state':'intoxicated', 'color':'pinkish'}},
>>> 4: {'JU':{'weight':15}, 'CH':{}, 'BR':{'state':'worried'}, 'CN':{}, 'CC':{}, 'JV':{}, 'BM':{}},
>>> 5: {'TH':{}, 'GP':{}},
>>> 6: {'GP':{}, 'MP':{}},
>>> 7: {'MA':{}, 'GP':{'accompanied_by':'dog', 'weight':15, 'was_singing': 'Frère Jacques'}}}
>>> H = hnx.Hypergraph(nested_dictionary)
4. **pandas.DataFrame** For large datasets and for datasets with cell
properties it is most efficient to construct a hypergraph directly from
a pandas.DataFrame. Incidence pairs are in the first two columns.
Cell properties shared by all incidence pairs can be placed in their own
column of the dataframe. Variable length dictionaries of cell properties
particular to only some of the incidence pairs may be placed in a single
column of the dataframe. Representing the data above as a dataframe df:
+-----------+-----------+-----------+-----------------------------------+
| col1 | col2 | w | col3 |
+-----------+-----------+-----------+-----------------------------------+
| e1 | 1 | 0.5 | {'name':'related_to'} |
+-----------+-----------+-----------+-----------------------------------+
| e1 | 2 | 0.1 | {"name":"related_to", |
| | | | "startdate":"05.13.2020"} |
+-----------+-----------+-----------+-----------------------------------+
| e2 | 1 | 0.52 | {"name":"owned_by"} |
+-----------+-----------+-----------+-----------------------------------+
The first row of the dataframe is used to reference each column. ::
>>> import pandas as pd
>>> d = {'col1': ['e1', 'e1', 'e2'],
>>> 'col2': [1, 2, 1],
>>> 'w': [0.5, 0.1, 0.52],
>>> 'col3':[{'name': 'related_to'}, {'name': 'related_to', 'startdate':'05.13.2020'}, {'name': 'owned_by'}]}
>>> df = pd.DataFrame(d)
>>> H = hnx.Hypergraph(df, edge_col="col1", node_col="col2",
>>> cell_weight_col="w", misc_cell_properties_col="col3")
5. **numpy.ndarray** For homogeneous datasets given in a *n x 2* ndarray a
pandas dataframe is generated. In this case, the constructor will
only accept properties for the edges and nodes using the
edge and node uids listed in the array, although incidence properties can
be added after construction::
>>> import numpy as np
>>> np_array = np.array([['A','a'],['A','b'],['A','c'],['B','a'],['B','d'],['C','c'],['C','d']])
>>> H = hnx.Hypergraph(np_array)
>>> H.incidences[('A','a')].color = 'red'
>>> H.dataframe
Edge and Node Properties
------------------------
Properties specific to a single edge or node are passed through the
keywords: **edge_properties, node_properties, or properties**.
Properties may be passed as dataframes or dictionaries.
The first column or index of the dataframe or the keys of the dictionary
correspond to the edge and/or node identifiers.
If identifiers are shared among edges and nodes, or are distinct
for edges and nodes, properties may be combined into a single
object and passed to the **properties** keyword. For example:
+-----------+-----------+---------------------------------------+
| uid | weight | properties |
+-----------+-----------+---------------------------------------+
| e1 | 5.0 | {'type':'event'} |
+-----------+-----------+---------------------------------------+
| e2 | 0.52 | {"name":"owned_by"} |
+-----------+-----------+---------------------------------------+
| ... | ... | {...} |
+-----------+-----------+---------------------------------------+
| 1 | 1.2 | {'color':'red'} |
+-----------+-----------+---------------------------------------+
| 2 | .003 | {'name':'Fido','color':'brown'} |
+-----------+-----------+---------------------------------------+
| 3 | 1.0 | {} |
+-----------+-----------+---------------------------------------+
A properties dictionary should have the format: ::
dp = {uid1 : {prop1:val1, prop2:val2, ...},
uid2 : {...},
...}
Weights
-------
The default key for cell and object weights is "weight". The default value
is 1. Weights may be assigned from a column in the dataframe by specifying the
column and/or a new default in the
constructor using **cell_weight_col** and **default_cell_weight** for incidence pairs,
and using **edge_weight_prop_col, default_edge_weight** for edges,
**node_weight_prop_col, default_node_weight** for nodes,
and **weight_prop_col, default_weight** for a shared property dataframe.
"""
def __init__(
    self,
    ### these are for the incidence pairs and their properties
    ### format for properties must follow from incidence pairs
    ### so that properties are provided either in the dataframe
    ### or as part of a nested dictionary.
    setsystem=None,
    default_cell_weight=1,  ### we will no longer support a sequence
    edge_col=0,
    node_col=1,
    cell_weight_col="weight",
    misc_cell_properties_col=None,
    aggregate_by="first",
    ### Format for properties can be either a dataframe indexed on uid
    ### or with first column equal to uid or a dictionary
    ### use these for a single properties list
    properties=None,
    ### How do we know which column to use for uid? Always the first column.
    misc_properties_col=None,
    weight_prop_col="weight",
    default_weight: float | int = 1,
    ### these are just for properties on the edges - ignored if properties exists
    edge_properties=None,
    misc_edge_properties_col=None,
    edge_weight_prop_col="weight",
    default_edge_weight=1,
    ### these are just for properties on the nodes - ignored if properties exists
    node_properties=None,
    misc_node_properties_col=None,
    node_weight_prop_col="weight",
    default_node_weight=1,
    name=None,
    **kwargs,  ## these are ignored but allow for some backwards compatibility
):
    """
    Build the incidence, edge, and node views of the hypergraph.

    The parameters are described in the class docstring. The incidence
    view is built from `setsystem`. The edge and node views share one
    property store when `properties` has a supported type; otherwise
    they get separate stores built from `edge_properties` and
    `node_properties`. An unsupported or missing edge/node property
    container yields an empty store carrying only the default weight.

    `misc_edge_properties_col` and `misc_node_properties_col` take
    precedence over `misc_properties_col` for their respective
    containers when they are not None.

    Raises
    ------
    HyperNetXError
        If the type of `setsystem` is not one of the supported
        container types.
    """
    # Dispatch on the *type name* of the container to the factory that
    # converts it into a dataframe.
    type_dict = {
        "DataFrame": dataframe_factory_method,
        "dict": dict_factory_method,
        "OrderedDict": dict_factory_method,
        "defaultdict": dict_factory_method,
        "list": list_factory_method,
        "ndarray": ndarray_factory_method,
    }
    if setsystem is None:
        setsystem = pd.DataFrame(
            columns=["edges", "nodes", "weight", "misc_properties"]
        )
    setsystem_type = type(setsystem).__name__
    if setsystem_type not in type_dict:
        raise HyperNetXError("setsystem data type not supported")

    df = type_dict[setsystem_type](
        setsystem,
        2,
        uid_cols=[edge_col, node_col],
        weight_col=cell_weight_col,
        default_weight=default_cell_weight,
        misc_properties_col=misc_cell_properties_col,
        aggregate_by=aggregate_by,
    )
    ## dataframe_factory_method(edf,uid_cols=[uid_col],weight_col,default_weight,misc_properties)
    ## multi index set by uid_cols = [edge_col,node_col]
    incidence_store = IncidenceStore(
        pd.DataFrame(list(df.index), columns=["edges", "nodes"])
    )
    incidence_propertystore = PropertyStore(
        data=df, default_weight=default_cell_weight
    )
    self._E = HypergraphView(incidence_store, 2, incidence_propertystore)
    ## if no properties PropertyStore should store in the most efficient way

    if properties is not None and type(properties).__name__ in type_dict:
        property_type = type(properties).__name__
        dfp = type_dict[property_type](
            properties,
            0,
            weight_col=weight_prop_col,
            default_weight=default_weight,
            misc_properties_col=misc_properties_col,
        )
        all_propertystore = PropertyStore(data=dfp, default_weight=default_weight)
        self._edges = HypergraphView(incidence_store, 0, all_propertystore)
        self._nodes = HypergraphView(incidence_store, 1, all_propertystore)
    else:
        # Also reached when `properties` has an unsupported type, so the
        # edge and node views are always defined.
        if edge_properties is not None:
            edge_property_type = type(edge_properties).__name__
            if edge_property_type in type_dict:
                edfp = type_dict[edge_property_type](
                    edge_properties,
                    0,
                    weight_col=edge_weight_prop_col,
                    default_weight=default_edge_weight,
                    misc_properties_col=(
                        misc_edge_properties_col
                        if misc_edge_properties_col is not None
                        else misc_properties_col
                    ),
                )
                edge_propertystore = PropertyStore(
                    edfp, default_weight=default_edge_weight
                )
            else:
                # Unsupported container type: fall back to an empty store.
                edge_propertystore = PropertyStore(
                    default_weight=default_edge_weight
                )
        else:
            edge_propertystore = PropertyStore(default_weight=default_edge_weight)
        self._edges = HypergraphView(incidence_store, 0, edge_propertystore)

        if node_properties is not None:
            node_property_type = type(node_properties).__name__
            if node_property_type in type_dict:
                ndfp = type_dict[node_property_type](
                    node_properties,
                    1,
                    weight_col=node_weight_prop_col,
                    default_weight=default_node_weight,
                    misc_properties_col=(
                        misc_node_properties_col
                        if misc_node_properties_col is not None
                        else misc_properties_col
                    ),
                )
                node_propertystore = PropertyStore(
                    ndfp, default_weight=default_node_weight
                )
            else:
                node_propertystore = PropertyStore(
                    default_weight=default_node_weight
                )
        else:
            node_propertystore = PropertyStore(default_weight=default_node_weight)
        self._nodes = HypergraphView(incidence_store, 1, node_propertystore)

    self._dataframe = self.dataframe
    self._set_default_state()
    self.name = name
    # NOTE: copies every local (arguments and intermediates, including
    # `self`) onto the instance; local names above are kept stable for
    # that reason.
    self.__dict__.update(locals())
@property
def edges(self):
"""
Object associated with edges.
Returns
-------
: HypergraphView
"""
return self._edges
@property
def nodes(self):
"""
Object associated with nodes.
Returns
-------
: HypergraphView
"""
return self._nodes
@property
def incidences(self):
"""
Object associated with incidence pairs
Returns
-------
: HypergraphView
"""
return self._E
@property
def dataframe(self):
"""Returns dataframe of incidence properties
as dataframe with edges and nodes in columns.
Returns
-------
pandas.DataFrame
"""
return self._E.properties.reset_index()
@property
def properties(self):
"""Returns incidence properties
Returns
-------
pandas.DataFrame
"""
return self._E.properties
[docs]
def incidence_matrix(self, index=False, weights=False):
"""
A sparse matrix indicating the existence of an incidence pair
in the hypergraph. Each row corresponds to a node v and each column
corresponds to an edge e. The entry corresponding to (row v, col e)
is nonzero if v is an element of e. If weights = True then the value
equals the weight given in the hypergraph incidence properties for
the incidence pair (e,v). Otherwise, the value is 1.
Parameters
----------
index : bool, optional, default = False
If index=True, returns a tuple containing the incidence matrix, an np.ndarray containing the row and column
index of node_uids, and an np.ndarray containing the row and column index of edge_uids.
Otherwise, returns the incidence matrix.
weights : bool, optional, default = False
If True, use the incidence weights corresponding to
the row and column of the entry.
Returns
-------
incidence matrix: scipy.sparse.csr_matrix
node indexes: np.ndarray
an np.ndarray containing the row and column index of node_uids
edge indexes: np.ndarray
an np.ndarray containing the row and column index of edge_uids
"""
e, n = self._state_dict["data"].T
if weights:
data = self._E.dataframe["weight"]
else:
data = np.ones(len(e)).astype(int)
mat = csr_matrix((data, (n, e)))
if index:
return (
mat,
self._state_dict["labels"]["nodes"],
self._state_dict["labels"]["edges"],
)
return mat
[docs]
def incidence_dataframe(self, weights=False):
mat, rindex, cindex = self.incidence_matrix(index=True, weights=weights)
return pd.DataFrame(mat.toarray(), columns=cindex, index=rindex)
@property
def incidence_dict(self):
"""
Dictionary keyed by edge uids with values as the uids of nodes of each edge
Returns
-------
dict
"""
return self._E.elements
@property
def shape(self):
"""
The number of nodes, number of edges
Returns
-------
number of nodes, number of edges : tuple
"""
return len(self._nodes), len(self._edges)
def __str__(self):
"""
String representation of hypergraph
Returns
-------
str
"""
return f"{self.name} <class 'hypernetx.classes.hypergraph.Hypergraph'>"
def __repr__(self):
"""
String representation of hypergraph
Returns
-------
str
"""
return f"{self.name} hypernetx.classes.hypergraph.Hypergraph"
def __len__(self):
"""
Number of nodes
Returns
-------
int
"""
return len(self._nodes)
def __iter__(self):
"""
Iterate over the nodes of the hypergraph
Returns
-------
dict_keyiterator
"""
return iter(self._nodes)
def __contains__(self, item):
"""
Returns boolean indicating if item is in self.nodes
Parameters
----------
item : hashable
"""
return item in self._nodes
def __getitem__(self, node):
"""
Returns the neighbors of node in the Hypergraph. These
will be the nodes sharing some edge with the node.
Parameters
----------
node : hashable
If hashable, then must be uid of node in hypergraph
Returns
-------
neighbors(node) : iterator
"""
return self.neighbors(node)
[docs]
def clone(self, name=None):
"""
Create a deep copy of the hypergraph
Parameters
----------
name : str, optional, default = None
Returns
-------
: Hypergraph
"""
return self._construct_hyp_from_stores(
self.incidences.to_dataframe, name=f"{name}_clone"
)
def __eq__(self, other):
if type(other) is type(self):
return self.incidences.items == other.incidences.items
return False
[docs]
def rename(self, edges=None, nodes=None, name=None, inplace=True):
"""
Rename the edges and/or nodes of the hypergraph.
Parameters
----------
edges : dict, optional, default = None
dictionary of replacement edge_uids
nodes : dict, optional, default = None
dictionary of replacement node_uids
name : str, optional, default=None
inplace : bool, optional, default=True
Returns
-------
Hypergraph
"""
if edges is None and nodes is None:
return self
else:
edf = self.edges.to_dataframe
ndf = self.nodes.to_dataframe
df = self.incidences.to_dataframe
if edges is not None:
edf = edf.rename(index=edges)
df = df.rename(index=edges, level=0)
if nodes is not None:
ndf = ndf.rename(index=nodes)
df = df.rename(index=nodes, level=1)
eps = PropertyStore(edf)
nps = PropertyStore(ndf)
return self._construct_hyp_from_stores(
df, edge_ps=eps, node_ps=nps, name=name, inplace=inplace
)
[docs]
def get_cell_properties(self, edge_uid, node_uid, prop_name=None):
"""Get cell properties on a specified edge and node
Parameters
----------
edge_uid : str | int
edge_uid
node_uid : str | int
node_uid
prop_name : str, optional, default=None
name of a cell property; if None, all cell properties will be returned
Returns
-------
Any
cell property value if `prop_name` is provided, otherwise ``dict`` of all
cell properties and values
"""
if prop_name is None:
return self.incidences.property_store.get_properties((edge_uid, node_uid))
return self.incidences.property_store.get_property(
(edge_uid, node_uid), prop_name
)
[docs]
def get_properties(self, uid, level=0, prop_name=None):
"""
Returns an object's specific property or all properties
Parameters
----------
uid : hashable
edge or node id
level : int | None , optional, default=0
Enter 0 for edges and 1 for nodes.
prop_name : str | None, optional, default=None
if None then all properties associated with the object will be returned.
Returns
-------
Any
single property or dictionary of properties
"""
if level == 0:
store = self._edges
elif level == 1:
store = self._nodes
elif level == 2:
store = self._E
if prop_name is None: ## rewrite for edges and nodes.
return store.property_store.get_properties(uid)
return store.property_store.get_property(uid, prop_name)
[docs]
def get_linegraph(self, s=1, edges=True):
"""
Creates an :term:`s-linegraph` for the Hypergraph.
If edges=True, then the edges will be the vertices of the line
graph. Two vertices are connected by an s-line-graph edge if the
corresponding hypergraph edges intersect in at least `s` hypergraph nodes.
If edges=False, the hypergraph nodes will be the vertices of the line
graph. Two vertices are connected if the nodes they correspond to share
at least `s` incident (hyper)edges.
Parameters
----------
s : int
The width of the connections.
edges : bool, optional, default = True
Determine if edges or nodes will be the vertices in the linegraph.
Returns
-------
nx.Graph
A NetworkX graph.
"""
d = self._state_dict
key = "sedgelg" if edges else "snodelg"
if s in d[key]:
return d[key][s]
if edges: ### Amaplist needs a dictionary returned for properties.
A, Amap = self.edge_adjacency_matrix(s=s, index=True)
Amaplst = [(k, self._edges[k].properties) for k in Amap]
else:
A, Amap = self.adjacency_matrix(s=s, index=True)
Amaplst = [(k, self._nodes[k].properties) for k in Amap]
### TODO: add key function to compute weights lambda x,y : funcval
A = np.array(np.nonzero(A))
e1 = np.array([Amap[idx] for idx in A[0]])
e2 = np.array([Amap[idx] for idx in A[1]])
A = np.array([e1, e2]).T
g = nx.Graph()
g.add_nodes_from(Amaplst)
g.add_edges_from(A)
d[key][s] = g
return g
[docs]
def set_state(self, **kwargs):
"""
Allow state_dict updates from outside of class. Use with caution.
Parameters
----------
**kwargs : dict, optional
key-value pairs to save in state dictionary
"""
self._state_dict.update(kwargs)
def _set_default_state(self, empty=False):
"""
Populate state_dict with default values
"""
self._state_dict = {}
df = self._E.incidence_store.data
self._state_dict["dataframe"] = df
if empty:
self._state_dict["labels"] = {"edges": np.array([]), "nodes": np.array([])}
self._state_dict["data"] = np.array([[], []])
else:
df.edges = df.edges.astype("category")
df.nodes = df.nodes.astype("category")
self._state_dict["labels"] = {
"edges": np.array(df["edges"].cat.categories),
"nodes": np.array(df["nodes"].cat.categories),
}
self._state_dict["data"] = np.array(
[df["edges"].cat.codes, df["nodes"].cat.codes], dtype=int
).T
self._state_dict["snodelg"] = dict() ### s: nx.graph
self._state_dict["sedgelg"] = dict()
self._state_dict["neighbors"] = defaultdict(dict) ### s: {node: neighbors}
self._state_dict["edge_neighbors"] = defaultdict(
dict
) ### s: {edge: edge_neighbors}
self._state_dict["adjacency_matrix"] = dict() ### s: scipy.sparse.csr_matrix
self._state_dict["edge_adjacency_matrix"] = dict()
[docs]
def edge_size_dist(self):
"""
Returns the size for each edge.
Returns
-------
list
a list of sizes of each edge.
"""
if "edge_size_dist" not in self._state_dict:
dist = np.array(np.sum(self.incidence_matrix(), axis=0))[0].tolist()
self.set_state(edge_size_dist=dist)
return self._state_dict["edge_size_dist"]
[docs]
def degree(self, node_uid, s=1, max_size=None):
"""
The number of edges of size at least s and at most
max_size that contain the node.
Parameters
----------
node_uid : hashable
Identifier for the node.
s : int, optional, default=1
The smallest size (must be positive) of an edge to consider in degree.
max_size : int, optional, default=None
The largest size (must be positive) of edge to consider in degree.
Returns
-------
int
The number of edges of size at least s and at most
max_size that contain the node.
"""
if s == 1 and max_size is None:
return len(self._nodes.memberships[node_uid])
### This could possibly be done more efficiently on the dataframe.
memberships = set()
for edge in self._nodes.memberships[node_uid]:
size = len(self.edges[edge])
if size >= s and (max_size is None or size <= max_size):
memberships.add(edge)
return len(memberships)
[docs]
def size(self, edge, nodeset=None):
"""
The number of nodes in nodeset that belong to edge.
If nodeset is None then returns the size of edge
Parameters
----------
edge : hashable
The uid of an edge in the hypergraph
Returns
-------
size : int
"""
if nodeset is not None:
return len(set(nodeset).intersection(set(self.edges[edge])))
return len(self.edges[edge])
[docs]
def order(self):
"""
The number of nodes in hypergraph.
Returns
-------
order : int
"""
return len(self.nodes)
[docs]
def dim(self, edge):
"""
Same as :meth:`size(edge) - 1`
Parameters
----------
edge : hashable
The uid of an edge in the hypergraph
Returns
-------
int
"""
return self.size(edge) - 1
[docs]
def neighbors(self, node, s=1):
"""
The nodes in hypergraph which share `s` edge(s) with node.
Parameters
----------
node : hashable
uid for a node in hypergraph
s : int, optional, default=1
Minimum number of edges shared by neighbors with node.
Returns
-------
neighbors : list
s-neighbors share at least s edges in the hypergraph
"""
if node not in self.nodes:
warnings.warn(f"{node} is not in hypergraph {self.name}.")
return []
if node in self._state_dict["neighbors"][s]:
return self._state_dict["neighbors"][s][node]
inc_matrix = self.incidence_matrix()
rdx = self._state_dict["labels"]["nodes"]
jdx = np.where(rdx == node)
idx = (inc_matrix[jdx].dot(inc_matrix.T) >= s) * 1
idx = np.nonzero(idx)[1]
neighbors = list(rdx[idx])
if len(neighbors) > 0:
neighbors.remove(node)
self._state_dict["neighbors"][s][node] = neighbors
else:
self._state_dict["neighbors"][s][node] = []
return self._state_dict["neighbors"][s][node]
[docs]
def edge_neighbors(self, edge, s=1):
"""
The edges in hypergraph which share `s` nodes(s) with edge.
Parameters
----------
edge : hashable
uid for an edge in hypergraph
s : int, optional, default=1
Minimum number of nodes shared by neighbors edge node.
Returns
-------
list
a list of edge neighbors
"""
if edge not in self.edges:
warnings.warn(f"Edge is not in hypergraph {self.name}.")
return []
if edge in self._state_dict["edge_neighbors"][s]:
return self._state_dict["edge_neighbors"][s][edge]
inc_matrix = self.incidence_matrix()
cdx = self._state_dict["labels"]["edges"]
jdx = np.where(cdx == edge)
idx = (inc_matrix.T[jdx].dot(inc_matrix) >= s) * 1
idx = np.nonzero(idx)[1]
edge_neighbors = list(cdx[idx])
if len(edge_neighbors) > 0:
edge_neighbors.remove(edge)
self._state_dict["edge_neighbors"][s][edge] = edge_neighbors
else:
self._state_dict["edge_neighbors"][s][edge] = []
return self._state_dict["edge_neighbors"][s][edge]
[docs]
def adjacency_matrix(self, s=1, index=False):
"""
Returns the :term:`s-adjacency matrix` for the hypergraph.
Parameters
----------
s : int, optional, default=1
index: boolean, optional, default=False
If True, returns both the adjacency matrix and an array containing the row and column index of node_uids
Returns
-------
adjacency matrix: scipy.sparse.csr_matrix
node indexes: np.ndarray
an np.ndarray containing the row and column index of node_uids.
"""
# if the adjacency_matrix for size s is not in the state_dict, create the adjacency matrix
# and add it to the state_dict
if (
"adjacency_matrix" not in self._state_dict
or s not in self._state_dict["adjacency_matrix"]
):
incidence_matrix = self.incidence_matrix()
# calculates the square of the incidence matrix by multiplying it with its transpose.
s_adj_matrix = incidence_matrix @ incidence_matrix.T
# sets the diagonal elements of s_adj_matrix to zero to remove self-loops.
s_adj_matrix.setdiag(0)
# sets all values in s_adj_matrix that are greater than or equal to a
# threshold 's' to 1, and all other values to 0.
s_adj_matrix = (s_adj_matrix >= s) * 1
self._state_dict["adjacency_matrix"][s] = s_adj_matrix
if index:
return (
self._state_dict["adjacency_matrix"][s],
self._state_dict["labels"]["nodes"],
)
return self._state_dict["adjacency_matrix"][s]
[docs]
def edge_adjacency_matrix(self, s=1, index=False):
"""
Returns the :term:`s-adjacency matrix` for the dual hypergraph.
Parameters
----------
s : int, optional, default=1
index: boolean, optional, default=False
If True, returns both the adjacency matrix and an array containing the row and column index of edge_uids
Returns
-------
edge adjacency matrix : scipy.sparse.csr_matrix
edge indexes : np.ndarray
an np.ndarray containing the row and column index of edge_uids.
Notes
-----
This is also the adjacency matrix for the line graph.
Two edges are s-adjacent if they share at least `s` nodes.
"""
if (
"edge_adjacency_matrix" not in self._state_dict
or s not in self._state_dict["edge_adjacency_matrix"]
):
incidence_matrix = self.incidence_matrix()
s_adj_matrix = incidence_matrix.T @ incidence_matrix
s_adj_matrix.setdiag(0)
s_adj_matrix = (s_adj_matrix >= s) * 1
self._state_dict["edge_adjacency_matrix"][s] = s_adj_matrix
if index:
return (
self._state_dict["edge_adjacency_matrix"][s],
self._state_dict["labels"]["edges"],
)
return self._state_dict["edge_adjacency_matrix"][s]
[docs]
def auxiliary_matrix(self, s=1, node=True, index=False):
"""
The unweighted :term:`s-auxiliary matrix` for hypergraph
Parameters
----------
s : int, optional, default=1
node : bool, optional, default=True
whether to return based on node or edge adjacencies
index : bool, optional, default=False
If True, returns both the auxiliary matrix and an array containing
the row and column index of node or edge_uids
Returns
-------
auxiliary matrix : scipy.sparse.csr_matrix
Node/Edge adjacency matrix with empty rows and columns removed
index : np.ndarray
row and column index of node or edge uids
"""
if node:
adj_matrix, indexes = self.adjacency_matrix(s, index=True)
else:
adj_matrix, indexes = self.edge_adjacency_matrix(s, index=True)
# sum up the values in each row of the matrix, resulting in a 1D array
# where each element corresponds to the sum of a row.
adj_matrix_sum = np.sum(adj_matrix, axis=1)
# returns a tuple of arrays with the first tuple being an array of the indices of rows whose sum is non-zero.
indices = np.nonzero(np.sum(adj_matrix_sum, axis=1))
non_zero_indices = indices[0]
if len(non_zero_indices) < adj_matrix.shape[0]:
adj_matrix_res = adj_matrix[non_zero_indices][:, non_zero_indices]
else:
adj_matrix_res = adj_matrix
if index:
return adj_matrix_res, indexes[non_zero_indices]
return adj_matrix_res
[docs]
def bipartite(self, keep_data=False, directed=False):
    """
    Build the bipartite NetworkX graph of this hypergraph.

    Both the nodes and the (hyper)edges of the hypergraph become graph
    nodes. Every incidence (edge e contains node v) becomes the graph
    edge (e, v).

    Parameters
    ----------
    keep_data : bool, optional, default = False
        If True the node, edge and incidence properties from the
        hypergraph are attached to the NetworkX graph.
    directed : bool, optional, default = False
        If True the graph edges are directed from the hypergraph edge
        (source) to the hypergraph node (target).

    Returns
    -------
    networkx.Graph or DiGraph
        Hyperedges carry the node attribute ``bipartite=0`` and
        hypergraph nodes ``bipartite=1``.
    """
    graph = nx.DiGraph() if directed is True else nx.Graph()

    if keep_data is False:
        graph.add_nodes_from(self.edges, bipartite=0)
        graph.add_nodes_from(self.nodes, bipartite=1)
        pairs = []
        for node in self._nodes.memberships:
            for edge in self._nodes.memberships[node]:
                pairs.append((edge, node))
        graph.add_edges_from(pairs)
        return graph

    # keep_data: copy the stored properties onto the graph objects
    for node in self.nodes:
        graph.add_node(node, bipartite=1, **self.nodes[node].properties)
    for edge in self.edges:
        graph.add_node(edge, bipartite=0, **self.edges[edge].properties)
    weighted_pairs = [(*pair, self._E[pair].properties) for pair in self._E]
    graph.add_edges_from(weighted_pairs)
    return graph
def _construct_hyp_from_stores(
    self, incidence_df, edge_ps=None, node_ps=None, name=None, inplace=False
):
    """
    Assemble a Hypergraph from an incidence dataframe and property stores.

    Parameters
    ----------
    incidence_df : pd.DataFrame
        Incidence properties; its index entries are used as the
        (edge, node) pairs of the new incidence store.
    edge_ps : PropertyStore, default=None
        Edge properties. When None, a store is built from this
        hypergraph's edge dataframe and default edge weight.
    node_ps: PropertyStore, default=None
        Node properties. When None, a store is built from this
        hypergraph's node dataframe and default node weight.
    name : str, optional, default=None
        Name given to the result; only applied when ``inplace`` is False.
    inplace : bool, optional, default=False
        If True, changes the existing Hypergraph. Otherwise, creates a new
        Hypergraph with the requested changes.

    Returns
    -------
    Hypergraph
    """
    target = self if inplace else Hypergraph()

    pairs = pd.DataFrame(incidence_df.index.tolist(), columns=["edges", "nodes"])
    store = IncidenceStore(pairs)
    cell_ps = PropertyStore(
        incidence_df, default_weight=self.incidences.default_weight
    )
    target._E = HypergraphView(store, 2, cell_ps)

    # Fall back to the current edge / node data when no store is supplied.
    if edge_ps is None:
        edge_ps = PropertyStore(
            self.edges.to_dataframe, default_weight=self.edges.default_weight
        )
    target._edges = HypergraphView(store, 0, edge_ps)

    if node_ps is None:
        node_ps = PropertyStore(
            self.nodes.to_dataframe, default_weight=self.nodes.default_weight
        )
    target._nodes = HypergraphView(store, 1, node_ps)

    # Reset cached state and refresh the stored dataframe for the new views.
    target._set_default_state()
    target._dataframe = target.dataframe

    if inplace:
        return target
    target.name = name
    return target
[docs]
def dual(self, name=None, share_properties=True):
    """
    Build the dual hypergraph, in which edges and nodes swap roles.

    Parameters
    ----------
    name : hashable, optional, default=None
        Name of the dual. When falsy, ``str(self.name) + "_dual"`` is used.
    share_properties : bool, optional, default=True
        Whether to tie the edge and node properties of
        objects in the dual to objects in the hypergraph.
        If True, the dual reuses this hypergraph's property stores, so a
        change to edge and node properties in one will be reflected in
        the other. If False, new stores are built from the current
        node and edge dataframes.

    Returns
    -------
    Hypergraph
    """
    cols = self.dataframe.columns.tolist()
    # Swap the first two columns, then swap their labels so the former
    # node column becomes the "edges" level of the index.
    swapped = self.dataframe[[cols[1], cols[0]] + cols[2:]]
    swapped = swapped.rename(columns={"edges": "nodes", "nodes": "edges"})
    dsetsystem = swapped.set_index(["edges", "nodes"])

    if not share_properties:
        edge_ps = PropertyStore(self.nodes.to_dataframe)
        node_ps = PropertyStore(self.edges.to_dataframe)
    else:
        edge_ps = self._nodes.property_store
        node_ps = self._edges.property_store

    dual_name = name or str(self.name) + "_dual"
    return self._construct_hyp_from_stores(
        dsetsystem, edge_ps=edge_ps, node_ps=node_ps, name=dual_name
    )
[docs]
def equivalence_classes(self, edges=True):
    """
    Return the equivalence classes produced by collapsing edges or nodes.

    Parameters
    ----------
    edges : bool, optional, default=True
        If True collapses edges (level 0 of the incidence store),
        otherwise collapses nodes (level 1).

    Returns
    -------
    list
        A list of sets of edges or nodes

    See Also
    --------
    collapse_edges
    collapse_nodes
    collapse_nodes_and_edges
    """
    store = self._E.incidence_store
    if edges:
        return store.equivalence_classes(level=0)
    return store.equivalence_classes(level=1)
[docs]
def collapse_edges(
    self,
    name=None,
    use_uids=None,
    use_counts=False,
    return_counts=True,
    return_equivalence_classes=False,
    aggregate_edges_by=None,
    aggregate_cells_by=None,
):
    """
    Returns a new hypergraph by collapsing edges.

    Parameters
    ----------
    name: str, optional, default = None
    use_uids: list, optional, default = None
        Specify the edge identifiers to use as representatives
        for a single equivalence class. If two identifiers occur in the
        same equivalence class, the first one found will be used.
    use_counts: boolean, optional, default = False
        Rename the equivalence class representatives as `<uid>:<size of class>`
    return_counts: bool, optional, default = True
        Add the size of the equivalence class to the properties
        associated to the representative in the collapsed hypergraph using
        keyword: `equivalence_class_size`
    return_equivalence_classes: boolean, optional, default = False
        Returns a dictionary of edge equivalence classes keyed by a
        representative from each class
    aggregate_edges_by, aggregate_cells_by : dict, optional, default = {'weight':'sum'}
        dictionary of aggregation methods keyed by column names
        in the properties dataframes, does not apply to misc_properties.
        Defaults to 'first' on unlisted columns.
        See pandas.core.groupby.DataFrameGroupBy.agg for usage examples with
        dictionaries

    Returns
    -------
    Hypergraph
        When ``return_equivalence_classes`` is True, a tuple of the
        Hypergraph and the dictionary of equivalence classes.

    Notes
    -----
    Collapses the edges of Hypergraph. Two edges are
    duplicates if their respective elements are the same.
    Using this as an equivalence relation, the uids of the edges
    are partitioned into equivalence classes. A single member of the
    equivalence class is chosen to represent the class.

    Example
    -------
    >>> data = {'E1': ('a', 'b'), 'E2': ('a', 'b')}
    >>> h = Hypergraph(data)
    >>> h.incidence_dict
    {'E1': ['a', 'b'], 'E2': ['a', 'b']}
    >>> h.collapse_edges().incidence_dict
    {'E1': ['a', 'b']}
    >>> h.collapse_edges(use_counts=True).incidence_dict
    {'E1:2': ['a', 'b']}
    >>> h.collapse_edges().properties.to_dict(orient='records')
    [{'weight': 2.0, 'misc_properties': {}}, {'weight': 2.0, 'misc_properties': {}}]
    """
    _, eclasses = self._E.incidence_store.collapse_identical_elements(
        0, use_keys=use_uids
    )
    aggregate_edges_by = aggregate_edges_by or {"weight": "sum"}
    aggregate_cells_by = aggregate_cells_by or {"weight": "sum"}
    ndf = self.edges.to_dataframe
    df = self.incidences.to_dataframe

    if use_uids is not None:
        # Reorder both dataframes to follow the equivalence classes so the
        # first occurrence of each class is one of the requested uids.
        newindex = []
        for v in eclasses.values():
            newindex += v
        ndf = ndf.loc[newindex]
        df = df.loc[newindex, :]

    # Map every class member onto the label of its representative.
    if use_counts:
        mapper = {vv: f"{k}:{len(v)}" for k, v in eclasses.items() for vv in v}
    else:
        mapper = {vv: k for k, v in eclasses.items() for vv in v}

    ndf = ndf.rename(index=mapper)
    ndf = _agg_rows(ndf, ndf.index, aggregate_edges_by)
    edge_ps = PropertyStore(ndf)

    df = df.rename(index=mapper, level=0)
    df = _agg_rows(df, ["edges", "nodes"], aggregate_cells_by)
    node_ps = PropertyStore(self.nodes.to_dataframe)

    H = self._construct_hyp_from_stores(
        df, edge_ps=edge_ps, node_ps=node_ps, name=name
    )

    if return_counts:
        if use_counts:
            for nd in H.edges:
                # Labels are "<uid>:<size>"; split from the right so a uid
                # that itself contains ":" still yields the size field.
                H.edges[nd].equivalence_class_size = int(nd.rsplit(":", 1)[1])
        else:
            for nd in H.edges:
                H.edges[nd].equivalence_class_size = len(eclasses[nd])

    if return_equivalence_classes:
        return H, {mapper[k]: eclasses[k] for k in eclasses}
    else:
        return H
[docs]
def collapse_nodes(
    self,
    name=None,
    use_uids=None,
    use_counts=False,
    return_counts=True,
    return_equivalence_classes=False,
    aggregate_nodes_by=None,
    aggregate_cells_by=None,
):
    """
    Returns a new hypergraph by collapsing nodes.

    Parameters
    ----------
    name: str, optional, default = None
    use_uids: list, optional, default = None
        Specify the node identifiers to use as representatives
        for a single equivalence class. If two identifiers occur in the
        same equivalence class, the first one found will be used.
    use_counts: boolean, optional, default = False
        Rename the equivalence class representatives as `<uid>:<size of class>`
    return_counts: bool, optional, default = True
        Add the size of the equivalence class to the properties
        associated to the representative in the collapsed hypergraph using
        keyword: `equivalence_class_size`
    return_equivalence_classes: boolean, optional, default = False
        Returns a dictionary of node equivalence classes keyed by a
        representative from each class
    aggregate_nodes_by, aggregate_cells_by : dict, optional, default = {'weight':'sum'}
        dictionary of aggregation methods keyed by column names
        in the properties dataframes, does not apply to misc_properties.
        Defaults to 'first' on unlisted columns.
        See pandas.core.groupby.DataFrameGroupBy.agg for usage examples with
        dictionaries

    Returns
    -------
    Hypergraph
        When ``return_equivalence_classes`` is True, a tuple of the
        Hypergraph and the dictionary of equivalence classes.

    Notes
    -----
    Collapses the nodes of Hypergraph. Two nodes are
    duplicates if their respective memberships are the same.
    Using this as an equivalence relation, the uids of the nodes
    are partitioned into equivalence classes. A single member of the
    equivalence class is chosen to represent the class.

    Example
    -------
    >>> data = {'E1': ('a', 'b'), 'E2': ('a', 'b')}
    >>> h = Hypergraph(data)
    >>> h.incidence_dict
    {'E1': ['a', 'b'], 'E2': ['a', 'b']}
    >>> h.collapse_nodes().incidence_dict
    {'E1': ['a'], 'E2': ['a']}
    >>> h.collapse_nodes(use_counts=True).incidence_dict
    {'E1': ['a:2'], 'E2': ['a:2']}
    >>> h.collapse_nodes().properties.to_dict(orient='records')
    [{'weight': 2.0, 'misc_properties': {}}, {'weight': 2.0, 'misc_properties': {}}]
    """
    _, eclasses = self._E.incidence_store.collapse_identical_elements(
        1, use_keys=use_uids
    )
    aggregate_nodes_by = aggregate_nodes_by or {"weight": "sum"}
    aggregate_cells_by = aggregate_cells_by or {"weight": "sum"}
    ndf = self.nodes.to_dataframe
    df = self.incidences.to_dataframe

    if use_uids is not None:
        # Reorder both dataframes to follow the equivalence classes so the
        # first occurrence of each class is one of the requested uids.
        newindex = []
        for v in eclasses.values():
            newindex += v
        ndf = ndf.loc[newindex]
        # Nodes are the second index level: swap, select, swap back.
        df = df.swaplevel().loc[newindex, :].swaplevel()

    # Map every class member onto the label of its representative.
    if use_counts:
        mapper = {vv: f"{k}:{len(v)}" for k, v in eclasses.items() for vv in v}
    else:
        mapper = {vv: k for k, v in eclasses.items() for vv in v}

    ndf = ndf.rename(index=mapper)
    ndf = _agg_rows(ndf, ndf.index, aggregate_nodes_by)
    node_ps = PropertyStore(ndf)

    df = df.rename(index=mapper, level=1)
    df = _agg_rows(df, ["edges", "nodes"], aggregate_cells_by)
    edge_ps = PropertyStore(self.edges.to_dataframe)

    H = self._construct_hyp_from_stores(
        df, edge_ps=edge_ps, node_ps=node_ps, name=name
    )

    if return_counts:
        if use_counts:
            for nd in H.nodes:
                # Labels are "<uid>:<size>"; split from the right so a uid
                # that itself contains ":" still yields the size field.
                H.nodes[nd].equivalence_class_size = int(nd.rsplit(":", 1)[1])
        else:
            for nd in H.nodes:
                H.nodes[nd].equivalence_class_size = len(eclasses[nd])

    if return_equivalence_classes:
        return H, {mapper[k]: eclasses[k] for k in eclasses}
    else:
        return H
[docs]
def collapse_nodes_and_edges(
    self,
    name=None,
    use_edge_uids=None,
    use_node_uids=None,
    use_counts=False,
    return_counts=True,
    return_equivalence_classes=False,
    aggregate_nodes_by=None,
    aggregate_edges_by=None,
    aggregate_cells_by=None,
):
    """
    Returns a new hypergraph by collapsing nodes and then edges.

    Parameters
    ----------
    name: str, optional, default = None
        Passed to the edge-collapsing step, which produces the result.
    return_equivalence_classes: boolean, optional, default = False
        If True, also return the node and the edge equivalence classes,
        each a dictionary keyed by a representative from each class
    use_edge_uids, use_node_uids: list, optional, default = None
        Specify the edge and node identifiers to use as representatives
        for a single equivalence class. If two identifiers occur in the
        same equivalence class, the first one found will be used.
    use_counts: boolean, optional, default = False
        Rename the equivalence class representatives as `<uid>:<size of class>`
    return_counts: bool, optional, default = True
        Forwarded to :meth:`collapse_nodes` and :meth:`collapse_edges`,
        which record the size of each equivalence class on its
        representative.
    aggregate_nodes_by, aggregate_edges_by, aggregate_cells_by: optional
        default = {'weight': 'sum'}, all
        Method to combine duplicate rows of data for the same uids

    Returns
    -------
    new hypergraph : Hypergraph
    node equivalence classes : dict
        only when ``return_equivalence_classes`` is True
    edge equivalence classes : dict
        only when ``return_equivalence_classes`` is True

    Notes
    -----
    Collapses the Nodes and Edges of Hypergraph. Two nodes(edges) are
    duplicates if their respective memberships(elements) are the same.
    Using this as an equivalence relation, the uids of the nodes(edges)
    are partitioned into equivalence classes. A single member of the
    equivalence class is chosen to represent the class.

    Example
    -------
    >>> data = {'E1': ('a', 'b'), 'E2': ('a', 'b')}
    >>> h = Hypergraph(data)
    >>> h.incidence_dict
    {'E1': ['a', 'b'], 'E2': ['a', 'b']}
    >>> h.collapse_nodes_and_edges().incidence_dict
    {'E1': ['a']}
    >>> h.collapse_nodes_and_edges(use_counts=True).incidence_dict
    {'E1:2': ['a:2']}
    """
    aggregate_nodes_by = aggregate_nodes_by or {"weight": "sum"}
    aggregate_cells_by = aggregate_cells_by or {"weight": "sum"}
    aggregate_edges_by = aggregate_edges_by or {"weight": "sum"}

    # Keyword sets shared by both return modes.
    node_options = {
        "use_uids": use_node_uids,
        "use_counts": use_counts,
        "return_counts": return_counts,
        "aggregate_nodes_by": aggregate_nodes_by,
        "aggregate_cells_by": aggregate_cells_by,
    }
    edge_options = {
        "name": name,
        "use_uids": use_edge_uids,
        "use_counts": use_counts,
        "return_counts": return_counts,
        "aggregate_edges_by": aggregate_edges_by,
        "aggregate_cells_by": aggregate_cells_by,
    }

    if not return_equivalence_classes:
        node_collapsed = self.collapse_nodes(**node_options)
        return node_collapsed.collapse_edges(**edge_options)

    node_collapsed, node_classes = self.collapse_nodes(
        return_equivalence_classes=True, **node_options
    )
    collapsed, edge_classes = node_collapsed.collapse_edges(
        return_equivalence_classes=True, **edge_options
    )
    return collapsed, node_classes, edge_classes
[docs]
def restrict_to_nodes(self, nodes, name=None):
    """
    New hypergraph obtained by restricting to the given nodes.

    Every node label currently recorded in the state dictionary that is
    not listed in ``nodes`` is removed; this hypergraph is left unchanged.

    Parameters
    ----------
    nodes : Iterable
        node identifiers to restrict to
    name : str | int, optional, default=None
        name passed on for the new hypergraph

    Returns
    -------
    Hypergraph
    """
    known_nodes = set(self._state_dict["labels"]["nodes"])
    to_drop = list(known_nodes.difference(nodes))
    return self._remove(to_drop, level=1, name=name, inplace=False)
[docs]
def restrict_to_edges(self, edges, name=None):
    """
    New hypergraph obtained by restricting to the given edges.

    Every edge label currently recorded in the state dictionary that is
    not listed in ``edges`` is removed; this hypergraph is left unchanged.

    Parameters
    ----------
    edges : Iterable
        edge identifiers to restrict to
    name : str | int, optional, default=None
        name passed on for the new hypergraph

    Returns
    -------
    Hypergraph
    """
    known_edges = set(self._state_dict["labels"]["edges"])
    to_drop = list(known_edges.difference(edges))
    return self._remove(to_drop, level=0, name=name, inplace=False)
[docs]
def add_edge(self, edge_uid, inplace=True, **attr):
    """
    Add one edge, with optional attributes, to the edge properties.

    No incidence is added to the hypergraph.

    Parameters
    ----------
    edge_uid : int | str
        identifier of the edge to add
    inplace : bool, default=True
        If True, changes the existing Hypergraph. Otherwise, creates a new
        Hypergraph with the requested changes.
    **attr : dict, optional
        Properties to add to the edge as key=value pairs.

    Returns
    -------
    Hypergraph
    """
    # Level 0 addresses the edge properties.
    new_items = [(edge_uid, attr)]
    return self._add_items_from(new_items, 0, inplace=inplace)
[docs]
def add_edges_from(self, edge_uids, inplace=True):
    """
    Add several edges, with optional attributes, to the edge properties.

    No incidences are added to the hypergraph.

    Parameters
    ----------
    edge_uids : list[int | str] | list[tuple[int | str, dict]], list[int | str | tuple[int | str, dict]]
        edge_uids must be a list of uids and/or tuples of the form
        (uid, data) where data is a dictionary
    inplace : bool, default=True
        If True, changes the existing Hypergraph. Otherwise, creates a new
        Hypergraph with the requested changes.

    Returns
    -------
    Hypergraph
    """
    # Normalise to (uid, data) tuples; level 0 addresses the edges.
    normalized = self._process_uids(edge_uids)
    return self._add_items_from(normalized, 0, inplace=inplace)
[docs]
def add_node(self, node_uid, inplace=True, **attr):
    """
    Add one node, with optional attributes, to the node properties.

    No incidence is added to the hypergraph.

    Parameters
    ----------
    node_uid : int | str
        identifier of the node to add
    inplace : bool, default=True
        If True, changes the existing Hypergraph. Otherwise, creates a new
        Hypergraph with the requested changes.
    **attr : dict, optional
        Properties to add to the node as key=value pairs.

    Returns
    -------
    Hypergraph
    """
    # Level 1 addresses the node properties.
    new_items = [(node_uid, attr)]
    return self._add_items_from(new_items, 1, inplace=inplace)
[docs]
def add_nodes_from(self, node_uids, inplace=True):
    """
    Add several nodes, with optional attributes, to the node properties.

    No incidences are added to the hypergraph.

    Parameters
    ----------
    node_uids : list[int | str] | list[tuple[int | str, dict]], list[int | str | tuple[int | str, dict]]
        node_uids must be a list of uids and/or tuples of the form
        (uid, data) where data is a dictionary
    inplace : bool, default=True
        If True, changes the existing Hypergraph. Otherwise, creates a new
        Hypergraph with the requested changes.

    Returns
    -------
    Hypergraph
    """
    # Normalise to (uid, data) tuples; level 1 addresses the nodes.
    normalized = self._process_uids(node_uids)
    return self._add_items_from(normalized, 1, inplace=inplace)
def _process_uids(self, uids):
    """
    Normalise uids to a list of ``(uid, data)`` tuples.

    Items that are already tuples are kept unchanged; every other item is
    paired with a new empty dictionary.
    """
    return [item if isinstance(item, tuple) else (item, {}) for item in uids]
[docs]
def add_nodes_to_edges(self, edge_dict, inplace=True):
    """
    Adds a collection of incidences, given per edge, to the Hypergraph.

    Parameters
    ----------
    edge_dict: dict[str, list[str | int] | dict[str, dict]]
        Dictionary keyed by edge. Each value is either a list of nodes
        (incidences get empty properties) or a dictionary mapping nodes
        to the properties of that incidence.
    inplace : bool, default=True
        If True, changes the existing Hypergraph. Otherwise, creates a new
        Hypergraph with the requested changes.

    Returns
    -------
    Hypergraph
        Hypergraph with the updated edges and their newly added nodes
    """
    incidences = []
    for edge, members in edge_dict.items():
        if isinstance(members, dict):
            incidences.extend(
                ((edge, node), data) for node, data in members.items()
            )
        else:
            incidences.extend(((edge, node), {}) for node in members)
    # Level 2 addresses the incidence properties.
    return self._add_items_from(incidences, 2, inplace=inplace)
[docs]
def add_incidence(self, edge_uid, node_uid, inplace=True, **attr):
    """
    Add one incidence, with optional attributes, to the Hypergraph.

    Parameters
    ----------
    edge_uid : int | str
        identifier of the edge of the incidence
    node_uid : int | str
        identifier of the node of the incidence
    inplace : bool, optional, default=True
        If True, changes the existing Hypergraph. Otherwise, creates a new
        Hypergraph with the requested changes.
    **attr : dict, optional
        Properties to add to the incidence as key=value pairs.

    Returns
    -------
    Hypergraph
        Hypergraph with incidences added.
    """
    # Level 2 addresses the incidence properties.
    pair = (edge_uid, node_uid)
    return self._add_items_from([(pair, attr)], 2, inplace=inplace)
[docs]
def add_incidences_from(self, incidences, inplace=True):
    """
    Adds a collection of incidences to the Hypergraph.

    Parameters
    ----------
    incidences: list[str | int, str | int], list[tuple[str | int, str | int, dict[str, Any]]
        Incidence pairs must be a list of uids of the form (edge_uid, node_uid)
        and/or tuples of the form (edge_uid, node_uid, data) where data is a
        dictionary. Entries of length 2 get empty properties.
    inplace : bool, optional, default=True
        If True, changes the existing Hypergraph. Otherwise, creates a new
        Hypergraph with the requested changes.

    Returns
    -------
    Hypergraph
        Hypergraph with incidences added.
    """
    normalized = [
        (entry, {}) if len(entry) == 2 else ((entry[0], entry[1]), entry[2])
        for entry in incidences
    ]
    # Level 2 addresses the incidence properties.
    return self._add_items_from(normalized, 2, inplace=inplace)
def _add_items_from(self, items, level, inplace=True):
    """
    Helper method to add items to Hypergraph

    Parameters
    ----------
    items : list[tuple[str | int, dict[str, Any]]]
        Items must be a list of tuples of the form (uid, data) where data
        is a dictionary. For incidences the uid is an (edge, node) pair.
    level : int
        the level to add the items to; 0=edges, 1=nodes, 2=incidences
    inplace : bool, optional, default=True
        If True, changes the existing Hypergraph. Otherwise, creates a new
        Hypergraph with the requested changes and leaves the property
        stores of this Hypergraph untouched.

    Returns
    -------
    Hypergraph
    """
    df = self.incidences._property_store
    ep = self.edges._property_store
    ndp = self.nodes._property_store
    if not inplace:
        # Work on deep copies (as _remove does) so that writing the new
        # properties does not leak into this hypergraph's own stores.
        df = df.copy(deep=True)
        ep = ep.copy(deep=True)
        ndp = ndp.copy(deep=True)

    hv = [ep, ndp, df][level]
    for uid, data in items:
        hv.set_properties(uid, data)

    return self._construct_hyp_from_stores(
        df.properties, edge_ps=ep, node_ps=ndp, name=self.name, inplace=inplace
    )
#### This should follow behavior of restrictions
[docs]
def remove_edges(self, edge_uids, name=None, inplace=True):
    """
    Removes the given edges from the Hypergraph.

    If inplace=True, changes the existing Hypergraph. Otherwise,
    creates a new Hypergraph with the requested changes.

    Parameters
    ----------
    edge_uids : str | int | list[str | int]
        a single edge uid or a list of them; anything that is not a
        ``list`` is treated as one uid
    name : str, optional, default=None
        The name of the new Hypergraph. Used only when inplace=False;
        ignored if inplace=True.
    inplace : bool, optional, default=True
        Whether to replace the current hypergraph with a new one.

    Returns
    -------
    Hypergraph
    """
    targets = edge_uids if isinstance(edge_uids, list) else [edge_uids]
    return self._remove(targets, level=0, name=name, inplace=inplace)
[docs]
def remove_nodes(self, node_uids, name=None, inplace=True):
    """
    Removes the given nodes from the Hypergraph.

    If inplace=True, changes the existing Hypergraph. Otherwise,
    creates a new Hypergraph with the requested changes.

    Parameters
    ----------
    node_uids : str | int | list[str | int]
        a single node uid or a list of them; anything that is not a
        ``list`` is treated as one uid
    name : str, optional, default=None
        The name of the new Hypergraph. Used only when inplace=False;
        ignored if inplace=True.
    inplace : bool, optional, default=True
        Whether to replace the current hypergraph with a new one.

    Returns
    -------
    Hypergraph
    """
    targets = node_uids if isinstance(node_uids, list) else [node_uids]
    return self._remove(targets, level=1, name=name, inplace=inplace)
[docs]
def remove_incidences(self, incidence_uids, name=None, inplace=True):
    """
    Removes the given incidences from the Hypergraph.

    If inplace=True, changes the existing Hypergraph. Otherwise,
    creates a new Hypergraph with the requested changes.

    Parameters
    ----------
    incidence_uids : tuple[str | int] | list[tuple[str | int]]
        a single (edge, node) pair or a list of such pairs; anything that
        is not a ``list`` is treated as one pair
    name : str, optional, default=None
        The name of the new Hypergraph. Used only when inplace=False;
        ignored if inplace=True.
    inplace : bool, optional, default=True
        Whether to replace the current hypergraph with a new one.

    Returns
    -------
    Hypergraph
    """
    # No level is passed, so _remove uses its default level.
    targets = incidence_uids if isinstance(incidence_uids, list) else [incidence_uids]
    return self._remove(targets, name=name, inplace=inplace)
def _remove(self, uids, level=2, name=None, inplace=False):
    """
    Creates a hypergraph with the given edges, nodes or incidences removed.

    More efficient for creating a restricted hypergraph if the
    restricted set is greater than what is being removed.
    If inplace=True, changes the existing Hypergraph.
    Otherwise, creates a new Hypergraph with the requested changes.

    Parameters
    ----------
    uids : list
        list of uids from edges, nodes, or incidence pairs (listed as tuples)
    level : int, optional, default=2
        Enter 0 to remove edges.
        Enter 1 to remove nodes.
        Enter 2 to remove incidence pairs as tuples
    name : str, optional, default=None
        The name of the new Hypergraph. Used only when inplace=False;
        ignored if inplace=True.
    inplace : bool, default=False
        Whether to replace the current hypergraph with the new one.

    Returns
    -------
    Hypergraph

    Notes
    -----
    Removal of a node or edge from the hypergraph will remove all
    instances of these objects from incidence pairs and from the data.
    Removal of an incidence pair, only removes that pair but does not
    affect the user data attached to the edge and node in the pair.
    Unknown uids are ignored.
    """
    if not inplace:
        # Operate on copies so this hypergraph's stores stay untouched.
        incidence_df = self.incidences.to_dataframe
        edge_store = self.edges.property_store.copy(deep=True)
        node_store = self.nodes.property_store.copy(deep=True)
    else:
        incidence_df = self.incidences.properties
        edge_store = self.edges.property_store
        node_store = self.nodes.property_store

    if level not in (0, 1):
        # Incidence removal: drop the (edge, node) pairs only.
        incidence_df = incidence_df.drop(labels=uids, errors="ignore")
    else:
        store = (edge_store, node_store)[level]
        # Drop every incidence touching the uids, then their own data rows.
        incidence_df = incidence_df.drop(labels=uids, level=level, errors="ignore")
        store._data = store._data.drop(labels=uids, errors="ignore")

    return self._construct_hyp_from_stores(
        incidence_df, edge_ps=edge_store, node_ps=node_store, name=name, inplace=inplace
    )
[docs]
def toplexes(self, return_hyp=False):
"""
Computes a maximal collection of toplexes for the hypergraph.
A :term:`toplex` is a hyperedge, which is not contained in any other
hyperedge. If return_hyp=True, then returns the :term:`simple hypergraph` created by restricting
to the toplexes.
Parameters
----------
return_hyp: bool, optional, default=False
If True, return the hypergraph restricted to the toplexes instead of
the list of toplex edge uids.
Returns
-------
Hypergraph | list
Notes
-----
Works greedily on the transposed incidence dataframe (one row per edge):
the largest remaining edge is recorded as a toplex, then every edge
with no node outside of it (including the edge itself and any
duplicates of it) is discarded, until no edges remain.
NOTE(review): the arithmetic assumes the incidence dataframe holds only
0/1 entries, and the first pass indexes edges[0] so an edgeless
hypergraph presumably raises IndexError - confirm against callers.
"""
# For 0/1 vectors this is b AND NOT a: the entries set in b but not in a.
def operate(a, b):
return np.mod(np.mod(a * b, 2) + b, 2)
# Transpose so that rows are indexed by edges.
df = self.incidence_dataframe().T
toplexes = []
while True:
# Row sums act as edge sizes; order the remaining edges largest first.
edge_sizes = dict(df.sum(axis=1))
edges = np.array(
sorted(df.index, key=lambda x: edge_sizes[x], reverse=True)
)
# The largest remaining edge cannot be contained in any other remaining edge.
toplexes.append(edges[0])
df = df.loc[edges]
mat = df.values
# Keep only the edges that have at least one node outside the new toplex.
df = df.loc[edges[operate(mat[0], mat).sum(axis=1).nonzero()]]
if len(df.index) == 0:
break
if return_hyp:
return self.restrict_to_edges(toplexes)
return toplexes
#### hypergraph method using linegraph gotten from incidence store
[docs]
def is_connected(self, s=1, edges=False):
    """
    Determines if hypergraph is :term:`s-connected`.

    Parameters
    ----------
    s: int, optional, default=1
    edges: boolean, optional, default=False
        If True, will determine if s-edge-connected.
        For s=1 s-edge-connected is the same as s-connected.

    Returns
    -------
    is_connected : boolean
        False (with a warning) when the linegraph is null.

    Notes
    -----
    A hypergraph is s node connected if for any two nodes v0,vn
    there exists a sequence of nodes v0,v1,v2,...,v(n-1),vn
    such that every consecutive pair of nodes v(i),v(i+1)
    share at least s edges.

    A hypergraph is s edge connected if for any two edges e0,en
    there exists a sequence of edges e0,e1,e2,...,e(n-1),en
    such that every consecutive pair of edges e(i),e(i+1)
    share at least s nodes.
    """
    linegraph = self.get_linegraph(s=s, edges=edges)
    try:
        return nx.is_connected(linegraph)
    except nx.NetworkXPointlessConcept:
        # NetworkX refuses to answer for a null graph; report "not connected".
        warnings.warn("Graph is null; ")
        return False
[docs]
def singletons(self):
"""
Returns a list of singleton edges. A singleton edge is an edge of
size 1 with a node of degree 1.
Returns
-------
singles : list
A list of edge uids.
Notes
-----
The third value returned by ``incidence_matrix(index=True)`` is used to
translate column positions into edge uids, so columns are treated as
edges and rows as nodes.
NOTE(review): comparing sums with 1 assumes an unweighted 0/1 incidence
matrix, and the 2-tuple indexing of ``np.nonzero`` assumes the sums come
back two-dimensional (as with scipy sparse matrices) - confirm.
"""
M, _, cdict = self.incidence_matrix(index=True)
# idx is the axis of M with the most entries (0 = rows, 1 = columns);
# np.argmax resolves ties to 0
idx = np.argmax(M.shape).tolist()
# summing along that longer axis leaves one total per index of the
# shorter axis
cols = M.sum(idx)
singles = []
# positions along the shorter axis whose total is exactly 1
for c in np.nonzero((cols - 1 == 0))[(idx + 1) % 2]:
# if the singleton entry in that column is also
# singleton in its row find the entry
if idx == 0:
# c is a column with a single entry; r is the row holding it
r = np.argmax(M.getcol(c))
# and get its sum
s = np.sum(M.getrow(r))
# if this is also 1 then the entry in r,c is alone in both its
# row and its column, so the edge corresponding to column c
# is a singleton
if s == 1:
singles.append(cdict[c])
else: # switch the role of r and c
# c is a row with a single entry; r is the column holding it
r = np.argmax(M.getrow(c))
s = np.sum(M.getcol(r))
if s == 1:
singles.append(cdict[r])
return singles
[docs]
def remove_singletons(self, name=None):
    """
    Constructs clone of hypergraph with singleton edges removed.

    Parameters
    ----------
    name : str, optional, default=None
        Name passed on for the new hypergraph.

    Returns
    -------
    Hypergraph
        A new hypergraph; this hypergraph is not modified.
    """
    # The singletons are a subset of the edges, so the former
    # ``len(singletons) > len(self.edges)`` branch could never run;
    # removing the singleton edges directly covers every case.
    return self.remove_edges(self.singletons(), name=name, inplace=False)
[docs]
def s_connected_components(self, s=1, edges=True, return_singletons=False):
    """
    Returns a generator for the :term:`s-edge-connected component`
    or the :term:`s-node-connected component` of the hypergraph.

    Parameters
    ----------
    s : int, optional, default=1
    edges : boolean, optional, default=True
        If True, return edge components; otherwise, return node components
    return_singletons : bool, optional, default=False
        If True, keep components of size 1. Otherwise, skip them

    Notes
    -----
    If edges=True, this method yields the s-edge-connected components as
    sets of edge uids.
    An s-edge-component has the property that for any two edges e1 and e2
    there is a sequence of edges starting with e1 and ending with e2
    such that pairwise adjacent edges in the sequence intersect in at least
    s nodes. If s=1 these are the path components of the hypergraph.

    If edges=False this method yields the s-node-connected components:
    sets of uids of the nodes which are s-walk connected.
    Two nodes v1 and v2 are s-walk-connected if there is a
    sequence of nodes starting with v1 and ending with v2 such that
    pairwise adjacent nodes in the sequence share s edges. If s=1 these
    are the path components of the hypergraph.

    Example
    -------
    >>> S = {'A':{1,2,3},'B':{2,3,4},'C':{5,6},'D':{6}}
    >>> H = Hypergraph(S)
    >>> list(H.s_connected_components(edges=True))
    [{'C', 'D'}, {'A', 'B'}]
    >>> list(H.s_connected_components(edges=False))
    [{1, 2, 3, 4}, {5, 6}]

    Yields
    ------
    s_connected_components : iterator
        Iterator returns sets of uids of the edges (or nodes) in the
        s-edge(node) components of hypergraph.
    """
    linegraph = self.get_linegraph(s, edges=edges)
    for component in nx.connected_components(linegraph):
        if return_singletons or len(component) != 1:
            yield component
[docs]
def s_component_subgraphs(
    self, s=1, edges=True, return_singletons=False, name=None
):
    """
    Returns a generator for the induced subgraphs of s_connected
    components. Removes singletons unless return_singletons is set to True.

    Parameters
    ----------
    s : int, optional, default=1
    edges : boolean, optional, default=True
        Determines if edge or node components are desired. Yields
        subgraphs equal to the hypergraph restricted to each set of
        edges (True) or nodes (False) in the corresponding components
    return_singletons : bool, optional, default=False
        If True, keep singletons in subgraph. Otherwise, remove singletons.
    name : str, optional, default=None
        Prefix for the subgraph names; this hypergraph's name is used
        when falsy. Each subgraph is named ``<prefix>:<position>``.

    Yields
    ------
    s_component_subgraphs : iterator
        Iterator returns subgraphs generated by the edges (or nodes) in the
        s-edge(node) components of hypergraph.
    """
    restrict = self.restrict_to_edges if edges else self.restrict_to_nodes
    components = self.s_components(
        s=s, edges=edges, return_singletons=return_singletons
    )
    for position, component in enumerate(components):
        yield restrict(component, name=f"{name or self.name}:{position}")
[docs]
def s_components(self, s=1, edges=True, return_singletons=True):
    """
    Same as :meth:`s_connected_components`, except that
    ``return_singletons`` defaults to True here.

    See Also
    --------
    s_connected_components
    """
    options = {"s": s, "edges": edges, "return_singletons": return_singletons}
    return self.s_connected_components(**options)
[docs]
def connected_components(self, edges=False):
    """
    Same as :meth:`s_connected_components` with its default s, but node
    components are returned by default and singletons are always kept.
    Returns an iterator.

    See Also
    --------
    s_connected_components
    """
    component_iter = self.s_connected_components(
        edges=edges, return_singletons=True
    )
    return component_iter
[docs]
def connected_component_subgraphs(self, return_singletons=True, name=None):
    """
    Same as :meth:`s_component_subgraphs` with its default s and edges,
    except that ``return_singletons`` defaults to True here.
    Returns an iterator.

    See Also
    --------
    s_component_subgraphs
    """
    subgraph_iter = self.s_component_subgraphs(
        return_singletons=return_singletons, name=name
    )
    return subgraph_iter
[docs]
def components(self, edges=False):
    """
    Same as :meth:`s_connected_components` with s=1, but node components
    are returned by default and singletons are always kept.
    Returns an iterator.

    See Also
    --------
    s_connected_components
    """
    component_iter = self.s_connected_components(
        s=1, edges=edges, return_singletons=True
    )
    return component_iter
[docs]
def component_subgraphs(self, return_singletons=False, name=None):
    """
    Same as :meth:`s_component_subgraphs` with its default s and edges.
    Returns an iterator.

    See Also
    --------
    s_component_subgraphs
    """
    subgraph_iter = self.s_component_subgraphs(
        return_singletons=return_singletons, name=name
    )
    return subgraph_iter
[docs]
def node_diameters(self, s=1):
    """
    Returns the node diameters of the connected components in the hypergraph.

    Parameters
    ----------
    s : int, optional, default=1

    Returns
    -------
    maximum diameter, list of diameters, list of component : tuple[int, list, list]
        maximum diameter, list of diameters (one per connected component of
        the graph built from the s-adjacency matrix), list of component
        (sets of the node uids in those components, in the same order)
    """
    adjacency, labels = self.adjacency_matrix(s=s, index=True)
    graph = nx.from_scipy_sparse_array(adjacency)

    diameters, components = [], []
    for component in nx.connected_components(graph):
        component_diameter = nx.diameter(graph.subgraph(component))
        # Graph vertices are matrix positions; translate them back to uids.
        members = {labels[position] for position in component}
        components.append(members)
        diameters.append(component_diameter)

    widest = np.argmax(diameters).tolist()
    return diameters[widest], diameters, components
[docs]
def edge_diameters(self, s=1):
    """
    Compute the diameter of every s-edge connected component.

    The s-edge-adjacency matrix is turned into a networkx graph, and
    the diameter of each of its connected components is measured.

    Parameters
    ----------
    s : int, optional, default=1

    Returns
    -------
    tuple[int, list, list]
        ``(maximum diameter, diameters, components)`` where
        ``diameters[i]`` is the diameter of ``components[i]`` and each
        component is the set of edge uids it contains.
    """
    adjacency, index_to_uid = self.edge_adjacency_matrix(s=s, index=True)
    linegraph = nx.from_scipy_sparse_array(adjacency)
    diameters = []
    components = []
    for members in nx.connected_components(linegraph):
        diameters.append(nx.diameter(linegraph.subgraph(members)))
        # Graph nodes are matrix positions; translate them back to uids.
        components.append({index_to_uid[pos] for pos in members})
    widest = int(np.argmax(diameters))
    return diameters[widest], diameters, components
[docs]
def diameter(self, s=1):
    """
    Return the length of the longest shortest s-walk between nodes.

    Parameters
    ----------
    s : int, optional, default=1

    Returns
    -------
    diameter : int

    Raises
    ------
    HyperNetXError
        If the graph built from the s-adjacency matrix is not connected.

    Notes
    -----
    Two nodes are s-adjacent if they share s edges.
    Two nodes v_start and v_end are s-walk connected if there is a
    sequence of nodes v_start, v_1, v_2, ... v_n-1, v_end such that
    consecutive nodes are s-adjacent.
    """
    node_linegraph = nx.from_scipy_sparse_array(self.adjacency_matrix(s=s))
    # A diameter is only defined on a connected graph, so bail out first.
    if not nx.is_connected(node_linegraph):
        raise HyperNetXError(f"Hypergraph is not s-connected. s={s}")
    return nx.diameter(node_linegraph)
[docs]
def edge_diameter(self, s=1):
    """
    Return the length of the longest shortest :term:`s-walk` between edges.

    Parameters
    ----------
    s : int, optional, default=1

    Returns
    -------
    edge_diameter : int

    Raises
    ------
    HyperNetXError
        If the graph built from the s-edge-adjacency matrix is not
        connected.

    Notes
    -----
    Two edges are s-adjacent if they share s nodes.
    Two edges e_start and e_end are s-walk connected if there is a
    sequence of edges e_start, e_1, e_2, ... e_n-1, e_end such that
    consecutive edges are s-adjacent.
    """
    edge_linegraph = nx.from_scipy_sparse_array(self.edge_adjacency_matrix(s=s))
    # A diameter is only defined on a connected graph, so bail out first.
    if not nx.is_connected(edge_linegraph):
        raise HyperNetXError(f"Hypergraph is not s-connected. s={s}")
    return nx.diameter(edge_linegraph)
[docs]
def distance(self, source, target, s=1):
    """
    Return the shortest :term:`s-walk` distance between two nodes.

    Parameters
    ----------
    source : str | int
        a node in the hypergraph
    target : str | int
        a node in the hypergraph
    s : positive int, optional, default=1
        the number of edges consecutive nodes must share

    Returns
    -------
    s-walk distance : int | float
        The shortest path length in the node s-linegraph, or ``np.inf``
        (after emitting a warning) when networkx reports that no path
        exists or that either endpoint is missing from the linegraph.

    See Also
    --------
    edge_distance

    Notes
    -----
    An s-walk between nodes is a sequence of nodes that pairwise share
    at least s edges. The length of the shortest s-walk is 1 less than
    the number of nodes in the path sequence.
    Uses the networkx shortest_path_length method on the linegraph
    obtained from ``get_linegraph(s=s, edges=False)``.
    """
    node_linegraph = self.get_linegraph(s=s, edges=False)
    try:
        return nx.shortest_path_length(node_linegraph, source, target)
    except (nx.NetworkXNoPath, nx.NodeNotFound):
        # Unreachable or unknown endpoints are reported, not raised.
        warnings.warn(f"No {s}-path between {source} and {target}")
        return np.inf
[docs]
def edge_distance(self, source, target, s=1):
    """
    Return the shortest :term:`s-walk` distance between two edges.

    Parameters
    ----------
    source : str | int
        an edge in the hypergraph
    target : str | int
        an edge in the hypergraph
    s : positive int, optional, default=1
        the number of nodes consecutive edges must share

    Returns
    -------
    s-walk distance : int | float
        The shortest path length in the edge s-linegraph, or ``np.inf``
        (after emitting a warning) when networkx reports that no path
        exists or that either endpoint is missing from the linegraph.

    See Also
    --------
    distance

    Notes
    -----
    An s-walk between edges is a sequence of edges such that
    consecutive edges intersect in at least s nodes. The length of the
    shortest s-walk is 1 less than the number of edges in the path
    sequence.
    Uses the networkx shortest_path_length method on the linegraph
    obtained from ``get_linegraph(s=s, edges=True)``.
    """
    edge_linegraph = self.get_linegraph(s=s, edges=True)
    try:
        return nx.shortest_path_length(edge_linegraph, source, target)
    except (nx.NetworkXNoPath, nx.NodeNotFound):
        # Unreachable or unknown endpoints are reported, not raised.
        warnings.warn(f"No {s}-path between {source} and {target}")
        return np.inf
#### Needs to create stores then hypergraph.
[docs]
@classmethod
def from_bipartite(cls, B, node_id=1, name=None, **kwargs):
    """
    Class method creating a Hypergraph from a NetworkX bipartite graph.

    Still to come: capturing edge and node properties from the graph for
    use in the hypergraph.

    Parameters
    ----------
    B : nx.Graph()
        A networkx bipartite graph. Each node in the graph has a property
        'bipartite' taking the value of 0 or 1 indicating a 2-coloring of
        the graph.
    node_id : int, optional, default=1
        bipartite value carried by the graph nodes that become the
        hypergraph *nodes*; every other graph node becomes a hypergraph
        edge.
    name : hashable, optional
    **kwargs
        Forwarded to the Hypergraph constructor.

    Returns
    -------
    Hypergraph

    Raises
    ------
    KeyError
        If a graph node has no 'bipartite' attribute.
    HyperNetXError
        If the nodes selected by ``node_id`` are not one side of a
        bipartition of ``B``.

    Notes
    -----
    A partition for the nodes in a bipartite graph generates a hypergraph.

    >>> import networkx as nx
    >>> B = nx.Graph()
    >>> B.add_nodes_from([1, 2, 3, 4], bipartite=0)
    >>> B.add_nodes_from(['a', 'b', 'c'], bipartite=1)
    >>> B.add_edges_from([(1, 'a'), (1, 'b'), (2, 'b'), (2, 'c'), (3, 'c'), (4, 'a')])
    >>> H = Hypergraph.from_bipartite(B, node_id=1)
    >>> list(H.nodes), list(H.edges)
    (['a', 'b', 'c'], [1, 2, 3, 4])
    """
    # TODO: create property stores first so graph properties are preserved.
    nodes = []
    # A set keeps the per-edge membership test below O(1).
    edges = set()
    for n, d in B.nodes(data=True):
        if d["bipartite"] == node_id:
            nodes.append(n)
        else:
            edges.add(n)
    if not bipartite.is_bipartite_node_set(B, nodes):
        raise HyperNetXError(
            "Error: Method requires a 2-coloring of a bipartite graph."
        )
    # Orient every pair as [hyperedge, node]. Index instead of unpacking so
    # edge tuples longer than two items are still accepted.
    elist = [
        [e[0], e[1]] if e[0] in edges else [e[1], e[0]]
        for e in B.edges
    ]
    df = pd.DataFrame(elist)
    return Hypergraph(df, name=name, **kwargs)
[docs]
@classmethod
def from_incidence_matrix(cls, M, name=None, **kwargs):
    """
    Create a Hypergraph from an incidence matrix.

    ``M`` is converted with ``scipy.sparse.coo_matrix``. Row indices
    are used as node ids, column indices as edge ids, and the stored
    values as cell weights.

    Parameters
    ----------
    M : numpy.matrix | scipy.sparse matrix
    name : optional, default=None
    **kwargs
        Forwarded to the Hypergraph constructor.

    Returns
    -------
    Hypergraph
    """
    incidence = coo_matrix(M)
    frame = pd.DataFrame(
        {
            "edges": incidence.col,
            "nodes": incidence.row,
            "weights": incidence.data,
        }
    )
    return Hypergraph(frame, cell_weight_col="weights", name=name, **kwargs)
[docs]
@classmethod
def from_numpy_array(
    cls,
    M,
    node_names=None,
    edge_names=None,
    name=None,
    key=None,
    **kwargs,
):
    """
    Create a hypergraph from a real valued matrix represented as a
    2 dimensional numpy array.

    The array is wrapped in a labelled DataFrame and handed to
    :meth:`from_incidence_dataframe`, so nonzero cells become
    incidences. If ``key`` is given it is applied to the whole array
    first, and its (boolean) result is what gets handed on.

    Parameters
    ----------
    M : real valued array-like object, 2 dimensions
        representing a real valued matrix with rows corresponding to
        nodes and columns to edges
    node_names : object, array-like, default=None
        List of node names must be the same length as M.shape[0].
        If None then the node names correspond to row indices with 'v'
        prepended.
    edge_names : object, array-like, default=None
        List of edge names must have the same length as M.shape[1].
        If None then the edge names correspond to column indices with
        'e' prepended.
    name : hashable
    key : (optional) function
        boolean function to be evaluated on each cell of the array,
        must be applicable to numpy.array
    **kwargs
        Forwarded to :meth:`from_incidence_dataframe`.

    Returns
    -------
    : Hypergraph

    Raises
    ------
    HyperNetXError
        If ``M`` is not 2 dimensional, or the number of node/edge names
        does not match the number of rows/columns.

    Note
    ----
    The constructor does not generate empty edges.
    All zero columns in M are removed and the names corresponding to
    these edges are discarded.
    """
    M = np.array(M)
    if M.ndim != 2:
        raise HyperNetXError("Input requires a 2 dimensional numpy array")
    # apply boolean key if available
    if key is not None:
        M = key(M)

    if node_names is not None:
        nodenames = np.array(node_names)
        if len(nodenames) != M.shape[0]:
            raise HyperNetXError(
                "Number of node names does not match number of rows."
            )
    else:
        nodenames = np.array([f"v{idx}" for idx in range(M.shape[0])])

    if edge_names is not None:
        edgenames = np.array(edge_names)
        if len(edgenames) != M.shape[1]:
            raise HyperNetXError(
                "Number of edge_names does not match number of columns."
            )
    else:
        edgenames = np.array([f"e{jdx}" for jdx in range(M.shape[1])])

    df = pd.DataFrame(M, columns=edgenames, index=nodenames)
    # Forward kwargs: they were previously accepted but silently dropped.
    return Hypergraph.from_incidence_dataframe(df, name=name, **kwargs)
[docs]
@classmethod
def from_incidence_dataframe(
cls,
df,
name=None,
fillna=0,
key=None,
return_only_dataframe=False,
**kwargs,
):
"""
Create a hypergraph from a Pandas Dataframe object, which has values equal
to the incidence matrix of a hypergraph. Its index will identify the nodes
and its columns will identify its edges.
Parameters
----------
df : Pandas.Dataframe
a real valued dataframe with a single index
name : (optional) string, default = None
fillna : float, default = 0
a real value to place in empty cell, all-zero columns will not
generate an edge.
key : (optional) function, default = None
boolean function to be applied to dataframe. will be applied to
entire dataframe.
return_only_dataframe : (optional) bool, default = False
to use the incidence_dataframe with cell_properties or properties, set this
to true and use it as the setsystem in the Hypergraph constructor.
See also
--------
from_numpy_array
Returns
-------
Hypergraph | pd.DataFrame
"""
if not isinstance(df, pd.DataFrame):
raise HyperNetXError("Error: Input object must be a pandas dataframe.")
df = df.fillna(fillna)
if key:
mat = key(df.values) * 1
else:
mat = df.values * 1
cols = df.columns
rows = df.index
CM = coo_matrix(mat)
c1 = CM.row
c1 = [rows[c1[idx]] for idx in range(len(c1))]
c2 = CM.col
c2 = [cols[c2[idx]] for idx in range(len(c2))]
c3 = CM.data
dfnew = pd.DataFrame({"edges": c2, "nodes": c1, "weight": c3})
if return_only_dataframe is True:
return dfnew
else:
return Hypergraph(dfnew, cell_weight_col="weight", name=name, **kwargs)
def __add__(self, other):
"""
Concatenate incidences from two hypergraphs, removing duplicates and
dropping duplicate property data in the order of addition.
Parameters
----------
other : Hypergraph
Returns
-------
Hypergraph
"""
return self.sum(other)
def __sub__(self, other):
"""
Concatenate incidences from two hypergraphs, removing duplicates and
dropping duplicate property data in the order of addition.
Parameters
----------
other : Hypergraph
Returns
-------
Hypergraph
"""
return self.difference(other)
[docs]
def sum(self, other, name=None):
    """
    Hypergraph obtained by joining incidences from self and other.

    Incidence, edge and node dataframes of the two hypergraphs are
    merged with ``_combine_properties_dataframes``; self is passed
    first, so its rows win when an index occurs in both.

    Parameters
    ----------
    other : Hypergraph
    name : optional, default=None
        Passed on as the name of the new hypergraph.

    Returns
    -------
    Hypergraph
    """
    merge = self._combine_properties_dataframes
    merged_incidences = merge(
        self.incidences.to_dataframe, other.incidences.to_dataframe
    )
    merged_edges = merge(self.edges.to_dataframe, other.edges.to_dataframe)
    merged_nodes = merge(self.nodes.to_dataframe, other.nodes.to_dataframe)

    edge_store = PropertyStore(merged_edges)
    node_store = PropertyStore(merged_nodes)
    return self._construct_hyp_from_stores(
        merged_incidences,
        edge_ps=edge_store,
        node_ps=node_store,
        name=name,
    )
def _combine_properties_dataframes(self, df1, df2):
df = pd.concat([df1, df2])
return df[~df.index.duplicated(keep="first")]
[docs]
def difference(self, other, name=None):
    """
    Hypergraph obtained by restricting to incidences in self but not
    in other.

    Parameters
    ----------
    other : Hypergraph
    name : str, optional, default = None

    Returns
    -------
    Hypergraph
        Built by ``_construct_hyp_from_stores`` from the rows of self's
        incidence dataframe that survive the set difference.
    """
    own_incidences = self.incidences
    surviving_pairs = own_incidences.items.difference(other.incidences.items)
    restricted = own_incidences.to_dataframe.loc[list(surviving_pairs)]
    return self._construct_hyp_from_stores(restricted, name=name)
[docs]
def intersection(self, other, name=None):
    """
    Hypergraph obtained by restricting to incidence pairs contained in
    both self and other. The rows are taken from self's incidence
    dataframe.

    Parameters
    ----------
    other : Hypergraph
    name : str, optional, default=None

    Returns
    -------
    Hypergraph
        Built by ``_construct_hyp_from_stores`` from the shared rows.
    """
    own_incidences = self.incidences
    shared_pairs = own_incidences.items.intersection(other.incidences.items)
    restricted = own_incidences.to_dataframe.loc[list(shared_pairs)]
    return self._construct_hyp_from_stores(restricted, name=name)
[docs]
def union(self, other, name=None):
    """
    Hypergraph obtained by joining the incidence pairs contained in
    self and other. Alias for :meth:`sum`, to which the call is
    delegated.

    Parameters
    ----------
    other : Hypergraph
    name : str, optional, default=None

    Returns
    -------
    Hypergraph
    """
    joined = self.sum(other, name=name)
    return joined
def _agg_rows(df, groupby, rule_dict=None):
    """
    Helper method for collapsing nodes and edges in hypergraph

    Parameters
    ----------
    df : pandas.DataFrame
    groupby : index or columns aggregating on
        Resolved after ``df.reset_index()``, so index levels can be
        referred to by name.
    rule_dict : dict, optional, defaults as 'first' for all keys
        dictionary keyed by df columns where values are the
        aggregation rules e.g. 'first', 'sum', 'last'. Keys that are
        not columns of ``df`` are ignored. ``None`` (the default) means
        every column is aggregated with 'first'.

    Returns
    -------
    pandas.DataFrame
    """
    agg_rules = {col: "first" for col in df.columns}
    # rule_dict defaults to None, which has no .items(); treat it as empty.
    for col, rule in (rule_dict or {}).items():
        if col in agg_rules:
            agg_rules[col] = rule
    return df.reset_index().groupby(groupby).agg(agg_rules)