%% riak_kv_vnode.erl -- changes to compile with YZ merge problem
%% -------------------------------------------------------------------
%%
%% riak_kv_vnode: VNode Implementation
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(riak_kv_vnode).
-behaviour(riak_core_vnode).
%% API
-export([test_vnode/1, put/7]).
-export([start_vnode/1,
         start_vnodes/1,
         get/3,
         del/3,
         put/6,
         local_get/2,
         local_put/2,
         local_put/3,
         coord_put/6,
         readrepair/6,
         list_keys/4,
         fold/3,
         get_vclocks/2,
         vnode_status/1,
         ack_keys/1,
         repair/1,
         repair_status/1,
         repair_filter/1,
         hashtree_pid/1,
         rehash/3,
         request_hashtree_pid/1,
         reformat_object/2,
         stop_fold/1]).
%% riak_core_vnode API
-export([init/1,
         terminate/2,
         handle_command/3,
         handle_overload_command/3,
         handle_coverage/4,
         is_empty/1,
         delete/1,
         request_hash/1,
         object_info/1,
         nval_map/1,
         handle_handoff_command/3,
         handoff_starting/2,
         handoff_cancelled/1,
         handoff_finished/2,
         handle_handoff_data/2,
         encode_handoff_item/2,
         handle_exit/3,
         handle_info/2]).
-export([handoff_data_encoding_method/0]).
-include_lib("riak_kv_vnode.hrl").
-include_lib("riak_kv_map_phase.hrl").
-include_lib("riak_core/include/riak_core_pb.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-export([put_merge/6]). %% For fsm_eqc_vnode
-endif.
%% N.B. The ?INDEX macro should be called any time the object bytes on
%% disk are modified.
-ifdef(TEST).
%% Use values so that test compile doesn't give 'unused vars' warning.
-define(INDEX(A,B,C), _=element(1,{A,B,C}), ok).
-else.
-define(INDEX(Obj, Reason, Partition), yz_kv:index(Obj, Reason, Partition)).
-endif.
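%% Illustrative expansion (informational sketch, not part of the original
%% module): under the non-test definition above, a call site such as
%%   ?INDEX(Obj, put, Idx)
%% expands to yz_kv:index(Obj, put, Idx), handing the modified object to
%% Yokozuna for re-indexing; the TEST definition merely touches its
%% arguments to avoid 'unused variable' warnings.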
-record(mrjob, {cachekey :: term(),
                bkey :: term(),
                reqid :: term(),
                target :: pid()}).
-record(state, {idx :: partition(),
                mod :: module(),
                modstate :: term(),
                mrjobs :: term(),
                vnodeid :: undefined | binary(),
                delete_mode :: keep | immediate | pos_integer(),
                bucket_buf_size :: pos_integer(),
                index_buf_size :: pos_integer(),
                key_buf_size :: pos_integer(),
                async_folding :: boolean(),
                in_handoff = false :: boolean(),
                hashtrees :: pid() }).
-type index_op() :: add | remove.
-type index_value() :: integer() | binary().
-type index() :: non_neg_integer().
-type state() :: #state{}.
-define(DEFAULT_HASHTREE_TOKENS, 90).
-record(putargs, {returnbody :: boolean(),
                  coord :: boolean(),
                  lww :: boolean(),
                  bkey :: {binary(), binary()},
                  robj :: term(),
                  index_specs=[] :: [{index_op(), binary(), index_value()}],
                  reqid :: non_neg_integer(),
                  bprops :: maybe_improper_list(),
                  starttime :: non_neg_integer(),
                  prunetime :: undefined | non_neg_integer(),
                  is_index=false :: boolean(), %% set if the backend supports indexes
                  counter_op = undefined :: undefined | integer() %% if set, this is a counter operation
                 }).
-spec maybe_create_hashtrees(state()) -> state().
maybe_create_hashtrees(State) ->
maybe_create_hashtrees(riak_kv_entropy_manager:enabled(), State).
-spec maybe_create_hashtrees(boolean(), state()) -> state().
maybe_create_hashtrees(false, State) ->
State;
maybe_create_hashtrees(true, State=#state{idx=Index}) ->
%% Only maintain a hashtree if this is a primary vnode
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
case riak_core_ring:vnode_type(Ring, Index) of
primary ->
RP = riak_kv_util:responsible_preflists(Index),
case riak_kv_index_hashtree:start(Index, RP, self()) of
{ok, Trees} ->
monitor(process, Trees),
State#state{hashtrees=Trees};
Error ->
lager:info("riak_kv/~p: unable to start index_hashtree: ~p",
[Index, Error]),
erlang:send_after(1000, self(), retry_create_hashtree),
State#state{hashtrees=undefined}
end;
_ ->
State
end.
%% API
start_vnode(I) ->
riak_core_vnode_master:get_vnode_pid(I, riak_kv_vnode).
start_vnodes(IdxList) ->
riak_core_vnode_master:get_vnode_pid(IdxList, riak_kv_vnode).
test_vnode(I) ->
riak_core_vnode:start_link(riak_kv_vnode, I, infinity).
get(Preflist, BKey, ReqId) ->
%% Assuming this function is called from an FSM process
%% so self() == FSM pid
get(Preflist, BKey, ReqId, {fsm, undefined, self()}).
get(Preflist, BKey, ReqId, Sender) ->
Req = ?KV_GET_REQ{bkey=sanitize_bkey(BKey),
req_id=ReqId},
riak_core_vnode_master:command(Preflist,
Req,
Sender,
riak_kv_vnode_master).
del(Preflist, BKey, ReqId) ->
riak_core_vnode_master:command(Preflist,
?KV_DELETE_REQ{bkey=sanitize_bkey(BKey),
req_id=ReqId},
riak_kv_vnode_master).
%% Issue a put for the object to the preflist, expecting a reply
%% to an FSM.
put(Preflist, BKey, Obj, ReqId, StartTime, Options) when is_integer(StartTime) ->
put(Preflist, BKey, Obj, ReqId, StartTime, Options, {fsm, undefined, self()}).
put(Preflist, BKey, Obj, ReqId, StartTime, Options, Sender)
when is_integer(StartTime) ->
riak_core_vnode_master:command(Preflist,
?KV_PUT_REQ{
bkey = sanitize_bkey(BKey),
object = Obj,
req_id = ReqId,
start_time = StartTime,
options = Options},
Sender,
riak_kv_vnode_master).
local_put(Index, Obj) ->
local_put(Index, Obj, []).
local_put(Index, Obj, Options) ->
BKey = {riak_object:bucket(Obj), riak_object:key(Obj)},
Ref = make_ref(),
ReqId = erlang:phash2(erlang:now()),
StartTime = riak_core_util:moment(),
Sender = {raw, Ref, self()},
put({Index, node()}, BKey, Obj, ReqId, StartTime, Options, Sender),
receive
{Ref, Reply} ->
Reply
end.
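%% Reply shape (informational): for a put the vnode first acks with
%% {w, Idx, ReqId} and then sends the durable ack -- typically
%% {dw, Idx, ReqId}, {dw, Idx, Obj, ReqId} with returnbody, or
%% {fail, Idx, ReqId} on error -- so the receive above returns the first
%% of these to arrive.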
local_get(Index, BKey) ->
Ref = make_ref(),
ReqId = erlang:phash2(erlang:now()),
Sender = {raw, Ref, self()},
get({Index,node()}, BKey, ReqId, Sender),
receive
{Ref, {r, Result, Index, ReqId}} ->
Result;
{Ref, Reply} ->
{error, Reply}
end.
%% Issue a put for the object to the preflist, expecting a reply
%% to an FSM.
coord_put(IndexNode, BKey, Obj, ReqId, StartTime, Options) when is_integer(StartTime) ->
coord_put(IndexNode, BKey, Obj, ReqId, StartTime, Options, {fsm, undefined, self()}).
coord_put(IndexNode, BKey, Obj, ReqId, StartTime, Options, Sender)
when is_integer(StartTime) ->
riak_core_vnode_master:command(IndexNode,
?KV_PUT_REQ{
bkey = sanitize_bkey(BKey),
object = Obj,
req_id = ReqId,
start_time = StartTime,
options = [coord | Options]},
Sender,
riak_kv_vnode_master).
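%% Note (informational): prepending 'coord' marks this vnode as the put
%% coordinator; prepare_put/2 below increments the vclock with this
%% vnode's id only when the coord option is present.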
%% Do a put without sending any replies
readrepair(Preflist, BKey, Obj, ReqId, StartTime, Options) ->
put(Preflist, BKey, Obj, ReqId, StartTime, [rr | Options], ignore).
list_keys(Preflist, ReqId, Caller, Bucket) ->
riak_core_vnode_master:command(Preflist,
#riak_kv_listkeys_req_v2{
bucket=Bucket,
req_id=ReqId,
caller=Caller},
ignore,
riak_kv_vnode_master).
fold(Preflist, Fun, Acc0) ->
riak_core_vnode_master:sync_spawn_command(Preflist,
?FOLD_REQ{
foldfun=Fun,
acc0=Acc0},
riak_kv_vnode_master).
get_vclocks(Preflist, BKeyList) ->
riak_core_vnode_master:sync_spawn_command(Preflist,
?KV_VCLOCK_REQ{bkeys=BKeyList},
riak_kv_vnode_master).
%% @doc Get status information about the node local vnodes.
-spec vnode_status([{partition(), pid()}]) -> [{atom(), term()}].
vnode_status(PrefLists) ->
ReqId = erlang:phash2({self(), os:timestamp()}),
%% Get the status of each vnode
riak_core_vnode_master:command(PrefLists,
?KV_VNODE_STATUS_REQ{},
{raw, ReqId, self()},
riak_kv_vnode_master),
wait_for_vnode_status_results(PrefLists, ReqId, []).
%% @doc Repair the given `Partition'.
-spec repair(partition()) ->
{ok, Pairs::[{partition(), node()}]} |
{down, Down::[{partition(), node()}]}.
repair(Partition) ->
Service = riak_kv,
MP = {riak_kv_vnode, Partition},
FilterModFun = {?MODULE, repair_filter},
riak_core_vnode_manager:repair(Service, MP, FilterModFun).
%% @doc Get the status of the repair process for the given `Partition'.
-spec repair_status(partition()) -> not_found | in_progress.
repair_status(Partition) ->
riak_core_vnode_manager:repair_status({riak_kv_vnode, Partition}).
%% @doc Given a `Target' partition generate a `Filter' fun to use
%% during partition repair.
-spec repair_filter(partition()) -> Filter::function().
repair_filter(Target) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
riak_core_repair:gen_filter(Target,
Ring,
nval_map(Ring),
riak_core_bucket:default_object_nval(),
fun object_info/1).
-spec hashtree_pid(index()) -> {ok, pid()}.
hashtree_pid(Partition) ->
riak_core_vnode_master:sync_command({Partition, node()},
{hashtree_pid, node()},
riak_kv_vnode_master,
infinity).
%% Asynchronous version of {@link hashtree_pid/1} that sends a message back to
%% the calling process. Used by the {@link riak_kv_entropy_manager}.
-spec request_hashtree_pid(index()) -> ok.
request_hashtree_pid(Partition) ->
ReqId = {hashtree_pid, Partition},
riak_core_vnode_master:command({Partition, node()},
{hashtree_pid, node()},
{raw, ReqId, self()},
riak_kv_vnode_master).
%% Used by {@link riak_kv_exchange_fsm} to force a vnode to update the hashtree
%% for repaired keys. Typically, repairing keys will trigger read repair that
%% will update the AAE hash in the write path. However, if the AAE tree is
%% divergent from the KV data, it is possible that AAE will try to repair keys
%% that do not have divergent KV replicas. In that case, read repair is never
%% triggered. Always rehashing keys after any attempt at repair ensures that
%% AAE does not try to repair the same non-divergent keys over and over.
rehash(Preflist, Bucket, Key) ->
riak_core_vnode_master:command(Preflist,
{rehash, Bucket, Key},
ignore,
riak_kv_vnode_master).
-spec reformat_object(index(), {riak_object:bucket(), riak_object:key()}) ->
ok | {error, term()}.
reformat_object(Partition, BKey) ->
riak_core_vnode_master:sync_spawn_command({Partition, node()},
{reformat_object,
sanitize_bkey(BKey)},
riak_kv_vnode_master).
%% VNode callbacks
init([Index]) ->
Mod = app_helper:get_env(riak_kv, storage_backend),
Configuration = app_helper:get_env(riak_kv),
BucketBufSize = app_helper:get_env(riak_kv, bucket_buffer_size, 1000),
IndexBufSize = app_helper:get_env(riak_kv, index_buffer_size, 100),
KeyBufSize = app_helper:get_env(riak_kv, key_buffer_size, 100),
WorkerPoolSize = app_helper:get_env(riak_kv, worker_pool_size, 10),
{ok, VId} = get_vnodeid(Index),
DeleteMode = app_helper:get_env(riak_kv, delete_mode, 3000),
AsyncFolding = app_helper:get_env(riak_kv, async_folds, true) == true,
case catch Mod:start(Index, Configuration) of
{ok, ModState} ->
%% Get the backend capabilities
State = #state{idx=Index,
async_folding=AsyncFolding,
mod=Mod,
modstate=ModState,
vnodeid=VId,
delete_mode=DeleteMode,
bucket_buf_size=BucketBufSize,
index_buf_size=IndexBufSize,
key_buf_size=KeyBufSize,
mrjobs=dict:new()},
case AsyncFolding of
true ->
%% Create worker pool initialization tuple
FoldWorkerPool = {pool, riak_kv_worker, WorkerPoolSize, []},
State2 = maybe_create_hashtrees(State),
{ok, State2, [FoldWorkerPool]};
false ->
{ok, State}
end;
{error, Reason} ->
lager:error("Failed to start ~p Reason: ~p",
[Mod, Reason]),
riak:stop("backend module failed to start."),
{error, Reason};
{'EXIT', Reason1} ->
lager:error("Failed to start ~p Reason: ~p",
[Mod, Reason1]),
riak:stop("backend module failed to start."),
{error, Reason1}
end.
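%% Note (informational): the {pool, riak_kv_worker, WorkerPoolSize, []}
%% tuple returned from init/1 asks riak_core_vnode to start a worker pool;
%% the {fold, AsyncWork, FinishFun} jobs returned by the async folds below
%% run on those workers instead of blocking the vnode process.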
handle_overload_command(?KV_PUT_REQ{}, Sender, Idx) ->
riak_core_vnode:reply(Sender, {fail, Idx, overload}); % subvert ReqId
handle_overload_command(?KV_GET_REQ{req_id=ReqID}, Sender, Idx) ->
riak_core_vnode:reply(Sender, {r, {error, overload}, Idx, ReqID});
handle_overload_command(_, _, _) ->
%% not handled yet
ok.
handle_command(?KV_PUT_REQ{bkey=BKey,
object=Object,
req_id=ReqId,
start_time=StartTime,
options=Options},
Sender, State=#state{idx=Idx}) ->
StartTS = os:timestamp(),
riak_core_vnode:reply(Sender, {w, Idx, ReqId}),
UpdState = do_put(Sender, BKey, Object, ReqId, StartTime, Options, State),
update_vnode_stats(vnode_put, Idx, StartTS),
{noreply, UpdState};
handle_command(?KV_GET_REQ{bkey=BKey,req_id=ReqId},Sender,State) ->
do_get(Sender, BKey, ReqId, State);
handle_command(#riak_kv_listkeys_req_v2{bucket=Input, req_id=ReqId, caller=Caller}, _Sender,
State=#state{async_folding=AsyncFolding,
key_buf_size=BufferSize,
mod=Mod,
modstate=ModState,
idx=Idx}) ->
case Input of
{filter, Bucket, Filter} ->
ok;
Bucket ->
Filter = none
end,
BufferMod = riak_kv_fold_buffer,
case Bucket of
'_' ->
{ok, Capabilities} = Mod:capabilities(ModState),
AsyncBackend = lists:member(async_fold, Capabilities),
case AsyncFolding andalso AsyncBackend of
true ->
Opts = [async_fold];
false ->
Opts = []
end,
BufferFun =
fun(Results) ->
UniqueResults = lists:usort(Results),
Caller ! {ReqId, {kl, Idx, UniqueResults}}
end,
FoldFun = fold_fun(buckets, BufferMod, Filter),
ModFun = fold_buckets;
_ ->
{ok, Capabilities} = Mod:capabilities(Bucket, ModState),
AsyncBackend = lists:member(async_fold, Capabilities),
case AsyncFolding andalso AsyncBackend of
true ->
Opts = [async_fold, {bucket, Bucket}];
false ->
Opts = [{bucket, Bucket}]
end,
BufferFun =
fun(Results) ->
Caller ! {ReqId, {kl, Idx, Results}}
end,
FoldFun = fold_fun(keys, BufferMod, Filter),
ModFun = fold_keys
end,
Buffer = BufferMod:new(BufferSize, BufferFun),
FinishFun =
fun(Buffer1) ->
riak_kv_fold_buffer:flush(Buffer1),
Caller ! {ReqId, Idx, done}
end,
case list(FoldFun, FinishFun, Mod, ModFun, ModState, Opts, Buffer) of
{async, AsyncWork} ->
{async, {fold, AsyncWork, FinishFun}, Caller, State};
_ ->
{noreply, State}
end;
handle_command(?KV_DELETE_REQ{bkey=BKey, req_id=ReqId}, _Sender, State) ->
do_delete(BKey, ReqId, State);
handle_command(?KV_VCLOCK_REQ{bkeys=BKeys}, _Sender, State) ->
{reply, do_get_vclocks(BKeys, State), State};
handle_command(?FOLD_REQ{foldfun=FoldFun, acc0=Acc0}, Sender, State) ->
%% The function in riak_core used for object folding expects the
%% bucket and key pair to be passed as the first parameter, but in
%% riak_kv the bucket and key have been separated. This function
%% wrapper is to address this mismatch.
FoldWrapper = fun(Bucket, Key, Value, Acc) ->
FoldFun({Bucket, Key}, Value, Acc)
end,
do_fold(FoldWrapper, Acc0, Sender, State);
%% entropy exchange commands
handle_command({hashtree_pid, Node}, _, State=#state{hashtrees=HT}) ->
%% Handle riak_core request forwarding during ownership handoff.
case node() of
Node ->
%% The following is necessary when anti-entropy was enabled
%% after the vnode was already running
case HT of
undefined ->
State2 = maybe_create_hashtrees(State),
{reply, {ok, State2#state.hashtrees}, State2};
_ ->
{reply, {ok, HT}, State}
end;
_ ->
{reply, {error, wrong_node}, State}
end;
handle_command({rehash, Bucket, Key}, _, State=#state{mod=Mod, modstate=ModState}) ->
case do_get_binary(Bucket, Key, Mod, ModState) of
{ok, Bin, _UpdModState} ->
update_hashtree(Bucket, Key, Bin, State);
_ ->
%% Make sure hashtree isn't tracking deleted data
riak_kv_index_hashtree:delete({Bucket, Key}, State#state.hashtrees)
end,
{noreply, State};
%% Commands originating from inside this vnode
handle_command({backend_callback, Ref, Msg}, _Sender,
State=#state{mod=Mod, modstate=ModState}) ->
Mod:callback(Ref, Msg, ModState),
{noreply, State};
handle_command({mapexec_error_noretry, JobId, Err}, _Sender, #state{mrjobs=Jobs}=State) ->
NewState = case dict:find(JobId, Jobs) of
{ok, Job} ->
Jobs1 = dict:erase(JobId, Jobs),
#mrjob{target=Target} = Job,
gen_fsm:send_event(Target, {mapexec_error_noretry, self(), Err}),
State#state{mrjobs=Jobs1};
error ->
State
end,
{noreply, NewState};
handle_command({mapexec_reply, JobId, Result}, _Sender, #state{mrjobs=Jobs}=State) ->
NewState = case dict:find(JobId, Jobs) of
{ok, Job} ->
Jobs1 = dict:erase(JobId, Jobs),
#mrjob{target=Target} = Job,
gen_fsm:send_event(Target, {mapexec_reply, Result, self()}),
State#state{mrjobs=Jobs1};
error ->
State
end,
{noreply, NewState};
handle_command(?KV_VNODE_STATUS_REQ{},
_Sender,
State=#state{idx=Index,
mod=Mod,
modstate=ModState}) ->
BackendStatus = {backend_status, Mod, Mod:status(ModState)},
VNodeStatus = [BackendStatus],
{reply, {vnode_status, Index, VNodeStatus}, State};
handle_command({reformat_object, BKey}, _Sender, State) ->
{Reply, UpdState} = do_reformat(BKey, State),
{reply, Reply, UpdState};
handle_command({fix_incorrect_index_entry, {done, ForUpgrade}}, _Sender,
State=#state{mod=Mod, modstate=ModState}) ->
case Mod:mark_indexes_fixed(ModState, ForUpgrade) of %% only defined for eleveldb backend
{ok, NewModState} ->
{reply, ok, State#state{modstate=NewModState}};
{error, _Reason} ->
{reply, error, State}
end;
handle_command({fix_incorrect_index_entry, Keys, ForUpgrade},
_Sender,
State=#state{mod=Mod,
modstate=ModState}) ->
Reply =
case Mod:fix_index(Keys, ForUpgrade, ModState) of
{ok, _UpModState} ->
ok;
{ignore, _UpModState} ->
ignore;
{error, Reason, _UpModState} ->
{error, Reason};
{reply, Totals, _UpModState} ->
Totals
end,
{reply, Reply, State};
handle_command({get_index_entries, Opts},
Sender,
State=#state{mod=Mod,
modstate=ModState0}) ->
ForUpgrade = not proplists:get_value(downgrade, Opts, false),
BufferSize = proplists:get_value(batch_size, Opts, 1),
{ok, Caps} = Mod:capabilities(ModState0),
case lists:member(index_reformat, Caps) of
true ->
ModState = Mod:set_legacy_indexes(ModState0, not ForUpgrade),
Status = Mod:fixed_index_status(ModState),
case {ForUpgrade, Status} of
{true, true} -> {reply, done, State};
{_, _} ->
BufferMod = riak_kv_fold_buffer,
ResultFun =
fun(Results) ->
% Send result batch and wait for acknowledgement
% before moving on (backpressure to avoid flooding caller).
BatchRef = make_ref(),
riak_core_vnode:reply(Sender, {self(), BatchRef, Results}),
Monitor = riak_core_vnode:monitor(Sender),
receive
{ack_keys, BatchRef} ->
erlang:demonitor(Monitor, [flush]);
{'DOWN', Monitor, process, _Pid, _Reason} ->
throw(index_reformat_client_died)
end
end,
Buffer = BufferMod:new(BufferSize, ResultFun),
FoldFun = fun(B, K, Buf) -> BufferMod:add({B, K}, Buf) end,
FinishFun =
fun(FinalBuffer) ->
BufferMod:flush(FinalBuffer),
riak_core_vnode:reply(Sender, done)
end,
FoldOpts = [{index, incorrect_format, ForUpgrade}, async_fold],
case list(FoldFun, FinishFun, Mod, fold_keys, ModState, FoldOpts, Buffer) of
{async, AsyncWork} ->
{async, {fold, AsyncWork, FinishFun}, Sender, State};
_ ->
{noreply, State}
end
end;
false ->
lager:error("Backend ~p does not support incorrect index query", [Mod]),
{reply, ignore, State}
end.
%% @doc Handle a coverage request.
%% More information about the specification for the ItemFilter
%% parameter can be found in the documentation for the
%% {@link riak_kv_coverage_filter} module.
handle_coverage(?KV_LISTBUCKETS_REQ{item_filter=ItemFilter},
_FilterVNodes,
Sender,
State=#state{async_folding=AsyncFolding,
bucket_buf_size=BufferSize,
mod=Mod,
modstate=ModState}) ->
%% Construct the filter function
Filter = riak_kv_coverage_filter:build_filter(all, ItemFilter, undefined),
BufferMod = riak_kv_fold_buffer,
Buffer = BufferMod:new(BufferSize, result_fun(Sender)),
FoldFun = fold_fun(buckets, BufferMod, Filter),
FinishFun = finish_fun(BufferMod, Sender),
{ok, Capabilities} = Mod:capabilities(ModState),
AsyncBackend = lists:member(async_fold, Capabilities),
case AsyncFolding andalso AsyncBackend of
true ->
Opts = [async_fold];
false ->
Opts = []
end,
case list(FoldFun, FinishFun, Mod, fold_buckets, ModState, Opts, Buffer) of
{async, AsyncWork} ->
{async, {fold, AsyncWork, FinishFun}, Sender, State};
_ ->
{noreply, State}
end;
handle_coverage(#riak_kv_listkeys_req_v3{bucket=Bucket,
item_filter=ItemFilter},
FilterVNodes, Sender, State) ->
%% v3 == no backpressure
ResultFun = result_fun(Bucket, Sender),
Opts = [{bucket, Bucket}],
handle_coverage_keyfold(Bucket, ItemFilter, ResultFun,
FilterVNodes, Sender, Opts, State);
handle_coverage(?KV_LISTKEYS_REQ{bucket=Bucket,
item_filter=ItemFilter},
FilterVNodes, Sender, State) ->
%% v4 == ack-based backpressure
ResultFun = result_fun_ack(Bucket, Sender),
Opts = [{bucket, Bucket}],
handle_coverage_keyfold(Bucket, ItemFilter, ResultFun,
FilterVNodes, Sender, Opts, State);
handle_coverage(#riak_kv_index_req_v1{bucket=Bucket,
item_filter=ItemFilter,
qry=Query},
FilterVNodes, Sender, State) ->
%% v1 == no backpressure
handle_coverage_index(Bucket, ItemFilter, Query,
FilterVNodes, Sender, State, fun result_fun/2);
handle_coverage(?KV_INDEX_REQ{bucket=Bucket,
item_filter=ItemFilter,
qry=Query},
FilterVNodes, Sender, State) ->
%% v2 = ack-based backpressure
handle_coverage_index(Bucket, ItemFilter, Query,
FilterVNodes, Sender, State, fun result_fun_ack/2).
handle_coverage_index(Bucket, ItemFilter, Query,
FilterVNodes, Sender,
State=#state{mod=Mod,
modstate=ModState},
ResultFunFun) ->
{ok, Capabilities} = Mod:capabilities(Bucket, ModState),
IndexBackend = lists:member(indexes, Capabilities),
case IndexBackend of
true ->
%% Update stats...
riak_kv_stat:update(vnode_index_read),
ResultFun = ResultFunFun(Bucket, Sender),
Opts = [{index, Bucket, Query},
{bucket, Bucket}],
%% @HACK
%% Really this should be decided in the backend
%% if there was a index_query fun.
FoldType = case riak_index:return_body(Query) of
true -> fold_objects;
false -> fold_keys
end,
handle_coverage_fold(FoldType, Bucket, ItemFilter, ResultFun,
FilterVNodes, Sender, Opts, State);
false ->
{reply, {error, {indexes_not_supported, Mod}}, State}
end.
%% Convenience for handling both v3 and v4 coverage-based key fold operations
handle_coverage_keyfold(Bucket, ItemFilter, Query,
FilterVNodes, Sender, State,
ResultFunFun) ->
handle_coverage_fold(fold_keys, Bucket, ItemFilter, Query,
FilterVNodes, Sender, State, ResultFunFun).
%% Until a refactor can better abstract index operations, allow the
%% ModFun used for folding to be declared, so that index operations
%% can return objects as well as keys
handle_coverage_fold(FoldType, Bucket, ItemFilter, ResultFun,
FilterVNodes, Sender, Opts0,
State=#state{async_folding=AsyncFolding,
idx=Index,
key_buf_size=BufferSize,
mod=Mod,
modstate=ModState}) ->
%% Construct the filter function
FilterVNode = proplists:get_value(Index, FilterVNodes),
Filter = riak_kv_coverage_filter:build_filter(Bucket, ItemFilter, FilterVNode),
BufferMod = riak_kv_fold_buffer,
Buffer = BufferMod:new(BufferSize, ResultFun),
FoldFun = fold_fun(keys, BufferMod, Filter),
FinishFun = finish_fun(BufferMod, Sender),
{ok, Capabilities} = Mod:capabilities(Bucket, ModState),
AsyncBackend = lists:member(async_fold, Capabilities),
Opts = case AsyncFolding andalso AsyncBackend of
true ->
[async_fold | Opts0];
false ->
Opts0
end,
case list(FoldFun, FinishFun, Mod, FoldType, ModState, Opts, Buffer) of
{async, AsyncWork} ->
{async, {fold, AsyncWork, FinishFun}, Sender, State};
_ ->
{noreply, State}
end.
%% While in handoff, vnodes have the option of returning {forward, State}
%% which will cause riak_core to forward the request to the handoff target
%% node. For riak_kv, we issue a put locally as well as forward it in case
%% the vnode has already handed off the previous version. All other requests
%% are handled locally and not forwarded since the relevant data may not have
%% yet been handed off to the target node. Since we do not forward deletes it
%% is possible that we do not clear a tombstone that was already handed off.
%% This is benign as the tombstone will eventually be re-deleted.
handle_handoff_command(Req=?KV_PUT_REQ{}, Sender, State) ->
{noreply, NewState} = handle_command(Req, Sender, State),
{forward, NewState};
%% Handle all unspecified cases locally without forwarding
handle_handoff_command(Req, Sender, State) ->
handle_command(Req, Sender, State).
%% Callback used by dynamic ring sizing to determine where
%% requests should be forwarded. Puts and deletes are forwarded
%% during the operation; all other requests are not.
request_hash(?KV_PUT_REQ{bkey=BKey}) ->
riak_core_util:chash_key(BKey);
request_hash(?KV_DELETE_REQ{bkey=BKey}) ->
riak_core_util:chash_key(BKey);
request_hash(_Req) ->
undefined.
handoff_starting(_TargetNode, State) ->
{true, State#state{in_handoff=true}}.
handoff_cancelled(State) ->
{ok, State#state{in_handoff=false}}.
handoff_finished(_TargetNode, State) ->
{ok, State}.
handle_handoff_data(BinObj, State) ->
try
{BKey, Val} = decode_binary_object(BinObj),
{B, K} = BKey,
case do_diffobj_put(BKey, riak_object:from_binary(B, K, Val),
State) of
{ok, UpdModState} ->
{reply, ok, State#state{modstate=UpdModState}};
{error, Reason, UpdModState} ->
{reply, {error, Reason}, State#state{modstate=UpdModState}};
Err ->
{reply, {error, Err}, State}
end
catch Error:Reason2 ->
lager:warning("Unreadable object discarded in handoff: ~p:~p",
[Error, Reason2]),
{reply, ok, State}
end.
encode_handoff_item({B, K}, V) ->
%% Before sending data to another node, convert the binary version
%% to one supported by the whole cluster so that we don't send
%% unsupported formats to old nodes.
ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
try
Value = riak_object:to_binary_version(ObjFmt, B, K, V),
encode_binary_object(B, K, Value)
catch Error:Reason ->
lager:warning("Handoff encode failed: ~p:~p",
[Error,Reason]),
corrupted
end.
is_empty(State=#state{mod=Mod, modstate=ModState}) ->
IsEmpty = Mod:is_empty(ModState),
case IsEmpty of
true ->
{true, State};
false ->
Size = maybe_calc_handoff_size(State),
{false, Size, State}
end.
maybe_calc_handoff_size(#state{mod=Mod,modstate=ModState}) ->
{ok, Capabilities} = Mod:capabilities(ModState),
case lists:member(size, Capabilities) of
true -> Mod:data_size(ModState);
false -> undefined
end.
delete(State=#state{idx=Index,mod=Mod, modstate=ModState}) ->
%% Clear the vnodeid first; if the drop removes the data but then
%% fails, we want to err on the side of creating a new vnodeid
{ok, cleared} = clear_vnodeid(Index),
case Mod:drop(ModState) of
{ok, UpdModState} ->
ok;
{error, Reason, UpdModState} ->
lager:error("Failed to drop ~p. Reason: ~p~n", [Mod, Reason]),
ok
end,
case State#state.hashtrees of
undefined ->
ok;
HT ->
riak_kv_index_hashtree:destroy(HT)
end,
{ok, State#state{modstate=UpdModState,vnodeid=undefined,hashtrees=undefined}}.
terminate(_Reason, #state{mod=Mod, modstate=ModState}) ->
Mod:stop(ModState),
ok.
handle_info(retry_create_hashtree, State=#state{hashtrees=undefined}) ->
State2 = maybe_create_hashtrees(State),
case State2#state.hashtrees of
undefined ->
ok;
_ ->
lager:info("riak_kv/~p: successfully started index_hashtree on retry",
[State#state.idx])
end,
{ok, State2};
handle_info(retry_create_hashtree, State) ->
{ok, State};
handle_info({'DOWN', _, _, Pid, _}, State=#state{hashtrees=Pid}) ->
State2 = State#state{hashtrees=undefined},
State3 = maybe_create_hashtrees(State2),
{ok, State3};
handle_info({'DOWN', _, _, _, _}, State) ->
{ok, State};
handle_info({final_delete, BKey, RObjHash}, State = #state{mod=Mod, modstate=ModState}) ->
UpdState = case do_get_term(BKey, Mod, ModState) of
{ok, RObj} ->
case delete_hash(RObj) of
RObjHash ->
do_backend_delete(BKey, RObj, State);
_ ->
State
end;
_ ->
State
end,
{ok, UpdState}.
handle_exit(_Pid, Reason, State) ->
%% A linked process has died so the vnode
%% process should take appropriate action here.
%% The default behavior is to crash the vnode
%% process so that it can be respawned
%% by riak_core_vnode_master to prevent
%% messages from stacking up on the process message
%% queue and never being processed.
lager:error("Linked process exited. Reason: ~p", [Reason]),
{stop, linked_process_crash, State}.
%% @private
%% upon receipt of a client-initiated put
do_put(Sender, {Bucket,_Key}=BKey, RObj, ReqID, StartTime, Options, State) ->
case proplists:get_value(bucket_props, Options) of
undefined ->
BProps = riak_core_bucket:get_bucket(Bucket);
BProps ->
BProps
end,
case proplists:get_value(rr, Options, false) of
true ->
PruneTime = undefined;
false ->
PruneTime = StartTime
end,
Coord = proplists:get_value(coord, Options, false),
CounterOp = proplists:get_value(counter_op, Options, undefined),
PutArgs = #putargs{returnbody=proplists:get_value(returnbody,Options,false) orelse Coord,
coord=Coord,
lww=proplists:get_value(last_write_wins, BProps, false),
bkey=BKey,
robj=RObj,
reqid=ReqID,
bprops=BProps,
starttime=StartTime,
prunetime=PruneTime,
counter_op = CounterOp},
{PrepPutRes, UpdPutArgs} = prepare_put(State, PutArgs),
{Reply, UpdState} = perform_put(PrepPutRes, State, UpdPutArgs),
riak_core_vnode:reply(Sender, Reply),
update_index_write_stats(UpdPutArgs#putargs.is_index, UpdPutArgs#putargs.index_specs),
UpdState.
do_backend_delete(BKey, RObj, State = #state{%idx = Idx,
mod = Mod,
modstate = ModState}) ->
%% object is a tombstone or all siblings are tombstones
%% Calculate the index specs to remove...
%% JDM: This should just be a tombstone by this point, but better
%% safe than sorry.
IndexSpecs = riak_object:diff_index_specs(undefined, RObj),
%% Do the delete...
{Bucket, Key} = BKey,
case Mod:delete(Bucket, Key, IndexSpecs, ModState) of
{ok, UpdModState} ->
%?INDEX(RObj, delete, Idx),
riak_kv_index_hashtree:delete(BKey, State#state.hashtrees),
update_index_delete_stats(IndexSpecs),
State#state{modstate = UpdModState};
{error, _Reason, UpdModState} ->
State#state{modstate = UpdModState}
end.
%% Compute a hash of the deleted object
delete_hash(RObj) ->
erlang:phash2(RObj, 4294967296).
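%% Note (informational): 4294967296 is 2^32, so the hash is a 32-bit
%% value. It is carried in the {final_delete, BKey, Hash} message that
%% do_delete/3 schedules, and handle_info/2 only reaps the tombstone if
%% the object still hashes to the same value.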
prepare_put(State=#state{vnodeid=VId,
mod=Mod,
modstate=ModState},
PutArgs=#putargs{bkey={Bucket, _Key},
lww=LWW,
coord=Coord,
robj=RObj,
starttime=StartTime}) ->
%% Can we avoid reading the existing object? If this is not an
%% index backend, and the bucket is set to last-write-wins, then
%% there is no need to incur an additional get. Otherwise, we need
%% to read the old object to know how the indexes have changed.
{ok, Capabilities} = Mod:capabilities(Bucket, ModState),
IndexBackend = lists:member(indexes, Capabilities),
case LWW andalso not IndexBackend of
true ->
ObjToStore =
case Coord of
true ->
riak_object:increment_vclock(RObj, VId, StartTime);
false ->
RObj
end,
{{true, ObjToStore}, PutArgs#putargs{is_index = false}};
false ->
prepare_put(State, PutArgs, IndexBackend)
end.
prepare_put(#state{vnodeid=VId,
mod=Mod,
modstate=ModState},
PutArgs=#putargs{bkey={Bucket, Key},
robj=RObj,
bprops=BProps,
coord=Coord,
lww=LWW,
starttime=StartTime,
prunetime=PruneTime,
counter_op = CounterOp},
IndexBackend) ->
GetReply =
case do_get_object(Bucket, Key, Mod, ModState) of
{error, not_found, _UpdModState} ->
ok;
{ok, TheOldObj, _UpdModState} ->
{ok, TheOldObj}
end,
case GetReply of
ok ->
case IndexBackend of
true ->
IndexSpecs = riak_object:index_specs(RObj);
false ->
IndexSpecs = []
end,
ObjToStore = prepare_new_put(Coord, RObj, VId, StartTime, CounterOp),
{{true, ObjToStore}, PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}};
{ok, OldObj} ->
case put_merge(Coord, LWW, OldObj, RObj, VId, StartTime) of
{oldobj, OldObj1} ->
{{false, OldObj1}, PutArgs};
{newobj, NewObj} ->
VC = riak_object:vclock(NewObj),
AMObj = enforce_allow_mult(NewObj, BProps),
IndexSpecs = case IndexBackend of
true ->
riak_object:diff_index_specs(AMObj,
OldObj);
false ->
[]
end,
ObjToStore = case PruneTime of
undefined ->
AMObj;
_ ->
riak_object:set_vclock(AMObj,
vclock:prune(VC,
PruneTime,
BProps))
end,
ObjToStore2 = handle_counter(Coord, CounterOp, VId, ObjToStore),
{{true, ObjToStore2},
PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}}
end
end.
%% @doc In the case that this is a coordinating put, prepare the object.
prepare_new_put(true, RObj, VId, StartTime, CounterOp) when is_integer(CounterOp) ->
VClockUp = riak_object:increment_vclock(RObj, VId, StartTime),
%% coordinating a _NEW_ counter operation means
%% creating + incrementing the counter.
%% Make a new counter, stuff it in the riak_object
riak_kv_counter:update(VClockUp, VId, CounterOp);
prepare_new_put(true, RObj, VId, StartTime, _CounterOp) ->
riak_object:increment_vclock(RObj, VId, StartTime);
prepare_new_put(false, RObj, _VId, _StartTime, _CounterOp) ->
RObj.
handle_counter(true, CounterOp, VId, RObj) when is_integer(CounterOp) ->
riak_kv_counter:update(RObj, VId, CounterOp);
handle_counter(false, CounterOp, _Vid, RObj) when is_integer(CounterOp) ->
%% non co-ord put, merge the values if there are siblings
%% 'cos that is the point of CRDTs / counters: no siblings
riak_kv_counter:merge(RObj);
handle_counter(_Coord, _CounterOp, _VId, RObj) ->
%% i.e. not a counter op
RObj.
perform_put({false, Obj},
#state{idx=Idx}=State,
#putargs{returnbody=true,
reqid=ReqID}) ->
{{dw, Idx, Obj, ReqID}, State};
perform_put({false, _Obj},
#state{idx=Idx}=State,
#putargs{returnbody=false,
reqid=ReqId}) ->
{{dw, Idx, ReqId}, State};
perform_put({true, Obj},
#state{idx=Idx,
mod=Mod,
modstate=ModState}=State,
#putargs{returnbody=RB,
bkey={Bucket, Key},
reqid=ReqID,
index_specs=IndexSpecs}) ->
case encode_and_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) of
{{ok, UpdModState}, EncodedVal} ->
update_hashtree(Bucket, Key, EncodedVal, State),
%?INDEX(Obj, put, Idx),
case RB of
true ->
Reply = {dw, Idx, Obj, ReqID};
false ->
Reply = {dw, Idx, ReqID}
end;
{{error, _Reason, UpdModState}, _EncodedVal} ->
Reply = {fail, Idx, ReqID}
end,
{Reply, State#state{modstate=UpdModState}}.
do_reformat({Bucket, Key}=BKey, State=#state{mod=Mod, modstate=ModState}) ->
case do_get_object(Bucket, Key, Mod, ModState) of
{error, not_found, _UpdModState} ->
Reply = {error, not_found},
UpdState = State;
{ok, RObj, _UpdModState} ->
%% Since capabilities are assumed to have been properly set
%% to the desired version, all we need to do to reformat
%% is submit a new write.
PutArgs = #putargs{returnbody=false,
bkey=BKey,
reqid=undefined,
index_specs=[]},
case perform_put({true, RObj}, State, PutArgs) of
{{fail, _, _}, UpdState} ->
Reply = {error, backend_error};
{_, UpdState} ->
Reply = ok
end
end,
{Reply, UpdState}.
%% @private
%% enforce allow_mult bucket property so that no backend ever stores
%% an object with multiple contents if allow_mult=false for that bucket
enforce_allow_mult(Obj, BProps) ->
case proplists:get_value(allow_mult, BProps) of
true -> Obj;
_ ->
case riak_object:get_contents(Obj) of
[_] -> Obj;
Mult ->
{MD, V} = select_newest_content(Mult),
riak_object:set_contents(Obj, [{MD, V}])
end
end.
%% @private
%% choose the latest content to store for the allow_mult=false case
select_newest_content(Mult) ->
hd(lists:sort(
fun({MD0, _}, {MD1, _}) ->
riak_core_util:compare_dates(
dict:fetch(<<"X-Riak-Last-Modified">>, MD0),
dict:fetch(<<"X-Riak-Last-Modified">>, MD1))
end,
Mult)).
%% @private
put_merge(false, true, _CurObj, UpdObj, _VId, _StartTime) -> % coord=false, LWW=true
{newobj, UpdObj};
put_merge(false, false, CurObj, UpdObj, _VId, _StartTime) -> % coord=false, LWW=false
ResObj = riak_object:syntactic_merge(CurObj, UpdObj),
case ResObj =:= CurObj of
true ->
{oldobj, CurObj};
false ->
{newobj, ResObj}
end;
put_merge(true, true, _CurObj, UpdObj, VId, StartTime) -> % coord=true, LWW=true
{newobj, riak_object:increment_vclock(UpdObj, VId, StartTime)};
put_merge(true, false, CurObj, UpdObj, VId, StartTime) ->
UpdObj1 = riak_object:increment_vclock(UpdObj, VId, StartTime),
UpdVC = riak_object:vclock(UpdObj1),
CurVC = riak_object:vclock(CurObj),
%% Check the coord put will replace the existing object
case vclock:get_counter(VId, UpdVC) > vclock:get_counter(VId, CurVC) andalso
vclock:descends(CurVC, UpdVC) == false andalso
vclock:descends(UpdVC, CurVC) == true of
true ->
{newobj, UpdObj1};
false ->
%% If not, make sure it does
{newobj, riak_object:increment_vclock(
riak_object:merge(CurObj, UpdObj1), VId, StartTime)}
end.
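%% Note (informational): the three-part check above asks whether the
%% coordinated update already strictly dominates the object on disk. If a
%% concurrent coordinated write raced us it may not, in which case the two
%% objects are merged and the vclock incremented again so the stored
%% result dominates both inputs.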
%% @private
do_get(_Sender, BKey, ReqID,
State=#state{idx=Idx,mod=Mod,modstate=ModState}) ->
StartTS = os:timestamp(),
Retval = do_get_term(BKey, Mod, ModState),
update_vnode_stats(vnode_get, Idx, StartTS),
{reply, {r, Retval, Idx, ReqID}, State}.
%% @private
do_get_term({Bucket, Key}, Mod, ModState) ->
case do_get_object(Bucket, Key, Mod, ModState) of
{ok, Obj, _UpdModState} ->
{ok, Obj};
%% @TODO Eventually it would be good to
%% make the use of not_found or notfound
%% consistent throughout the code.
{error, not_found, _UpdatedModstate} ->
{error, notfound};
{error, Reason, _UpdatedModstate} ->
{error, Reason};
Err ->
Err
end.
do_get_binary(Bucket, Key, Mod, ModState) ->
case uses_r_object(Mod, ModState, Bucket) of
true ->
Mod:get_object(Bucket, Key, true, ModState);
false ->
Mod:get(Bucket, Key, ModState)
end.
do_get_object(Bucket, Key, Mod, ModState) ->
case uses_r_object(Mod, ModState, Bucket) of
true ->
Mod:get_object(Bucket, Key, false, ModState);
false ->
case do_get_binary(Bucket, Key, Mod, ModState) of
{ok, ObjBin, _UpdModState} ->
try
case riak_object:from_binary(Bucket, Key, ObjBin) of
{error, Reason} ->
throw(Reason);
RObj ->
{ok, RObj, _UpdModState}
end
catch _:_ ->
lager:warning("Unreadable object ~p/~p discarded",
[Bucket,Key]),
{error, not_found, _UpdModState}
end;
Else ->
Else
end
end.
%% @private
%% @doc This is a generic function for operations that involve
%% listing things from the backend. Examples are listing buckets,
%% listing keys, or doing secondary index queries.
list(FoldFun, FinishFun, Mod, ModFun, ModState, Opts, Buffer) ->
case Mod:ModFun(FoldFun, Buffer, Opts, ModState) of
{ok, Acc} ->
FinishFun(Acc);
{async, AsyncWork} ->
{async, AsyncWork}
end.
%% @private
fold_fun(buckets, BufferMod, none) ->
fun(Bucket, Buffer) ->
BufferMod:add(Bucket, Buffer)
end;
fold_fun(buckets, BufferMod, Filter) ->
fun(Bucket, Buffer) ->
case Filter(Bucket) of
true ->
BufferMod:add(Bucket, Buffer);
false ->
Buffer
end
end;
fold_fun(keys, BufferMod, none) ->
fun(_, Key, Buffer) ->
BufferMod:add(Key, Buffer)
end;
fold_fun(keys, BufferMod, Filter) ->
fun(_, Key, Buffer) ->
case Filter(Key) of
true ->
BufferMod:add(Key, Buffer);
false ->
Buffer
end
end.
%% @private
result_fun(Sender) ->
fun(Items) ->
riak_core_vnode:reply(Sender, Items)
end.
%% @private
result_fun(Bucket, Sender) ->
fun(Items) ->
riak_core_vnode:reply(Sender, {Bucket, Items})
end.
%% wait for acknowledgement that results were received before
%% continuing, as a way of providing backpressure for processes that
%% can't handle results as fast as we can send them
result_fun_ack(Bucket, Sender) ->
fun(Items) ->
Monitor = riak_core_vnode:monitor(Sender),
riak_core_vnode:reply(Sender, {{self(), Monitor}, Bucket, Items}),
receive
{Monitor, ok} ->
erlang:demonitor(Monitor, [flush]);
{Monitor, stop_fold} ->
erlang:demonitor(Monitor, [flush]),
throw(stop_fold);
{'DOWN', Monitor, process, _Pid, _Reason} ->
throw(receiver_down)
end
end.
%% @doc If a listkeys request sends a result of `{From, Bucket,
%% Items}', that means it wants acknowledgement of those items before
%% it will send more. Call this function with that `From' to trigger
%% the next batch.
-spec ack_keys(From::{pid(), reference()}) -> term().
ack_keys({Pid, Ref}) ->
Pid ! {Ref, ok}.
stop_fold({Pid, Ref}) ->
Pid ! {Ref, stop_fold}.
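%% Usage sketch (informational): a listing FSM that receives
%% {{Pid, Ref}, Bucket, Items} from result_fun_ack/2 processes Items and
%% then calls ack_keys({Pid, Ref}) to release the next batch, or
%% stop_fold({Pid, Ref}) to abort the fold.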
%% @private
finish_fun(BufferMod, Sender) ->
fun(Buffer) ->
finish_fold(BufferMod, Buffer, Sender)
end.
%% @private
finish_fold(BufferMod, Buffer, Sender) ->
BufferMod:flush(Buffer),
riak_core_vnode:reply(Sender, done).
%% @private
do_delete(BKey, ReqId, State) ->
Mod = State#state.mod,
ModState = State#state.modstate,
Idx = State#state.idx,
DeleteMode = State#state.delete_mode,
%% Get the existing object.
case do_get_term(BKey, Mod, ModState) of
{ok, RObj} ->
%% Object exists, check if it should be deleted.
case riak_kv_util:obj_not_deleted(RObj) of
undefined ->
case DeleteMode of
keep ->
%% keep tombstones indefinitely
{reply, {fail, Idx, ReqId}, State};
immediate ->
UpdState = do_backend_delete(BKey, RObj, State),
{reply, {del, Idx, ReqId}, UpdState};
Delay when is_integer(Delay) ->
erlang:send_after(Delay, self(),
{final_delete, BKey,
delete_hash(RObj)}),
%% Nothing checks these messages - will just reply
%% del for now until we can refactor.
{reply, {del, Idx, ReqId}, State}
end;
_ ->
%% not a tombstone or not all siblings are tombstones
{reply, {fail, Idx, ReqId}, State}
end;
_ ->
%% does not exist in the backend
{reply, {fail, Idx, ReqId}, State}
end.
%% @private
do_fold(Fun, Acc0, Sender, State=#state{async_folding=AsyncFolding,
mod=Mod,
modstate=ModState}) ->
{ok, Capabilities} = Mod:capabilities(ModState),
AsyncBackend = lists:member(async_fold, Capabilities),
case AsyncFolding andalso AsyncBackend of
true ->
Opts = [async_fold];
false ->
Opts = []
end,
case Mod:fold_objects(Fun, Acc0, Opts, ModState) of
{ok, Acc} ->
{reply, Acc, State};
{async, Work} ->
FinishFun =
fun(Acc) ->
riak_core_vnode:reply(Sender, Acc)
end,
{async, {fold, Work, FinishFun}, Sender, State};
ER ->
{reply, ER, State}
end.
%% @private
do_get_vclocks(KeyList,_State=#state{mod=Mod,modstate=ModState}) ->
[{BKey, do_get_vclock(BKey,Mod,ModState)} || BKey <- KeyList].
%% @private
do_get_vclock({Bucket, Key}, Mod, ModState) ->
case do_get_object(Bucket, Key, Mod, ModState) of
{error, not_found, _UpdModState} -> vclock:fresh();
{ok, Obj, _UpdModState} -> riak_object:vclock(Obj)
end.
%% @private
%% upon receipt of a handoff datum, there is no client FSM
do_diffobj_put({Bucket, Key}, DiffObj,
StateData=#state{mod=Mod,
modstate=ModState,
idx=Idx}) ->
StartTS = os:timestamp(),
{ok, Capabilities} = Mod:capabilities(Bucket, ModState),
IndexBackend = lists:member(indexes, Capabilities),
case do_get_object(Bucket, Key, Mod, ModState) of
{error, not_found, _UpdModState} ->
case IndexBackend of
true ->
IndexSpecs = riak_object:index_specs(DiffObj);
false ->
IndexSpecs = []
end,
case encode_and_put(DiffObj, Mod, Bucket, Key,
IndexSpecs, ModState) of
{{ok, _UpdModState} = InnerRes, EncodedVal} ->
update_hashtree(Bucket, Key, EncodedVal, StateData),
update_index_write_stats(IndexBackend, IndexSpecs),
update_vnode_stats(vnode_put, Idx, StartTS),
%?INDEX(DiffObj, handoff, Idx),
InnerRes;
{InnerRes, _Val} ->
InnerRes
end;
{ok, OldObj, _UpdModState} ->
%% Merge handoff values with the current - possibly discarding
%% if out of date. Ok to set VId/Starttime undefined as
%% they are not used for non-coordinating puts.
case put_merge(false, false, OldObj, DiffObj, undefined, undefined) of
{oldobj, _} ->
{ok, ModState};
{newobj, NewObj} ->
AMObj = enforce_allow_mult(NewObj, riak_core_bucket:get_bucket(Bucket)),
case IndexBackend of
true ->
IndexSpecs = riak_object:diff_index_specs(AMObj, OldObj);
false ->
IndexSpecs = []
end,
case encode_and_put(AMObj, Mod, Bucket, Key,
IndexSpecs, ModState) of
{{ok, _UpdModState} = InnerRes, EncodedVal} ->
update_hashtree(Bucket, Key, EncodedVal, StateData),
update_index_write_stats(IndexBackend, IndexSpecs),
update_vnode_stats(vnode_put, Idx, StartTS),
%?INDEX(AMObj, handoff, Idx),
InnerRes;
{InnerRes, _EncodedVal} ->
InnerRes
end
end
end.
-spec update_hashtree(binary(), binary(), binary(), state()) -> ok.
update_hashtree(Bucket, Key, Val, #state{hashtrees=Trees}) ->
case get_hashtree_token() of
true ->
riak_kv_index_hashtree:async_insert_object({Bucket, Key}, Val, Trees),
ok;
false ->
riak_kv_index_hashtree:insert_object({Bucket, Key}, Val, Trees),
put(hashtree_tokens, max_hashtree_tokens()),
ok
end.
get_hashtree_token() ->
Tokens = get(hashtree_tokens),
case Tokens of
undefined ->
put(hashtree_tokens, max_hashtree_tokens() - 1),
true;
N when N > 0 ->
put(hashtree_tokens, Tokens - 1),
true;
_ ->
false
end.
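%% Token accounting (informational): each vnode keeps a counter in its
%% process dictionary. Hashtree inserts stay asynchronous while tokens
%% remain; once they are exhausted a single synchronous insert is issued
%% as backpressure and the counter is refilled to max_hashtree_tokens()
%% (?DEFAULT_HASHTREE_TOKENS, 90, unless overridden).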
-spec max_hashtree_tokens() -> pos_integer().
max_hashtree_tokens() ->
app_helper:get_env(riak_kv,
anti_entropy_max_async,
?DEFAULT_HASHTREE_TOKENS).
%% @private
%% Get the vnodeid, assigning and storing if necessary
get_vnodeid(Index) ->
F = fun(Status) ->
case proplists:get_value(vnodeid, Status, undefined) of
undefined ->
assign_vnodeid(os:timestamp(),
riak_core_nodeid:get(),
Status);
VnodeId ->
{VnodeId, Status}
end
end,
update_vnode_status(F, Index). % Returns {ok, VnodeId} | {error, Reason}
%% Assign a unique vnodeid, making sure the timestamp is unique by incrementing
%% into the future if necessary.
assign_vnodeid(Now, NodeId, Status) ->
{Mega, Sec, _Micro} = Now,
NowEpoch = 1000000*Mega + Sec,
LastVnodeEpoch = proplists:get_value(last_epoch, Status, 0),
VnodeEpoch = erlang:max(NowEpoch, LastVnodeEpoch+1),
VnodeId = <<NodeId/binary, VnodeEpoch:32/integer>>,
UpdStatus = [{vnodeid, VnodeId}, {last_epoch, VnodeEpoch} |
proplists:delete(vnodeid,
proplists:delete(last_epoch, Status))],
{VnodeId, UpdStatus}.
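%% Example (informational): with NodeId = <<1,2,3,4>> and
%% VnodeEpoch = 1314224520 the vnodeid is <<1,2,3,4,78,85,121,136>> --
%% the 4-byte node id followed by the epoch as a 32-bit big-endian
%% integer (see the EUnit tests at the bottom of this module).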
%% Clear the vnodeid - returns {ok, cleared}
clear_vnodeid(Index) ->
F = fun(Status) ->
{cleared, proplists:delete(vnodeid, Status)}
end,
update_vnode_status(F, Index). % Returns {ok, VnodeId} | {error, Reason}
update_vnode_status(F, Index) ->
VnodeFile = vnode_status_filename(Index),
ok = filelib:ensure_dir(VnodeFile),
case read_vnode_status(VnodeFile) of
{ok, Status} ->
update_vnode_status2(F, Status, VnodeFile);
{error, enoent} ->
update_vnode_status2(F, [], VnodeFile);
ER ->
ER
end.
update_vnode_status2(F, Status, VnodeFile) ->
case F(Status) of
{Ret, Status} -> % No change
{ok, Ret};
{Ret, UpdStatus} ->
case write_vnode_status(UpdStatus, VnodeFile) of
ok ->
{ok, Ret};
ER ->
ER
end
end.
vnode_status_filename(Index) ->
P_DataDir = app_helper:get_env(riak_core, platform_data_dir),
VnodeStatusDir = app_helper:get_env(riak_kv, vnode_status,
filename:join(P_DataDir, "kv_vnode")),
filename:join(VnodeStatusDir, integer_to_list(Index)).
read_vnode_status(File) ->
case file:consult(File) of
{ok, [Status]} when is_list(Status) ->
{ok, proplists:delete(version, Status)};
ER ->
ER
end.
write_vnode_status(Status, File) ->
VersionedStatus = [{version, 1} | proplists:delete(version, Status)],
TmpFile = File ++ "~",
case file:write_file(TmpFile, io_lib:format("~p.", [VersionedStatus])) of
ok ->
file:rename(TmpFile, File);
ER ->
ER
end.
%% @private
wait_for_vnode_status_results([], _ReqId, Acc) ->
Acc;
wait_for_vnode_status_results(PrefLists, ReqId, Acc) ->
receive
{ReqId, {vnode_status, Index, Status}} ->
UpdPrefLists = proplists:delete(Index, PrefLists),
wait_for_vnode_status_results(UpdPrefLists,
ReqId,
[{Index, Status} | Acc]);
_ ->
wait_for_vnode_status_results(PrefLists, ReqId, Acc)
end.
%% @private
-spec update_vnode_stats(vnode_get | vnode_put, partition(), erlang:timestamp()) ->
ok.
update_vnode_stats(Op, Idx, StartTS) ->
riak_kv_stat:update({Op, Idx, timer:now_diff( os:timestamp(), StartTS)}).
%% @private
update_index_write_stats(false, _IndexSpecs) ->
ok;
update_index_write_stats(true, IndexSpecs) ->
{Added, Removed} = count_index_specs(IndexSpecs),
riak_kv_stat:update({vnode_index_write, Added, Removed}).
%% @private
update_index_delete_stats(IndexSpecs) ->
{_Added, Removed} = count_index_specs(IndexSpecs),
riak_kv_stat:update({vnode_index_delete, Removed}).
%% @private
%% @doc Given a list of index specs, return the number to add and
%% remove.
count_index_specs(IndexSpecs) ->
%% Count index specs...
F = fun({add, _, _}, {AddAcc, RemoveAcc}) ->
{AddAcc + 1, RemoveAcc};
({remove, _, _}, {AddAcc, RemoveAcc}) ->
{AddAcc, RemoveAcc + 1}
end,
lists:foldl(F, {0, 0}, IndexSpecs).
nval_map(Ring) ->
riak_core_bucket:bucket_nval_map(Ring).
%% @private
object_info({Bucket, _Key}=BKey) ->
Hash = riak_core_util:chash_key(BKey),
{Bucket, Hash}.
%% @private
%% Encoding and decoding selection:
handoff_data_encoding_method() ->
riak_core_capability:get({riak_kv, handoff_data_encoding}, encode_zlib).
%% Decode a binary object. We first try to interpret the data as a "new format" object which indicates
%% its encoding method, but if that fails we use the legacy zlib and protocol buffer decoding:
decode_binary_object(BinaryObject) ->
try binary_to_term(BinaryObject) of
{ Method, BinObj } ->
case Method of
encode_raw -> {B, K, Val} = BinObj,
BKey = {B, K},
{BKey, Val};
_ -> lager:error("Invalid handoff encoding ~p", [Method]),
throw(invalid_handoff_encoding)
end;
_ -> lager:error("Request to decode invalid handoff object"),
throw(invalid_handoff_object)
%% An exception means we have a legacy handoff object:
catch
_:_ -> do_zlib_decode(BinaryObject)
end.
do_zlib_decode(BinaryObject) ->
DecodedObject = zlib:unzip(BinaryObject),
PBObj = riak_core_pb:decode_riakobject_pb(DecodedObject),
BKey = {PBObj#riakobject_pb.bucket,PBObj#riakobject_pb.key},
{BKey, PBObj#riakobject_pb.val}.
encode_binary_object(Bucket, Key, Value) ->
Method = handoff_data_encoding_method(),
case Method of
encode_raw -> EncodedObject = { Bucket, Key, iolist_to_binary(Value) },
return_encoded_binary_object(Method, EncodedObject);
%% zlib encoding is a special case, we return the legacy format:
encode_zlib -> PBEncodedObject = riak_core_pb:encode_riakobject_pb(#riakobject_pb{bucket=Bucket, key=Key, val=Value}),
zlib:zip(PBEncodedObject)
end.
%% Return objects in a consistent form:
return_encoded_binary_object(Method, EncodedObject) ->
term_to_binary({ Method, EncodedObject }).
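%% Wire format (informational): with encode_raw the handoff payload is
%% term_to_binary({encode_raw, {Bucket, Key, ValueBin}}); with encode_zlib
%% it is the legacy zlib-compressed riakobject_pb message, which is why
%% decode_binary_object/1 falls back to do_zlib_decode/1 when
%% binary_to_term/1 fails.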
-spec encode_and_put(
Obj::riak_object:riak_object(), Mod::term(), Bucket::riak_object:bucket(),
Key::riak_object:key(), IndexSpecs::list(), ModState::term()) ->
{{ok, UpdModState::term()}, EncodedObj::binary()} |
{{error, Reason::term(), UpdModState::term()}, EncodedObj::binary()}.
encode_and_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) ->
case uses_r_object(Mod, ModState, Bucket) of
true ->
Mod:put_object(Bucket, Key, IndexSpecs, Obj, ModState);
false ->
ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
EncodedVal = riak_object:to_binary(ObjFmt, Obj),
{Mod:put(Bucket, Key, IndexSpecs, EncodedVal, ModState), EncodedVal}
end.
uses_r_object(Mod, ModState, Bucket) ->
{ok, Capabilities} = Mod:capabilities(Bucket, ModState),
lists:member(uses_r_object, Capabilities).
sanitize_bkey({{<<"default">>, B}, K}) ->
{B, K};
sanitize_bkey(BKey) ->
BKey.
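%% Note (informational): {<<"default">>, B} is the default bucket-type
%% form of a bucket name; backends store such keys under the plain
%% bucket name B, hence the strip above.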
-ifdef(TEST).
%% Check assigning a vnodeid twice in the same second
assign_vnodeid_restart_same_ts_test() ->
Now1 = {1314,224520,343446}, %% TS=1314224520
Now2 = {1314,224520,345865}, %% as unsigned net-order int <<78,85,121,136>>
NodeId = <<1, 2, 3, 4>>,
{Vid1, Status1} = assign_vnodeid(Now1, NodeId, []),
?assertEqual(<<1, 2, 3, 4, 78, 85, 121, 136>>, Vid1),
%% Simulate clear
Status2 = proplists:delete(vnodeid, Status1),
%% Reassign
{Vid2, _Status3} = assign_vnodeid(Now2, NodeId, Status2),
?assertEqual(<<1, 2, 3, 4, 78, 85, 121, 137>>, Vid2).
%% Check assigning a vnodeid with a later date
assign_vnodeid_restart_later_ts_test() ->
Now1 = {1000,000000,0}, %% <<59,154,202,0>>
Now2 = {2000,000000,0}, %% <<119,53,148,0>>
NodeId = <<1, 2, 3, 4>>,
{Vid1, Status1} = assign_vnodeid(Now1, NodeId, []),
?assertEqual(<<1, 2, 3, 4, 59,154,202,0>>, Vid1),
%% Simulate clear
Status2 = proplists:delete(vnodeid, Status1),
%% Reassign
{Vid2, _Status3} = assign_vnodeid(Now2, NodeId, Status2),
?assertEqual(<<1, 2, 3, 4, 119,53,148,0>>, Vid2).
%% Check assigning a vnodeid with an earlier date - just in case of clock skew
assign_vnodeid_restart_earlier_ts_test() ->
Now1 = {2000,000000,0}, %% <<119,53,148,0>>
Now2 = {1000,000000,0}, %% <<59,154,202,0>>
NodeId = <<1, 2, 3, 4>>,
{Vid1, Status1} = assign_vnodeid(Now1, NodeId, []),
?assertEqual(<<1, 2, 3, 4, 119,53,148,0>>, Vid1),
%% Simulate clear
Status2 = proplists:delete(vnodeid, Status1),
%% Reassign
%% Should be greater than last offered - which is the 2mil timestamp
{Vid2, _Status3} = assign_vnodeid(Now2, NodeId, Status2),
?assertEqual(<<1, 2, 3, 4, 119,53,148,1>>, Vid2).
%% Test vnode status file handling (create/update/failure cases)
vnode_status_test_() ->
{setup,
fun() ->
filelib:ensure_dir("kv_vnode_status_test/.test"),
?cmd("chmod u+rwx kv_vnode_status_test"),
?cmd("rm -rf kv_vnode_status_test"),
application:set_env(riak_kv, vnode_status, "kv_vnode_status_test"),
ok
end,
fun(_) ->
application:unset_env(riak_kv, vnode_status),
?cmd("chmod u+rwx kv_vnode_status_test"),
?cmd("rm -rf kv_vnode_status_test"),
ok
end,
[?_test(begin % initial create failure
?cmd("rm -rf kv_vnode_status_test || true"),
?cmd("mkdir kv_vnode_status_test"),
?cmd("chmod -w kv_vnode_status_test"),
F = fun([]) ->
{shouldfail, [badperm]}
end,
Index = 0,
?assertEqual({error, eacces}, update_vnode_status(F, Index))
end),
?_test(begin % create successfully
?cmd("chmod +w kv_vnode_status_test"),
F = fun([]) ->
{created, [created]}
end,
Index = 0,
?assertEqual({ok, created}, update_vnode_status(F, Index))
end),
?_test(begin % update successfully
F = fun([created]) ->
{updated, [updated]}
end,
Index = 0,
?assertEqual({ok, updated}, update_vnode_status(F, Index))
end),
?_test(begin % update failure
?cmd("chmod 000 kv_vnode_status_test/0"),
?cmd("chmod 500 kv_vnode_status_test"),
F = fun([updated]) ->
{shouldfail, [updatedagain]}
end,
Index = 0,
?assertEqual({error, eacces}, update_vnode_status(F, Index))
end)
]}.
dummy_backend(BackendMod) ->
Ring = riak_core_ring:fresh(16,node()),
riak_core_ring_manager:set_ring_global(Ring),
application:set_env(riak_kv, async_folds, false),
application:set_env(riak_kv, storage_backend, BackendMod),
application:set_env(riak_core, default_bucket_props, []),
application:set_env(bitcask, data_root, bitcask_test_dir()),
application:set_env(eleveldb, data_root, eleveldb_test_dir()),
application:set_env(riak_kv, multi_backend_default, multi_dummy_memory1),
application:set_env(riak_kv, multi_backend,
[{multi_dummy_memory1, riak_kv_memory_backend, []},
{multi_dummy_memory2, riak_kv_memory_backend, []}]).
bitcask_test_dir() ->
"./test.bitcask-temp-data".
eleveldb_test_dir() ->
"./test.eleveldb-temp-data".
clean_test_dirs() ->
?cmd("rm -rf " ++ bitcask_test_dir()),
?cmd("rm -rf " ++ eleveldb_test_dir()).
backend_with_known_key(BackendMod) ->
dummy_backend(BackendMod),
{ok, S1} = init([0]),
B = <<"f">>,
K = <<"b">>,
O = riak_object:new(B, K, <<"z">>),
{noreply, S2} = handle_command(?KV_PUT_REQ{bkey={B,K},
object=O,
req_id=123,
start_time=riak_core_util:moment(),
options=[]},
{raw, 456, self()},
S1),
{S2, B, K}.
list_buckets_test_() ->
{foreach,
fun() ->
riak_core_ring_manager:setup_ets(test),
clean_test_dirs(),
application:start(sasl),
Env = application:get_all_env(riak_kv),
application:start(folsom),
riak_core_stat_cache:start_link(),
riak_kv_stat:register_stats(),
Env
end,
fun(Env) ->
riak_core_ring_manager:cleanup_ets(test),
riak_core_stat_cache:stop(),
application:stop(folsom),
application:stop(sasl),
[application:unset_env(riak_kv, K) ||
{K, _V} <- application:get_all_env(riak_kv)],
[application:set_env(riak_kv, K, V) || {K, V} <- Env]
end,
[
fun(_) ->
{"bitcask list buckets",
fun() ->
list_buckets_test_i(riak_kv_bitcask_backend)
end
}
end,
fun(_) ->
{"eleveldb list buckets",
fun() ->
list_buckets_test_i(riak_kv_eleveldb_backend)
end
}
end,
fun(_) ->
{"memory list buckets",
fun() ->
list_buckets_test_i(riak_kv_memory_backend),
ok
end
}
end,
fun(_) ->
{"multi list buckets",
fun() ->
list_buckets_test_i(riak_kv_multi_backend),
ok
end
}
end
]
}.
list_buckets_test_i(BackendMod) ->
{S, B, _K} = backend_with_known_key(BackendMod),
Caller = new_result_listener(buckets),
handle_coverage(?KV_LISTBUCKETS_REQ{item_filter=none}, [],
{fsm, {456, {0, node()}}, Caller}, S),
?assertEqual({ok, [B]}, results_from_listener(Caller)),
flush_msgs().
filter_keys_test() ->
riak_core_ring_manager:setup_ets(test),
clean_test_dirs(),
{S, B, K} = backend_with_known_key(riak_kv_memory_backend),
Caller1 = new_result_listener(keys),
handle_coverage(?KV_LISTKEYS_REQ{bucket=B,
item_filter=fun(_) -> true end}, [],
{fsm, {124, {0, node()}}, Caller1}, S),
?assertEqual({ok, [K]}, results_from_listener(Caller1)),
Caller2 = new_result_listener(keys),
handle_coverage(?KV_LISTKEYS_REQ{bucket=B,
item_filter=fun(_) -> false end}, [],
{fsm, {125, {0, node()}}, Caller2}, S),
?assertEqual({ok, []}, results_from_listener(Caller2)),
Caller3 = new_result_listener(keys),
handle_coverage(?KV_LISTKEYS_REQ{bucket= <<"g">>,
item_filter=fun(_) -> true end}, [],
{fsm, {126, {0, node()}}, Caller3}, S),
?assertEqual({ok, []}, results_from_listener(Caller3)),
riak_core_ring_manager:cleanup_ets(test),
flush_msgs().
%% include bitcask.hrl for HEADER_SIZE macro
-include_lib("bitcask/include/bitcask.hrl").
%% Verify that a bad CRC on read will not crash the vnode; when the read is
%% done in preparation for a write, a crash would prevent the write from
%% going through.
bitcask_badcrc_test() ->
riak_core_ring_manager:setup_ets(test),
clean_test_dirs(),
{S, B, K} = backend_with_known_key(riak_kv_bitcask_backend),
DataDir = filename:join(bitcask_test_dir(), "0"),
[DataFile] = filelib:wildcard(DataDir ++ "/*.data"),
{ok, Fh} = file:open(DataFile, [read, write]),
ok = file:pwrite(Fh, ?HEADER_SIZE, <<0>>),
file:close(Fh),
O = riak_object:new(B, K, <<"y">>),
{noreply, _} = handle_command(?KV_PUT_REQ{bkey={B,K},
object=O,
req_id=123,
start_time=riak_core_util:moment(),
options=[]},
{raw, 456, self()},
S),
riak_core_ring_manager:cleanup_ets(test),
flush_msgs().
new_result_listener(Type) ->
case Type of
buckets ->
ResultFun = fun() -> result_listener_buckets([]) end;
keys ->
ResultFun = fun() -> result_listener_keys([]) end
end,
spawn(ResultFun).
result_listener_buckets(Acc) ->
receive
{'$gen_event', {_, done}} ->
result_listener_done(Acc);
{'$gen_event', {_, Results}} ->
result_listener_buckets(Results ++ Acc)
after 5000 ->
result_listener_done({timeout, Acc})
end.
result_listener_keys(Acc) ->
receive
{'$gen_event', {_, done}} ->
result_listener_done(Acc);
{'$gen_event', {_, {_Bucket, Results}}} ->
result_listener_keys(Results ++ Acc);
{'$gen_event', {_, {From, _Bucket, Results}}} ->
riak_kv_vnode:ack_keys(From),
result_listener_keys(Results ++ Acc)
after 5000 ->
result_listener_done({timeout, Acc})
end.
result_listener_done(Result) ->
receive
{get_results, Pid} ->
Pid ! {listener_results, Result}
end.
results_from_listener(Listener) ->
Listener ! {get_results, self()},
receive
{listener_results, Result} ->
{ok, Result}
after 5000 ->
{error, listener_timeout}
end.
flush_msgs() ->
receive
_Msg ->
flush_msgs()
after
0 ->
ok
end.
-endif.