Skip to content

Instantly share code, notes, and snippets.

@egalpin
Last active February 23, 2024 00:51
Show Gist options
  • Save egalpin/1831808e5676c0b278c9732e42fab4e5 to your computer and use it in GitHub Desktop.
Save egalpin/1831808e5676c0b278c9732e42fab4e5 to your computer and use it in GitHub Desktop.
JsonMutationFn using Beam and Jackson Databind
import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
/**
 * A Beam {@link DoFn} that parses each input element as a JSON object, applies an ordered list of
 * mutations to it and outputs the mutated document serialized back to a JSON string.
 *
 * <p>This DoFn is mostly used to rename JSON fields, but because sources and targets are addressed
 * with JSON pointers it can also move, copy, remove or add values that are arbitrarily nested.
 * Mutations are applied in the order in which they were registered on the {@link Builder}.
 */
public class JsonMutationFn extends DoFn<String, String> {
  private static final JsonMapper MAPPER = new JsonMapper();

  /** Mutations to apply, in registration order. */
  private final List<NodeMutationFn> mutations;

  private JsonMutationFn(List<NodeMutationFn> mutations) {
    this.mutations = mutations;
  }

  /**
   * Parses {@code input} with {@code mapper}, falling back to an empty object node when the input
   * cannot be parsed.
   */
  private static JsonNode safeReadTree(JsonMapper mapper, String input) {
    try {
      return mapper.readTree(input);
    } catch (IOException ignored) {
      // Deliberate best-effort: malformed JSON is treated as an empty document instead of
      // failing the caller.
      return mapper.createObjectNode();
    }
  }

  /** Parses {@code input} with the shared mapper; see {@link #safeReadTree(JsonMapper, String)}. */
  private static JsonNode safeReadTree(String input) {
    return safeReadTree(MAPPER, input);
  }

  /**
   * Parses {@code input} as a JSON object.
   *
   * <p>Input that parses to anything other than an object (an array, a scalar, or no content at
   * all) yields an empty object node, mirroring how unparseable input is handled, rather than
   * failing with a {@link ClassCastException}.
   */
  private static ObjectNode safeReadAsObject(String input) {
    JsonNode parsed = safeReadTree(input);
    return parsed instanceof ObjectNode ? (ObjectNode) parsed : MAPPER.createObjectNode();
  }

  /** Returns a new, empty {@link Builder}. */
  public static Builder builder() {
    return new Builder();
  }

  /**
   * Collects mutations for a {@link JsonMutationFn}. Every {@code source} and {@code target}
   * argument is a JSON pointer expression such as {@code "/foo/bar"}.
   */
  public static class Builder {
    private final List<NodeMutationFn> mutations;

    private Builder() {
      mutations = new ArrayList<>();
    }

    /** Registers a move of the value at {@code source} to {@code target}. */
    public Builder move(String source, String target) {
      JsonPointer targetPtr = JsonPointer.compile(target);
      mutations.add(ObjectNodeMutation.move(JsonPointer.compile(source), targetPtr));
      return this;
    }

    /** Registers a copy of the value at {@code source} to {@code target}. */
    public Builder copy(String source, String target) {
      JsonPointer targetPtr = JsonPointer.compile(target);
      mutations.add(ObjectNodeMutation.copy(JsonPointer.compile(source), targetPtr));
      return this;
    }

    /** Registers removal of the value at {@code source}. */
    public Builder remove(String source) {
      JsonPointer sourcePtr = JsonPointer.compile(source);
      mutations.add(ObjectNodeMutation.remove(sourcePtr));
      return this;
    }

    /** Registers setting {@code target} to the given string value. */
    public Builder add(String target, String value) {
      JsonPointer targetPtr = JsonPointer.compile(target);
      mutations.add(ObjectNodeMutation.set(targetPtr, value));
      return this;
    }

    /** Registers setting {@code target} to the given long value. */
    public Builder add(String target, long value) {
      JsonPointer targetPtr = JsonPointer.compile(target);
      mutations.add(ObjectNodeMutation.set(targetPtr, value));
      return this;
    }

    /** Registers setting {@code target} to the given int value. */
    public Builder add(String target, int value) {
      JsonPointer targetPtr = JsonPointer.compile(target);
      mutations.add(ObjectNodeMutation.set(targetPtr, value));
      return this;
    }

    /** Registers setting {@code target} to the given double value. */
    public Builder add(String target, double value) {
      JsonPointer targetPtr = JsonPointer.compile(target);
      mutations.add(ObjectNodeMutation.set(targetPtr, value));
      return this;
    }

    /** Registers setting {@code target} to the given boolean value. */
    public Builder add(String target, boolean value) {
      JsonPointer targetPtr = JsonPointer.compile(target);
      mutations.add(ObjectNodeMutation.set(targetPtr, value));
      return this;
    }

    /** Registers appending the given string value to the array at {@code target}. */
    public Builder addToArray(String target, String value) {
      JsonPointer targetPtr = JsonPointer.compile(target);
      mutations.add(ObjectNodeMutation.addToArray(targetPtr, value));
      return this;
    }

    /** Registers appending the given long value to the array at {@code target}. */
    public Builder addToArray(String target, long value) {
      JsonPointer targetPtr = JsonPointer.compile(target);
      mutations.add(ObjectNodeMutation.addToArray(targetPtr, value));
      return this;
    }

    /** Registers appending the given int value to the array at {@code target}. */
    public Builder addToArray(String target, int value) {
      JsonPointer targetPtr = JsonPointer.compile(target);
      mutations.add(ObjectNodeMutation.addToArray(targetPtr, value));
      return this;
    }

    /** Registers appending the given double value to the array at {@code target}. */
    public Builder addToArray(String target, double value) {
      JsonPointer targetPtr = JsonPointer.compile(target);
      mutations.add(ObjectNodeMutation.addToArray(targetPtr, value));
      return this;
    }

    /** Registers appending the given boolean value to the array at {@code target}. */
    public Builder addToArray(String target, boolean value) {
      JsonPointer targetPtr = JsonPointer.compile(target);
      mutations.add(ObjectNodeMutation.addToArray(targetPtr, value));
      return this;
    }

    /**
     * Builds the {@link JsonMutationFn}. The registered mutations are copied, so further calls on
     * this builder do not affect a function that was already built.
     */
    public JsonMutationFn build() {
      return new JsonMutationFn(new ArrayList<>(mutations));
    }
  }

  /**
   * Applies every registered mutation, in order, to {@code inputNode}.
   *
   * @param inputNode the document to mutate; it is modified in place
   * @return the same {@code inputNode} instance after all mutations have run
   */
  @VisibleForTesting
  public ObjectNode applyMutations(ObjectNode inputNode) {
    for (NodeMutationFn fn : mutations) {
      fn.apply(inputNode);
    }
    return inputNode;
  }

  /**
   * Parses the current element, applies the mutations and outputs the result as a JSON string.
   *
   * <p>Beam discovers this method through the {@code @ProcessElement} annotation; {@link DoFn}
   * declares no {@code processElement} method that could be overridden.
   */
  @ProcessElement
  public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
    ObjectNode inputNode = safeReadAsObject(c.element());
    c.output(applyMutations(inputNode).toString());
  }
}
import static net.javacrumbs.jsonunit.JsonMatchers.jsonEquals;
import static org.hamcrest.MatcherAssert.assertThat;
import org.junit.Test;
/** Exercises a chain of {@link JsonMutationFn} mutations against JSON fixture files. */
public class JsonMutationFnTest {
  /**
   * Reads a classpath resource into a UTF-8 string.
   *
   * @param resourceName name of the resource, resolved through this class's class loader
   * @return the full content of the resource
   * @throws IllegalArgumentException if no resource with that name is on the classpath
   * @throws IOException if the resource cannot be read
   * @throws URISyntaxException if the resource URL cannot be converted to a URI
   */
  public static String getJsonResourceAsString(String resourceName)
      throws IOException, URISyntaxException {
    // A static method has no `this`; resolve the class loader through the class literal instead.
    java.net.URL resource = JsonMutationFnTest.class.getClassLoader().getResource(resourceName);
    if (resource == null) {
      throw new IllegalArgumentException("Test resource not found on classpath: " + resourceName);
    }
    URI inputJsonPath = resource.toURI();
    return new String(Files.readAllBytes(Paths.get(inputJsonPath)), StandardCharsets.UTF_8);
  }

  /**
   * Applies copy, move, addToArray, add and remove mutations to the input fixture and compares
   * the result with the expected output fixture.
   */
  @Test
  public void testMoveJsonNodes() throws Exception {
    String input = getJsonResourceAsString("mutate_json_nodes_input.json");
    // NOTE(review): Utils is not defined in this file; presumably it exposes safeReadTree and
    // safeReadAsObject helpers like the private ones on JsonMutationFn. Confirm.
    String output =
        Utils.safeReadTree(getJsonResourceAsString("mutate_json_nodes_output.json"))
            .toString();
    JsonMutationFn jsonMutations =
        JsonMutationFn.builder()
            .copy("/foo", "/foo_copy")
            // Moves whose source is absent are expected to leave the document untouched.
            .move("/i_am_a_missing_field", "/missing")
            .move("/i_am/a_missing/nested_field", "/missing_nested")
            // The target parent "/foo_nest" does not exist yet and must be created.
            .move("/foo", "/foo_nest/test")
            .move("/baz/jazz/aardvark", "/preserved_array")
            // Moving an array element removes it from the array.
            .move("/preserved_array/0", "/a_from_aardvark")
            .addToArray("/preserved_array", 0.159)
            .add("/added_long", 0L)
            .add("/added_string", "hi")
            .add("/added_double", 1.0)
            .remove("/baz/jazz")
            .build();
    assertThat(jsonMutations.applyMutations(Utils.safeReadAsObject(input)), jsonEquals(output));
  }
}
{
"foo": "bar",
"baz": {
"jazz": {
"aardvark": ["a", "b", "c", 4]
}
}
}
{
"a_from_aardvark": "a",
"added_double": 1.0,
"added_long": 0,
"added_string": "hi",
"baz": {},
"foo_copy": "bar",
"foo_nest": {
"test": "bar"
},
"preserved_array": ["b", "c", 4, 0.159]
}
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.beam.sdk.transforms.SerializableFunction;
abstract class NodeMutationFn implements SerializableFunction<ObjectNode, ObjectNode> {
public abstract ObjectNode apply(ObjectNode node);
}
import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
/**
 * Static factories for {@link NodeMutationFn}s that move, copy, remove, append to or set values
 * inside a JSON {@link ObjectNode}. Locations are addressed with {@link JsonPointer}s, and every
 * mutation modifies the node it is applied to in place and returns that same node.
 */
public class ObjectNodeMutation {

  /** Returns the node addressed by {@code ptr} without its last segment (a missing node if absent). */
  static JsonNode getParentNode(ObjectNode inputNode, JsonPointer ptr) {
    return inputNode.at(ptr.head());
  }

  /** Returns the property name matched by the last segment of {@code ptr}. */
  private static String getFieldName(JsonPointer ptr) {
    return ptr.last().getMatchingProperty();
  }

  /**
   * Looks up the node addressed by {@code ptr} through its parent, optionally detaching it.
   *
   * <p>When the parent is an array the last pointer segment is used as an index, otherwise as a
   * property name. Callers are expected to have verified that the addressed node exists.
   *
   * @param shouldRemoveNode whether the node is removed from its parent before being returned
   * @return the addressed node
   */
  private static JsonNode getExistingNode(
      ObjectNode inputNode, JsonPointer ptr, boolean shouldRemoveNode) {
    JsonNode parentNode = getParentNode(inputNode, ptr);
    JsonPointer lastSegment = ptr.last();
    if (parentNode.isArray()) {
      int index = lastSegment.getMatchingIndex();
      return shouldRemoveNode ? ((ArrayNode) parentNode).remove(index) : parentNode.get(index);
    }
    String fieldName = lastSegment.getMatchingProperty();
    return shouldRemoveNode
        ? ((ObjectNode) parentNode).remove(fieldName)
        : parentNode.get(fieldName);
  }

  /**
   * Wraps {@code fn} so that it only runs when {@code condition} holds for the input node.
   *
   * @param condition predicate deciding whether {@code fn} is applied; {@code null} means always
   * @param fn the mutation to apply conditionally
   */
  public static NodeMutationFn conditionally(
      @Nullable SerializableObjectNodePredicate condition, NodeMutationFn fn) {
    return inputNode ->
        (condition == null || condition.test(inputNode)) ? fn.apply(inputNode) : inputNode;
  }

  /**
   * Shared implementation of {@link #move} and {@link #copy}.
   *
   * @param shouldRemoveNode {@code true} to move (detach the source), {@code false} to copy
   */
  private static NodeMutationFn moveOrCopy(
      JsonPointer fromField, JsonPointer toField, boolean shouldRemoveNode) {
    return inputNode -> {
      if (inputNode.at(fromField).isMissingNode()) {
        // The field to move from is not found in the input at all. Note that this is distinct
        // from the case where a NullNode is found; in such a case, we still set the NullNode on
        // the target field. Simply return the input verbatim.
        return inputNode;
      }
      JsonNode target = getParentNode(inputNode, toField);
      if (target.isMissingNode()) {
        // Create the object(s) leading up to the target field.
        target = inputNode.withObject(toField.head());
      }
      JsonNode value = getExistingNode(inputNode, fromField, shouldRemoveNode);
      if (!shouldRemoveNode) {
        // A copy must not share the node instance with the source, otherwise a later mutation
        // of one location (e.g. appending to a copied array) would also change the other.
        value = value.deepCopy();
      }
      ((ObjectNode) target).set(getFieldName(toField), value);
      return inputNode;
    };
  }

  /**
   * Moves the value at {@code fromField} to {@code toField}. Does nothing when the source is
   * missing.
   */
  public static NodeMutationFn move(JsonPointer fromField, JsonPointer toField) {
    return moveOrCopy(fromField, toField, true);
  }

  /**
   * Copies the value at {@code fromField} to {@code toField}, leaving the source in place. The
   * target receives an independent deep copy. Does nothing when the source is missing.
   */
  public static NodeMutationFn copy(JsonPointer fromField, JsonPointer toField) {
    return moveOrCopy(fromField, toField, false);
  }

  /** Removes the value at {@code fromField}. Does nothing when it is missing. */
  public static NodeMutationFn remove(JsonPointer fromField) {
    return inputNode -> {
      if (inputNode.at(fromField).isMissingNode()) {
        return inputNode;
      }
      getExistingNode(inputNode, fromField, true);
      return inputNode;
    };
  }

  /**
   * Appends {@code value} to the array at {@code targetField}.
   *
   * <p>When nothing exists at {@code targetField} the document is returned unchanged, consistent
   * with how move, copy and remove treat a missing source. A value that exists but is not an
   * array still fails with a {@link ClassCastException}.
   */
  private static NodeMutationFn addToArray(JsonPointer targetField, JsonNode value) {
    return inputNode -> {
      JsonNode target = inputNode.at(targetField);
      if (target.isMissingNode()) {
        return inputNode;
      }
      ((ArrayNode) target).add(value);
      return inputNode;
    };
  }

  /** Appends a string to the array at {@code targetField}. */
  public static NodeMutationFn addToArray(JsonPointer targetField, String value) {
    return inputNode -> addToArray(targetField, TextNode.valueOf(value)).apply(inputNode);
  }

  /** Appends a long to the array at {@code targetField}. */
  public static NodeMutationFn addToArray(JsonPointer targetField, long value) {
    return inputNode -> addToArray(targetField, LongNode.valueOf(value)).apply(inputNode);
  }

  /** Appends an int to the array at {@code targetField}. */
  public static NodeMutationFn addToArray(JsonPointer targetField, int value) {
    return inputNode -> addToArray(targetField, IntNode.valueOf(value)).apply(inputNode);
  }

  /** Appends a double to the array at {@code targetField}. */
  public static NodeMutationFn addToArray(JsonPointer targetField, double value) {
    return inputNode -> addToArray(targetField, DoubleNode.valueOf(value)).apply(inputNode);
  }

  /** Appends a boolean to the array at {@code targetField}. */
  public static NodeMutationFn addToArray(JsonPointer targetField, boolean value) {
    return inputNode -> addToArray(targetField, BooleanNode.valueOf(value)).apply(inputNode);
  }

  /**
   * Sets {@code targetField} to JSON null, creating missing parent objects in the same way as
   * the other {@code set} overloads.
   */
  public static NodeMutationFn setNull(JsonPointer targetField) {
    return inputNode -> set(targetField, inputNode.nullNode()).apply(inputNode);
  }

  /**
   * Sets {@code targetField} to {@code value}, replacing any existing value. Missing parent
   * objects are created on the way, one pointer segment at a time.
   */
  static NodeMutationFn set(JsonPointer targetField, JsonNode value) {
    return inputNode -> {
      JsonNode parent = getParentNode(inputNode, targetField);
      if (parent.isMissingNode()) {
        // Recursively create the parent as an empty object, then look it up again.
        set(targetField.head(), inputNode.objectNode()).apply(inputNode);
        parent = getParentNode(inputNode, targetField);
      }
      ((ObjectNode) parent).set(getFieldName(targetField), value);
      return inputNode;
    };
  }

  /** Sets {@code targetField} to a string value. */
  public static NodeMutationFn set(JsonPointer targetField, String value) {
    return inputNode -> set(targetField, TextNode.valueOf(value)).apply(inputNode);
  }

  /** Sets {@code targetField} to a long value. */
  public static NodeMutationFn set(JsonPointer targetField, long value) {
    return inputNode -> set(targetField, LongNode.valueOf(value)).apply(inputNode);
  }

  /** Sets {@code targetField} to an int value. */
  public static NodeMutationFn set(JsonPointer targetField, int value) {
    return inputNode -> set(targetField, IntNode.valueOf(value)).apply(inputNode);
  }

  /** Sets {@code targetField} to a double value. */
  public static NodeMutationFn set(JsonPointer targetField, double value) {
    return inputNode -> set(targetField, DoubleNode.valueOf(value)).apply(inputNode);
  }

  /** Sets {@code targetField} to a boolean value. */
  public static NodeMutationFn set(JsonPointer targetField, boolean value) {
    return inputNode -> set(targetField, BooleanNode.valueOf(value)).apply(inputNode);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment