Skip to content

Commit 1bf9d10

Browse files
committed
Add readArrowReader method to allow loading a dataframe from an ArrowReader #528
1 parent 1ca037b commit 1bf9d10

File tree

3 files changed

+41
-19
lines changed

3 files changed

+41
-19
lines changed

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.jetbrains.kotlinx.dataframe.io
22

33
import org.apache.arrow.memory.RootAllocator
4+
import org.apache.arrow.vector.ipc.ArrowReader
45
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
56
import org.jetbrains.kotlinx.dataframe.AnyFrame
67
import org.jetbrains.kotlinx.dataframe.DataFrame
@@ -170,3 +171,11 @@ public fun DataFrame.Companion.readArrowFeather(
170171
} else {
171172
readArrowFeather(File(path), nullability)
172173
}
174+
175+
/**
176+
* Read Arrow data in [any supported IPC format](https://arrow.apache.org/docs/java/ipc.html#reading-writing-ipc-formats) from an existing [reader]
177+
*/
178+
public fun DataFrame.Companion.readArrowReader(
179+
reader: ArrowReader,
180+
nullability: NullabilityOptions = NullabilityOptions.Infer
181+
): AnyFrame = readArrowReaderImpl(reader, nullability)

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.apache.arrow.vector.VarCharVector
3232
import org.apache.arrow.vector.VectorSchemaRoot
3333
import org.apache.arrow.vector.complex.StructVector
3434
import org.apache.arrow.vector.ipc.ArrowFileReader
35+
import org.apache.arrow.vector.ipc.ArrowReader
3536
import org.apache.arrow.vector.ipc.ArrowStreamReader
3637
import org.apache.arrow.vector.types.pojo.Field
3738
import org.apache.arrow.vector.util.DateUtility
@@ -262,17 +263,7 @@ internal fun DataFrame.Companion.readArrowIPCImpl(
262263
allocator: RootAllocator = Allocator.ROOT,
263264
nullability: NullabilityOptions = NullabilityOptions.Infer,
264265
): AnyFrame {
265-
ArrowStreamReader(channel, allocator).use { reader ->
266-
val flattened = buildList {
267-
val root = reader.vectorSchemaRoot
268-
val schema = root.schema
269-
while (reader.loadNextBatch()) {
270-
val df = schema.fields.map { f -> readField(root, f, nullability) }.toDataFrame()
271-
add(df)
272-
}
273-
}
274-
return flattened.concatKeepingSchema()
275-
}
266+
return readArrowReaderImpl(ArrowStreamReader(channel, allocator), nullability)
276267
}
277268

278269
/**
@@ -283,14 +274,36 @@ internal fun DataFrame.Companion.readArrowFeatherImpl(
283274
allocator: RootAllocator = Allocator.ROOT,
284275
nullability: NullabilityOptions = NullabilityOptions.Infer,
285276
): AnyFrame {
286-
ArrowFileReader(channel, allocator).use { reader ->
277+
return readArrowReaderImpl(ArrowFileReader(channel, allocator), nullability)
278+
}
279+
280+
/**
281+
* Read Arrow data in [any supported IPC format](https://arrow.apache.org/docs/java/ipc.html#reading-writing-ipc-formats) from an existing [reader]
282+
*/
283+
internal fun DataFrame.Companion.readArrowReaderImpl(
284+
reader: ArrowReader,
285+
nullability: NullabilityOptions = NullabilityOptions.Infer
286+
): AnyFrame {
287+
reader.use {
287288
val flattened = buildList {
288-
reader.recordBlocks.forEach { block ->
289-
reader.loadRecordBatch(block)
290-
val root = reader.vectorSchemaRoot
291-
val schema = root.schema
292-
val df = schema.fields.map { f -> readField(root, f, nullability) }.toDataFrame()
293-
add(df)
289+
when (reader) {
290+
is ArrowFileReader -> {
291+
reader.recordBlocks.forEach { block ->
292+
reader.loadRecordBatch(block)
293+
val root = reader.vectorSchemaRoot
294+
val schema = root.schema
295+
val df = schema.fields.map { f -> readField(root, f, nullability) }.toDataFrame()
296+
add(df)
297+
}
298+
}
299+
is ArrowStreamReader -> {
300+
val root = reader.vectorSchemaRoot
301+
val schema = root.schema
302+
while (reader.loadNextBatch()) {
303+
val df = schema.fields.map { f -> readField(root, f, nullability) }.toDataFrame()
304+
add(df)
305+
}
306+
}
294307
}
295308
}
296309
return flattened.concatKeepingSchema()

docs/StardustDocs/topics/read.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ val df = DataFrame.readArrowFeather(file)
445445

446446
[`DataFrame`](DataFrame.md) supports reading [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format)
447447
and [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files)
448-
from raw Channel (ReadableByteChannel for streaming and SeekableByteChannel for random access), InputStream, File or ByteArray.
448+
from raw Channel (ReadableByteChannel for streaming and SeekableByteChannel for random access), ArrowReader, InputStream, File or ByteArray.
449449

450450
> If you use Java 9+, follow the [Apache Arrow Java compatibility](https://arrow.apache.org/docs/java/install.html#java-compatibility) guide.
451451
>

0 commit comments

Comments
 (0)