Load 10 MB of data into /tmp/foo:
# Copies 10 blocks of 1 MiB from the raw disk device into /tmp/foo; sudo is used because the source is a device node.
# NOTE(review): lowercase `bs=1m` and /dev/disk0 look BSD/macOS-specific (GNU dd spells it bs=1M) — confirm for your platform.
sudo dd if=/dev/disk0 of=/tmp/foo bs=1m count=10
Start loading it into the dev1 node:
# PUT the /tmp/foo payload under 10000 distinct keys (ooopp1 .. ooopp10000) in bucket food3.
# --data-binary sends the file byte-for-byte; plain -d strips carriage returns and
# newlines when reading from a file, which would alter the binary payload.
for i in $(seq 1 10000)
do curl -XPUT "http://localhost:10018/buckets/food3/keys/ooopp$i" --data-binary @/tmp/foo
done
Kill dev4 (the replication target cluster, which is a single node):
# Force-kill every process whose ps line mentions dev4.
# The [d] bracket expression stops grep from matching its own command line,
# so no already-exited grep PID is handed to kill.
ps awux | grep '[d]ev4' | awk '{print $2}' | xargs kill -9
Now the queue should be building up...
Load in the queue querying function:
%% DumpRtQ(Dest): write a one-line summary of every entry returned by
%% riak_repl2_rtq:dumpq/0 to the file Dest.
%% Each entry is rendered as "\nSize:Bucket/Key LocalForwards,RoutedClusters", so the
%% file starts with a newline and has no trailing one (same bytes as the ++ version).
%% Returns whatever file:write_file/2 returns.
DumpRtQ = fun(Dest) ->
    FormatEntry = fun({_, _, ObjsBin, Meta}) ->
        LocalForwards = proplists:get_value(local_forwards, Meta),
        RoutedClusters = proplists:get_value(routed_clusters, Meta),
        %% The binary decodes to a list; only its first element is reported.
        RO = riak_repl_util:from_wire(hd(erlang:binary_to_term(ObjsBin))),
        %% Size is that of the re-serialised object, not of the queued binary.
        Size = byte_size(term_to_binary(RO)),
        %% Build an iolist instead of appending with ++: appending to the
        %% accumulated text re-copies it for every entry (quadratic).
        ["\n", erlang:integer_to_list(Size), ":",
         binary:bin_to_list(riak_object:bucket(RO)), "/",
         binary:bin_to_list(riak_object:key(RO)), " ",
         io_lib:format("~p", [LocalForwards]), ",",
         io_lib:format("~p", [RoutedClusters])]
    end,
    %% file:write_file/2 accepts iodata, so no flattening is needed.
    file:write_file(Dest, lists:map(FormatEntry, riak_repl2_rtq:dumpq()))
end.
Execute it
DumpRtQ("/tmp/foo.txt").
ok
Example output
head -n5 /tmp/foo.txt
534:foo/3177bar1 ["riak2"],undefined
534:foo/3178bar1 ["riak2"],undefined
534:foo/3179bar1 ["riak2"],undefined
534:foo/3180bar1 ["riak2"],undefined
534:foo/3181bar1 ["riak2"],undefined
534:foo/3182bar1 ["riak2"],undefined
534:foo/3183bar1 ["riak2"],undefined
534:foo/3184bar1 ["riak2"],undefined
532:foo/3185bar1 ["riak2"],undefined
I modified the utility slightly so that the queue can be passed in as an argument. I did this because the queue only lives for a while before being thrown away, so I needed to grab a copy I could play with. Here is what I did:
(dev1@127.0.0.1)1> Q = riak_repl2_rtq:dumpq().
[{9775,1,
<<131,108,0,0,0,1,109,0,0,0,230,42,1,0,0,0,3,102,111,111,
0,0,0,8,57,...>>,
.......
{...}|...]
%% DumpRtQ(Queue, Dest): same report as before, but the queue entries are passed in
%% as Queue instead of being fetched here, so a saved copy can be re-processed.
%% Writes one "Size:Bucket/Key LocalForwards,RoutedClusters" line per entry to Dest and
%% returns the result of file:write_file/2.
%% The eprof run that follows in these notes was recorded against this exact code.
DumpRtQ = fun(Queue,Dest) ->
%% Pass 1: turn each 4-tuple entry into {Size, Bucket, Key, LocalForwards, RoutedClusters}.
%% Y is the entry's binary, Z its property list; the first two elements are ignored.
Results = lists:map( fun({_,_,Y,Z}) ->
LocalForwards = proplists:get_value( local_forwards , Z),
RoutedClusters = proplists:get_value( routed_clusters, Z),
%% Y decodes to a list; only its first element is converted and reported.
RO = riak_repl_util:from_wire( lists:nth(1, erlang:binary_to_term(Y)) ) ,
%% Size of the object after re-serialising it, not the size of Y itself.
Size = erlang:size( term_to_binary(RO) ),
Key = riak_object:key(RO),
Bucket = riak_object:bucket(RO),
{Size, Bucket, Key, LocalForwards,RoutedClusters}
end, Queue ),
%% Pass 2: render the text. Every entry is prefixed with "\n", so the output starts
%% with a newline and has no trailing one.
%% NOTE: Sum ++ ... copies the text accumulated so far on every entry.
OutputText = lists:foldl(fun({Size,Bucket,Key,LocalForwards,RoutedClusters},Sum) ->
Sum ++ "\n" ++
erlang:integer_to_list(Size) ++ ":" ++
binary:bin_to_list(Bucket) ++ "/" ++
binary:bin_to_list(Key) ++ " " ++
io_lib:format("~p",[LocalForwards]) ++ "," ++
io_lib:format("~p",[RoutedClusters])
end,"", Results),
file:write_file(Dest,OutputText)
end.
#Fun<erl_eval.12.82930912>
(dev1@127.0.0.1)3> eprof:start().
{ok,<0.31204.3>}
(dev1@127.0.0.1)11> eprof:profile( fun() -> DumpRtQ(Q,"/tmp/stats"), ok end ).
{ok,ok}
(dev1@127.0.0.1)12> eprof:analyze().
The result
FUNCTION CALLS % TIME [uS / CALLS]
-------- ----- --- ---- [----------]
orddict:'-from_list/1-fun-0-'/2 5 0.00 0 [ 0.00]
gb_sets:is_element/2 2 0.00 0 [ 0.00]
gb_sets:is_member/2 2 0.00 0 [ 0.00]
erl_eval:'-expr/5-fun-1-'/4 1 0.00 0 [ 0.00]
sets:mk_seg/1 2 0.00 0 [ 0.00]
erl_lint:guard/3 2 0.00 0 [ 0.00]
erl_lint:unused_vars/3 4 0.00 0 [ 0.00]
erl_lint:canonicalize_string/1 2 0.00 0 [ 0.00]
erl_lint:check_format_3/2 2 0.00 0 [ 0.00]
erl_lint:extract_sequences/2 4 0.00 0 [ 0.00]
string:substr2/2 4 0.00 0 [ 0.00]
file:write_file/2 1 0.00 0 [ 0.00]
gb_sets:is_member_1/2 2 0.00 1 [ 0.50]
gb_sets:from_ordset/1 4 0.00 1 [ 0.25]
gen:call/4 1 0.00 1 [ 1.00]
gen:do_call/4 1 0.00 1 [ 1.00]
erl_eval:'-expr/5-fun-0-'/3 5 0.00 1 [ 0.20]
lists:rumergel/3 2 0.00 1 [ 0.50]
dict:get_bucket/2 13 0.00 1 [ 0.08]
packages:concat_1/1 10 0.00 1 [ 0.10]
erl_lint:start/2 2 0.00 1 [ 0.50]
erl_lint:pattern/3 6 0.00 1 [ 0.17]
erl_lint:reject_bin_alias_expr/3 6 0.00 1 [ 0.17]
erl_lint:default_types/0 2 0.00 1 [ 0.50]
erl_lint:fun_clauses/3 2 0.00 1 [ 0.50]
erl_lint:fun_clause/3 2 0.00 1 [ 0.50]
erl_lint:pat_var/5 14 0.00 1 [ 0.07]
erl_lint:shadow_vars/4 2 0.00 1 [ 0.50]
erl_lint:check_old_unused_vars/3 2 0.00 1 [ 0.50]
erl_lint:warn_unused_vars/3 4 0.00 1 [ 0.25]
erl_lint:vtnew/2 8 0.00 1 [ 0.13]
erl_lint:vtsubtract/2 4 0.00 1 [ 0.25]
erl_lint:vt_no_unsafe/1 2 0.00 1 [ 0.50]
erl_lint:check_format_1/1 2 0.00 1 [ 0.50]
erl_lint:check_format_2/2 2 0.00 1 [ 0.50]
erl_lint:check_format_2a/2 2 0.00 1 [ 0.50]
erl_lint:args_list/1 4 0.00 1 [ 0.25]
erl_lint:check_format_string/1 2 0.00 1 [ 0.50]
erl_lint:extract_sequence/3 8 0.00 1 [ 0.13]
erl_lint:control_type/2 2 0.00 1 [ 0.50]
erl_lint:is_local_function/2 1 0.00 1 [ 1.00]
erl_lint:is_autoimport_suppressed/2 1 0.00 1 [ 1.00]
erl_lint:bif_clash_specifically_disabled/2 1 0.00 1 [ 1.00]
erl_lint:'-fun_clauses/3-fun-0-'/3 2 0.00 1 [ 0.50]
erl_lint:'-nowarn_function/2-lc$^0/1-0-'/2 1 0.00 1 [ 1.00]
erl_lint:'-used_vars/2-fun-1-'/2 5 0.00 1 [ 0.20]
erl_lint:'-used_vars/2-fun-0-'/2 5 0.00 1 [ 0.20]
string:chr/2 4 0.00 1 [ 0.25]
string:chr/3 4 0.00 1 [ 0.25]
file:file_name/1 1 0.00 1 [ 1.00]
file:make_binary/1 1 0.00 1 [ 1.00]
file:call/2 1 0.00 1 [ 1.00]
file:check_and_call/2 1 0.00 1 [ 1.00]
file:check_args/1 3 0.00 1 [ 0.33]
file:native_name_encoding/0 1 0.00 1 [ 1.00]
orddict:from_list/1 2 0.00 2 [ 1.00]
ordsets:is_element/2 19 0.00 2 [ 0.11]
erl_eval:hide_calls/2 2 0.00 2 [ 1.00]
sets:new/0 2 0.00 2 [ 1.00]
erl_lint:value_option/7 2 0.00 2 [ 1.00]
erl_lint:pseudolocals/0 6 0.00 2 [ 0.33]
erl_lint:used_vars/2 2 0.00 2 [ 1.00]
erl_lint:start/0 2 0.00 2 [ 1.00]
erl_lint:is_warn_enabled/2 3 0.00 2 [ 0.67]
erl_lint:zip_file_and_line/2 2 0.00 2 [ 1.00]
erl_lint:nowarn_function/2 1 0.00 2 [ 2.00]
erl_lint:imported/3 1 0.00 2 [ 2.00]
erl_lint:pattern_list/5 2 0.00 2 [ 1.00]
erl_lint:guard_tests/3 2 0.00 2 [ 1.00]
erl_lint:modify_line/2 2 0.00 2 [ 1.00]
erl_lint:is_format_function/2 13 0.00 2 [ 0.15]
erl_lint:args_length/1 4 0.00 2 [ 0.50]
erl_lint:'-vt_no_unsafe/1-lc$^0/1-0-'/1 7 0.00 2 [ 0.29]
gen_server:call/3 1 0.00 2 [ 2.00]
file:file_name_1/2 11 0.00 2 [ 0.18]
lists:member/2 6 0.00 2 [ 0.33]
erlang:whereis/1 1 0.00 2 [ 2.00]
gb_sets:empty/0 6 0.00 3 [ 0.50]
gb_sets:balance_list/2 4 0.00 3 [ 0.75]
gb_sets:from_list/1 4 0.00 3 [ 0.75]
ordsets:from_list/1 7 0.00 3 [ 0.43]
lists:usplit_2_1/6 4 0.00 3 [ 0.75]
lists:umerge2_1/5 8 0.00 3 [ 0.38]
lists:umerge2_2/4 6 0.00 3 [ 0.50]
dict:find_val/2 13 0.00 3 [ 0.23]
packages:concat/1 10 0.00 3 [ 0.30]
erl_lint:format_function/5 13 0.00 3 [ 0.23]
erl_lint:'-pattern_list/5-fun-0-'/5 9 0.00 3 [ 0.33]
erl_lint:'-start/2-lc$^1/1-1-'/1 8 0.00 3 [ 0.38]
otp_internal:obsolete_1/3 14 0.00 3 [ 0.21]
string:substr/2 2 0.00 3 [ 1.50]
erlang:demonitor/2 1 0.00 3 [ 3.00]
lists:usort/1 7 0.00 4 [ 0.57]
lists:usplit_2/5 8 0.00 4 [ 0.50]
lists:umergel/3 6 0.00 4 [ 0.67]
erl_lint:head/4 5 0.00 4 [ 0.80]
erl_lint:expr_var/4 17 0.00 4 [ 0.24]
erl_lint:check_unused_vars/3 2 0.00 4 [ 2.00]
erl_lint:'-unused_vars/3-fun-0-'/2 19 0.00 4 [ 0.21]
otp_internal:obsolete/3 14 0.00 4 [ 0.29]
gb_sets:balance_list_1/2 12 0.00 5 [ 0.42]
dict:get_bucket_s/2 13 0.00 5 [ 0.38]
packages:concat/2 10 0.00 5 [ 0.50]
erl_lint:check_qlc_hrl/5 13 0.00 5 [ 0.38]
erlang:monitor/2 1 0.00 5 [ 5.00]
erlang:fun_info/2 3 0.00 5 [ 1.67]
erl_lint:exprs/3 14 0.00 6 [ 0.43]
erl_lint:expr_list/3 27 0.00 6 [ 0.22]
erl_lint:vtold/2 8 0.00 6 [ 0.75]
erl_lint:expand_package/2 13 0.00 6 [ 0.46]
dict:find/2 13 0.00 7 [ 0.54]
packages:is_valid_1/1 93 0.00 7 [ 0.08]
erl_lint:'-start/2-lc$^0/1-0-'/1 28 0.00 7 [ 0.25]
packages:is_valid/1 10 0.00 8 [ 0.80]
erl_lint:bool_option/4 26 0.00 8 [ 0.31]
erl_lint:pattern/5 18 0.00 8 [ 0.44]
erl_lint:check_remote_function/5 13 0.00 8 [ 0.62]
erl_lint:deprecated_function/5 14 0.00 8 [ 0.57]
erl_lint:'-vtnew/2-fun-0-'/3 29 0.00 9 [ 0.31]
erl_lint:merge_used/2 50 0.00 10 [ 0.20]
erl_lint:'-expr_list/3-fun-0-'/3 48 0.00 10 [ 0.21]
erlang:apply/2 2 0.00 11 [ 5.50]
erlang:tuple_to_list/1 124 0.00 11 [ 0.09]
erlang:list_to_atom/1 10 0.00 11 [ 1.10]
ordsets:union/2 90 0.00 13 [ 0.14]
erl_lint:vtupdate/2 42 0.00 13 [ 0.31]
erl_lint:vtmerge/2 58 0.00 13 [ 0.22]
erlang:send/3 1 0.00 13 [ 13.00]
erl_scan:set_attr/3 124 0.00 14 [ 0.11]
erl_lint:'-vtold/2-fun-0-'/3 51 0.00 14 [ 0.27]
orddict:is_key/2 140 0.01 15 [ 0.11]
erl_lint:vtmerge_pat/2 24 0.01 15 [ 0.63]
erl_lint:merge_lines/2 50 0.01 15 [ 0.30]
erl_scan:set_attribute/3 124 0.01 18 [ 0.15]
erl_parse:set_line/2 124 0.01 20 [ 0.16]
erl_lint:'-vtupdate/2-fun-0-'/3 50 0.01 21 [ 0.42]
orddict:filter/2 126 0.01 23 [ 0.18]
erl_lint:'-default_types/0-lc$^0/1-0-'/1 112 0.01 23 [ 0.21]
erl_eval:do_apply/5 1 0.01 25 [ 25.00]
erl_lint:'-zip_file_and_line/2-fun-0-'/2 124 0.01 25 [ 0.20]
erl_lint:'-zip_file_and_line/2-fun-1-'/2 124 0.01 31 [ 0.25]
erl_lint:expr/3 64 0.01 36 [ 0.56]
riak_object:bucket/1 594 0.02 54 [ 0.09]
erl_lint:modify_line1/2 299 0.02 61 [ 0.20]
io_lib:write/1 594 0.02 63 [ 0.11]
riak_object:key/1 594 0.02 64 [ 0.11]
io_lib_pretty:print_length_list/3 594 0.02 70 [ 0.12]
lists:nth/2 594 0.03 72 [ 0.12]
orddict:merge/3 318 0.03 74 [ 0.23]
io_lib:write_atom/1 594 0.03 78 [ 0.13]
io_lib_pretty:list_length_tail/2 594 0.03 85 [ 0.14]
erlang:iolist_to_binary/1 1 0.03 97 [ 97.00]
io_lib:write/2 594 0.03 99 [ 0.17]
riak_object:sibs_of_binary/2 594 0.04 101 [ 0.17]
lists:reverse/2 599 0.04 110 [ 0.18]
erlang:integer_to_list/1 594 0.04 110 [ 0.19]
erl_eval:eval_fun/5 1189 0.04 111 [ 0.09]
proplists:get_value/2 1188 0.04 114 [ 0.10]
io_lib_pretty:write_tail/2 594 0.04 118 [ 0.20]
io_lib_pretty:print/4 1188 0.04 120 [ 0.10]
erl_lint:keyword_warning/3 29 0.04 123 [ 4.24]
orddict:new/0 1190 0.04 125 [ 0.11]
erl_eval:guard0/4 1190 0.04 126 [ 0.11]
erlang:list_to_tuple/1 719 0.05 130 [ 0.18]
io_lib:format/2 1188 0.05 138 [ 0.12]
lists:map/2 595 0.05 140 [ 0.24]
erl_eval:'-expr/5-fun-2-'/5 594 0.05 141 [ 0.24]
io_lib:quote_atom/2 594 0.05 142 [ 0.24]
io_lib_pretty:write_list/2 594 0.05 144 [ 0.24]
io_lib_pretty:max_cs/2 594 0.05 154 [ 0.26]
riak_object:last_mod_meta/2 594 0.06 165 [ 0.28]
dict:from_list/1 600 0.06 165 [ 0.28]
io_lib_pretty:list_length/2 594 0.06 169 [ 0.28]
riak_object:deleted_meta/2 594 0.06 170 [ 0.29]
io_lib:write_string/2 594 0.06 171 [ 0.29]
riak_object:vtag_meta/2 594 0.06 171 [ 0.29]
erl_scan:reserved_word/1 594 0.06 178 [ 0.30]
riak_object:from_binary/3 594 0.06 183 [ 0.31]
riak_object:sibs_of_binary/3 1188 0.07 193 [ 0.16]
io_lib_format:encoding/2 1188 0.07 200 [ 0.17]
io_lib_format:pad_char/2 1188 0.07 210 [ 0.18]
io_lib_format:collect_cseq/2 1188 0.08 218 [ 0.18]
io_lib_format:field_width/3 1188 0.08 223 [ 0.19]
io_lib_format:pcount/1 1188 0.08 224 [ 0.19]
erl_eval:eval_fun/6 1190 0.08 235 [ 0.20]
erl_eval:hide/3 1089 0.08 240 [ 0.22]
io_lib_pretty:write/1 1188 0.09 245 [ 0.21]
erl_eval:new_bindings/0 1190 0.09 247 [ 0.21]
dict:mk_seg/1 1204 0.09 248 [ 0.21]
io_lib_format:precision/2 1188 0.09 251 [ 0.21]
binary:bin_to_list/1 1188 0.09 254 [ 0.21]
erl_eval:'-expr/5-fun-3-'/6 594 0.09 261 [ 0.44]
io_lib_pretty:print/6 1188 0.09 264 [ 0.22]
io_lib_pretty:print_length_list1/3 1188 0.09 267 [ 0.22]
erl_eval:guard/4 1190 0.10 278 [ 0.23]
io_lib_format:field_width/2 1188 0.10 279 [ 0.23]
io_lib_format:decr_pc/2 1188 0.10 281 [ 0.24]
riak_repl_util:from_wire/1 594 0.10 283 [ 0.48]
dict:new/0 1204 0.10 288 [ 0.24]
io_lib_format:fwrite/2 1188 0.10 293 [ 0.25]
io_lib_format:pcount/2 2376 0.10 301 [ 0.13]
io_lib_pretty:print_length/3 1188 0.11 305 [ 0.26]
erl_eval:bif/5 2376 0.11 317 [ 0.13]
io_lib_pretty:printable_list/2 1188 0.11 319 [ 0.27]
erl_eval:add_bindings/2 1190 0.12 334 [ 0.28]
io_lib_format:control/8 1188 0.12 346 [ 0.29]
riak_object:sib_of_binary/1 594 0.12 352 [ 0.59]
io_lib_format:field_value/2 1188 0.12 358 [ 0.30]
dict:maybe_expand/2 4273 0.13 374 [ 0.09]
io_lib:string_char/4 2970 0.13 375 [ 0.13]
io_lib_format:collect_cc/2 1188 0.13 375 [ 0.32]
io_lib_format:print/7 3564 0.14 389 [ 0.11]
erl_internal:bif/2 2378 0.14 391 [ 0.16]
proplists:get_value/3 1782 0.14 399 [ 0.22]
io_lib_format:build/3 2376 0.15 430 [ 0.18]
erlang:binary_to_list/1 594 0.15 439 [ 0.74]
io_lib_format:collect/2 2376 0.16 473 [ 0.20]
io_lib:printable_list/1 4158 0.17 499 [ 0.12]
erl_eval:match_list/4 2974 0.17 500 [ 0.17]
io_lib:name_chars/1 5346 0.18 527 [ 0.10]
dict:store/3 4273 0.18 528 [ 0.12]
io_lib:write_string1/3 3564 0.22 623 [ 0.17]
erl_eval:eval_op/6 5940 0.23 651 [ 0.11]
riak_object:meta_of_binary/2 2970 0.23 652 [ 0.22]
erl_eval:exprs/5 4757 0.24 680 [ 0.14]
erl_eval:match/3 3566 0.25 722 [ 0.20]
erl_eval:match/4 5350 0.25 723 [ 0.14]
dict:'-from_list/1-fun-0-'/2 3678 0.25 724 [ 0.20]
dict:maybe_expand_aux/2 4273 0.27 780 [ 0.18]
erlang:phash/2 4286 0.29 825 [ 0.19]
io_lib:name_char/1 4752 0.29 826 [ 0.17]
erl_eval:'-add_bindings/2-fun-0-'/2 4754 0.29 849 [ 0.18]
dict:'-store/3-fun-0-'/3 4273 0.30 867 [ 0.20]
erl_eval:match_tuple/5 6534 0.32 908 [ 0.14]
dict:store_bkt_val/3 5153 0.33 941 [ 0.18]
riak_object:decode_maybe_binary/1 5346 0.41 1179 [ 0.22]
dict:get_slot/2 4286 0.48 1386 [ 0.32]
erl_eval:add_binding/3 8320 0.48 1395 [ 0.17]
erlang:binary_to_term/1 3564 0.49 1406 [ 0.39]
dict:on_bucket/3 4273 0.49 1414 [ 0.33]
packages:is_segmented/1 7735 0.59 1712 [ 0.22]
erl_internal:bif/3 7725 0.62 1777 [ 0.23]
erlang:atom_to_list/1 8334 0.65 1859 [ 0.22]
lists:reverse/1 9510 0.68 1950 [ 0.21]
erl_eval:expand_module_name/2 7725 0.71 2032 [ 0.26]
erlang:setelement/3 17096 0.76 2199 [ 0.13]
erl_eval:do_apply/6 14259 0.78 2255 [ 0.16]
erl_eval:expr_list/4 8914 0.79 2267 [ 0.25]
shell:'-eval_loop/3-fun-0-'/3 14259 0.81 2342 [ 0.16]
shell:apply_fun/3 14259 0.92 2648 [ 0.19]
erlang:term_to_binary/1 594 0.98 2813 [ 4.74]
erl_eval:match1/4 10696 1.00 2881 [ 0.27]
erl_eval:expr_list/6 23179 1.09 3138 [ 0.14]
erl_eval:binding/2 26149 1.73 4987 [ 0.19]
erl_eval:ret_expr/3 51700 1.79 5162 [ 0.10]
orddict:to_list/1 30308 1.88 5404 [ 0.18]
orddict:store/3 32685 1.94 5577 [ 0.17]
packages:is_segmented_1/1 67826 2.21 6352 [ 0.09]
erl_eval:merge_bindings/2 29118 2.45 7061 [ 0.24]
erl_eval:expr/5 52295 4.85 13955 [ 0.27]
lists:foldl/3 199302 6.83 19674 [ 0.10]
erl_eval:'-merge_bindings/2-fun-0-'/2 159226 9.82 28281 [ 0.18]
erlang:'++'/2 5940 11.27 32441 [ 5.46]
orddict:find/2 645307 32.23 92821 [ 0.14]
ok
(dev1@127.0.0.1)13>
Continuing on: I had dumped the queue to a file yesterday and edited it quickly (adding a full stop to the end of the file) so that it could be read in by file:consult/1. This saves me from having to fire up replication every time I want to get my hands on that queue data.
Let's read that back in again:
(....)39> Sample=lists:sublist(hd(hd(tl( tuple_to_list( file:consult("./dumped.queue.erl") ) ))),2).
[{9775,1,
<<131,108,0,0,0,1,109,0,0,0,230,42,1,0,0,0,3,102,111,111,
0,0,0,8,57,...>>,
[{local_forwards,["riak2"]}]},
{9776,1,
<<131,108,0,0,0,1,109,0,0,0,231,42,1,0,0,0,3,102,111,111,
0,0,0,8,...>>,
[{local_forwards,["riak2"]}]}]
And I test it with the jerry-rigged anonymous function:
%% DumpRtQ(Queue, Dest, Count): write a one-line summary of queue entries to Dest.
%%   Queue - list of 4-tuple queue entries (as returned by riak_repl2_rtq:dumpq/0)
%%   Dest  - output file name
%%   Count - the atom 'all' for every entry, or N for only the first N entries
%% Each entry is rendered as "\nSize:Bucket/Key LocalForwards,RoutedClusters", so the
%% file starts with a newline and has no trailing one (same bytes as the ++ version).
%% Returns whatever file:write_file/2 returns.
DumpRtQ = fun(Queue,Dest,Count) ->
    Entries = case Count of
        all -> Queue;
        Num -> lists:sublist(Queue, Num)
    end,
    FormatEntry = fun({_, _, ObjsBin, Meta}) ->
        LocalForwards = proplists:get_value(local_forwards, Meta),
        RoutedClusters = proplists:get_value(routed_clusters, Meta),
        %% The binary decodes to a list; only its first element is reported.
        RO = riak_repl_util:from_wire(hd(erlang:binary_to_term(ObjsBin))),
        %% Size is that of the re-serialised object, not of the queued binary.
        Size = byte_size(term_to_binary(RO)),
        %% Build an iolist instead of appending with ++: appending to the
        %% accumulated text re-copies it for every entry (quadratic).
        ["\n", erlang:integer_to_list(Size), ":",
         binary:bin_to_list(riak_object:bucket(RO)), "/",
         binary:bin_to_list(riak_object:key(RO)), " ",
         io_lib:format("~p", [LocalForwards]), ",",
         io_lib:format("~p", [RoutedClusters])]
    end,
    %% file:write_file/2 accepts iodata, so no flattening is needed.
    file:write_file(Dest, lists:map(FormatEntry, Entries))
end.
(dev1@127.0.0.1)61> DumpRtQ(Sample,"/tmp/test.all",all).
ok
(dev1@127.0.0.1)62> DumpRtQ(Sample,"/tmp/test.1",1).
ok
I check on the console that the function operates as expected.
[~%]cat /tmp/test.all
532:foo/9407bar1 ["riak2"],undefined
534:foo/9408bar1 ["riak2"],undefined% [~%]cat /tmp/test.1
532:foo/9407bar1 ["riak2"],undefined%
Then I rewrite the function to use the riak_repl2_rtq:dumpq/0
function. And I test it one last time as before.
%% DumpRtQ(Dest, Count): fetch the queue with riak_repl2_rtq:dumpq/0 and write a
%% one-line summary of its entries to Dest.
%%   Dest  - output file name
%%   Count - the atom 'all' for every entry, or N for only the first N entries
%% Each entry is rendered as "\nSize:Bucket/Key LocalForwards,RoutedClusters", so the
%% file starts with a newline and has no trailing one (same bytes as the ++ version).
%% Returns whatever file:write_file/2 returns.
DumpRtQ = fun(Dest,Count) ->
    Queue = riak_repl2_rtq:dumpq(),
    Entries = case Count of
        all -> Queue;
        Num -> lists:sublist(Queue, Num)
    end,
    FormatEntry = fun({_, _, ObjsBin, Meta}) ->
        LocalForwards = proplists:get_value(local_forwards, Meta),
        RoutedClusters = proplists:get_value(routed_clusters, Meta),
        %% The binary decodes to a list; only its first element is reported.
        RO = riak_repl_util:from_wire(hd(erlang:binary_to_term(ObjsBin))),
        %% Size is that of the re-serialised object, not of the queued binary.
        Size = byte_size(term_to_binary(RO)),
        %% Build an iolist instead of appending with ++: appending to the
        %% accumulated text re-copies it for every entry (quadratic).
        ["\n", erlang:integer_to_list(Size), ":",
         binary:bin_to_list(riak_object:bucket(RO)), "/",
         binary:bin_to_list(riak_object:key(RO)), " ",
         io_lib:format("~p", [LocalForwards]), ",",
         io_lib:format("~p", [RoutedClusters])]
    end,
    %% file:write_file/2 accepts iodata, so no flattening is needed.
    file:write_file(Dest, lists:map(FormatEntry, Entries))
end.
(dev1@127.0.0.1)3> DumpRtQ("/tmp/all.txt",all).
ok
(dev1@127.0.0.1)4> DumpRtQ("/tmp/1.txt",1).
ok
[%]wc -l /tmp/all.txt
469 /tmp/all.txt
[%]wc -l /tmp/1.txt
1 /tmp/1.txt
Success!
It is worth noting that I also scripted the creation and setup of the two single-node clusters, like so:
riak_remove_data_dir_contents () {
rm -rf ./dev/dev*/data/(anti_entropy|bitcask|kv_vnode|leveldb|riak_kv_exchange_fsm|riak_repl|ring)(N)
}
# Run `riak stop` for every dev[0-9] node found anywhere below the current directory.
riak_stop_dev_nodes () {
    local bindir
    # One qualifier group does both jobs: N expands to nothing when no node is
    # found, and :h keeps only the directory part of each match (.../bin).
    for bindir in ./**/dev[0-9]/bin/riak(N:h)
    do
        "$bindir/riak" stop
    done
}
riak_make_two_node_rt_repl () {
riak_stop_dev_nodes
riak_remove_data_dir_contents
make devrel DEVNODES=6
sed -i'' -e '/sfwi/{d;}' **/vm.args
./dev/dev1/bin/riak start
./dev/dev4/bin/riak start
./dev/dev1/bin/riak-admin wait-for-service riak_kv
./dev/dev4/bin/riak-admin wait-for-service riak_kv
./dev/dev1/bin/riak-repl clustername riak1
./dev/dev4/bin/riak-repl clustername riak2
./dev/dev1/bin/riak-repl connect 127.0.0.1:10046
./dev/dev1/bin/riak-repl connections
./dev/dev1/bin/riak-repl realtime enable
./dev/dev1/bin/riak-repl realtime enable riak2
./dev/dev1/bin/riak-repl realtime start riak2
}
That code only runs in the Z shell, adopt and survive!