Created
June 19, 2019 20:50
-
-
Save slomo/413a771d341d2e69f2ff434061674fe3 to your computer and use it in GitHub Desktop.
SeekableFileChannel from Azure Blob Storage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package default; | |
import com.microsoft.azure.storage.blob.BlobRange; | |
import com.microsoft.azure.storage.blob.BlockBlobURL; | |
import com.microsoft.azure.storage.blob.DownloadResponse; | |
import com.microsoft.azure.storage.blob.ReliableDownloadOptions; | |
import java.io.IOException; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.SeekableByteChannel; | |
public class SeekableBlobByteChannel implements SeekableByteChannel { | |
private final BlockBlobURL blockBlobURL; | |
private final byte[] buffer; | |
private final int bufferSize; | |
private long size = -1; | |
private long position = -1; | |
private long bufferIndex = -1; | |
private boolean open = true; | |
public SeekableBlobByteChannel(BlockBlobURL blockBlobURL, int bufferSize) { | |
this.blockBlobURL = blockBlobURL; | |
this.bufferSize = bufferSize; | |
this.buffer = new byte[bufferSize]; | |
} | |
public int read(ByteBuffer byteBuffer) throws IOException { | |
long startBufferIndex = position / bufferSize * bufferSize; | |
if (position >= size) { | |
return -1; | |
} | |
if (startBufferIndex != bufferIndex) { | |
DownloadResponse downloadResponse = blockBlobURL | |
.download(new BlobRange().withOffset(startBufferIndex).withCount((long) bufferSize), null, false, null) | |
.blockingGet(); | |
Iterable<ByteBuffer> blobDataSnippets = downloadResponse.body( | |
new ReliableDownloadOptions().withMaxRetryRequests(0) | |
).blockingIterable(); | |
int bufferOffset = 0; | |
for (ByteBuffer blobData : blobDataSnippets) { | |
int length = blobData.remaining(); | |
blobData.get(buffer, bufferOffset, length); | |
bufferOffset += length; | |
} | |
bufferIndex = startBufferIndex; | |
} | |
int bufferOffset = (int)(position - bufferIndex); | |
int dataLength = Math.min(byteBuffer.remaining(), bufferSize - bufferOffset); | |
byteBuffer.put(buffer, bufferOffset, dataLength); | |
position += dataLength; | |
return dataLength; | |
} | |
public int write(ByteBuffer byteBuffer) throws IOException { | |
throw new IOException(this + " is read only"); | |
} | |
public long position() throws IOException { | |
return position; | |
} | |
public SeekableByteChannel position(long newPosition) throws IOException { | |
if (position >= size()) { | |
throw new IOException("Cannot read after end"); | |
} | |
position = newPosition; | |
return this; | |
} | |
public long size() throws IOException { | |
if (size < 0) { | |
size = blockBlobURL.getProperties().blockingGet().headers().contentLength(); | |
} | |
return size; | |
} | |
public SeekableByteChannel truncate(long l) throws IOException { | |
throw new IOException(this + " is read only"); | |
} | |
public boolean isOpen() { | |
return open; | |
} | |
public void close() throws IOException { | |
this.open = false; | |
this.position = -1; | |
} | |
@Override | |
public String toString() { | |
return "SeekableBlobByteChannel{" + | |
"blockBlobURL=" + blockBlobURL + | |
", bufferSize=" + bufferSize + | |
", size=" + size + | |
", position=" + position + | |
", open=" + true + | |
'}'; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment