Skip to content

Instantly share code, notes, and snippets.

@ianmcook
Last active August 23, 2024 15:49
Show Gist options
  • Save ianmcook/f7b76163c69eee151715cb8e0c44d795 to your computer and use it in GitHub Desktop.
Save ianmcook/f7b76163c69eee151715cb8e0c44d795 to your computer and use it in GitHub Desktop.
Java example to receive Arrow record batches over HTTP and write to file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.ArrayList;
/******** BEGIN IMPORTS ADDED FOR FILE WRITING ********/
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import java.io.File;
import java.io.FileOutputStream;
/******** END IMPORTS ADDED FOR FILE WRITING **********/
/**
 * Example client that issues an HTTP GET request, reads the response body as an
 * Arrow IPC stream, keeps every received record batch in memory, prints some
 * statistics, and finally writes the batches back out to a local file in the
 * Arrow IPC stream format.
 *
 * <p>Every Arrow resource (allocator, reader, roots, record batches) is released
 * deterministically, including when an I/O error interrupts the transfer.
 */
public class ArrowHttpClient {

  /** Address of the server that produces the Arrow IPC stream. */
  private static final String SERVER_URL = "http://localhost:8008";

  /** Name of the file the received record batches are written to. */
  private static final String OUTPUT_FILE = "output.arrows";

  /**
   * Fetches the Arrow stream from {@link #SERVER_URL}, reports statistics on
   * standard output, and writes the received batches to {@link #OUTPUT_FILE}.
   *
   * <p>I/O failures are reported by printing the stack trace; a non-200 response
   * is reported on standard error.
   *
   * @param args ignored
   */
  public static void main(String[] args) {
    HttpURLConnection connection = null;
    try {
      // Monotonic clock: elapsed-time measurements must not be affected by
      // wall-clock adjustments.
      long startNanos = System.nanoTime();

      URL url = new URL(SERVER_URL);
      connection = (HttpURLConnection) url.openConnection();
      connection.setRequestMethod("GET");

      // Query the status once and reuse it for both the check and the message.
      int responseCode = connection.getResponseCode();
      if (responseCode != HttpURLConnection.HTTP_OK) {
        System.err.println("Failed with response code: " + responseCode);
        return;
      }

      // The allocator is the outermost Arrow resource: everything allocated
      // from it must be released before it is closed.
      try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
        List<ArrowRecordBatch> batches = new ArrayList<>();
        try {
          Schema schema = receiveBatches(connection, allocator, batches, startNanos);

          /******** BEGIN CODE ADDED FOR FILE WRITING ********/
          writeBatches(schema, batches, allocator, new File(OUTPUT_FILE));
          /******** END CODE ADDED FOR FILE WRITING **********/
        } finally {
          // Unloaded record batches hold their own references to the
          // underlying buffers; release them so the allocator can close
          // without reporting leaked memory.
          for (ArrowRecordBatch batch : batches) {
            batch.close();
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (connection != null) {
        connection.disconnect();
      }
    }
  }

  /**
   * Reads the whole Arrow IPC stream from the connection's response body,
   * appends every record batch to {@code batches}, and prints transfer
   * statistics (bytes, records, batches, elapsed seconds) to standard output.
   *
   * <p>Batches are appended as they arrive, so on failure the caller still
   * sees (and can close) the batches received so far.
   *
   * @param connection connection whose response body is the Arrow stream
   * @param allocator allocator used by the stream reader
   * @param batches list that receives the unloaded record batches; the caller
   *     is responsible for closing them
   * @param startNanos {@link System#nanoTime()} value taken when the request
   *     started, used to compute the elapsed time
   * @return the schema of the received stream
   * @throws IOException if reading the response or decoding the stream fails
   */
  private static Schema receiveBatches(
      HttpURLConnection connection,
      BufferAllocator allocator,
      List<ArrowRecordBatch> batches,
      long startNanos) throws IOException {
    try (InputStream inputStream = connection.getInputStream();
        ArrowStreamReader reader = new ArrowStreamReader(inputStream, allocator)) {
      VectorSchemaRoot root = reader.getVectorSchemaRoot();
      Schema schema = root.getSchema();
      VectorUnloader unloader = new VectorUnloader(root);

      // long rather than int so very large streams cannot overflow the counter.
      long numRows = 0;
      while (reader.loadNextBatch()) {
        numRows += root.getRowCount();
        batches.add(unloader.getRecordBatch());
      }

      float execTime = (System.nanoTime() - startNanos) / 1e9F;

      // bytesRead() must be queried before the reader is closed.
      System.out.println(reader.bytesRead() + " bytes received");
      System.out.println(numRows + " records received");
      System.out.println(batches.size() + " record batches received");
      System.out.printf("%.2f seconds elapsed\n", execTime);

      return schema;
    }
  }

  /**
   * Writes the given record batches to {@code file} in the Arrow IPC stream
   * format. The batches are not closed by this method.
   *
   * @param schema schema shared by all batches
   * @param batches record batches to write, in order
   * @param allocator allocator used for the temporary vector schema root
   * @param file destination file; created or overwritten
   * @throws IOException if the file cannot be opened or written
   */
  private static void writeBatches(
      Schema schema,
      List<ArrowRecordBatch> batches,
      BufferAllocator allocator,
      File file) throws IOException {
    try (
        FileOutputStream fileOutputStream = new FileOutputStream(file);
        VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
        ArrowStreamWriter writer =
            new ArrowStreamWriter(root, /*provider*/ null, fileOutputStream.getChannel());
    ) {
      VectorLoader loader = new VectorLoader(root);
      writer.start();
      for (ArrowRecordBatch batch : batches) {
        // Loading replaces the root's contents with this batch's buffers.
        loader.load(batch);
        writer.writeBatch();
      }
      writer.end();
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment