spark中怎么将读取的每一行的数据按某几行拼接成一行? 新手,求指教,谢谢!

2024-12-25 23:42:14
推荐回答(1个)
回答1:

assert(args.length > 1)
val _from = args(0)
val _to = args(1)

val s = sc.textFile(_from).collect()
val n = if (args.length > 2) args(2).toInt else 2
val numSlices = s.length / n
val x = sc.parallelize(s, numSlices).zipWithIndex().aggregate(List.empty[List[String]])(seqOp = (result, lineWithIndex) => {
  lineWithIndex match {
    case (line, index) =>
      if (index % n == 0) List(line) :: result else (line :: result.head) :: result.tail
  }
}, combOp = (x, y) => x ::: y).map(_.reverse mkString " ")
sc.parallelize(x).saveAsTextFile(_to)

textFile 不能指定分区数目,所以只能parallelize, n是每几行一合并,RDD的aggregate方法与foldLeft类似,因为RDD并行,合并之后的行间顺序不确定的

下面给出非RDD操作示例

val s = List("123", "234", "345", "456", "567", "678")
val n = 3
// spark RDD 没有 foldLeft
val x = s.zipWithIndex.foldLeft(List.empty[List[String]])((result, lineWithIndex) => {
  lineWithIndex match {
    case (line, index) =>
      if (index % n == 0) List(line) :: result else (line :: result.head) :: result.tail
  }
}).reverse.map(_.reverse)
println(x)