Skip to content

Instantly share code, notes, and snippets.

@z5ottu
Forked from chinnurtb/riak_csv_etl.erl
Created May 15, 2018 19:24
Show Gist options
  • Save z5ottu/e310ff6ac9b70199d1aa280b36db0421 to your computer and use it in GitHub Desktop.
Save z5ottu/e310ff6ac9b70199d1aa280b36db0421 to your computer and use it in GitHub Desktop.
%% -------------------------------------------------------------------
%%
%% riak_csv_etl: etl tools for csv files in Riak
%%
%% Copyright (c) 2011 Bradley Taylor. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% Example usage:
%% {ok, C} = riak:local_client().
%% B = <<"HealthScores_CSV">>.
%% P = "../resturants.csv".
%% riak_csv_etl:load_file(C, B, P).
%% F = riak_csv_etl:get_fields(C, B).
%% {ok, JList} = C:mapred(B,[{map, {modfun, riak_csv_etl,json_encode_map},F,true}]).
%% B2 = <<"HealthScores_JSON">>.
%% riak_csv_etl:put_list(C, B2, JList).
-module(riak_csv_etl).
-export([load_file/3, json_encode_map/3, json_encode_and_put_map/3, put_list/3, put_fields/3, get_fields/2]).
%% Loads the contents of a CSV file into a bucket.
%%  - the first line is taken to hold the field names, which are stashed
%%    in the bucket properties
%%  - each remaining line is stored as an Erlang list of binary values
%%    under an md5-derived key
%%  - use mapreduce to transform the stored values to other formats
%%
%% To get the stashed fields:
%% F = riak_csv_etl:get_fields(C, B).
%%
%% Client: Riak client used for the puts and the bucket-property update.
%% Bucket: target bucket.
%% Path:   path of the CSV file to read.
%%
%% Returns the result of the line loop (`win' once end of file is reached).
%% Crashes with badmatch if the file cannot be opened.
load_file(Client, Bucket, Path) ->
    {ok, File} = file:open(Path, [read]),
    %% Always release the file handle, including when a line fails to
    %% parse or a put crashes part-way through the load.
    try
        load_line(Client, Bucket, File, first)
    after
        file:close(File)
    end.
%% Map-phase function: JSON-encode the value of one stored object.
%% Value is the riak_object handed to the map phase, Fields is the list
%% of field names passed as the phase argument; the key data is ignored.
%% Returns a one-element list [{Key, Json}].
json_encode_map(Value, _KeyData, Fields) ->
    Contents = riak_object:get_value(Value),
    Key = riak_object:key(Value),
    [{Key, json_encode(Fields, Contents)}].
%% Map-phase function: JSON-encode the value of one stored object and
%% write the result to Bucket under the same key.
%% The phase argument is a {Fields, Bucket} tuple.
%% Returns a one-element list holding the result of the put.
json_encode_and_put_map(Value, KeyData, {Fields, Bucket}) ->
    [{Key, Json}] = json_encode_map(Value, KeyData, Fields),
    {ok, Client} = riak:local_client(),
    [Client:put(riak_object:new(Bucket, Key, Json))].
%% Store every {Key, Value} pair of List as a new object in bucket B
%% using client C. Returns ok.
put_list(C, B, List) ->
    lists:foreach(
        fun({Key, Value}) ->
            Obj = riak_object:new(B, Key, Value),
            C:put(Obj)
        end,
        List
    ).
%% Fetch the field names stashed under the csv_fields bucket property.
%% Returns undefined when the property is not set.
get_fields(Client, Bucket) ->
    BucketProps = Client:get_bucket(Bucket),
    proplists:get_value(csv_fields, BucketProps).
%% Stash the list of field names in the csv_fields bucket property.
put_fields(Client, Bucket, Fields) ->
    Props = [{csv_fields, Fields}],
    Client:set_bucket(Bucket, Props).
%% Pair each field name with the value at the same position and encode
%% the pairs as a JSON object. Returns the JSON as a binary.
%% Fields and Value must have the same length (lists:zip/2 fails otherwise).
json_encode(Fields, Value) ->
    Pairs = lists:zip(Fields, Value),
    iolist_to_binary(mochijson2:encode({struct, Pairs})).
%% Read the next line from the open file F and dispatch on the result.
load_line(C, B, F) ->
    load_line(C, B, F, file:read_line(F)).

%% Drive the load, one step per call. The last argument is either the
%% atom `first' (header not read yet) or the result of file:read_line/1.
%% Returns `win' at end of file, or {error, Reason} if reading fails.

%% Read the first line and stash the list of fields as atoms in the
%% bucket properties; map functions access these later.
load_line(C, B, F, first) ->
    {ok, Header} = file:read_line(F),
    %% Convert the field names to atoms.
    %% NOTE(review): list_to_atom/1 on file content grows the atom table,
    %% which is never garbage collected; only load trusted CSV files.
    Fields = [list_to_atom(Field) || Field <- read(Header)],
    put_fields(C, B, Fields),
    load_line(C, B, F);
%% Extract the items of a data line and load them into Riak as binaries.
load_line(C, B, F, {ok, Line}) ->
    Data = [list_to_binary(V) || V <- read(Line)],
    Key = make_key(Data),
    Obj = riak_object:new(B, Key, Data),
    C:put(Obj),
    load_line(C, B, F);
%% eof, we're done!
load_line(_, _, _, eof) ->
    win;
%% A read error is reported to the caller instead of being retried:
%% retrying a persistent error would loop forever.
load_line(_, _, _, {error, _Reason} = Error) ->
    Error.
%% Generate a cheap md5 key, since we don't know what the logical key is.
%% Stuffs is the list of binaries making up one row; the key is the
%% 16-byte MD5 digest of their concatenation.
%% Uses the erlang:md5/1 BIF rather than crypto:md5/1, which is no longer
%% available in current OTP releases; the digest is the same.
%% NOTE(review): rows whose items concatenate to the same bytes
%% (e.g. ["ab","c"] and ["a","bc"]) get the same key.
make_key(Stuffs) ->
    erlang:md5(Stuffs).
%% Awesome (and insane) CSV splitting courtesy of TrapExit
%% http://www.trapexit.org/Comma_Separated_Values

%% Parse String as CSV and return the items of its first line.
read(String) -> hd(read(String, [])).

%% Parse the input into a list of lines, each a list of items.
%% Lines holds the lines parsed so far, most recent first.
read([], Lines) ->
    lists:reverse(Lines);
read(Input, []) ->
    read_next(Input, []);
read([$\n | Input], Lines) ->
    read_next(Input, Lines);
read([$\r, $\n | Input], Lines) ->
    read_next(Input, Lines).

%% Parse one line off the front of Input and carry on with the remainder.
read_next(Input, Lines) ->
    {Items, Remaining} = read_line(Input),
    read(Remaining, [Items | Lines]).
%% Prepend Count space characters to String.
add_spaces(Count, String) ->
    case Count of
        0 -> String;
        _ -> add_spaces(Count - 1, [$\s | String])
    end.
%% Read one item from the front of the input and return {Item, Rest}.
%% An item starting with a double quote is handed to read_item_quoted/2;
%% anything else is read up to the next comma, LF, CRLF or end of input,
%% and Rest still begins with that delimiter.
read_item([$" | Tail]) -> read_item_quoted(Tail, []);
read_item(Input) -> read_item(Input, 0, []).

%% Pending counts spaces/tabs seen since the last ordinary character.
%% They are only written out (as spaces) once another ordinary character
%% follows, so leading and trailing whitespace is dropped.
read_item([Ws | Tail], 0, []) when Ws =:= $\s; Ws =:= $\t ->
    read_item(Tail, 0, []);
read_item([$\n | _] = Rest, _Pending, Acc) ->
    {lists:reverse(Acc), Rest};
read_item([$\r, $\n | _] = Rest, _Pending, Acc) ->
    {lists:reverse(Acc), Rest};
read_item([$, | _] = Rest, _Pending, Acc) ->
    {lists:reverse(Acc), Rest};
read_item([], _Pending, Acc) ->
    {lists:reverse(Acc), []};
read_item([Ws | Tail], Pending, Acc) when Ws =:= $\t; Ws =:= $\s ->
    read_item(Tail, Pending + 1, Acc);
read_item([Char | Tail], Pending, Acc) ->
    read_item(Tail, 0, [Char | add_spaces(Pending, Acc)]).
%% Read the remainder of a quoted item (the opening quote is already
%% consumed). A doubled quote yields one literal quote; a single quote
%% ends the item. Returns {Item, Rest} with Rest starting after the
%% closing quote.
read_item_quoted([$", $" | Tail], Acc) ->
    read_item_quoted(Tail, [$" | Acc]);
read_item_quoted([$" | Tail], Acc) ->
    {lists:reverse(Acc), Tail};
read_item_quoted([Char | Tail], Acc) ->
    read_item_quoted(Tail, [Char | Acc]).
%% Split the first line of String into its comma-separated items.
%% Returns {Items, Rest}, where Rest starts at the line terminator
%% (LF or CRLF) that ended the line, or is [] at end of input.
read_line(String) -> read_line(String, []).

%% Acc holds the items read so far, most recent first.
read_line([$\n | T], Acc) ->
    {lists:reverse(Acc), [$\n | T]};
%% Hand back the complete CRLF pair: read/2 only recognises "\r\n" as a
%% line break, so dropping the LF here left it with no matching clause.
read_line([$\r, $\n | T], Acc) ->
    {lists:reverse(Acc), [$\r, $\n | T]};
read_line([], Acc) ->
    {lists:reverse(Acc), []};
%% First item of the line: no leading comma to consume.
read_line(String, []) ->
    {Item, Rest} = read_item(String),
    read_line(Rest, [Item]);
%% Subsequent items: skip the separating comma first.
read_line([$, | String], Acc) ->
    {Item, Rest} = read_item(String),
    read_line(Rest, [Item | Acc]).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment