Getting Started with Data Ingestion Using Spark

On This Page

Overview

You can use the Apache Spark open-source data engine to work with data in the platform. This tutorial demonstrates how to run Spark jobs for reading and writing data in different formats (converting the data format), and for running SQL queries on the data. For more information about Spark, see the Spark v2.4.4 quick-start guide.

Before You Begin

To follow this tutorial, you must first ingest some data, such as a CSV or Parquet file, into the platform (i.e., write data to a platform data container). For information about the available data-ingestion methods, see the Ingesting and Preparing Data and Ingesting and Consuming Files getting-started tutorials.

Data Formats

The Spark jobs in this tutorial process data in the following data formats:

  • Comma Separated Value (CSV)

  • Parquet — an Apache columnar storage format that can be used in Apache Hadoop.
    For more information about Parquet, see https://parquet.apache.org/.
    For more information about Hadoop, see the Apache Hadoop web site.

  • NoSQL — the platform’s NoSQL format. A NoSQL table is a collection of items (objects) and their attributes. “items” are the equivalent of NoSQL database rows, and “attributes” are the equivalent of NoSQL database columns.
    All items in the platform share one common attribute, which serves as an item’s name and primary key. The value of this attribute must be unique to each item within a given NoSQL table. The primary key enables unique identification of specific items in the table, and efficient sharding of the table items.
    For more information, see the NoSQL Databases overview.

    You can use Spark Datasets, or the platform’s NoSQL Web API, to add, retrieve, and remove NoSQL table items. You can also use the platform’s Spark API extensions or NoSQL Web API to extend the basic functionality of Spark Datasets (for example, to conditionally update an item in a NoSQL table). For more information, see the related API references.

Using a Web Notebook

A common way to run Spark data jobs is by using web notebook for performing interactive data analytics, such as Jupyter Notebook or Apache Zeppelin. You create a web notebook with notes that define Spark jobs for interacting with the data, and then run the jobs from the web notebook. The code can be written in any of the supported language interpreters. This tutorial contains examples in Scala and Python. For more information about Jupyter Notebook or Zeppelin, see the respective product documentation. See also Running Spark Jobs from a Web Notebook in the Spark reference overview.

The examples in this tutorial were tested with Spark v2.4.4.

Selecting the Programming Language and Creating a Spark Session

In JupyterLab, select to create a new Python or Scala notebook.

Scala Jupyter Notebooks
Version 2.10.0 of the platform doesn’t support Scala Jupyter notebooks. See the Software Specifications and Restrictions.

In Zeppelin, create a new note in your Zeppelin notebook and load the desired interpreter at the start of your code paragraphs:

  • %spark loads the default Scala interpreter.
  • %pyspark loads the Python interpreter.

Then, add the following code in your Jupyter notebook cell or Zeppelin note paragraph to perform required imports and create a new Spark session; you’re encouraged to change the appName string to provide a more unique description:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.types._
    
    val spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
    
    import sys
    from pyspark.sql import SparkSession
    from pyspark.sql import *
    from pyspark.sql.types import *
    from pyspark.sql.functions import *
    
    spark = SparkSession.builder.appName("My Spark Application").getOrCreate()
    
    

    At the end of your code flow, add a cell/paragraph with the following code to stop the Spark session and release its resources:

      spark.stop()
      
      spark.stop()
      
      

      Sample Workflows

      Following are some possible workflows that use the Spark jobs outlined in this tutorial:

      Workflow 1: Convert a CSV File into a Partitioned Parquet Table
      1. Write a CSV file to a platform data container.

      2. Convert the CSV file into a Parquet table.

      3. Run SQL queries on the data in Parquet table.

      Workflow 2: Convert a Parquet Table into a NoSQL Table
      1. Write a Parquet table to a platform data container.

      2. Convert the Parquet table into a NoSQL table.

      3. Run SQL queries on the data in NoSQL table.

      Reading the Data

      Reading CSV Data

      Use the following code to read data in CSV format.
      You can read both CSV files and CSV directories.

      Defining the Table Schema
      To read CSV data using a Spark DataFrame, Spark needs to be aware of the schema of the data. You can either define the schema programmatically as part of the read operation as demonstrated in this section, or let Spark infer the schema as outlined in the Spark SQL and DataFrames documentation (e.g., option("inferSchema", "true") in Scala or csv(..., inferSchema="true") in Python). (Note that inferSchema requires an extra pass over the data.)
      Note
      Before running the read job, ensure that the referenced data source exists.
      Syntax

      The header and delimiter options are optional.

        val <schema variable> = StructType(List(
            StructField("<column name>", <column type>, nullable = <Boolean value>),
            <additional StructField() columns> of the same format, for each column>))
        val <DF variable> = spark.read.schema(<schema variable>)
            .option("header", "<true/false>")
            .option("delimiter", "<delimiter>")
            .csv("v3io://<container name>/<path to CSV data>")
        
        <schema variable> = StructType([
            StructField("<column name>", <column type>, <is-nullable Boolean value>),
            <additional StructField() columns> of the same format, for each column>)])
        <DF variable> = spark.read.schema(<schema variable>) \
            .option("header", "<true/false>") \
            .option("delimiter", "<delimiter>") \
            .csv("v3io://<container name>/<path to CSV data>")
        
        Example

        The following example reads a /mydata/nycTaxi.csv CSV file from the “bigdata” container into a myDF DataFrame variable.

          val schema = StructType(List(
              StructField("pickup_time", LongType, nullable = true),
              StructField("dropoff_time", LongType, nullable = true),
              StructField("passenger_count", LongType, nullable = true),
              StructField("trip_distance", DoubleType, nullable = true),
              StructField("payment_type", LongType, nullable = true),
              StructField("fare_amount", DoubleType, nullable = true),
              StructField("tip_amount", DoubleType, nullable = true),
              StructField("tolls_amount", DoubleType, nullable = true),
              StructField("total_amount", DoubleType, nullable = true)
          ))
          val myDF = spark.read.schema(schema)
              .option("header", "false")
              .option("delimiter", "|")
              .csv("v3io://bigdata/mydata/nycTaxi.csv")
          
          schema = StructType([
              StructField("pickup_time", LongType(), True),
              StructField("dropoff_time", LongType(), True),
              StructField("passenger_count", LongType(), True),
              StructField("trip_distance", DoubleType(), True),
              StructField("payment_type", LongType(), True),
              StructField("fare_amount", DoubleType(), True),
              StructField("tip_amount", DoubleType(), True),
              StructField("tolls_amount", DoubleType(), True),
              StructField("total_amount", DoubleType(), True)
          ])
          myDF = spark.read.schema(schema) \
              .option("header", "false") \
              .option("delimiter", "|") \
              .csv("v3io://bigdata/mydata/nycTaxi.csv")
          

          Reading Parquet Data

          Use the following code to read data as a Parquet database table.

          Note
          Before running the read job, ensure that the referenced data source exists.
          Syntax
            val <DF variable> = spark.read.parquet("v3io://<container name>/<path to Parquet data>")
            
            <DF variable> = spark.read.parquet("v3io://<container name>/<path to Parquet data>")
            
            Example

            The following example reads a /mydata/my-parquet-table Parquet database table from the “bigdata” container into a myDF DataFrame variable.

              val myDF = spark.read.parquet("v3io://bigdata/mydata/my-parquet-table")
              
              myDf = spark.read.parquet("v3io://bigdata/mydata/my-parquet-table")
              

              Reading NoSQL Data

              Use the following code to read data as a NoSQL table.

              Defining the Table Schema
              When using a Spark DataFrame to read data that was written in the platform using a NoSQL Spark DataFrame, the schema of the table structure is automatically identified and retrieved (unless you select to explicitly define the schema for the read operation). However, to read NoSQL data that was written to a table in another way, you first need to define the table schema. You can either define the schema programmatically as part of the read operation as demonstrated in this section, or let the platform infer the schema by using the inferSchema option (option("inferSchema", "true")). For more information, see Defining the Table Schema in the Spark NoSQL DataFrame reference.
              Note
              Before running the read job, ensure that the referenced data source exists.
              Syntax
                val <schema variable> = StructType(List(
                    StructField("<column name>", <column type>, nullable = <Boolean value>),
                    <additional StructField() columns> of the same format, for each column>))
                val <DF variable> = spark.read.schema(<schema variable>)
                    .format("io.iguaz.v3io.spark.sql.kv")
                    .load("v3io://<container name>/<path to a NoSQL table>")
                
                <schema variable> = StructType([
                    StructField("<column name>", <column type>, <is-nullable Boolean value>),
                    <additional StructField() columns> of the same format, for each column>)])
                <DF variable> = spark.read.schema(<schema variable>) \
                    .format("io.iguaz.v3io.spark.sql.kv") \
                    .load("v3io://<container name>/<path to a NoSQL table>")
                
                Example

                The following example reads a /mydata/flights NoSQL table from the “bigdata” container into a myDF DataFrame variable.

                  val schema = StructType(List(
                      StructField("id", StringType, nullable = false),
                      StructField("origin_country", StringType, nullable = true),
                      StructField("call_sign", StringType, nullable = true),
                      StructField("velocity", DoubleType, nullable = true),
                      StructField("altitude", DoubleType, nullable = true),
                      StructField("__mtime_secs", LongType, nullable = true)
                  ))
                  val myDF = spark.read.schema(schema)
                      .format("io.iguaz.v3io.spark.sql.kv")
                      .load("v3io://bigdata/mydata/flights")
                  
                  schema = StructType([
                      StructField("id", StringType(), False),
                      StructField("origin_country", StringType(), True),
                      StructField("call_sign", StringType(), True),
                      StructField("velocity", DoubleType(), True),
                      StructField("altitude", DoubleType(), True),
                      StructField("__mtime_secs", LongType(), True)
                  ])
                  myDF = spark.read.schema(schema) \
                      .format("io.iguaz.v3io.spark.sql.kv") \
                      .load("v3io://bigdata/mydata/flights")
                  

                  Writing the Data (Converting the Format)

                  Writing Parquet Data

                  Use the following code to write data as a Parquet database table.

                  Syntax
                    <DF variable>.write.parquet("v3io://<container name>/<path to Parquet data>")
                    
                    <DF variable>.write.parquet("v3io://<container name>/<path to Parquet data>")
                    
                    Example

                    The following example converts the data that is currently associated with the myDF DataFrame variable into a /mydata/my-parquet-table Parquet database table in the “bigdata” container.

                      myDF.write.parquet("v3io://bigdata/mydata/my-parquet-table")
                      
                      myDF.write.parquet("v3io://bigdata/mydata/my-parquet-table")
                      

                      Writing NoSQL Data

                      Use the following code to write data as a NoSQL table.

                      Syntax
                        <DF variable>.write.format("io.iguaz.v3io.spark.sql.kv")
                            .option("key", <key column>)
                            .save("v3io://<container name>/<path to a NoSQL table>")
                        
                        <DF variable>.write.format("io.iguaz.v3io.spark.sql.kv") \
                            .option("key", <key column>) \
                            .save("v3io://<container name>/<path to a NoSQL table>")
                        
                        Example

                        The following example converts the data that is currently associated with the myDF DataFrame variable into a /mydata/my-nosql-table NoSQL table in the “bigdata” container.

                          myDF.write.format("io.iguaz.v3io.spark.sql.kv")
                              .option("key", "ID").save("v3io://bigdata/mydata/my-nosql-table")
                          
                          myDF.write.format("io.iguaz.v3io.spark.sql.kv") \
                              .option("key", "ID").save("v3io://bigdata/mydata/my-nosql-table")
                          

                          Writing CSV Data

                          Use the following code to write data in CSV format.
                          You can write both CSV files and CSV directories.

                          Syntax

                          The header and delimiter options are optional.

                            <DF variable>.write
                                .option("header", "<true/false>")
                                .option("delimiter", "<delimiter>")
                                .csv("v3io://<container name>/<path to CSV data>")
                            
                            <DF variable>.write \
                                .option("header", "<true/false>") \
                                .option("delimiter", "<delimiter>") \
                                .csv("v3io://<container name>/<path to CSV data>")
                            
                            Example

                            The following example converts the data that is currently associated with the myDF DataFrame variable into /mydata/my-csv-data CSV data in the “bigdata” container.

                              myDF.write.option("header", "true").option("delimiter", ",")
                                  .csv("v3io://bigdata/mydata/my-csv-data")
                              
                              myDF.write.option("header", "true").option("delimiter", ",") \
                                  .csv("v3io://bigdata/mydata/my-csv-data")
                              

                              Running SQL Data Queries

                              Use the following syntax to run an SQL query on your data.

                              Syntax

                              The call to show is optional.

                                <DF variable>.createOrReplaceTempView("<SQL table name>")
                                spark.sql("<SQL query string>").show()
                                
                                <DF variable>.createOrReplaceTempView("<SQL table name>")
                                spark.sql("<SQL query string>").show()
                                
                                Example

                                The following example creates a temporary myTable SQL table for the database associated with the myDF DataFrame variable, and runs an SQL query on this table:

                                  myDF.createOrReplaceTempView("myTable")
                                  spark.sql("select column1, count(1) as count from myTable
                                      where column2='xxx' group by column1").show()
                                  
                                  myDF.createOrReplaceTempView("myTable")
                                  spark.sql("select column1, \
                                      count(1) as count from myTable where column2='xxx' group by column1") \
                                      .show()
                                  

                                  See Also