
Rust Concurrency Patterns for AI Agents

fr4nk · Software Engineer, Hugging Face · 7 min read

Production patterns for building fast, concurrent AI agents in Rust.

Why Rust over Python?

# Python: the GIL serializes CPU-bound work
await asyncio.gather(
    rag_search(query),   # concurrent I/O...
    web_search(query),   # ...but never parallel on the CPU
)

// Rust: concurrent by default, parallel once work lands on worker threads
let (rag, web) = tokio::join!(
    rag_search(&query),
    web_search(&query)
);
|                 | Python                | Rust               |
|-----------------|-----------------------|--------------------|
| CPU parallelism | GIL blocks            | Real threads       |
| Memory          | ~28 bytes/int + GC    | 4 bytes/int, no GC |
| Latency         | GC spikes (50-200ms)  | Consistent         |
| Scaling         | Limited by GIL        | Linear with cores  |

Bottom line: Python's GIL prevents true parallelism for CPU-bound work. Rust gives you real parallelism with predictable, GC-free latency.


Tokio vs Rayon: Quick Reference

┌─────────────────────────────────────────────────────────────┐
│ AI Agent Workloads │
├─────────────────────────┬───────────────────────────────────┤
│ I/O-bound │ CPU-bound │
│ (Tokio) │ (Rayon) │
├─────────────────────────┼───────────────────────────────────┤
│ • LLM API calls │ • Local embedding │
│ • Web search │ • Reranking/scoring │
│ • Vector DB queries │ • Text chunking │
│ • URL fetching │ • Matrix operations │
└─────────────────────────┴───────────────────────────────────┘

Rule of thumb:

  • Network/API call → tokio
  • Local CPU compute → rayon
  • Both → tokio + spawn_blocking + rayon (see the sketch below)
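
A minimal sketch of the combined case, assuming hypothetical fetch_all (I/O) and cpu_score (CPU) helpers:

use rayon::prelude::*;

async fn fetch_and_score(urls: Vec<String>) -> anyhow::Result<Vec<f32>> {
    let docs = fetch_all(&urls).await?; // tokio: network I/O
    // Hand the CPU work to a blocking thread, then fan out with rayon
    let scores = tokio::task::spawn_blocking(move || {
        docs.par_iter().map(|d| cpu_score(d)).collect::<Vec<f32>>()
    })
    .await?;
    Ok(scores)
}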

Pattern 1: Bounded Concurrent Processing

Use case: Parse thousands of files concurrently with memory limits

use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Result;
use dashmap::DashMap;
use futures::stream::{self, StreamExt, TryStreamExt};
use tokio::sync::Semaphore;

pub struct DocumentProcessor {
    max_concurrent: usize,
    cache: Arc<DashMap<String, Document>>,
}

impl DocumentProcessor {
    async fn process_all(&self, files: Vec<PathBuf>) -> Result<Vec<Document>> {
        let sem = Arc::new(Semaphore::new(self.max_concurrent));

        stream::iter(files)
            .map(|path| {
                let sem = sem.clone();
                async move {
                    // Hold a permit across the parse to bound peak memory
                    let _permit = sem.acquire().await?;
                    // Parsing is CPU-bound: move it off the async worker threads
                    tokio::task::spawn_blocking(move || parse_document(&path)).await?
                }
            })
            .buffer_unordered(self.max_concurrent)
            .try_collect()
            .await
    }
}
| Choice | Why | Alternative |
|---|---|---|
| Semaphore | Limits memory usage | join_all: OOM with 10K files |
| buffer_unordered | Process results as they complete | buffered: waits for order |
| spawn_blocking | Don't block the tokio workers | Inline: blocks the runtime |
| DashMap | Lock-free cache | Mutex<HashMap>: contention |
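
The cache field above isn't exercised in the snippet; a sketch of a check-before-parse fast path, keying on the rendered path and assuming Document: Clone:

fn parse_cached(&self, path: &Path) -> Result<Document> {
    let key = path.display().to_string();
    // DashMap read: no global lock, safe from many tasks at once
    if let Some(doc) = self.cache.get(&key) {
        return Ok(doc.clone());
    }
    let doc = parse_document(path)?;
    self.cache.insert(key, doc.clone()); // populate for later hits
    Ok(doc)
}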

Pattern 2: Parallel Pipeline Stages

Use case: RAG pipeline - retrieve, rerank, generate

async fn run(&self, query: &str) -> Result<Response> {
    // Stage 1: Parallel retrieval (I/O)
    let (docs, web, graph) = tokio::try_join!(
        self.retriever.search_docs(query),
        self.retriever.search_web(query),
        self.retriever.search_graph(query)
    )?;

    // Stage 2: Rerank (CPU)
    let ranked = tokio::task::spawn_blocking({
        let all = merge(docs, web, graph);
        move || rerank(&all)
    }).await?;

    // Stage 3: Generate (I/O)
    self.generator.generate(query, &ranked).await
}
| Choice | Why | Alternative |
|---|---|---|
| try_join! | Fail fast, static dispatch | join!: waits even on error |
| spawn_blocking | CPU rerank off the async threads | Inline: blocks 100ms+ |
| Sequential stages | Rerank needs retrieval output | Full parallel: impossible |

Pattern 3: Dynamic Fan-Out with try_join_all

Use case: Query expansion - search "auth", "login", "JWT" together

use futures::future::try_join_all;

async fn multi_search(&self, queries: &[String], k: usize) -> Result<Vec<SearchResult>> {
    let futures = queries.iter().map(|q| self.search(q, k));
    let results = try_join_all(futures).await?;
    Ok(merge_and_dedupe(results))
}

async fn search(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
    // Embedding is CPU-bound; the double ? unwraps JoinError, then the embed Result
    let emb = tokio::task::spawn_blocking({
        let q = query.to_string();
        move || embed(&q)
    }).await??;
    self.vector_store.search(&emb, k).await
}
| Choice | Why | Alternative |
|---|---|---|
| try_join_all | Dynamic query count | try_join!: arity fixed at compile time |
| try_join_all | Fail fast | join_all: collects errors |
| spawn_blocking | Embed is CPU-bound | Inline: blocks tokio |
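
merge_and_dedupe is referenced but not shown; a minimal sketch, assuming hypothetical id: String and score: f32 fields on SearchResult:

use std::collections::hash_map::Entry;
use std::collections::HashMap;

fn merge_and_dedupe(results: Vec<Vec<SearchResult>>) -> Vec<SearchResult> {
    let mut best: HashMap<String, SearchResult> = HashMap::new();
    for r in results.into_iter().flatten() {
        // Keep only the highest-scoring copy of each document
        match best.entry(r.id.clone()) {
            Entry::Occupied(mut e) if r.score > e.get().score => { e.insert(r); }
            Entry::Occupied(_) => {}
            Entry::Vacant(e) => { e.insert(r); }
        }
    }
    let mut merged: Vec<_> = best.into_values().collect();
    merged.sort_by(|a, b| b.score.total_cmp(&a.score));
    merged
}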

Pattern 4: Provider Fallback Chain

Use case: Embedding fallback - OpenAI → Ollama → local ONNX

use anyhow::{anyhow, Result};

pub struct EmbeddingService {
    openai: Option<OpenAIProvider>,
    ollama: Option<OllamaProvider>,
    onnx: Option<OnnxModel>,
}

impl EmbeddingService {
    async fn embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>> {
        // Remote APIs first (I/O)
        if let Some(openai) = &self.openai {
            if let Ok(emb) = openai.embed_batch(texts).await {
                return Ok(emb);
            }
        }
        if let Some(ollama) = &self.ollama {
            if let Ok(emb) = ollama.embed_batch(texts).await {
                return Ok(emb);
            }
        }
        // Local model last (CPU)
        if let Some(onnx) = &self.onnx {
            return tokio::task::spawn_blocking({
                let t = texts.to_vec();
                let m = onnx.clone();
                move || m.embed_sync(&t)
            }).await?;
        }
        Err(anyhow!("No provider available"))
    }
}
| Choice | Why | Alternative |
|---|---|---|
| Sequential try | Stop on first success | select!: races every provider, wasting work |
| Early return | Fast path | Collect all results: slower |
| spawn_blocking | ONNX inference is CPU-bound | Inline: blocks 50-200ms |
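
In production you would likely also bound each remote attempt; a sketch of the first branch with tokio::time::timeout (the 2-second budget is an assumption):

use std::time::Duration;
use tokio::time::timeout;

if let Some(openai) = &self.openai {
    // Outer Ok: beat the deadline; inner Ok: the provider succeeded
    if let Ok(Ok(emb)) = timeout(Duration::from_secs(2), openai.embed_batch(texts)).await {
        return Ok(emb);
    }
}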

Pattern 5: Event Loop with Select

Use case: Agent daemon - queries, file changes, shutdown

use std::time::Duration;
use tokio::sync::mpsc::Receiver;

async fn run_agent(&self, mut events: Receiver<Event>) -> Result<()> {
    loop {
        tokio::select! {
            // Periodic maintenance tick
            _ = tokio::time::sleep(Duration::from_secs(30)) => {
                self.cleanup_cache().await?;
            }
            Some(event) = events.recv() => {
                match event {
                    Event::Query(q) => self.handle_query(q).await?,
                    Event::Index(path) => self.reindex(&path).await?,
                }
            }
            // Graceful shutdown
            _ = tokio::signal::ctrl_c() => break,
        }
    }
    Ok(())
}
| Choice | Why | Alternative |
|---|---|---|
| select! | Multiplex N event sources | Separate loops: complex |
| select! | Auto-cancels losing branches | Manual cancel: bugs |
| Channel | Decouples producers | Shared state: locks |
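
The Event type and channel wiring are assumed by run_agent; a minimal sketch using tokio's bounded mpsc (the 128-slot capacity is arbitrary):

use std::path::PathBuf;
use tokio::sync::mpsc;

enum Event {
    Query(String),
    Index(PathBuf),
}

// Bounded channel: slow consumers push back on producers
let (tx, events) = mpsc::channel::<Event>(128);
tx.send(Event::Query("how does auth work?".into())).await?;
agent.run_agent(events).await?;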

Pattern 6: RwLock for Read-Heavy Cache

Use case: Embedding cache - track stats without blocking reads

use std::collections::VecDeque;
use std::sync::Arc;

use dashmap::DashMap;
use parking_lot::{Mutex, RwLock};

pub struct EmbeddingCache<K, V> {
    data: DashMap<K, V>,
    stats: Arc<RwLock<CacheStats>>,
    order: Arc<Mutex<VecDeque<K>>>,
}

impl<K: Eq + std::hash::Hash, V: Clone> EmbeddingCache<K, V> {
    pub fn get(&self, key: &K) -> Option<V> {
        if let Some(value) = self.data.get(key) {
            // try_write: skip the stat update rather than stall a hot read path
            if let Some(mut stats) = self.stats.try_write() {
                stats.hits += 1;
            }
            return Some(value.clone());
        }
        if let Some(mut stats) = self.stats.try_write() {
            stats.misses += 1;
        }
        None
    }

    pub fn hit_ratio(&self) -> f64 {
        let stats = self.stats.read();
        let total = stats.hits + stats.misses;
        if total == 0 { return 0.0; } // avoid 0/0 on an empty cache
        stats.hits as f64 / total as f64
    }
}
| Choice | Why | Alternative |
|---|---|---|
| DashMap | Lock-free K/V | RwLock<HashMap>: contention |
| RwLock | Many readers OK | Mutex: blocks reads |
| parking_lot | Faster, no poisoning | std::sync: slower |
| try_write | Non-blocking | write: hurts latency |
| Mutex for order | Always mutating | RwLock: no benefit |
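
The order queue backs eviction, which get doesn't show; a simple insertion-order eviction sketch of put (the capacity field is an assumed addition to the struct):

pub fn put(&self, key: K, value: V)
where
    K: Clone,
{
    let mut order = self.order.lock(); // parking_lot: no unwrap needed
    if order.len() >= self.capacity {
        if let Some(oldest) = order.pop_front() {
            self.data.remove(&oldest); // evict the oldest insertion
        }
    }
    order.push_back(key.clone());
    self.data.insert(key, value);
}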

Concurrency Primitives Summary

| Type | Use Case | Trade-off |
|---|---|---|
| Semaphore | Limit concurrent tasks | Permit overhead |
| DashMap | High-concurrency K/V | Memory (sharding) |
| RwLock | Read-heavy data | Write starvation |
| Mutex | Write-heavy, ordering | Readers wait |
| try_lock() | Non-critical updates | May skip updates |
| spawn_blocking | CPU in async context | Thread-pool overhead |

Rig Agent Patterns

Agent Builder with Tool Registration

use std::sync::Arc;

use anyhow::Result;
use rig::providers::openai;

pub struct AgentBuilder {
    executor: Arc<GraphToolExecutor>,
    tier: ContextTier,
}

impl AgentBuilder {
    pub fn new(executor: Arc<GraphToolExecutor>) -> Self {
        Self { executor, tier: ContextTier::Standard }
    }

    pub fn tier(mut self, tier: ContextTier) -> Self {
        self.tier = tier;
        self
    }

    pub fn build(self) -> Result<Agent> {
        let factory = GraphToolFactory::new(self.executor);

        let agent = openai::Client::from_env()
            .agent("gpt-4")
            .preamble(&self.system_prompt())
            .max_tokens(self.max_tokens())
            // Register tools via the factory
            .tool(factory.semantic_search())
            .tool(factory.dependency_analysis())
            .tool(factory.code_complexity())
            .tool(factory.call_chain())
            .build();

        Ok(agent)
    }
}
| Choice | Why | Alternative |
|---|---|---|
| Builder pattern | Fluent config | Constructor args: messy |
| Tool factory | Centralized creation | Inline tools: duplication |
| Arc<Executor> | Share across tools | Cloning the executor: expensive |
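
A usage sketch of the builder; executor is assumed already constructed elsewhere, and ContextTier::Deep is a hypothetical variant:

let agent = AgentBuilder::new(executor)
    .tier(ContextTier::Deep) // hypothetical variant
    .build()?;
let answer = agent.prompt("map the auth call chain").await?;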

ReAct Multi-Turn Agent

use async_stream::stream;
use futures::Stream;
use rig::agent::Agent;

pub struct ReactAgent {
    agent: Agent,
    factory: GraphToolFactory,
    max_turns: usize,
}

impl ReactAgent {
    pub async fn execute(&self, query: &str) -> Result<String> {
        let response = self.agent
            .prompt(query)
            .multi_turn(self.max_turns) // reasoning loop
            .await?;

        // Track tool usage
        let tool_count = self.factory.take_tool_call_count();
        let traces = self.factory.take_tool_traces();
        tracing::info!("Tools called: {}, traces: {:?}", tool_count, traces);

        Ok(response)
    }

    // Returns the stream directly; the work runs as the stream is polled
    pub fn execute_stream(&self, query: String) -> impl Stream<Item = AgentEvent> + '_ {
        stream! {
            yield AgentEvent::Thinking;

            let response = self.agent
                .prompt(query.as_str())
                .multi_turn(self.max_turns)
                .await;

            match response {
                Ok(text) => {
                    // Re-chunk the full response for progressive rendering
                    for chunk in text.chars().collect::<Vec<_>>().chunks(50) {
                        yield AgentEvent::Chunk(chunk.iter().collect());
                    }
                    yield AgentEvent::Done;
                }
                Err(e) => yield AgentEvent::Error(e.to_string()),
            }
        }
    }
}
| Choice | Why | Alternative |
|---|---|---|
| multi_turn | ReAct reasoning loop | Single turn: no iteration |
| Tool traces | Observability | No tracking: flying blind |
| Stream events | Progressive UI | Waiting for the full response |
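
On the caller side, the stream drives a progressive UI; a minimal consumption sketch:

use futures::{pin_mut, StreamExt};

let stream = agent.execute_stream("explain the auth flow".to_string());
pin_mut!(stream); // stream! values are !Unpin; pin before polling
while let Some(event) = stream.next().await {
    match event {
        AgentEvent::Thinking => eprintln!("thinking..."),
        AgentEvent::Chunk(text) => print!("{text}"),
        AgentEvent::Done => break,
        AgentEvent::Error(e) => eprintln!("error: {e}"),
    }
}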

Tool Factory with Counting

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use serde_json::Value;

pub struct GraphToolFactory {
    executor: Arc<GraphToolExecutor>,
    call_count: Arc<AtomicUsize>,
    traces: Arc<Mutex<Vec<ToolTrace>>>,
}

impl GraphToolFactory {
    pub fn new(executor: Arc<GraphToolExecutor>) -> Self {
        Self {
            executor,
            call_count: Arc::new(AtomicUsize::new(0)),
            traces: Arc::new(Mutex::new(Vec::new())),
        }
    }

    pub fn semantic_search(&self) -> impl Tool {
        CountingTool::new(
            SemanticSearchTool::new(self.executor.clone()),
            self.call_count.clone(),
            self.traces.clone(),
        )
    }

    pub fn take_tool_call_count(&self) -> usize {
        // swap(0) reads and resets the counter in one atomic step
        self.call_count.swap(0, Ordering::SeqCst)
    }
}

struct CountingTool<T> {
    inner: T,
    count: Arc<AtomicUsize>,
    traces: Arc<Mutex<Vec<ToolTrace>>>,
}

impl<T> CountingTool<T> {
    fn new(inner: T, count: Arc<AtomicUsize>, traces: Arc<Mutex<Vec<ToolTrace>>>) -> Self {
        Self { inner, count, traces }
    }
}

impl<T: Tool> Tool for CountingTool<T> {
    async fn call(&self, input: Value) -> Result<Value> {
        self.count.fetch_add(1, Ordering::SeqCst);
        let start = Instant::now();

        let result = self.inner.call(input.clone()).await;

        // try_lock: drop the trace rather than block the tool call
        if let Ok(mut traces) = self.traces.try_lock() {
            traces.push(ToolTrace {
                tool: std::any::type_name::<T>(),
                input,
                duration: start.elapsed(),
            });
        }
        result
    }
}
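
execute above also drains traces; a matching sketch for take_tool_traces:

impl GraphToolFactory {
    pub fn take_tool_traces(&self) -> Vec<ToolTrace> {
        // Swap the buffer for an empty Vec so the next run starts clean
        std::mem::take(&mut *self.traces.lock().unwrap())
    }
}
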
| Choice | Why | Alternative |
|---|---|---|
| AtomicUsize | Lock-free counting | Mutex<usize>: contention |
| try_lock | Don't block on tracing | lock: hurts latency |
| Wrapper pattern | Adds counting to any tool | Modifying each tool: tedious |

Full Example: RAG Pipeline

pub async fn search_and_answer(query: &str) -> Result<Answer> {
    // 1. Parallel retrieval (I/O)
    let (rag, web) = tokio::try_join!(
        async {
            let (vec_hits, kw_hits) = tokio::join!(
                vector_search(query),
                keyword_search(query),
            );
            Ok(rrf_merge(vec![vec_hits?, kw_hits?]))
        },
        async {
            let urls = web_search(query).await?;
            parallel_fetch(&urls).await
        },
    )?;

    // 2. Rerank (CPU)
    let top_k = tokio::task::spawn_blocking({
        let merged = merge(rag, web);
        let q = query.to_string();
        move || {
            use rayon::prelude::*;
            let mut scored: Vec<_> = merged
                .par_iter()
                .map(|doc| (doc.clone(), score(&q, doc)))
                .collect();
            // total_cmp avoids the NaN panic that partial_cmp().unwrap() risks
            scored.sort_by(|a, b| b.1.total_cmp(&a.1));
            scored.into_iter().take(10).map(|(d, _)| d).collect::<Vec<_>>()
        }
    }).await?;

    // 3. Generate (I/O)
    generate_with_citations(query, &top_k).await
}
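
parallel_fetch is referenced but not shown; a minimal sketch with bounded fan-out (reqwest and the limit of 8 concurrent requests are assumptions):

use futures::stream::{self, StreamExt};

async fn parallel_fetch(urls: &[String]) -> Result<Vec<String>> {
    stream::iter(urls)
        .map(|url| async move {
            // Pure I/O: stays on the tokio runtime, no spawn_blocking needed
            anyhow::Ok(reqwest::get(url.as_str()).await?.text().await?)
        })
        .buffer_unordered(8) // cap in-flight requests
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .collect() // Vec<Result<String>> -> Result<Vec<String>>
}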
