# File: probe/visual_utils.py
# Last Update: 05/15/24
# Updated by: JW
import warnings
import numpy as np
import pandas as pd
from ..data_utils import (
custom_Zscore,
error,
get_minimal_std,
select_highestZscoreCols,
std_zscore_threshold_filter,
)
from ..observatory import Observatory
[docs]
def initialize():
return jmapObservatory
[docs]
class jmapObservatory(Observatory):
"""
Custom observatory for viewing JMAP Stars.
This class extends the `Observatory` class and provides additional
functionality specific to the graph models outputted by JMAP Star.
Parameters:
-----------
star_file : str
The file path to the star file.
Attributes:
-----------
_unclustered : list
A list of unclustered items.
_group_lookuptable : dict
A dictionary to aid in group decomposition and item lookup.
_node_lookuptable : dict
A dictionary to aid in group decomposition and item lookup.
_group_directory : dict
A dictionary containing cluster members for each group.
Methods:
--------
get_items_groupID(item_id)
Look up function for finding an item's connected component (i.e., group).
get_items_nodeID(item_id)
Look up function for finding an item's member node.
get_nodes_members(node_id)
Look up function to get members of a node.
get_groups_members(group_id)
Look up function to get items within a group.
get_groups_member_nodes(group_id)
Look up function to get nodes within a connected component.
get_nodes_groupID(node_id)
Returns the node's group id.
get_global_stats()
Calculates global mean and standard deviation statistics.
get_nodes_raw_df(node_id)
Returns a subset of the raw dataframe only containing members of the
specified node.
get_nodes_clean_df(node_id)
Returns a subset of the clean dataframe only containing members of the
specified node.
get_nodes_projections(node_id)
Returns a subset of the projections array only containing members of
the specified node.
get_groups_raw_df(group_id)
Returns a subset of the raw dataframe only containing members of the
specified group.
get_groups_clean_df(group_id)
Returns a subset of the clean dataframe only containing members of the
specified group.
get_groups_projections(group_id)
Returns a subset of the projections array only containing members of
the specified group.
compute_node_description(node_id, description_fn=get_minimal_std)
Compute a simple description of each node in the graph.
compute_group_description(group_id, description_fn=get_minimal_std)
Compute a simple description of a policy group.
Example
---------
>>> from thema.probe.observatories import jmapObservatory
>>> star_file = "path/to/star_file"
>>> obs = jmapObservatory(star_file)
>>> obs.get_items_groupID(1)
"""
def __init__(self, star_file):
"""
Initialize a JmapObservatory object.
Parameters
----------
star_file : str
The path to the star file.
Returns
-------
None
Notes
-----
This constructor initializes a JmapObservatory object by calling the superclass's
constructor and setting up various data structures for group decomposition and item lookup.
The following instance variables are initialized:
- self._unclustered : list
A list of unclustered items obtained from the star file.
- self._group_lookuptable : dict
A dictionary that maps each item to the list of groups it belongs to.
- self._node_lookuptable : dict
A dictionary that maps each item to the list of nodes it belongs to.
- self._group_directory : dict
A dictionary that maps each group to its cluster members.
"""
super().__init__(star_file=star_file)
self._unclustered = self.star.get_unclustered_items()
self._group_lookuptable = {key: [] for key in self.data.index}
self._node_lookuptable = {key: [] for key in self.data.index}
self._group_directory = {}
node_members = self.star.nodes
for i in self.star.starGraph.components:
cluster_members = {}
for node in self.star.starGraph.components[i].nodes:
cluster_members[node] = node_members[node]
for item in node_members[node]:
self._node_lookuptable[item] = self._node_lookuptable[item] + [node]
self._group_lookuptable[item] = list(
set(self._group_lookuptable[item] + [i])
)
self._group_directory[i] = cluster_members
[docs]
def get_items_groupID(self, item_id: int):
"""
Look up function for finding an item's connected component (ie group)
Parameters:
-----------
item_id: int
Index of desired look up item from user's raw data frame
Returns:
--------
A list of group ids that the item is a member of (-1 if unclustered)
"""
if item_id in self._unclustered:
return -1
else:
return self._group_lookuptable[item_id][0]
[docs]
def get_items_nodeID(self, item_id: int):
"""
Look up function for finding item's member node
Parameters:
-----------
item_id: int
Index of desired look up item from user's raw data frame
Returns:
--------
A list of node ids that the item is a member of
"""
if item_id in self._unclustered:
return -1
else:
return self._node_lookuptable[item_id]
[docs]
def get_nodes_members(self, node_id: str):
"""
Look up function to get members of a node
Parameters:
-----------
node_id: str
String identifier of a node
Returns:
--------
A list of member items
"""
return self._group_directory[self.get_nodes_groupID(node_id)][node_id]
[docs]
def get_groups_members(self, group_id: int):
"""
Look up function to get items within a group
Parameters:
-----------
group_id: int
Group number of desired connected component
Returns:
--------
A list of the item members for the specified group
"""
member_list = []
for node in self._group_directory[group_id].keys():
member_list = member_list + self._group_directory[group_id][node]
return list(set(member_list))
[docs]
def get_groups_member_nodes(self, group_id: int):
"""
Look up Function to get nodes within a connected component
Parameters:
-----------
group_id: int
Group number of desired connected component
Returns:
--------
A list of node members for the specified group
"""
return [node for node in self._group_directory[group_id].keys()]
[docs]
def get_nodes_groupID(self, node_id: str):
"""
Returns the node's group id.
Parameters:
-----------
node_id : str
A character ID specifying the node
Returns:
--------
A group ID number.
"""
for group in self._group_directory.keys():
for node in self._group_directory[group].keys():
if node == node_id:
return group
return None
[docs]
def get_global_stats(self):
"""
Calculates global mean and standard deviation statistics.
Returns
-------
A dictionary containing statistics on both raw and clean df subsets for each group.
"""
group_stats = {}
raw_stats = pd.DataFrame()
clean_stats = pd.DataFrame()
dropped_columns = self.jmapper.tupper.get_dropped_columns()
for id in self._group_directory.keys():
numeric_columns = (
self.get_groups_raw_df(id)
.drop(columns=dropped_columns)
.select_dtypes(include=np.number)
.columns
)
raw_sub_df = self.get_groups_raw_df(id).select_dtypes(include=np.number)
raw_stats["std"] = raw_sub_df.std()
raw_stats["mean"] = raw_sub_df.mean()
clean_sub_df = self.get_groups_clean_df(id)[numeric_columns]
clean_stats["std"] = clean_sub_df.std()
clean_stats["mean"] = clean_sub_df.mean()
group_stats[id] = {"raw": raw_stats, "clean": clean_stats}
return group_stats
[docs]
def get_nodes_raw_df(self, node_id: str):
"""
Returns a subset of the raw dataframe only containing members of the specified node.
Parameters
----------
node_id : str
A node's string identifier
Returns
--------
A pandas data frame.
"""
member_items = self.get_nodes_members(node_id)
return self.data.iloc[member_items]
[docs]
def get_nodes_clean_df(self, node_id: str):
"""
Returns a subset of the clean dataframe only containing members of the specified node.
Parameters
----------
node_id : str
A node's string identifier
Returns
--------
A pandas data frame.
"""
member_items = self.get_nodes_members(node_id)
return self.clean.iloc[member_items]
[docs]
def get_nodes_projections(self, node_id: str):
"""
Returns a subset of the projectinos array only containing members of the specified node.
Parameters
----------
node_id : str
A node's string identifier
Returns
--------
An np.array of projections.
"""
member_items = self.get_nodes_members(node_id)
projections = {}
for item in member_items:
projections[item] = self._projection[item]
return projections
[docs]
def get_groups_raw_df(self, group_id: int):
"""
Returns a subset of the raw dataframe only containing members of the specified group.
Parameters
----------
group_id : int
A group's identifier
Returns
--------
A pandas data frame.
"""
member_items = self.get_groups_members(group_id)
return self.data.iloc[member_items]
[docs]
def get_groups_clean_df(self, group_id: int):
"""
Returns a subset of the clean dataframe only containing members of the specified group.
Parameters
----------
group_id : int
A group's identifier
Returns
--------
A pandas data frame.
"""
member_items = self.get_groups_members(group_id)
return self.clean.iloc[member_items]
[docs]
def get_groups_projections(self, group_id: int):
"""
Returns a subset of the projectinos array only containing members of the specified group.
Parameters
----------
node_id : str
A groups's identifier
Returns
--------
An np.array of projections.
"""
member_items = self.get_groups_members(group_id)
projections = {}
for item in member_items:
projections[item] = self._projection[item]
return projections
# NOTE: Implementations of description functions can be found in data_utils.py
[docs]
def compute_node_description(self, node_id: str, description_fn=get_minimal_std):
"""
Compute a simple description of each node in the graph.
This function labels each node based on a description function. The description
function is used to select a defining column from the original dataset, which will
serve as a representative of the noes identity. Obviously there is a number of ways
to do this, but as a default this computes the most homogenous data column for a each
node.
Parameters:
----------
node_id:
A node identifier (-1 for unclustered items)
description_fn: function
A function that takes a data frame, mask, and density columns and returns
a column.
Returns:
--------
A dictionary containing the representing column label and the number of items in
the node.
"""
cols = np.intersect1d(
self.data.select_dtypes(include=["number"]).columns,
self.clean.columns,
)
if node_id == -1:
mask = self._unclustered
else:
mask = self.get_nodes_members(node_id)
label = description_fn(
df=self.clean,
mask=mask,
density_cols=cols,
)
size = len(mask)
return {"label": label, "size": size}
[docs]
def compute_group_description(self, group_id: int, description_fn=get_minimal_std):
"""
Compute a simple description of a policy group.
This function creates a density description based on its member nodes description
in compute_node_description().
Parameters:
-----------
group_id: int
A group's identifier (-1 to get unclustered group)
description_fn: function
A function to be passed to compute_node_description()
Returns
-------
A density description of the group.
"""
tmp = {}
group_size = 0
if group_id == -1:
unclustered_density = self.compute_node_description(
-1, description_fn=description_fn
)
return {unclustered_density["label"]: 1}
else:
member_nodes = self.get_groups_member_nodes(group_id)
for node in member_nodes:
node_density = self.compute_node_description(
node, description_fn=description_fn
)
label = node_density["label"]
size = node_density["size"]
group_size += size
# If multiple nodes have same identifying column
if label in tmp.keys():
size += tmp[label]
tmp[label] = size
return {
label: np.round(size / group_size, 2) for label, size in tmp.items()
}
[docs]
def compute_group_identity(
self, group_id: int, eval_fn=std_zscore_threshold_filter, *args, **kwargs
):
"""
Computes the most important identifiers of a group as specified by the evalulation function.
Parameters
----------
group_id:
A group's identifier.
eval_fn:
The function used score each column in the dataframe. The minimum scoring columns are chosen to represent the
group's identity.
kwargs:
Any key word arguments that need to passed to the aliased evaluation functinon. If, for example,
you wanted to pass a parameter `std_threshold` to your eval function, `std_zscore_threshold_filter`
you could do as
`compute_group_identity(id, eval_fn=std_zscore_threshold_filter, std_threshold=0.8)`
"""
dropped_columns = self.jmapper.tupper.get_dropped_columns()
global_stats = self.get_global_stats()[group_id]
numeric_columns = (
self.get_groups_raw_df(group_id)
.drop(columns=dropped_columns)
.select_dtypes(include=np.number)
.columns
)
sub_df = self.get_groups_clean_df(group_id)[numeric_columns]
id_table = sub_df.aggregate(eval_fn, global_stats=global_stats, *args, **kwargs)
min_val = id_table.min()
return id_table[id_table == min_val].index.tolist()
[docs]
def get_group_descriptions(self, description_fn=get_minimal_std):
"""
Returns a dictionary of group descriptions for each group as specified by the passed
description function.
Parameter
---------
description_fn: function
A function that determines a representative column for each node in a group.
Returns
--------
A density representing the composition of a group by its nodes' descriptions.
"""
descriptions = {}
for group_id in self._group_directory.keys():
descriptions[group_id] = self.compute_group_description(
group_id=group_id, description_fn=description_fn
)
return descriptions
[docs]
def get_group_identities(
self,
eval_fn=std_zscore_threshold_filter,
*args,
**kwargs,
):
"""
Returns a dictionary of group identies as specified by compute_group_identity.
Paramters
---------
eval_fn:
The function used score each column in the dataframe. The minimum scoring columns are chosen to represent the
group's identity.
kwargs:
Any key word arguments that need to passed to the aliased evaluation functinon. If, for example,
you wanted to pass a parameter `std_threshold` to your eval function, `std_zscore_threshold_filter`
you could do so with
`get_group_identities(eval_fn=std_zscore_threshold_filter, std_threshold=0.8)`
"""
identities = {}
for group_id in self._group_directory.keys():
identities[group_id] = self.compute_group_identity(
group_id=group_id, eval_fn=eval_fn, *args, **kwargs
)
return identities
[docs]
def target_matching(
self,
target: pd.DataFrame,
col_filter: list = None,
):
"""
Matches a target item into a generated group by calculating the minimum deviation
from a groups mean over available numeric columns.
Parameters
----------
target: pd.DataFrame
A data frame containing one row.
col_filter:
A list of columns to perform the mathcing on.
"""
target_cols = target.select_dtypes(include=np.number).dropna(axis=1).columns
if col_filter:
raw_cols = col_filter
else:
raw_cols = self.raw.select_dtypes(include=np.number).columns
scores = {}
for group_id in self._group_directory.keys():
group_data = self.get_groups_raw_df(group_id)
score = 0
for col in target_cols:
if col in raw_cols:
x = target[col][0]
mu = group_data[col].mean()
score += error(x, mu)
scores[group_id] = score
min_index = min(scores, key=scores.get)
return scores, min_index
[docs]
def get_group_numbers(self) -> list:
"""
Return a list of all group #s in a jmapStar graph
"""
return list(self.star.starGraph.components.keys())
[docs]
def get_aggregatedGroupDf(
self, aggregation_func=None, clean: bool = True
) -> pd.DataFrame:
"""
Aggregate each group of the DataFrame using a custom aggregation function.
Parameters:
- aggregation_func: function, the aggregation function to apply
Returns:
- DataFrame with the aggregation function applied to each group
"""
if aggregation_func is None:
aggregation_func = np.mean
if clean:
df_func = self.get_groups_clean_df
else:
df_func = self.get_groups_raw_df
combined_df = pd.DataFrame()
for num in self.get_group_numbers():
temp = df_func(num)
temp["Group"] = num
combined_df = pd.concat([combined_df, temp], ignore_index=True)
grouped_df = combined_df.groupby("Group")
with warnings.catch_warnings():
warnings.simplefilter("ignore")
aggregated_df = grouped_df.agg(aggregation_func).reset_index()
if not clean:
for col in aggregated_df.select_dtypes(include=["object"]):
if col in aggregated_df.columns:
most_common = grouped_df[col].agg(
lambda x: x.value_counts().idxmax()
)
aggregated_df[col] = aggregated_df.apply(
lambda row: most_common[row["Group"]], axis=1
)
return aggregated_df
[docs]
def define_nodeValueDict(
self, group_number: int, col: str, aggregation_func=None
) -> dict:
"""
Creates a dict where each node is assiged a value based on an
aggregation of items in that node,used to create a path graph
target/sink nodes.
Parameters
---------
group_number : int
Group/Connected component number
col : str
Column from the clean dataframe
aggregation_func : np.<function>, defaults to calculating mean
method by which to aggregate values within a node for coloring
- color node by the median value or by the sum of values for
example supports all numpy aggregation functions such as
np.mean, np.median, np.sum, etc
Returns
-------
results_dict : dict
A dictionarty with node IDs as keys and their corresponding
numeric value as values
"""
if aggregation_func is None:
aggregation_func = np.mean
results = []
for node in self.get_groups_member_nodes(group_number):
df = self.get_nodes_clean_df(node)
results.append((node, df[col].agg(aggregation_func)))
results.sort(key=lambda x: x[1], reverse=True)
results_dict = {}
for _, (node, ret) in enumerate(results, start=1):
results_dict[node] = ret
return results_dict
[docs]
def dataset_zscores_df(self, n_cols=10):
"""
STUB
"""
zscore_df = pd.DataFrame()
for group in self.get_group_numbers():
subset_df = self.get_groups_clean_df(group)
group_dict = {"Group": group}
for col in self.clean:
t = custom_Zscore(self.clean, subset_df=subset_df, column_name=col)
group_dict[col] = t
zscore_df = pd.concat(
[zscore_df, pd.DataFrame([group_dict])], ignore_index=True
)
zscores = zscore_df.set_index("Group")
return select_highestZscoreCols(zscores, n_cols=n_cols)