Skip to content

Instantly share code, notes, and snippets.

@HungUnicorn
Last active December 8, 2017 10:18
Show Gist options
  • Save HungUnicorn/8a5c40fcf1e25c51cf77dc24a227d6d4 to your computer and use it in GitHub Desktop.
Save HungUnicorn/8a5c40fcf1e25c51cf77dc24a227d6d4 to your computer and use it in GitHub Desktop.
Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.
/**
 * Flink {@link FlatMapFunction} that turns each incoming {@link JsonNode} into a {@link Row}.
 *
 * <p>The node is serialized to JSON bytes and then parsed by a
 * {@link JsonRowDeserializationSchema} built from the {@code typeInfo} passed to the constructor.
 * Missing fields do not fail the conversion ({@code setFailOnMissingField(false)}).
 *
 * <p>NOTE(review): this function does not itself declare its produced type, so callers that need
 * a typed {@code Row} stream (e.g. for the Table API) presumably have to declare it on the
 * resulting stream, e.g. with {@code .returns(typeInfo)} - confirm against the call site.
 */
public class JsonIntoRow implements FlatMapFunction<JsonNode, Row> {

    /** Shared JSON writer; built once instead of once per record. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Row layout (field names and types) used to build the deserialization schema. */
    private final TypeInformation<Row> typeInfo;

    /**
     * Per-instance schema, created on first use. It was previously a {@code static} field that
     * every call overwrote, so instances configured with different row types could clobber each
     * other's schema. It is {@code transient} so that it is rebuilt from {@link #typeInfo}
     * rather than carried along when the function object is serialized.
     */
    private transient JsonRowDeserializationSchema deserializationSchema;

    /**
     * @param typeInfo row type describing the fields to extract from each JSON document
     */
    public JsonIntoRow(TypeInformation<Row> typeInfo) {
        this.typeInfo = typeInfo;
    }

    /**
     * Converts one JSON node into a {@link Row} and emits it.
     *
     * @param value the JSON node to convert
     * @param out collector receiving exactly one row per input node
     * @throws Exception if the node cannot be serialized or the bytes cannot be deserialized
     */
    @Override
    public void flatMap(JsonNode value, Collector<Row> out) throws Exception {
        if (deserializationSchema == null) {
            // Lazy init: build the schema once per function instance, not once per record.
            deserializationSchema = new JsonRowDeserializationSchema(typeInfo);
            deserializationSchema.setFailOnMissingField(false);
        }
        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(value);
        out.collect(deserializationSchema.deserialize(serializedJson));
    }
}
// Build the streaming environment and describe the row layout the JSON events are mapped to.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final TypeInformation<Row> typeInformation = Types.ROW(
    new String[] {"orderNumber", "sales", "country"},
    new TypeInformation<?>[] {Types.STRING(), Types.DOUBLE(), Types.INT()}
);
final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
final DataStream<Row> stream = FlinkUtil
    .readCleanStream(env, URL,
        eventTypes, "StreamSQL")
    .flatMap(new JsonIntoRow(typeInformation))
    // Row's field types cannot be inferred from the function, so without this hint the stream
    // is typed as GenericTypeInfo<Row>, which the Table API refuses to convert to a Table.
    .returns(typeInformation);
// Register through the public API rather than the internal registerDataStreamInternal.
tableEnv.registerDataStream("sampleTable", stream);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment