# Page 1
def calculate_rates_and_mix(df, metric_name):
    """Compare each region's actual rate with a mix-adjusted expected rate.

    For every distinct ``territory_l4_name`` the function computes:

    * the region's product mix: share of ``cli_cnt`` per ``products_impacted``;
    * its actual rate: sum of the metric's count column over sum of ``cli_cnt``;
    * its expected rate: the region's own mix weighted by the per-product mean
      of ``metric_name`` across every *other* non-Global region.

    The ``'Global'`` region has no "rest of the world": its expected rate is
    set equal to its actual rate and its ``rest_*`` values are NaN.

    Args:
        df: DataFrame with columns ``territory_l4_name``, ``products_impacted``,
            ``cli_cnt``, ``metric_name`` and the matching count column
            (``metric_name`` with ``'_pct'`` replaced by ``'_cnt'``).
        metric_name: Name of the rate column to analyse.

    Returns:
        A ``(rates_df, mix_df)`` tuple. ``rates_df`` has one row per region with
        ``region``, ``expected_rate``, ``actual_rate``, ``rate_diff`` and
        ``rate_diff_pct``. ``mix_df`` has one row per region/product with
        ``region``, ``product``, ``region_mix_pct``, ``rest_mix_pct``,
        ``region_rate``, ``rest_rate``, ``performance_index`` and
        ``weighted_impact``. Both frames are empty (but keep their columns)
        when ``df`` has no rows.
    """
    closed_cnt_col = metric_name.replace('_pct', '_cnt')
    cli_cnt_col = 'cli_cnt'
    missing = float('nan')  # numeric placeholder so columns never degrade to object dtype

    rate_rows = []
    mix_rows = []

    # unique() yields every region exactly once, so no "already added" flag is needed.
    for region in df['territory_l4_name'].unique():
        region_data = df[df['territory_l4_name'] == region]
        region_mix = region_data.groupby('products_impacted')[cli_cnt_col].sum()
        region_mix_pct = region_mix / region_mix.sum()
        # Built once per region instead of once per product.
        region_rates = region_data.set_index('products_impacted')[metric_name]
        actual_rate = region_data[closed_cnt_col].sum() / region_data[cli_cnt_col].sum()

        if region != 'Global':
            rest_of_world_data = df[
                (df['territory_l4_name'] != region) & (df['territory_l4_name'] != 'Global')
            ]
            rest_rates = rest_of_world_data.groupby('products_impacted')[metric_name].mean()
            rest_mix = rest_of_world_data.groupby('products_impacted')[cli_cnt_col].sum()
            rest_mix_pct = rest_mix / rest_mix.sum()
            # NOTE: products missing from the rest of the world align to NaN and
            # are skipped by sum(), so their mix share is not redistributed.
            expected_rate = (region_mix_pct * rest_rates).sum()
        else:
            # Global is the benchmark itself: nothing to compare against.
            rest_rates = None
            rest_mix_pct = None
            expected_rate = actual_rate

        for product in region_mix.index:
            mix_rows.append({
                'region': region,
                'product': product,
                'region_mix_pct': region_mix_pct[product],
                'rest_mix_pct': missing if rest_mix_pct is None else rest_mix_pct.get(product, missing),
                'region_rate': region_rates.get(product, missing),
                'rest_rate': missing if rest_rates is None else rest_rates.get(product, missing),
            })

        rate_rows.append({
            'region': region,
            'expected_rate': expected_rate,
            'actual_rate': actual_rate,
        })

    # Explicit column lists keep the schema intact when there are no rows.
    rates_df = pd.DataFrame(rate_rows, columns=['region', 'expected_rate', 'actual_rate'])
    rates_df['rate_diff'] = rates_df['actual_rate'] - rates_df['expected_rate']
    rates_df['rate_diff_pct'] = (rates_df['rate_diff'] / rates_df['expected_rate']) * 100

    mix_df = pd.DataFrame(
        mix_rows,
        columns=['region', 'product', 'region_mix_pct', 'rest_mix_pct', 'region_rate', 'rest_rate'],
    )
    # Ratio of the region's product rate to the rest-of-world rate.
    mix_df['performance_index'] = mix_df['region_rate'] / mix_df['rest_rate']
    # Rate gap versus the rest of the world, weighted by the region's mix share.
    mix_df['weighted_impact'] = mix_df['region_mix_pct'] * (mix_df['region_rate'] - mix_df['rest_rate'])
    return rates_df, mix_df
def _format_product_list(names):
    """Join names as an English list: "A", "A and B" or "A, B, and C"."""
    names = [str(name) for name in names]
    if len(names) <= 2:
        return " and ".join(names)
    return f"{', '.join(names[:-1])}, and {names[-1]}"


def generate_anomaly_explanation(mix_df, rates_df, metric_anomaly_map, metric_name):
    """Build a plain-text explanation of a metric anomaly for one region.

    Args:
        mix_df: Per region/product frame with columns ``region``, ``product``,
            ``region_mix_pct``, ``rest_mix_pct``, ``performance_index`` and
            ``weighted_impact``.
        rates_df: Per-region frame with columns ``region``, ``expected_rate``
            and ``actual_rate``. It is not modified.
        metric_anomaly_map: Mapping of metric name to a dict with keys
            ``anomalous_region``, ``magnitude``, ``direction`` and optionally
            ``higher_is_better``.
        metric_name: Metric to explain.

    Returns:
        The explanation text, or ``"No anomaly detected for this metric."``
        when ``metric_anomaly_map`` has no (or an empty) entry for the metric.

    Raises:
        IndexError: If the anomalous region is missing from ``rates_df`` or
            ``mix_df`` contains no non-Global region.
    """
    anomaly_info = metric_anomaly_map.get(metric_name, {})
    if not anomaly_info:
        return "No anomaly detected for this metric."

    region = anomaly_info['anomalous_region']
    higher_is_better = anomaly_info.get('higher_is_better', None)
    direction_known = higher_is_better is not None
    # Rankings fall back to "higher is better" (descending) when the flag is unset.
    rank_ascending = (not higher_is_better) if direction_known else False

    # Work on a copy so the caller's frame does not silently gain columns.
    rates = rates_df.copy()
    rates['rate_diff'] = rates['actual_rate'] - rates['expected_rate']
    rates['rate_diff_pct'] = (rates['rate_diff'] / rates['expected_rate']) * 100
    regional_rates = rates[rates['region'] != 'Global']
    correlation = regional_rates[['expected_rate', 'actual_rate']].corr().iloc[0, 1]
    region_rate = rates[rates['region'] == region].iloc[0]

    impact_summary = mix_df.groupby('region')['weighted_impact'].sum().reset_index()
    region_mix_df = mix_df[mix_df['region'] == region].sort_values('weighted_impact', ascending=False)
    top_drivers = region_mix_df.head(3)
    total_impact = region_mix_df['weighted_impact'].sum()
    best_practices_region = (
        impact_summary[impact_summary['region'] != 'Global']
        .sort_values('weighted_impact', ascending=rank_ascending)
        .iloc[0]['region']
    )
    # The region may have fewer than three products; list whatever exists.
    driver_names = _format_product_list(top_drivers['product']) or "all products"

    metric_label = metric_name.replace('_', ' ')
    parts = [
        f"Callout: {metric_label.title()} in {region} is "
        f"{anomaly_info['magnitude']} {anomaly_info['direction']} than Global mean."
    ]
    if direction_known:
        is_anomaly_good = (
            (anomaly_info['direction'] == 'higher' and higher_is_better)
            or (anomaly_info['direction'] == 'lower' and not higher_is_better)
        )
        parts.append(f" This is {'positive' if is_anomaly_good else 'negative'} for the business.\n\n")
    else:
        parts.append("\n\n")

    if abs(correlation) > 0.5:
        if correlation > 0:
            # Actual rates track mix-based expectations: product mix is the driver.
            parts.append(
                f"Primary Driver: Product mix is the main factor "
                f"({correlation:.2f} correlation between expected and actual rates).\n\n"
            )
            optimal_mix_region = regional_rates.sort_values(
                'expected_rate', ascending=rank_ascending
            ).iloc[0]
            parts.append(
                f"Product Mix Impact: Had {region} been assigned the same product mix "
                f"as {optimal_mix_region['region']}, "
            )
            parts.append(
                f"its rate would be approximately {optimal_mix_region['expected_rate']:.1%} "
                f"(vs current {region_rate['actual_rate']:.1%}), "
            )
            if direction_known:
                would_be_better = (
                    (optimal_mix_region['expected_rate'] > region_rate['actual_rate'] and higher_is_better)
                    or (optimal_mix_region['expected_rate'] < region_rate['actual_rate'] and not higher_is_better)
                )
                parts.append(f"which would be {'better' if would_be_better else 'worse'} for the business.\n\n")
            else:
                parts.append("which would change performance significantly.\n\n")
        else:
            # Actual rates move against mix-based expectations: execution is the driver.
            parts.append(
                f"Primary Driver: Execution quality is the main factor "
                f"({correlation:.2f} correlation between expected and actual rates).\n\n"
            )
            rate_diff = region_rate['rate_diff']
            if direction_known:
                outperforms = (
                    (rate_diff > 0 and higher_is_better)
                    or (rate_diff < 0 and not higher_is_better)
                )
                verdict = "better" if outperforms else "worse"
            else:
                # No polarity configured: describe the direction without judging it.
                verdict = "higher" if rate_diff > 0 else "lower"
            parts.append(f"Execution Quality: {region} performs {abs(region_rate['rate_diff_pct']):.1f}% ")
            parts.append(verdict)
            parts.append(" than expected based on its product mix. ")
            if region != best_practices_region:
                goal = 'improve' if (higher_is_better or not direction_known) else 'reduce'
                parts.append(f"{region} should adopt {best_practices_region}'s execution practices for ")
                parts.append(f"{driver_names} ")
                parts.append(f"to {goal} its {metric_label}.\n\n")
            else:
                parts.append(f"Other regions should learn from {region}'s execution practices, particularly for ")
                parts.append(f"{driver_names}.\n\n")
    else:
        parts.append("Both product mix and execution quality contribute to the performance.\n\n")

    parts.append("Key product drivers:\n")
    for _, product in top_drivers.iterrows():
        # Share of the region's summed weighted impact; inf/NaN if that sum is zero.
        impact_pct = (product['weighted_impact'] / total_impact) * 100
        parts.append(
            f"- {product['product']}: {impact_pct:.1f}% of the difference - "
            f"{product['performance_index']:.1f}x "
        )
        parts.append("higher" if product['performance_index'] > 1 else "lower")
        parts.append(" than rest of world")
        # Mention mix only when it differs from the rest of the world by more than 3pp.
        mix_diff = product['region_mix_pct'] - product['rest_mix_pct']
        if abs(mix_diff) > 0.03:
            mix_direction = "higher" if mix_diff > 0 else "lower"
            parts.append(f" with {abs(mix_diff)*100:.1f}pp {mix_direction} mix than average")
        parts.append("\n")

    if correlation < -0.5:
        parts.append(
            "\n*Note: Negative correlation means regions expected to perform better based on "
            "product mix are actually performing differently, confirming execution quality drives results."
        )
    return "".join(parts)
def plot_performance_heatmap(mix_df, metric_anomaly_map, metric_name):
    """Draw a product-by-region heatmap of ``performance_index``.

    Rows for the ``'Global'`` region are excluded. The colour map is reversed
    when the metric's ``higher_is_better`` flag is falsy (the flag defaults to
    True if absent), and the colour scale is centred on 1.0.

    Args:
        mix_df: Frame with ``region``, ``product`` and ``performance_index``
            columns.
        metric_anomaly_map: Mapping of metric name to a config dict that may
            carry ``higher_is_better``.
        metric_name: Metric shown in the title; also selects the annotation
            format.

    Returns:
        The ``plt`` module the figure was drawn with.
    """
    metric_config = metric_anomaly_map.get(metric_name, {})
    higher_is_better = metric_config.get('higher_is_better', True)

    # One row per product, one column per non-Global region.
    regional_rows = mix_df[mix_df['region'] != 'Global']
    index_grid = regional_rows.pivot(index='product', columns='region', values='performance_index')

    # Strip the "AM-" marker from the region labels.
    short_labels = []
    for label in index_grid.columns:
        short_labels.append(label.replace('AM-', ''))
    index_grid.columns = short_labels

    if higher_is_better:
        palette = 'RdYlGn'
    else:
        palette = 'RdYlGn_r'

    if '_pct' in metric_name:
        annotation_format = '.1%'
    else:
        annotation_format = '.2f'

    plt.figure(figsize=(12, 8))
    sns.heatmap(index_grid, annot=True, cmap=palette, center=1.0, fmt=annotation_format)
    plt.title(f'Region-Product Performance Index ({metric_name}), region/rest of the world')
    # Blank out both axis labels.
    plt.xlabel('')
    plt.ylabel('')
    plt.tight_layout()
    return plt
# Driver: run the mix analysis, print the explanation and draw the heatmap.
# `df_product_mix`, `metric_name` and `metric_anomaly_map` are not defined in
# this part of the file — presumably set earlier; verify before running.
rates_df, mix_df = calculate_rates_and_mix(df_product_mix, metric_name)
explanation = generate_anomaly_explanation(mix_df, rates_df, metric_anomaly_map, metric_name)
print(explanation)
# Summarize by region: total weighted impact per region
impact_summary = mix_df.groupby("region")["weighted_impact"].sum().reset_index()
# print(impact_summary.to_markdown())
# NOTE: plot_performance_heatmap ends with `return plt`, so `fig` is bound to
# the pyplot module rather than to a Figure object.
fig = plot_performance_heatmap(mix_df, metric_anomaly_map, metric_name)
# Last updated