Customer Analytics
Understand customer behavior, value, and patterns with transaction data
Customer analytics help you identify high-value customers, understand purchasing behavior, and create targeted retention strategies. These examples demonstrate how to calculate customer lifetime value, segment customers by behavior, and track cohort performance. Use these patterns to build a data-driven understanding of your customer base.
Customer analytics aggregate transaction data by customer to reveal patterns and value. Always consider time windows when analyzing customer behavior: a customer's lifetime value changes over time, and recent activity may indicate changing behavior patterns.
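The time-window point can be illustrated with a minimal in-memory sketch (plain Python over hypothetical payment records, not the query API used in the examples below):

```python
from datetime import datetime, timedelta, timezone

# Hypothetical processed payments for one customer (illustrative data only)
now = datetime.now(timezone.utc)
payments = [
    {"amount": 500.0, "created_at": now - timedelta(days=400)},
    {"amount": 250.0, "created_at": now - timedelta(days=200)},
    {"amount": 100.0, "created_at": now - timedelta(days=10)},
]

# All-time value counts every payment; a 90-day window only counts recent ones
all_time_value = sum(p["amount"] for p in payments)
window_start = now - timedelta(days=90)
recent_value = sum(p["amount"] for p in payments if p["created_at"] >= window_start)

print(f"All-time LTV: ${all_time_value:,.2f}")  # $850.00
print(f"90-day LTV: ${recent_value:,.2f}")      # $100.00
```

The same customer looks high-value all-time but nearly inactive in the recent window, which is why the Best Practices section below computes both.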
Prerequisites
Before building customer analytics, you should be comfortable with aggregate functions, grouping, and time-based filtering (see Query Mechanics under Next Steps).
Customer Lifetime Value
Calculate comprehensive customer value metrics.
What this query does
This query calculates customer lifetime value by grouping all processed payment transactions by customer and computing total spend, average order value, and purchase frequency for each customer, ordered by total value.
Query components:
| Component | Description |
|---|---|
| Select | Customer ID, sum, average, and count aggregates |
| Filter | Processed payment transactions |
| Group | Results by customer |
| Order | By total spend (highest first) |
| Limit | Top customers (e.g., 50 or 100) |
Query breakdown
results = (
    pl.Transaction.select(
        pl.attr.sender.account_id,  # Grouping dimension
        pl.attr.amount.sum(),       # Total lifetime spend
        pl.attr.amount.avg(),       # Average order value
        pl.attr.id.count(),         # Purchase frequency
    )
    .filter_by(pl.attr.type == "payment", pl.attr.status == "processed")
    .group_by(pl.attr.sender.account_id)
    .order_by("desc(sum(amount))")  # Highest value first
    .limit(50)
    .all()
)  # Top 50 customers

Result structure:
[
    {
        "sender.account_id": "cust_abc123",
        "sum(amount)": 15750.0,
        "avg(amount)": 525.0,
        "count(id)": 30
    },
    {
        "sender.account_id": "cust_def456",
        "sum(amount)": 12300.0,
        "avg(amount)": 410.0,
        "count(id)": 30
    }
]

Analysis insights:
- High total, high average: Premium customers making large purchases
- High total, low average: Frequent buyers with smaller orders
- Low total, high average: Infrequent but valuable purchases
- High count, low average: Loyal customers with small transactions
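These patterns can be turned into simple labels. A minimal sketch using only the total and average columns, assuming result rows shaped like the structure above; the thresholds are hypothetical and should be tuned to your own distribution (the purchase count adds a third dimension for the loyal-buyer pattern):

```python
# Hypothetical thresholds -- tune to your customer base
HIGH_TOTAL = 10000
HIGH_AVG = 300

def classify(row):
    """Label a customer row using the total/average patterns above."""
    high_total = row["sum(amount)"] >= HIGH_TOTAL
    high_avg = row["avg(amount)"] >= HIGH_AVG
    if high_total and high_avg:
        return "Premium"            # large, valuable purchases
    if high_total:
        return "Frequent buyer"     # many smaller orders add up
    if high_avg:
        return "Infrequent, valuable"
    return "Small-ticket loyal"

print(classify({"sum(amount)": 15750.0, "avg(amount)": 525.0}))  # Premium
```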
Extended analysis
Calculate customer tiers:
# Define tier thresholds
PLATINUM_THRESHOLD = 10000
GOLD_THRESHOLD = 5000
SILVER_THRESHOLD = 1000

for customer in results:
    total_spend = customer["sum(amount)"]
    if total_spend >= PLATINUM_THRESHOLD:
        tier = "Platinum"
    elif total_spend >= GOLD_THRESHOLD:
        tier = "Gold"
    elif total_spend >= SILVER_THRESHOLD:
        tier = "Silver"
    else:
        tier = "Bronze"
    print(f"{customer['sender.account_id']}: {tier} - ${total_spend:,.2f}")

Identify at-risk customers:
from datetime import datetime, timedelta

# Get customer LTV with last transaction date
ltv_with_date = (
    pl.Transaction.select(
        pl.attr.sender.account_id,
        pl.attr.amount.sum(),
        pl.attr.created_at.max(),
    )
    .filter_by(pl.attr.type == "payment", pl.attr.status == "processed")
    .group_by(pl.attr.sender.account_id)
    .order_by("desc(sum(amount))")
    .limit(100)
    .all()
)

# Identify high-value customers who haven't transacted recently
ninety_days_ago = datetime.now() - timedelta(days=90)
at_risk = []
for customer in ltv_with_date:
    last_txn = datetime.fromisoformat(
        str(customer["max(created_at)"]).replace("Z", "+00:00")
    ).replace(tzinfo=None)
    total_spend = customer["sum(amount)"] or 0
    if last_txn < ninety_days_ago and total_spend > 5000:
        at_risk.append(
            {
                "account_id": customer.sender.account_id,
                "total_spend": total_spend,
                "days_since_last": (datetime.now() - last_txn).days,
            }
        )

print(f"Found {len(at_risk)} high-value customers at risk of churning")

Calculate purchase frequency distribution:
from collections import Counter

# Group customers by purchase frequency
frequency_dist = Counter()
for customer in results:
    count = customer["count(id)"]
    frequency_dist[count] += 1

print("Purchase Frequency Distribution:")
for frequency, num_customers in sorted(frequency_dist.items()):
    print(f"{frequency} purchases: {num_customers} customers")

Customer Segmentation Patterns
Common patterns for segmenting customers by behavior.
Recency, Frequency, Monetary analysis for customer segmentation:
from datetime import datetime

# Get RFM metrics for each customer
rfm = (
    pl.Transaction.select(
        pl.attr.sender.account_id,
        pl.attr.created_at.max(),  # Recency (last purchase date)
        pl.attr.id.count(),        # Frequency (purchase count)
        pl.attr.amount.sum(),      # Monetary (total spend)
    )
    .filter_by(pl.attr.type == "payment", pl.attr.status == "processed")
    .group_by(pl.attr.sender.account_id)
    .all()
)

# Score each dimension (1-5 scale)
def score_recency(last_date_str):
    dt = datetime.fromisoformat(str(last_date_str).replace("Z", "+00:00")).replace(tzinfo=None)
    days_ago = (datetime.now() - dt).days
    if days_ago < 30:
        return 5
    if days_ago < 90:
        return 4
    if days_ago < 180:
        return 3
    if days_ago < 365:
        return 2
    return 1

def score_frequency(count):
    if count >= 20:
        return 5
    if count >= 10:
        return 4
    if count >= 5:
        return 3
    if count >= 2:
        return 2
    return 1

def score_monetary(amount):
    if amount >= 10000:
        return 5
    if amount >= 5000:
        return 4
    if amount >= 1000:
        return 3
    if amount >= 500:
        return 2
    return 1

# Segment customers
for customer in rfm:
    r = score_recency(customer["max(created_at)"])
    f = score_frequency(customer["count(id)"] or 0)
    m = score_monetary(customer["sum(amount)"] or 0)
    rfm_score = f"{r}{f}{m}"
    if r >= 4 and f >= 4 and m >= 4:
        segment = "Champions"
    elif r >= 4 and f >= 3:
        segment = "Loyal Customers"
    elif r >= 4 and m >= 4:
        segment = "Big Spenders"
    elif r >= 3:
        segment = "Promising"
    elif f >= 3:
        segment = "Need Attention"
    elif m >= 3:
        segment = "At Risk"
    else:
        segment = "Lost"
    print(f"{customer.sender.account_id}: {segment} (RFM: {rfm_score})")

Track customers by signup month and analyze retention:
from datetime import datetime

# Step 1: Get per-customer transaction summaries by month
txns_by_customer = (
    pl.Transaction.select(
        pl.attr.sender.account_id,
        pl.attr.created_at.year(),
        pl.attr.created_at.month(),
        pl.attr.amount.sum(),
        pl.attr.id.count(),
    )
    .filter_by(pl.attr.type == "payment", pl.attr.status == "processed")
    .group_by(
        pl.attr.sender.account_id,
        pl.attr.created_at.year(),
        pl.attr.created_at.month(),
    )
    .all()
)

# Step 2: Get account creation dates for cohort assignment
account_ids = list({row.sender.account_id for row in txns_by_customer})
signup_month = {}
if account_ids:
    accounts = (
        pl.Account.filter_by(pl.attr.id == "|".join(account_ids))
        .select(pl.attr.id, pl.attr.created_at)
        .all()
    )
    for acct in accounts:
        d = str(acct.created_at)
        dt = datetime.fromisoformat(d.replace("Z", "+00:00"))
        signup_month[acct.id] = f"{dt.year}-{dt.month}"

# Step 3: Aggregate into cohorts client-side
cohorts = {}
for row in txns_by_customer:
    aid = row.sender.account_id
    signup = signup_month.get(aid, "unknown")
    active = f"{row['year(created_at)']}-{row['month(created_at)']}"
    key = f"{signup}|{active}"
    if key not in cohorts:
        cohorts[key] = {
            "signup": signup,
            "active": active,
            "customers": set(),
            "revenue": 0,
            "count": 0,
        }
    cohorts[key]["customers"].add(aid)
    cohorts[key]["revenue"] += row["sum(amount)"] or 0
    cohorts[key]["count"] += row["count(id)"] or 0

# Analyze retention by cohort
print("Cohort Analysis:")
for c in cohorts.values():
    print(
        f"Cohort {c['signup']} in {c['active']}: {len(c['customers'])} active customers, "
        f"${c['revenue']:,.2f} revenue"
    )

Analyze new customer acquisition vs returning customer revenue:
from datetime import datetime, timedelta, timezone

# Get first transaction date for each customer
first_transactions = (
    pl.Transaction.select(pl.attr.sender.account_id, pl.attr.created_at.min())
    .filter_by(pl.attr.type == "payment")
    .group_by(pl.attr.sender.account_id)
    .all()
)

# Create lookup of first transaction dates
first_txn_dates = {
    row.sender.account_id: row["min(created_at)"] for row in first_transactions
}

# Analyze transactions by new vs returning
thirty_days_ago = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
recent_transactions = pl.Transaction.filter_by(
    pl.attr.type == "payment",
    pl.attr.status == "processed",
    pl.attr.created_at >= thirty_days_ago,
).all()

new_customer_revenue = 0
returning_customer_revenue = 0
new_count = 0
returning_count = 0
for txn in recent_transactions:
    first_date = first_txn_dates.get(txn.sender.account_id)
    if first_date and str(first_date) >= thirty_days_ago:
        new_customer_revenue += txn.amount or 0
        new_count += 1
    else:
        returning_customer_revenue += txn.amount or 0
        returning_count += 1

print("Last 30 Days:")
print(f"New Customers: ${new_customer_revenue:,.2f} from {new_count} transactions")
print(
    f"Returning Customers: ${returning_customer_revenue:,.2f} from {returning_count} transactions"
)

Customer Behavior Analysis
Understand purchasing patterns and preferences.
Calculate average days between purchases for each customer:
from collections import defaultdict
from datetime import datetime

# Get transaction dates for each customer
customer_transactions = (
    pl.Transaction.select(pl.attr.sender.account_id, pl.attr.created_at)
    .filter_by(pl.attr.type == "payment", pl.attr.status == "processed")
    .order_by(pl.attr.sender.account_id, pl.attr.created_at)
    .all()
)

# Group by customer, parsing timestamps into datetime objects
customer_dates = defaultdict(list)
for txn in customer_transactions:
    dt = datetime.fromisoformat(str(txn.created_at).replace("Z", "+00:00"))
    customer_dates[txn.sender.account_id].append(dt)

# Calculate average days between purchases
for account_id, dates in customer_dates.items():
    if len(dates) < 2:
        continue
    # Calculate gaps between consecutive purchases
    gaps = []
    for i in range(len(dates) - 1):
        gap = (dates[i + 1] - dates[i]).days
        gaps.append(gap)
    avg_gap = sum(gaps) / len(gaps)
    print(f"{account_id}: {avg_gap:.1f} days between purchases (avg)")

Analyze when customers make purchases:
# Analyze when customers make purchases
customer_timing = (
    pl.Transaction.select(
        pl.attr.sender.account_id,
        pl.attr.created_at.dayofweek(),
        pl.attr.created_at.hour(),
        pl.attr.id.count(),
    )
    .filter_by(pl.attr.type == "payment", pl.attr.status == "processed")
    .group_by(
        pl.attr.sender.account_id,
        pl.attr.created_at.dayofweek(),
        pl.attr.created_at.hour(),
    )
    .all()
)

# Identify peak purchase times per customer
from collections import defaultdict

customer_peaks = defaultdict(list)
for row in customer_timing:
    customer_peaks[row.sender.account_id].append(
        {
            "day": row["dayofweek(created_at)"],
            "hour": row["hour(created_at)"],
            "count": row["count(id)"],
        }
    )

# Find most common purchase time per customer
for account_id, times in customer_peaks.items():
    peak = max(times, key=lambda x: x["count"])
    days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]  # assumes dayofweek() uses 0 = Monday
    print(f"{account_id}: Most active {days[peak['day']]} at {peak['hour']}:00")

Identify preferred payment methods by customer:
# Analyze payment method usage per customer
customer_methods = (
    pl.Transaction.select(
        pl.attr.sender.account_id,
        pl.attr.sender.method_id,
        pl.attr.id.count(),
        pl.attr.amount.sum(),
    )
    .filter_by(pl.attr.type == "payment", pl.attr.status == "processed")
    .group_by(pl.attr.sender.account_id, pl.attr.sender.method_id)
    .all()
)

# Identify preferred payment method
from collections import defaultdict

customer_prefs = defaultdict(list)
for row in customer_methods:
    customer_prefs[row.sender.account_id].append(
        {
            "method": row.sender.method_id,
            "count": row["count(id)"],
            "amount": row["sum(amount)"],
        }
    )

for account_id, methods in customer_prefs.items():
    preferred = max(methods, key=lambda x: x["count"])
    print(f"{account_id}: Prefers {preferred['method']} ({preferred['count']} uses)")

Customer Retention Metrics
Track and measure customer retention.
Calculate cohort retention rates:
from datetime import datetime, timedelta, timezone

# Define cohort (e.g., customers whose first purchase was 3-4 months ago)
cohort_start = datetime.now(timezone.utc) - timedelta(days=120)
cohort_end = datetime.now(timezone.utc) - timedelta(days=90)

# Get customers in cohort (first purchase in date range)
cohort_customers = (
    pl.Transaction.select(pl.attr.sender.account_id, pl.attr.created_at.min())
    .filter_by(pl.attr.type == "payment")
    .group_by(pl.attr.sender.account_id)
    .all()
)
cohort = [
    c.sender.account_id
    for c in cohort_customers
    if cohort_start <= datetime.fromisoformat(
        str(c["min(created_at)"]).replace("Z", "+00:00")
    ) < cohort_end
]
print(f"Cohort size: {len(cohort)} customers")

# Check who has transacted in the last 30 days
thirty_days_ago = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
recent_customers = (
    pl.Transaction.select(pl.attr.sender.account_id)
    .filter_by(
        pl.attr.type == "payment",
        pl.attr.status == "processed",
        pl.attr.created_at >= thirty_days_ago,
    )
    .group_by(pl.attr.sender.account_id)
    .all()
)
recent_set = set(c.sender.account_id for c in recent_customers)

# Calculate retention
retained = len([c for c in cohort if c in recent_set])
retention_rate = (retained / len(cohort)) * 100 if cohort else 0
print(f"Retained customers: {retained}")
print(f"Retention rate: {retention_rate:.1f}%")

Identify customers at risk of churning:
from datetime import datetime

# Identify churn risk factors
churn_analysis = (
    pl.Transaction.select(
        pl.attr.sender.account_id,
        pl.attr.amount.sum(),
        pl.attr.id.count(),
        pl.attr.created_at.max(),
        pl.attr.amount.avg(),
        pl.attr.created_at.min(),
    )
    .filter_by(pl.attr.type == "payment", pl.attr.status == "processed")
    .group_by(pl.attr.sender.account_id)
    .all()
)

def parse_naive(ts):
    """Parse an ISO timestamp string into a naive datetime."""
    return datetime.fromisoformat(str(ts).replace("Z", "+00:00")).replace(tzinfo=None)

# Score churn risk
high_risk_customers = []
for customer in churn_analysis:
    last_purchase = parse_naive(customer["max(created_at)"])
    first_purchase = parse_naive(customer["min(created_at)"])
    days_since_last = (datetime.now() - last_purchase).days
    customer_age_days = (datetime.now() - first_purchase).days
    total_value = customer["sum(amount)"] or 0
    txn_count = customer["count(id)"] or 0
    if customer_age_days <= 0 or txn_count == 0:
        continue
    expected_frequency = customer_age_days / txn_count  # average days between purchases
    if days_since_last > (expected_frequency * 2) and total_value > 1000:
        risk_score = min(days_since_last / expected_frequency, 10)
        high_risk_customers.append(
            {
                "account_id": customer.sender.account_id,
                "risk_score": risk_score,
                "value": total_value,
                "days_since_last": days_since_last,
            }
        )

high_risk_customers.sort(key=lambda x: x["risk_score"] * x["value"], reverse=True)
print("Top 10 At-Risk High-Value Customers:")
for i, customer in enumerate(high_risk_customers[:10], 1):
    print(f"{i}. {customer['account_id']}")
    print(f"   Value: ${customer['value']:,.2f}")
    print(f"   Days since last: {customer['days_since_last']}")
    print(f"   Risk score: {customer['risk_score']:.1f}")

Best Practices
Tips for effective customer analytics.
Customer behavior changes over time:
from datetime import datetime, timedelta, timezone

# All-time LTV
all_time_ltv = (
    pl.Transaction.select(pl.attr.sender.account_id, pl.attr.amount.sum())
    .filter_by(pl.attr.type == "payment", pl.attr.status == "processed")
    .group_by(pl.attr.sender.account_id)
    .order_by("desc(sum(amount))")
    .limit(50)
    .all()
)

# Last 90 days LTV (current value)
ninety_days_ago = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
recent_ltv = (
    pl.Transaction.select(pl.attr.sender.account_id, pl.attr.amount.sum())
    .filter_by(
        pl.attr.type == "payment",
        pl.attr.status == "processed",
        pl.attr.created_at >= ninety_days_ago,
    )
    .group_by(pl.attr.sender.account_id)
    .order_by("desc(sum(amount))")
    .limit(50)
    .all()
)

# Compare the two result sets to identify trends

Count provides context for total spend:
# Good: Includes context
complete = (
    pl.Transaction.select(
        pl.attr.sender.account_id,
        pl.attr.amount.sum(),  # Total spend
        pl.attr.id.count(),    # Number of transactions
        pl.attr.amount.avg(),  # Average order value
    )
    .group_by(pl.attr.sender.account_id)
    .all()
)

# Incomplete: Missing context
incomplete = (
    pl.Transaction.select(
        pl.attr.sender.account_id,
        pl.attr.amount.sum(),  # Just the total - frequency is needed too
    )
    .group_by(pl.attr.sender.account_id)
    .all()
)

Recent behavior predicts future behavior:
# Always include last transaction date
ltv_with_recency = (
    pl.Transaction.select(
        pl.attr.sender.account_id,
        pl.attr.amount.sum(),
        pl.attr.id.count(),
        pl.attr.created_at.max(),  # Last transaction date
        pl.attr.created_at.min(),  # First transaction date
    )
    .filter_by(pl.attr.type == "payment", pl.attr.status == "processed")
    .group_by(pl.attr.sender.account_id)
    .all()
)

Different customer segments need different strategies:
from datetime import datetime, timedelta

# Segment by value and activity (uses the ltv_with_recency results above)
segments = {
    "champions": [],    # High value, recently active
    "at_risk": [],      # High value, not recently active
    "promising": [],    # Medium value, recently active
    "hibernating": [],  # Low value, not recently active
}
threshold_date = datetime.now() - timedelta(days=60)
high_value_threshold = 5000

for customer in ltv_with_recency:
    last_txn = datetime.fromisoformat(
        str(customer["max(created_at)"]).replace("Z", "+00:00")
    ).replace(tzinfo=None)
    is_recent = last_txn >= threshold_date
    is_high_value = (customer["sum(amount)"] or 0) >= high_value_threshold
    if is_high_value and is_recent:
        segments["champions"].append(customer)
    elif is_high_value and not is_recent:
        segments["at_risk"].append(customer)
    elif not is_high_value and is_recent:
        segments["promising"].append(customer)
    else:
        segments["hibernating"].append(customer)

# Different actions per segment
print("Champions: Reward with VIP benefits")
print("At Risk: Re-engagement campaign")
print("Promising: Upsell campaign")
print("Hibernating: Win-back offer")

Next Steps
Explore related reporting topics
Revenue Reports
Analyze revenue trends and financial performance with the Revenue Reports documentation: track monthly revenue trends and build multi-dimensional revenue analysis across payment methods and time periods.
Query Mechanics
Deep dive into query building techniques with the Building Report Queries guide: master aggregate functions and grouping operations, and optimize complex analytics queries for performance.