UP | HOME
Land of Lisp

Zhao Wei

How can man die better than facing fearful odds, for the ashes of his fathers and the temples of his Gods? -- By Horatius.

Notes about Elixir concurrency

1. A simple ServerProcess (ch06 from Elixir in Action)

1.1. How to understand OTP server

  1. A server process is a beam process that use recurive call (loop) to handle different messages.
    • Instead of directly handle different message in that server process, we provide the server process with a callback module.
  2. The key to understand ServerProcess and callback module is:
    • The state related with our custom module is maintained inside server process, not by callback module(client) itself.
    • Therefore, the callback module is a group of pure functions.
    • For example in KeyValueStore, the state which is a map of key and value is updated not in the KeyValueStore module but in the ServerProcess module, in its recursive loop.
    • callback functions are always invoked in the server process., such as init/0 and handle_call/2.
    • interface functions are run in the client processes.
  1. The generic server process does
    1. use endless recursive call to loop
    2. in loop, maintain state
    3. provide 2 type of interface functions to let callback module handle messages: one for sync, another for async.
  2. The callback module receive and handle’s the specific message.
    1. callback module call ServerProcess.start to register itself
    2. callback module handle messages by calling ServerProcess.call or ServerProcess.cast.
      • So, those messages are sent into server process in which the state could be updated.
      • The ServerProcess.call or ServerProcess.cast will send message to serverpid, so the messages are process in recursive loop.
    3. ServerProcess.call/cast goes into server process’s loop.
      In ServerProcess’s recursive loop:
      • The message is processed by callback module’s :handle_call or :handle_cast which all need to return a new state.
      • For :call, we need to send message back before loop with new state.
        • Because the ServerProcess.call is blocking to receive this message.
      • After ServerProcess.call or ServerProcess.cast return. This goes back to in callback module 2).

2. GenServer (ch06 from Elixir in Action)

2.1. How to use GenServer in our module

  1. Define server callback functions:
    • init/1
    • handle_cast/2
    • handle_call/3
  2. Define client API:
    • Use GenServer.start/2 to start the process.
    • Use GenServer.cast/2, GenServer.call/2 to issue requests.
  3. Some notes
    • The client is any process that invokes the client/interface functions.
    • The server is always the process identify or process name that we explicitly pass as argument to the client API.

2.2. What is the context of state in GenServer

  • During loop in server process, it needs to maintain state. So the state needed to be carried in loop as argument.
  • This also means, the callback module called inside the loop need to accept that state as argument.
  • So, in GenServer, the state is carried as
    • the last argument for server all callbacks
    • the last element in the return tuple

2.3. How to handle plain messages

  • We may need to handle messages that aren’t specific to GenServer.
    • For example, periodically send message by :timer.send_interval(5000, :cleanup).
  • Use GenServer callback handle_info to handle it:

    def handle_info(:cleanup, state) do
      IO.puts "performance cleanup"
      {:noreply, state}
    end
    

2.4. Understand the GenServer-powered processes.

  1. A client process starts the server by calling GenServer.start and providing the callback module. This creates the new server process which is powered by GenServer.
  2. When a message is received, GenServer invokes callback functions to handle it. Therefore, callback functions are always executed in server process.
  3. The client process state is maintained in the GenServer loop but it is defined and manipulated by the callback functions:
    • It starts with init/1 which defines the initial state that is then passed to subsequent handle_* callbacks.
    • Each of those callbacks receive the current state and must return its new version, which is used by the GenServer loop in place of the old one.
  4. A GenServer cheat sheet

3. Building concurrency system (ch07 from Elixir in Action)

3.1. General rules

  • Make server process do things in sequential way.
  • Spawn multiple server processes to handle concurrency.
  • In general,
    • if we need to do synchronization between multiple processes, we let one dedicate process to handle these.
    • make concurrent process run independently.

3.2. How to build a concurrency system (to-do lists example)

  • Goal: a distributed HTTP server than can handle many end users who are simultaneously manipulating many to-do lists.
  • Component
    • A pure functional Todo.List abstraction.
    • A to-do server process that can be used to manage one to-do list for a long time.
    • A cache for managing a map: we create Todo.Server instances or fetch the existing ones.

      • key is the to-do list name
      • value is the to-do server pid.

      This is how we work with multple to-do lists: run one instance of the existing to-do server for each to-do list.

4. Fault-tolerance basics (ch08 from Elixir in Action)

4.1. Rumtime error types

BEAM distinguish 3 types of runtime errors :error, :exit, or :throw.

  1. For :error
    • For example, invalid arithmetic expression, call a nonexistent function, patter-matching error.
    • You can raise your own error by using raise/1 macro. If a function eplicitly raises an error, it should be appended with the ! character to its name, for instance, File.open!.
  2. For :exit
    • Use exit/1 to exit the current process.
  3. For :throw
    • throw(:thrown_value)
    • The purpose of throws is allow nonlocal returns.

4.2. Linking processes (bidirectional)

  1. If two processes are linked, and one of the terminates, the other process receives a exit signal, a notification that a process has crashed.
    • A exit signal contains the pid of the crashed process and the exit reasion.
  2. When a process terminates abnormally, the linked process is also take down.
  3. How to create linked process
    • If a link is created when you start a process, use spawn_link/1.
    • Otherwise, use Process.link/1 which connect the current process with another process.
  4. Usually we don’t want a linked process to crash. We could detect the process crash using trapping exits.

    spawn(fn ->
      Process.flag(:trap_exit, true)
      spawn_link(fn -> raise("something went wrong") end)
    
      Process.sleep(1000)
      receive do
        msg -> IO.inspect(msg)
      end
    end)
    
    • Format of the exit singal message is: {:EXIT, from_pid, exit_reason}.
      • If a process is terminated due to a throw or an error, the exit reason is a tuple in the form {reason, where}.
      • If a process is terminated due to an exit, the reason is a term provided to exit/1.

4.3. Monitors (unidirectional)

  1. Make the current process monitor the target process.

    monitor_ref = Proccess.monitor(target_pid)
    
    • If the monitored process dies, your process receives a message in the format:
      {:DOWN, monitor_ref, :process, from_pid, exit_reasion}.
  2. Stop the monitor

    Process.demonitor(monitor_ref)
    
  3. Exits are propagated through GenServer calls.
    When you issue a synchronous request via GenServer.call, if a server process crashes, an exit signal will occur in your client process.

4.4. Supervisors

  • A supervisor is a generic process whose only responsibility is to receive links and monitor notifications, and do something when a process crashes.
  • Processes that aren’t supervisors are called workers.
  • When invoke Supervisor.start_link(child_specs, options), the following happens:
    1. The new process is started, powered by the Supervisor module.
    2. The supervisor process goes through the list of child specifications and starts each child, one by one.
    3. Each specification is resolved, if needed, by invoking child_spec/1 from the corresponding module.
      • The child_spec/1 function is automatically defined when we use Agent, GenServer and Supervisor, etc.
    4. The supervisor starts the child process, according to the :start filed of the child specification.
  • Start the supervisor as a module

    defmodule Todo.System do
      def start_link do
        Supervisor.start_link(
          [Todo.Cache],
          strategy: :one_for_one
        )
      end
    end
    
    # Todo.System.start_link()
    

    Another callback module way to use supervisor

    defmodule Todo.System do
      use Supervisor
    
      def start_link do
        Supervisor.start_link(__MODULE__, nil)
      end
    
      @impl true 
      def init(_) do
        Supervisor.init([Todo.Cache], strategy: :one_for_one)
      end
    end
    

    Note: A general guideline is to use the supervisor without a callback module only at the top of your supervision tree, generally in the Application.start/2 callback. We recommend using module-based supervisors for any other supervisor in your application, so they can run as a child of another supervisor in the tree.

4.5. Other notes

  • Every time you add a new child to a supervisor, it is important to evaluate if the supervisor strategy is correct as well as the order of child processes.
  • Learned functions

    # Get the pid from registered name
    cache_pid = Process.whereis(Todo.Cache)
    
    # Kill the process
    Process.exit(cache_pid, :kill)
    
    # Verify what the injected implementation of child_spec/1 returns:
    Todo.Cache.child_spec(nil)
    
    # check the number of running processes:
    :erlang.system_info(:process_count)
    

4.6. Current questions

  • Explain start_link, child_spec,
  • Why use ETS?
    • Notice that before our changes KV.Registry.lookup/2 sent requests to the server, but now it reads directly from the ETS table, which is shared across all processes. That’s also the main idea behind the cache mechanism we are implementing.
  • Differences between task, agents, generic servers, and supervisors.
    • Common
    • Different
  • About task
    • Pass a two-element tuple as child specification, which in turn will invoke Task.startlink/1.

5. Nodes – ch16 in Programming Elixir

  • A common pattern: we have a module that is responsible both for spawning a process and for providing the external interface to that process.
  • The general rue is to register your process names when your application starts.
  • About input, output, pid and nodes (p227)
    • In elixir, you identify an open file or device by the pid of its I/O server.
    • The default device used by IO.puts is returned by the function :erlang.group_leader.
    • So, by register the pid returned by group_leader under a global name, such as our node name. We can pass it to IO.puts. The output appears in the registered node terminal window.

6. OTP: Servers – ch17 in Programming Elixir

  1. start_link function asks GenServer to start a new process and link to us.
    • The second argument to start_link is the state which comes from the GenServer’s callback function init.
    • You can think of init as being like the constructor in an OOP language.
      • A constructor takes values and creates the object’s initial state.
      • init takes some initial value and uses it to construct the state of the server.
  2. Tracing a server’s execution
    • In the third parameter to start_link is a set of options

      GenServer.start_link(Sequence.Server, state, [debug: [:trace]])
      
    • We could also include :statistic to the debug list

      {:ok,pid} = GenServer.start_link(Sequence.Server, 100, [debug: [:statistics]])
      
      iex> :sys.statistics pid, :get
      {:ok,
       [
         start_time: {{2017, 12, 23}, {14, 6, 7}},
         current_time: {{2017, 12, 23}, {14, 6, 24}},
         reductions: 36,
         messages_in: 2,
         messages_out: 0
       ]}
      
  3. sys module is our interface to the world of system messages.
    • The list associated with the debug parameter you give to GenServer is simply the names of functions to call in the sys module.
    • We can turn things on and off after you have started a server.

      iex> :sys.trace pid, true
      :ok
      iex> GenServer.call(pid, :next_number) *DBG* <0.69.0> got call next_number from <0.25.0>
      *DBG* <0.69.0> sent 105 to <0.25.0>, new state 106
      105
      iex> :sys.trace pid, false
      :ok
      iex> GenServer.call(pid, :next_number)
      106
      
      • pid is the result of GenServer.start_link.
    • Other useful function
      • :sys.get_status pid.
        • We could customize the formatting of the status message GenServer provides.

          def format_status(_reason, [ _pdict, state ]) do
            [data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
          end
          
  4. GenServer Callbacks and corresponding caller. (p238, ch17)

    • init(start_argument), <– GenServer.startlink or GenServer.start (Note: The parameter is the second argument passed to GenServer.startlink or GenServer.start)
    • handle_call(request, from, state), <– GenServer.call(pid, request)
    • handle_cast(request, state), <– GenServer.cast(pid, request)
    • handle_info(info, state), message send by pass GenServer will be routed to this function.

    Responses are common between call and cast:

    {:noreply, new_state [, :hibernate | timeout]}
    {:stop, reason, new_state}
    

    Only handle_call can use:

    {:reply, response, new_state [, :hibernate | timeout]}
    {:stop, reason, reply, new_state}
    
  5. Naming a process
    • Local naming, use the name: option (unique for all OTP processes on our node.)

      iex> { :ok, pid } = GenServer.start_link(Sequence.Server, 100, name: :seq)
      {:ok,#PID<0.58.0>}
      iex> GenServer.call(:seq, :next_number)
      100
      

7. Task and Agents – ch21 in Programming Elixir

7.1. Tasks

  1. Different ways to execute Tasks
    Given the existing module.

    defmodule Fib do
      def of(0), do: 0
      def of(1), do: 1
      def of(n), do: Fib.of(n-1) + Fib.of(n-2)
    end 
    
    1. Execute Tasks from annonymous function

      IO.puts "Start the task"
      worker = Task.async(fn -> Fib.of(20) end)
      IO.puts "Do something else"
      result = Task.await(worker)
      
    2. Execute Tasks directly by specify module, function and arguments

      worker = Task.async(Fib, :of, [20])
      result = Task.await(worker)
      IO.puts "The result is #{result}"
      
  2. How to suppervise Tasks
    Because Tasks are implemented as OTP servers, they can be added to application’s suppervision tree.
    1. Use Task.Supervisor

      # 1. Add the task supervisor to the supervision tree
      Supervisor.start_link([
        {Task.Supervisor, name: MyApp.TaskSupervisor}
      ], strategy: :one_for_one)
      
      # 2. Use async/await by passing the name of the supervisor
      Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
        # Do something
      end)
      |> Task.await()
      
    2. By calling Task.start_link instead of Task.asyn. (For fire-and-forget tasks).
    3. Wrap Task in its own module similar to how we would do with a GenServer. (For fire-and-forget tasks).

      defmodule MyTask do
        use Task
      
        def start_link(arg) do
          Task.start_link(__MODULE__, :run, [arg])
        end
      
        def run(arg) do
          # ...
        end
      end
      
      # Then passing it to the supervisor
      Supervisor.start_link([
        {MyTask, arg}
      ], strategy: :one_for_one)
      

7.2. Agents

  • Agents are a simple abstraction around state.
  • The Agent module provides a basic server implementation that allows state to be retrieved and updated via a simple API.

7.3. Tasks vs Spawn process

What is the point of task if we could spawn and receive message so easily.

# using spawn 
current = self()
child = spawn(fn -> send(current, {self(), 1 + 2}) end)

receive do
  value -> value 
end

# using task
task = Task.async(fn -> {self(), 1 + 2} end)
Task.await(task)
  • There is less in the primary functionality of starting another process, but much more in the surrounding context.
  • Spawned proccess neither implement common OTP functionality nor are they supervisied.

7.4. Task, or Agent, or GenServer

  • Use the simplest approach that works.
  • Wrapping your agents and tasks in modules. Such that, you can always switch from them to the full-blown GenServer without affecting the rest of the code.
  • Some key points of GenServer.
    • A GenServer process is a general purpose server process which maintains some state.
    • When we start a GenServer process, we start it on a different process from the current one.

8. How to save a GenServer’s process

For example, we have multple GenServer process(Sequence.Server) which may crash and we want to save a GenServer process’s state and recover it later from that process. How to do this?

  • The answer is obvious: whenever we need to refer state in OTP, we need to think about GenServer process. So, we just save state into another GenServer process, say Sequence.Stash
  • Two things to be done:
    1. Update the Sequence.Server’s call back terminate/2 to save our state into Sequence.Stash.
    2. Update the Sequence.Server’s interface module such that when it is started, use the state from Sequence.Stash.

9. About Process Naming

9.1. Three types of accepted values when naming a process

  1. an atom
    • This includes module names.
  2. a {:global, term} tuple, like {:global, :jobrunner}
    • This registers the process globally. Useful for distributed applications.
  3. a {:via, module, term} tuple
    • where module is an Elixir module that would take care of the registration process, using the value term.

9.2. The problem of using atoms to name a process.

  • Atoms are note garbage collected by the Erlang VM, and there are soft limits.

9.3. The Registry

  • It allows us to use strings, rather than atoms, which don’t have the same limitation.
  • Each Registry works as a process which we have to include in our application supervision tree.
  • The usage of Registry (see: Registry – Hex docs)
    1. Using in :via, it can be used to register and access named process.
      • In this case, we specify the key as “:unique”.
    2. Using as a dispatcher and PubSub
      • In this case, we specify the keys as “:duplicate”
      • Use Registry.dispatch/3 to invoke callbacks

9.4. How to use Registry

  • Example01

    # First, in application when specify children of supervision tree
    children = [
      {Registry, keys: unique, name: Jobber.JobRegistry}
      ...
    ]
    
    # Then, in GenServer' start_link
    GenServer.start_link(__MODULE__, args, name: {:via, Registry, {Jobber.JobRegistry, key, value}})
    

10. About Supervisor Module

  1. ref: Using Supervisors to Organize Your Elixir Application
  2. How to add worker to a custom supervisor module at runtime
    • Given
      • Currently, our supervision tree looks like this
        • OurNewApp.Supervisor
          • OurNewApp.CounterSup
            • {OurNewApp.Counter, 10000}
            • {OurNewApp.Counter, 20000}
      • WE want to add an exra OurNewApp.Counter under OurNewApp.CounterSup
    • General steps

      new_child_spec = Supervisor.child_spec({OurNewApp.Counter, 30000}, id: 30000)
      Supervisor.start_child(OurNewApp.CounterSup, new_child_spec)
      
    • Notes:
      • Supervisor.startchild/2
      • Supervisor.deletechild/2
      • Supervisor.restartchild/2
  3. How to add a new subtree with its own children (without creating a special module for the subtree supervisor)

    children_specs = [10000, 20000, 30000] |> Enum.map(fn x ->
        Supervisor.child_spec({OurNewApp.Counter, x}, id: x)
      end)
    
    hand_crafted_sup_spec = %{
      id: :hand_crafted_sup,
      start: {Supervisor, :start_link, [children_specs, [strategy: :one_for_one]]},
      type: :supervisor,
      restart: :permanent,
      shutdown: 5000
    }
    
    Supervisor.start_child(OurNewApp.Supervisor, hand_crafted_sup_spec)
    
    • Now, the supervision tree becomes
      • OurNewApp.Supervisor (the root of our supervision tree)
        • OurNewApp.CounterSup
          • {OurNewApp.Counter, 10000}
          • {OurNewApp.Counter, 20000}
        • :handcraftedsup
          • {OurNewApp.Counter, 10000}
          • {OurNewApp.Counter, 20000}
          • {OurNewApp.Counter, 30000}
    • We could stop our application by

      Application.stop(:our_new_app)
      
  4. How to terminate a GenServer process with customization

    1. Modify the state it holds to hold a marker which is used to indicate if the process has been ordered to terminate.
    2. Through interface function, invoke GenServer.call and send message to indicate this process need to be terminated.
    3. During handlecall for that message, set the termination marker.
    4. User timer to send periodic info to self()
    5. During handleinfo, check if the termination mark is present and whether the desired condition is reached.
      • If so, {:stop, :normal, state}
      • Otherwise, update state (and timer)

    Note: during init/1, we also need to set flag

    Process.flag(:trap_exit, true)
    
  5. Beased on we could terminate a GenServer process, how to terminate a group of GenServer worker processes when we stop the application.
    • To do this, we must implement the callbacks prep_stop/1.
    • For example

      @impl true
      def prep_stop(st) do
        stop_tasks =
          Supervisor.which_children(OurNewApp.CounterSup)
          |> Enum.map(fn {_, pid, _, _} ->
            Task.async(fn ->
              :ok = OurNewApp.Counter.stop_gracefully(pid)
            end)
          end)
      
        Task.await_many(stop_tasks)
      
        st
      end
      
  6. Some note
    • Supervisor.child_spec/2
    • Supervisor.start_link
    • Supervisor.init (used in custom supervisor)
    • start_link (usered in custom supervisor and custom GenServer module)

11. About GenStage

11.1. Why and when we use GenStage

  • You only add stages acoording to the runtime needs, typically when we need to provide back-pressure or leverage concurrency.
  • Rule of thumb
    • Always start with plain functions.
    • When you recognize the need for using back-pressure, create a 2-stage data pipeline first.
    • Gradually extend pipeline when spot an opportunity for improvement.

11.2. Concepts to understand

  • What is back-pressure mechanism.

11.3. Different dispatcher

  1. one-to-one, GenStage.DemandDispatcher
  2. one-to-many, GenStage.BroadcastDispatcher
    • Let each consumer how to do the futher processing.
  3. one-to-many, GenStage.PartitionDispatcher
    • Let producer examines each event, and assigns it to a partition (like bucket).

11.4. Understand the events and state in GenStage module