成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Go語言開發者的Apache Arrow使用指南:高級數據結構

開發 前端
本文講解了基于array type的三個高級數據結構:Record Batch、Chunked Array和Table。其中Record Batch是Arrow Columnar Format中的結構,可以被所有實現arrow的編程語言所支持;Chunked Array和Table則是在一些編程語言的實現中創建的。

經過對前面兩篇文章《Arrow數據類型》[1]和《Arrow Go實現的內存管理》[2]的學習,我們知道了各種Arrow array type以及它們在內存中的layout,我們了解了Go arrow實現在內存管理上的一些機制和使用原則。

Arrow的array type只是一個定長的、同類型的值序列。在實際應用中,array type更多時候只是充當基礎類型,我們需要具有組合基礎類型能力的更高級的數據結構。在這一篇文章中,我們就來看看Arrow規范以及一些實現中提供的高級數據結構,包括Record Batch、Chunked Array以及Table。

我們先來看看Record Batch[3]。

1. Record Batch

Record這個名字讓我想起了[Pascal編程語言](https://en.wikipedia.org/wiki/Pascal_(programming_language "Pascal編程語言"))中的Record。在Pascal中,Record的角色大致與Go中的Struct類似,也是一組異構字段的集合。下面是《In-Memory Analytics with Apache Arrow》[4]書中的一個Record例子:

// 以Go語言呈現
type Archer struct {
 archer string
 location string
 year int16
}

Record Batch則顧名思義,是一批Record,即一個Record的集合:[N]Archer。

如果將Record的各個字段作為列,將集合中的每個Record作為行,我們能得到如下面示意圖中的結構:

圖片圖片

Go Arrow實現中沒有直接使用“Record Batch”這個名字,而是使用了“Record”,這個“Record”實際代表的就是Record Batch。下面是Go Arrow實現定義的Record接口:

// github.com/apache/arrow/go/arrow/record.go

// Record is a collection of equal-length arrays matching a particular Schema.
// Also known as a RecordBatch in the spec and in some implementations.
//
// It is also possible to construct a Table from a collection of Records that
// all have the same schema.
type Record interface {
    json.Marshaler

    Release()
    Retain()

    Schema() *Schema

    NumRows() int64
    NumCols() int64

    Columns() []Array
    Column(i int) Array
    ColumnName(i int) string
    SetColumn(i int, col Array) (Record, error)

    // NewSlice constructs a zero-copy slice of the record with the indicated
    // indices i and j, corresponding to array[i:j].
    // The returned record must be Release()'d after use.
    //
    // NewSlice panics if the slice is outside the valid range of the record array.
    // NewSlice panics if j < i.
    NewSlice(i, j int64) Record
}

我們依然可以使用Builder模式來創建一個arrow.Record,下面我們就來用Go代碼創建[N]Archer這個Record Batch:

// record_batch.go
func main() {
    schema := arrow.NewSchema(
        []arrow.Field{
            {Name: "archer", Type: arrow.BinaryTypes.String},
            {Name: "location", Type: arrow.BinaryTypes.String},
            {Name: "year", Type: arrow.PrimitiveTypes.Int16},
        },
        nil,
    )

    rb := array.NewRecordBuilder(memory.DefaultAllocator, schema)
    defer rb.Release()

    rb.Field(0).(*array.StringBuilder).AppendValues([]string{"tony", "amy", "jim"}, nil)
    rb.Field(1).(*array.StringBuilder).AppendValues([]string{"beijing", "shanghai", "chengdu"}, nil)
    rb.Field(2).(*array.Int16Builder).AppendValues([]int16{1992, 1993, 1994}, nil)

    rec := rb.NewRecord()
    defer rec.Release()

    fmt.Println(rec)
}

運行上述示例,輸出如下:

$go run record_batch.go 
record:
  schema:
  fields: 3
    - archer: type=utf8
    - location: type=utf8
    - year: type=int16
  rows: 3
  col[0][archer]: ["tony" "amy" "jim"]
  col[1][location]: ["beijing" "shanghai" "chengdu"]
  col[2][year]: [1992 1993 1994]

在這個示例里,我們看到了一個名為Schema的概念,并且NewRecordBuilder創建時需要傳入一個arrow.Schema的實例。和數據庫表Schema類似,Arrow中的Schema也是一個元數據概念,它包含一系列作為“列”的字段的名稱和類型信息。Schema不僅在Record Batch中使用,在后面的Table中,Schema也是必要元素。

arrow.Record可以通過NewSlice可以ZeroCopy方式共享Record Batch的內存數據,NewSlice會創建一個新的Record Batch,這個Record Batch中的Record與原Record是共享的:

// record_batch_slice.go

sl := rec.NewSlice(0, 2)
fmt.Println(sl)
cols := sl.Columns()
a1 := cols[0]
fmt.Println(a1)

新的sl取了rec的前兩個record,輸出sl得到如下結果:

record:
  schema:
  fields: 3
    - archer: type=utf8
    - location: type=utf8
    - year: type=int16
  rows: 2
  col[0][archer]: ["tony" "amy"]
  col[1][location]: ["beijing" "shanghai"]
  col[2][year]: [1992 1993]

["tony" "amy"]

相同schema的record batch可以合并,我們只需要分配一個更大的Record Batch,然后將兩個待合并的Record batch copy到新Record Batch中就可以了,但顯然這樣做的開銷很大。

Arrow的一些實現中提供了Chunked Array的概念,可以更低開銷的來完成某個列的array的追加。

注:Chunked array并不是Arrow Columnar Format的一部分。

2. Chunked Array

如果說Record Batch本質上是不同Array type的橫向聚合,那么Chunked Array就是相同Array type的縱向聚合了,用Go語法表示就是:[N]Array或[]Array,即array of array。下面是一個Chunked Array的結構示意圖:

圖片圖片

我們看到:Go的Chunked array的實現使用的是一個Array切片:

// github.com/apache/arrow/go/arrow/table.go

// Chunked manages a collection of primitives arrays as one logical large array.
type Chunked struct {
    refCount int64 // refCount must be first in the struct for 64 bit alignment and sync/atomic (https://github.com/golang/go/issues/37262)

    chunks []Array

    length int
    nulls  int
    dtype  DataType
}

按照Go切片的本質,Chunked Array中的各個元素Array間的實際內存buffer并不連續。并且正如示意圖所示:每個Array的長度也并非是一樣的。

注:在《Go語言第一課》[5]中的第15講中有關于切片本質的深入系統的講解。

我們可以使用arrow包提供的NewChunked函數創建一個Chunked Array,具體見下面源碼:

// chunked_array.go

func main() {
    ib := array.NewInt64Builder(memory.DefaultAllocator)
    defer ib.Release()

    ib.AppendValues([]int64{1, 2, 3, 4, 5}, nil)
    i1 := ib.NewInt64Array()
    defer i1.Release()

    ib.AppendValues([]int64{6, 7}, nil)
    i2 := ib.NewInt64Array()
    defer i2.Release()
    
    ib.AppendValues([]int64{8, 9, 10}, nil)
    i3 := ib.NewInt64Array()
    defer i3.Release()

    c := arrow.NewChunked(
        arrow.PrimitiveTypes.Int64,
        []arrow.Array{i1, i2, i3},
    )
    defer c.Release()

    for _, arr := range c.Chunks() {
        fmt.Println(arr)
    }
    
    fmt.Println("chunked length =", c.Len())
    fmt.Println("chunked null count=", c.NullN())
}

我們看到在Chunked Array聚合了多個arrow.Array實例,并且這些arrow.Array實例的長短可不一致,arrow.Chunked的Len()返回的則是Chunked中Array的長度之和。下面是示例程序的輸出結果:

$go run chunked_array.go 
[1 2 3 4 5]
[6 7]
[8 9 10]
chunked length = 10
chunked null count= 0

這樣來看,Chunked Array可以看成一個邏輯上的大Array。

好了,問題來了!Record Batch是用來聚合等長array type的,那么是否有某種數據結構可以用來聚合等長的Chunked Array呢?答案是有的!下面我們就來看看這種結構:Table。

3. Table

Table和Chunked Array一樣并不屬于Arrow Columnar Format的一部分,最初只是Arrow的C++實現中的一個數據結構,Go Arrow的實現也提供了對Table的支持。

Table的結構示意圖如下(圖摘自《In-Memory Analytics with Apache Arrow》[6]一書):

圖片圖片

我們看到:和Record Batch的每列是一個array不同,Table的每一列為一個chunked array,所有列的chunked array的Length是相同的,但各個列的chunked array中的array的長度可以不同。

Table和Record Batch相似的地方是都有自己的Schema。

下面的示意圖(來自這里[7])對Table和Chunked Array做了十分直觀的對比:

圖片圖片

Record Batch是Arrow Columnar format中的一部分,所有語言的實現都支持Record Batch;但Table并非format spec的一部分,并非所有語言的實現對其都提供支持。

另外從圖中看到,由于Table采用了Chunked Array作為列,chunked array下的各個array內部分布并不連續,這讓Table在運行時喪失了一些局部性。

下面我們就使用Go arrow實現來創建一個table,這是一個3列、10行的table:

// table.go

func main() {
 schema := arrow.NewSchema(
  []arrow.Field{
   {Name: "col1", Type: arrow.PrimitiveTypes.Int32},
   {Name: "col2", Type: arrow.PrimitiveTypes.Float64},
   {Name: "col3", Type: arrow.BinaryTypes.String},
  },
  nil,
 )

 col1 := func() *arrow.Column {
  chunk := func() *arrow.Chunked {
   ib := array.NewInt32Builder(memory.DefaultAllocator)
   defer ib.Release()

   ib.AppendValues([]int32{1, 2, 3}, nil)
   i1 := ib.NewInt32Array()
   defer i1.Release()

   ib.AppendValues([]int32{4, 5, 6, 7, 8, 9, 10}, nil)
   i2 := ib.NewInt32Array()
   defer i2.Release()

   c := arrow.NewChunked(
    arrow.PrimitiveTypes.Int32,
    []arrow.Array{i1, i2},
   )
   return c
  }()
  defer chunk.Release()

  return arrow.NewColumn(schema.Field(0), chunk)
 }()
 defer col1.Release()

 col2 := func() *arrow.Column {
  chunk := func() *arrow.Chunked {
   fb := array.NewFloat64Builder(memory.DefaultAllocator)
   defer fb.Release()

   fb.AppendValues([]float64{1.1, 2.2, 3.3, 4.4, 5.5}, nil)
   f1 := fb.NewFloat64Array()
   defer f1.Release()

   fb.AppendValues([]float64{6.6, 7.7}, nil)
   f2 := fb.NewFloat64Array()
   defer f2.Release()

   fb.AppendValues([]float64{8.8, 9.9, 10.0}, nil)
   f3 := fb.NewFloat64Array()
   defer f3.Release()

   c := arrow.NewChunked(
    arrow.PrimitiveTypes.Float64,
    []arrow.Array{f1, f2, f3},
   )
   return c
  }()
  defer chunk.Release()

  return arrow.NewColumn(schema.Field(1), chunk)
 }()
 defer col2.Release()

 col3 := func() *arrow.Column {
  chunk := func() *arrow.Chunked {
   sb := array.NewStringBuilder(memory.DefaultAllocator)
   defer sb.Release()

   sb.AppendValues([]string{"s1", "s2"}, nil)
   s1 := sb.NewStringArray()
   defer s1.Release()

   sb.AppendValues([]string{"s3", "s4"}, nil)
   s2 := sb.NewStringArray()
   defer s2.Release()

   sb.AppendValues([]string{"s5", "s6", "s7", "s8", "s9", "s10"}, nil)
   s3 := sb.NewStringArray()
   defer s3.Release()

   c := arrow.NewChunked(
    arrow.BinaryTypes.String,
    []arrow.Array{s1, s2, s3},
   )
   return c
  }()
  defer chunk.Release()

  return arrow.NewColumn(schema.Field(2), chunk)
 }()
 defer col3.Release()

 var tbl arrow.Table
 tbl = array.NewTable(schema, []arrow.Column{*col1, *col2, *col3}, -1)
 defer tbl.Release()

 dumpTable(tbl)
}

func dumpTable(tbl arrow.Table) {
 s := tbl.Schema()
 fmt.Println(s)
 fmt.Println("------")

 fmt.Println("the count of table columns=", tbl.NumCols())
 fmt.Println("the count of table rows=", tbl.NumRows())
 fmt.Println("------")

 for i := 0; i < int(tbl.NumCols()); i++ {
  col := tbl.Column(i)
  fmt.Printf("arrays in column(%s):\n", col.Name())
  chunk := col.Data()
  for _, arr := range chunk.Chunks() {
   fmt.Println(arr)
  }
  fmt.Println("------")
 }
}

我們看到:table創建之前,我們需要準備一個schema,以及各個column。每個column則是一個chunked array。

運行上述代碼,我們得到如下結果:

$go run table.go
schema:
  fields: 3
    - col1: type=int32
    - col2: type=float64
    - col3: type=utf8
------
the count of table columns= 3
the count of table rows= 10
------
arrays in column(col1):
[1 2 3]
[4 5 6 7 8 9 10]
------
arrays in column(col2):
[1.1 2.2 3.3 4.4 5.5]
[6.6 7.7]
[8.8 9.9 10]
------
arrays in column(col3):
["s1" "s2"]
["s3" "s4"]
["s5" "s6" "s7" "s8" "s9" "s10"]
------

table還支持schema變更,我們可以基于上述代碼為table增加一列:

// table_schema_change.go

func main() {
 schema := arrow.NewSchema(
  []arrow.Field{
   {Name: "col1", Type: arrow.PrimitiveTypes.Int32},
   {Name: "col2", Type: arrow.PrimitiveTypes.Float64},
   {Name: "col3", Type: arrow.BinaryTypes.String},
  },
  nil,
 )

 col1 := func() *arrow.Column {
  chunk := func() *arrow.Chunked {
   ib := array.NewInt32Builder(memory.DefaultAllocator)
   defer ib.Release()

   ib.AppendValues([]int32{1, 2, 3}, nil)
   i1 := ib.NewInt32Array()
   defer i1.Release()

   ib.AppendValues([]int32{4, 5, 6, 7, 8, 9, 10}, nil)
   i2 := ib.NewInt32Array()
   defer i2.Release()

   c := arrow.NewChunked(
    arrow.PrimitiveTypes.Int32,
    []arrow.Array{i1, i2},
   )
   return c
  }()
  defer chunk.Release()

  return arrow.NewColumn(schema.Field(0), chunk)
 }()
 defer col1.Release()

 col2 := func() *arrow.Column {
  chunk := func() *arrow.Chunked {
   fb := array.NewFloat64Builder(memory.DefaultAllocator)
   defer fb.Release()

   fb.AppendValues([]float64{1.1, 2.2, 3.3, 4.4, 5.5}, nil)
   f1 := fb.NewFloat64Array()
   defer f1.Release()

   fb.AppendValues([]float64{6.6, 7.7}, nil)
   f2 := fb.NewFloat64Array()
   defer f2.Release()

   fb.AppendValues([]float64{8.8, 9.9, 10.0}, nil)
   f3 := fb.NewFloat64Array()
   defer f3.Release()

   c := arrow.NewChunked(
    arrow.PrimitiveTypes.Float64,
    []arrow.Array{f1, f2, f3},
   )
   return c
  }()
  defer chunk.Release()

  return arrow.NewColumn(schema.Field(1), chunk)
 }()
 defer col2.Release()

 col3 := func() *arrow.Column {
  chunk := func() *arrow.Chunked {
   sb := array.NewStringBuilder(memory.DefaultAllocator)
   defer sb.Release()

   sb.AppendValues([]string{"s1", "s2"}, nil)
   s1 := sb.NewStringArray()
   defer s1.Release()

   sb.AppendValues([]string{"s3", "s4"}, nil)
   s2 := sb.NewStringArray()
   defer s2.Release()

   sb.AppendValues([]string{"s5", "s6", "s7", "s8", "s9", "s10"}, nil)
   s3 := sb.NewStringArray()
   defer s3.Release()

   c := arrow.NewChunked(
    arrow.BinaryTypes.String,
    []arrow.Array{s1, s2, s3},
   )
   return c
  }()
  defer chunk.Release()

  return arrow.NewColumn(schema.Field(2), chunk)
 }()
 defer col3.Release()

 var tbl arrow.Table
 tbl = array.NewTable(schema, []arrow.Column{*col1, *col2, *col3}, -1)
 defer tbl.Release()

 dumpTable(tbl)

 col4 := func() *arrow.Column {
  chunk := func() *arrow.Chunked {
   sb := array.NewStringBuilder(memory.DefaultAllocator)
   defer sb.Release()

   sb.AppendValues([]string{"ss1", "ss2"}, nil)
   s1 := sb.NewStringArray()
   defer s1.Release()

   sb.AppendValues([]string{"ss3", "ss4", "ss5"}, nil)
   s2 := sb.NewStringArray()
   defer s2.Release()

   sb.AppendValues([]string{"ss6", "ss7", "ss8", "ss9", "ss10"}, nil)
   s3 := sb.NewStringArray()
   defer s3.Release()

   c := arrow.NewChunked(
    arrow.BinaryTypes.String,
    []arrow.Array{s1, s2, s3},
   )
   return c
  }()
  defer chunk.Release()

  return arrow.NewColumn(arrow.Field{Name: "col4", Type: arrow.BinaryTypes.String}, chunk)
 }()
 defer col4.Release()

 tbl, err := tbl.AddColumn(
  3,
  arrow.Field{Name: "col4", Type: arrow.BinaryTypes.String},
  *col4,
 )
 if err != nil {
  panic(err)
 }

 dumpTable(tbl)
}

運行上述示例,輸出如下:

$go run table_schema_change.go
schema:
  fields: 3
    - col1: type=int32
    - col2: type=float64
    - col3: type=utf8
------
the count of table columns= 3
the count of table rows= 10
------
arrays in column(col1):
[1 2 3]
[4 5 6 7 8 9 10]
------
arrays in column(col2):
[1.1 2.2 3.3 4.4 5.5]
[6.6 7.7]
[8.8 9.9 10]
------
arrays in column(col3):
["s1" "s2"]
["s3" "s4"]
["s5" "s6" "s7" "s8" "s9" "s10"]
------
schema:
  fields: 4
    - col1: type=int32
    - col2: type=float64
    - col3: type=utf8
    - col4: type=utf8
------
the count of table columns= 4
the count of table rows= 10
------
arrays in column(col1):
[1 2 3]
[4 5 6 7 8 9 10]
------
arrays in column(col2):
[1.1 2.2 3.3 4.4 5.5]
[6.6 7.7]
[8.8 9.9 10]
------
arrays in column(col3):
["s1" "s2"]
["s3" "s4"]
["s5" "s6" "s7" "s8" "s9" "s10"]
------
arrays in column(col4):
["ss1" "ss2"]
["ss3" "ss4" "ss5"]
["ss6" "ss7" "ss8" "ss9" "ss10"]
------

這種對schema變更操作的支持在實際開發中也是非常有用的。

4. 小結

本文講解了基于array type的三個高級數據結構:Record Batch、Chunked Array和Table。其中Record Batch是Arrow Columnar Format中的結構,可以被所有實現arrow的編程語言所支持;Chunked Array和Table則是在一些編程語言的實現中創建的。

三個概念容易混淆,這里給出簡單記法:

  • Record Batch: schema + 長度相同的多個array
  • Chunked Array: []array
  • Table: schema + 總長度相同的多個Chunked Array

注:本文涉及的源代碼在這里[8]可以下載。

5. 參考資料

  • Apache Arrow Glossary[9] - https://arrow.apache.org/docs/format/Glossary.html
  • 參考資料
  • [1] 《Arrow數據類型》: https://tonybai.com/2023/06/25/a-guide-of-using-apache-arrow-for-gopher-part1
  • [2] 《Arrow Go實現的內存管理》: https://tonybai.com/2023/06/30/a-guide-of-using-apache-arrow-for-gopher-part2
  • [3] Record Batch: https://arrow.apache.org/docs/format/Glossary.html#term-record-batch
  • [4] 《In-Memory Analytics with Apache Arrow》: https://book.douban.com/subject/35954154/
  • [5] 《Go語言第一課》: http://gk.link/a/10AVZ
  • [6] 《In-Memory Analytics with Apache Arrow》: https://book.douban.com/subject/35954154/
  • [7] 這里: https://arrow.apache.org/docs/format/Glossary.html#term-table
  • [8] 這里: https://github.com/bigwhite/experiments/blob/master/arrow/advanced-datastructure
  • [9] Apache Arrow Glossary: https://arrow.apache.org/docs/format/Glossary.html
  • [10] “Gopher部落”知識星球: https://wx.zsxq.com/dweb2/index/group/51284458844544
  • [11] 鏈接地址: https://m.do.co/c/bff6eed92687
責任編輯:武曉燕 來源: 白明的贊賞賬戶
相關推薦

2023-09-11 08:47:20

Go模式uilder

2013-08-30 09:41:46

JavaApache CameApache

2021-06-08 10:41:00

Go語言算法

2024-05-07 08:45:16

OpenAILlamaIndex大語言模型

2024-02-01 09:37:42

Kubernetes服務網格? 命令

2018-03-27 23:25:40

Paddle

2011-04-02 13:44:08

2021-12-16 20:12:37

后端開發Sentry

2024-03-29 09:12:43

Go語言工具

2022-01-03 22:59:30

開發SDK數據

2019-08-16 10:55:37

開發者技能AI

2022-01-16 22:16:59

數據庫Sentry開發者

2019-02-21 13:40:35

Javascript面試前端

2017-11-27 13:09:00

AndroidGradle代碼

2022-01-02 23:26:08

開發SDK Sentry

2022-09-29 09:07:08

DataGrip數據倉庫數據庫

2024-07-11 08:50:05

Go語言errors

2022-01-11 20:42:54

開發Sentry標志

2011-04-13 09:55:16

Mail APIBlackBerry

2011-04-13 13:38:57

選項APIBlackBerry
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 色视频在线免费观看 | 狠狠影院 | 久久综合激情 | 欧美精品乱码久久久久久按摩 | 四虎影院美女 | 国产99精品 | 久久精品二区 | 国产精品综合色区在线观看 | 91视视频在线观看入口直接观看 | 精品无码久久久久久国产 | 日韩有码一区 | 欧美成视频 | 91av亚洲| a级大毛片 | 福利网址 | 色片在线观看 | 国产成人啪免费观看软件 | 欧洲视频一区二区 | 二区国产| 黄色香蕉视频在线观看 | 久久久久久久久国产成人免费 | 日韩精品一区二区在线 | 日韩在线免费播放 | 精品一区二区免费视频 | 日韩快播电影 | 国产精品一区久久久 | 美国av片在线观看 | 欧美日韩一 | 成人精品国产免费网站 | 亚洲性人人天天夜夜摸 | 超级乱淫av片免费播放 | 日韩一区二区不卡 | 在线观看国产视频 | 国产高清一区二区三区 | 日本精品一区二区三区在线观看视频 | 亚洲一区二区三区高清 | 狠狠色香婷婷久久亚洲精品 | 久久久久久久久久久久91 | 一级黄色片毛片 | 国产在线一级片 | 久久综合九九 |