网站首页 > java教程 正文
Spark支持读取数据格式有文本,json,SequenceFile(MapFile),objectFile,csv等格式。
1.文本文件
文本的读写操作:
sc.textFile(dir,1)
rdd.saveAsTextFile(dir)
2.Json
a.json
{"uid":1, "uname":"kyrie", "age":19}
{"uid":2, "uname":"jame", "age":25}
val conf = new SparkConf().setAppName("Jsontest")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df1 = sqlContext.read.json("a.json")
df1.select("uid","uname","age").show(10,false)
3.sequenceFile
3.1. 读取SequenceFile
Spark有专门用来读取SequenceFile的接口。在SparkContext中,可以调用sequenceFile(path, keyClass, valueClass, minpartitions),前面提及SequenceFile使用Writable类,因此keyClass和valueClass都必须使用正确的Writable类。
例:读取SequenceFile
val data=sc.sequenceFile(inFile,"org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
sc.sequenceFile("/datascience/data/data-writer/adt/dmp_click/data/2017/09/26/12/05/0",classOf[Text],classOf[BytesWritable]).map{
case (k, v) =>
val len = v.getLength
val value = new String(v.getBytes, 0, len, "UTF-8")
k.toString -> value
}.take(100).foreach(println)
3.2. 保存SequenceFile
在Scala中,需要创建一个又可以写出到SequenceFile的类型构成的PairRDD,如果要保存的是Scala的原生类型,可以直接调用saveSequenceFile(path) 。如果键和值不能自动转为Writable类型,或想使用变长类型,可以对数据进行映射操作,在保存之前进行类型转换。
4.objectFile
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and * BytesWritable values that contain a serialized partition. This is still an experimental * storage format and may not be supported exactly as is in future Spark releases. It will also * be pretty slow if you use the default serializer (Java serialization), * though the nice thing about it is that there's very little effort required to save arbitrary * objects.
由注释可知,保存的也是sequenceFile,key为:NullWritable,value:为BytesWritable。
sc.objectFile[ClassTag](path)
* Save this RDD as a SequenceFile of serialized objects.rdd.saveAsObjectFile(path: String)
5.csv
读取csv压缩文件或文本文件。
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.4.0" withSources()方式一: val data =sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(s"$dataInput/impression") 方式二:importcom.databricks.spark.csv._sqlContext.csvFile(s"$dataInput/impression") option说明: 1、path:解析的CSV文件的目录,路径支持通配符; 2、header:默认值是false。我们知道,CSV文件第一行一般是解释各个列的含义的名称,如果我们不需要加载这一行,我们可以将这个选项设置为true; 3、delimiter:默认情况下,CSV是使用英文逗号分隔的,如果不是这个分隔,我们就可以设置这个选项。 4、quote:默认情况下的引号是'"',我们可以通过设置这个选项来支持别的引号。 5、mode:解析的模式。默认值是PERMISSIVE,支持的选项有 (1)、PERMISSIVE:尝试解析所有的行,nulls are inserted for missing tokens and extra tokens are ignored. (2)、DROPMALFORMED:drops lines which have fewer or more tokens than expected (3)、FAILFAST: aborts with a RuntimeException if encounters any malformed line
6.Hadoop输入输出格式
新版的Hadoop API读入文件,newAPIHadoopFile ,写入saveAsNewAPIHadoopFile。
旧版的Hadoop API读入文件,HadoopFile ,写入saveAsHadoopFile
6.1.新接口读取文件
#文本文件
val rdd = sc.hadoopFile("/user/yu.guan/xueyuan/1005484_1_check.tar.gz", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)
.map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK")
.take(10)
.foreach(println)
#k-v文件
sc.newAPIHadoopFile[Text,BytesWritable,SequenceFileInputFormat[Text,BytesWritable]](path).flatMap{ case (_key,value) => val key = _key.toString if( filter(key) ) { try{ val _bitmap = BitmapUtil.fromWritable(value) Some(key -> _bitmap) }catch{ case e : Exception => println( s"${e.getMessage}") println( s"$key\t$path") None } }else None }
6.2.新接口写入文件
new UnionRDD(sc,filterBitmaps).reduceByKey(_ or _,partition_num).map{ case (k,v) => SerializeText(k.toString).writable -> new BytesWritable(v.toBytes) }.saveAsNewAPIHadoopFile(out , classOf[Text] , classOf[BytesWritable] , classOf[SequenceFileOutputFormat[Text,BytesWritable]])
rdd1.flatMap{ case (key,bitmap) => if(!br_topKeys.value.isEmpty){ if(bitmap.cardinality > 0){ val _key = if(br_topKeys.value.contains(key)) key else "others" Some(_key -> bitmap) }else None }else Some(key -> bitmap) }.reduceByKey(_ or _).map{ case (id,bitmap) => SerializeText(id.toString) -> bitmap.toBytes }.sortByKey(true).map[(Text,BytesWritable)]{ case (k,v) => k.writable -> new BytesWritable(v) }.saveAsNewAPIHadoopFile(output,classOf[Text], classOf[BytesWritable], classOf[MapFileOutputFormat])
猜你喜欢
- 2024-10-01 「每日分享」内存文件映射方式读取超大文件踩坑题解析
- 2024-10-01 尚学堂百战程序员之读写配置文件教程
- 2024-10-01 Java中文件使用流操作基础知识(java文件流不关闭的后果)
- 2024-10-01 Java 读取txt文件生成Word文档(java读取txt文件存为字符串)
- 2024-10-01 零基础编程培训系列JAVA入门课程第十一讲Java文件处理
- 2024-10-01 SpringBoot读取.yml配置文件最常见的两种方式
- 2024-10-01 java IO流读取文件并统计文件中各个字符出现的次数
- 2024-10-01 Java读取配置文件config.properties
- 2024-10-01 常见Java问题及笔试题(十八)——说一说代码中读取文件的事
- 2024-10-01 Java 中获取文件路径的方式,你知道几种?
你 发表评论:
欢迎- 05-27JavaScript 中的运算符优先级
- 05-27Java程序员必备:运算符使用中的八大实战要点
- 05-27Java运算符优先级表
- 05-272025-04-29:高度互不相同的最大塔高和。用go语言,给定一个数组
- 05-27PHP排序算法:计数、选择、插入、归并、快速、冒泡、希尔、堆
- 05-27Python高级排序算法应用
- 05-27用好RANK函数 跨表排名不用愁
- 05-27十大排序算法时空复杂度
- 最近发表
- 标签列表
-
- java反编译工具 (77)
- java反射 (57)
- java接口 (61)
- java随机数 (63)
- java7下载 (59)
- java数据结构 (61)
- java 三目运算符 (65)
- java对象转map (63)
- Java继承 (69)
- java字符串替换 (60)
- 快速排序java (59)
- java并发编程 (58)
- java api文档 (60)
- centos安装java (57)
- java调用webservice接口 (61)
- java深拷贝 (61)
- 工厂模式java (59)
- java代理模式 (59)
- java.lang (57)
- java连接mysql数据库 (67)
- java重载 (68)
- java 循环语句 (66)
- java反序列化 (58)
- java时间函数 (60)
- java是值传递还是引用传递 (62)
本文暂时没有评论,来添加一个吧(●'◡'●)