AS
AgSkills.dev
MARKETPLACE

rust-async-patterns

Master Rust async programming with Tokio, async traits, error handling, and concurrent patterns. Use when building async Rust applications, implementing concurrent systems, or debugging async code.

29.3k
3.2k

Preview

SKILL.md
name
rust-async-patterns
description
Master Rust async programming with Tokio, async traits, error handling, and concurrent patterns. Use when building async Rust applications, implementing concurrent systems, or debugging async code.

Rust Async Patterns

Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling.

When to Use This Skill

  • Building async Rust applications
  • Implementing concurrent network services
  • Using Tokio for async I/O
  • Handling async errors properly
  • Debugging async code issues
  • Optimizing async performance

Core Concepts

1. Async Execution Model

Future (lazy) → poll() → Ready(value) | Pending
                ↑           ↓
              Waker ← Runtime schedules

2. Key Abstractions

| Concept  | Purpose                                  |
|----------|------------------------------------------|
| Future   | Lazy computation that may complete later |
| async fn | Function returning impl Future           |
| await    | Suspend until future completes           |
| Task     | Spawned future running concurrently      |
| Runtime  | Executor that polls futures              |

Quick Start

# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
use tokio::time::{sleep, Duration}; use anyhow::Result; #[tokio::main] async fn main() -> Result<()> { // Initialize tracing tracing_subscriber::fmt::init(); // Async operations let result = fetch_data("https://api.example.com").await?; println!("Got: {}", result); Ok(()) } async fn fetch_data(url: &str) -> Result<String> { // Simulated async operation sleep(Duration::from_millis(100)).await; Ok(format!("Data from {}", url)) }

Patterns

Pattern 1: Concurrent Task Execution

use tokio::task::JoinSet; use anyhow::Result; // Spawn multiple concurrent tasks async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> { let mut set = JoinSet::new(); for url in urls { set.spawn(async move { fetch_data(&url).await }); } let mut results = Vec::new(); while let Some(res) = set.join_next().await { match res { Ok(Ok(data)) => results.push(data), Ok(Err(e)) => tracing::error!("Task failed: {}", e), Err(e) => tracing::error!("Join error: {}", e), } } Ok(results) } // With concurrency limit use futures::stream::{self, StreamExt}; async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> { stream::iter(urls) .map(|url| async move { fetch_data(&url).await }) .buffer_unordered(limit) // Max concurrent tasks .collect() .await } // Select first to complete use tokio::select; async fn race_requests(url1: &str, url2: &str) -> Result<String> { select! { result = fetch_data(url1) => result, result = fetch_data(url2) => result, } }

Pattern 2: Channels for Communication

use tokio::sync::{mpsc, broadcast, oneshot, watch}; // Multi-producer, single-consumer async fn mpsc_example() { let (tx, mut rx) = mpsc::channel::<String>(100); // Spawn producer let tx2 = tx.clone(); tokio::spawn(async move { tx2.send("Hello".to_string()).await.unwrap(); }); // Consume while let Some(msg) = rx.recv().await { println!("Got: {}", msg); } } // Broadcast: multi-producer, multi-consumer async fn broadcast_example() { let (tx, _) = broadcast::channel::<String>(100); let mut rx1 = tx.subscribe(); let mut rx2 = tx.subscribe(); tx.send("Event".to_string()).unwrap(); // Both receivers get the message let _ = rx1.recv().await; let _ = rx2.recv().await; } // Oneshot: single value, single use async fn oneshot_example() -> String { let (tx, rx) = oneshot::channel::<String>(); tokio::spawn(async move { tx.send("Result".to_string()).unwrap(); }); rx.await.unwrap() } // Watch: single producer, multi-consumer, latest value async fn watch_example() { let (tx, mut rx) = watch::channel("initial".to_string()); tokio::spawn(async move { loop { // Wait for changes rx.changed().await.unwrap(); println!("New value: {}", *rx.borrow()); } }); tx.send("updated".to_string()).unwrap(); }

Pattern 3: Async Error Handling

use anyhow::{Context, Result, bail}; use thiserror::Error; #[derive(Error, Debug)] pub enum ServiceError { #[error("Network error: {0}")] Network(#[from] reqwest::Error), #[error("Database error: {0}")] Database(#[from] sqlx::Error), #[error("Not found: {0}")] NotFound(String), #[error("Timeout after {0:?}")] Timeout(std::time::Duration), } // Using anyhow for application errors async fn process_request(id: &str) -> Result<Response> { let data = fetch_data(id) .await .context("Failed to fetch data")?; let parsed = parse_response(&data) .context("Failed to parse response")?; Ok(parsed) } // Using custom errors for library code async fn get_user(id: &str) -> Result<User, ServiceError> { let result = db.query(id).await?; match result { Some(user) => Ok(user), None => Err(ServiceError::NotFound(id.to_string())), } } // Timeout wrapper use tokio::time::timeout; async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError> where F: std::future::Future<Output = Result<T, ServiceError>>, { timeout(duration, future) .await .map_err(|_| ServiceError::Timeout(duration))? }

Pattern 4: Graceful Shutdown

use tokio::signal; use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; async fn run_server() -> Result<()> { // Method 1: CancellationToken let token = CancellationToken::new(); let token_clone = token.clone(); // Spawn task that respects cancellation tokio::spawn(async move { loop { tokio::select! { _ = token_clone.cancelled() => { tracing::info!("Task shutting down"); break; } _ = do_work() => {} } } }); // Wait for shutdown signal signal::ctrl_c().await?; tracing::info!("Shutdown signal received"); // Cancel all tasks token.cancel(); // Give tasks time to cleanup tokio::time::sleep(Duration::from_secs(5)).await; Ok(()) } // Method 2: Broadcast channel for shutdown async fn run_with_broadcast() -> Result<()> { let (shutdown_tx, _) = broadcast::channel::<()>(1); let mut rx = shutdown_tx.subscribe(); tokio::spawn(async move { tokio::select! { _ = rx.recv() => { tracing::info!("Received shutdown"); } _ = async { loop { do_work().await } } => {} } }); signal::ctrl_c().await?; let _ = shutdown_tx.send(()); Ok(()) }

Pattern 5: Async Traits

use async_trait::async_trait; #[async_trait] pub trait Repository { async fn get(&self, id: &str) -> Result<Entity>; async fn save(&self, entity: &Entity) -> Result<()>; async fn delete(&self, id: &str) -> Result<()>; } pub struct PostgresRepository { pool: sqlx::PgPool, } #[async_trait] impl Repository for PostgresRepository { async fn get(&self, id: &str) -> Result<Entity> { sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id) .fetch_one(&self.pool) .await .map_err(Into::into) } async fn save(&self, entity: &Entity) -> Result<()> { sqlx::query!( "INSERT INTO entities (id, data) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET data = $2", entity.id, entity.data ) .execute(&self.pool) .await?; Ok(()) } async fn delete(&self, id: &str) -> Result<()> { sqlx::query!("DELETE FROM entities WHERE id = $1", id) .execute(&self.pool) .await?; Ok(()) } } // Trait object usage async fn process(repo: &dyn Repository, id: &str) -> Result<()> { let entity = repo.get(id).await?; // Process... repo.save(&entity).await }

Pattern 6: Streams and Async Iteration

use futures::stream::{self, Stream, StreamExt}; use async_stream::stream; // Create stream from async iterator fn numbers_stream() -> impl Stream<Item = i32> { stream! { for i in 0..10 { tokio::time::sleep(Duration::from_millis(100)).await; yield i; } } } // Process stream async fn process_stream() { let stream = numbers_stream(); // Map and filter let processed: Vec<_> = stream .filter(|n| futures::future::ready(*n % 2 == 0)) .map(|n| n * 2) .collect() .await; println!("{:?}", processed); } // Chunked processing async fn process_in_chunks() { let stream = numbers_stream(); let mut chunks = stream.chunks(3); while let Some(chunk) = chunks.next().await { println!("Processing chunk: {:?}", chunk); } } // Merge multiple streams async fn merge_streams() { let stream1 = numbers_stream(); let stream2 = numbers_stream(); let merged = stream::select(stream1, stream2); merged .for_each(|n| async move { println!("Got: {}", n); }) .await; }

Pattern 7: Resource Management

use std::sync::Arc; use tokio::sync::{Mutex, RwLock, Semaphore}; // Shared state with RwLock (prefer for read-heavy) struct Cache { data: RwLock<HashMap<String, String>>, } impl Cache { async fn get(&self, key: &str) -> Option<String> { self.data.read().await.get(key).cloned() } async fn set(&self, key: String, value: String) { self.data.write().await.insert(key, value); } } // Connection pool with semaphore struct Pool { semaphore: Semaphore, connections: Mutex<Vec<Connection>>, } impl Pool { fn new(size: usize) -> Self { Self { semaphore: Semaphore::new(size), connections: Mutex::new((0..size).map(|_| Connection::new()).collect()), } } async fn acquire(&self) -> PooledConnection<'_> { let permit = self.semaphore.acquire().await.unwrap(); let conn = self.connections.lock().await.pop().unwrap(); PooledConnection { pool: self, conn: Some(conn), _permit: permit } } } struct PooledConnection<'a> { pool: &'a Pool, conn: Option<Connection>, _permit: tokio::sync::SemaphorePermit<'a>, } impl Drop for PooledConnection<'_> { fn drop(&mut self) { if let Some(conn) = self.conn.take() { let pool = self.pool; tokio::spawn(async move { pool.connections.lock().await.push(conn); }); } } }

Debugging Tips

// Enable tokio-console for runtime debugging // Cargo.toml: tokio = { features = ["tracing"] } // Run: RUSTFLAGS="--cfg tokio_unstable" cargo run // Then: tokio-console // Instrument async functions use tracing::instrument; #[instrument(skip(pool))] async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> { tracing::debug!("Fetching user"); // ... } // Track task spawning let span = tracing::info_span!("worker", id = %worker_id); tokio::spawn(async move { // Enters span when polled }.instrument(span));

Best Practices

Do's

  • Use tokio::select! - For racing futures
  • Prefer channels - Over shared state when possible
  • Use JoinSet - For managing multiple tasks
  • Instrument with tracing - For debugging async code
  • Handle cancellation - Check CancellationToken

Don'ts

  • Don't block - Never use std::thread::sleep in async
  • Don't hold locks across awaits - Causes deadlocks
  • Don't spawn unboundedly - Use semaphores for limits
  • Don't ignore errors - Propagate with ? or log
  • Don't forget Send bounds - For spawned futures

Resources

GitHub Repository
wshobson/agents
Stars
29,322
Forks
3,213
Open Repository
Install Skill
Download ZIP (1 file)