# Page 3  (pagination artifact from document export — not code)
def analyze_reason_distribution(
    df: pd.DataFrame,
    metric_anomaly_map: Dict[str, Dict[str, Any]],
    metric_name: str,
    baseline_region: str = 'the rest'
) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, Any]]:
    """
    Analyze the reason-category distribution of an anomalous region vs a baseline.

    Args:
        df: DataFrame with reason data; expects columns territory_l4_name,
            reason_category, initiative_stage, closed_initiatives,
            closed_opportunity_size.
        metric_anomaly_map: Map from detect_anomalies_in_df; must hold an
            entry for metric_name containing 'anomalous_region'.
        metric_name: Column name for the anomalous metric. Selects stage
            CLOSED_WON when the name contains 'closed_won', else CLOSED.
        baseline_region: Baseline for comparison ('Global' or 'the rest').

    Returns:
        Tuple of (region_comparison_df, analysis_df, template_data).
        region_comparison_df is the wide per-reason pivot for visualization;
        analysis_df adds delta/overindex/expected/excess columns;
        template_data holds pre-formatted insight fields. All three are empty
        when metric_name has no anomaly entry.
    """
    # Bail out early when this metric has no recorded anomaly.
    anomaly_info = metric_anomaly_map.get(metric_name, {})
    if not anomaly_info:
        return pd.DataFrame(), pd.DataFrame(), {}
    anomalous_region = anomaly_info['anomalous_region']

    # Pick the pipeline stage implied by the metric name.
    stage_filter = 'CLOSED_WON' if 'closed_won' in metric_name else 'CLOSED'

    # Aggregate to one row per (territory, reason, stage) before filtering.
    df = df.groupby(['territory_l4_name', 'reason_category', 'initiative_stage']).agg({
        'closed_initiatives': 'sum',
        'closed_opportunity_size': 'sum'
    }).reset_index()
    filtered_df = df[df['initiative_stage'] == stage_filter].copy()

    # Build the baseline: the synthetic 'the rest' rollup, or a named region.
    if baseline_region == 'the rest':
        # Drop the 'Global' rollup so it is not double-counted in 'the rest'.
        filtered_df = filtered_df[(filtered_df['territory_l4_name'] != 'Global')].copy()
        baseline_df = filtered_df[(filtered_df['territory_l4_name'] != anomalous_region)].copy()
        baseline_df['territory_l4_name'] = 'the rest'
        baseline_df = baseline_df.groupby(['territory_l4_name', 'reason_category']).agg({
            'closed_initiatives': 'sum',
            'closed_opportunity_size': 'sum'
        }).reset_index()
    else:
        baseline_df = filtered_df[filtered_df['territory_l4_name'] == baseline_region].copy()

    # Per-region share of initiatives / opportunity size by reason category.
    result_dfs = []
    for region in filtered_df['territory_l4_name'].unique():
        if region == baseline_region:
            continue  # the baseline is appended separately below
        region_df = filtered_df[filtered_df['territory_l4_name'] == region].copy()
        total_initiatives = region_df['closed_initiatives'].sum()
        total_opportunity = region_df['closed_opportunity_size'].sum()
        # NOTE: a zero total yields inf/NaN shares (pandas does not raise).
        region_df['pct_initiatives'] = region_df['closed_initiatives'] / total_initiatives
        region_df['pct_opportunity'] = region_df['closed_opportunity_size'] / total_opportunity
        result_dfs.append(region_df)

    # Baseline shares.
    total_initiatives = baseline_df['closed_initiatives'].sum()
    total_opportunity = baseline_df['closed_opportunity_size'].sum()
    baseline_df['pct_initiatives'] = baseline_df['closed_initiatives'] / total_initiatives
    baseline_df['pct_opportunity'] = baseline_df['closed_opportunity_size'] / total_opportunity
    result_dfs.append(baseline_df)

    result_df = pd.concat(result_dfs)

    def _pivot(values: str, col_suffix: str) -> pd.DataFrame:
        # One reason_category row, one suffixed column per region, for a measure.
        pivot = result_df.pivot(index='reason_category', columns='territory_l4_name',
                                values=values).reset_index()
        pivot.columns = ['reason_category'] + [f"{col}{col_suffix}" for col in pivot.columns[1:]]
        return pivot

    pct_init_pivot = _pivot('pct_initiatives', '_pct_init')
    pct_opp_pivot = _pivot('pct_opportunity', '_pct_opp')
    count_pivot = _pivot('closed_initiatives', '_count')
    dollar_pivot = _pivot('closed_opportunity_size', '_dollar')

    # Wide comparison frame (for visualization); column-group order preserved:
    # pct_init, count, pct_opp, dollar.
    region_comparison_df = pd.merge(pct_init_pivot, count_pivot, on='reason_category')
    region_comparison_df = pd.merge(region_comparison_df, pct_opp_pivot, on='reason_category')
    region_comparison_df = pd.merge(region_comparison_df, dollar_pivot, on='reason_category')

    # Analysis frame adds the derived delta/expected/excess calculations.
    analysis_df = region_comparison_df.copy()
    baseline_col = f"{baseline_region}_pct_init"
    anomaly_col = f"{anomalous_region}_pct_init"
    if baseline_col in analysis_df.columns and anomaly_col in analysis_df.columns:
        # How much the anomalous region over/under-indexes on each reason.
        analysis_df['delta_pct'] = analysis_df[anomaly_col] - analysis_df[baseline_col]
        analysis_df['overindex_ratio'] = analysis_df[anomaly_col] / analysis_df[baseline_col].replace(0, np.nan)
        # Expected values = anomalous-region totals distributed per the baseline mix.
        total_anomaly_initiatives = result_df[result_df['territory_l4_name'] == anomalous_region]['closed_initiatives'].sum()
        total_anomaly_opportunity = result_df[result_df['territory_l4_name'] == anomalous_region]['closed_opportunity_size'].sum()
        analysis_df['expected_count'] = total_anomaly_initiatives * analysis_df[baseline_col]
        analysis_df['excess_count'] = analysis_df[f"{anomalous_region}_count"] - analysis_df['expected_count']
        baseline_opp_col = f"{baseline_region}_pct_opp"
        analysis_df['expected_dollar'] = total_anomaly_opportunity * analysis_df[baseline_opp_col]
        analysis_df['excess_dollar'] = analysis_df[f"{anomalous_region}_dollar"] - analysis_df['expected_dollar']

    # Prepare template data, only when the delta calculations above ran.
    template_data = {}
    if 'delta_pct' in analysis_df.columns:
        top_by_count = analysis_df.sort_values('excess_count', ascending=False).head(2)
        top_by_dollar = analysis_df.sort_values('excess_dollar', ascending=False).head(2)
        # Non-anomalous regions whose total $ exceeds the anomalous region's.
        non_anomaly_regions = [r for r in filtered_df['territory_l4_name'].unique()
                               if r != anomalous_region and r != baseline_region and r != 'Global']
        higher_dollar_regions = []
        for region in non_anomaly_regions:
            region_total = filtered_df[filtered_df['territory_l4_name'] == region]['closed_opportunity_size'].sum()
            if region_total > total_anomaly_opportunity:
                higher_dollar_regions.append({
                    'region': region,
                    'total_dollar': region_total,
                    'difference': region_total - total_anomaly_opportunity
                })
        table_data = []
        for _, row in top_by_count.iterrows():
            reason = row['reason_category']
            # Human-readable reason name: underscores to spaces, Title Case.
            formatted_reason = reason.replace('_', ' ').title()
            # NOTE(review): the $ figures come from `row` itself; the
            # top_by_dollar lookup only zeroes them when the reason is not
            # also a top-$ reason — confirm this gating is intentional.
            dollar_row = top_by_dollar[top_by_dollar['reason_category'] == reason]
            dollar_value = row[f"{anomalous_region}_dollar"] if not dollar_row.empty else 0
            expected_dollar = dollar_value - row['excess_dollar'] if not dollar_row.empty else 0
            table_data.append({
                'Reason': formatted_reason,
                # BUGFIX: use ':+.1f' so negative deltas render as "-x.xpp"
                # instead of the old hard-coded "+" producing "+-x.xpp".
                '% of Total (vs Baseline)': f"{row[anomaly_col] * 100:.1f}% ({row['delta_pct'] * 100:+.1f}pp)",
                'Count (vs Expected)': f"{int(row[f'{anomalous_region}_count'])} ({int(row['expected_count'])})",
                '$ Impact (vs Expected)': f"${dollar_value/1000000:.2f}M (${expected_dollar/1000000:.2f}M)"
            })
        table_df = pd.DataFrame(table_data)
        template_data = {
            'anomalous_region': anomalous_region,
            'baseline_region': baseline_region,
            'stage': 'won' if stage_filter == 'CLOSED_WON' else 'lost',
            'top_reasons': [r['reason_category'] for _, r in top_by_count.iterrows()],
            'formatted_top_reasons': [r['reason_category'].replace('_', ' ').title()
                                      for _, r in top_by_count.iterrows()],
            'table_markdown': table_df.to_markdown(index=False),
            'higher_dollar_regions': higher_dollar_regions,
            'anomaly_info': anomaly_info
        }
    return region_comparison_df, analysis_df, template_data
def format_reason_insight(df, metric_anomaly_map, metric_name, baseline_region='the rest', reason_actions=None):
    """
    Build a plain-text insight for a metric's anomalous region: a callout line,
    top reasons (won) or suggested actions (lost), a markdown table, and an
    optional note about regions with higher opportunity totals.

    Args:
        df: Reason-level DataFrame, passed to analyze_reason_distribution.
        metric_anomaly_map: Map from detect_anomalies_in_df.
        metric_name: Metric column to analyze.
        baseline_region: Baseline for comparison ('Global' or 'the rest').
        reason_actions: Optional {reason_category: action text} map; used only
            when the metric is a 'lost' metric.

    Returns:
        Formatted multi-line string, or a fallback message when no anomaly
        exists for metric_name.
    """
    # analyze_reason_distribution returns (region_comparison_df, analysis_df,
    # template_data) — name the locals accordingly (the previous names
    # mislabeled the first frame as analysis_df).
    region_comparison_df, analysis_df, template_data = analyze_reason_distribution(
        df, metric_anomaly_map, metric_name, baseline_region)
    if not template_data:
        return "No anomaly detected for this metric."

    # Headline callout comparing the anomalous region to the Global mean.
    output = f"Callout: {metric_name.replace('_', ' ').title()} in {template_data['anomalous_region']} is {template_data['anomaly_info']['magnitude']} {template_data['anomaly_info']['direction']} than Global mean.\n\n"

    # Won metrics list the drivers; lost metrics list suggested actions.
    if template_data['stage'] == 'won':
        output += f"More specific reasons are: {', '.join(template_data['formatted_top_reasons'])}.\n\n"
    else:
        output += "Potential action items for the team:\n"
        for reason in template_data['top_reasons']:
            if reason_actions and reason in reason_actions:
                output += f"- {reason_actions[reason]}\n"
        output += "\n"

    # Detail table (pre-rendered markdown).
    output += template_data['table_markdown']

    # Note regions whose total opportunity exceeds the anomalous region's.
    if template_data.get('higher_dollar_regions'):
        output += f"\n\nNote: Higher total {template_data['stage']} opportunity in other regions:\n"
        for region in template_data['higher_dollar_regions']:
            output += f"- {region['region']}: ${region['total_dollar']/1000000:.2f}M "
            output += f"(+${region['difference']/1000000:.2f}M vs {template_data['anomalous_region']})\n"
    return output
def plot_reason_heatmaps(df, metric_name, anomalous_region, suffix='_pct_init'):
    """
    Plot side-by-side reason-distribution heatmaps: the anomalous region vs
    'the rest' (left) and the remaining regions (right).

    Args:
        df: Wide comparison DataFrame (e.g. region_comparison_df from
            analyze_reason_distribution) with reason_category plus
            per-region columns ending in the given suffix.
        metric_name: Metric name, used for the title and colormap choice
            (greens for 'closed_won' metrics, reds otherwise).
        anomalous_region: Region highlighted in the left heatmap (any
            'AM-' prefix is stripped to match the cleaned column names).
        suffix: Which measure columns to plot ('_pct_init', '_pct_opp',
            '_count', or '_dollar').

    Returns:
        The matplotlib Figure containing both heatmaps.
    """
    # Build a fresh frame with only the needed data (seaborn acts weirdly
    # when we pass the original frame directly).
    df_heatmap = pd.DataFrame()
    cols_to_use = [col for col in df.columns if col.endswith(suffix)]
    df_heatmap['formatted_reason'] = df['reason_category'].apply(lambda x: x.replace('_', ' ').title())
    for col in cols_to_use:
        # Strip the measure suffix and the 'AM-' territory prefix for display.
        clean_name = col.replace(suffix, '').replace('AM-', '')
        df_heatmap[clean_name] = df[col].astype(float)

    # Left heatmap: anomalous region vs the synthetic 'the rest' baseline.
    anomalous_clean = anomalous_region.replace('AM-', '')
    left_cols = [anomalous_clean, 'the rest']
    left_data = df_heatmap[['formatted_reason'] + left_cols].copy()
    left_data = left_data.set_index('formatted_reason')

    # Right heatmap: every other region (excluding Global rollup columns).
    right_cols = [col for col in df_heatmap.columns
                  if col not in left_cols + ['formatted_reason'] and 'Global' not in col]
    right_data = df_heatmap[['formatted_reason'] + right_cols].copy()
    right_data = right_data.set_index('formatted_reason')

    # Convert to float explicitly so seaborn annotates numerically.
    left_data = left_data.astype(float)
    right_data = right_data.astype(float)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 8), gridspec_kw={'width_ratios': [1, 2]})
    cmap = "Greens" if 'closed_won' in metric_name else "Reds"

    def _reformat_annotations(ax, formatter):
        # Rewrite seaborn's numeric annotations using the given formatter;
        # non-numeric annotation text is left untouched.
        for text in ax.texts:
            try:
                value = float(text.get_text())
            except ValueError:
                continue
            text.set_text(formatter(value))

    def _dollar_fmt(value):
        # Abbreviate dollars as $x.xM / $xK / $x.
        if value >= 1e6:
            return f"${value/1e6:.1f}M"
        if value >= 1e3:
            return f"${value/1e3:.0f}K"
        if value > 0:
            return f"${value:.0f}"
        return "0"

    def format_heatmap(data, ax, cmap, suffix, yticklabels):
        if suffix.endswith('_dollar'):
            # Draw with empty fmt, then substitute abbreviated $ annotations.
            sns.heatmap(data, ax=ax, cmap=cmap, annot=True, fmt='',
                        cbar=False, linewidths=0.5, yticklabels=yticklabels)
            _reformat_annotations(ax, _dollar_fmt)
        elif suffix.endswith('_count'):
            # BUGFIX: counts were previously formatted with a '$' prefix like
            # dollars; annotate them as plain integers instead.
            sns.heatmap(data, ax=ax, cmap=cmap, annot=True, fmt='',
                        cbar=False, linewidths=0.5, yticklabels=yticklabels)
            _reformat_annotations(ax, lambda v: f"{v:,.0f}")
        else:
            # Percent formatting for '_pct' measures, plain floats otherwise.
            fmt = ".0%" if '_pct' in suffix else ".2f"
            sns.heatmap(data, ax=ax, cmap=cmap, annot=True, fmt=fmt,
                        cbar=False, linewidths=0.5, yticklabels=yticklabels)

    for ax, data, yticklabels in zip([ax1, ax2], [left_data, right_data], [True, False]):
        format_heatmap(data, ax, cmap, suffix, yticklabels)
        ax.set_ylabel("")  # reason labels already shown; drop redundant axis label

    # Title reflects the measure being plotted.
    metric_title = metric_name.replace('_', ' ').title()
    title_suffix = ""
    if suffix.endswith('_dollar'):
        title_suffix = "(Opportunity Size $)"
    elif suffix.endswith('_count'):
        title_suffix = "(Count of Initiatives)"
    elif suffix.endswith('_pct_init'):
        title_suffix = "(% of Initiatives)"
    elif suffix.endswith('_pct_opp'):
        title_suffix = "(% of Opportunity Size $)"
    fig.suptitle(f"Reason Distribution for {metric_title} {title_suffix}", fontsize=16)
    plt.tight_layout(rect=[0, 0, 1, 0.95])
    return fig
# Maps metric column name -> {reason_category: suggestion} lookup, intended for
# format_reason_insight's reason_actions parameter.
# NOTE(review): lost_reason_actions / won_reason_to_specifics are presumably
# defined elsewhere in this file/notebook — confirm they exist before running
# this module standalone.
reason_actions_dict = {'cli_closed_pct': lost_reason_actions, 'cli_closed_won_pct': won_reason_to_specifics}
# Last updated  (trailing artifact from document export — not code)