成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Spark Shuffle過程分析:Map階段處理流程

大數據 Spark
默認配置情況下,Spark在Shuffle過程中會使用SortShuffleManager來管理Shuffle過程中需要的基本組件,以及對RDD各個Partition數據的計算。我們可以在Driver和Executor對應的SparkEnv對象創建過程中看到對應的配置。

默認配置情況下,Spark在Shuffle過程中會使用SortShuffleManager來管理Shuffle過程中需要的基本組件,以及對RDD各個Partition數據的計算。我們可以在Driver和Executor對應的SparkEnv對象創建過程中看到對應的配置,如下代碼所示:

 

  1. // Let the user specify short names for shuffle managers 
  2.     val shortShuffleMgrNames = Map( 
  3.       "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, 
  4.       "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) 
  5.     val shuffleMgrName = conf.get("spark.shuffle.manager""sort"
  6.     val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) 
  7.     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) 

如果需要修改ShuffleManager實現,則只需要修改配置項spark.shuffle.manager即可,默認支持sort和 tungsten-sort,可以指定自己實現的ShuffleManager類。

因為Shuffle過程中需要將Map結果數據輸出到文件,所以需要通過注冊一個ShuffleHandle來獲取到一個ShuffleWriter對象,通過它來控制Map階段記錄數據輸出的行為。其中,ShuffleHandle包含了如下基本信息:

  • shuffleId:標識Shuffle過程的唯一ID
  • numMaps:RDD對應的Partitioner指定的Partition的個數,也就是ShuffleMapTask輸出的Partition個數
  • dependency:RDD對應的依賴ShuffleDependency

下面我們看下,在SortShuffleManager中是如何注冊Shuffle的,代碼如下所示:

 

  1. override def registerShuffle[K, V, C]( 
  2.       shuffleId: Int
  3.       numMaps: Int
  4.       dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { 
  5.     if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) { 
  6.       new BypassMergeSortShuffleHandle[K, V]( 
  7.         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 
  8.     } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { 
  9.       new SerializedShuffleHandle[K, V]( 
  10.         shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 
  11.     } else { 
  12.       new BaseShuffleHandle(shuffleId, numMaps, dependency) 
  13.     } 
  14.   } 

上面代碼中,對應如下3種ShuffleHandle可以選擇,說明如下:

  • BypassMergeSortShuffleHandle

如果dependency不需要進行Map Side Combine,并且RDD對應的ShuffleDependency中的Partitioner設置的Partition的數量(這個不要和parent RDD的Partition個數混淆,Partitioner指定了map處理結果的Partition個數,每個Partition數據會在Shuffle過程中全部被拉取而拷貝到下游的某個Executor端)小于等于配置參數spark.shuffle.sort.bypassMergeThreshold的值,則會注冊BypassMergeSortShuffleHandle。默認情況下,spark.shuffle.sort.bypassMergeThreshold的取值是200,這種情況下會直接將對RDD的 map處理結果的各個Partition數據寫入文件,并***做一個合并處理。

  • SerializedShuffleHandle

如果ShuffleDependency中的Serializer,允許對將要輸出數據對象進行排序后,再執行序列化寫入到文件,則會選擇創建一個SerializedShuffleHandle。

  • BaseShuffleHandle

除了上面兩種ShuffleHandle以后,其他情況都會創建一個BaseShuffleHandle對象,它會以反序列化的格式處理Shuffle輸出數據。

Map階段處理流程分析

Map階段RDD的計算,對應ShuffleMapTask這個實現類,它最終會在每個Executor上啟動運行,每個ShuffleMapTask處理RDD的一個Partition的數據。這個過程的核心處理邏輯,代碼如下所示:

 

  1. val manager = SparkEnv.get.shuffleManager 
  2.       writer = manager.getWriter[AnyAny](dep.shuffleHandle, partitionId, context) 
  3.       writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[AnyAny]]]) 

上面代碼中,在調用rdd的iterator()方法時,會根據RDD實現類的compute方法指定的處理邏輯對數據進行處理,當然,如果該Partition對應的數據已經處理過并存儲在MemoryStore或DiskStore,直接通過BlockManager獲取到對應的Block數據,而無需每次需要時重新計算。然后,write()方法會將已經處理過的Partition數據輸出到磁盤文件。

在Spark Shuffle過程中,每個ShuffleMapTask會通過配置的ShuffleManager實現類對應的ShuffleManager對象(實際上是在SparkEnv中創建),根據已經注冊的ShuffleHandle,獲取到對應的ShuffleWriter對象,然后通過ShuffleWriter對象將Partition數據寫入內存或文件。所以,接下來我們可能關心每一種ShuffleHandle對應的ShuffleWriter的行為,可以看到SortShuffleManager中獲取到ShuffleWriter的實現代碼,如下所示:

 

  1. /** Get a writer for a given partition. Called on executors by map tasks. */ 
  2.   override def getWriter[K, V]( 
  3.       handle: ShuffleHandle, 
  4.       mapId: Int
  5.       context: TaskContext): ShuffleWriter[K, V] = { 
  6.     numMapsForShuffle.putIfAbsent( 
  7.       handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) 
  8.     val env = SparkEnv.get 
  9.     handle match { 
  10.       case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => 
  11.         new UnsafeShuffleWriter( 
  12.           env.blockManager, 
  13.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 
  14.           context.taskMemoryManager(), 
  15.           unsafeShuffleHandle, 
  16.           mapId, 
  17.           context, 
  18.           env.conf) 
  19.       case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => 
  20.         new BypassMergeSortShuffleWriter( 
  21.           env.blockManager, 
  22.           shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 
  23.           bypassMergeSortHandle, 
  24.           mapId, 
  25.           context, 
  26.           env.conf) 
  27.       case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => 
  28.         new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) 
  29.     } 
  30.   } 

我們以最簡單的SortShuffleWriter為例進行分析,在SortShuffleManager可以通過getWriter()方法創建一個SortShuffleWriter對象,然后在ShuffleMapTask中調用SortShuffleWriter對象的write()方法處理Map輸出的記錄數據,write()方法的處理代碼,如下所示:

 

  1. /** Write a bunch of records to this task's output */ 
  2.   override def write(records: Iterator[Product2[K, V]]): Unit = { 
  3.     sorter = if (dep.mapSideCombine) { 
  4.       require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!"
  5.       new ExternalSorter[K, V, C]( 
  6.         context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) 
  7.     } else { 
  8.       // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't 
  9.       // care whether the keys get sorted in each partition; that will be done on the reduce side 
  10.       // if the operation being run is sortByKey. 
  11.       new ExternalSorter[K, V, V]( 
  12.         context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) 
  13.     } 
  14.     sorter.insertAll(records) 
  15.  
  16.     // Don't bother including the time to open the merged output file in the shuffle write time
  17.     // because it just opens a single file, so is typically too fast to measure accurately 
  18.     // (see SPARK-3570). 
  19.     val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) 
  20.     val tmp = Utils.tempFileWith(output
  21.     val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) 
  22.     val partitionLengths = sorter.writePartitionedFile(blockId, tmp) 
  23.     shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) 
  24.     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) 
  25.   } 

從SortShuffleWriter類中的write()方法可以看到,最終調用了ExeternalSorter的insertAll()方法,實現了Map端RDD某個Partition數據處理并輸出到內存或磁盤文件,這也是處理Map階段輸出記錄數據最核心、最復雜的過程。我們將其分為兩個階段進行分析:***階段是,ExeternalSorter的insertAll()方法處理過程,將記錄數據Spill到磁盤文件;第二階段是,執行完insertAll()方法之后的處理邏輯,創建Shuffle Block數據文件及其索引文件。

內存緩沖寫記錄數據并Spill到磁盤文件

查看SortShuffleWriter類的write()方法可以看到,在內存中緩存記錄數據的數據結構有兩種:一種是Buffer,對應的實現類PartitionedPairBuffer,設置mapSideCombine=false時會使用該結構;另一種是Map,對應的實現類是PartitionedAppendOnlyMap,設置mapSideCombine=false時會使用該結構。根據是否指定mapSideCombine選項,分別對應不同的處理流程,我們分別說明如下:

設置mapSideCombine=false時

這種情況在Map階段不進行Combine操作,在內存中緩存記錄數據會使用PartitionedPairBuffer這種數據結構來緩存、排序記錄數據,它是一個Append-only Buffer,僅支持向Buffer中追加數據鍵值對記錄,PartitionedPairBuffer的結構如下圖所示:

Spark Shuffle過程分析:Map階段處理流程

默認情況下,PartitionedPairBuffer初始分配的存儲容量為capacity = initialCapacity = 64,實際上這個容量是針對key的容量,因為要存儲的是鍵值對記錄數據,所以實際存儲鍵值對的容量為2*initialCapacity = 128。PartitionedPairBuffer是一個能夠動態擴充容量的Buffer,內部使用一個一維數組來存儲鍵值對,每次擴容結果為當前Buffer容量的2倍,即2*capacity,***支持存儲2^31-1個鍵值對記錄(1073741823個)。

通過上圖可以看到,PartitionedPairBuffer存儲的鍵值對記錄數據,鍵是(partition, key)這樣一個Tuple,值是對應的數據value,而且curSize是用來跟蹤寫入Buffer中的記錄的,key在Buffer中的索引位置為2*curSize,value的索引位置為2*curSize+1,可見一個鍵值對的key和value的存儲在PartitionedPairBuffer內部的數組中是相鄰的。

使用PartitionedPairBuffer緩存鍵值對記錄數據,通過跟蹤實際寫入到Buffer內的記錄數據的字節數來判斷,是否需要將Buffer中的數據Spill到磁盤文件,如下代碼所示:

 

  1. protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { 
  2.     var shouldSpill = false 
  3.     if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { 
  4.       // Claim up to double our current memory from the shuffle memory pool 
  5.       val amountToRequest = 2 * currentMemory - myMemoryThreshold 
  6.       val granted = acquireMemory(amountToRequest) 
  7.       myMemoryThreshold += granted 
  8.       // If we were granted too little memory to grow further (either tryToAcquire returned 0, 
  9.       // or we already had more memory than myMemoryThreshold), spill the current collection 
  10.       shouldSpill = currentMemory >= myMemoryThreshold 
  11.     } 
  12.     shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold 
  13.     // Actually spill 
  14.     if (shouldSpill) { 
  15.       _spillCount += 1 
  16.       logSpillage(currentMemory) 
  17.       spill(collection) 
  18.       _elementsRead = 0 
  19.       _memoryBytesSpilled += currentMemory 
  20.       releaseMemory() 
  21.     } 
  22.     shouldSpill 
  23.   } 

上面elementsRead表示存儲到PartitionedPairBuffer中的記錄數,currentMemory是對Buffer中的總記錄數據大小(字節數)的估算,myMemoryThreshold通過配置項spark.shuffle.spill.initialMemoryThreshold來進行設置的,默認值為5 * 1024 * 1024 = 5M。當滿足條件elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold時,會先嘗試向MemoryManager申請2 * currentMemory – myMemoryThreshold大小的內存,如果能夠申請到,則不進行Spill操作,而是繼續向Buffer中存儲數據,否則就會調用spill()方法將Buffer中數據輸出到磁盤文件。

向PartitionedPairBuffer中寫入記錄數據,以及滿足條件Spill記錄數據到磁盤文件,具體處理流程,如下圖所示:

Spark Shuffle過程分析:Map階段處理流程

為了查看按照怎樣的規則進行排序,我們看一下,當不進行Map Side Combine時,創建ExternalSorter對象的代碼如下所示:

 

  1. // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't 
  2.       // care whether the keys get sorted in each partition; that will be done on the reduce side 
  3.       // if the operation being run is sortByKey. 
  4.       new ExternalSorter[K, V, V]( 
  5.         context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) 

上面aggregator = None,ordering = None,在對PartitionedPairBuffer中的記錄數據Spill到磁盤之前,要使用默認的排序規則進行排序,排序的規則是只對PartitionedPairBuffer中的記錄按Partition ID進行升序排序,可以查看WritablePartitionedPairCollection伴生對象類的代碼(其中PartitionedPairBuffer類實現了特質WritablePartitionedPairCollection),如下所示:

 

  1. /** 
  2.    * A comparator for (Int, K) pairs that orders them by only their partition ID. 
  3.    */ 
  4.   def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] { 
  5.     override def compare(a: (Int, K), b: (Int, K)): Int = { 
  6.       a._1 - b._1 
  7.     } 
  8.   } 

上面圖中,引用了SortShuffleWriter.writeBlockFiles這個子序列圖,用來生成Block數據文件和索引文件,后面我們會單獨說明。通過對RDD進行計算生成一個記錄迭代器對象,通過該迭代器迭代出的記錄會存儲到PartitionedPairBuffer中,當滿足Spill條件時,先對PartitionedPairBuffer中記錄進行排序,***Spill到磁盤文件,這個過程中PartitionedPairBuffer中的記錄數據的變化情況,如下圖所示:

Spark Shuffle過程分析:Map階段處理流程

上圖中,對內存中PartitionedPairBuffer中的記錄按照Partition ID進行排序,并且屬于同一個Partition的數據記錄在PartitionedPairBuffer內部的data數組中是連續的。排序結束后,在Spill到磁盤文件時,將對應的Partition ID去掉了,只在文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中連續存儲鍵值對數據,但同時在另一個內存數組結構中會保存文件中每個Partition擁有的記錄數,這樣就能根據Partition的記錄數來順序讀取文件temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d中屬于同一個Partition的全部記錄數據。

ExternalSorter類內部維護了一個SpillFile的ArrayBuffer數組,最終可能會生成多個SpillFile,SpillFile的定義如下所示:

 

  1. private[this] case class SpilledFile( 
  2.     file: File, 
  3.     blockId: BlockId, 
  4.     serializerBatchSizes: Array[Long], 
  5.     elementsPerPartition: Array[Long]) 

每個SpillFile包含一個blockId,標識Map輸出的該臨時文件;serializerBatchSizes表示每次批量寫入到文件的Object的數量,默認為10000,由配置項spark.shuffle.spill.batchSize來控制;elementsPerPartition表示每個Partition中的Object的數量。調用ExternalSorter的insertAll()方法,最終可能有如下3種情況:

  • Map階段輸出記錄數較少,沒有生成SpillFile,那么所有數據都在Buffer中,直接對Buffer中記錄排序并輸出到文件
  • Map階段輸出記錄數較多,生成多個SpillFile,同時Buffer中也有部分記錄數據
  • Map階段輸出記錄數較多,只生成多個SpillFile
  • 有關后續如何對上面3種情況進行處理,可以想見后面對子序列圖SortShuffleWriter.writeBlockFiles的說明。
  • 設置mapSideCombine=true時

這種情況在Map階段會執行Combine操作,在Map階段進行Combine操作能夠降低Map階段數據記錄的總數,從而降低Shuffle過程中數據的跨網絡拷貝傳輸。這時,RDD對應的ShuffleDependency需要設置一個Aggregator用來執行Combine操作,可以看下Aggregator類聲明,代碼如下所示:

 

  1. /** 
  2.  * :: DeveloperApi :: 
  3.  * A set of functions used to aggregate data. 
  4.  * 
  5.  * @param createCombiner function to create the initial value of the aggregation. 
  6.  * @param mergeValue function to merge a new value into the aggregation result. 
  7.  * @param mergeCombiners function to merge outputs from multiple mergeValue function
  8.  */ 
  9. @DeveloperApi 
  10. case class Aggregator[K, V, C] ( 
  11.     createCombiner: V => C, 
  12.     mergeValue: (C, V) => C, 
  13.     mergeCombiners: (C, C) => C) { 
  14.   ... ... 

由于在Map階段只用到了構造Aggregator的幾個函數參數createCombiner、mergeValue、mergeCombiners,我們對這幾個函數詳細說明如下:

  • createCombiner:進行Aggregation開始時,需要設置初始值。因為在Aggregation過程中使用了類似Map的內存數據結構來管理鍵值對,每次加入前會先查看Map內存結構中是否存在Key對應的Value,***次肯定不存在,所以***將某個Key的Value加入到Map內存結構中時,Key在Map內存結構中***次有了Value。
  • mergeValue:某個Key已經在Map結構中存在Value,后續某次又遇到相同的Key和一個新的Value,這時需要通過該函數,將舊Value和新Value進行合并,根據Key檢索能夠得到合并后的新Value。
  • mergeCombiners:一個Map內存結構中Key和Value是由mergeValue生成的,那么在向Map中插入數據,肯定會遇到Map使用容量達到上限,這時需要將記錄數據Spill到磁盤文件,那么多個Spill輸出的磁盤文件中可能存在同一個Key,這時需要對多個Spill輸出的磁盤文件中的Key的多個Value進行合并,這時需要使用mergeCombiners函數進行處理。

該類中定義了combineValuesByKey、combineValuesByKey、combineCombinersByKey,由于這些函數是在Reduce階段使用的,所以在這里先不說明,后續文章我們會單獨詳細來分析。

我們通過下面的序列圖來描述,需要進行Map Side Combine時的處理流程,如下所示:

Spark Shuffle過程分析:Map階段處理流程

對照上圖,我們看一下,當需要進行Map Side Combine時,對應的ExternalSorter類insertAll()方法中的處理邏輯,代碼如下所示:

 

  1. val shouldCombine = aggregator.isDefined 
  2.  
  3.     if (shouldCombine) { 
  4.       // Combine values in-memory first using our AppendOnlyMap 
  5.       val mergeValue = aggregator.get.mergeValue 
  6.       val createCombiner = aggregator.get.createCombiner 
  7.       var kv: Product2[K, V] = null 
  8.       val update = (hadValue: Boolean, oldValue: C) => { 
  9.         if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) 
  10.       } 
  11.       while (records.hasNext) { 
  12.         addElementsRead() 
  13.         kv = records.next() 
  14.         map.changeValue((getPartition(kv._1), kv._1), update
  15.         maybeSpillCollection(usingMap = true
  16.       } 
  17.     } 

上面代碼中,map是內存數據結構,最重要的是update函數和map的changeValue方法(這里的map對應的實現類是PartitionedAppendOnlyMap)。update函數所做的工作,其實就是對createCombiner和mergeValue這兩個函數的使用,***次遇到一個Key調用createCombiner函數處理,非***遇到同一個Key對應新的Value調用mergeValue函數進行合并處理。map的changeValue方法主要是將Key和Value在map中存儲或者進行修改(對出現的同一個Key的多個Value進行合并,并將合并后的新Value替換舊Value)。

PartitionedAppendOnlyMap是一個經過優化的哈希表,它支持向map中追加數據,以及修改Key對應的Value,但是不支持刪除某個Key及其對應的Value。它能夠支持的存儲容量是0.7 * 2 ^ 29 = 375809638。當達到指定存儲容量或者指定限制,就會將map中記錄數據Spill到磁盤文件,這個過程和前面的類似,不再累述。

創建Shuffle Block數據文件及其索引文件

無論是使用PartitionedPairBuffer,還是使用PartitionedAppendOnlyMap,當需要容量滿足Spill條件時,都會將該內存結構(buffer/map)中記錄數據Spill到磁盤文件,所以Spill到磁盤文件的格式是相同的。對于后續Block數據文件和索引文件的生成邏輯也是相同,如下圖所示:

Spark Shuffle過程分析:Map階段處理流程

假設,我們生成的Shuffle Block文件對應各個參數為:shuffleId=2901,mapId=11825,reduceId=0,這里reduceId是一個NOOP_REDUCE_ID,表示與DiskStore進行磁盤I/O交互操作,而DiskStore期望對應一個(map, reduce)對,但是對于排序的Shuffle輸出,通常Reducer拉取數據后只生成一個文件(Reduce文件),所以這里默認reduceId為0。經過上圖的處理流程,可以生成一個.data文件,也就是Block數據文件;一個.index文件,也就是包含了各個Partition在數據文件中的偏移位置的索引文件。這個過程生成的文件,示例如下所示:

 

  1. shuffle_2901_11825_0.data  
  2. shuffle_2901_11825_0.index 

這樣,對于每個RDD的多個Partition進行處理后,都會生成對應的數據文件和索引文件,后續在Reduce端就可以讀取這些Block文件,這些記錄數據在文件中都是經過分區(Partitioned)的。

責任編輯:未麗燕 來源: 36大數據
相關推薦

2017-03-27 10:48:03

Hive map優化分析

2021-08-11 06:57:16

ShuffleSpark核心

2019-04-22 15:24:24

HadoopSuffleMap端

2021-10-20 10:04:47

鴻蒙HarmonyOS應用

2023-02-08 13:08:31

2025-06-13 08:40:00

ShuffleSpark大數據

2022-03-15 08:25:32

SparkShuffle框架

2019-06-06 15:22:07

SparkShuffle內存

2019-07-26 15:01:42

SparkShuffle內存

2012-08-30 09:48:02

Struts2Java

2023-11-20 07:27:00

云原生Spark

2009-07-03 13:41:44

WinCE編譯過程

2024-07-15 09:58:03

OpenRestyNginx日志

2016-12-14 19:20:07

Spark SQL架構分布式

2011-04-13 14:57:11

ASP.NET請求處理

2009-07-28 11:32:41

光纖鏈路故障

2017-04-24 09:20:05

Spark分析分區器

2010-06-13 14:36:20

RARP協議

2009-09-24 17:11:53

Hibernate處理

2022-08-25 18:48:29

字節跳動CSS開源
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 免费黄色a视频 | 伦理片97| 久久久久久九九九九 | 亚洲午夜精品视频 | 蜜桃视频在线观看免费视频网站www | 中文字幕视频一区二区 | 久久久久国产精品一区二区 | 日韩中文字幕 | 7777精品伊人久久精品影视 | 日韩美av | 精品国产色 | 国产99精品 | 欧美日韩亚洲国产 | 日本在线免费看最新的电影 | 日韩毛片视频 | 国产精品久久久久久久久久久久久 | 国产精品久久久久久妇女 | www.蜜桃av| 精品一区二区三区在线视频 | 国产a区| 欧美久久久久久 | 99精品国产一区二区青青牛奶 | 欧美精产国品一二三区 | 国产成人综合av | 亚洲第一在线 | 天天拍天天操 | 中文字幕久久精品 | 中文字幕91av| 久久99网 | 午夜视频在线免费观看 | h视频在线播放 | 国产 亚洲 网红 主播 | 国产成人精品亚洲日本在线观看 | 久久av一区二区三区 | www.国产.com| 国产 亚洲 网红 主播 | 日韩在线看片 | 99热精品在线 | 日韩无| 国产精品欧美一区喷水 | 紧缚调教一区二区三区视频 |