@samredai
Created September 7, 2022 23:40
Iceberg: Write records to an Iceberg table
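An end-to-end example using the Iceberg Java API: configure a JDBC catalog backed by Postgres, create a table, build generic records, write them to a Parquet data file, and commit that file to the table.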
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
// Load a catalog
Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.CATALOG_IMPL, JdbcCatalog.class.getName());
properties.put(CatalogProperties.URI, "jdbc:postgresql://postgres:5432/demo_catalog");
properties.put(JdbcCatalog.PROPERTY_PREFIX + "user", "admin");
properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "/home/iceberg/warehouse");
properties.put(CatalogProperties.FILE_IO_IMPL, HadoopFileIO.class.getName());
JdbcCatalog catalog = new JdbcCatalog();
Configuration conf = new Configuration();
catalog.setConf(conf);
catalog.initialize("demo", properties);
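// The JDBC catalog keeps pointers to table metadata in the Postgres database
// configured above; the metadata and data files themselves live under the
// warehouse location and are read and written through HadoopFileIO.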
// Create a table with a schema
Namespace nyc = Namespace.of("nyc");
TableIdentifier name = TableIdentifier.of(nyc, "test3");
Schema schema = new Schema(
    Types.NestedField.required(1, "level", Types.StringType.get()));
catalog.createTable(name, schema);
// Create records: GenericRecord.create(schema) makes a template record,
// and copy(Map) returns a new record with the given fields set
GenericRecord record = GenericRecord.create(schema);
ImmutableList.Builder<Record> builder = ImmutableList.builder();
builder.add(record.copy(ImmutableMap.of("level", "debug")));
builder.add(record.copy(ImmutableMap.of("level", "info")));
builder.add(record.copy(ImmutableMap.of("level", "error")));
builder.add(record.copy(ImmutableMap.of("level", "fatal")));
// Write the records to a Parquet file, then convert the data writer to a DataFile
File testFile = new File("/home/iceberg/warehouse/" + UUID.randomUUID().toString());
OutputFile file = Files.localOutput(testFile);
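// Note: Files.localOutput writes through the local filesystem, which suits
// this demo's local warehouse. In a real job you would more likely obtain
// the OutputFile from the table's FileIO, e.g. table.io().newOutputFile(path),
// so the file lands under the table's location (sketch only; `table` and
// `path` here are hypothetical and not part of the original gist).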
DataWriter<Record> dataWriter =
    Parquet.writeData(file)
        .schema(schema)
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .overwrite()
        .withSpec(PartitionSpec.unpartitioned())
        .build();
try {
  for (Record rec : builder.build()) {
    dataWriter.write(rec);
  }
} finally {
  dataWriter.close();
}
DataFile dataFile = dataWriter.toDataFile();
// Load the table, append the data file, and commit it
Table tbl = catalog.loadTable(name);
tbl.newAppend().appendFile(dataFile).commit();
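// Optional read-back check, a sketch not in the original gist: scan the table
// with Iceberg's generic reader to confirm the committed rows are visible.
// Fully qualified names are used here to avoid adding extra imports above.
try (org.apache.iceberg.io.CloseableIterable<Record> rows =
    org.apache.iceberg.data.IcebergGenerics.read(tbl).build()) {
  for (Record r : rows) {
    System.out.println(r.getField("level"));
  }
}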