专业的JAVA编程教程与资源

网站首页 > java教程 正文

Spark中读写不同类型文件(如何改文件的读写类型)

temp10 2024-10-01 22:26:04 java教程 10 ℃ 0 评论

Spark支持读取数据格式有文本,json,SequenceFile(MapFile),objectFile,csv等格式。

1.文本文件

文本的读写操作:

Spark中读写不同类型文件(如何改文件的读写类型)

sc.textFile(dir,1)

rdd.saveAsTextFile(dir)

2.Json

a.json

{"uid":1, "uname":"kyrie", "age":19}

{"uid":2, "uname":"jame", "age":25}

val conf = new SparkConf().setAppName("Jsontest")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

val df1 = sqlContext.read.json("a.json")

df1.select("uid","uname","age").show(10,false)

3.sequenceFile

3.1. 读取SequenceFile

Spark有专门用来读取SequenceFile的接口。在SparkContext中,可以调用sequenceFile(path, keyClass, valueClass, minpartitions),前面提及SequenceFile使用Writable类,因此keyClass和valueClass都必须使用正确的Writable类。

例:读取SequenceFile

val data=sc.sequenceFile(inFile,"org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")

sc.sequenceFile("/datascience/data/data-writer/adt/dmp_click/data/2017/09/26/12/05/0",classOf[Text],classOf[BytesWritable]).map{

case (k, v) =>

val len = v.getLength

val value = new String(v.getBytes, 0, len, "UTF-8")

k.toString -> value

}.take(100).foreach(println)

3.2. 保存SequenceFile

在Scala中,需要创建一个又可以写出到SequenceFile的类型构成的PairRDD,如果要保存的是Scala的原生类型,可以直接调用saveSequenceFile(path) 。如果键和值不能自动转为Writable类型,或想使用变长类型,可以对数据进行映射操作,在保存之前进行类型转换。

4.objectFile

* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
* BytesWritable values that contain a serialized partition. This is still an experimental
* storage format and may not be supported exactly as is in future Spark releases. It will also
* be pretty slow if you use the default serializer (Java serialization),
* though the nice thing about it is that there's very little effort required to save arbitrary
* objects.
由注释可知,保存的也是sequenceFile,key为:NullWritable,value:为BytesWritable。
sc.objectFile[ClassTag](path)
* Save this RDD as a SequenceFile of serialized objects.rdd.saveAsObjectFile(path: String)

5.csv

读取csv压缩文件或文本文件。

libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.4.0" withSources()方式一: val data =sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(s"$dataInput/impression")
方式二:importcom.databricks.spark.csv._sqlContext.csvFile(s"$dataInput/impression")
option说明:
  1、path:解析的CSV文件的目录,路径支持通配符;
  2、header:默认值是false。我们知道,CSV文件第一行一般是解释各个列的含义的名称,如果我们不需要加载这一行,我们可以将这个选项设置为true;
  3、delimiter:默认情况下,CSV是使用英文逗号分隔的,如果不是这个分隔,我们就可以设置这个选项。
  4、quote:默认情况下的引号是'"',我们可以通过设置这个选项来支持别的引号。
  5、mode:解析的模式。默认值是PERMISSIVE,支持的选项有
    (1)、PERMISSIVE:尝试解析所有的行,nulls are inserted for missing tokens and extra tokens are ignored.
    (2)、DROPMALFORMED:drops lines which have fewer or more tokens than expected
    (3)、FAILFAST: aborts with a RuntimeException if encounters any malformed line

6.Hadoop输入输出格式

新版的Hadoop API读入文件,newAPIHadoopFile ,写入saveAsNewAPIHadoopFile。

旧版的Hadoop API读入文件,HadoopFile ,写入saveAsHadoopFile

6.1.新接口读取文件

#文本文件

val rdd = sc.hadoopFile("/user/yu.guan/xueyuan/1005484_1_check.tar.gz", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)

.map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK")

.take(10)

.foreach(println)

#k-v文件

sc.newAPIHadoopFile[Text,BytesWritable,SequenceFileInputFormat[Text,BytesWritable]](path).flatMap{
case (_key,value) =>
val key = _key.toString
if( filter(key) ) {
try{
val _bitmap = BitmapUtil.fromWritable(value)
Some(key -> _bitmap)
}catch{
case e : Exception =>
println( s"${e.getMessage}")
println( s"$key\t$path")
None
}
}else None
}

6.2.新接口写入文件

new UnionRDD(sc,filterBitmaps).reduceByKey(_ or _,partition_num).map{
case (k,v) => SerializeText(k.toString).writable -> new BytesWritable(v.toBytes)
}.saveAsNewAPIHadoopFile(out , classOf[Text] , classOf[BytesWritable] , classOf[SequenceFileOutputFormat[Text,BytesWritable]])
rdd1.flatMap{
case (key,bitmap) =>
if(!br_topKeys.value.isEmpty){
if(bitmap.cardinality > 0){
val _key = if(br_topKeys.value.contains(key)) key else "others"
Some(_key -> bitmap)
}else None
}else Some(key -> bitmap)
}.reduceByKey(_ or _).map{
case (id,bitmap) => SerializeText(id.toString) -> bitmap.toBytes
}.sortByKey(true).map[(Text,BytesWritable)]{
case (k,v) => k.writable -> new BytesWritable(v)
}.saveAsNewAPIHadoopFile(output,classOf[Text], classOf[BytesWritable], classOf[MapFileOutputFormat])

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表