def prepare_product_vertical_data(df, min_initiatives, min_confidence):
"""Prepare the base data for product-vertical analysis with early filtering"""
# Create pivot table with necessary columns
df_pivot = df.pivot_table(
index=['territory_l4_name', 'product', 'vertical'],
columns='initiative_stage',
values='count',
aggfunc='sum',
fill_value=0
).reset_index()
# Calculate required metrics
df_pivot['total_initiatives'] = df_pivot.drop(['territory_l4_name', 'product', 'vertical'], axis=1).sum(axis=1)
df_pivot['closed_won_cnt'] = df_pivot.get('CLOSED_WON', 0)
df_pivot['closed_cnt'] = df_pivot.get('CLOSED', 0)
# Calculate rates
df_pivot['win_rate'] = df_pivot['closed_won_cnt'] / df_pivot['total_initiatives'].replace(0, np.nan)
df_pivot['lost_rate'] = df_pivot['closed_cnt'] / df_pivot['total_initiatives'].replace(0, np.nan)
# Calculate vertical mix
vertical_mix = df.pivot_table(index=['vertical'], values='count', aggfunc='sum')
vertical_mix['mix_pct'] = vertical_mix['count'] / vertical_mix['count'].sum() * 100
# Apply early filtering for vertical-level data
vertical_initiatives = df_pivot.groupby('vertical')['total_initiatives'].sum()
qualified_verticals = vertical_initiatives[vertical_initiatives >= min_initiatives].index.tolist()
# Apply early filtering for region-vertical combinations
region_vertical_initiatives = df_pivot.groupby(['territory_l4_name', 'vertical'])['total_initiatives'].sum()
qualified_region_verticals = region_vertical_initiatives[region_vertical_initiatives >= min_confidence].reset_index()
qualified_region_verticals['qualified'] = True
# Mark qualified combinations in the main dataframe
df_pivot = df_pivot.merge(
qualified_region_verticals[['territory_l4_name', 'vertical', 'qualified']],
on=['territory_l4_name', 'vertical'],
how='left'
)
df_pivot['qualified'] = df_pivot['qualified'].fillna(False)
# Add a flag for qualified verticals
df_pivot['vertical_qualified'] = df_pivot['vertical'].isin(qualified_verticals)
return df_pivot, vertical_mix
def identify_best_products_for_verticals(df_pivot):
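    """Return, per vertical, the top products by win rate, the currently most-used
    product, and per-product win/lost/volume stats (qualified verticals only)."""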
best_products = {}
current_products = {}
product_stats = {}
for vertical in df_pivot['vertical'].unique():
vertical_data = df_pivot[df_pivot['vertical'] == vertical]
# Find current most used product for all verticals
current_product = vertical_data.groupby('product')['total_initiatives'].sum().idxmax()
current_products[vertical] = current_product
# For verticals with insufficient data (already filtered), mark as low confidence
if not vertical_data['vertical_qualified'].any():
best_products[vertical] = [] # Empty list indicates insufficient data
continue
# Get qualified products (already filtered by vertical qualification)
qualified_data = vertical_data[vertical_data['vertical_qualified']]
if len(qualified_data) < 1:
best_products[vertical] = []
continue
# Find best products by win rate
top_products = qualified_data.nlargest(3, 'win_rate')
best_products[vertical] = top_products['product'].tolist()
# Store product stats
product_stats[vertical] = {}
for _, row in qualified_data.iterrows():
product_stats[vertical][row['product']] = {
'win_rate': row['win_rate'],
'lost_rate': row['lost_rate'],
'total_initiatives': row['total_initiatives']
}
return best_products, current_products, product_stats
def calculate_regional_adoption(df_pivot, best_products, product_stats):
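    """For each region-vertical pair, compute the share of initiatives pitched on one of the
    vertical's best products; None marks pairs with insufficient (unqualified) data."""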
regional_distribution = {}
for region in df_pivot['territory_l4_name'].unique():
region_data = df_pivot[df_pivot['territory_l4_name'] == region]
for vertical, products in best_products.items():
# Skip if no best products
if not products:
regional_distribution[(region, vertical)] = None
continue
# Check if we have enough data for this specific region-vertical combination
# (already filtered by qualified flag)
region_vertical_data = region_data[(region_data['vertical'] == vertical) & (region_data['qualified'])]
if len(region_vertical_data) == 0:
regional_distribution[(region, vertical)] = None
continue
# Calculate adoption percentage
vertical_initiatives = region_data[region_data['vertical'] == vertical]['total_initiatives'].sum()
if vertical_initiatives == 0:
regional_distribution[(region, vertical)] = 0
continue
best_product_initiatives = region_data[
(region_data['vertical'] == vertical) &
(region_data['product'].isin(products))
]['total_initiatives'].sum()
percentage = (best_product_initiatives / vertical_initiatives) * 100
regional_distribution[(region, vertical)] = percentage
return regional_distribution
def analyze_product_vertical_fit(df, min_initiatives=500, min_confidence=200):
"""Complete analysis of product-vertical fit
Args:
df: DataFrame containing product-vertical data
min_initiatives: Minimum number of initiatives required to identify best products
for a vertical. Verticals with fewer initiatives will have null values.
min_confidence: Minimum number of initiatives required in a specific region-vertical
combination to calculate adoption percentage. Combinations with fewer
initiatives will show as null (white cells in the heatmap).
Returns:
Dictionary containing analysis results
"""
import itertools
# Prepare base data with early filtering
df_pivot, vertical_mix = prepare_product_vertical_data(df, min_initiatives, min_confidence)
# Get all unique verticals and regions
all_verticals = df_pivot['vertical'].unique()
all_regions = df_pivot['territory_l4_name'].unique()
# Identify best products and calculate adoption
best_products, current_products, product_stats = identify_best_products_for_verticals(df_pivot)
# Use calculate_regional_adoption function
adoption_data = calculate_regional_adoption(df_pivot, best_products, product_stats)
# Create regional_distribution with all verticals
regional_distribution = {}
for region, vertical in itertools.product(all_regions, all_verticals):
key = (region, vertical)
# If we have data from calculate_regional_adoption, use it
if key in adoption_data:
regional_distribution[key] = adoption_data[key]
else:
# Otherwise mark as None (insufficient data)
regional_distribution[key] = None
# Create adoption dataframe
adoption_df = pd.DataFrame([
{'region': region, 'vertical': vertical, 'adoption': percentage}
for (region, vertical), percentage in regional_distribution.items()
])
# Calculate variance in adoption
variance_by_vertical = adoption_df.groupby('vertical')['adoption'].agg(['mean', 'std', 'min', 'max'])
variance_by_vertical['range'] = variance_by_vertical['max'] - variance_by_vertical['min']
variance_by_vertical = variance_by_vertical.merge(vertical_mix['mix_pct'], left_index=True, right_index=True)
# Calculate proper vertical-level metrics for qualified data
qualified_data = df_pivot[df_pivot['vertical_qualified']]
vertical_metrics = qualified_data.groupby('vertical').agg({
'closed_won_cnt': 'sum',
'closed_cnt': 'sum',
'total_initiatives': 'sum'
})
# Calculate proper ratios
vertical_metrics['win_rate'] = vertical_metrics['closed_won_cnt'] / vertical_metrics['total_initiatives']
vertical_metrics['lost_rate'] = vertical_metrics['closed_cnt'] / vertical_metrics['total_initiatives']
# Calculate proper vertical-level adoption rates
vertical_adoption = {}
for vertical, products in best_products.items():
if not products: # Skip verticals with no best products
vertical_adoption[vertical] = None
continue
vertical_data = df_pivot[df_pivot['vertical'] == vertical]
total_initiatives = vertical_data['total_initiatives'].sum()
if total_initiatives == 0:
vertical_adoption[vertical] = 0
continue
best_product_initiatives = vertical_data[
vertical_data['product'].isin(products)
]['total_initiatives'].sum()
vertical_adoption[vertical] = (best_product_initiatives / total_initiatives) * 100
# Convert to DataFrame
vertical_adoption_df = pd.DataFrame([
{'vertical': vertical, 'adoption': adoption}
for vertical, adoption in vertical_adoption.items()
]).dropna()
return {
'df_pivot': df_pivot,
'vertical_mix': vertical_mix,
'best_products': best_products,
'current_products': current_products,
'product_stats': product_stats,
'regional_distribution': regional_distribution,
'adoption_df': adoption_df,
'variance_by_vertical': variance_by_vertical,
'vertical_metrics': vertical_metrics,
'vertical_adoption': vertical_adoption_df
}
def generate_product_vertical_insights(analysis_results):
"""Generate insights from product-vertical analysis results"""
# Extract data from analysis results
vertical_mix = analysis_results['vertical_mix']
best_products = analysis_results['best_products']
current_products = analysis_results['current_products']
product_stats = analysis_results['product_stats']
adoption_df = analysis_results['adoption_df']
variance_by_vertical = analysis_results['variance_by_vertical']
vertical_metrics = analysis_results['vertical_metrics']
vertical_adoption_df = analysis_results['vertical_adoption']
# Calculate correlation with adoption
correlation_data = vertical_metrics.reset_index().merge(
vertical_adoption_df,
on='vertical',
how='inner'
)
win_correlation = correlation_data[['win_rate', 'adoption']].corr().iloc[0,1]
lost_correlation = correlation_data[['lost_rate', 'adoption']].corr().iloc[0,1]
# Generate opportunity insights
opportunities = []
for vertical, best_prods in best_products.items():
if vertical not in current_products or not best_prods:
continue
current_prod = current_products[vertical]
if current_prod == best_prods[0]:
continue
# Check if both products have stats (meet min_confidence)
if current_prod not in product_stats.get(vertical, {}) or best_prods[0] not in product_stats.get(vertical, {}):
continue
best_prod = best_prods[0]
current_rate = product_stats[vertical][current_prod]['win_rate']
best_rate = product_stats[vertical][best_prod]['win_rate']
vertical_adoption = vertical_adoption_df[vertical_adoption_df['vertical'] == vertical]['adoption'].mean()
mix_pct = vertical_mix.loc[vertical, 'mix_pct'] if vertical in vertical_mix.index else 0
if mix_pct < 3.0: # Skip low-mix verticals
continue
opportunity_score = (best_rate - current_rate) * mix_pct * (100 - vertical_adoption) / 100
opportunities.append({
'Vertical (mix)': f"{vertical} ({mix_pct:.0f}%)",
'% pitch on best fit': f"{vertical_adoption:.0f}%",
'Recommended action': f"{current_prod} → {best_prod}",
'Potential value': f"{current_rate*100:.0f}% → {best_rate*100:.0f}%",
'Opportunity score': opportunity_score
})
# Generate variance insights
high_variance = []
for vertical in variance_by_vertical[
(variance_by_vertical['mix_pct'] >= 2.0) &
(variance_by_vertical['range'] >= 20.0)
].sort_values('range', ascending=False).index:
if vertical not in best_products:
continue
lowest_regions = adoption_df[adoption_df['vertical'] == vertical].nsmallest(2, 'adoption')['region'].tolist()
high_variance.append({
'Vertical (mix)': f"{vertical} ({variance_by_vertical.loc[vertical, 'mix_pct']:.0f}%)",
'Adoption range': f"{variance_by_vertical.loc[vertical, 'min']:.0f}%-{variance_by_vertical.loc[vertical, 'max']:.0f}%",
'Recommended action': f"Standardize on {', '.join(best_products[vertical][:2])}",
'Region call-outs': ', '.join(lowest_regions)
})
# Create dataframes
    opportunity_df = pd.DataFrame(opportunities)
    if len(opportunity_df) > 0:
        opportunity_df = opportunity_df.sort_values('Opportunity score', ascending=False).head(3)
        opportunity_df = opportunity_df.drop('Opportunity score', axis=1)
variance_df = pd.DataFrame(high_variance).head(2)
return {
'opportunity_table': opportunity_df.to_markdown(index=False) if len(opportunity_df) > 0 else None,
'variance_table': variance_df.to_markdown(index=False) if len(variance_df) > 0 else None,
'win_correlation': win_correlation,
'lost_correlation': lost_correlation
}
def visualize_best_product_adoption(analysis_results):
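    """Plot a vertical x region heatmap of best-product adoption percentages (null cells masked)."""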
# Extract data from analysis results
adoption_df = analysis_results['adoption_df']
# Create heatmap
pivot_data = adoption_df.pivot_table(
index='vertical',
columns='region',
values='adoption'
)
plt.figure(figsize=(12, 10))
sns.heatmap(pivot_data, annot=True, cmap='RdYlGn', fmt='.1f', mask=pivot_data.isnull())
plt.title('Adoption of Best Products by Region and Vertical (%)')
return plt
def format_product_vertical_insights(insights):
"""Print formatted product-vertical insights with explanatory text"""
def get_strength(corr):
"""Get descriptive strength of correlation"""
abs_corr = abs(corr)
if abs_corr < 0.2: return "weak"
elif abs_corr < 0.4: return "modest"
elif abs_corr < 0.6: return "moderate"
elif abs_corr < 0.8: return "strong"
else: return "very strong"
print("# Product-Vertical Fit Analysis\n")
print(f"Analysis shows a {get_strength(insights['win_correlation'])} correlation (r={insights['win_correlation']:.2f}) between")
print(f"best product adoption and win rates, and a {get_strength(insights['lost_correlation'])} correlation")
print(f"(r={insights['lost_correlation']:.2f}) with lost rates.\n")
print("## Key Vertical Improvement Opportunities\n")
print("Top opportunities ranked by potential impact, calculated from:")
print("- Win rate improvement potential (best product vs. current product)")
print("- Vertical mix percentage (higher mix = higher impact)")
print("- Current adoption gap (lower adoption = more room for improvement)")
print("- Only verticals with >3% mix and significant win rate differences are shown\n")
print(insights['opportunity_table'])
print("\n## Verticals with Inconsistent Adoption\n")
print("Verticals where adoption varies significantly across regions (>20% range):")
print("- Only verticals with >2% mix are shown")
print("- Region call-outs highlight areas with lowest adoption\n")
print(insights['variance_table'])
analysis_results = analyze_product_vertical_fit(df_product_vertical)
insights = generate_product_vertical_insights(analysis_results)
format_product_vertical_insights(insights)
fig = visualize_best_product_adoption(analysis_results)
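The two thresholds are the main tuning knobs for this pipeline. As a quick illustration (the threshold values below are placeholders, not recommendations), re-running with stricter cut-offs leaves fewer, better-populated cells in the heatmap:
# Illustrative only: stricter thresholds so that only well-populated verticals and
# region-vertical cells are scored and shaded.
strict_results = analyze_product_vertical_fit(
    df_product_vertical,
    min_initiatives=1000,  # a vertical needs at least this many initiatives to rank products
    min_confidence=400,    # a region-vertical cell needs at least this many to be shaded
)
strict_insights = generate_product_vertical_insights(strict_results)
format_product_vertical_insights(strict_insights)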
Quantitative approach:
- Calculate expected vs. actual rates for each region by benchmarking against "rest of world" data, separating the impact of product-vertical mix from execution quality
- Introduce a performance index that shows how each region performs relative to the others
- Measure the weighted impact of each product-vertical combination on overall regional performance

This feeds into:
- Regional performance benchmarking - identify which regions over- or underperform expectations
- Depth-spotter - pinpoint the specific product-vertical combinations driving performance gaps
- Opportunity sizing - quantify the potential improvement from addressing underperforming combinations
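Concretely, a minimal sketch with invented numbers (assumptions for illustration, not values from df_product_vertical) shows how the three quantities fit together: the expected rate re-weights rest-of-world rates by the region's own mix, the performance index compares region and rest-of-world rates per combination, and the weighted impact scales each gap by the region's mix, so the impacts add up to the actual-minus-expected difference.
# Minimal, self-contained sketch for a single region with two product-vertical combinations.
import pandas as pd

toy = pd.DataFrame({
    "combo":          ["ProductA | Retail", "ProductB | Travel"],
    "region_mix_pct": [0.70, 0.30],   # share of the region's initiatives in each combination
    "region_rate":    [0.32, 0.10],   # the region's own win rate per combination
    "rest_rate":      [0.25, 0.20],   # rest-of-world win rate per combination
})

actual_rate = (toy["region_mix_pct"] * toy["region_rate"]).sum()    # 0.254
expected_rate = (toy["region_mix_pct"] * toy["rest_rate"]).sum()    # 0.235
toy["performance_index"] = toy["region_rate"] / toy["rest_rate"]    # 1.28 and 0.50
toy["weighted_impact"] = toy["region_mix_pct"] * (toy["region_rate"] - toy["rest_rate"])

# The weighted impacts (0.049 and -0.030) sum to the overall execution gap of +0.019, i.e. +1.9pp.
print(round(actual_rate - expected_rate, 3), toy["weighted_impact"].sum().round(3))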
def analyze_product_vertical_fit(df, metric_name):
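    """Benchmark each region's actual rate for `metric_name` against the rate expected from its
    product-vertical mix at rest-of-world rates. Redefines the earlier function of the same name
    for the benchmarking analysis. Returns (rates_df, mix_df)."""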
# Create necessary count columns based on initiative_stage
df_pivot = df.pivot_table(
index=["territory_l4_name", "product", "vertical"],
columns="initiative_stage",
values="count",
aggfunc="sum",
fill_value=0,
).reset_index()
# Calculate total initiatives and metrics
df_pivot["total_initiatives"] = df_pivot.drop(
["territory_l4_name", "product", "vertical"], axis=1
).sum(axis=1)
df_pivot["closed_won_cnt"] = df_pivot.get("CLOSED_WON", 0)
df_pivot["closed_cnt"] = df_pivot.get("CLOSED", 0)
# Calculate percentages based on metric_name
if "won" in metric_name:
df_pivot[metric_name] = (
df_pivot["closed_won_cnt"] / df_pivot["total_initiatives"]
)
else:
df_pivot[metric_name] = df_pivot["closed_cnt"] / df_pivot["total_initiatives"]
# Add Global row
global_data = (
df_pivot.groupby(["product", "vertical"])
.agg({"total_initiatives": "sum", "closed_won_cnt": "sum", "closed_cnt": "sum"})
.reset_index()
)
global_data["territory_l4_name"] = "Global"
# Calculate global metrics
if "won" in metric_name:
global_data[metric_name] = (
global_data["closed_won_cnt"] / global_data["total_initiatives"]
)
else:
global_data[metric_name] = (
global_data["closed_cnt"] / global_data["total_initiatives"]
)
# Combine with original data
df_pivot = pd.concat([df_pivot, global_data], ignore_index=True)
# Initialize lists for results
expected_rates = []
actual_rates = []
mix_details = []
# Process each region
for region in df_pivot["territory_l4_name"].unique():
region_data = df_pivot[df_pivot["territory_l4_name"] == region]
region_mix = region_data.groupby(["product", "vertical"])[
"total_initiatives"
].sum()
region_mix_pct = region_mix / region_mix.sum()
if region != "Global":
# Rest of world data (excluding Global)
rest_data = df_pivot[
(df_pivot["territory_l4_name"] != region)
& (df_pivot["territory_l4_name"] != "Global")
]
rest_rates = rest_data.groupby(["product", "vertical"])[metric_name].mean()
rest_mix = rest_data.groupby(["product", "vertical"])[
"total_initiatives"
].sum()
rest_mix_pct = rest_mix / rest_mix.sum()
# Calculate expected rate
expected_rate = (region_mix_pct * rest_rates).sum()
# Add mix details
for idx in region_mix.index:
product, vertical = idx
mix_details.append(
{
"region": region,
"product": product,
"vertical": vertical,
"region_mix_pct": region_mix_pct[idx],
"rest_mix_pct": rest_mix_pct.get(idx, None),
"region_rate": region_data.set_index(["product", "vertical"])[
metric_name
].get(idx, None),
"rest_rate": rest_rates.get(idx, None),
}
)
else:
# For Global, use its own data
actual_rate_col = "closed_won_cnt" if "won" in metric_name else "closed_cnt"
expected_rate = (
region_data[actual_rate_col].sum()
/ region_data["total_initiatives"].sum()
)
# Add Global mix details
for idx in region_mix.index:
product, vertical = idx
mix_details.append(
{
"region": "Global",
"product": product,
"vertical": vertical,
"region_mix_pct": region_mix_pct[idx],
"rest_mix_pct": None,
"region_rate": region_data.set_index(["product", "vertical"])[
metric_name
].get(idx, None),
"rest_rate": None,
}
)
# Calculate actual rate
actual_rate_col = "closed_won_cnt" if "won" in metric_name else "closed_cnt"
actual_rate = (
region_data[actual_rate_col].sum() / region_data["total_initiatives"].sum()
)
# Append results
expected_rates.append({"region": region, "expected_rate": expected_rate})
actual_rates.append({"region": region, "actual_rate": actual_rate})
# Create final dataframes
rates_df = pd.DataFrame(expected_rates).merge(
pd.DataFrame(actual_rates), on="region"
)
rates_df["rate_diff"] = rates_df["actual_rate"] - rates_df["expected_rate"]
rates_df["rate_diff_pct"] = (
rates_df["rate_diff"] / rates_df["expected_rate"]
) * 100
mix_df = pd.DataFrame(mix_details)
mix_df["performance_index"] = mix_df["region_rate"] / mix_df["rest_rate"]
mix_df["weighted_impact"] = mix_df["region_mix_pct"] * (
mix_df["region_rate"] - mix_df["rest_rate"]
)
return rates_df, mix_df
def find_key_drivers_for_anomaly(mix_df, anomaly_region, metric_name, higher_is_better):
"""Find key drivers (positive and negative) for an anomalous region"""
# Filter for the anomalous region
region_data = mix_df[mix_df["region"] == anomaly_region].copy()
# Sort by weighted impact
if higher_is_better:
# For metrics where higher is better (like win rate)
# Positive impact (higher weighted_impact) is good
positive_drivers = region_data[region_data["weighted_impact"] > 0].nlargest(
5, "weighted_impact"
)
negative_drivers = region_data[region_data["weighted_impact"] < 0].nsmallest(
5, "weighted_impact"
)
else:
# For metrics where lower is better (like closed rate for negative outcomes)
# Negative impact (lower weighted_impact) is good
positive_drivers = region_data[region_data["weighted_impact"] < 0].nsmallest(
5, "weighted_impact"
)
negative_drivers = region_data[region_data["weighted_impact"] > 0].nlargest(
5, "weighted_impact"
)
return positive_drivers, negative_drivers
def generate_enhanced_insights(rates_df, mix_df, metric_anomaly_map, metric_name):
"""Generate enhanced insights focusing on the anomalous region first"""
anomaly_info = metric_anomaly_map[metric_name]
anomaly_region = anomaly_info["anomalous_region"]
higher_is_better = anomaly_info.get("higher_is_better", False)
# Get drivers for anomalous region
positive_drivers, negative_drivers = find_key_drivers_for_anomaly(
mix_df, anomaly_region, metric_name, higher_is_better
)
# Format metric name for display
display_metric = "win rate" if "won" in metric_name else "closed rate"
# Generate insights for anomalous region
anomaly_insights = {
"region": anomaly_region,
"metric": display_metric,
"direction": anomaly_info["direction"],
"magnitude": anomaly_info["magnitude"],
"strengths": [],
"weaknesses": [],
}
# Only include strengths for higher-than-expected performance when higher is better
# or for lower-than-expected performance when lower is better
show_strengths = (anomaly_info["direction"] == "higher" and higher_is_better) or (
anomaly_info["direction"] == "lower" and not higher_is_better
)
# Add strengths or weaknesses based on anomaly direction
if show_strengths:
for _, driver in positive_drivers.iterrows():
product, vertical = driver["product"], driver["vertical"]
region_rate = driver["region_rate"] * 100
rest_rate = driver["rest_rate"] * 100
diff = abs(region_rate - rest_rate)
anomaly_insights["strengths"].append(
{
"product_vertical": f"{product} in {vertical}",
"region_rate": f"{region_rate:.1f}%",
"rest_rate": f"{rest_rate:.1f}%",
"diff": f"{diff:.1f}pp",
"impact": f"{(driver['weighted_impact']*100):.2f}pp",
}
)
else:
for _, driver in negative_drivers.iterrows():
product, vertical = driver["product"], driver["vertical"]
region_rate = driver["region_rate"] * 100
rest_rate = driver["rest_rate"] * 100
diff = abs(region_rate - rest_rate)
anomaly_insights["weaknesses"].append(
{
"product_vertical": f"{product} in {vertical}",
"region_rate": f"{region_rate:.1f}%",
"rest_rate": f"{rest_rate:.1f}%",
"diff": f"{diff:.1f}pp",
"impact": f"{(driver['weighted_impact']*100):.2f}pp",
}
)
# Find opportunities for improvement across all regions
opportunities = []
# Get all regions except Global
regions = rates_df[rates_df["region"] != "Global"]["region"].unique()
# For each region, find top negative impact combinations
for region in regions:
# Determine which combinations have negative impact based on higher_is_better
region_data = mix_df[mix_df["region"] == region].copy()
if higher_is_better:
# For metrics where higher is better, negative weighted_impact is bad
negative_impact = region_data[region_data["weighted_impact"] < 0]
top_negative = negative_impact.nsmallest(3, "weighted_impact")
else:
# For metrics where lower is better, positive weighted_impact is bad
negative_impact = region_data[region_data["weighted_impact"] > 0]
top_negative = negative_impact.nlargest(3, "weighted_impact")
# For each negative impact combination, find regions where it's a strength
for _, weakness in top_negative.iterrows():
product, vertical = weakness["product"], weakness["vertical"]
# Find regions where this combination has positive impact
other_regions = mix_df[
(mix_df["product"] == product)
& (mix_df["vertical"] == vertical)
& (mix_df["region"] != region)
]
if higher_is_better:
potential_teachers = other_regions[other_regions["weighted_impact"] > 0]
if len(potential_teachers) > 0:
best_teacher = potential_teachers.nlargest(
1, "weighted_impact"
).iloc[0]
else:
potential_teachers = other_regions[other_regions["weighted_impact"] < 0]
if len(potential_teachers) > 0:
best_teacher = potential_teachers.nsmallest(
1, "weighted_impact"
).iloc[0]
if len(potential_teachers) > 0:
# Calculate the improvement in weighted impact if region adopts benchmark rate
current_impact = weakness["weighted_impact"]
# What the impact would be if we had the teacher's rate with our mix
potential_impact = weakness["region_mix_pct"] * (
best_teacher["region_rate"] - weakness["rest_rate"]
)
# The change in impact represents our improvement
impact_change = potential_impact - current_impact
impact_sign = "+" if impact_change > 0 else "-"
opportunities.append(
{
"region": region,
"product_vertical": f"{product} in {vertical}",
"current": f"{weakness['region_rate']*100:.1f}%",
"benchmark": f"{best_teacher['region_rate']*100:.1f}%",
"learn_from": best_teacher["region"],
"potential_impact": f"{impact_sign}{abs(impact_change*100):.2f}pp",
}
)
    # Sort opportunities by impact (parse the numeric value out of the formatted "+x.xxpp" string)
    opportunities = sorted(
        opportunities, key=lambda x: float(x["potential_impact"].rstrip("p")), reverse=True
    )
# Calculate distribution metrics for each region
# First, Replace 0.0 and infinity with NaN
mix_df["performance_index"] = mix_df["performance_index"].replace(
[0.0, np.inf, -np.inf], np.nan
)
filtered_df = mix_df[
(mix_df["region"] != "Global") & (~mix_df["performance_index"].isna())
]
results = []
for region in filtered_df["region"].unique():
region_data = filtered_df[filtered_df["region"] == region]["performance_index"]
# For higher_is_better=True, performance_index > 1 is good
# For higher_is_better=False, performance_index < 1 is good
if higher_is_better:
benchmark_comparison = (region_data > 1.0).mean() * 100 # % above benchmark
else:
benchmark_comparison = (region_data < 1.0).mean() * 100 # % below benchmark
results.append(
{
"region": region,
"mean": region_data.mean(),
"median": region_data.median(),
"std": region_data.std(),
"skew": region_data.skew(),
"favorable_benchmark_pct": benchmark_comparison,
"count": len(region_data),
}
)
stats_df = pd.DataFrame(results).sort_values(
"favorable_benchmark_pct", ascending=not higher_is_better
)
# Calculate correlation
correlation = (
rates_df[rates_df["region"] != "Global"][["expected_rate", "actual_rate"]]
.corr()
.iloc[0, 1]
)
return {
"anomaly_insights": anomaly_insights,
"opportunities": opportunities[:5], # Top 5 opportunities
"stats": stats_df,
"correlation": correlation,
}
def print_enhanced_insights(insights, template_str, metric_anomaly_map, metric_name):
"""Print enhanced insights in a readable format using Jinja templates"""
anomaly_info = metric_anomaly_map[metric_name]
anomaly_region = anomaly_info["anomalous_region"]
higher_is_better = anomaly_info.get("higher_is_better", False)
# Determine if anomaly is negative for business
is_negative = (anomaly_info["direction"] == "higher" and not higher_is_better) or (
anomaly_info["direction"] == "lower" and higher_is_better
)
# Get region stats
stats_df = insights["stats"]
region_stats = stats_df[stats_df["region"] == anomaly_region].iloc[0]
# Determine if we show strengths or weaknesses
show_strengths = (anomaly_info["direction"] == "higher" and higher_is_better) or (
anomaly_info["direction"] == "lower" and not higher_is_better
)
# Create tables for strengths/weaknesses
if show_strengths:
strengths_df = pd.DataFrame(insights["anomaly_insights"]["strengths"])
table_content = strengths_df[
["product_vertical", "region_rate", "rest_rate", "diff", "impact"]
].to_markdown(index=False)
section_title = "Key Strengths"
else:
weaknesses_df = pd.DataFrame(insights["anomaly_insights"]["weaknesses"])
table_content = weaknesses_df[
["product_vertical", "region_rate", "rest_rate", "diff", "impact"]
].to_markdown(index=False)
section_title = "Key Weaknesses"
# Create opportunities table
opportunities_df = pd.DataFrame(insights["opportunities"])
opportunities_table = opportunities_df[
[
"region",
"product_vertical",
"current",
"benchmark",
"learn_from",
"potential_impact",
]
].to_markdown(index=False)
correlation = insights["correlation"]
# Prepare data for template
template_data = {
"metric_name": metric_name.replace("_", " ").title(),
"region": anomaly_region,
"magnitude": anomaly_info["magnitude"],
"direction": anomaly_info["direction"],
"is_negative": is_negative,
"higher_is_better": higher_is_better,
"median_index": region_stats["median"],
"favorable_benchmark_pct": region_stats["favorable_benchmark_pct"],
"correlation": correlation,
"section_title": section_title,
"stats_df": stats_df.to_markdown(index=False),
"show_strengths": show_strengths,
"table_content": table_content,
"opportunities_table": opportunities_table,
}
# Render template
template = Template(template_str)
return template.render(**template_data)
template_str = """
Callout: {{ metric_name }} in {{ region }} is {{ magnitude }} {{ direction }} than Global mean. This is {% if is_negative %}negative{% else %}positive{% endif %} for the business.
Product-Vertical Analysis: {{ region }} shows significantly {% if is_negative %}worse{% else %}better{% endif %} performance with:
• Median performance index of {{ median_index|round(2) }}x ({% if higher_is_better %}higher{% else %}lower{% endif %} means better {{ metric_name }})
• {{ favorable_benchmark_pct|round(1) }}% of product-vertical combinations performing better than rest of world average
{{ stats_df }}
Primary Driver: Execution quality is the main factor ({{ correlation|round(2) }} correlation), not product-vertical mix.
## {{ section_title }}
Product-vertical combinations {% if show_strengths %}contributing positively{% else %}underperforming{% endif %}:
{{ table_content }}
## Improvement Opportunities
Specific opportunities for regions to learn from others:
{{ opportunities_table }}
"""
# Get performance analysis
rates_df, mix_df = analyze_product_vertical_fit(df_product_vertical, metric_name)
# Generate enhanced insights
insights = generate_enhanced_insights(rates_df, mix_df, metric_anomaly_map, metric_name)
# Print insights
print(print_enhanced_insights(insights, template_str, metric_anomaly_map, metric_name))