Notes about Elixir concurrency
1. A simple ServerProcess (ch06 from Elixir in Action)
1.1. How to understand OTP server
- A server process is a BEAM process that uses a recursive call (loop) to handle different messages.
- Instead of handling each message directly in that server process, we provide the server process with a callback module.
- The key to understanding ServerProcess and the callback module is:
  - The state related to our custom module is maintained inside the server process, not by the callback module (client) itself.
    - Therefore, the callback module is a group of pure functions.
    - For example, in KeyValueStore, the state (a map of keys and values) is updated not in the KeyValueStore module but in the ServerProcess module, in its recursive loop.
  - Callback functions, such as init/0 and handle_call/2, are always invoked in the server process.
  - Interface functions are run in the client processes.
- The generic server process:
  - uses an endless recursive call to loop
  - maintains the state in the loop
  - provides two types of interface functions to let the callback module handle messages: one synchronous, the other asynchronous.
- The callback module receives and handles the specific messages.
  1. The callback module calls ServerProcess.start to register itself.
  2. The callback module handles messages by calling ServerProcess.call or ServerProcess.cast.
     - So, those messages are sent to the server process, in which the state can be updated.
     - ServerProcess.call or ServerProcess.cast sends the message to the server pid, so the messages are processed in the recursive loop.
  3. ServerProcess.call/cast goes into the server process’s loop. In ServerProcess’s recursive loop:
     - The message is processed by the callback module’s handle_call or handle_cast, both of which need to return a new state.
     - For :call, we need to send a message back before looping with the new state.
       - Because ServerProcess.call blocks until it receives this message.
  4. After ServerProcess.call or ServerProcess.cast returns, control goes back to the callback module, at step 2.
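The flow described above can be sketched as follows. This is a minimal reconstruction written for these notes, not the book's exact listing: the message shapes ({:call, request, caller}, {:cast, request}, {:response, response}) are assumptions chosen for illustration, while the callback names follow the notes (init/0, handle_call/2, handle_cast/2).

```elixir
defmodule ServerProcess do
  # Spawns the generic server process; the callback module supplies the initial state.
  def start(callback_module) do
    spawn(fn ->
      initial_state = callback_module.init()
      loop(callback_module, initial_state)
    end)
  end

  # Synchronous request: send the message, then block until the response arrives.
  def call(server_pid, request) do
    send(server_pid, {:call, request, self()})

    receive do
      {:response, response} -> response
    end
  end

  # Asynchronous request: send the message and return immediately.
  def cast(server_pid, request) do
    send(server_pid, {:cast, request})
    :ok
  end

  # The recursive loop owns the state; callbacks only compute the next state.
  defp loop(callback_module, state) do
    receive do
      {:call, request, caller} ->
        {response, new_state} = callback_module.handle_call(request, state)
        send(caller, {:response, response})
        loop(callback_module, new_state)

      {:cast, request} ->
        new_state = callback_module.handle_cast(request, state)
        loop(callback_module, new_state)
    end
  end
end

defmodule KeyValueStore do
  # Interface functions: run in the client process.
  def start, do: ServerProcess.start(KeyValueStore)
  def put(pid, key, value), do: ServerProcess.cast(pid, {:put, key, value})
  def get(pid, key), do: ServerProcess.call(pid, {:get, key})

  # Callback functions: pure, invoked inside the server process.
  def init, do: %{}
  def handle_cast({:put, key, value}, state), do: Map.put(state, key, value)
  def handle_call({:get, key}, state), do: {Map.get(state, key), state}
end
```

Note that KeyValueStore never stores anything itself; every callback takes the current state and returns the next one, and only the loop in ServerProcess keeps it alive between messages.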
2. GenServer (ch06 from Elixir in Action)
2.1. How to use GenServer in our module
- Define server callback functions: init/1, handle_cast/2, handle_call/3.
- Define client API:
  - Use GenServer.start/2 to start the process.
  - Use GenServer.cast/2 and GenServer.call/2 to issue requests.
- Some notes
- The client is any process that invokes the client/interface functions.
- The server is always the process identifier (pid) or process name that we explicitly pass as an argument to the client API.
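As an illustration of the two steps above, here is a small key-value store built on GenServer. The module name KVStore and its message shapes are made up for this sketch; only the GenServer functions and callbacks are the real API.

```elixir
defmodule KVStore do
  use GenServer

  # Client API: these functions run in whichever process calls them.
  def start, do: GenServer.start(__MODULE__, nil)
  def put(pid, key, value), do: GenServer.cast(pid, {:put, key, value})
  def get(pid, key), do: GenServer.call(pid, {:get, key})

  # Server callbacks: these run inside the server process.
  @impl true
  def init(_arg), do: {:ok, %{}}

  @impl true
  def handle_cast({:put, key, value}, state) do
    {:noreply, Map.put(state, key, value)}
  end

  @impl true
  def handle_call({:get, key}, _from, state) do
    {:reply, Map.get(state, key), state}
  end
end
```

A client would call KVStore.start() to obtain {:ok, pid} and then pass that pid explicitly to KVStore.put/3 and KVStore.get/2.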
2.2. What is the context of state in GenServer
- While looping, the server process needs to maintain state. So the state has to be carried through the loop as an argument.
- This also means that the callback functions called inside the loop need to accept that state as an argument.
- So, in GenServer, the state is carried as
  - the last argument of all server callbacks
  - the last element of the return tuple
2.3. How to handle plain messages
- We may need to handle messages that aren’t specific to GenServer.
  - For example, a message sent periodically by :timer.send_interval(5000, :cleanup).
Use the GenServer callback handle_info to handle it:
    def handle_info(:cleanup, state) do
      IO.puts "performing cleanup"
      {:noreply, state}
    end
2.4. Understand the GenServer-powered processes.
- A client process starts the server by calling GenServer.start and providing the callback module. This creates the new server process, which is powered by GenServer.
- When a message is received, GenServer invokes callback functions to handle it. Therefore, callback functions are always executed in the server process.
- The server process state is maintained in the GenServer loop, but it is defined and manipulated by the callback functions:
  - It starts with init/1, which defines the initial state that is then passed to subsequent handle_* callbacks.
  - Each of those callbacks receives the current state and must return its new version, which is used by the GenServer loop in place of the old one.
- A GenServer cheat sheet
3. Building a concurrent system (ch07 from Elixir in Action)
3.1. General rules
- Make each server process do things sequentially.
- Spawn multiple server processes to handle concurrency.
- In general,
  - if we need synchronization between multiple processes, we let one dedicated process handle it.
  - make concurrent processes run independently.
3.2. How to build a concurrent system (to-do lists example)
- Goal: a distributed HTTP server that can handle many end users who are simultaneously manipulating many to-do lists.
- Components
  - A pure functional Todo.List abstraction.
  - A to-do server process that can be used to manage one to-do list for a long time.
  - A cache that manages a map: we create Todo.Server instances or fetch the existing ones.
    - The key is the to-do list name.
    - The value is the to-do server pid.
    This is how we work with multiple to-do lists: run one instance of the existing to-do server for each to-do list.
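The cache component can be sketched as below. This is a simplified illustration under stated assumptions, not the book's code: Sketch.TodoServer is a hypothetical stand-in that keeps a plain list instead of a Todo.List, and Sketch.TodoCache and its server_process/2 function are names chosen for this example.

```elixir
defmodule Sketch.TodoServer do
  use GenServer

  # Hypothetical stand-in for a to-do server: one process per to-do list.
  def start, do: GenServer.start(__MODULE__, nil)
  def add_entry(pid, entry), do: GenServer.cast(pid, {:add_entry, entry})
  def entries(pid), do: GenServer.call(pid, :entries)

  @impl true
  def init(_arg), do: {:ok, []}

  @impl true
  def handle_cast({:add_entry, entry}, entries), do: {:noreply, [entry | entries]}

  @impl true
  def handle_call(:entries, _from, entries), do: {:reply, Enum.reverse(entries), entries}
end

defmodule Sketch.TodoCache do
  use GenServer

  def start, do: GenServer.start(__MODULE__, nil)

  # Returns the pid of the server for the given list name, starting it if needed.
  def server_process(cache_pid, list_name) do
    GenServer.call(cache_pid, {:server_process, list_name})
  end

  @impl true
  def init(_arg), do: {:ok, %{}}

  # State is a map: to-do list name => to-do server pid.
  @impl true
  def handle_call({:server_process, list_name}, _from, servers) do
    case Map.fetch(servers, list_name) do
      {:ok, pid} ->
        {:reply, pid, servers}

      :error ->
        {:ok, pid} = Sketch.TodoServer.start()
        {:reply, pid, Map.put(servers, list_name, pid)}
    end
  end
end
```

Because the lookup-or-create step runs inside the single cache process, two clients asking for the same list name at the same time cannot end up with two different servers.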
4. Fault-tolerance basics (ch08 from Elixir in Action)
4.1. Runtime error types
BEAM distinguishes three types of runtime errors: :error, :exit, and :throw.
- For :error
  - For example, an invalid arithmetic expression, a call to a nonexistent function, or a pattern-matching error.
  - You can raise your own error by using the raise/1 macro. If a function explicitly raises an error, its name should end with the ! character, for instance, File.open!.
- For :exit
  - Use exit/1 to exit the current process.
- For :throw
    throw(:thrown_value)
  - The purpose of throws is to allow nonlocal returns.
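To see the three kinds side by side, a small helper can run a function and report what it caught. The module ErrorKinds and its classify/1 function are hypothetical names used only for this sketch; the try/catch form with a two-element pattern (kind, value) is standard Elixir.

```elixir
defmodule ErrorKinds do
  # Runs the given zero-arity function and reports which kind of
  # runtime error (if any) it produced.
  def classify(fun) do
    try do
      {:ok, fun.()}
    catch
      # kind is one of :error, :exit, or :throw
      kind, value -> {kind, value}
    end
  end
end
```

For example, ErrorKinds.classify(fn -> throw(:thrown_value) end) evaluates to {:throw, :thrown_value}, and a function that calls exit(:done) yields {:exit, :done}.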
4.2. Linking processes (bidirectional)
- If two processes are linked and one of them terminates, the other process receives an exit signal, a notification that a process has crashed.
  - An exit signal contains the pid of the crashed process and the exit reason.
- When a process terminates abnormally, the linked process is also taken down.
- How to create a linked process
  - If the link is created when you start a process, use spawn_link/1.
  - Otherwise, use Process.link/1, which connects the current process with another process.
Usually we don’t want a crash in one process to take down the processes linked to it. We can detect the crash by trapping exits.
    spawn(fn ->
      Process.flag(:trap_exit, true)
      spawn_link(fn -> raise("something went wrong") end)
      Process.sleep(1000)

      receive do
        msg -> IO.inspect(msg)
      end
    end)
- The format of the exit signal message is: {:EXIT, from_pid, exit_reason}.
  - If a process is terminated due to a throw or an error, the exit reason is a tuple in the form {reason, where}.
  - If a process is terminated due to an exit, the reason is the term provided to exit/1.
4.3. Monitors (unidirectional)
Make the current process monitor the target process:
    monitor_ref = Process.monitor(target_pid)
- If the monitored process dies, your process receives a message in the format: {:DOWN, monitor_ref, :process, from_pid, exit_reason}.
Stop the monitor:
    Process.demonitor(monitor_ref)
- Exits are propagated through GenServer calls.
  When you issue a synchronous request via GenServer.call, if the server process crashes, an exit signal will occur in your client process.
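A monitor can be tried out with a throwaway process. In this sketch the module name MonitorDemo, the :stop message, and the :stop_requested exit reason are invented for illustration; Process.monitor/1 and the :DOWN message format are as described above.

```elixir
defmodule MonitorDemo do
  # Spawns a process, monitors it, asks it to exit, and returns
  # what the monitoring process observed.
  def run do
    target_pid =
      spawn(fn ->
        receive do
          :stop -> exit(:stop_requested)
        end
      end)

    monitor_ref = Process.monitor(target_pid)
    send(target_pid, :stop)

    receive do
      {:DOWN, ^monitor_ref, :process, ^target_pid, exit_reason} ->
        {:down, exit_reason}
    after
      1_000 -> :timeout
    end
  end
end
```

Unlike a link, the monitoring process is only notified; it is not taken down, and the monitored process learns nothing about the monitor.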
4.4. Supervisors
- A supervisor is a generic process whose only responsibility is to receive link and monitor notifications, and to do something when a process crashes.
- Processes that aren’t supervisors are called workers.
- When we invoke Supervisor.start_link(child_specs, options), the following happens:
  - The new process is started, powered by the Supervisor module.
  - The supervisor process goes through the list of child specifications and starts each child, one by one.
  - Each specification is resolved, if needed, by invoking child_spec/1 from the corresponding module.
    - The child_spec/1 function is automatically defined when we use Agent, GenServer, Supervisor, etc.
  - The supervisor starts the child process according to the :start field of the child specification.
Start the supervisor from a wrapper module (without a callback module):
    defmodule Todo.System do
      def start_link do
        Supervisor.start_link(
          [Todo.Cache],
          strategy: :one_for_one
        )
      end
    end

    # Todo.System.start_link()
Another way: use the supervisor with a callback module:
    defmodule Todo.System do
      use Supervisor

      def start_link do
        Supervisor.start_link(__MODULE__, nil)
      end

      @impl true
      def init(_) do
        Supervisor.init([Todo.Cache], strategy: :one_for_one)
      end
    end
Note: A general guideline is to use the supervisor without a callback module only at the top of your supervision tree, generally in the Application.start/2 callback. We recommend using module-based supervisors for any other supervisor in your application, so they can run as a child of another supervisor in the tree.
4.5. Other notes
- Every time you add a new child to a supervisor, it is important to evaluate if the supervisor strategy is correct as well as the order of child processes.
Learned functions
    # Get the pid from registered name
    cache_pid = Process.whereis(Todo.Cache)

    # Kill the process
    Process.exit(cache_pid, :kill)

    # Verify what the injected implementation of child_spec/1 returns:
    Todo.Cache.child_spec(nil)

    # check the number of running processes:
    :erlang.system_info(:process_count)
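The functions listed above can be exercised against a tiny supervision tree. The modules below (Sketch.Worker, Sketch.System, Sketch.Demo) are hypothetical stand-ins for Todo.Cache and Todo.System, and the polling helper is just one assumed way to wait for the restart, which happens asynchronously after the kill.

```elixir
defmodule Sketch.Worker do
  use GenServer

  # Registered under its module name so it can be found with Process.whereis/1.
  def start_link(_arg), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  @impl true
  def init(_arg), do: {:ok, nil}
end

defmodule Sketch.System do
  use Supervisor

  def start_link, do: Supervisor.start_link(__MODULE__, nil)

  @impl true
  def init(_arg), do: Supervisor.init([Sketch.Worker], strategy: :one_for_one)
end

defmodule Sketch.Demo do
  # Kills the registered worker, then polls until the supervisor has
  # registered a replacement process under the same name.
  def kill_and_await_restart(attempts \\ 50) do
    old_pid = Process.whereis(Sketch.Worker)
    Process.exit(old_pid, :kill)
    await_new_pid(old_pid, attempts)
  end

  defp await_new_pid(_old_pid, 0), do: :timeout

  defp await_new_pid(old_pid, attempts) do
    case Process.whereis(Sketch.Worker) do
      pid when is_pid(pid) and pid != old_pid ->
        {:restarted, old_pid, pid}

      _ ->
        Process.sleep(20)
        await_new_pid(old_pid, attempts - 1)
    end
  end
end
```

After Sketch.System.start_link(), calling Sketch.Demo.kill_and_await_restart() should return a {:restarted, old_pid, new_pid} tuple with two different pids, which shows the :one_for_one strategy replacing the crashed worker.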
4.6. Current questions
- Explain start_link, child_spec.
- Why use ETS?
  - Notice that before our changes KV.Registry.lookup/2 sent requests to the server, but now it reads directly from the ETS table, which is shared across all processes. That’s also the main idea behind the cache mechanism we are implementing.
- Differences between tasks, agents, generic servers, and supervisors.
  - Common
  - Different
- About tasks
  - Pass a two-element tuple as the child specification, which in turn will invoke Task.start_link/1.
5. Nodes – ch16 in Programming Elixir
- A common pattern: we have a module that is responsible both for spawning a process and for providing the external interface to that process.
- The general rule is to register your process names when your application starts.
- About input, output, pid and nodes (p227)
  - In Elixir, you identify an open file or device by the pid of its I/O server.
  - The default device used by IO.puts is returned by the function :erlang.group_leader.
  - So, we can register the pid returned by group_leader under a global name, such as our node name, and then pass it to IO.puts. The output appears in the terminal window of the registered node.
6. OTP: Servers – ch17 in Programming Elixir
The start_link function asks GenServer to start a new process and link it to us.
- The second argument to start_link is passed to the GenServer callback function init, which uses it to build the initial state.
- You can think of init as being like the constructor in an OOP language.
  - A constructor takes values and creates the object’s initial state.
  - init takes some initial value and uses it to construct the state of the server.
- Tracing a server’s execution
The third parameter to start_link is a set of options:
    GenServer.start_link(Sequence.Server, state, [debug: [:trace]])
We could also include :statistics in the debug list:
    iex> {:ok, pid} = GenServer.start_link(Sequence.Server, 100, [debug: [:statistics]])
    iex> :sys.statistics pid, :get
    {:ok,
     [
       start_time: {{2017, 12, 23}, {14, 6, 7}},
       current_time: {{2017, 12, 23}, {14, 6, 24}},
       reductions: 36,
       messages_in: 2,
       messages_out: 0
     ]}
The sys module is our interface to the world of system messages.
- The list associated with the debug parameter you give to GenServer is simply the names of functions to call in the sys module.
We can turn things on and off after we have started a server:
    iex> :sys.trace pid, true
    :ok
    iex> GenServer.call(pid, :next_number)
    *DBG* <0.69.0> got call next_number from <0.25.0>
    *DBG* <0.69.0> sent 105 to <0.25.0>, new state 106
    105
    iex> :sys.trace pid, false
    :ok
    iex> GenServer.call(pid, :next_number)
    106
- pid is the result of GenServer.start_link.
- Another useful function: :sys.get_status pid.
We could customize the formatting of the status message GenServer provides:
    def format_status(_reason, [_pdict, state]) do
      [data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
    end
GenServer callbacks and their corresponding callers (p238, ch17):
- init(start_argument) <– GenServer.start_link or GenServer.start (Note: the parameter is the second argument passed to GenServer.start_link or GenServer.start)
- handle_call(request, from, state) <– GenServer.call(pid, request)
- handle_cast(request, state) <– GenServer.cast(pid, request)
- handle_info(info, state) <– messages sent to the process that bypass GenServer are routed to this function.
Responses common to call and cast:
    {:noreply, new_state [, :hibernate | timeout]}
    {:stop, reason, new_state}
Only handle_call can use:
    {:reply, response, new_state [, :hibernate | timeout]}
    {:stop, reason, reply, new_state}
- Naming a process
Local naming: use the name: option (the name must be unique among all OTP processes on our node).
    iex> { :ok, pid } = GenServer.start_link(Sequence.Server, 100, name: :seq)
    {:ok,#PID<0.58.0>}
    iex> GenServer.call(:seq, :next_number)
    100
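The Sequence.Server module used in the sessions above is not shown in these notes. A minimal version consistent with those sessions might look like the sketch below; the :next_number request comes from the notes, while the {:increment_number, delta} cast is an assumption added to show a cast callback.

```elixir
defmodule Sequence.Server do
  use GenServer

  # The second argument of GenServer.start_link/3 arrives here.
  @impl true
  def init(initial_number), do: {:ok, initial_number}

  # Reply with the current number and keep current + 1 as the new state.
  @impl true
  def handle_call(:next_number, _from, current_number) do
    {:reply, current_number, current_number + 1}
  end

  # Hypothetical cast: bump the state without replying.
  @impl true
  def handle_cast({:increment_number, delta}, current_number) do
    {:noreply, current_number + delta}
  end
end
```

Started with GenServer.start_link(Sequence.Server, 100, name: :seq), the first GenServer.call(:seq, :next_number) returns 100, as in the session above.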
7. Task and Agents – ch21 in Programming Elixir
7.1. Tasks
Different ways to execute Tasks
Given the existing module:
    defmodule Fib do
      def of(0), do: 0
      def of(1), do: 1
      def of(n), do: Fib.of(n-1) + Fib.of(n-2)
    end
Execute a Task from an anonymous function:
    IO.puts "Start the task"
    worker = Task.async(fn -> Fib.of(20) end)
    IO.puts "Do something else"
    result = Task.await(worker)
Execute a Task directly by specifying module, function and arguments:
    worker = Task.async(Fib, :of, [20])
    result = Task.await(worker)
    IO.puts "The result is #{result}"
- How to supervise Tasks
Because Tasks are implemented as OTP servers, they can be added to the application’s supervision tree.
- Use Task.Supervisor:
    # 1. Add the task supervisor to the supervision tree
    Supervisor.start_link([
      {Task.Supervisor, name: MyApp.TaskSupervisor}
    ], strategy: :one_for_one)

    # 2. Use async/await by passing the name of the supervisor
    Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
      # Do something
    end)
    |> Task.await()
- By calling Task.start_link instead of Task.async (for fire-and-forget tasks). Wrap the Task in its own module, similar to how we would do with a GenServer:
    defmodule MyTask do
      use Task

      def start_link(arg) do
        Task.start_link(__MODULE__, :run, [arg])
      end

      def run(arg) do
        # ...
      end
    end

    # Then pass it to the supervisor
    Supervisor.start_link([
      {MyTask, arg}
    ], strategy: :one_for_one)
7.2. Agents
- Agents are a simple abstraction around state.
- The Agent module provides a basic server implementation that allows state to be retrieved and updated via a simple API.
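As a small illustration of that API, here is a counter kept in an Agent and wrapped in its own module. The module name Sketch.Counter and its function names are invented for this sketch; Agent.start_link/1, Agent.get/2 and Agent.update/2 are the real functions.

```elixir
defmodule Sketch.Counter do
  use Agent

  # The state is whatever the initial function returns.
  def start_link(initial_value) do
    Agent.start_link(fn -> initial_value end)
  end

  # Read the state without changing it.
  def value(pid), do: Agent.get(pid, fn count -> count end)

  # Replace the state with the result of the update function.
  def increment(pid), do: Agent.update(pid, fn count -> count + 1 end)
end
```

Callers only see start_link/1, value/1 and increment/1, so the Agent could later be replaced by a GenServer without touching them.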
7.3. Tasks vs Spawn process
What is the point of a task if we can spawn a process and receive a message so easily?
    # using spawn
    current = self()
    child = spawn(fn -> send(current, {self(), 1 + 2}) end)

    receive do
      value -> value
    end

    # using task
    task = Task.async(fn -> {self(), 1 + 2} end)
    Task.await(task)
- The difference lies less in the primary functionality of starting another process and much more in the surrounding context.
- Spawned processes neither implement common OTP functionality nor are they supervised.
7.4. Task, or Agent, or GenServer
- Use the simplest approach that works.
- Wrap your agents and tasks in modules. That way, you can always switch from them to a full-blown GenServer without affecting the rest of the code.
- Some key points of GenServer.
  - A GenServer process is a general-purpose server process which maintains some state.
  - When we start a GenServer process, it runs in a different process from the current one.
8. How to save a GenServer process’s state
For example, we have multiple GenServer processes (Sequence.Server) which may crash, and we want to save a GenServer process’s state and recover it later. How do we do this?
- The answer: whenever we need to hold state in OTP, we should think about a GenServer process. So, we just save the state into another GenServer process, say Sequence.Stash.
- Two things need to be done:
  - Update the Sequence.Server callback terminate/2 to save our state into Sequence.Stash.
  - Update the Sequence.Server interface module such that, when it is started, it uses the state from Sequence.Stash.
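The two steps can be sketched as follows. This is an assumed, simplified layout rather than the book's code: the modules are named Sketch.Stash and Sketch.SequenceServer to make clear they are illustrations, both are registered under their module names, and the stash is updated with a synchronous call so the value is stored before the server finishes terminating.

```elixir
defmodule Sketch.Stash do
  use GenServer

  def start_link(initial_value) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end

  def get, do: GenServer.call(__MODULE__, :get)
  def update(value), do: GenServer.call(__MODULE__, {:update, value})

  @impl true
  def init(initial_value), do: {:ok, initial_value}

  @impl true
  def handle_call(:get, _from, value), do: {:reply, value, value}
  def handle_call({:update, value}, _from, _old_value), do: {:reply, :ok, value}
end

defmodule Sketch.SequenceServer do
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  def next_number, do: GenServer.call(__MODULE__, :next_number)

  # Step 2: on start, take the state from the stash.
  @impl true
  def init(_arg), do: {:ok, Sketch.Stash.get()}

  @impl true
  def handle_call(:next_number, _from, number), do: {:reply, number, number + 1}

  # Step 1: on termination, save the current state into the stash.
  @impl true
  def terminate(_reason, number), do: Sketch.Stash.update(number)
end
```

Keep in mind that terminate/2 is not invoked in every situation (for example, not when the process is killed with :kill), so this sketch only recovers state for terminations that go through the callback.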
9. About Process Naming
9.1. Three types of accepted values when naming a process
- an atom
  - This includes module names.
- a {:global, term} tuple, like {:global, :jobrunner}
  - This registers the process globally. Useful for distributed applications.
- a {:via, module, term} tuple
  - where module is an Elixir module that takes care of the registration process, using the value term.
9.2. The problem of using atoms to name a process.
- Atoms are not garbage collected by the Erlang VM, and there is a soft limit on how many atoms can exist.
9.3. The Registry
- It allows us to use strings, rather than atoms, which don’t have the same limitation.
- Each Registry works as a process which we have to include in our application supervision tree.
- The usage of Registry (see: Registry – Hex docs)
  - Used in a :via tuple, it can register and look up named processes.
    - In this case, we specify the keys as “:unique”.
  - Used as a dispatcher and PubSub
    - In this case, we specify the keys as “:duplicate”.
    - Use Registry.dispatch/3 to invoke callbacks.
9.4. How to use Registry
Example01
    # First, in the application, when specifying the children of the supervision tree
    children = [
      {Registry, keys: :unique, name: Jobber.JobRegistry}
      ...
    ]

    # Then, in the GenServer's start_link
    GenServer.start_link(__MODULE__, args, name: {:via, Registry, {Jobber.JobRegistry, key, value}})
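A self-contained variant of the same idea, for experimenting in a script: the registry is started directly with Registry.start_link/1 instead of under a supervisor, and the names Sketch.JobRegistry and Sketch.Job are hypothetical.

```elixir
defmodule Sketch.Job do
  use GenServer

  # Registers the process under a string id through the registry.
  def start_link(job_id) do
    GenServer.start_link(__MODULE__, job_id, name: via(job_id))
  end

  # Any process can reach the job by its id, without knowing the pid.
  def id(job_id), do: GenServer.call(via(job_id), :id)

  defp via(job_id), do: {:via, Registry, {Sketch.JobRegistry, job_id}}

  @impl true
  def init(job_id), do: {:ok, job_id}

  @impl true
  def handle_call(:id, _from, job_id), do: {:reply, job_id, job_id}
end
```

With Registry.start_link(keys: :unique, name: Sketch.JobRegistry) running, a second Sketch.Job.start_link/1 with an id that is already taken returns {:error, {:already_started, pid}}, which is what the :unique keys setting enforces.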
10. About Supervisor Module
- ref: Using Supervisors to Organize Your Elixir Application
- How to add a worker to a custom supervisor module at runtime
  - Given
    - Currently, our supervision tree looks like this:
      - OurNewApp.Supervisor
        - OurNewApp.CounterSup
          - {OurNewApp.Counter, 10000}
          - {OurNewApp.Counter, 20000}
    - We want to add an extra OurNewApp.Counter under OurNewApp.CounterSup.
  General steps:
    new_child_spec = Supervisor.child_spec({OurNewApp.Counter, 30000}, id: 30000)
    Supervisor.start_child(OurNewApp.CounterSup, new_child_spec)
  - Notes:
    - Supervisor.start_child/2
    - Supervisor.delete_child/2
    - Supervisor.restart_child/2
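The general steps above can be tried with stand-in modules. Sketch.IdCounter and Sketch.CounterSup below are hypothetical replacements for OurNewApp.Counter and OurNewApp.CounterSup; the counter does nothing except hold its starting number.

```elixir
defmodule Sketch.IdCounter do
  use GenServer

  def start_link(start_from), do: GenServer.start_link(__MODULE__, start_from)

  @impl true
  def init(start_from), do: {:ok, start_from}
end

defmodule Sketch.CounterSup do
  use Supervisor

  def start_link(start_numbers) do
    Supervisor.start_link(__MODULE__, start_numbers, name: __MODULE__)
  end

  @impl true
  def init(start_numbers) do
    # Each child needs a distinct id, so the start number doubles as the id.
    children =
      for n <- start_numbers do
        Supervisor.child_spec({Sketch.IdCounter, n}, id: n)
      end

    Supervisor.init(children, strategy: :one_for_one)
  end
end

defmodule Sketch.CounterAdmin do
  # Adds one more counter under the running supervisor.
  def add_counter(start_from) do
    spec = Supervisor.child_spec({Sketch.IdCounter, start_from}, id: start_from)
    Supervisor.start_child(Sketch.CounterSup, spec)
  end

  # Removes a counter: a child must be terminated before its spec can be deleted.
  def remove_counter(id) do
    :ok = Supervisor.terminate_child(Sketch.CounterSup, id)
    Supervisor.delete_child(Sketch.CounterSup, id)
  end
end
```

The explicit id matters: without it every Sketch.IdCounter child would share the default id (the module name), and the supervisor would refuse to start more than one.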
How to add a new subtree with its own children (without creating a special module for the subtree supervisor):
    children_specs =
      [10000, 20000, 30000]
      |> Enum.map(fn x -> Supervisor.child_spec({OurNewApp.Counter, x}, id: x) end)

    hand_crafted_sup_spec = %{
      id: :hand_crafted_sup,
      start: {Supervisor, :start_link, [children_specs, [strategy: :one_for_one]]},
      type: :supervisor,
      restart: :permanent,
      shutdown: 5000
    }

    Supervisor.start_child(OurNewApp.Supervisor, hand_crafted_sup_spec)
- Now, the supervision tree becomes
  - OurNewApp.Supervisor (the root of our supervision tree)
    - OurNewApp.CounterSup
      - {OurNewApp.Counter, 10000}
      - {OurNewApp.Counter, 20000}
    - :hand_crafted_sup
      - {OurNewApp.Counter, 10000}
      - {OurNewApp.Counter, 20000}
      - {OurNewApp.Counter, 30000}
We could stop our application with:
    Application.stop(:our_new_app)
How to terminate a GenServer process in a customized way
- Extend the state it holds with a marker that indicates whether the process has been ordered to terminate.
- Through an interface function, invoke GenServer.call and send a message indicating that this process needs to be terminated.
- In handle_call for that message, set the termination marker.
- Use a timer to send a periodic info message to self().
- In handle_info, check whether the termination marker is present and whether the desired condition has been reached.
  - If so, return {:stop, :normal, state}.
  - Otherwise, update the state (and the timer).
Note: during init/1, we also need to set the flag:
    Process.flag(:trap_exit, true)
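The steps above might be put together as in the following sketch. Everything specific here is an assumption made for illustration: the module name Sketch.GracefulCounter, the 10 ms tick, and the "desired condition" (the count has reached a target value). In this version stop_gracefully/1 returns as soon as the marker is set; the process stops later, on a subsequent tick.

```elixir
defmodule Sketch.GracefulCounter do
  use GenServer

  @tick_ms 10

  def start_link(target), do: GenServer.start_link(__MODULE__, target)

  # Interface function: orders the process to terminate when it is ready.
  def stop_gracefully(pid), do: GenServer.call(pid, :stop_gracefully)

  @impl true
  def init(target) do
    Process.flag(:trap_exit, true)
    schedule_tick()
    {:ok, %{count: 0, target: target, terminating?: false}}
  end

  # Set the termination marker; the actual stop happens in handle_info.
  @impl true
  def handle_call(:stop_gracefully, _from, state) do
    {:reply, :ok, %{state | terminating?: true}}
  end

  # Marker present and condition reached: stop normally.
  @impl true
  def handle_info(:tick, %{terminating?: true, count: count, target: target} = state)
      when count >= target do
    {:stop, :normal, state}
  end

  # Otherwise: update the state and re-arm the timer.
  def handle_info(:tick, state) do
    schedule_tick()
    {:noreply, %{state | count: state.count + 1}}
  end

  defp schedule_tick, do: Process.send_after(self(), :tick, @tick_ms)
end
```

A caller that needs to know when the process is really gone can monitor it before calling stop_gracefully/1 and wait for the :DOWN message with reason :normal.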
- Given that we can terminate a single GenServer process this way, how do we terminate a group of GenServer worker processes when we stop the application?
  - To do this, we must implement the application callback prep_stop/1. For example:
    @impl true
    def prep_stop(st) do
      stop_tasks =
        Supervisor.which_children(OurNewApp.CounterSup)
        |> Enum.map(fn {_, pid, _, _} ->
          Task.async(fn ->
            :ok = OurNewApp.Counter.stop_gracefully(pid)
          end)
        end)

      Task.await_many(stop_tasks)
      st
    end
- Some notes
  - Supervisor.child_spec/2
  - Supervisor.start_link
  - Supervisor.init (used in a custom supervisor)
  - start_link (used in a custom supervisor and a custom GenServer module)
11. About GenStage
11.1. Why and when we use GenStage
- You only add stages according to the runtime needs, typically when we need to provide back-pressure or leverage concurrency.
- Rule of thumb
  - Always start with plain functions.
  - When you recognize the need for back-pressure, create a 2-stage data pipeline first.
  - Gradually extend the pipeline when you spot an opportunity for improvement.
11.2. Concepts to understand
- What is back-pressure mechanism.
11.3. Different dispatcher
- one-to-one, GenStage.DemandDispatcher
- one-to-many, GenStage.BroadcastDispatcher
  - Lets each consumer decide how to do the further processing.
- one-to-many, GenStage.PartitionDispatcher
  - Lets the producer examine each event and assign it to a partition (like a bucket).