Guide to MapReduce with Riak in Erlang

Image by John Muellerleile
This is a guide on basic mapreduce usage in Erlang. The information written here is mostly a summary of the Riak documentation that I compiled to make it easier to understand, together with some other tidbits of information that I find useful.

The mapreduce function

The function used to invoke mapreduce in Riak looks like this:
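    riakc_pb_socket:mapred(Pid, Keys, Phases)

Phases is a list of phase tuples of the form {Type, FunTerm, Static, Return}, where Type is map or reduce, as used in the example module later in this guide. The call returns {ok, Results}, where Results is a list of the form [{0, OutputFromPhase1}, {1, OutputFromPhase2}, ...]. You only get output from a phase if Return for that phase is true.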
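As a small illustration of the result shape described above, the following sketch looks up the output of one phase in a Results list. The module and function names (phase_output, get/2) are made up for this example and are not part of the Riak client.

```erlang
-module(phase_output).
-export([get/2]).

%% Results is the list from {ok, Results}: [{PhaseIndex, Output}, ...].
%% Phases whose Return flag was false have no entry at all.
get(PhaseIndex, Results) ->
    case lists:keyfind(PhaseIndex, 1, Results) of
        {PhaseIndex, Output} -> {ok, Output};
        false -> not_returned
    end.
```

With a two-phase query where only the second phase is returned, phase_output:get(1, Results) gives {ok, Output}, while phase_output:get(0, Results) gives not_returned.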

Map function

A map function should take three arguments, in the form map(RiakObject, KeyData, Static). The return value should be a list.
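A minimal sketch of such a function, assuming all we want is to count objects: it ignores all three arguments and emits a single 1 per object. The module name count_map and the function name map_one are hypothetical.

```erlang
-module(count_map).
-export([map_one/3]).

%% Emits one element per object; a later reduce phase can sum them.
%% The return value is a list, as expected from a map function.
map_one(_RiakObject, _KeyData, _Static) ->
    [1].
```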

Reduce function

A reduce function should take two arguments, in the form reduce(List, Static). The return value should be a list, preferably in the same format as the input list, since the reduce function might be called multiple times, with the output of earlier calls included in the input of later ones.
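To see why the output format should match the input format, here is a small sketch of a summing reduce function. The demo/0 function only simulates a second reduce call locally; how Riak actually batches its calls is not shown here. The module name sum_reduce is hypothetical.

```erlang
-module(sum_reduce).
-export([sum/2, demo/0]).

%% Input and output are both lists of integers, so the output of one
%% call can safely appear in the input of a later call.
sum(List, _Static) ->
    [lists:sum(List)].

%% Simulates reducing in two batches: the first result is fed back in
%% together with a value that arrived later.
demo() ->
    [Partial] = sum([1, 1, 1], none),
    sum([Partial, 1], none).
```

Calling sum_reduce:demo() returns [4], the same answer a single call over [1, 1, 1, 1] would give.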

Example mapreduce module

    -module(mapred).
    -export([mapred/2, maptags/3, redtags/2]).

    mapred(Pid, Keys) ->
        {ok, [{1, [Result]}]} = riakc_pb_socket:mapred(
            Pid,
            Keys,
            [
                {map, {modfun, ?MODULE, maptags}, none, false},
                {reduce, {modfun, ?MODULE, redtags}, none, true}
            ]
        ),
        dict:to_list(Result).

    maptags(RiakObject, _, _) ->
        %% We don't care about keydata or the static argument
        [dict:from_list([{I, 1} || I <- binary_to_term(riak_object:get_value(RiakObject))])].

    redtags(Input, _) ->
        %% Once again we don't care about the static argument
        [lists:foldl(
            fun(Tag, Acc) ->
                dict:merge(
                    fun(_, Amount1, Amount2) -> Amount1 + Amount2 end,
                    Tag,
                    Acc
                )
            end,
            dict:new(),
            Input
        )].

Note: ?MODULE is an Erlang macro that expands to the name of the current module, in this case mapred.

This module performs a mapreduce that counts how many times each hashtag occurs in the input data. Let's assume we have put the hashtags of three tweets into three key-value pairs, each value being a list of hashtags encoded with term_to_binary: ["#love", "#smile"], ["#smile"] and ["#yolo", "#love", "#food"]. Next, let's assume we have started a link to our Riak server and saved the process-id as Pid, and saved the three bucket-key pairs as Keys.
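The setup assumed here could look roughly like the sketch below. The bucket name, the keys, and the node address and port are all assumptions made for illustration. Running store/0 requires riak-erlang-client on the code path and a reachable Riak node.

```erlang
-module(setup_demo).
-export([tweets/0, keys/0, store/0]).

%% Hypothetical bucket name, used only for this example.
-define(BUCKET, <<"tweets">>).

%% Hypothetical keys paired with the hashtags of each tweet.
tweets() ->
    [{<<"tweet1">>, ["#love", "#smile"]},
     {<<"tweet2">>, ["#smile"]},
     {<<"tweet3">>, ["#yolo", "#love", "#food"]}].

%% The bucket-key pairs to pass to the mapreduce as Keys.
keys() ->
    [{?BUCKET, Key} || {Key, _Tags} <- tweets()].

%% Connects to a local node (assumed address and port) and stores each
%% hashtag list as a binary, so a map function can binary_to_term it.
store() ->
    {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    lists:foreach(
        fun({Key, Tags}) ->
            Obj = riakc_obj:new(?BUCKET, Key, term_to_binary(Tags)),
            ok = riakc_pb_socket:put(Pid, Obj)
        end,
        tweets()),
    {Pid, keys()}.
```

The {Pid, Keys} pair returned by store/0 is what the rest of this walkthrough refers to as Pid and Keys.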
With all of this prepared we invoke our mapreduce function by calling mapred:mapred(Pid, Keys). This starts a mapreduce on the server with our keys, in two phases: a map phase followed by a reduce phase.

In the map phase our three bucket-key pairs are sent to the map function. Riak retrieves the object that each bucket-key pair points at and passes it as the first argument to the function. Since we didn't include any keydata, undefined is passed as the second argument, and the none we specified in the phase tuple is passed as the third.
Our map function uses a list comprehension that takes every item stored in the RiakObject and puts it in a dict together with the value 1. Note the use of riak_object, the module Riak uses internally to represent objects, rather than the riak-erlang-client's riakc_obj: map functions run inside Riak, not in the client.
The output of our three maps would be, shown as lists for readability, [{"#love", 1}, {"#smile", 1}], [{"#smile", 1}] and [{"#yolo", 1}, {"#love", 1}, {"#food", 1}]. Once all the mapping is complete, the output is sent to our reduce function.

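Our reduce function uses lists:foldl/3, which calls a function on every element of the input list while passing an accumulator from one call to the next. In our case the function is an anonymous function that uses dict:merge/3 to merge each dict in our input into the accumulator, adding the counts together when a hashtag occurs in both.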
As a result our reduce function takes the list [[{"#love", 1}, {"#smile", 1}], [{"#smile", 1}], [{"#yolo", 1}, {"#love", 1}, {"#food", 1}]] as Input (each dict is shown as a list for readability) and merges these into [{"#love", 2}, {"#smile", 2}, {"#yolo", 1}, {"#food", 1}]. This merged dict, wrapped in a list, is the output of our reduce function.
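The merge step can be tried locally without a Riak node. The sketch below copies the logic of redtags/2 into a standalone module (the name redtags_demo is made up) and feeds it the dicts that the three map calls would produce.

```erlang
-module(redtags_demo).
-export([run/0]).

%% Same logic as redtags/2 in the example module, copied here so the
%% sketch runs on its own.
redtags(Input, _Static) ->
    [lists:foldl(
        fun(Tags, Acc) ->
            dict:merge(fun(_Tag, A, B) -> A + B end, Tags, Acc)
        end,
        dict:new(),
        Input)].

run() ->
    Tweets = [["#love", "#smile"], ["#smile"], ["#yolo", "#love", "#food"]],
    %% Build the dicts that the three map calls would have produced.
    Input = [dict:from_list([{T, 1} || T <- Tags]) || Tags <- Tweets],
    [Merged] = redtags(Input, none),
    %% dict:to_list/1 gives no ordering guarantee, so sort for display.
    lists:sort(dict:to_list(Merged)).
```

Calling redtags_demo:run() returns [{"#food",1},{"#love",2},{"#smile",2},{"#yolo",1}]. The order differs from the one shown in the text only because the result is sorted here.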

Now our mapreduce has finished its calculation and returns {ok, [{1, [[{"#love", 2}, {"#smile", 2}, {"#yolo", 1}, {"#food", 1}]]}]}, which we pattern-match against {ok, [{1, [Result]}]}. Finally we convert Result from a dict to a list to make it readable.
For reference, if we had set Return of our map phase to true, the output would instead have been {ok, [{0, [[{"#love", 1}, {"#smile", 1}], [{"#smile", 1}], [{"#yolo", 1}, {"#love", 1}, {"#food", 1}]]}, {1, [[{"#love", 2}, {"#smile", 2}, {"#yolo", 1}, {"#food", 1}]]}]}

Loading code into Riak

In order for Riak to run functions specified with {modfun, Module, Function}, the module must be loaded into Riak. This is done by adding a beam path to Riak. Add the following to your advanced.config:

    {riak_kv, [
        %% ...
        {add_paths, ["/path/to/beams/"]},
        %% ...
    ]}

advanced.config is in the same folder as riak.conf. If it's not there, create it or download my template.
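If you are creating advanced.config from scratch, a minimal complete file could look like the sketch below. It assumes you need no other settings. The file is a single Erlang term, a list of per-application sections terminated by a period, and the path is the same placeholder as above.

```erlang
%% advanced.config (minimal sketch)
[
  {riak_kv, [
    %% Directory containing the compiled .beam files; placeholder path.
    {add_paths, ["/path/to/beams/"]}
  ]}
].
```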
Once you've added the paths you need to instruct Riak to reload its beams:

    riak-admin erl-reload

If you have multiple nodes on your computer you must edit advanced.config and reload the beams separately on each node.

Getting the keys

In order to do a mapreduce we need a list of bucket-key pairs to operate on. While we could grab all the keys in a bucket using riakc_pb_socket:list_keys/2, this is a very expensive operation that should not be used in production. One solution to this problem is Secondary Indexes. Secondary indexes allow you to attach metadata to keys, which can then be queried to get the keys you want. Note that in order to use secondary indexes you need to set up Riak to use an index-capable storage backend such as LevelDB.
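A rough sketch of how this could look with riak-erlang-client, assuming a client version that provides riakc_obj:set_secondary_index/2 and riakc_pb_socket:get_index_eq/4 (check both against the version you have installed). The index name "hashtag" and the module name index_demo are hypothetical.

```erlang
-module(index_demo).
-export([tag_object/2, keys_for_tag/3, to_inputs/2]).

%% Record definitions for index results ship with riak-erlang-client.
-include_lib("riakc/include/riakc.hrl").

%% Hypothetical binary index name, chosen for this example.
-define(INDEX, {binary_index, "hashtag"}).

%% Attaches index entries (a list of binaries) to an object before it is put.
tag_object(Obj, Tags) ->
    MD0 = riakc_obj:get_update_metadata(Obj),
    MD1 = riakc_obj:set_secondary_index(MD0, [{?INDEX, Tags}]),
    riakc_obj:update_metadata(Obj, MD1).

%% Asks Riak for the keys in Bucket whose index entry equals Tag.
keys_for_tag(Pid, Bucket, Tag) ->
    {ok, #index_results_v1{keys = Keys}} =
        riakc_pb_socket:get_index_eq(Pid, Bucket, ?INDEX, Tag),
    to_inputs(Bucket, Keys).

%% Turns plain keys into the bucket-key pairs that mapred/3 expects.
to_inputs(Bucket, Keys) ->
    [{Bucket, Key} || Key <- Keys].
```

The list returned by keys_for_tag/3 can be passed as the Keys argument of riakc_pb_socket:mapred/3, which avoids listing every key in the bucket.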


Written by Andreas Lundkvist - 2014