The Rusty Actor: using SharpAndRusty to implement multiple actors.

Introduction

You can build applications in many ways. One way to do this is using actors. In this setup, an actor is the ‘primitive’ of the appliction. Actors have a number of important characteristics namely:

  1. Each actor encapsulates its private state and behaviour. An important aspect of this pattern is that the state can only be modified in response to the messages the actor receives.
  2. Mesages between actors are passed in an asynchronous manner. This is to allow decoupling between sender and receiver, and allows for non-blocking communicatin between actors. It also makes scaling systems much easier.
  3. Actors themselves can, in principle, create new actors when needed, and when constructor can maintain references to the newly created actors. They can however only communicate with actors whose addresses, which in many cases will be references, they know because they created the other actors, or they got the address in the messages they received.
  4. From the above it follows that this pattern supports concurrency at least among the actors. Since actors can operate independently this can result to an efficient use of resources.
  5. Making these systems fault-tolerant, since the state of each actor is independent of other actors. You can event have supervising actors, as we shall see in the example, remove faulty actors and as mentioned spin up new ones.

Implementation in C# with SharpAndRusty

To illustrate this pattern we will build imagine a world where you have some radio controlled cars, which are controlled by central fleet controller, which can broadcast commands to the entire fleet. Commands are very simple: Start, Stop, and Turn either left or right.

To start set up a dotnet console app, and adding the Esox.SharpAndRusty nuget package to your project.

Next in your Program.cs file add the following imports:

using System.Threading.Channels;
using Esox.SharpAndRusty.Extensions;
using Esox.SharpAndRusty.Types;

What do these do?

  1. We will be using .NET channels for communication between actors. In this simple example this will be just between the fleet controller and the cars.
  2. The SharpAndRusty library provides a Result<> type, which we will use extensively in this example. That is what the Esox.SharpAndRusty.Types provides.
  3. We also need some extension methods like Bind() and Match() which are provided by the Esox.SharpAndRusty.Extensions package.

Cars can only move either left or right, so we encapsulate that behaviour in an enum:

public enum Direction
{
    Left,
    Right,
}

Next we implement the CarCommand:

public abstract record CarCommand
{
    public sealed record Turn(Direction Direction) : CarCommand;

    public sealed record Stop : CarCommand;

    public sealed record Start : CarCommand;

    public sealed record GetState(Channel<CarCommand> ReplyChannel) : CarCommand;

    public override string ToString() => this switch
    {
        Turn turn => $"Turn: {turn.Direction}",
        Stop => "Stop",
        Start => "Start",
        GetState => "GetState",
        _ => "Unknown"
    };
    
}

What happens here?

  1. We use abstract record to basically create a discriminated union, otherwise known as a sum type representing all possible commands that can be sent to a radio controlled car.
  2. In it we define four nested sealed records representing the four possible command variants, namely Turn, Stop, Start and GetState. As you can see the Turn command carries a direction, and the GetState command carries a reply channel for requesting the car’s current state.
  3. The sealed keyword prevents further inheritance from the nested command types, ensureing the union is closed and only these four variants exist.
  4. In the ToString() method we use a switch expression to provide string representations for each command variant, with a fallback to Unknown for exhaustiveness.
  5. This command-pattern is typesafe as it ensures only valid ommand types can be created and processed, making the code safe and more maintainable than using string or enums with separate data structures.

The RadioControlledCar class

Now we can start work on the RadioControlledCar class. We start by defining three properties:

    private readonly string _id;
    private readonly ChannelReader<CarCommand> _receiver;
    private Option<CarCommand> _state;

Let’s describe these:

  1. Since we will be handling a fleet of cars, each car has a unique id.
  2. Each car needs to be able to receive commands from a controller, or perhaps from other cars but that is not something we will implement in this example, so we need a channel for that.
  3. Finally we need to keep track of the state the car is currently in. This may be uninitialized, so we could have gone for a nullable value. However, we use an Option<> type instead.

Why is Option<> the better choice here?

  • Explicit Intent: It clearly signals that the absence of a state is a valid and expected scenario, not an error condition. The Option<> type forces us to handle both Some and None cases explicitly.
  • Type Safety: With Option<> you cannot accidentally access the value without pattern matching or unwrapping. A nullable CarCommand? could lead to null reference exceptions if accessed without checking.
  • Functional Semantics: The code uses functional patterns, like Result<> and pattern matching, Option<> integrates naturally with this style.

Next we implement the constructor, which is pretty straightforward:

    public RadioControlledCar(string id, ChannelReader<CarCommand> receiver)
    {
        _id = id;
        _receiver = receiver;
        _state = new Option<CarCommand>.Some(new CarCommand.Stop());
    }

Note that the default state of the car is the stopped state.

 public Task<Result<CarCommand, Error>> StartAsync(CancellationToken cancellationToken = default)
    {
        return Task.Run(async () =>
        {
            try
            {
                await foreach (var message in _receiver.ReadAllAsync(cancellationToken))
                {
                    switch (message)
                    {
                        case CarCommand.GetState getState:
                            try
                            {
                                var stateToSend = _state switch
                                {
                                    Option<CarCommand>.Some some => some.Value,
                                    _ => new CarCommand.Stop()
                                };
                                await getState.ReplyChannel.Writer.WriteAsync(stateToSend, cancellationToken);
                            }
                            catch (Exception e)
                            {
                                Console.Error.WriteLine($"[{_id}] Failed to send state reply: {e.Message}");
                            }

                            break;
                        case CarCommand.Stop:
                            Console.WriteLine($"[{_id}] Received {message}");
                            _state = new Option<CarCommand>.Some(message);
                            Console.WriteLine($"[{_id}] State: {_state}");
                            return _state switch
                            {
                                Option<CarCommand>.Some some => Result<CarCommand, Error>.Ok(some.Value),
                                _ => Result<CarCommand, Error>.Err(Error.New("No state available", ErrorKind.Other))
                            };
                        default:
                            Console.WriteLine($"[{_id}] Received {message}");
                            _state = new Option<CarCommand>.Some(message);
                            Console.WriteLine($"[{_id}] State: {_state}");
                            break;
                    }
                }

                return _state switch
                {
                    Option<CarCommand>.Some some => Result<CarCommand, Error>.Ok(some.Value),
                    _ => Result<CarCommand, Error>.Err(Error.New("No state available", ErrorKind.Other))
                };
            }
            catch (OperationCanceledException)
            {
                return _state switch
                {
                    Option<CarCommand>.Some some => Result<CarCommand, Error>.Ok(some.Value),
                    _ => Result<CarCommand, Error>.Err(Error.New("Operation cancelled with no state", ErrorKind.Other))
                };
            }
            catch (Exception e)
            {
                Console.Error.WriteLine($"[{_id}] Receiver error: {e.Message}");
                return Result<CarCommand, Error>.Err(Error.FromException(e));
            }
        }, cancellationToken);
    }

The purpose of this code is to start an asynchronous message processing loop which receives and handles CarCommand messages from a channel, maintaining the cars internal state. Let’s go over it line by line:

  1. The method starts with a call to Task.Run(). This offloads message processing to a background thread while still supporting async operations.
  2. The await foreach loop continuously reads messages from the _receiver channel until it’s completed or cancelled.
  3. In the loop itself we use pattern mathcing to extract the messages.
  4. Note that since _state is an Option<> type we must use a switch expression to extract the state.
  5. The GetState command returns the current state over a specified channel.
  6. The Stop command prints out a message, updates the state and returns from the foreach loop.
  7. All other commands like Start and Turn print out messages, update the state, and exit the switch statement to wait for the next command.
  8. When the foreach loop has terminated, we return the current state, wrapped in a Result<> type.
  9. We have two catch clauses: the first one is for when the channel is closed. In this case we return a Result<> which if there is a valid state will be Ok, otherwise an error will be returned.
  10. All other exceptions are caught in the second catch clause, in which case an error result will be returned.

The full code for RadioControlledCar looks like this:

public class RadioControlledCar
{
    private readonly string _id;
    private readonly ChannelReader<CarCommand> _receiver;
    private Option<CarCommand> _state;
    
    public RadioControlledCar(string id, ChannelReader<CarCommand> receiver)
    {
        _id = id;
        _receiver = receiver;
        _state = new Option<CarCommand>.Some(new CarCommand.Stop());
    }

    public Task<Result<CarCommand, Error>> StartAsync(CancellationToken cancellationToken = default)
    {
        return Task.Run(async () =>
        {
            try
            {
                await foreach (var message in _receiver.ReadAllAsync(cancellationToken))
                {
                    switch (message)
                    {
                        case CarCommand.GetState getState:
                            try
                            {
                                var stateToSend = _state switch
                                {
                                    Option<CarCommand>.Some some => some.Value,
                                    _ => new CarCommand.Stop()
                                };
                                await getState.ReplyChannel.Writer.WriteAsync(stateToSend, cancellationToken);
                            }
                            catch (Exception e)
                            {
                                Console.Error.WriteLine($"[{_id}] Failed to send state reply: {e.Message}");
                            }

                            break;
                        case CarCommand.Stop:
                            Console.WriteLine($"[{_id}] Received {message}");
                            _state = new Option<CarCommand>.Some(message);
                            Console.WriteLine($"[{_id}] State: {_state}");
                            return _state switch
                            {
                                Option<CarCommand>.Some some => Result<CarCommand, Error>.Ok(some.Value),
                                _ => Result<CarCommand, Error>.Err(Error.New("No state available", ErrorKind.Other))
                            };
                        default:
                            Console.WriteLine($"[{_id}] Received {message}");
                            _state = new Option<CarCommand>.Some(message);
                            Console.WriteLine($"[{_id}] State: {_state}");
                            break;
                    }
                }

                return _state switch
                {
                    Option<CarCommand>.Some some => Result<CarCommand, Error>.Ok(some.Value),
                    _ => Result<CarCommand, Error>.Err(Error.New("No state available", ErrorKind.Other))
                };
            }
            catch (OperationCanceledException)
            {
                return _state switch
                {
                    Option<CarCommand>.Some some => Result<CarCommand, Error>.Ok(some.Value),
                    _ => Result<CarCommand, Error>.Err(Error.New("Operation cancelled with no state", ErrorKind.Other))
                };
            }
            catch (Exception e)
            {
                Console.Error.WriteLine($"[{_id}] Receiver error: {e.Message}");
                return Result<CarCommand, Error>.Err(Error.FromException(e));
            }
        }, cancellationToken);
    }
}

The FleetController class

Next we come to the FleetController class. This class manages the fleet of cars and in our simple example all it can do is broadcast a command to the entire fleet of cars.

We will start by defining two properties:

    private readonly List<ChannelWriter<CarCommand>> _senders;
    private readonly object _lock = new object();

Line by line:

  1. We need to able to access the cars through their channels, and that is why we need a list of those.
  2. Since our fleet controller might accessed by multiple threads, we need to make sure it is threadsafe, that is what the _lock is for.

The constructor is quite straightforward, it receives a list of channels as its only argument:

    public FleetController(List<ChannelWriter<CarCommand>> senders)
    {
        _senders = senders;
    }

Next we implement the Broadcast() method:

public Result<Unit, Error> Broadcast(CarCommand message)
    {
        ChannelWriter<CarCommand>[] snapshot;
        lock (_lock)
        {
            snapshot = _senders.ToArray();
        }

        for (int index = 0; index < snapshot.Length; index++)
        {
            try
            {
                if (!snapshot[index].TryWrite(message))
                {
                    var error = $"Car {index}: Channel full or closed";
                    Console.Error.WriteLine($"Failed to broadcast to car {index}: {error}");
                    return Result<Unit, Error>.Err(Error.New(error, ErrorKind.Other));
                }
            }
            catch (Exception e)
            {
                Console.Error.WriteLine($"Failed to broadcast to car {index}: {e.Message}");
                return Result<Unit, Error>.Err(Error.FromException(e));
            }
        }


        return Result<Unit, Error>.Ok(Unit.Value);
    }

Line by line:

  1. The method receives a CarCommand as its single argument, which is the command which will be broadcast.
  2. The return type of the method is Result<Unit,Error> which can indicate a broadcast success or a failure.
  3. The method start by creating a snapshot of the current _senders using the lock. This is to prevent changes any changes to this list interfering with the broadcast.
  4. Next we iterate over the snapshotm and try to send the message to each channel. The TryWrite() method attempts an immediate write without blocking. It returns false if the the channel is full or closed.
  5. If an error occurs we return an error result.
  6. Any other exceptions are caught in the catch clause, which also returns an error result.
  7. If all went well, we return an ok result.

The Size() method is basically a utility method, returning the number of current senders. Note the use of the lock to make sure that other threads cannot access the senders list while the size is being determined:

    public int Size
    {
        get
        {
            lock (_lock)
            {
                return _senders.Count;
            }
        }
    }

Testing it a real program

Now we can start using this pattern in a real program:

    public static async Task<int> Main(string[] args)
    {
        Console.WriteLine("Starting Radio-Controlled Car Fleet Demo\n");

        var firstChannel = Channel.CreateUnbounded<CarCommand>();
        var firstCar = new RadioControlledCar("Car-1", firstChannel.Reader);
        var firstHandle = firstCar.StartAsync();

        var secondChannel = Channel.CreateUnbounded<CarCommand>();
        var secondCar = new RadioControlledCar("Car-2", secondChannel.Reader);
        var secondHandle = secondCar.StartAsync();

        var controller = new FleetController(new List<ChannelWriter<CarCommand>>
        {
            firstChannel.Writer,
            secondChannel.Writer
        });

        Console.WriteLine($"Fleet size: {controller.Size}\n\n");
        
        var result = controller.Broadcast(new CarCommand.Start())
            .Bind(_ => controller.Broadcast(new CarCommand.Turn(Direction.Left)))
            .Bind(_ => controller.Broadcast(new CarCommand.Turn(Direction.Right)))
            .Bind(_ => controller.Broadcast(new CarCommand.Stop()));

        if (result.IsFailure)
        {
            result.InspectErr(error => Console.Error.WriteLine($"Broadcast failed: {error}"));
            return 1;
        }

        firstChannel.Writer.Complete();
        secondChannel.Writer.Complete();

        var firstResult = await firstHandle;
        var secondResult = await secondHandle;

        Console.WriteLine("\n---------- Final States ----------\n");

        firstResult.Match(
            success: state =>
            {
                Console.WriteLine($"Car-1: {state}");
                return Unit.Value;
            },
            failure: error =>
            {
                Console.Error.WriteLine($"Car-1 Error: {error}");
                return Unit.Value;
            });

        secondResult.Match(
            success: state =>
            {
                Console.WriteLine($"Car-2: {state}");
                return Unit.Value;
            },
            failure: error =>
            {
                Console.Error.WriteLine($"Car-2 Error: {error}");
                return Unit.Value;
            });

        var finalResult = firstResult.Bind(_ => secondResult).Map(_ => Unit.Value);

        return finalResult.Match(
            success: _ => 0,
            failure: _ => 1);
    }

Line by line:

  1. We start by creating two unbounded channels for communication with the two cars.
  2. Each car receives the reader for the channel.
  3. Next we start each car which returns a task handle, which will complete when the car stops.
  4. Now we can create the fleetcontroller, providing it with the two channel writers.
  5. With all this set up, we can start broadcasting commands., using the functional Bind() operation. Each operation only proceeds if the previous one succeeded.
  6. If there was a failure we print a message and return with error code 1.
  7. Since there will be no more commands, we can call Complete() on each channel to signal no more messages will be sent.
  8. Next we wait for both car tasks to finish and retrieve their final states.
  9. Finally we print out the finall states using the Match() method.

Conclusion

The “Actor” pattern, implemented here using SharpAndRusty, demonstrates how a functional approach can significantly modernize and stabilize actor-based systems in C#. By moving away from traditional null-checking and exception-driven logic, we create a system that is not only more robust but also much more expressive.

The Power of Result and Option

The backbone of this implementation lies in the use of Option<T> and Result<T, E>. These types transition the codebase from “defensive programming” to “type-safe intent”:

  • Eliminating Null-Reference Anxiety: By using Option<CarCommand>, we explicitly acknowledge that a car might not have a current state. The compiler effectively forces us to handle the “None” case, preventing the infamous NullReferenceException at runtime.
  • Predictable Error Handling: The Result<> type turns errors into first-class citizens. Instead of catching generic exceptions that might bubble up from anywhere, the Broadcast and StartAsync methods return a clear “Success or Failure” status. This makes the control flow visible and predictable.
  • Declarative Pipelines with Bind and Match: Using .Bind() allows us to chain operations into a clean, readable pipeline. If any command in the sequence fails, the chain short-circuits gracefully. Finally, .Match() ensures that we exhaustively handle every possible outcome before the program exits.

If you are interested in experimenting with the code, it is available, with some tests on https://github.com/snoekiede/MultipleActors.SharpAndRusty

The Code Nomad
The Code Nomad
Articles: 165

Leave a Reply

Your email address will not be published. Required fields are marked *