Skip to content

Instantly share code, notes, and snippets.

@z5ottu
Last active May 25, 2018 15:20
Show Gist options
  • Save z5ottu/918755e5998422a12bca6dc8b31cfd21 to your computer and use it in GitHub Desktop.
Save z5ottu/918755e5998422a12bca6dc8b31cfd21 to your computer and use it in GitHub Desktop.
Riak Mapreduce map phase for reduce_plist_sum. Together is like sql "count" - "group by" functionality
%% -------------------------------------------------------------------
%%
%% riak_mapreduce_map_for_groupby:
%% Map phase for reduce_plist_sum.
%% Together they behave like SQL's "count" with "group by".
%%
%% Copyright (c)2018, Szloboda Zsolt, z5ottu@gmail.com All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% install:
%% http://docs.basho.com/riak/kv/2.2.3/using/reference/custom-code/
%%
%% example:
%% http://riakhost/mapred POST
%% {
%% "inputs": "test_bucket",
%% "query": [
%% {
%% "map": {
%% "language": "erlang",
%% "module": "riak_mapreduce_map_for_groupby",
%% "function": "map_meta_collector",
%% "arg": ["a_bin","b_bin"]
%% }
%% },
%% {
%% "reduce": {
%% "language": "erlang",
%% "module": "riak_kv_mapreduce",
%% "function": "reduce_plist_sum"
%% }
%% }
%% ]
%% }
%%
%% This assumes a bucket (test_bucket) holding some data with indices (a_bin and b_bin).
%%
%% example mapreduce result:
%% {
%% "a_bin|foo|b_bin|joe": 23,
%% "a_bin|foo|b_bin|john": 234,
%% "a_bin|foo|b_bin|jonathan": 2,
%% "a_bin|bar|b_bin|joe": 123,
%% "a_bin|bar|b_bin|john": 23,
%% "a_bin|bar|b_bin|jonathan": 1
%% }
%% The module declaration comes first, as the Erlang reference manual asks.
-module(riak_mapreduce_map_for_groupby).

%% Map phase entry point, referenced by name from MapReduce job definitions.
-export([
    map_meta_collector/3
]).

%% Internal helpers. They are exported explicitly instead of through
%% `-compile(export_all)' so that the set of callable functions stays the
%% same while the export_all warning of newer compilers is avoided.
-export([
    normalize_to_bin/1,
    binary_join/2
]).
%%
%% Map Phases
%%
%% @doc Map phase that emits at most one `{GroupKey, 1}' pair per object, so
%% that a following `reduce_plist_sum' phase can add the counts up per key.
%%
%% How `GroupKey' is built depends on `Arg':
%%
%%   * list   - every index entry whose name is a member of `Arg' contributes
%%              its name and its value (converted with normalize_to_bin/1);
%%              all of them are joined with "|" in the order in which they
%%              appear in the object's index metadata. Nothing is emitted
%%              when no entry matches.
%%   * binary - the value stored under the index named `Arg' (as found by
%%              proplists:get_value/3), converted with normalize_to_bin/1;
%%              `<<"">>' when the object has no such index.
%%   * atom   - handled like the empty binary `<<"">>'.
%%
%% A `{error, notfound}' input and any other kind of `Arg' yield `[]'.
%%
%% NOTE(review): the first argument is presumably a riak_object as handed
%% over by Riak KV; it is only ever passed to riak_object:get_metadata/1.
-spec map_meta_collector(term(), term(), term()) -> [{binary(), 1}].
map_meta_collector({error, notfound}, _, _) ->
    [];
map_meta_collector(RiakObject, _Props, Arg) when is_list(Arg) ->
    Index = object_index(RiakObject),
    Parts = lists:append(
        [[K, normalize_to_bin(V)] || {K, V} <- Index, lists:member(K, Arg)]
    ),
    case Parts of
        [] -> [];
        _ -> [{binary_join(Parts, <<"|">>), 1}]
    end;
map_meta_collector(RiakObject, Props, Arg) when is_atom(Arg) ->
    map_meta_collector(RiakObject, Props, <<"">>);
map_meta_collector(RiakObject, _Props, Arg) when is_binary(Arg) ->
    Value = proplists:get_value(Arg, object_index(RiakObject), <<"">>),
    %% Normalise like the list clause does, so that keys are always binaries
    %% (e.g. an integer index value becomes its decimal text).
    [{normalize_to_bin(Value), 1}];
map_meta_collector(_, _, _) ->
    [].

%% @private
%% @doc Return the entries stored under the `<<"index">>' metadata key, or
%% `[]' when the metadata has no such key (dict:fetch/2 would raise there
%% and abort the map phase).
object_index(RiakObject) ->
    case dict:find(<<"index">>, riak_object:get_metadata(RiakObject)) of
        {ok, Index} -> Index;
        error -> []
    end.
%% @private
%% @doc Coerce an index value into a binary.
%% Binaries are returned untouched, `nil' becomes the empty binary, lists go
%% through list_to_binary/1, integers are rendered as decimal text and any
%% other atom is encoded as UTF-8 text. A term of any other type matches no
%% clause and raises `function_clause'.
-spec normalize_to_bin(atom() | integer() | binary() | iolist()) -> binary().
normalize_to_bin(Bin) when is_binary(Bin) ->
    Bin;
normalize_to_bin(nil) ->
    <<"">>;
normalize_to_bin(Chars) when is_list(Chars) ->
    list_to_binary(Chars);
normalize_to_bin(Number) when is_integer(Number) ->
    integer_to_binary(Number);
normalize_to_bin(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom, utf8).
%% @private
%% @doc Concatenate `Parts' into one binary, placing `Sep' between each pair
%% of neighbouring elements. An empty list gives `<<>>'; a one-element list
%% gives that element unchanged.
-spec binary_join([binary()], binary()) -> binary().
binary_join([], _Sep) ->
    <<>>;
binary_join([First | Rest], Sep) ->
    join_parts(Rest, Sep, First).

%% @private
%% @doc Append each remaining part, preceded by the separator, to the binary
%% accumulated so far.
join_parts([], _Sep, Joined) ->
    Joined;
join_parts([Next | More], Sep, Joined) ->
    join_parts(More, Sep, <<Joined/binary, Sep/binary, Next/binary>>).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment