1BRC - Erlang
15 minuter i lästid 1BRC Erlang

1BRC - Erlang

Förra artikeln i 1BRC serien handlade om JavaScript eller mer precist Node.js, dvs JS körande på en server eller laptop, liknande som t.ex. Python. Artikeln inledde med en kort historik, där upphovsmannen Ryan Dahl hade hämtat inspiration från andra programspråk. Ett av dessa var Erlang vilket utgör ämnet för denna artikel, där vi ska implementera en lösning av 1BRC på klassiskt Erlang-vis. Häng med!

Om du är ny här, så länkar jag till tidigare artiklar i denna serie, sist i denna artikel. I korthet går programmet jag ämnar skriva, ut på att läsa en textfil med namn på väderstationer och tillhörande temperatur separerade av semikolon och ett sådant par per rad. Temperaturen för respektive väderstation ska aggregeras till medelvärde samt lägsta och högsta temperatur. Resultatet ska skrivas ut sorterat på stationsnamnen i alfabetisk ordning.

Erlang

Språket Erlang utvecklades av Joe Armstrong med flera på Datalogi-labbet vid Ellemtel i Älvsjö, som var ett forskningsinstitut samägt av Ericsson och Televerket (Telia).

Namnet myntades av deras chef Bjarne Däcker som en ganska fiffig ordlek. Dels, var det en hyllning till den danske matematikern Agner Krarup Erlang, som givit upphov till enheten erlang inom trafik-teori. Dels, var det syllabisk förkortning av Ericsson Language.

Det finns en kolossalt intressant och underhållande historia kring språket Erlang, vilket får bli ämnet för en annan artikel, eftersom denna artikel har ett helt annat fokus.

Syntaxen för Erlang skiljer sig ganska radikalt från konventionella programspråk, som C++, Java, Python, Pascal med flera. Inspirationen utgörs av språket Prolog. Hur det kommer sig, får jag skriva om i en annan artikel.

Hello World

Så här ser ett hello world program ut i Erlang.

%% file: hello.erl
-module(hello).
-export([greeting/0]).

greeting() ->
  io:format("Hello from a tiny Erlang program\n").

Direktiv inleds med '-' och här ovan ser vi module och export. Filnamnet är hello.erl och måste stämma överens med modul-namnet. Detta påminner om Java, där filnamn och klassnamn måste stämma överens. Båda språken har ett begrepp som code-path (Erlang) och class-path (Java), där kompilerade filer finns. Det är också detta som är skälet till att filnamn och modul-/klass-namn måste stämma överens, så att respektive VM kan hitta den kompilerade filen i filsystemet.

Det finns 1 funktion i programmet (greeting). Den tar 0 argument och gör en utskrift till terminalen. Funktioner som ska kunna anropas utanför en Erlang modul, måste exporteras. Liknande som publika medlemsfunktioner i en Java klass, eller för den delen module.exports i klassisk Node.js.

Det vanligaste är att man startar Erlang i REPL mode och kompilerar och exekverar där. Men låt oss strunta i det just nu och stanna kvar på kommandoraden. Så här kompilerar man programmet. Resultatet sparas i katalogen ebin (Erlang BINary) och alla varningar är aktiverade.

$ mkdir ebin
$ erlc -o ebin -Wall src/hello.erl
$ ls ebin
hello.beam

Du som kompilerat C/C++ program, känner igen flaggorna -o dir och -Wall. Det finns även flaggor som -Iinclude-dir och -Dmacro=value. Precis som för Java, så skapas en fil med byte-code. I detta fall hello.beam. Hade det varit ett Java-program skulle filnamnet blivit Hello.class i stället. Så här kör vi programmet på kommando-raden. Som synes, är det lite omständligt, men det funkar.

$ erl -pa ebin -noshell -run hello greeting -run erlang halt
Hello from a tiny Erlang program

Fibonacci

Låt oss kika på ytterligare ett program. Denna gång, ett som räknar ut och tabellerar Fibonacci-tal. Erlang är ett funktionellt språk, vilket innebär bland annat att en variabel bara kan ha ett värde. Detta kallas för single-assignment property. Detta låter förvisso som ganska begränsande. Men tricket är att detta gäller för ett funktionsanrop. Så anropar man samma funktion igen, så har man en ny kontext och därmed en ny möjlighet att knyta ett värde till en variabel. En konsekvens av detta är att Erlang saknar loop-satser, såsom while och for, eftersom det inte går att uppdatera en loop-variabel.

Loopar och iterationer hanteras uteslutande via rekursion. Detta kan ju också låta lite ineffektivt. Men hemligheten är att Erlang VM optimerar bort rekursionen. Detta kallas för svans-rekursions-optimering (tail recursion optimization). Om man anstränger sig, så går det förvisso skriva program som inte kan optimeras på detta sätt, vilket ju är att be om problem. Här kommer programmet.

% file: fibonacci.erl
-module(fibonacci).
-export([compute/0, compute/1]).

compute() -> compute(10).

compute(N) when N == 0 -> 0;
compute(N) when N == 1 -> 1;
compute(N) when is_integer(N), N > 1 ->
  compute(N - 2) + compute(N - 1).

Precis som med flera andra programspråk så kan man överlagra funktioner. Eftersom språket är o-typat, så är det bara med avseende på antalet argument, liknande som i JavaScript. Här ovan ser vi att compute är överlagrad, dels en med 0 argument och dels en med 1 argument.

Vad som dock är det mest särskiljande här, givet att du inte sett kod i Erlang eller Prolog förut, är att compute/1 har tre ingångar, separerade med semi-kolon (;), samt att den avslutas med punkt (.). Varje funktionsingång har också ett ingångsvillkor. Språktekniskt sett, så utgör de tre alternativen en disjunktion (OR) och ingångsvillkoren i den sista en konjunktion (AND). Den här gången kan vi hoppa in i REPL. Först kompilerar vi, sen kör vi funktionen två gånger.

$ erl -pa ebin
Erlang/OTP 25 [erts-13.1.5] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [jit:ns]
Eshell V13.1.5  (abort with ^G)
1> c(fibonacci).
{ok,fibonacci}
2> fibonacci:compute().
55
3> fibonacci:compute(15).
610

Loopar och variabler

Hur skriver man då en enkel for-loop, när det bara finns rekursion? Jo, så här. Koden nedan utgörs av funktionen print, som skriver ut alla Fibonacci-tal från 1 och uppåt. Här kan vi se att print är överlagrad i inte mindre än i tre versioner, varav de första två är exporterade. En funktion måste returnera något och när det inte finns något vettigt att returnera petar man dit ett dummy-värde. Oftast är detta ok, men det kvittar vad.

-module(table).
-export([print/0, print/1]).

print()  -> print(10).
print(N) -> print(1, N).

print(I, N) when I > N -> thats_all_folks;
print(I, N) when I =< N ->
  io:format("fib(~p) = ~p\n", [I, fibonacci:compute(I)]),
  print(I + 1, N).

Erlang betraktar identifierare med inledande versal bokstav som en variabel och med inledande gemen bokstav som en systemkonstant, vilket kallas för atom. Så I och N är parameter-variabler medan thats_all_folks är en atom. Här ser man också att det går bra att anropa en exporterad funktion från en annan modul, givet att man skriver module:function. Så här anropar vi funktionen.

4> c(table).   
{ok,table}
5> table:print(12).
fib(1) = 1
fib(2) = 1
fib(3) = 2
fib(4) = 3
fib(5) = 5
fib(6) = 8
fib(7) = 13
fib(8) = 21
fib(9) = 34
fib(10) = 55
fib(11) = 89
fib(12) = 144
thats_all_folks
6>

Processer och meddelanden

Jag har ytterligare en sak till att diskutera innan, det är dags att gå igenom huvudprogrammet. Erlang saknar motsvarighet till globala variabler, eftersom vi har single-assignment property, plus att funktioner alltid är state-less. Dvs, de returnerar något värde och för samma indata så är det alltid samma utdata. Så, hur i all världen kan man då komma ihåg något? Eller mer precist, hur hanterar man program state?

Svaret är samtidigt exekverande objekt. Erlang-purister sätter nu i halsen när jag nämner Erlang och objekt i samma mening. Det officiella namnet är processer, vilket leder tankarna lika fel. En mer precis term är actor. Emellertid, är det inte så många som är bekanta med the Actor model, och man så att säga skjuter bredvid. Joe Armstrong och gruppen kring honom var mycket inspirerade av just actor objects. Jag länkar längst ned till en artikel på Wikipedia om detta, om du vill fördjupa dig vidare.

Kort sammanfattat, så är en actor ett objekt med egen exekverings-mekanism. Dvs, ett stort antal actors kan exekvera samtidigt. Dessa kommunicerar via meddelande-sändning. En actor kan ta emot ett meddelande i taget och utför hela sin uppgift (run to completion), tills den är redo att behandla nästa meddelande eller vänta på att det kommer ett.

Så här fungerar det också i Erlang. Man skapar en Erlang process (dvs actor object) med funktionen spawn. Actor state hanteras med hjälp av parameter-variabler till dess huvudfunktion, samt att funktionen är rekursiv. Så här kan en liten Erlang process se ut. Den räknar till tio och terminerar sen.

-module(silly).
-export([start/0, silly_run/2]).

start() -> {ok, spawn(silly, silly_run, [1, 10])}.

silly_run(I, N) when I > N -> okidoki;
silly_run(I, N) when I < N ->
    io:format("~p\n", I),
    silly_run(I + 1, N).

Att skicka ett meddelande till en annan process görs på följande vis

Pid ! {msg, "Tjabba Habba"}

Till vänster om utropstecknet finns ett process id (Pid) och till höger ett godtyckligt Erlang uttryck. Att ta emot ett meddelande görs med receive uttrycket. Som synes nedan, kan man vänta på olika typer av meddelande, samt även sätta upp en maxtid att vänta på att det dyker upp ett meddelande.

Message = receive
  {msg, Text} -> 
    io:format("[consumer] ~p\n", [Text]), Text
    ;
  {num, Number} when is_integer(Number), Number > 0 -> 
    io:format("[consumer] ~p\n", [Number]), Number
    ;
  stop -> done
    ;
  Whatever ->
    io:format("[consumer] unexpected message ~p\n", [Whatever]), error
    ;
  after 60 * 1000 -> timeout
end

Här följer ett komplett mini-program med en producer och en consumer.

-module(pipe).
-export([start/1, producer_run/3, consumer_run/0]).

start(N) when is_integer(N), N > 1 ->
  Consumer = spawn(pipe, consumer_run, []),
  Producer = spawn(pipe, producer_run, [1, N, Consumer]),
  {pipe, Consumer, Producer}.

consumer_run() ->
  receive
    Msg when Msg > 0 ->
      io:format("[cons] ~p\n", [Msg]),
      consumer_run();
    done -> ok
  end.

producer_run(K, N, _) when K > N -> done;
producer_run(K, N, C) when K =< N ->
  C ! K,
  producer_run(K + 1, N, C).
11> c(pipe, {outdir,"ebin"}).
{ok,pipe}
12> pipe:start(10).          
[cons] 1
{pipe,<0.121.0>,<0.122.0>}
[cons] 2
[cons] 3
[cons] 4
[cons] 5
[cons] 6
[cons] 7
[cons] 8
[cons] 9
[cons] 10
13>

1BRC i Erlang

Då var vi äntligen framme vid själva programmet, efter denna extremt korta intro till språket Erlang. Nu har du som inte sett Erlang tidigare, en bättre möjlighet att följa med i huvudprogrammet.

Vi vet, sedan tidigare, att programmet består av tre delar; nämligen (1) läs radvis, (2) aggregera samt (3) sortera och skriv ut. I Erlang blir detta enkelt att representera med tre olika processer (actors).

Reader

Den första läser radvis och skickar till nästa. Här visar jag programkoden för just denna del. Varje process består av dels en funktion som startar denna, samt en funktion som utgör själva process-loopen och innehåller dess interna tillstånd (process state).

reader_start(Filename, Next) ->
  File = case file:open(Filename, [read, {encoding, utf8}]) of
           {ok, F} -> F;
           {error, Reason} ->
             io:format("cannot open ~s: ~p", [Filename, Reason]),
             exit(kill)
         end,
  {reader, spawn(calc, reader_loop, [File, Next])}.

reader_loop(File, Next) ->
  case io:get_line(File, '') of
    eof ->
      Next ! eof,
      file:close(File)
      ;
    Line ->
      [Station, Txt] = string:split(Line, ";"),
      {Temperature, _} = string:to_float(Txt),
      Next ! {measurement, Station, Temperature},
      reader_loop(File, Next)
  end.

Start-funktionen öppnar data-filen och startar processen. Loop-funktionen läser radvis, splittrar varje rad och skickar vidare till nästa. Vid fil-slut skickar den atomen eof till nästa process och terminerar sedan.

Aggregator

Nästa process är den som aggregerar all temperatur-data. I samtliga våra tidigare lösningar har vi använt en hash-tabell, vilket också blir fallet här. I Erlang finns det en (relativt ny) dylik som heter maps. Start-funktionen skapar en ny tom tabell och skickar till loop-funktionen i samband med att processen startar. Loop-funktionen tar emot stations-namn och temperatur och aggregerar detta tills den får meddelandet eof.

aggregator_start(Next) ->
  {aggregator, spawn(calc, aggregator_loop, [maps:new(), Next])}.

aggregator_loop(Data, Next) ->
  receive
    eof ->
      Next ! {data, maps:to_list(Data)}
    ;
    {measurement, Station, Temperature} ->
      NewData = case maps:get(Station, Data, none) of
                  none ->
                    {1, Temperature, Temperature, Temperature}
                  ;
                  {Count, Sum, Min, Max} ->
                    {Count + 1, Sum + Temperature, min(Temperature, Min), max(Temperature, Max)}
                end,
      aggregator_loop(maps:put(Station, NewData, Data), Next)
  end.

För varje mottaget namn/temperatur par, görs en uppslagning i tabellen. Är det första gången skapas ett nytt objekt. Här använder vi en tuple, som är en behållare med flera data-värden. Detta har liknande innebörd som tuple i Python.

Om det fanns data sedan tidigare, så skapar vi en ny tuple. Notera här att vi har pattern matching och destructuring och det skapas fyra variabler, vilka sen används för att skapa en ny tuple. Innan loop-funktionen anropas på nytt, så uppdateras tabellen med det nya datat. Vi har förvisso också pattern matching i receive.

När meddelandet om fil-slut kommer, skickas alla namn/temperatur par till den sista processen i kedjan. Tabellen görs om till en lista av par. Detta liknar funktioner som Map.entries() i Java, och Dict.items() i Python.

Writer

Sista processen i kedjan väntar på exakt ett meddelande, vilket då utgörs av en lista med namn/temperatur par. När listan anländer, sorteras den först på stations-namn och sen skrivs det hela ut med funktionen lists:foreach. Denna funktion tar dels ett lambda-uttryck och dels en lista att traversera och applicera lambdan på varje element.

writer_start() ->
  {writer, spawn(calc, writer_loop, [])}.
  
writer_loop() ->
  receive
    {data, ListOfStationAndData} ->
      Sorted = lists:keysort(1, ListOfStationAndData),
      Print  = fun ({Station, {Count, Sum, Min, Max}}) ->
                 io:format("~ts: ~.2f C, ~.1f/~.1f (~p)~n",
                          [Station, Sum / Count, Min, Max, Count])
                end,
      lists:foreach(Print, Sorted),
      calc_avg ! {done, self()}
  end.

Ett lambda-uttryck i Erlang skrivs som

fun (PARAMS) -> EXPRESSION end

Parametrarna är Erlang variabler och det går att göra destructuring på dessa, vilket är fallet i writer. Notera att man kan göra på nästan samma sätt i modern JavaScript.

fun ({Station, {Count, Sum, Min, Max}}) -> EXPRESSION end

Det sista processen writer gör är att skicka ett klar-meddelande till shell:et, där funktionen som startade pipelinen (launch) väntar på detta, så att den sen kan skriv ut förfluten tid.

Launch

launch(Filename) ->
  {_, _} = erlang:statistics(wall_clock),
  case whereis(calc_avg) of
    undefined -> register(calc_avg, self()); %%% (3)
    Pid when is_pid(Pid) -> ok
  end,
  %%% (1)
  {writer, Writer} = writer_start(),
  {aggregator, Aggregator} = aggregator_start(Writer),
  {reader, Reader} = reader_start(Filename, Aggregator),
  io:format("pids: ~p, ~p, ~p\nfilename: ~s\n----\n", [Reader, Aggregator, Writer, Filename]),
  receive
    {done, Writer} -> %%% (2)
      unregister(calc_avg),
      {_, Diff} = erlang:statistics(wall_clock), %%% (4)
      io:format("----\nelapsed ~.2f seconds, ~s\n", [Diff / 1000, Filename])
  end.

Den här funktionen gör fyra saker.

  1. Startar de tre processerna och skriver ut deras process id (pid)
  2. Väntar på klar-meddelandet
  3. Registrerar ett globalt namn för shell-processen, som writer använder för att skicka klar-markeringen
  4. Mäter förfluten tid i millisekunder, mellan två anrop till erlang:statistics(wall_clock)

Start

Start-funktionen hanterar vilken data-fil som ska användas och utgör den sista delen av programkoden.

start()     -> launch(filename(data_tiny)).
start(Size) -> launch(filename(Size)).

filename(Size) ->
  Suffix = case Size of
             data_tiny -> "weather-data-tiny.csv";
             data_100k -> "weather-data-100K.csv";
             data_1m   -> "weather-data-1M.csv";
             data_10m  -> "weather-data-10M.csv";
             data_100m -> "weather-data-100M.csv";
             data_1b   -> "weather-data-1B.csv"
           end,
  string:concat("../../data/", Suffix).

Exekvering

Kör vi den minsta filen först, kan det se ut så här

19> c(calc).
{ok,calc}
20> calc:start().
pids: <0.159.0>, <0.157.0>, <0.156.0>
filename: ../../data/weather-data-tiny.csv
----
Cairo: 28.30 C, 28.3/28.3 (1)
Changsha: -0.10 C, -0.1/-0.1 (1)
Douala: 25.70 C, 25.7/25.7 (1)
Hong Kong: 28.40 C, 28.4/28.4 (1)
Kampala: 5.30 C, 5.3/5.3 (1)
Knutby: -6.63 C, -24.2/8.5 (3)
Madrid: 18.60 C, 18.6/18.6 (1)
Stockholm: 30.40 C, 30.4/30.4 (1)
Sundsvall: 18.10 C, 18.1/18.1 (1)
Uppsala: 5.10 C, -5.0/14.0 (4)
Zürich: 10.30 C, 10.3/10.3 (1)
İzmir: 9.90 C, 9.9/9.9 (1)
----
elapsed 0.03 seconds, ../../data/weather-data-tiny.csv
ok
21>

Så här kör vi programmet med olika indata-filer.

 c("src/calc.erl", {outdir,"ebin"}).
{ok,calc}
7> calc:start(data_100k).
pids: <0.108.0>, <0.106.0>, <0.105.0>
filename: ../../data/weather-data-100K.csv
----
. . .
Zürich: 11.30 C, -13.2/38.0 (207)
Ürümqi: 7.03 C, -23.3/36.9 (249)
İzmir: 18.91 C, -6.6/45.0 (234)
----
elapsed 0.66 seconds, ../../data/weather-data-100K.csv
ok

Tabellen nedan summerar exekveringstiderna för körningar av programmet med olika filstorlekar.

# Rows Seconds
1 100.000 0.66
2 1.000.000 6.19
3 10.000.000 82.00
4 100.000.000 813.33

Länksamling