PySpark SQL provides read.json("path") to read a single-line or multiline (multiple-line) JSON file into a PySpark DataFrame, and write.json("path") to save or write a DataFrame to a JSON file. In this tutorial, you will learn how to read a single file, multiple files, and all files from a directory into a DataFrame, and how to write a DataFrame back to a JSON file, with Python examples.

Note: The PySpark API supports reading JSON files, and many more file formats, into a PySpark DataFrame out of the box.

PySpark Read JSON file into DataFrame

Using read.json("path") or read.format("json").load("path") you can read a JSON file into a PySpark DataFrame; these methods take a file path as an argument.

Unlike reading a CSV, the JSON data source infers the schema from the input file by default.

The zipcodes.json file used here can be downloaded from the GitHub project.
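
All snippets below assume an active SparkSession named spark (the pyspark shell and notebooks provide one automatically); if you are running a standalone script, create it first, as in the full example at the end of this tutorial:


# Create SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()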


# Read JSON file into dataframe
df = spark.read.json("resources/zipcodes.json")
df.printSchema()
df.show()

When you use the format("json") method, you can also specify the data source by its fully qualified name, as shown below.


# Read JSON file into dataframe
df = spark.read.format('org.apache.spark.sql.json') \
        .load("resources/zipcodes.json")

Read multiline JSON file

The PySpark JSON data source provides several options for reading files; use the multiline option to read a JSON record that is scattered across multiple lines. By default, the multiline option is set to false.

Below is the input file we are going to read; the same file is also available on GitHub.


[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]

Use read.option("multiline", "true") to read it:


# Read multiline json file
multiline_df = spark.read.option("multiline", "true") \
      .json("resources/multiline-zipcode.json")
multiline_df.show()    

Reading multiple files at a time

Using the read.json() method you can also read multiple JSON files from different paths; just pass all the file names, with their fully qualified paths, as a comma-separated list, for example:


# Read multiple files
df2 = spark.read.json(
    ['resources/zipcode1.json','resources/zipcode2.json'])
df2.show()  

Reading all files in a directory

We can read all JSON files from a directory into a DataFrame just by passing the directory (or a glob pattern) as the path to the json() method.


# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")
df3.show()

Reading files with a user-specified custom schema

A PySpark schema defines the structure of the data; in other words, it is the structure of the DataFrame. PySpark SQL provides the StructType and StructField classes to programmatically specify the structure of the DataFrame.

If you know the schema of the file ahead of time and do not want to rely on the default inferSchema behavior, use the schema option to specify user-defined column names and data types.

Use the PySpark StructType class to create a custom schema; below we instantiate this class with a list of StructField objects, each providing the column name, data type, and nullable option.


# Define custom schema
from pyspark.sql.types import (StructType, StructField, StringType,
    IntegerType, BooleanType, DoubleType)

schema = StructType([
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
  ])

df_with_schema = spark.read.schema(schema) \
        .json("resources/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()

Read JSON file using PySpark SQL

PySpark SQL also provides a way to read a JSON file by creating a temporary view directly on the file, using spark.sql() with a CREATE TEMPORARY VIEW ... USING json statement:


spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" + 
      " (path 'resources/zipcodes.json')")
spark.sql("select * from zipcode").show()

Options while reading JSON file

nullValues

Using the nullValues option you can specify strings in JSON that should be treated as null. For example, if you want a date column with the value “1900-01-01” to be set to null on the DataFrame.

dateFormat

The dateFormat option is used to set the format of input DateType and TimestampType columns. It supports all java.text.SimpleDateFormat formats (in Spark 3.x, datetime patterns follow java.time.format.DateTimeFormatter instead).
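
As a minimal sketch of dateFormat in action: the resources/events.json file, its columns, and the date pattern below are hypothetical, used only for illustration. The option takes effect when the target column is declared as DateType in a user-specified schema:


# Hypothetical example: parse a custom date pattern while reading JSON
from pyspark.sql.types import StructType, StructField, StringType, DateType

event_schema = StructType([
    StructField("name", StringType(), True),
    StructField("eventDate", DateType(), True)   # parsed using dateFormat
])

df_dates = spark.read.schema(event_schema) \
    .option("dateFormat", "dd-MM-yyyy") \
    .json("resources/events.json")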

Note: Besides the above options, the PySpark JSON data source also supports many other options.

Applying DataFrame transformations

Once you have created a PySpark DataFrame from the JSON file, you can apply all transformations and actions that DataFrames support. Please refer to the linked article for more details.
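
For example, a simple select and filter on the DataFrame read earlier (the column names come from the custom schema above):


# Example transformations: select a few columns and filter rows
df.select("RecordNumber", "City", "State") \
  .filter(df.State == "PR") \
  .show()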

Write PySpark DataFrame to JSON file

Use the write property on the DataFrame, which returns a PySpark DataFrameWriter object, to write a JSON file.


df2.write.json("/tmp/spark_output/zipcodes.json")

PySpark Options while writing JSON files

While writing a JSON file you can use several options.  

Other available options include nullValue and dateFormat; see the sketch below.
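
Write options are set the same way as read options, via option() on the writer. This sketch reuses the hypothetical df_dates DataFrame from the dateFormat reading example above:


# Hypothetical example: write dates out using a custom pattern
df_dates.write \
    .option("dateFormat", "yyyy/MM/dd") \
    .json("/tmp/spark_output/events")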

PySpark Saving modes

PySpark DataFrameWriter also has a mode() method to specify the SaveMode; this method takes one of overwrite, append, ignore, or errorifexists as its argument.

overwrite – used to overwrite an existing file

append – adds the data to an existing file

ignore – ignores the write operation when the file already exists

errorifexists or error – the default option; returns an error when the file already exists


df2.write.mode('overwrite').json("/tmp/spark_output/zipcodes.json")

Source code for reference

This example is also available at the GitHub PySpark Example Project for reference.


from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType,
    IntegerType, BooleanType, DoubleType)

spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

# Read JSON file into dataframe    
df = spark.read.json("resources/zipcodes.json")
df.printSchema()
df.show()

# Read multiline json file
multiline_df = spark.read.option("multiline", "true") \
      .json("resources/multiline-zipcode.json")
multiline_df.show()

# Read multiple files
df2 = spark.read.json(
    ['resources/zipcode2.json','resources/zipcode1.json'])
df2.show()    

# Read all JSON files from a directory
df3 = spark.read.json("resources/*.json")
df3.show()

# Define custom schema
schema = StructType([
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
  ])

df_with_schema = spark.read.schema(schema) \
        .json("resources/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()

# Create a temporary view from the JSON file
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode3 USING json OPTIONS" + 
      " (path 'resources/zipcodes.json')")
spark.sql("select * from zipcode3").show()

# PySpark write JSON file
df2.write.mode('overwrite').json("/tmp/spark_output/zipcodes.json")

Conclusion:

In this tutorial, you have learned how to read a JSON file with single-line and multiline records into a PySpark DataFrame, how to read single and multiple files at a time, and how to write a DataFrame back to a JSON file using different save modes.

Happy Learning !!