Context Bloat & Context Rot
Performance degrades even well within a model's supported context limits. Understanding why is crucial for building reliable agents.
Two Distinct Problems
Context Bloat
Too much information in the context window, leading to degraded attention and reasoning. The model struggles to find relevant information among noise.
Context Rot
Outdated or contradictory information accumulating over time, contaminating the model's reasoning with stale facts.
Understanding Context Bloat
ATTENTION DISTRIBUTION
High │ ████ ████
│ ████ ████
│ ████ ████
Attn │ ████ ████
│ ████ ░░░░░░░░░░░░░░░░ ████
│ ████ ░░░░░░░░░░░░░░░░ ████
Low │ ████ ░░░░░░░░░░░░░░░░ ████
└────────────────────────────────────────────
START MIDDLE END
████ = High attention (information well-retained)
░░░░ = Low attention (information often missed)
Research finding: Information in the middle of long contexts
is recalled with 20-40% lower accuracy than at the edges.

| Study | Finding | Implication |
|---|---|---|
| Liu et al. (2023) | "Lost in the Middle" - U-shaped recall curve | Place critical info at start/end |
| Letta Context-Bench | Performance degrades before reaching stated limits | Test actual performance, not specs |
| Anthropic (2024) | Curated 10K context beats padded 100K | Quality over quantity |
| NIAH Benchmarks | Recall varies by position and context size | Benchmark your specific use case |
Research findings on context length vs performance
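The actionable upshot of the U-shaped recall curve is that prompt assembly order matters. Below is a minimal sketch of edge placement; the function name and section labels are illustrative, not part of any framework:

```python
def assemble_prompt(critical_facts: list, background: list) -> str:
    """Place critical information at the attention hotspots (start and end),
    background material in the poorly attended middle."""
    half = (len(critical_facts) + 1) // 2
    head, tail = critical_facts[:half], critical_facts[half:]
    return "\n\n".join([
        "KEY FACTS:\n" + "\n".join(head),              # start: well attended
        "BACKGROUND:\n" + "\n".join(background),       # middle: least attended
        "KEY FACTS (continued):\n" + "\n".join(tail),  # end: well attended
    ])
```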
Why Bloat Hurts Performance
Every token competes for the same attention budget. As context grows, relevant details are diluted by noise, mid-context information slides into the poorly attended region shown above, and cost and latency rise with every token whether or not it contributes. The result is paying more for worse answers.
Understanding Context Rot
Time T0: Fresh context
┌────────────────────────────────────────┐
│ "Stock price is $150" (accurate) │
│ "User prefers dark mode" (accurate) │
│ "API endpoint is /v2/data" (accurate) │
└────────────────────────────────────────┘
│
│ Time passes...
▼
Time T1: Partially stale
┌────────────────────────────────────────┐
│ "Stock price is $150" [!] (now $175) │
│ "User prefers dark mode" (still true) │
│ "API endpoint is /v2/data" (still true)│
└────────────────────────────────────────┘
│
│ More time passes...
▼
Time T2: Contradictions emerge
┌────────────────────────────────────────┐
│ "Stock price is $150" [x] (outdated) │
│ "Stock price is $175" (newer message) │
│ "User prefers dark mode" (still true) │
│ "API endpoint is /v3/data" (updated) │
│ "API endpoint is /v2/data" (old) │ ← CONTRADICTION
└────────────────────────────────────────┘

| Type | Cause | Symptoms |
|---|---|---|
| Temporal staleness | Information ages naturally | Incorrect facts, outdated recommendations |
| Contradictions | Updated info alongside old | Inconsistent responses, confusion |
| Superseded decisions | Old decisions remain in context | Agent follows outdated instructions |
| Accumulation noise | Failed attempts stay in history | Repeating same mistakes |
Types of context rot and their symptoms
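One way to prevent the contradiction shown at T2 is to store facts by key rather than appending them to conversation history, so an update replaces its predecessor. A minimal sketch (the FactStore class and key names are illustrative, not part of any framework):

```python
from datetime import datetime

class FactStore:
    """Keyed fact store: updating a key supersedes the old value,
    so contradictory versions never coexist in the context."""

    def __init__(self):
        self._facts = {}  # key -> (value, last_updated)

    def update(self, key: str, value: str) -> None:
        # Last write wins: the stale value is replaced, never appended
        self._facts[key] = (value, datetime.now())

    def render(self) -> str:
        # Each fact appears exactly once, stamped with its freshness
        return "\n".join(
            f"{key}: {value} (as of {ts:%Y-%m-%d})"
            for key, (value, ts) in self._facts.items()
        )

store = FactStore()
store.update("api_endpoint", "/v2/data")
store.update("api_endpoint", "/v3/data")  # supersedes /v2, no contradiction
print(store.render())
```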
Measuring Context Health: Needle-in-Haystack Testing
The needle-in-haystack test measures a model's ability to retrieve specific information from various positions within a large context. This reveals the practical limits of your model's context handling.
Test Matrix:
Context Size: 4K → 8K → 16K → 32K → 64K → 128K
│
▼
┌───────────────────┐
│ Filler Content │
│ (paragraphs, │
│ documents) │
│ │
→ │ [NEEDLE] │ ← Insert at position
│ "Code: XYZ-123" │
│ │
│ More filler... │
└───────────────────┘
│
▼
Query: "What is the code?"
│
▼
Check: Does response
contain "XYZ-123"?
Positions tested: Start (10%), Middle (50%), End (90%)

function runNeedleInHaystackTest(model, config):
results = []
for contextSize in config.contextSizes:
for needlePosition in ["start", "middle", "end"]:
# Generate filler content
filler = generateFiller(contextSize)
# Create needle (fact to recall)
needle = "The secret code is: ALPHA-7392"
# Insert needle at position
context = insertAtPosition(filler, needle, needlePosition)
# Query for recall
query = "What is the secret code?"
response = model.generate(
context: context,
query: query
)
# Check if needle was recalled correctly
success = "ALPHA-7392" in response
results.append({
contextSize: contextSize,
position: needlePosition,
success: success,
tokens: countTokens(context)
})
return analyzeResults(results)
function analyzeResults(results):
# Group by context size
bySize = groupBy(results, "contextSize")
for size, sizeResults in bySize:
startSuccess = average(filter(sizeResults, pos="start").success)
middleSuccess = average(filter(sizeResults, pos="middle").success)
endSuccess = average(filter(sizeResults, pos="end").success)
print(f"{size} tokens:")
print(f" Start: {startSuccess}%")
print(f" Middle: {middleSuccess}%")
print(f" End: {endSuccess}%") import random
import string
from dataclasses import dataclass
from typing import List
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
import tiktoken
@dataclass
class NeedleTestConfig:
context_sizes: List[int] = None
positions: List[str] = None
num_trials: int = 5
needle_template: str = "The secret code is: {code}"
def __post_init__(self):
if self.context_sizes is None:
self.context_sizes = [4000, 8000, 16000, 32000, 64000]
if self.positions is None:
self.positions = ["start", "middle", "end"]
@dataclass
class TestResult:
context_size: int
position: str
success: bool
actual_tokens: int
response: str
class NeedleInHaystackTest:
def __init__(self, model: str = "gpt-4"):
self.llm = ChatOpenAI(model=model, max_tokens=50)
self.enc = tiktoken.get_encoding("cl100k_base")
def run(self, config: NeedleTestConfig) -> List[TestResult]:
"""Run needle-in-haystack tests."""
results = []
for size in config.context_sizes:
for position in config.positions:
for trial in range(config.num_trials):
result = self._run_single(size, position, config)
results.append(result)
print(f"Size: {size}, Pos: {position}, "
f"Trial: {trial+1}, Success: {result.success}")
return results
def _run_single(
self,
target_size: int,
position: str,
config: NeedleTestConfig
) -> TestResult:
"""Run a single needle test."""
# Generate unique needle
code = ''.join(random.choices(
string.ascii_uppercase + string.digits, k=8
))
needle = config.needle_template.format(code=code)
# Generate filler content
filler = self._generate_filler(target_size)
# Insert needle at position
context = self._insert_needle(filler, needle, position)
# Query for recall using LangChain
messages = [
SystemMessage(content=context),
HumanMessage(content="What is the secret code? Reply with just the code.")
]
response = self.llm.invoke(messages)
answer = response.content
success = code in answer
return TestResult(
context_size=target_size,
position=position,
success=success,
actual_tokens=len(self.enc.encode(context)),
response=answer
)
def _generate_filler(self, target_tokens: int) -> str:
"""Generate filler text to reach target token count."""
# Use diverse, realistic-looking content
paragraphs = [
"The quarterly report shows significant growth in "
"multiple sectors. Revenue increased by 15% compared "
"to the previous quarter, driven by strong performance "
"in the technology division.",
"Market analysis indicates favorable conditions for "
"expansion. Consumer sentiment remains positive, with "
"confidence indices reaching their highest levels in "
"eighteen months.",
"Infrastructure investments continue according to plan. "
"The new data center will be operational by Q3, adding "
"substantial capacity to our cloud services platform.",
# Add more diverse paragraphs...
]
result = []
current_tokens = 0
while current_tokens < target_tokens:
para = random.choice(paragraphs)
result.append(para)
current_tokens = len(self.enc.encode(" ".join(result)))
return " ".join(result)
def _insert_needle(
self,
filler: str,
needle: str,
position: str
) -> str:
"""Insert needle at specified position."""
sentences = filler.split(". ")
if position == "start":
idx = len(sentences) // 10 # 10% from start
elif position == "middle":
idx = len(sentences) // 2
else: # end
idx = int(len(sentences) * 0.9)
sentences.insert(idx, needle)
return ". ".join(sentences)
def analyze(self, results: List[TestResult]) -> dict:
"""Analyze test results."""
from collections import defaultdict
by_size = defaultdict(lambda: defaultdict(list))
for r in results:
by_size[r.context_size][r.position].append(r.success)
analysis = {}
for size, positions in sorted(by_size.items()):
analysis[size] = {}
for pos, successes in positions.items():
rate = sum(successes) / len(successes) * 100
analysis[size][pos] = f"{rate:.1f}%"
return analysis
# Usage
test = NeedleInHaystackTest(model="gpt-4-turbo")
config = NeedleTestConfig(
context_sizes=[4000, 8000, 16000, 32000],
num_trials=10
)
results = test.run(config)
analysis = test.analyze(results)
for size, positions in analysis.items():
print(f"\n{size:,} tokens:")
for pos, rate in positions.items():
print(f" {pos}: {rate}") using Microsoft.Extensions.AI;
using Microsoft.ML.Tokenizers;
public class NeedleTestConfig
{
public int[] ContextSizes { get; init; } =
{ 4000, 8000, 16000, 32000, 64000 };
public string[] Positions { get; init; } =
{ "start", "middle", "end" };
public int NumTrials { get; init; } = 5;
public string NeedleTemplate { get; init; } =
"The secret code is: {0}";
}
public record TestResult(
int ContextSize,
string Position,
bool Success,
int ActualTokens,
string Response);
public class NeedleInHaystackTest
{
private readonly IChatClient _client;
private readonly Tokenizer _tokenizer;
private readonly Random _random = new();
public NeedleInHaystackTest(IChatClient client)
{
_client = client;
_tokenizer = TiktokenTokenizer.CreateForModel("gpt-4");
}
public async Task<List<TestResult>> RunAsync(
NeedleTestConfig config)
{
var results = new List<TestResult>();
foreach (var size in config.ContextSizes)
{
foreach (var position in config.Positions)
{
for (int trial = 0; trial < config.NumTrials; trial++)
{
var result = await RunSingleAsync(
size, position, config);
results.Add(result);
Console.WriteLine(
$"Size: {size}, Pos: {position}, " +
$"Trial: {trial + 1}, Success: {result.Success}");
}
}
}
return results;
}
private async Task<TestResult> RunSingleAsync(
int targetSize,
string position,
NeedleTestConfig config)
{
// Generate unique code
var code = GenerateCode(8);
var needle = string.Format(config.NeedleTemplate, code);
// Generate filler and insert needle
var filler = GenerateFiller(targetSize);
var context = InsertNeedle(filler, needle, position);
// Query for recall
var messages = new List<ChatMessage>
{
new(ChatRole.System, context),
new(ChatRole.User,
"What is the secret code? Reply with just the code.")
};
var response = await _client.GetResponseAsync(
messages,
new ChatOptions { MaxOutputTokens = 50 });
var success = response.Text.Contains(code);
return new TestResult(
targetSize,
position,
success,
_tokenizer.CountTokens(context),
response.Text);
}
private string GenerateCode(int length)
{
const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
return new string(Enumerable.Range(0, length)
.Select(_ => chars[_random.Next(chars.Length)])
.ToArray());
}
private string GenerateFiller(int targetTokens)
{
var paragraphs = new[]
{
"The quarterly report shows significant growth. " +
"Revenue increased by 15% compared to previous quarter.",
"Market analysis indicates favorable conditions. " +
"Consumer sentiment remains positive across segments.",
"Infrastructure investments continue according to plan. " +
"New capacity will be operational by next quarter."
};
var result = new List<string>();
var currentTokens = 0;
while (currentTokens < targetTokens)
{
var para = paragraphs[_random.Next(paragraphs.Length)];
result.Add(para);
currentTokens = _tokenizer.CountTokens(
string.Join(" ", result));
}
return string.Join(" ", result);
}
private string InsertNeedle(
string filler,
string needle,
string position)
{
var sentences = filler.Split(". ").ToList();
var idx = position switch
{
"start" => sentences.Count / 10,
"middle" => sentences.Count / 2,
"end" => (int)(sentences.Count * 0.9),
_ => sentences.Count / 2
};
sentences.Insert(idx, needle);
return string.Join(". ", sentences);
}
public Dictionary<int, Dictionary<string, string>> Analyze(
List<TestResult> results)
{
return results
.GroupBy(r => r.ContextSize)
.OrderBy(g => g.Key)
.ToDictionary(
g => g.Key,
g => g.GroupBy(r => r.Position)
.ToDictionary(
pg => pg.Key,
pg => $"{pg.Average(r => r.Success ? 100 : 0):F1}%"
)
);
}
}

Example Results Interpretation
Context Size Start Middle End
─────────────────────────────────────
4,000 98% 96% 98%
8,000 97% 91% 97%
16,000 95% 82% 96%
32,000 93% 71% 94%
64,000 89% 58% 91%
128,000 84% 43% 87%
Observation: Middle position degrades significantly
faster than start/end positions as context grows.
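Results like these can feed directly into configuration. The sketch below is a hypothetical helper (it assumes recall rates are numeric percentages rather than the formatted strings analyze() returns) that picks the largest context size whose middle-position recall stays above a threshold, which you might then use as a soft token limit:

```python
def pick_context_budget(recall_by_size: dict,
                        min_middle_recall: float = 80.0) -> int:
    """Return the largest tested context size whose middle-position
    recall meets the threshold; fall back to the smallest size tested."""
    viable = [size for size, positions in recall_by_size.items()
              if positions.get("middle", 0.0) >= min_middle_recall]
    return max(viable) if viable else min(recall_by_size)

# With the example numbers above, an 80% threshold selects 16,000 tokens
budget = pick_context_budget({
    4000: {"middle": 96}, 8000: {"middle": 91},
    16000: {"middle": 82}, 32000: {"middle": 71},
})
```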
Context Health Management
Proactive management prevents both bloat and rot from degrading agent performance:
function manageContextHealth(messages, config):
currentTokens = countTokens(messages)
# Check for bloat - approaching limits
if currentTokens > config.softLimit:
messages = compressOlderMessages(messages, config)
# Check for rot - stale information
for message in messages:
if message.hasFactualClaims:
staleness = checkStaleness(message)
if staleness > config.maxStalenessDays:
message = refreshOrRemove(message)
# Check for contradictions
contradictions = findContradictions(messages)
if contradictions:
messages = resolveContradictions(messages, contradictions)
return messages
function compressOlderMessages(messages, config):
# Keep recent messages intact
recentCount = config.preserveRecentCount
recent = messages[-recentCount:]
older = messages[:-recentCount]
# Summarize older messages
summary = llm.summarize(older,
instruction: "Preserve facts, decisions, and pending tasks"
)
return [systemMessage(summary), ...recent]
function checkStaleness(message):
# Extract time-sensitive claims
claims = extractTimeSensitiveClaims(message)
for claim in claims:
if claim.referencesExternalState:
# Flag for potential refresh
return daysSince(message.timestamp)
return 0  # Not time-sensitive

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import tiktoken
@dataclass
class ContextHealthConfig:
soft_limit_tokens: int = 8000
hard_limit_tokens: int = 12000
preserve_recent_count: int = 5
max_staleness_days: int = 7
compression_model: str = "gpt-4o-mini"
class ContextHealthManager:
def __init__(self, client, config: ContextHealthConfig):
self.client = client
self.config = config
self.enc = tiktoken.get_encoding("cl100k_base")
def manage(self, messages: List[Dict]) -> List[Dict]:
"""Apply all context health checks."""
current_tokens = self._count_tokens(messages)
# Phase 1: Check for bloat
if current_tokens > self.config.soft_limit_tokens:
messages = self._compress_older(messages)
# Phase 2: Check for rot (stale information)
messages = self._handle_staleness(messages)
# Phase 3: Check for contradictions
messages = self._resolve_contradictions(messages)
return messages
def _compress_older(self, messages: List[Dict]) -> List[Dict]:
"""Compress older messages to reduce bloat."""
preserve = self.config.preserve_recent_count
if len(messages) <= preserve:
return messages
older = messages[:-preserve]
recent = messages[-preserve:]
# Summarize older messages
summary = self._summarize(older)
return [
{
"role": "system",
"content": f"Previous conversation summary:\n{summary}"
},
*recent
]
def _handle_staleness(
self,
messages: List[Dict]
) -> List[Dict]:
"""Identify and handle stale information."""
result = []
for msg in messages:
staleness = self._check_staleness(msg)
if staleness > self.config.max_staleness_days:
# Option 1: Remove stale message
# Option 2: Add staleness warning
result.append({
**msg,
"content": f"[STALE - {staleness} days old] "
f"{msg['content']}"
})
else:
result.append(msg)
return result
def _check_staleness(self, message: Dict) -> int:
"""Check if message contains stale information."""
timestamp = message.get("timestamp")
if not timestamp:
return 0
age = datetime.now() - timestamp
content = message.get("content", "").lower()
# Heuristics for time-sensitive content
time_sensitive_indicators = [
"current", "now", "today", "latest",
"price", "status", "available"
]
if any(ind in content for ind in time_sensitive_indicators):
return age.days
return 0
def _resolve_contradictions(
self,
messages: List[Dict]
) -> List[Dict]:
"""Detect and resolve contradictions."""
# Use LLM to find contradictions
check_prompt = f"""
Analyze these messages for contradictions. List any
conflicting information (e.g., "Message 3 says X is Y
but Message 7 says X is Z").
Messages:
{self._format_messages(messages)}
Contradictions (or "None found"):
"""
response = self.client.chat.completions.create(
model=self.config.compression_model,
messages=[{"role": "user", "content": check_prompt}]
)
contradictions = response.choices[0].message.content
if "none found" in contradictions.lower():
return messages
# Add contradiction warning to context
return [
{
"role": "system",
"content": f"WARNING: Contradictions detected. "
f"Prefer recent information.\n"
f"{contradictions}"
},
*messages
]
def _summarize(self, messages: List[Dict]) -> str:
"""Summarize a list of messages."""
formatted = self._format_messages(messages)
response = self.client.chat.completions.create(
model=self.config.compression_model,
messages=[{
"role": "user",
"content": f"""Summarize this conversation, preserving:
1. Key decisions made
2. Important facts discovered
3. Pending tasks or questions
Conversation:
{formatted}
Summary:"""
}]
)
return response.choices[0].message.content
def _format_messages(self, messages: List[Dict]) -> str:
return "\n".join([
f"{m['role'].upper()}: {m.get('content', '')[:500]}"
for m in messages
])
def _count_tokens(self, messages: List[Dict]) -> int:
return sum(
len(self.enc.encode(str(m.get("content", ""))))
for m in messages
)

using Microsoft.Extensions.AI;
using Microsoft.ML.Tokenizers;
public class ContextHealthConfig
{
public int SoftLimitTokens { get; init; } = 8000;
public int HardLimitTokens { get; init; } = 12000;
public int PreserveRecentCount { get; init; } = 5;
public int MaxStalenessDays { get; init; } = 7;
}
public class ContextHealthManager
{
private readonly IChatClient _client;
private readonly ContextHealthConfig _config;
private readonly Tokenizer _tokenizer;
public ContextHealthManager(
IChatClient client,
ContextHealthConfig config)
{
_client = client;
_config = config;
_tokenizer = TiktokenTokenizer.CreateForModel("gpt-4");
}
public async Task<List<ChatMessage>> ManageAsync(
List<ChatMessage> messages)
{
var currentTokens = CountTokens(messages);
// Phase 1: Check for bloat
if (currentTokens > _config.SoftLimitTokens)
{
messages = await CompressOlderAsync(messages);
}
// Phase 2: Handle staleness
messages = HandleStaleness(messages);
// Phase 3: Resolve contradictions
messages = await ResolveContradictionsAsync(messages);
return messages;
}
private async Task<List<ChatMessage>> CompressOlderAsync(
List<ChatMessage> messages)
{
var preserve = _config.PreserveRecentCount;
if (messages.Count <= preserve)
return messages;
var older = messages.Take(messages.Count - preserve).ToList();
var recent = messages.Skip(messages.Count - preserve).ToList();
var summary = await SummarizeAsync(older);
var result = new List<ChatMessage>
{
new(ChatRole.System,
$"Previous conversation summary:\n{summary}")
};
result.AddRange(recent);
return result;
}
private List<ChatMessage> HandleStaleness(
List<ChatMessage> messages)
{
var result = new List<ChatMessage>();
foreach (var msg in messages)
{
var staleness = CheckStaleness(msg);
if (staleness > _config.MaxStalenessDays)
{
result.Add(new ChatMessage(
msg.Role,
$"[STALE - {staleness} days old] {msg.Text}"));
}
else
{
result.Add(msg);
}
}
return result;
}
private int CheckStaleness(ChatMessage message)
{
// Check message metadata for timestamp
if (message.AdditionalProperties?.TryGetValue(
"timestamp", out var ts) != true)
return 0;
var timestamp = (DateTime)ts;
var age = DateTime.UtcNow - timestamp;
var content = message.Text?.ToLower() ?? "";
var timeSensitiveIndicators = new[]
{
"current", "now", "today", "latest",
"price", "status", "available"
};
if (timeSensitiveIndicators.Any(i => content.Contains(i)))
return (int)age.TotalDays;
return 0;
}
private async Task<List<ChatMessage>> ResolveContradictionsAsync(
List<ChatMessage> messages)
{
var formatted = FormatMessages(messages);
var response = await _client.GetResponseAsync($"""
Analyze for contradictions. List any conflicting info.
Messages:
{formatted}
Contradictions (or "None found"):
""");
if (response.Text.Contains("none found",
StringComparison.OrdinalIgnoreCase))
return messages;
var result = new List<ChatMessage>
{
new(ChatRole.System,
$"WARNING: Contradictions detected.\n{response.Text}")
};
result.AddRange(messages);
return result;
}
private async Task<string> SummarizeAsync(
List<ChatMessage> messages)
{
var formatted = FormatMessages(messages);
var response = await _client.GetResponseAsync($"""
Summarize preserving: decisions, facts, pending tasks.
{formatted}
""");
return response.Text;
}
private string FormatMessages(List<ChatMessage> messages) =>
string.Join("\n", messages.Select(m =>
$"{m.Role.ToString().ToUpper()}: " +
$"{Truncate(m.Text ?? "", 500)}"));
private int CountTokens(List<ChatMessage> messages) =>
messages.Sum(m => _tokenizer.CountTokens(m.Text ?? ""));
private static string Truncate(string s, int max) =>
s.Length <= max ? s : s[..max] + "...";
}

Mitigation Strategies
Input Context (potentially bloated/rotted)
│
▼
┌───────────────────────────────────┐
│ 1. REORDER BY IMPORTANCE │
│ Score relevance to current query │
│ Place important at start/end │
└───────────────────────────────────┘
│
▼
┌───────────────────────────────────┐
│ 2. SLIDING WINDOW │
│ Summarize old content │
│ Keep overlap for continuity │
└───────────────────────────────────┘
│
▼
┌───────────────────────────────────┐
│ 3. SEMANTIC DEDUPLICATION │
│ Remove near-duplicate messages │
│ Keep most recent version │
└───────────────────────────────────┘
│
▼
┌───────────────────────────────────┐
│ 4. FRESHNESS INDICATORS │
│ Mark message ages │
│ Flag potential staleness │
└───────────────────────────────────┘
│
▼
Healthy Context (ready for inference)

# Mitigation strategies for context bloat and rot
class ContextMitigationPipeline:
function process(messages, query):
# Step 1: Prioritize by position (combat "lost in middle")
messages = reorderByImportance(messages, query)
# Step 2: Apply sliding window with overlap
if countTokens(messages) > hardLimit:
messages = applySlidingWindow(messages)
# Step 3: Deduplicate similar content
messages = deduplicateSemantically(messages)
# Step 4: Add freshness metadata
messages = addFreshnessIndicators(messages)
return messages
function reorderByImportance(messages, query):
# Score each message by relevance to current query
scored = []
for msg in messages:
relevance = computeRelevance(msg, query)
scored.append((msg, relevance))
scored.sort(by: relevance, descending: true)
# Place most relevant at start and end (attention hotspots)
n = length(scored)
high = scored[:n/3]
medium = scored[n/3:2*n/3]
low = scored[2*n/3:]
# Ordering: first half of high → low → medium → second half of high
return concat(firstHalf(high), low, medium, secondHalf(high))
function applySlidingWindow(messages, windowSize, overlap):
# Keep recent window with overlap from previous
if length(messages) <= windowSize:
return messages
# Summarize content outside window
outsideWindow = messages[:-windowSize]
summary = llm.summarize(outsideWindow)
# Keep overlap for continuity
overlapMessages = messages[-(windowSize+overlap):-windowSize]
return [
summaryMessage(summary),
...overlapMessages,
...messages[-windowSize:]
]
function deduplicateSemantically(messages, threshold=0.9):
unique = []
embeddings = []
for msg in messages:
msgEmb = embed(msg.content)
# Check similarity with existing
isDuplicate = false
for existingEmb in embeddings:
if cosineSimilarity(msgEmb, existingEmb) > threshold:
isDuplicate = true
break
if not isDuplicate:
unique.append(msg)
embeddings.append(msgEmb)
return unique

from typing import List, Dict
import numpy as np
from sentence_transformers import SentenceTransformer
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import tiktoken
class ContextMitigationPipeline:
def __init__(
self,
hard_limit: int = 16000,
window_size: int = 10,
overlap: int = 2
):
self.llm = ChatOpenAI(model="gpt-4o-mini")
self.hard_limit = hard_limit
self.window_size = window_size
self.overlap = overlap
self.embedder = SentenceTransformer("all-MiniLM-L6-v2")
self.enc = tiktoken.get_encoding("cl100k_base")
def process(
self,
messages: List[Dict],
query: str
) -> List[Dict]:
"""Apply all mitigation strategies."""
# Step 1: Reorder by importance (combat "lost in middle")
messages = self._reorder_by_importance(messages, query)
# Step 2: Apply sliding window if over limit
if self._count_tokens(messages) > self.hard_limit:
messages = self._apply_sliding_window(messages)
# Step 3: Semantic deduplication
messages = self._deduplicate(messages)
# Step 4: Add freshness indicators
messages = self._add_freshness_indicators(messages)
return messages
def _reorder_by_importance(
self,
messages: List[Dict],
query: str
) -> List[Dict]:
"""Reorder to place important content at attention hotspots."""
query_emb = self.embedder.encode(query)
# Score by relevance
scored = []
for msg in messages:
content = msg.get("content", "")
msg_emb = self.embedder.encode(content)
relevance = np.dot(query_emb, msg_emb) / (
np.linalg.norm(query_emb) * np.linalg.norm(msg_emb)
)
scored.append((msg, relevance))
scored.sort(key=lambda x: x[1], reverse=True)
# Split into thirds by relevance
n = len(scored)
high = [m for m, _ in scored[:n//3]]
medium = [m for m, _ in scored[n//3:2*n//3]]
low = [m for m, _ in scored[2*n//3:]]
# Optimal order: high relevance at start and end
# Low relevance in middle (least attended region)
reordered = []
reordered.extend(high[:len(high)//2]) # Start
reordered.extend(low) # Middle
reordered.extend(medium) # Middle-end
reordered.extend(high[len(high)//2:]) # End
return reordered
def _apply_sliding_window(
self,
messages: List[Dict]
) -> List[Dict]:
"""Apply sliding window with overlap."""
if len(messages) <= self.window_size:
return messages
# Content outside window
outside = messages[:-(self.window_size + self.overlap)]
# Summarize outside content
if outside:
summary = self._summarize(outside)
summary_msg = {
"role": "system",
"content": f"Summary of earlier conversation:\n{summary}"
}
else:
summary_msg = None
# Overlap for continuity
overlap_msgs = messages[
-(self.window_size + self.overlap):-self.window_size
]
# Recent window
window_msgs = messages[-self.window_size:]
result = []
if summary_msg:
result.append(summary_msg)
result.extend(overlap_msgs)
result.extend(window_msgs)
return result
def _deduplicate(
self,
messages: List[Dict],
threshold: float = 0.9
) -> List[Dict]:
"""Remove semantically duplicate messages."""
unique = []
embeddings = []
for msg in messages:
content = msg.get("content", "")
if not content:
unique.append(msg)
continue
msg_emb = self.embedder.encode(content)
is_duplicate = False
for existing_emb in embeddings:
similarity = np.dot(msg_emb, existing_emb) / (
np.linalg.norm(msg_emb) *
np.linalg.norm(existing_emb)
)
if similarity > threshold:
is_duplicate = True
break
if not is_duplicate:
unique.append(msg)
embeddings.append(msg_emb)
return unique
def _add_freshness_indicators(
self,
messages: List[Dict]
) -> List[Dict]:
"""Add freshness metadata to time-sensitive content."""
from datetime import datetime
result = []
for msg in messages:
timestamp = msg.get("timestamp")
content = msg.get("content", "")
if timestamp:
age_days = (datetime.now() - timestamp).days
if age_days > 0:
content = f"[{age_days}d ago] {content}"
result.append({**msg, "content": content})
return result
def _summarize(self, messages: List[Dict]) -> str:
"""Summarize a list of messages using LangChain."""
formatted = "\n".join([
f"{m['role']}: {m.get('content', '')[:300]}"
for m in messages
])
prompt = ChatPromptTemplate.from_messages([
("user", "Summarize concisely:\n{content}")
])
chain = prompt | self.llm
response = chain.invoke({"content": formatted})
return response.content
def _count_tokens(self, messages: List[Dict]) -> int:
return sum(
len(self.enc.encode(str(m.get("content", ""))))
for m in messages
)

using Microsoft.Extensions.AI;
public class ContextMitigationPipeline
{
private readonly IChatClient _client;
private readonly IEmbeddingGenerator<string, Embedding<float>> _embedder;
private readonly int _hardLimit;
private readonly int _windowSize;
private readonly int _overlap;
public ContextMitigationPipeline(
IChatClient client,
IEmbeddingGenerator<string, Embedding<float>> embedder,
int hardLimit = 16000,
int windowSize = 10,
int overlap = 2)
{
_client = client;
_embedder = embedder;
_hardLimit = hardLimit;
_windowSize = windowSize;
_overlap = overlap;
}
public async Task<List<ChatMessage>> ProcessAsync(
List<ChatMessage> messages,
string query)
{
// Step 1: Reorder by importance
messages = await ReorderByImportanceAsync(messages, query);
// Step 2: Apply sliding window if needed
if (CountTokens(messages) > _hardLimit)
messages = await ApplySlidingWindowAsync(messages);
// Step 3: Deduplicate
messages = await DeduplicateAsync(messages);
// Step 4: Add freshness indicators
messages = AddFreshnessIndicators(messages);
return messages;
}
private async Task<List<ChatMessage>> ReorderByImportanceAsync(
List<ChatMessage> messages,
string query)
{
var queryResult = await _embedder.GenerateAsync(query);
var queryEmb = queryResult.Vector;
var scored = new List<(ChatMessage Msg, float Score)>();
foreach (var msg in messages)
{
var msgResult = await _embedder.GenerateAsync(msg.Text ?? "");
var score = CosineSimilarity(queryEmb, msgResult.Vector);
scored.Add((msg, score));
}
scored.Sort((a, b) => b.Score.CompareTo(a.Score));
var n = scored.Count;
var high = scored.Take(n / 3).Select(x => x.Msg).ToList();
var medium = scored.Skip(n / 3).Take(n / 3)
.Select(x => x.Msg).ToList();
var low = scored.Skip(2 * n / 3).Select(x => x.Msg).ToList();
// Optimal: high at start/end, low in middle
var result = new List<ChatMessage>();
result.AddRange(high.Take(high.Count / 2));
result.AddRange(low);
result.AddRange(medium);
result.AddRange(high.Skip(high.Count / 2));
return result;
}
private async Task<List<ChatMessage>> ApplySlidingWindowAsync(
List<ChatMessage> messages)
{
if (messages.Count <= _windowSize)
return messages;
var outsideIdx = messages.Count - _windowSize - _overlap;
var outside = messages.Take(Math.Max(0, outsideIdx)).ToList();
var result = new List<ChatMessage>();
if (outside.Any())
{
var summary = await SummarizeAsync(outside);
result.Add(new ChatMessage(ChatRole.System,
$"Summary of earlier conversation:\n{summary}"));
}
// Add overlap and window
var startIdx = Math.Max(0, messages.Count - _windowSize - _overlap);
result.AddRange(messages.Skip(startIdx));
return result;
}
private async Task<List<ChatMessage>> DeduplicateAsync(
List<ChatMessage> messages,
float threshold = 0.9f)
{
var unique = new List<ChatMessage>();
var embeddings = new List<ReadOnlyMemory<float>>();
foreach (var msg in messages)
{
if (string.IsNullOrEmpty(msg.Text))
{
unique.Add(msg);
continue;
}
var msgResult = await _embedder.GenerateAsync(msg.Text);
var msgEmb = msgResult.Vector;
var isDuplicate = embeddings.Any(e =>
CosineSimilarity(msgEmb, e) > threshold);
if (!isDuplicate)
{
unique.Add(msg);
embeddings.Add(msgEmb);
}
}
return unique;
}
private List<ChatMessage> AddFreshnessIndicators(
List<ChatMessage> messages)
{
return messages.Select(msg =>
{
if (msg.AdditionalProperties?.TryGetValue(
"timestamp", out var ts) != true)
return msg;
var age = DateTime.UtcNow - (DateTime)ts;
if (age.TotalDays < 1)
return msg;
return new ChatMessage(msg.Role,
$"[{(int)age.TotalDays}d ago] {msg.Text}");
}).ToList();
}
private async Task<string> SummarizeAsync(
List<ChatMessage> messages)
{
var formatted = string.Join("\n",
messages.Select(m => $"{m.Role}: {Truncate(m.Text ?? "", 300)}"));
var response = await _client.GetResponseAsync(
$"Summarize concisely:\n{formatted}");
return response.Text;
}
private static float CosineSimilarity(
ReadOnlyMemory<float> a,
ReadOnlyMemory<float> b)
{
var aSpan = a.Span;
var bSpan = b.Span;
float dot = 0, normA = 0, normB = 0;
for (int i = 0; i < aSpan.Length; i++)
{
dot += aSpan[i] * bSpan[i];
normA += aSpan[i] * aSpan[i];
normB += bSpan[i] * bSpan[i];
}
return dot / (MathF.Sqrt(normA) * MathF.Sqrt(normB));
}
private static int CountTokens(List<ChatMessage> messages) =>
messages.Sum(m => (m.Text?.Length ?? 0) / 4); // Approximation
private static string Truncate(string s, int max) =>
s.Length <= max ? s : s[..max] + "...";
}

| Strategy | Addresses | Effectiveness | Overhead |
|---|---|---|---|
| Position reordering | Bloat (lost in middle) | 10-25% recall improvement | Low (embedding cost) |
| Sliding window | Bloat (size limit) | Prevents limit errors | Medium (summarization) |
| Deduplication | Bloat + Rot | 10-30% token reduction | Low (embedding cost) |
| Freshness tracking | Rot (staleness) | Varies by task | Very low |
| Contradiction detection | Rot (conflicts) | Prevents confused reasoning | Medium (LLM call) |
Mitigation strategies and their effectiveness
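The health manager and mitigation pipeline above are complementary and can run back to back before each model call. A sketch of one possible wiring, using the Python classes defined earlier (the OpenAI client setup and message shape are assumptions, not a prescribed integration):

```python
from openai import OpenAI

client = OpenAI()
health = ContextHealthManager(client, ContextHealthConfig())
pipeline = ContextMitigationPipeline(hard_limit=16000)

def prepare_turn(messages: list, user_query: str) -> list:
    messages = health.manage(messages)                 # bloat, rot, contradictions
    messages = pipeline.process(messages, user_query)  # reorder, window, dedupe
    return messages + [{"role": "user", "content": user_query}]
```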
Evaluation Approach
Measure both the presence of bloat/rot and the effectiveness of your mitigations:
| Metric | What it Measures | How to Calculate |
|---|---|---|
| Needle recall rate | Information retrieval accuracy | NIAH tests at various sizes/positions |
| Effective context ratio | Useful tokens / total tokens | Estimate via relevance scoring |
| Contradiction frequency | Conflicting information count | Automated detection pass |
| Staleness score | Age of time-sensitive content | Weighted average of claim ages |
| Task degradation curve | Performance vs context size | Same task at increasing sizes |
Metrics for context health evaluation
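For example, the effective context ratio can be approximated by scoring each message against the current query and counting the tokens of messages above a similarity cutoff. A rough sketch (word count stands in for tokens, and the 0.3 cutoff is illustrative):

```python
import numpy as np
from sentence_transformers import SentenceTransformer

def effective_context_ratio(messages: list, query: str,
                            threshold: float = 0.3) -> float:
    """Fraction of context (by approximate tokens) relevant to the query."""
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    q = embedder.encode(query)
    useful = total = 0
    for msg in messages:
        content = msg.get("content", "")
        if not content:
            continue
        m = embedder.encode(content)
        sim = float(np.dot(q, m) / (np.linalg.norm(q) * np.linalg.norm(m)))
        tokens = len(content.split())  # crude token proxy
        total += tokens
        if sim >= threshold:
            useful += tokens
    return useful / total if total else 1.0
```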
Benchmark Suite Recommendations
- Letta Context-Bench - Comprehensive long-context evaluation
- Custom NIAH - Tailored needle tests for your domain
- Recall@K tests - Multi-fact retrieval accuracy
- Reasoning consistency - Same question, different context orderings (a minimal harness sketch follows)
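A reasoning-consistency check can be as simple as asking the same question over several shuffled orderings of the same context and measuring agreement. A minimal sketch, where llm_answer is a hypothetical callable you supply and exact-match agreement is the simplest possible metric:

```python
import random

def reasoning_consistency(llm_answer, context_messages: list,
                          question: str, trials: int = 5) -> float:
    """Fraction of trials agreeing with the most common answer.
    `llm_answer(messages, question) -> str` is supplied by the caller."""
    answers = []
    for _ in range(trials):
        shuffled = context_messages[:]
        random.shuffle(shuffled)
        answers.append(llm_answer(shuffled, question).strip().lower())
    most_common = max(set(answers), key=answers.count)
    return answers.count(most_common) / len(answers)
```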