Image by John Muellerleile
This is a guide on basic mapreduce usage in Erlang. The information written here is mostly a summary of the Riak documentation that I compiled to make it easier to understand, together with some other tidbits of information that I find useful.
The mapreduce function
The function used to invoke mapreduce in Riak looks like this:
riakc_pb_socket:mapred(Pid, Keys, Phases)
Pid - The process id of your Riak connection.
Keys - A list of bucket-key pairs where each element is a tuple in the format {Bucket, Key}. Bucket and Key should be in binary format.
Example:
[
{<<"bucket1">>, <<"key1">>},
{<<"bucket1">>, <<"key2">>}
]
Phases - A list of mapreduce phases where each element is a tuple in the format {Type, Function, Static, Return}.
Type should be either 'map' or 'reduce', depending on which type the phase is.
Function is a tuple in one of two formats:
{qfun, Fun} where Fun is an anonymous function with the arity of the phase's function (e.g. for a map phase, Fun = fun(RiakObject, KeyData, Static) -> do_stuff end)
{modfun, Module, Function} where Module and Function are atoms naming a function exported from a module
Static is a static argument passed to every execution of the phase.
Return should be either true or false, and determines if the result of the phase is included in the final output.
The call returns {ok, Results} where Results is a list in the form [{0, OutputFromPhase1}, {1, OutputFromPhase2}, ...]. Each entry is tagged with the zero-based position of its phase in Phases. You only get output from a phase if Return for that phase is true.
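As a rough illustration of how the three arguments fit together, the sketch below connects to a node and counts its inputs. It assumes a Riak node accepting protocol buffers connections on 127.0.0.1:8087 (adjust to your setup) and uses the map_object_value and reduce_count_inputs functions from the riak_kv_mapreduce module that ships with Riak. The module name mapred_call_sketch is made up for this example.

```erlang
-module(mapred_call_sketch).
-export([phases/0, count_inputs/1]).

%% Two phases: the map phase emits each object's value and is not kept,
%% the reduce phase counts its inputs and is kept in the final output.
phases() ->
    [
        {map, {modfun, riak_kv_mapreduce, map_object_value}, none, false},
        {reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, none, true}
    ].

%% Keys is a list of {Bucket, Key} tuples, both elements binaries.
count_inputs(Keys) ->
    %% Host and port are assumptions for this sketch.
    {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    Result = riakc_pb_socket:mapred(Pid, Keys, phases()),
    ok = riakc_pb_socket:stop(Pid),
    Result.
```

Only the reduce phase has Return set to true, so a successful call should come back in the shape {ok, [{1, [Count]}]}.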
Map function
A map function should take three arguments in the form map(RiakObject, KeyData, Static)
RiakObject - The object stored at the input bucket-key.
KeyData - Additional data that is sent along with the bucket-key pair. It is undefined if none was supplied. For more information on how to use this, see the Riak MapReduce documentation.
Static - The static argument passed in the mapreduce function.
The return value should be a list. Its elements become the inputs of the next phase.
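A minimal sketch of a map function with this shape is shown below. The module and function names are hypothetical. It assumes, as the Riak documentation describes, that the first argument is {error, notfound} when the input key does not exist, and it emits nothing in that case.

```erlang
-module(map_sketch).
-export([map_value_size/3]).

%% Missing keys arrive as {error, notfound}; emit nothing for them.
map_value_size({error, notfound}, _KeyData, _Static) ->
    [];
%% Otherwise emit the size in bytes of the stored value, wrapped in a list.
%% riak_object is Riak's server-side module, so this clause only works
%% when the function runs inside Riak.
map_value_size(RiakObject, _KeyData, _Static) ->
    [byte_size(riak_object:get_value(RiakObject))].
```

Returning an empty list is a simple way to drop an input without failing the whole job.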
Reduce function
A reduce function should take two arguments in the form reduce(List, Static)
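List - A list of inputs in any form. It holds outputs of the previous phase and may also hold results from earlier runs of this reduce function.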
Static - The static argument passed in the mapreduce function.
Return value should be a list that preferably is in the same format as the input list, since the reduce function might be called multiple times.
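The requirement that the output looks like the input can be illustrated with a small sketch. The module name reduce_sketch is hypothetical. The function sums a list of numbers and returns a one-element list of numbers, so feeding its own output back in together with new inputs still gives the right total.

```erlang
-module(reduce_sketch).
-export([sum/2]).

%% Input: a list of numbers. Output: a one-element list of numbers,
%% which is itself valid input for another run of this function.
sum(List, _Static) ->
    [lists:sum(List)].
```

For example, sum([1, 2], none) gives [3], and sum([3, 3], none) then gives [6], the same as sum([1, 2, 3], none).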
Example mapreduce module
-module(mapred).
-export([mapred/2, maptags/3, redtags/2]).

mapred(Pid, Keys) ->
    {ok, [{1, [Result]}]} = riakc_pb_socket:mapred(
        Pid,
        Keys,
        [
            {map, {modfun, ?MODULE, maptags}, none, false},
            {reduce, {modfun, ?MODULE, redtags}, none, true}
        ]
    ),
    dict:to_list(Result).

maptags(RiakObject, _, _) -> % We don't care about keydata or the static argument
    [dict:from_list([{I, 1} || I <- binary_to_term(riak_object:get_value(RiakObject))])].

redtags(Input, _) -> % Once again we don't care about the static argument
    [lists:foldl(
        fun(Tag, Acc) ->
            dict:merge(
                fun(_, Amount1, Amount2) ->
                    Amount1 + Amount2
                end,
                Tag,
                Acc
            )
        end,
        dict:new(),
        Input
    )].
Note: ?MODULE is an Erlang macro for the current module, in this case it expands to mapred.
This module runs a mapreduce which counts how many times each hashtag occurs in the input data. Let's assume we have put the hashtags of three tweets into these three key-value pairs:
{<<"bucket1">>, <<"key1">>}, which contains ["#love", "#smile"]
{<<"bucket1">>, <<"key2">>}, which contains ["#smile"]
{<<"bucket1">>, <<"key3">>}, which contains ["#yolo", "#love", "#food"]
Next, let's assume we have opened a connection to our Riak server and saved the process id as Pid, and saved the three bucket-key pairs as Keys.
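One way this setup might look is sketched below. The module name seed_sketch and its function names are made up for this example. Because maptags/3 decodes values with binary_to_term, the sketch stores each tag list with term_to_binary. It uses riakc_obj:new/3 and riakc_pb_socket:put/2 from the riak-erlang-client.

```erlang
-module(seed_sketch).
-export([tweets/0, keys/0, encode/1, seed/1]).

%% The three example tweets, keyed by their Riak key.
tweets() ->
    [
        {<<"key1">>, ["#love", "#smile"]},
        {<<"key2">>, ["#smile"]},
        {<<"key3">>, ["#yolo", "#love", "#food"]}
    ].

%% The bucket-key pairs to pass as Keys.
keys() ->
    [{<<"bucket1">>, Key} || {Key, _Tags} <- tweets()].

%% Values must be readable by binary_to_term/1 in the map function.
encode(Tags) ->
    term_to_binary(Tags).

%% Store every tweet. Pid is an open riakc_pb_socket connection.
seed(Pid) ->
    lists:foreach(
        fun({Key, Tags}) ->
            Obj = riakc_obj:new(<<"bucket1">>, Key, encode(Tags)),
            ok = riakc_pb_socket:put(Pid, Obj)
        end,
        tweets()
    ).
```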
With all of this prepared we invoke our mapreduce function by calling mapred:mapred(Pid, Keys).
This starts a mapreduce on the server with our keys, with two phases:
One map-phase where the map-function is maptags/3, the static argument is none and the phase doesn't return its output to the final output.
One reduce-phase, where the reduce-function is redtags/2, the static argument is none and the phase returns its output to the final output.
Our three bucket-key pairs are sent to the map function. Riak retrieves the object every bucket-key pair points at and passes it as the first argument to the function. Since we didn't include any keydata, undefined is passed as the second argument, and the none we specified in the mapreduce-function is passed as the third.
Our map function does a list-comprehension that takes every item stored in the RiakObject and puts it in a dict together with the value 1. Note the use of riak_object, the module Riak uses internally on the server, rather than the riak-erlang-client's riakc_obj. Map functions run inside Riak, so this is the object type they receive.
The outputs of our three maps would be [{"#love", 1}, {"#smile", 1}], [{"#smile", 1}] and [{"#yolo", 1}, {"#love", 1}, {"#food", 1}]. Each is a dict wrapped in a one-element list; the dicts are written as lists here for readability.
Once all the mapping is complete the outputs are sent to our reduce-function.
Our reduce function uses lists:foldl/3, which calls a function on every element in Input while carrying an accumulator along. In our case the function is an anonymous function which uses dict:merge to merge each dict in our input into the accumulator, adding the counts of tags that occur in both.
As a result our reduce function takes the list [[{"#love", 1}, {"#smile", 1}], [{"#smile", 1}], [{"#yolo", 1}, {"#love", 1}, {"#food", 1}]] as Input and merges these into the list [{"#love", 2}, {"#smile", 2}, {"#yolo", 1}, {"#food", 1}]. This list is the output of our reduce-function.
Now our mapreduce has finished its calculation and returns {ok, [{1, [[{"#love", 2}, {"#smile", 2}, {"#yolo", 1}, {"#food", 1}]]}]}, which we pattern-match to {ok, [{1, [Result]}]} and then finally convert Result from dict to list to make it readable.
For reference, if we had set Return of our map-phase to true the output would've instead been {ok, [{0, [[{"#love", 1}, {"#smile", 1}], [{"#smile", 1}], [{"#yolo", 1}, {"#love", 1}, {"#food", 1}]]}, {1, [[{"#love", 2}, {"#smile", 2}, {"#yolo", 1}, {"#food", 1}]]}]}
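The data flow in this walkthrough can be tried out locally without a Riak node. The sketch below is a simplified stand-in, not the code that runs on the server: the module name tagcount_sketch is hypothetical, and map_tags/1 takes a plain list of tags instead of a Riak object. The map and reduce logic is otherwise the same as in maptags/3 and redtags/2.

```erlang
-module(tagcount_sketch).
-export([map_tags/1, reduce_tags/1, count/1]).

%% Local stand-in for maptags/3: one dict per tweet, each tag counted once.
map_tags(Tags) ->
    [dict:from_list([{Tag, 1} || Tag <- Tags])].

%% Same logic as redtags/2: merge all dicts, adding counts for shared tags.
reduce_tags(Dicts) ->
    [lists:foldl(
        fun(Dict, Acc) ->
            dict:merge(fun(_Tag, A, B) -> A + B end, Dict, Acc)
        end,
        dict:new(),
        Dicts
    )].

%% Run the map step on every tweet, concatenate the outputs, reduce once,
%% and return a sorted list of {Tag, Count} pairs.
count(TagLists) ->
    Mapped = lists:append([map_tags(Tags) || Tags <- TagLists]),
    [Result] = reduce_tags(Mapped),
    lists:sort(dict:to_list(Result)).
```

Calling tagcount_sketch:count([["#love", "#smile"], ["#smile"], ["#yolo", "#love", "#food"]]) returns [{"#food", 1}, {"#love", 2}, {"#smile", 2}, {"#yolo", 1}]. The result is sorted here because dict:to_list/1 makes no promise about order.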
Loading code into Riak
In order for Riak to run functions specified with {modfun, Module, Function} the module must be loaded into Riak. This is done by adding a beam path to Riak. Add the following to your advanced.config:
{riak_kv, [
    %% ... other riak_kv settings ...
    {add_paths, ["/path/to/beams/"]}
]}
advanced.config is in the same folder as riak.conf. If it's not there, create it or download my template.
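If you create the file from scratch, keep in mind that advanced.config is a single Erlang term: a list of {Application, Settings} tuples terminated by a period. A minimal sketch of a complete file containing only the beam path, with the path itself being a placeholder, could look like this:

```erlang
%% advanced.config: a list of per-application settings, ending with a period.
[
    {riak_kv, [
        %% Directory containing the compiled .beam files (placeholder path).
        {add_paths, ["/path/to/beams/"]}
    ]}
].
```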
Once you've added the paths you need to instruct Riak to reload its beams: riak-admin erl-reload
If you have multiple nodes on your computer you must edit advanced.config and reload the beams separately on each node.
Getting the keys
In order to run a mapreduce we need a list of bucket-key pairs to operate on. While we could grab all the keys in a bucket using riakc_pb_socket:list_keys/2, this is a very expensive operation that should not be used in production. One solution to this problem is using Secondary Indexes. Secondary indexes allow you to attach metadata to objects, which can then be queried to get the keys you want. Note that in order to use secondary indexes you need to set up Riak to use an index-capable storage backend such as LevelDB.
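A rough sketch of how this might look with the riak-erlang-client follows. The module name index_sketch and the index name "tag" are hypothetical. The sketch assumes the client's riakc_obj:set_secondary_index/2 for attaching index entries, and that riakc_pb_socket:mapred/3 accepts an {index, Bucket, Index, Value} tuple as its input in place of a key list. Check both against the client version you use.

```erlang
-module(index_sketch).
-export([tag_index/0, index_input/2, put_tagged/5, count_by_tag/3]).

%% Hypothetical index name; binary indexes are named {binary_index, Name}.
tag_index() ->
    {binary_index, "tag"}.

%% Mapreduce input that selects every key whose "tag" index equals Tag.
index_input(Bucket, Tag) ->
    {index, Bucket, tag_index(), Tag}.

%% Store Value under Bucket/Key with Tags (a list of binaries) as index entries.
put_tagged(Pid, Bucket, Key, Value, Tags) ->
    Obj0 = riakc_obj:new(Bucket, Key, Value),
    MD0 = riakc_obj:get_update_metadata(Obj0),
    MD1 = riakc_obj:set_secondary_index(MD0, [{tag_index(), Tags}]),
    Obj1 = riakc_obj:update_metadata(Obj0, MD1),
    riakc_pb_socket:put(Pid, Obj1).

%% Count the objects carrying Tag, without listing the bucket's keys.
count_by_tag(Pid, Bucket, Tag) ->
    riakc_pb_socket:mapred(
        Pid,
        index_input(Bucket, Tag),
        [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, none, true}]
    ).
```

The same index_input/2 tuple could be passed to the hashtag mapreduce in place of Keys, so that only matching objects are read.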