pyspark.SparkContext.hadoopRDD#
- SparkContext.hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)[source]#
- Read an ‘old’ Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for meth:SparkContext.sequenceFile. - New in version 1.1.0. - Parameters
- inputFormatClassstr
- fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”) 
- keyClassstr
- fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”) 
- valueClassstr
- fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”) 
- keyConverterstr, optional
- fully qualified name of a function returning key WritableConverter 
- valueConverterstr, optional
- fully qualified name of a function returning value WritableConverter 
- confdict, optional
- Hadoop configuration, passed in as a dict 
- batchSizeint, optional, default 0
- The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically) 
 
- Returns
- RDD
- RDD of tuples of key and corresponding value 
 
 - See also - Examples - >>> import os >>> import tempfile - Set the related classes - >>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat" >>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat" >>> key_class = "org.apache.hadoop.io.IntWritable" >>> value_class = "org.apache.hadoop.io.Text" - >>> with tempfile.TemporaryDirectory(prefix="hadoopRDD") as d: ... path = os.path.join(d, "old_hadoop_file") ... ... # Create the conf for writing ... write_conf = { ... "mapred.output.format.class": output_format_class, ... "mapreduce.job.output.key.class": key_class, ... "mapreduce.job.output.value.class": value_class, ... "mapreduce.output.fileoutputformat.outputdir": path, ... } ... ... # Write a temporary Hadoop file ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")]) ... rdd.saveAsHadoopDataset(conf=write_conf) ... ... # Create the conf for reading ... read_conf = {"mapreduce.input.fileinputformat.inputdir": path} ... ... loaded = sc.hadoopRDD(input_format_class, key_class, value_class, conf=read_conf) ... collected = sorted(loaded.collect()) - >>> collected [(0, '1\t'), (0, '1\ta'), (0, '3\tx')]