Introduction
Building robust, real-time applications in Rust can often feel like a juggling act. You want a responsive user interface, but you also need to handle complex, asynchronous data updates, all without sacrificing Rust’s core promise of memory safety and thread safety. This is where managing shared state becomes a critical challenge. The standard approach of manually wrapping everything in RwLock and Mutex, or using channels, can quickly lead to complex, verbose, and error-prone code. What if there were a simpler, more elegant way to get notified when your data changes, no matter where that change originates?
Introducing the observable-property crate, a zero-dependency Rust library that brings the power of the observer pattern to your concurrent applications. It provides a thread-safe, panic-isolated mechanism for observing changes to any value that can be cloned. Whether you’re building a real-time dashboard, a game, or a desktop application, observable-property simplifies state management by letting you focus on the logic, not the locks. In this article, we’ll explore its features by building a somewhat realistic stock ticker application, demonstrating how it solves common concurrency problems with clarity and safety.
The implementation
Adding the important stuff
Start by bringing in these prerequisites in your Cargo.toml file:
[dependencies]
observable-property = "0.2.1"
rand = "0.8"
Why do you need these?
- The observable-property crate provides the ObservableProperty<> wrapper.
- The rand crate supplies the random numbers for the simulated stock ticker: the price goes up and down because we add or subtract a random amount on each update.
The Stock struct
The implementation of the Stock struct looks like this:
#[derive(Debug)]
struct Stock {
    symbol: String,
    price: ObservableProperty<f64>,
}
The Stock struct has two properties:
- The stock symbol, represented by a string.
- The price, which we wrap in an ObservableProperty<> because it can change.
Also there is a very basic implementation of a constructor function:
impl Stock {
    fn new(symbol: &str, initial_price: f64) -> Self {
        Stock {
            symbol: symbol.to_string(),
            price: ObservableProperty::new(initial_price),
        }
    }
}
The constructor function takes in a symbol and an initial price, creates a new struct, sets the appropriate fields, and returns the newly constructed struct.
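To get a feeling for what such a wrapper does for us, here is a minimal sketch of the observer idea using only the standard library. It is not how observable-property is implemented: MiniObservable, its methods, and mini_observable_demo are hypothetical names made up for this illustration. The sketch keeps the value behind an RwLock, keeps the observers in a Mutex, and uses catch_unwind so that one panicking observer does not stop the others from being notified.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::{Arc, Mutex, RwLock};

// Observers receive the old and the new value by reference.
type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

/// Hypothetical, simplified stand-in. NOT the crate's actual implementation.
struct MiniObservable<T> {
    value: RwLock<T>,
    observers: Mutex<Vec<Observer<T>>>,
}

impl<T: Clone> MiniObservable<T> {
    fn new(initial: T) -> Self {
        MiniObservable {
            value: RwLock::new(initial),
            observers: Mutex::new(Vec::new()),
        }
    }

    /// Returns a clone of the current value.
    fn get(&self) -> T {
        self.value.read().unwrap().clone()
    }

    fn subscribe(&self, observer: Observer<T>) {
        self.observers.lock().unwrap().push(observer);
    }

    /// Stores the new value, then calls every observer with (old, new).
    /// Returns how many observers panicked while being notified.
    fn set(&self, new_value: T) -> usize {
        let old_value = {
            let mut guard = self.value.write().unwrap();
            std::mem::replace(&mut *guard, new_value.clone())
        };
        // Snapshot the observers so no lock is held while callbacks run.
        let snapshot: Vec<Observer<T>> = self.observers.lock().unwrap().clone();
        let mut panicked = 0;
        for observer in &snapshot {
            let result = catch_unwind(AssertUnwindSafe(|| observer(&old_value, &new_value)));
            if result.is_err() {
                panicked += 1;
            }
        }
        panicked
    }
}

/// Registers one well-behaved and one panicking observer, then updates the value.
fn mini_observable_demo() -> (f64, usize, Vec<f64>) {
    let price = MiniObservable::new(100.0_f64);
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);
    price.subscribe(Arc::new(move |_old: &f64, new: &f64| {
        sink.lock().unwrap().push(*new);
    }));
    price.subscribe(Arc::new(|_old: &f64, new: &f64| {
        if *new > 0.0 {
            panic!("misbehaving observer");
        }
    }));
    let panicked = price.set(101.25);
    let log = seen.lock().unwrap().clone();
    (price.get(), panicked, log)
}
```

Calling mini_observable_demo() from main() returns the updated price, the number of observers that panicked, and the values recorded by the well-behaved observer. The panic message still appears on stderr, because the default panic hook prints it before catch_unwind takes over.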
Testing time
Now we run a small test. If you have code in your main() function, clear that, and start our test code by adding these lines:
let stock = Stock::new("OBSP", 100.0);
println!("--- Initializing Stock Ticker ---");
println!("Starting price for {}: €{:.2}", stock.symbol, stock.price.get()?);
println!("---------------------------------");
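We create a stock so that we have a price to observe, and we print its initial state. Note that get() returns a Result, so the ? operator only compiles if main() itself returns a Result, as it does in the full listing at the end of this article.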
Adding the subscribers
Next, create our first subscriber, which we will call our dashboard because it prints every change to the price property:
let symbol_dashboard = stock.symbol.clone();
stock.price.subscribe(Arc::new(move |_old_price, new_price| {
    println!("\n[Dashboard] Price Updated for {}: €{:.2}", symbol_dashboard, new_price);
}))?;
However, if the price drops below €95.00, we also want to alert an observer. The next subscriber does this using a filter function:
let symbol_alert = stock.symbol.clone();
stock.price.subscribe_filtered(
    Arc::new(move |_old_price, new_price| {
        println!("\n*** [ALERT!] Price for {} has dropped to €{:.2} ***", symbol_alert, new_price);
    }),
    |_old_price, new_price| *new_price < 95.0,
)?;
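If you are wondering what a filtered subscription boils down to, the idea can be sketched with the standard library alone: wrap the observer in a second closure that first asks the filter whether the change is interesting. This is only an illustration of the concept, not the crate's subscribe_filtered implementation, and Observer, filtered, and alerts_for are hypothetical names.

```rust
use std::sync::{Arc, Mutex};

// An observer receives the old and the new price by reference.
type Observer = Arc<dyn Fn(&f64, &f64) + Send + Sync>;

/// Wraps `observer` so that it only runs when `filter` returns true.
fn filtered<F>(observer: Observer, filter: F) -> Observer
where
    F: Fn(&f64, &f64) -> bool + Send + Sync + 'static,
{
    Arc::new(move |old: &f64, new: &f64| {
        if filter(old, new) {
            observer(old, new);
        }
    })
}

/// Feeds a series of prices through a "below 95.0" alert and
/// returns the prices that actually triggered it.
fn alerts_for(prices: &[f64]) -> Vec<f64> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);
    let alert: Observer = Arc::new(move |_old: &f64, new: &f64| {
        sink.lock().unwrap().push(*new);
    });
    let alert = filtered(alert, |_old: &f64, new: &f64| *new < 95.0);

    let mut old = 100.0;
    for &price in prices {
        alert(&old, &price);
        old = price;
    }
    let triggered = seen.lock().unwrap().clone();
    triggered
}
```

With the prices 101.0, 94.5, 96.0, and 90.0, only 94.5 and 90.0 pass the filter, so only those two reach the alert observer.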
Next we add a slow subscriber that simulates time-consuming work, such as logging:
stock.price.subscribe(Arc::new(|old_price, new_price| {
    thread::sleep(Duration::from_millis(500));
    println!("[Log] Price change from €{:.2} to €{:.2} completed.", old_price, new_price);
}))?;
Running the ticker
Now it is time to run the full simulation:
let price_arc = Arc::new(stock.price);
let price_for_thread = Arc::clone(&price_arc);
let symbol_thread = stock.symbol.clone();
let data_feed_thread = thread::spawn(move || {
    let price = price_for_thread;
    let mut rng = rand::thread_rng();
    for _ in 0..10 {
        let current_price = price.get().unwrap_or_else(|e| {
            eprintln!("Error getting current price: {}", e);
            100.0 // Default value in case of error
        });
        let volatility = 0.05;
        let change_percent = rng.gen_range(-volatility..volatility);
        let change_amount = current_price * change_percent;
        let noise = rng.gen_range(-1.0..1.0);
        let new_price = (current_price + change_amount + noise).max(0.0);
        if rng.gen_bool(0.5) {
            println!("\n[Data Feed] Updating price for {} synchronously...", symbol_thread);
            if let Err(e) = price.set(new_price) {
                eprintln!("Error setting value: {}", e);
            }
        } else {
            println!("\n[Data Feed] Updating price for {} asynchronously...", symbol_thread);
            if let Err(e) = price.set_async(new_price) {
                eprintln!("Error setting value asynchronously: {}", e);
            }
        }
        thread::sleep(Duration::from_millis(1000));
    }
});
The code begins by moving the stock’s price into an Arc (an atomically reference-counted pointer), which enables thread-safe sharing of the price between threads. Arc::clone() creates a new reference to the same data, allowing the spawned thread to safely read and update the shared price.
A new thread is created using thread::spawn() to simulate a data feed that updates the stock price. This thread runs independently from the main program, with its own local copy of the symbol and a reference to the shared price data.
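The Arc sharing described above can be tried in isolation. The following sketch uses a plain Mutex<f64> instead of an ObservableProperty so that it runs without any dependencies; share_price_across_threads is a hypothetical helper name, and the three updates of 0.5 are arbitrary values chosen for the illustration.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Shares one price between the calling thread and a spawned thread.
/// Returns the number of Arc owners right after cloning, and the final price.
fn share_price_across_threads() -> (usize, f64) {
    let price = Arc::new(Mutex::new(100.0_f64));

    // Arc::clone copies the pointer, not the data: both handles see the same price.
    let price_for_thread = Arc::clone(&price);
    let owners = Arc::strong_count(&price);

    let writer = thread::spawn(move || {
        for _ in 0..3 {
            let mut guard = price_for_thread.lock().unwrap();
            *guard += 0.5;
        }
    });
    writer.join().expect("writer thread panicked");

    // The update made by the other thread is visible through the original handle.
    let final_price = *price.lock().unwrap();
    (owners, final_price)
}
```

The function reports two owners after the clone, and the original handle observes the price written by the spawned thread.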
Inside the thread, a random number generator, rand::thread_rng(), is used to simulate market volatility. For each iteration:
- The current price is retrieved through the thread-safe reference.
- A price change is calculated based on:
  - A volatility factor (5%)
  - A random percentage change within that volatility range
  - Some additional random noise between -1.0 and 1.0
We calculate the new price by applying these changes to the current price, with a minimum floor of 0.0 to prevent negative prices.
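To make the arithmetic concrete, here is the same calculation pulled out into a pure function with the random parts passed in as arguments. The names next_price and price_bounds are hypothetical helpers for this illustration; the formula itself is the one used in the data feed thread.

```rust
/// Applies a percentage change and some noise to the current price,
/// never letting the result drop below zero.
fn next_price(current: f64, change_percent: f64, noise: f64) -> f64 {
    let change_amount = current * change_percent;
    (current + change_amount + noise).max(0.0)
}

/// Lowest and highest price that a single update can approach,
/// given the volatility and the maximum absolute noise.
fn price_bounds(current: f64, volatility: f64, max_noise: f64) -> (f64, f64) {
    let low = next_price(current, -volatility, -max_noise);
    let high = next_price(current, volatility, max_noise);
    (low, high)
}
```

For example, a price of 100.0 with a change of +2% and noise of -0.5 gives 100.0 + 2.0 - 0.5 = 101.5. With a volatility of 5% and noise between -1.0 and 1.0, a single update moves a price of 100.0 to somewhere between roughly 94.0 and 106.0.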
The thread then randomly chooses between two update methods:
- Synchronous updates using price.set(), which block until complete
- Asynchronous updates using price.set_async(), which delegate the notification of observers to worker threads
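The difference between the two styles can be sketched with the standard library. This is an assumption-laden illustration of the general idea, not the crate's set() or set_async() code: notify_blocking, notify_in_background, and notification_demo are hypothetical names, and the background variant here uses a single spawned thread.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

type Observer = Arc<dyn Fn(&f64, &f64) + Send + Sync>;

/// Calls every observer on the current thread; returns only when all are done.
fn notify_blocking(observers: &[Observer], old: f64, new: f64) {
    for observer in observers {
        observer(&old, &new);
    }
}

/// Hands the observers to another thread and returns immediately.
/// The caller may join the handle if it wants to wait after all.
fn notify_in_background(observers: &[Observer], old: f64, new: f64) -> thread::JoinHandle<()> {
    let observers: Vec<Observer> = observers.to_vec();
    thread::spawn(move || {
        for observer in &observers {
            observer(&old, &new);
        }
    })
}

/// Returns how many notifications had completed after the blocking call,
/// and after the background call had been joined.
fn notification_demo() -> (usize, usize) {
    let calls = Arc::new(Mutex::new(0usize));
    let counter = Arc::clone(&calls);
    let slow: Observer = Arc::new(move |_old: &f64, _new: &f64| {
        thread::sleep(Duration::from_millis(20)); // simulated slow work
        *counter.lock().unwrap() += 1;
    });
    let observers = vec![slow];

    notify_blocking(&observers, 100.0, 101.0);
    let after_blocking = *calls.lock().unwrap();

    let handle = notify_in_background(&observers, 101.0, 102.0);
    // The caller is free to do other work here while the observer runs.
    handle.join().expect("notifier thread panicked");
    let after_join = *calls.lock().unwrap();

    (after_blocking, after_join)
}
```

The blocking call makes the caller pay for the slow observer, while the background call returns right away and lets the caller decide whether to wait for the notifications at all.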
After each update, the thread sleeps for one second using thread::sleep(Duration::from_millis(1000)) to simulate a realistic update frequency. This process repeats 10 times before the thread completes.
The full main() function
Here is the full source of the main() function, together with the imports it needs:
// Assumed import path: both types are exported from the crate root.
use observable_property::{ObservableProperty, PropertyError};
use rand::Rng; // brings gen_range and gen_bool into scope
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() -> Result<(), PropertyError> {
    let stock = Stock::new("OBSP", 100.0);
    println!("--- Initializing Stock Ticker ---");
    println!("Starting price for {}: €{:.2}", stock.symbol, stock.price.get()?);
    println!("---------------------------------");

    // Dashboard: prints every price change
    let symbol_dashboard = stock.symbol.clone();
    stock.price.subscribe(Arc::new(move |_old_price, new_price| {
        println!("\n[Dashboard] Price Updated for {}: €{:.2}", symbol_dashboard, new_price);
    }))?;

    // Alert: only fires when the new price is below 95.0
    let symbol_alert = stock.symbol.clone();
    stock.price.subscribe_filtered(
        Arc::new(move |_old_price, new_price| {
            println!("\n*** [ALERT!] Price for {} has dropped to €{:.2} ***", symbol_alert, new_price);
        }),
        |_old_price, new_price| *new_price < 95.0, // The filter condition
    )?;

    stock.price.subscribe(Arc::new(|old_price, new_price| {
        // Simulates a slow logging operation
        thread::sleep(Duration::from_millis(500));
        println!("[Log] Price change from €{:.2} to €{:.2} completed.", old_price, new_price);
    }))?;

    // Share the price with the data feed thread
    let price_arc = Arc::new(stock.price);
    let price_for_thread = Arc::clone(&price_arc);
    let symbol_thread = stock.symbol.clone();
    let data_feed_thread = thread::spawn(move || {
        let price = price_for_thread;
        let mut rng = rand::thread_rng();
        for _ in 0..10 {
            let current_price = price.get().unwrap_or_else(|e| {
                eprintln!("Error getting current price: {}", e);
                100.0 // Default value in case of error
            });
            let volatility = 0.05;
            let change_percent = rng.gen_range(-volatility..volatility);
            let change_amount = current_price * change_percent;
            let noise = rng.gen_range(-1.0..1.0);
            let new_price = (current_price + change_amount + noise).max(0.0);
            if rng.gen_bool(0.5) {
                println!("\n[Data Feed] Updating price for {} synchronously...", symbol_thread);
                if let Err(e) = price.set(new_price) {
                    eprintln!("Error setting value: {}", e);
                }
            } else {
                println!("\n[Data Feed] Updating price for {} asynchronously...", symbol_thread);
                if let Err(e) = price.set_async(new_price) {
                    eprintln!("Error setting value asynchronously: {}", e);
                }
            }
            thread::sleep(Duration::from_millis(1000));
        }
    });

    // Wait for the data feed to finish before exiting
    data_feed_thread.join().expect("Data feed thread panicked.");
    println!("\n--- Ticker Simulation Complete ---");
    Ok(())
}
Conclusion
The Observer design pattern, as implemented by the observable-property crate, provides a powerful and elegant solution for managing state and concurrency in Rust. As we’ve seen with the stock ticker example, it allows you to build a robust system where changes to a subject—like a stock’s price—automatically notify all registered observers.
This approach eliminates the complexities of manual locking and channel management, leading to cleaner, more maintainable code. By decoupling the data from the behaviors that react to it, you can easily extend your application with new features without modifying the core logic. Whether you’re creating a simple dashboard or a complex, event-driven system, the observable-property crate helps you write safer, more scalable, and more readable concurrent applications.
The observable-property crate can be found here, and on crates.io you can find even more examples. The source code can be found here. If you have feature requests or bug reports, or if you simply want to contribute, please let me know.




