Created
April 27, 2016 16:22
-
-
Save IgorBerman/e84f2bf93856274f6043b9e66948148d to your computer and use it in GitHub Desktop.
Avro writer for flink rolling sink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.flink.streaming.connectors.fs.avro; | |
/** | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
import java.io.IOException; | |
import java.io.OutputStream; | |
import java.util.Arrays; | |
import java.util.HashMap; | |
import java.util.Map; | |
import org.apache.avro.Schema; | |
import org.apache.avro.Schema.Type; | |
import org.apache.avro.file.CodecFactory; | |
import org.apache.avro.file.DataFileConstants; | |
import org.apache.avro.file.DataFileWriter; | |
import org.apache.avro.generic.GenericData; | |
import org.apache.avro.generic.GenericDatumWriter; | |
import org.apache.avro.generic.GenericRecord; | |
import org.apache.avro.io.DatumWriter; | |
import org.apache.flink.api.common.ExecutionConfig; | |
import org.apache.flink.api.common.typeinfo.TypeInformation; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.api.java.typeutils.InputTypeConfigurable; | |
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; | |
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer; | |
import org.apache.flink.streaming.connectors.fs.RollingSink; | |
import org.apache.flink.streaming.connectors.fs.Writer; | |
import org.apache.hadoop.fs.FSDataOutputStream; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
/** | |
* Implementation of AvroKeyValue writer that can be used in Sink. | |
<pre> | |
Usage: | |
{@code | |
RollingSink<Tuple2<Long , Long>> sink = new RollingSink<Tuple2<Long , Long>>("/tmp/path"); | |
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/mm/")); | |
sink.setPendingSuffix(".avro"); | |
Map<String,String> properties = new HashMap<>(); | |
Schema longSchema = Schema.create(Type.LONG); | |
String keySchema = longSchema.toString(); | |
String valueSchema = longSchema.toString(); | |
properties.put(AvroSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema); | |
properties.put(AvroSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema); | |
properties.put(AvroSinkWriter.CONF_COMPRESS, Boolean.toString(true)); | |
properties.put(AvroSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC); | |
sink.setWriter(new AvroSinkWriter<Long , Long>(properties)); | |
sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB, | |
} | |
</pre> | |
to test with s3: | |
<pre> | |
{@code | |
create core-site.xml(I haven't other way to test locally) | |
<configuration> | |
<property> | |
<name>fs.s3.impl</name> | |
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> | |
</property> | |
<property> | |
<name>fs.s3a.access.key</name> | |
<value>xxx</value> | |
</property> | |
<property> | |
<name>fs.s3a.secret.key</name> | |
<value>yyy</value> | |
</property> | |
<property> | |
<!-- probably with hdfs installation it won't be needed --> | |
<name>fs.s3a.buffer.dir</name> | |
<value>/tmp</value> | |
</property> | |
</configuration> | |
and add following dependencies(not sure what is best option here): | |
<dependency> | |
<groupId>org.apache.hadoop</groupId> | |
<artifactId>hadoop-aws</artifactId> | |
<version>2.7.0</version> | |
<scope>provided</scope> | |
<exclusions> | |
<exclusion> | |
<artifactId>guava</artifactId> | |
<groupId>com.google.guava</groupId> | |
</exclusion> | |
</exclusions> | |
</dependency> | |
} | |
</pre> | |
*/ | |
public class AvroSinkWriter<K, V> implements Writer<Tuple2<K, V>>, InputTypeConfigurable { | |
private static final long serialVersionUID = 1L; | |
public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key"; | |
public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value"; | |
public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS; | |
public static final String CONF_COMPRESS_CODEC = FileOutputFormat.COMPRESS_CODEC; | |
public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level"; | |
public static final String CONF_XZ_LEVEL = "avro.xz.level"; | |
private transient FSDataOutputStream outputStream; | |
private transient AvroKeyValueWriter<K, V> writer; | |
private Class<K> keyClass; | |
private Class<V> valueClass; | |
private final Map<String, String> properties; | |
/** | |
* C'tor for the writer | |
* <p> | |
* You can provide different properties that will be used to configure avro key-value writer as simple properties map(see example above) | |
* @param properties | |
*/ | |
public AvroSinkWriter(Map<String, String> properties) { | |
this.properties = properties; | |
} | |
private AvroSinkWriter(Class<K> keyClass, Class<V> valueClass, Map<String, String> properties) { | |
this.properties = properties; | |
this.keyClass = keyClass; | |
this.valueClass = valueClass; | |
} | |
private boolean getBoolean(Map<String,String> conf, String key, boolean def) { | |
String value = conf.get(key); | |
if (value == null) { | |
return def; | |
} | |
return Boolean.parseBoolean(value); | |
} | |
private int getInt(Map<String,String> conf, String key, int def) { | |
String value = conf.get(key); | |
if (value == null) { | |
return def; | |
} | |
return Integer.parseInt(value); | |
} | |
//this derived from AvroOutputFormatBase.getCompressionCodec(..) | |
private CodecFactory getCompressionCodec(Map<String,String> conf) { | |
if (getBoolean(conf, CONF_COMPRESS, false)) { | |
int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL); | |
int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL); | |
String outputCodec = conf.get(CONF_COMPRESS_CODEC); | |
if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) { | |
return CodecFactory.deflateCodec(deflateLevel); | |
} else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) { | |
return CodecFactory.xzCodec(xzLevel); | |
} else { | |
return CodecFactory.fromString(outputCodec); | |
} | |
} | |
return CodecFactory.nullCodec(); | |
} | |
@Override | |
public void open(FSDataOutputStream outStream) throws IOException { | |
if (outputStream != null) { | |
throw new IllegalStateException("AvroSinkWriter has already been opened."); | |
} | |
if (keyClass == null) { | |
throw new IllegalStateException("Key Class has not been initialized."); | |
} | |
if (valueClass == null) { | |
throw new IllegalStateException("Value Class has not been initialized."); | |
} | |
this.outputStream = outStream; | |
CodecFactory compressionCodec = getCompressionCodec(properties); | |
String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA); | |
if (keySchemaString == null) { | |
throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property"); | |
} | |
@SuppressWarnings("deprecation") | |
Schema keySchema = Schema.parse(keySchemaString); | |
String valueSchemaString = properties.get(CONF_OUTPUT_VALUE_SCHEMA); | |
if (valueSchemaString == null) { | |
throw new IllegalStateException("No value schema provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property"); | |
} | |
@SuppressWarnings("deprecation") | |
Schema valueSchema = Schema.parse(valueSchemaString); | |
writer = new AvroKeyValueWriter<K, V>(keySchema, valueSchema, compressionCodec, outputStream); | |
} | |
@Override | |
public void flush() throws IOException { | |
if (writer != null) { | |
writer.sync(); | |
} | |
} | |
@Override | |
public void close() throws IOException { | |
if (writer != null) { | |
writer.close(); | |
} | |
writer = null; | |
outputStream = null; | |
} | |
@Override | |
public void write(Tuple2<K, V> element) throws IOException { | |
if (outputStream == null) { | |
throw new IllegalStateException("SequenceFileWriter has not been opened."); | |
} | |
writer.write(element.f0, element.f1); | |
} | |
@Override | |
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { | |
if (!type.isTupleType()) { | |
throw new IllegalArgumentException("Input TypeInformation is not a tuple type."); | |
} | |
TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type; | |
if (tupleType.getArity() != 2) { | |
throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type."); | |
} | |
TypeInformation<K> keyType = tupleType.getTypeAt(0); | |
TypeInformation<V> valueType = tupleType.getTypeAt(1); | |
this.keyClass = keyType.getTypeClass(); | |
this.valueClass = valueType.getTypeClass(); | |
} | |
@Override | |
public Writer<Tuple2<K, V>> duplicate() { | |
return new AvroSinkWriter<K, V>(keyClass, valueClass, properties); | |
} | |
private static final class AvroKeyValueWriter<K,V> { | |
/** A writer for the Avro container file. */ | |
private final DataFileWriter<GenericRecord> mAvroFileWriter; | |
/** The writer schema for the generic record entries of the Avro container file. */ | |
private final Schema mKeyValuePairSchema; | |
/** A reusable Avro generic record for writing key/value pairs to the file. */ | |
private final AvroKeyValue<Object, Object> mOutputRecord; | |
AvroKeyValueWriter(Schema keySchema, Schema valueSchema, CodecFactory compressionCodec, OutputStream outputStream, int syncInterval) throws IOException { | |
// Create the generic record schema for the key/value pair. | |
mKeyValuePairSchema = AvroKeyValue.getSchema(keySchema, valueSchema); | |
// Create an Avro container file and a writer to it. | |
DatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(mKeyValuePairSchema); | |
mAvroFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter); | |
mAvroFileWriter.setCodec(compressionCodec); | |
mAvroFileWriter.setSyncInterval(syncInterval); | |
mAvroFileWriter.create(mKeyValuePairSchema, outputStream); | |
// Create a reusable output record. | |
mOutputRecord = new AvroKeyValue<Object, Object>(new GenericData.Record(mKeyValuePairSchema)); | |
} | |
AvroKeyValueWriter(Schema keySchema, Schema valueSchema, CodecFactory compressionCodec, OutputStream outputStream) throws IOException { | |
this(keySchema, valueSchema, compressionCodec, outputStream, DataFileConstants.DEFAULT_SYNC_INTERVAL); | |
} | |
void write(K key, V value) throws IOException { | |
mOutputRecord.setKey(key); | |
mOutputRecord.setValue(value); | |
mAvroFileWriter.append(mOutputRecord.get()); | |
} | |
void close() throws IOException { | |
mAvroFileWriter.close(); | |
} | |
long sync() throws IOException { | |
return mAvroFileWriter.sync(); | |
} | |
} | |
//taken from AvroKeyValue avro-mapr lib | |
private static class AvroKeyValue<K, V> { | |
/** The name of the key value pair generic record. */ | |
static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair"; | |
/** The namespace of the key value pair generic record. */ | |
static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = "org.apache.avro.mapreduce"; | |
/** The name of the generic record field containing the key. */ | |
static final String KEY_FIELD = "key"; | |
/** The name of the generic record field containing the value. */ | |
static final String VALUE_FIELD = "value"; | |
/** The key/value generic record wrapped by this class. */ | |
final GenericRecord mKeyValueRecord; | |
/** | |
* Wraps a GenericRecord that is a key value pair. | |
*/ | |
AvroKeyValue(GenericRecord keyValueRecord) { | |
mKeyValueRecord = keyValueRecord; | |
} | |
GenericRecord get() { | |
return mKeyValueRecord; | |
} | |
void setKey(K key) { | |
mKeyValueRecord.put(KEY_FIELD, key); | |
} | |
void setValue(V value) { | |
mKeyValueRecord.put(VALUE_FIELD, value); | |
} | |
/** | |
* Creates a KeyValuePair generic record schema. | |
* | |
* @return A schema for a generic record with two fields: 'key' and 'value'. | |
*/ | |
static Schema getSchema(Schema keySchema, Schema valueSchema) { | |
Schema schema = Schema.createRecord( | |
KEY_VALUE_PAIR_RECORD_NAME, "A key/value pair", KEY_VALUE_PAIR_RECORD_NAMESPACE, false); | |
schema.setFields(Arrays.asList( | |
new Schema.Field(KEY_FIELD, keySchema, "The key", null), | |
new Schema.Field(VALUE_FIELD, valueSchema, "The value", null))); | |
return schema; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment