V3IOUtils Object

Fully Qualified Name

org.apache.spark.streaming.v3io.V3IOUtils

Description

A singleton utility object for creating Spark input streams that can be used to consume platform stream records via the Spark Streaming API.

V3IO Streams
To use this object you must first create one or more V3IO streams using the platform's create-stream API. See the CreateStream operation of the Streaming Web API.

Summary

Prototype

object V3IOUtils
Methods
  • createDirectStream

    def createDirectStream[
        V: ClassTag, VD <: Decoder[V] : ClassTag, R: ClassTag](
        ssc:              StreamingContext,
        v3ioParams:       Map[String, String],
        streamNames:      Set[String],
        messageHandler:   RecordAndMetadata[V] => R)
        : InputDStream[R]
    

Prototype

object V3IOUtils

createDirectStream Method

Creates a Spark input-stream object that can be used to consume record data and metadata from the specified platform streams, using the Spark Streaming API (v3.2.3). The new input stream pulls stream records by querying the platform directly, without using a receiver.

Syntax

createDirectStream[
    V:    ClassTag,
    VD <: Decoder[V] : ClassTag,
    R:    ClassTag](
    ssc:              StreamingContext,
    v3ioParams:       Map[String, String],
    streamNames:      Set[String],
    messageHandler:   RecordAndMetadata[V] => R)
    : InputDStream[R]

Parameters

ssc

Spark streaming-context object.

  • Type: StreamingContext
  • Requirement: Required
v3ioParams

An optional map of platform configuration properties that configure the creation and behavior of the returned Spark input stream. For example, Map("default-data-block-size" -> "524288") configures the block size for read operations to 0.5 MB (524288 bytes).

  • Type: Map[String, String] — a map of platform-property name and value string pairs
  • Requirement: Optional

Note

All property values are provided as strings. The "Property Type" information in the following descriptions indicates the property type that the string represents.

Start-Position Configuration Properties

You can optionally set the following properties to determine the start position for the new Spark input stream:

  • spark.streaming.v3io.streamStartPosition — the stream start-position type.

    • Property Type: String
    • Default Value: "latest"

    The following property values are supported. The start-position information applies to each of the shards in the source V3IO streams (see streamNames):

    • "earliest" — start at the location of the earliest ingested record in the shard.

    • "latest" (default) — start at the end of the shard.

    • "time" — start at the location of the earliest ingested record in the shard beginning at the base time configured in the spark.streaming.v3io.streamStartTimeMS property. For shards without a matching record ingestion time (i.e., if all records in the shard arrived before the configured base time) the start position is set to the end of the shard.

    • "<record sequence number>" — start at the location of the record whose sequence number matches the property value (for example, "1"). For shards without a matching record sequence number, the start position is set to the end of the shard.

  • spark.streaming.v3io.streamStartTimeMS: the base time for a time-based stream start position (spark.streaming.v3io.streamStartPosition="time"), as a Unix timestamp in milliseconds. For example, 1511260205000 sets the base time for determining the start position to 21 Nov 2017 at 10:30:05 AM UTC.

    • Property Type: Long
    • Default Value: 0 (the Unix epoch, 1 Jan 1970 at 00:00:00 UTC)
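
As a minimal sketch of the time-based start position described above, the following snippet derives the millisecond timestamp with java.time.Instant and builds the properties map. The names StartPositionConfig and timeStartParams are hypothetical and not part of the platform API; only the two property names come from this reference.

```scala
import java.time.Instant

object StartPositionConfig {
  // Hypothetical helper (not part of the platform API): builds a v3ioParams
  // map that uses the "time" start position with the given ISO-8601 UTC
  // instant as the base time.
  def timeStartParams(isoUtcTime: String): Map[String, String] = {
    val startTimeMs: Long = Instant.parse(isoUtcTime).toEpochMilli
    Map(
      "spark.streaming.v3io.streamStartPosition" -> "time",
      // All property values are passed as strings, including the Long timestamp.
      "spark.streaming.v3io.streamStartTimeMS" -> startTimeMs.toString
    )
  }
}
```

Calling StartPositionConfig.timeStartParams("2017-11-21T10:30:05Z") sets spark.streaming.v3io.streamStartTimeMS to "1511260205000", the value used in the example above.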
streamNames

A set of one or more fully qualified V3IO stream paths of the format v3io://<container name>/<stream path> — where <container name> is the name of the stream's parent container and <stream path> is the path to the stream within the specified container. For example, v3io://mycontainer/mydata/metrics_stream.

Note

All stream paths in the streamNames set must point to existing V3IO streams that reside in the same container.

  • Type: Set[String]
  • Requirement: Required
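
The path format and the same-container requirement can be sketched as a small helper. V3IOStreamPaths, streamPath, and streamNamesFor are hypothetical names introduced for illustration. They are not part of the platform API, and they only assemble strings in the documented v3io://<container name>/<stream path> format without checking that the streams exist.

```scala
object V3IOStreamPaths {
  // Hypothetical helper: builds one fully qualified stream path.
  def streamPath(container: String, path: String): String = {
    require(container.nonEmpty, "container name must not be empty")
    // Drop leading slashes so the result has exactly one separator
    // between the container name and the stream path.
    val relative = path.dropWhile(_ == '/')
    require(relative.nonEmpty, "stream path must not be empty")
    s"v3io://$container/$relative"
  }

  // Builds a streamNames set for a single container, which keeps all
  // paths in the same container by construction.
  def streamNamesFor(container: String, paths: String*): Set[String] =
    paths.map(streamPath(container, _)).toSet
}
```

For example, V3IOStreamPaths.streamNamesFor("mycontainer", "/DriverStream", "/Passengers") produces a set that can be passed as the streamNames argument.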
messageHandler

An optional record handler function for converting a stream record of type RecordAndMetadata into the desired input-stream record type, which is indicated by the handler's return value (R). The default record handler returns the record's data (also known as the record payload or value), decoded as the value type of the RecordAndMetadata class instance (V).

  • Type: A record-handler function

    messageHandler: RecordAndMetadata[V] => R
    
  • Requirement: Optional

Type Parameters

  • V — the type of the input stream's record data ("value").
  • VD — the decoder class to use for converting the record data into the specified type — the input stream's value type, as set in the method's V type parameter. See Stream-Data Encoding and Decoding Types.
  • R — the input stream's record type, which is the type of the record handler's return value (see messageHandler). This type parameter is applicable only when providing a record handler in the method call. The default input-stream record type is the decoding type of the record data, as set in the method's V type parameter.

Return Value

Returns an instance of a Spark input stream (InputDStream), which you can use with the Spark Streaming API to consume stream records in the specified input-stream record type (see the method's R type parameter).
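
The returned input stream is consumed like any other Spark DStream. The following is a minimal sketch that uses only standard Spark Streaming calls (foreachRDD, start, awaitTermination). The names StreamConsumer and countAndRun are hypothetical, and the sketch assumes that the stream passed in was created with createDirectStream on the same StreamingContext.

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object StreamConsumer {
  // Hypothetical helper: prints the number of records in every batch and
  // then runs the streaming job until it is stopped or fails.
  def countAndRun[R](ssc: StreamingContext, records: DStream[R]): Unit = {
    // Output operations must be registered before the context is started.
    records.foreachRDD { rdd =>
      println(s"Records in batch: ${rdd.count()}")
    }
    ssc.start()            // start pulling records in each batch interval
    ssc.awaitTermination() // block until the job stops
  }
}
```

Because InputDStream extends DStream, the value returned by createDirectStream can be passed directly as the records argument.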

Examples

Example 1

Create a Spark input stream for "/DriverStream" and "/Passengers" V3IO streams in a "mycontainer" container using the RecordAndMetadata.payloadWithMetadata method as the record handler. The input stream will return PayloadWithMetadata record objects that contain string-decoded record data and related metadata:

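import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.v3io.V3IOUtils
// RecordAndMetadata, PayloadWithMetadata, and StringDecoder must also be
// imported from the platform's Spark-Streaming integration library.

val batchInterval = Seconds(1)
val sparkConf = new SparkConf()
                .setAppName("My Car Streams Application")
val ssc = new StreamingContext(sparkConf, batchInterval)
// No custom properties: use the default configuration.
val cfgProperties = Map.empty[String, String]
// Fully qualified paths to the streams in the "mycontainer" container.
val streamNames = Set("v3io://mycontainer/DriverStream",
                      "v3io://mycontainer/Passengers")

val carsInputDStream = {
    // Return each record's data together with its metadata.
    val recordHandler = (rmd: RecordAndMetadata[String]) =>
        rmd.payloadWithMetadata()

    V3IOUtils.createDirectStream[String, StringDecoder,
        PayloadWithMetadata[String]](
        ssc,
        cfgProperties,
        streamNames,
        recordHandler)
}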

Example 2

Create a Spark input stream for "/WebClicks" and "/ServerLogs" V3IO streams within a "/Web/Streams/" directory in a "mycontainer" container, starting at the earliest ingested record found in each of the stream shards. Use the default record handler to create an input stream that returns the record data as a string:

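import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.v3io.V3IOUtils
// StringDecoder must also be imported from the platform's
// Spark-Streaming integration library.

val batchInterval = Seconds(2)
val sparkConf = new SparkConf()
                .setAppName("My Web Streams Application")
val ssc = new StreamingContext(sparkConf, batchInterval)
// Start reading each shard from its earliest ingested record.
val cfgProperties = Map("spark.streaming.v3io.streamStartPosition" -> "earliest")
// Fully qualified paths to the streams in the "mycontainer" container.
val streamNames =
    Set("v3io://mycontainer/Web/Streams/WebClicks",
        "v3io://mycontainer/Web/Streams/ServerLogs")

// No record handler: the input stream returns the record data as a string.
val webInputDStream = {
    V3IOUtils.createDirectStream[String, StringDecoder](
        ssc,
        cfgProperties,
        streamNames)
}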