Write Versioned Layer Data
The `spark-support` module provides the `LayerDataFrameWriter` class, a custom Spark `DataFrameWriter` for writing `DataFrame`s to versioned layers.
Project Dependencies
If you want to create an application that uses the HERE platform Spark Connector to write data to a versioned layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.
The spark connector provides write functionality for the following formats:
- Protobuf
- Avro
- Parquet
- Raw
- Json
- Text
- Csv
Note: Usage specifics
For the `Raw` format, a data converter must be implemented from the `DataConverter` trait/interface and set using the `withDataConverter` method. Note that the `format` method is not required.
Write Process
For versioned layers, `DataFrame` rows are grouped by the `mt_partition` column to create the partition data. The data is uploaded to the versioned layer using the write engine and afterwards published using the Publish API. The `DataFrame` can also include additional metadata.
Note that multiple rows for the same partition are allowed only for the `Avro` and `Parquet` formats. For the other formats, this throws an error.
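The grouping rule above can be sketched in plain Java (illustrative only, not connector code; the map-based row representation and the `columnarFormat` flag are assumptions for this sketch):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionGrouping {
  // A row is sketched as a map from column name to value;
  // "mt_partition" plays the role of the connector's metadata column.
  static Map<String, List<Map<String, Object>>> groupByPartition(
      List<Map<String, Object>> rows, boolean columnarFormat) {
    Map<String, List<Map<String, Object>>> groups =
        rows.stream().collect(Collectors.groupingBy(r -> (String) r.get("mt_partition")));
    if (!columnarFormat) {
      // For the non-columnar formats, a partition may contain only one row,
      // mirroring the restriction described above.
      groups.forEach(
          (partition, group) -> {
            if (group.size() > 1) {
              throw new IllegalArgumentException("Multiple rows for partition " + partition);
            }
          });
    }
    return groups;
  }
}
```

Each resulting group corresponds to one platform partition payload; for Avro and Parquet the whole group is serialized together, while the other formats require exactly one row per group.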
The following metadata columns are supported for versioned layers:

| Column name | Data type | Meaning | Required |
|---|---|---|---|
| `mt_partition` | String | ID of the partition in the HERE platform | Yes |
| `mt_timestamp` | Long | Timestamp of creation | No |
| `mt_checksum` | String | Checksum of the payload | No |
| `mt_crc` | String | CRC of the payload | No |
| `mt_dataSize` | Long | Size of the payload | No |
| `mt_compressedDataSize` | Long | Compressed size of the payload | No |
| `mt_dataHandle` | String | Handle of the data | No |
Each metadata column must have the data type shown in the table above; otherwise, an `IllegalArgumentException` is thrown. Extra columns that start with `mt_` but are not listed above are ignored during writes.
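As an illustration of this validation rule, the following plain-Java sketch (not the connector's actual implementation; the string-based schema representation is an assumption) rejects a known `mt_` column with the wrong type and ignores unknown `mt_` columns:

```java
import java.util.Map;

public class MetadataColumnCheck {
  // Expected data types of the metadata columns, per the table above.
  static final Map<String, String> EXPECTED =
      Map.of(
          "mt_partition", "String",
          "mt_timestamp", "Long",
          "mt_checksum", "String",
          "mt_crc", "String",
          "mt_dataSize", "Long",
          "mt_compressedDataSize", "Long",
          "mt_dataHandle", "String");

  // Throws for a known mt_ column with a wrong type; other mt_ columns are ignored.
  static void validate(Map<String, String> schema) {
    schema.forEach(
        (name, type) -> {
          String expected = EXPECTED.get(name);
          if (name.startsWith("mt_") && expected != null && !expected.equals(type)) {
            throw new IllegalArgumentException(
                "Metadata column " + name + " must be of type " + expected);
          }
        });
  }
}
```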
The following snippet demonstrates how to manipulate and write a `DataFrame[Row]` (`Dataset<Row>`) as Protobuf-encoded data:
```scala
import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, first, lit}
import org.apache.spark.sql.types.IntegerType

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-protobuf").getOrCreate()

val catalogHrn = HRN.fromString("hrn:here:data:::catalog-1")
val layerId = "versioned-protobuf-layer"

val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()

val computationDataFrame = inputDataFrame
  .groupBy("text")
  .agg(count("text").as("count"), first("partition_name").as("mt_partition"))

val outDataFrame = computationDataFrame
  .withColumn("count", col("count").cast(IntegerType))
  .withColumn("mt_timestamp", lit(System.currentTimeMillis))

outDataFrame.show()

outDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("olp.connector.metadata-columns", true)
  .save()

sparkSession.stop()
```
```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.first;
import static org.apache.spark.sql.functions.lit;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-protobuf").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-protobuf-layer";

Dataset<Row> inputDataset = loadDataFrame(sparkSession);
inputDataset.show();

Dataset<Row> computationDataset =
    inputDataset
        .groupBy("text")
        .agg(count("text").as("count"), first("partition_name").as("mt_partition"));

Dataset<Row> outDataset =
    computationDataset
        .withColumn("count", col("count").cast(DataTypes.IntegerType))
        .withColumn("mt_timestamp", lit(System.currentTimeMillis()));

outDataset.show();

JavaLayerDataFrameWriter.create(outDataset)
    .writeLayer(catalogHrn, layerId)
    .option("olp.connector.metadata-columns", true)
    .save();

sparkSession.stop();
```
Note: Restrictions
The `mt_partition` metadata column must have unique values; otherwise, an `IllegalArgumentException` is thrown. The layer must have the `application/x-protobuf` content type.
The following snippet demonstrates how to write a `DataFrame[Row]` (`Dataset<Row>`) as Avro- or Parquet-encoded data:
```scala
import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate()

val catalogHrn = HRN.fromString("hrn:here:data:::catalog-1")
val layerId = "versioned-parquet-layer"

val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()
```
```java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-parquet-layer";

Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();

sparkSession.stop();
```
Note: Usage specifics
Avro and Parquet are columnar storage formats, so the `DataFrame` can contain duplicate values in the `mt_partition` column. Rows are grouped by `mt_partition`, and each group is saved in a single platform partition. The layer must have the `application/x-parquet` or `application/x-avro-binary` content type, according to the format.
The following snippet demonstrates how to write a `DataFrame` as a file in an arbitrary format. In this example, the input `DataFrame` contains a `data` column holding the message as a byte array.
Note: Restrictions
The `mt_partition` metadata column must have unique values; otherwise, an `IllegalArgumentException` is thrown. The layer must have the `application/octet-stream` content type.
```scala
import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import com.here.platform.data.client.spark.scaladsl.{
  GroupedData,
  VersionedDataConverter,
  VersionedRowMetadata
}
import org.apache.spark.sql.{Row, SparkSession}

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-raw").getOrCreate()

val catalogHrn = HRN.fromString("hrn:here:data:::catalog-1")
val layerId = "versioned-raw-layer"

val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .withDataConverter(new VersionedDataConverter {
    override def serializeGroup(
        rowMetadata: VersionedRowMetadata,
        rows: Iterator[Row]
    ): GroupedData[VersionedRowMetadata] = {
      val bytes = "serializeGroup=>".getBytes ++ rows
        .next()
        .getAs[Array[Byte]]("data")
      GroupedData(rowMetadata, bytes)
    }
  })
  .save()

sparkSession.stop()
```
```java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.javadsl.VersionedDataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.VersionedRowMetadata;
import java.util.Iterator;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-raw").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-raw-layer";

Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .withDataConverter(
        new VersionedDataConverter() {
          @Override
          public GroupedData<VersionedRowMetadata> serializeGroup(
              VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
            byte[] bytes =
                ArrayUtils.addAll("serializeGroup=>".getBytes(), rows.next().getAs("data"));
            return new GroupedData<>(rowMetadata, bytes);
          }
        })
    .save();

sparkSession.stop();
```
The following snippet demonstrates how to write a `DataFrame[Row]` (`Dataset<Row>`) as JSON rows:
```scala
import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-json").getOrCreate()

val catalogHrn = HRN.fromString("hrn:here:data:::catalog-1")
val layerId = "versioned-json-layer"

val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()
```
```java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-json").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-json-layer";

Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();

sparkSession.stop();
```
Write Data in Text Format
The following snippet demonstrates how to write a `DataFrame` in the Text format. In this example, the input `DataFrame` contains a `text` column holding the message as a string.
```scala
import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-text").getOrCreate()

val catalogHrn = HRN.fromString("hrn:here:data:::catalog-1")
val layerId = "versioned-text-layer"

val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()
```
```java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-text").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-text-layer";

Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();

sparkSession.stop();
```
The following snippet demonstrates how to write a `DataFrame[Row]` (`Dataset<Row>`) as CSV rows:
```scala
import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-csv").getOrCreate()

val catalogHrn = HRN.fromString("hrn:here:data:::catalog-1")
val layerId = "versioned-csv-layer"

val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()

sparkSession.stop()
```
```java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-csv").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-csv-layer";

Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();

sparkSession.stop();
```
Note: Usage specifics
The connector groups rows by the `mt_partition` column value. In this case, a data converter inherited from the `DataConverter` trait must be implemented. Also, the versioned layer must have the `application/octet-stream` content type.