Skip to content

Instantly share code, notes, and snippets.

@dkincaid
Last active December 10, 2015 17:48
Show Gist options
  • Save dkincaid/4470361 to your computer and use it in GitHub Desktop.
Save dkincaid/4470361 to your computer and use it in GitHub Desktop.
Example of an issue with trying to use two PailTaps reading from the same Pail in a query.
/* If I execute only the clientQuery or only the emailQuery by itself, everything works correctly:
I set breakpoints inside the ExtractClientEdgeFields and ExtractClientId functions, and each one
is called only with Data objects of the correct property type.
However, if I execute the query as it is shown here, only one of the two functions is called,
and it receives all of the Data objects from both taps. */
/**
 * Builds a query producing {@code (?sapid, ?clientid, ?email)} rows.
 *
 * <p>Two inner sub-queries are created, each reading from its own tap on {@code pailPath}:
 * one emits {@code (?sapid, ?clientid)} through {@link ExtractClientEdgeFields}, the other
 * emits {@code (?clientid, ?email)} through {@link ExtractClientId}. The outer query uses
 * both as generators, with {@code ?clientid} appearing in each predicate.
 *
 * @param pailPath path passed to both tap factories
 * @return the outer subquery with output variables {@code ?sapid}, {@code ?clientid}, {@code ?email}
 */
public static Subquery clientEmail(String pailPath) {
    // Both sources are created up front, before either sub-query is assembled.
    PailTap edgeSource = clientEdgeTap(pailPath);
    // NOTE(review): petOwnerTap is not defined in the visible part of this file, whereas a
    // clientTap(String) factory is -- confirm which one is intended here.
    PailTap propertySource = petOwnerTap(pailPath);

    // (?sapid, ?clientid) extracted from every record read off the client-edge source.
    Subquery edgeQuery = new Subquery("?sapid", "?clientid")
        .predicate(edgeSource, "_", "?client-edge-data")
        .predicate(new ExtractClientEdgeFields(), "?client-edge-data").out("?sapid", "?clientid");

    // (?clientid, ?email) extracted from every record read off the second source.
    Subquery propertyQuery = new Subquery("?clientid", "?email")
        .predicate(propertySource, "_", "?pet-owner-data")
        .predicate(new ExtractClientId(), "?pet-owner-data").out("?clientid", "?email");

    return new Subquery("?sapid", "?clientid", "?email")
        .predicate(edgeQuery, "?sapid", "?clientid")
        .predicate(propertyQuery, "?clientid", "?email");
}
/**
 * Entry point: builds the {@code clientEmail} query for {@code PAIL_PATH} and executes it
 * with a {@code StdoutTap} as the sink.
 *
 * @param args unused
 * @throws IOException declared by the original signature
 */
public static void main(String[] args) throws IOException {
    Tap sink = new StdoutTap();
    Api.execute(sink, clientEmail(PAIL_PATH));
}
/**
 * Cascalog function that reads a {@code Data} object from argument 0 and, when its data unit
 * has the {@code CLIENT} field set, emits one tuple of (practice SAP id, PIMS client id).
 * Any other data unit produces no output.
 */
public static class ExtractClientEdgeFields extends CascalogFunction {
    @Override
    public void operate(FlowProcess process, FunctionCall call) {
        Data record = (Data) call.getArguments().getObject(0);
        DataUnit unit = record.getDataunit();

        // TODO Why is this guard needed? Only ClientEdge Data should be arriving here.
        if (unit.getSetField() != DataUnit._Fields.CLIENT) {
            return;
        }

        ClientEdge edge = unit.getClient();
        PracticeId practice = edge.getPractice();
        ClientId client = edge.getClient();
        Tuple result = new Tuple(practice.getSap_id(), client.getPims_client_id().getId());
        call.getOutputCollector().add(result);
    }
}
/**
 * Cascalog function that reads a {@code Data} object from argument 0 and emits one tuple of
 * (PIMS client id, email) when the data unit is a {@code CLIENT_PROPERTY} whose property
 * value has the {@code EMAIL} field set. Everything else produces no output.
 */
public static class ExtractClientId extends CascalogFunction {
    @Override
    public void operate(FlowProcess flow_process, FunctionCall fn_call) {
        Data record = (Data) fn_call.getArguments().getObject(0);
        DataUnit unit = record.getDataunit();

        // TODO Why is this guard needed?
        if (unit.getSetField() != DataUnit._Fields.CLIENT_PROPERTY) {
            return;
        }

        ClientProperty property = unit.getClient_property();
        // Only e-mail properties are of interest; other property kinds are skipped.
        if (property.getProperty().getSetField() != ClientPropertyValue._Fields.EMAIL) {
            return;
        }

        Tuple result = new Tuple(
            property.getId().getPims_client_id().getId(),
            property.getProperty().getEmail());
        fn_call.getOutputCollector().add(result);
    }
}
/**
 * Creates a tap via {@link #splitDataTap(String, List[])} for the given Thrift fields.
 *
 * <p>The Thrift field id of every entry in {@code fields} is converted to its decimal string
 * form, the strings are collected in order into one list, and that list is passed as the only
 * element of the attribute array. Presumably the list names a location inside the pail --
 * confirm against the PailTap documentation.
 *
 * @param path   path handed through to {@code splitDataTap}
 * @param fields Thrift fields whose ids make up the attribute list; may be empty
 * @return the tap returned by {@code splitDataTap}
 */
public static PailTap attributeTap(String path, final TFieldIdEnum... fields) {
    // A plain ArrayList replaces the former double-brace initializer, which created an
    // anonymous ArrayList subclass and passed an instance of it to the tap options.
    List<String> fieldIds = new ArrayList<String>(fields.length);
    for (TFieldIdEnum field : fields) {
        fieldIds.add(String.valueOf(field.getThriftFieldId()));
    }

    // Generic arrays cannot be created directly; the raw array only ever holds fieldIds,
    // which is a List<String>, so the unchecked conversion is safe.
    @SuppressWarnings("unchecked")
    List<String>[] attrs = new List[] { fieldIds };
    return splitDataTap(path, attrs);
}
/**
 * Creates a {@code PailTap} on {@code path} whose options carry the given attribute lists and
 * a spec built from a new {@code SplitDataPailStructure}.
 *
 * @param path  location passed to the {@code PailTap} constructor
 * @param attrs attribute lists stored in the tap options as-is
 * @return the newly constructed tap
 */
public static PailTap splitDataTap(String path, List<String>[] attrs) {
    PailTap.PailTapOptions options = new PailTap.PailTapOptions();
    options.attrs = attrs;
    options.spec = PailTap.makeSpec(null, new SplitDataPailStructure());
    return new PailTap(path, options);
}
/**
 * Creates an attribute tap on {@code pailPath} for the {@code CLIENT_PROPERTY} data-unit field.
 *
 * @param pailPath path passed through to {@code attributeTap}
 * @return the tap returned by {@code attributeTap}
 */
public static PailTap clientTap(String pailPath) {
    DataUnit._Fields propertyField = DataUnit._Fields.CLIENT_PROPERTY;
    return attributeTap(pailPath, propertyField);
}
/**
 * Creates an attribute tap on {@code pailPath} for the {@code CLIENT} data-unit field.
 *
 * @param pailPath path passed through to {@code attributeTap}
 * @return the tap returned by {@code attributeTap}
 */
public static PailTap clientEdgeTap(String pailPath) {
    DataUnit._Fields edgeField = DataUnit._Fields.CLIENT;
    return attributeTap(pailPath, edgeField);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment