Get Data
To get data from catalogs, add the data-engine module as a dependency to your project.
The data-engine module provides high-level abstractions on top of the data-client module for working with HERE platform data. This module can read and manage both metadata and data.
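The platform supports several types of data layers, including versioned, volatile, stream, index, object store, and interactive map layers. The following sections describe how to read data from each of them.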
Versioned Layer
The data in a versioned layer is available as long as the specified version of the catalog exists, and it does not change for that version. This means you can cache fetched blobs on the client side.
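Assuming the blob for a given catalog version and partition never changes, a client-side cache keyed by (version, partition) never needs invalidation. The following plain-Java sketch illustrates the idea; VersionedBlobCache and its fetcher function are hypothetical names, and the fetcher stands in for a wrapper around ReadEngine.getDataAsBytes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical client-side cache for versioned-layer blobs. It never invalidates entries,
// which is only safe under the assumption that a (version, partition) blob is immutable.
final class VersionedBlobCache {
    private final Map<Map.Entry<Long, String>, CompletableFuture<byte[]>> cache =
            new ConcurrentHashMap<>();
    // Stand-in for the real download call, e.g. a wrapper around ReadEngine.getDataAsBytes.
    private final BiFunction<Long, String, CompletableFuture<byte[]>> fetcher;

    VersionedBlobCache(BiFunction<Long, String, CompletableFuture<byte[]>> fetcher) {
        this.fetcher = fetcher;
    }

    CompletableFuture<byte[]> get(long version, String partition) {
        Map.Entry<Long, String> key = Map.entry(version, partition);
        // Storing the future (not the bytes) lets concurrent callers share one in-flight download.
        CompletableFuture<byte[]> result =
                cache.computeIfAbsent(key, k -> fetcher.apply(k.getKey(), k.getValue()));
        // Forget failed downloads so that a later call can retry them.
        result.whenComplete((bytes, error) -> {
            if (error != null) {
                cache.remove(key, result);
            }
        });
        return result;
    }

    int size() {
        return cache.size();
    }
}
```

Caching the future rather than the bytes means that two callers asking for the same partition at the same time trigger only one download.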
To get data (a blob) for a stream of partitions belonging to a versioned layer, add the following:
val queryApi = DataClient().queryApi(catalogHrn, settings)
val readEngine = DataEngine().readEngine(catalogHrn, settings)
val dataAsBytes: Future[Source[(Partition, Array[Byte]), NotUsed]] =
  queryApi
    .getPartitions(version, layer)
    .map { partitions =>
      partitions.mapAsync(parallelism = 10) { partition =>
        readEngine.getDataAsBytes(partition).map { data =>
          (partition, data)
        }
      }
    }
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
int parallelism = 10;
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);
CompletionStage<Source<Pair<Partition, byte[]>, NotUsed>> dataAsBytes =
    queryApi
        .getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
        .thenApply(
            partitions ->
                partitions.mapAsync(
                    parallelism,
                    partition ->
                        readEngine
                            .getDataAsBytes(partition)
                            .thenApply(data -> new Pair<>(partition, data))));
To get data as an Akka source, add the following:
val dataAsSource: Future[Source[Source[ByteString, NotUsed], NotUsed]] =
  queryApi
    .getPartitions(version, layer)
    .map { partitions =>
      partitions.mapAsync(parallelism = 10) { partition =>
        readEngine.getDataAsSource(partition)
      }
    }
CompletionStage<Source<Source<ByteString, NotUsed>, NotUsed>> dataAsSource =
    queryApi
        .getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
        .thenApply(
            partitions ->
                partitions.mapAsync(
                    parallelism, partition -> readEngine.getDataAsSource(partition)));
To transform data into a custom object, add the following:
val data: Future[Source[CustomDomainObject, NotUsed]] =
  queryApi
    .getPartitions(version, layer)
    .map { partitions =>
      partitions.mapAsync(parallelism = 10) { partition =>
        readEngine.get(partition, bytes => CustomDomainObject.fromBytes(bytes))
      }
    }
CompletionStage<Source<JavaCustomDomainObject, NotUsed>> data =
    queryApi
        .getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
        .thenApply(
            partitions ->
                partitions.mapAsync(
                    parallelism,
                    partition ->
                        readEngine.get(
                            partition, bytes -> JavaCustomDomainObject.fromBytes(bytes))));
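The readEngine.get variants above take a decoding function that turns the raw blob bytes into your own type. As a hypothetical illustration, the sketch below assumes each blob is UTF-8 text of the form id,name; the payload format and the fields of JavaCustomDomainObject are invented for this example, so adapt fromBytes to your layer's actual content type.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical domain object; the "id,name" UTF-8 payload format is an assumption for illustration.
final class JavaCustomDomainObject {
    final String id;
    final String name;

    JavaCustomDomainObject(String id, String name) {
        this.id = id;
        this.name = name;
    }

    // Decoder with the shape expected by readEngine.get(partition, bytes -> ...).
    static JavaCustomDomainObject fromBytes(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8).trim();
        int comma = text.indexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("Expected 'id,name' but got: " + text);
        }
        return new JavaCustomDomainObject(text.substring(0, comma), text.substring(comma + 1));
    }
}
```

Throwing on malformed input makes the returned CompletionStage fail, so decoding errors surface in the stream instead of being silently dropped.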
Stream Layer
Data in stream layers consists of events pushed to consumers as long as the producer publishes them.
To subscribe to a stream layer, add the following:
import java.nio.charset.StandardCharsets
import scala.util.{Failure, Success}
val queryApi = DataClient().queryApi(catalogHrn, settings)
val readEngine = DataEngine().readEngine(catalogHrn, settings)
// Assumes a UTF-8 text payload; printing the array itself would only show its reference
def processPayload(data: Array[Byte]): Done = {
  println("Received data: " + new String(data, StandardCharsets.UTF_8))
  Done
}
val subscription: Future[Subscription] =
  queryApi.subscribe(streamingLayerId,
                     ConsumerSettings("consumer-name", consumerId = "consumer-id"))
subscription.foreach { subscription =>
  subscription.partitions
    .mapAsync(parallelism = 10) { partition: Partition =>
      readEngine.getDataAsBytes(partition)
    }
    .map { payload: Array[Byte] =>
      processPayload(payload)
    }
    .runWith(Sink.ignore)
    .andThen {
      case Success(_) => println("Done")
      case Failure(exception) => println(s"Failed with $exception")
    }
}
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);
int parallelism = 10;
CompletionStage<Subscription> subscriptionFuture =
    queryApi.subscribe(
        layer, new ConsumerSettings.Builder().withGroupName("test-consumer").build());
subscriptionFuture
    // thenCompose (not thenApply) so that whenCompleteAsync runs when the stream finishes
    .thenCompose(
        subscription ->
            subscription
                .getPartitions()
                .mapAsync(parallelism, readEngine::getDataAsBytes)
                .map(payload -> processPayload(payload))
                .runWith(Sink.ignore(), myMaterializer))
    .whenCompleteAsync(
        (result, e) -> {
          if (e != null) {
            e.printStackTrace();
          } else {
            System.out.println("DONE!");
          }
        });
private Done processPayload(byte[] data) {
  // Assumes a UTF-8 text payload; printing the array itself would only show its reference
  System.out.println(
      "Received data: " + new String(data, java.nio.charset.StandardCharsets.UTF_8));
  return Done.getInstance();
}
To shut down your subscription, use SubscriptionControl:
subscription.subscriptionControl.shutdown()
subscription.getSubscriptionControl().shutdown();
To handle each received partition with a callback function that you pass when subscribing, add the following:
val queryApi = DataClient().queryApi(catalogHrn, settings)
def processPartition(partition: Partition): Unit =
  println("Received partition: " + partition)
val subscriptionControl: Future[SubscriptionControl] =
  queryApi.subscribe(streamingLayerId,
                     ConsumerSettings("consumer-name"),
                     partition => processPartition(partition))
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
CompletionStage<SubscriptionControl> subscriptionFuture =
    queryApi.subscribe(
        "stream-layer",
        new ConsumerSettings.Builder()
            .withGroupName("test-consumer")
            .withConsumerId("consumer-id")
            .build(),
        partition -> processPartition(partition));
private void processPartition(Partition partition) {
  System.out.println("Received partition: " + partition);
}
Read Data from Stream Layers on Multiple Workers
Depending on the stream layer throughput configuration, you can set up distributed workers that consume the same stream layer. If all workers share the same consumer group (defined by ConsumerSettings.groupName when you create the subscription) and each worker has a unique ConsumerSettings.consumerId (for the http-connector only), stream events are distributed between the workers. Stream layer processing follows at-least-once delivery semantics, so the same event can be dispatched more than once, either to the same worker or to a different one.
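With at-least-once delivery, a worker can receive the same event twice, so processing should be idempotent. One common approach, sketched here in plain Java, is to remember which event IDs were already handled and skip repeats. IdempotentProcessor is a hypothetical class, and the choice of event ID (for example, partition name plus offset) is an assumption; because a duplicate can go to a different worker, a real deployment would keep the seen IDs in shared, durable storage rather than in memory.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical wrapper that runs the handler at most once per event ID.
// Seen IDs live in memory here; with several workers, they would need shared, durable storage.
final class IdempotentProcessor<T> {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();
    private final Consumer<T> handler;

    IdempotentProcessor(Consumer<T> handler) {
        this.handler = handler;
    }

    // Returns true if the event was handled now, false if it was a duplicate delivery.
    boolean process(String eventId, T event) {
        if (!seen.add(eventId)) {
            return false; // already handled: skip the redelivered event
        }
        try {
            handler.accept(event);
        } catch (RuntimeException e) {
            seen.remove(eventId); // allow a retry if processing failed
            throw e;
        }
        return true;
    }
}
```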
To recover a worker after a failure, create a new subscription with the same ConsumerSettings.groupName and ConsumerSettings.consumerId (for the http-connector only).
To re-process a stream layer from the beginning, create a new subscription with a different groupName.
Initial Offsets and Checkpoint
Use ConsumerSettings.offset to configure how offsets and checkpoints are managed in your subscription.
- EarliestOffset: Subscribes to the earliest (oldest) partitions available for the given group name, respecting any previous checkpoint. Checkpoints are generated automatically.
- ManualOffset: Subscribes to the earliest (oldest) partitions available for the given group name, respecting any previous checkpoint. You must call SubscriptionControl.acknowledge for every received partition and, when required, call SubscriptionControl.checkpoint to send the offsets to the platform.
- LatestOffset: Subscribes to the latest (newest) available partitions only. The subscription ignores any data that is already available and any existing checkpoints.
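To make the difference between the three modes concrete, here is a simplified, hypothetical plain-Java model of where a subscription would start reading a single stream partition. It is not the platform's implementation. It assumes that a checkpoint records the last processed offset, that reading resumes at the next offset, and that a checkpoint older than the retained data falls back to the earliest available offset.

```java
import java.util.OptionalLong;

// Simplified model of the three ConsumerSettings offset modes (not the platform's code).
enum OffsetMode { EARLIEST, MANUAL, LATEST }

final class StartOffset {
    /**
     * @param earliestAvailable first offset still retained in the stream
     * @param nextOffset        offset the producer will write next (end of the stream)
     * @param checkpoint        last offset checkpointed for this group name, if any
     */
    static long resolve(OffsetMode mode, long earliestAvailable, long nextOffset,
                        OptionalLong checkpoint) {
        switch (mode) {
            case LATEST:
                // Ignores existing data and checkpoints: only new events are read.
                return nextOffset;
            case EARLIEST:
            case MANUAL:
                // Both respect a previous checkpoint; MANUAL differs only in who creates
                // checkpoints (your code, via acknowledge and checkpoint calls).
                return checkpoint.isPresent()
                        ? Math.max(checkpoint.getAsLong() + 1, earliestAvailable)
                        : earliestAvailable;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```

In this model, EarliestOffset and ManualOffset start at the same place; the practical difference is that with ManualOffset nothing advances the checkpoint unless your code acknowledges and checkpoints.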
Volatile Layer
Data in volatile layers can change over time, so the data (blob) for the same partition can contain different content at different times. Volatile layers typically hold traffic information, weather, and other similar content.
When using volatile data, performance is often an important factor. To speed up interactions with the HERE platform, you can cache the metadata and fetch the blobs as needed.
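One way to apply this is to keep the partition list in a small time-based cache while always downloading the blobs fresh. The plain-Java TtlCache below is a hypothetical sketch: its loader would wrap a metadata call such as getVolatilePartitions, and the refresh interval is a value you choose for your data. The clock is injectable so the behavior can be tested; in production you would pass System::nanoTime.

```java
import java.time.Duration;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical time-based cache for volatile-layer metadata.
// Metadata is reused until it is older than ttl; blobs should still be fetched on every read.
final class TtlCache<T> {
    private final Supplier<T> loader;   // e.g. wraps a metadata query (assumption)
    private final long ttlNanos;
    private final LongSupplier clock;   // System::nanoTime in practice
    private T value;
    private long loadedAt;
    private boolean loaded = false;

    TtlCache(Supplier<T> loader, Duration ttl, LongSupplier clock) {
        this.loader = loader;
        this.ttlNanos = ttl.toNanos();
        this.clock = clock;
    }

    synchronized T get() {
        long now = clock.getAsLong();
        if (!loaded || now - loadedAt >= ttlNanos) {
            value = loader.get(); // refresh missing or stale metadata
            loadedAt = now;
            loaded = true;
        }
        return value;
    }
}
```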
To get data from a volatile layer, add the following:
val queryApi = DataClient().queryApi(catalogHrn, settings)
val readEngine = DataEngine().readEngine(catalogHrn, settings)
def downloadData(partition: Partition): Future[Option[String]] =
  readEngine
    .getDataAsBytes(partition)
    .map(bytes => Some(new String(bytes)))
    .recover { case _ => None }
val partitions: Future[Source[Partition, NotUsed]] =
  queryApi.getVolatilePartitions(layerId)
partitions.flatMap { ps: Source[Partition, NotUsed] =>
  ps.mapAsync(parallelism = 10) { partition: Partition =>
      downloadData(partition)
    }
    .runWith(Sink.foreach(println))
}
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);
Function<Partition, CompletionStage<Optional<byte[]>>> fetchData =
    (partition) -> {
      return readEngine
          .getDataAsBytes(partition)
          .thenApply(Optional::of)
          .exceptionally(failure -> Optional.empty());
    };
CompletionStage<Source<Partition, NotUsed>> partitions =
    queryApi.getVolatilePartitions(
        layer, new VolatilePartitionsFilter.Builder().build(), Collections.emptySet());
int parallelism = 10;
partitions.thenApply(
    partitionsSource -> {
      return partitionsSource
          .mapAsync(parallelism, fetchData::apply)
          .runWith(Sink.foreach(System.out::println), myMaterializer);
    });
For both functions, getVolatilePartitions and getVolatilePartitionsAsIterator, you can pass an optional filter parameter. A VolatilePartitionsFilter can be empty, a since filter, a byIds filter, or a combination of since and byIds filters. Multiple filters can be joined with the logical and operator. When the VolatilePartitionsFilter is empty, all the partitions in the volatile layer are matched.
For example:
val queryApi = DataClient().queryApi(catalogHrn, settings)
val readEngine = DataEngine().readEngine(catalogHrn, settings)
def downloadData(partition: Partition): Future[Option[String]] =
  readEngine
    .getDataAsBytes(partition)
    .map(bytes => Some(new String(bytes)))
    .recover { case _ => None }
val timestampSinceEpochInMs = 1571406320000L
val timestampSinceEpochInMsPlusOneHour = timestampSinceEpochInMs + 3600 * 1000
val emptyFilter: VolatilePartitionsFilter = VolatilePartitionsFilter.empty
val sinceFilter1: VolatilePartitionsFilter =
  VolatilePartitionsFilter.since(timestampSinceEpochInMs)
val sinceFilter2: VolatilePartitionsFilter =
  VolatilePartitionsFilter.since(timestampSinceEpochInMs) and
    VolatilePartitionsFilter.since(timestampSinceEpochInMsPlusOneHour)
val byIdsFilter1: VolatilePartitionsFilter =
  VolatilePartitionsFilter.byIds(Set("1", "2", "3"))
val byIdsFilter2: VolatilePartitionsFilter =
  VolatilePartitionsFilter.byIds(Set("1", "2", "3")) and
    VolatilePartitionsFilter.byIds(Set("1"))
val byIdsFilter3: VolatilePartitionsFilter =
  VolatilePartitionsFilter.byIds(Set("1", "2", "3")) and
    VolatilePartitionsFilter.byIds(Set("4"))
val combinedFilter: VolatilePartitionsFilter =
  VolatilePartitionsFilter.byIds(Set("1")) and
    VolatilePartitionsFilter.since(timestampSinceEpochInMs)
val partitions: Future[Source[Partition, NotUsed]] =
  queryApi.getVolatilePartitions(layerId, combinedFilter)
partitions.flatMap { ps: Source[Partition, NotUsed] =>
  ps.mapAsync(parallelism = 10) { partition: Partition =>
      downloadData(partition)
    }
    .runWith(Sink.foreach(println))
}
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);
Function<Partition, CompletionStage<Optional<byte[]>>> fetchData =
    (partition) -> {
      return readEngine
          .getDataAsBytes(partition)
          .thenApply(Optional::of)
          .exceptionally(failure -> Optional.empty());
    };
Long timestampSinceEpochInMs = 1571406320000L;
Long timestampSinceEpochInMsPlusOneHour = timestampSinceEpochInMs + 3600 * 1000;
VolatilePartitionsFilter emptyFilter = new VolatilePartitionsFilter.Builder().build();
VolatilePartitionsFilter sinceFilter1 =
    new VolatilePartitionsFilter.Builder().withSinceTimestamp(timestampSinceEpochInMs).build();
VolatilePartitionsFilter sinceFilter2 =
    new VolatilePartitionsFilter.Builder()
        .withSinceTimestamp(timestampSinceEpochInMs)
        .build()
        .and(
            new VolatilePartitionsFilter.Builder()
                .withSinceTimestamp(timestampSinceEpochInMsPlusOneHour)
                .build());
String partition1 = "1";
String partition2 = "2";
String partition3 = "3";
String partition4 = "4";
Set<String> partitions1 = new HashSet<String>();
partitions1.add(partition1);
Set<String> partitions4 = new HashSet<String>();
partitions4.add(partition4);
Set<String> partitions123 = new HashSet<String>();
partitions123.add(partition1);
partitions123.add(partition2);
partitions123.add(partition3);
VolatilePartitionsFilter byIdsFilter1 =
    new VolatilePartitionsFilter.Builder().withIds(partitions123).build();
VolatilePartitionsFilter byIdsFilter2 =
    new VolatilePartitionsFilter.Builder()
        .withIds(partitions123)
        .build()
        .and(new VolatilePartitionsFilter.Builder().withIds(partitions1).build());
VolatilePartitionsFilter byIdsFilter3 =
    new VolatilePartitionsFilter.Builder()
        .withIds(partitions123)
        .build()
        .and(new VolatilePartitionsFilter.Builder().withIds(partitions4).build());
VolatilePartitionsFilter combinedFilter =
    new VolatilePartitionsFilter.Builder()
        .withIds(partitions1)
        .build()
        .and(
            new VolatilePartitionsFilter.Builder()
                .withSinceTimestamp(timestampSinceEpochInMs)
                .build());
CompletionStage<Source<Partition, NotUsed>> partitions =
    queryApi.getVolatilePartitions(layer, combinedFilter, Collections.emptySet());
int parallelism = 10;
partitions.thenApply(
    partitionsSource -> {
      return partitionsSource
          .mapAsync(parallelism, fetchData::apply)
          .runWith(Sink.foreach(System.out::println), myMaterializer);
    });
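Instead of the factory methods (empty, since, and byIds), you can also construct filters with the VolatilePartitionsFilter builder, as the Java example does.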
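The and operator can be reasoned about as set operations. The plain-Java PartitionFilterModel below is a hypothetical model, not VolatilePartitionsFilter itself, and it assumes that a since filter keeps partitions whose timestamp is at or after the given value. Under that assumption, and-ing two since filters keeps the later timestamp, and and-ing two byIds filters keeps the intersection of the IDs, which is why byIdsFilter3 in the examples above matches nothing.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical model of combining volatile-partition filters with logical AND.
// The empty filter has no constraints, so every partition matches it.
final class PartitionFilterModel {
    final OptionalLong since;          // keep partitions with timestamp >= since (assumption)
    final Optional<Set<String>> ids;   // keep only these partition IDs

    PartitionFilterModel(OptionalLong since, Optional<Set<String>> ids) {
        this.since = since;
        this.ids = ids;
    }

    static PartitionFilterModel empty() {
        return new PartitionFilterModel(OptionalLong.empty(), Optional.empty());
    }

    static PartitionFilterModel since(long timestamp) {
        return new PartitionFilterModel(OptionalLong.of(timestamp), Optional.empty());
    }

    static PartitionFilterModel byIds(Set<String> ids) {
        return new PartitionFilterModel(OptionalLong.empty(), Optional.of(Set.copyOf(ids)));
    }

    // AND: the later timestamp wins, and ID sets are intersected.
    PartitionFilterModel and(PartitionFilterModel other) {
        OptionalLong s = since.isPresent() && other.since.isPresent()
                ? OptionalLong.of(Math.max(since.getAsLong(), other.since.getAsLong()))
                : (since.isPresent() ? since : other.since);
        Optional<Set<String>> i;
        if (ids.isPresent() && other.ids.isPresent()) {
            Set<String> both = new HashSet<>(ids.get());
            both.retainAll(other.ids.get());
            i = Optional.of(both);
        } else {
            i = ids.isPresent() ? ids : other.ids;
        }
        return new PartitionFilterModel(s, i);
    }

    boolean matches(String id, long timestamp) {
        boolean sinceOk = !since.isPresent() || timestamp >= since.getAsLong();
        boolean idOk = !ids.isPresent() || ids.get().contains(id);
        return sinceOk && idOk;
    }
}
```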
Fetch Data in Parallel
As shown in the examples, use the parallelism parameter to control how many parallel requests the Data Client Library makes to fetch blobs. The optimal value depends on the node configuration, RAM, CPU, and network. Using more than 100 parallel requests has been observed to degrade performance.
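The parallelism argument of mapAsync caps how many downloads are in flight at once. Outside Akka Streams, the same bound can be sketched with a Semaphore in plain Java. BoundedFetch and the fetch function are hypothetical stand-ins, not Data Client Library calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical helper: calls fetch for every key with at most `parallelism` requests in flight,
// similar in spirit to mapAsync(parallelism) in Akka Streams.
final class BoundedFetch {
    static <K, V> List<V> fetchAll(List<K> keys, int parallelism,
                                   Function<K, CompletableFuture<V>> fetch) {
        Semaphore permits = new Semaphore(parallelism);
        List<CompletableFuture<V>> futures = new ArrayList<>();
        for (K key : keys) {
            permits.acquireUninterruptibly(); // wait while `parallelism` requests are outstanding
            CompletableFuture<V> future;
            try {
                future = fetch.apply(key);
            } catch (RuntimeException e) {
                permits.release();
                throw e;
            }
            // Release the permit once this request finishes, successfully or not.
            futures.add(future.whenComplete((value, error) -> permits.release()));
        }
        // Collect results in input order, as mapAsync does (mapAsyncUnordered would not).
        List<V> results = new ArrayList<>();
        for (CompletableFuture<V> f : futures) {
            results.add(f.join());
        }
        return results;
    }
}
```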
Index Layer
To paginate through an index layer, call QueryApi.queryIndexParts. It returns a list of part IDs, each representing a portion of the layer that you can use to limit the scope of a query operation. This allows you to run several queries in parallel, one per part. You provide the desired number of parts, and the service returns a list of part IDs. Note that if the requested number of parts would make the parts too small, the service might return fewer parts than requested. For an example of how to fetch index parts, see the subsection Retrieve Index Parts below.
To retrieve data from the index layer, you must first call QueryApi.queryIndex, which returns the IndexPartitions that match a given query. If no query is provided, "timestamp=ge=0" is used by default, which matches all the partitions.
Then, call ReadEngine.getDataAsBytes on each IndexPartition to retrieve the corresponding data using the blob API. Because fetching the data for one partition takes some time, and QueryApi.queryIndex may return hundreds or even thousands of partitions, we recommend fetching the data for these partitions in parallel. The examples below show how to first query the index and then retrieve the data for the IndexPartitions in parallel.
The right level of parallelism depends on the machine that runs the code and on the size of the objects to retrieve:
- If you set the parallelism too low, the network bandwidth is not fully used because of the request execution overhead.
- If you set the parallelism too high, for example more than 200 parallel downloads on a single ActorSystem, you will start seeing warnings about too much pressure on the Akka HTTP connection pool. This happens because the code floods the pool with asynchronous requests, and the Data Client Library does not provide any backpressure mechanism in this case.
Trying out several levels of parallelism is a good way to get the best download performance. Start with 10 parallel downloads per machine and increase this number by 10 until performance degrades.
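The tuning procedure just described (start at 10, step by 10, stop when performance degrades) can be written as a small search loop. In this hypothetical Java sketch, measureThroughput stands in for a benchmark you would run yourself, for example downloading a fixed set of partitions and dividing the bytes received by the elapsed seconds; the maxParallelism bound is an assumption that keeps the loop finite.

```java
import java.util.function.IntToDoubleFunction;

// Hypothetical tuner: increases parallelism in steps of 10 while throughput keeps improving.
final class ParallelismTuner {
    static int tune(IntToDoubleFunction measureThroughput, int maxParallelism) {
        int best = 10;
        double bestThroughput = measureThroughput.applyAsDouble(best);
        for (int p = best + 10; p <= maxParallelism; p += 10) {
            double t = measureThroughput.applyAsDouble(p);
            if (t <= bestThroughput) {
                break; // no improvement or a degradation: keep the previous level
            }
            best = p;
            bestThroughput = t;
        }
        return best;
    }
}
```

Real measurements are noisy, so in practice you might repeat each measurement a few times before comparing levels.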
Note: Usage specifics
By default, the QueryApi.queryIndex method queries index partitions in parts. You can change this behavior with the query-by-parts configuration property (see the Configuration chapter).
Retrieve Index Parts
To retrieve the parts for part queries, call the QueryApi.queryIndexParts method:
import scala.concurrent.Await
import scala.concurrent.duration._
val numberOfParts = 50
val indexParts =
  Await.result(queryApi.queryIndexParts(indexLayerId, numberOfParts), 10.seconds)
int numberOfParts = 50;
IndexParts indexParts =
    queryApi.queryIndexParts(indexLayerId, numberOfParts).toCompletableFuture().join();
Query an Index layer
To query indexed data, you must provide search criteria in the RSQL query language.
RSQL supports the following logical operators:
Operator | Description
; (or and) | Logical AND
, (or or) | Logical OR
RSQL supports the following comparison operators:
Operator | Description
== | Equal
!= | Not Equal
< or =lt= | Less Than
<= or =le= | Less or Equal
> or =gt= | Greater Than
>= or =ge= | Greater or Equal
Below are some examples of RSQL expressions:
someIntKey==42
someStringKey!=abc
someStringKey=="Hello World!"
someIntKey<100;someBooleanKey==true
(someIntKey=gt=23,someStringKey==xyz);someBooleanKey==true
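When queries are assembled from program values, building the string with small helpers avoids mistakes in operators and quoting. The plain-Java Rsql class below is hypothetical (it is not part of the Data Client Library). It joins conditions with ; and , as in the tables above, parenthesizes compound operands, and wraps values containing whitespace or reserved characters in double quotes, as in the "Hello World!" example. The exact set of reserved characters and the backslash escaping are assumptions based on common RSQL grammars.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helpers for building RSQL query strings (not part of the Data Client Library).
final class Rsql {
    // Characters that force a value to be quoted (assumption based on common RSQL grammars).
    private static final String RESERVED = "\"'();,=!~<>";

    static String eq(String key, Object value) { return key + "==" + literal(value); }
    static String ne(String key, Object value) { return key + "!=" + literal(value); }
    static String gt(String key, Object value) { return key + "=gt=" + literal(value); }
    static String ge(String key, Object value) { return key + "=ge=" + literal(value); }
    static String lt(String key, Object value) { return key + "=lt=" + literal(value); }
    static String le(String key, Object value) { return key + "=le=" + literal(value); }

    // ";" is logical AND.
    static String and(String... conditions) { return join(";", conditions); }

    // "," is logical OR.
    static String or(String... conditions) { return join(",", conditions); }

    // Compound operands are parenthesized: required for an OR group inside an AND
    // (AND binds tighter), and harmless otherwise.
    private static String join(String operator, String... conditions) {
        return Arrays.stream(conditions)
                .map(c -> conditions.length > 1 && (c.contains(",") || c.contains(";"))
                        ? "(" + c + ")" : c)
                .collect(Collectors.joining(operator));
    }

    private static String literal(Object value) {
        String s = String.valueOf(value);
        boolean needsQuotes = s.isEmpty()
                || s.chars().anyMatch(ch -> Character.isWhitespace(ch) || RESERVED.indexOf(ch) >= 0);
        if (!needsQuotes) {
            return s;
        }
        // Escape backslashes and double quotes, then wrap the value in double quotes.
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```

For example, Rsql.and(Rsql.or(Rsql.gt("someIntKey", 23), Rsql.eq("someStringKey", "xyz")), Rsql.eq("someBooleanKey", true)) produces the last expression in the list above.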
For the examples above to work, the IndexLayerType of the queried index layer must contain the following IndexDefinition objects:
- IndexDefinition("someIntKey", IndexType.Int)
- IndexDefinition("someBooleanKey", IndexType.Boolean)
- IndexDefinition("someStringKey", IndexType.String)
The following code snippet retrieves the partitions whose someIntKey attribute is above 42 and whose someStringKey attribute is not "abc". It returns an Akka Source to simplify fetching the data for the returned partitions in parallel. For how to retrieve that data, see the next subsection, Retrieve Indexed Data.
import scala.concurrent.Await
import scala.concurrent.duration._
val queryString = "someIntKey>42;someStringKey!=abc"
val parallelism = 10
val foundIndexPartitionsAsSource: Source[IndexPartition, NotUsed] =
  Source(indexParts.parts)
    .mapAsync(parallelism) { part =>
      queryApi
        .queryIndex(indexLayerId, Some(queryString), Some(part))
    }
    .flatMapConcat(identity)
String queryString = "someIntKey>42;someStringKey!=abc";
Source<IndexPartition, NotUsed> indexPartitionsSource =
    Source.from(indexParts.getParts())
        .mapAsync(
            10, part -> queryApi.queryIndex(indexLayerId, Optional.of(queryString), part))
        .flatMapConcat(s -> s);
For more information about the query format and its constraints, see the Get the Data Handle section of the Data API Developer Guide.
Retrieve Indexed Data
The previous subsection, Query an Index layer, showed how to query an index layer using QueryApi.queryIndex. The code snippet below shows how to use Akka Streams and ReadEngine.getDataAsBytes to retrieve, in parallel, the data for each IndexPartition returned by QueryApi.queryIndex:
println(
  "Download the data corresponding to the index partitions previously found by the queryIndex method")
implicit val materializer: ActorMaterializer = ActorMaterializer()
def youCanProcessTheDataHere(byteData: Array[Byte]): Unit = ???
// Block until every partition has been downloaded and processed
Await.result(
  foundIndexPartitionsAsSource
    .mapAsyncUnordered(parallelism)(partition => readEngine.getDataAsBytes(partition))
    .runForeach((byteData: Array[Byte]) => youCanProcessTheDataHere(byteData)),
  Duration.Inf)
println("Computation finished. Shutting down the HTTP connections and the actor system.")
Await.ready(CoordinatedShutdown(actorSystem).run(UnknownReason), Duration.Inf)
ActorMaterializer actorMaterializer = ActorMaterializer.create(actorSystem);
System.out.println(
    "Download the data corresponding to the index partitions previously found by the queryIndex method");
int parallelism = 10;
indexPartitionsSource
    .mapAsyncUnordered(parallelism, readEngine::getDataAsBytes)
    .runForeach(this::youCanProcessTheDataHere, actorMaterializer)
    .toCompletableFuture()
    .join();
System.out.println(
    "Computation finished. Shutting down the HTTP connections and the actor system.");
CoordinatedShutdown.get(actorSystem)
    .runAll(CoordinatedShutdown.unknownReason())
    .toCompletableFuture()
    .join();
Object Store Layer
Data in an Object Store layer is referenced by a key that you control. The data is mutable and parallel writes are allowed. If multiple writers write to the same key in parallel, the last request to finish on the server wins, so client code should be prepared for this when getting the data for that key.
To get data as an Akka source or as a byte array, either whole or limited to a byte range, add the following:
import scala.concurrent.Await
import scala.concurrent.duration._
val readEngine = DataEngine().readEngine(catalogHrn, settings)
// Request decompression only if the object is stored with gzip content encoding
val applyDecompression = Await.result(
  readEngine
    .getObjectMetadata(layer, key)
    .map(_.getContentEncoding().contains(ContentEncoding.gzip)),
  10.seconds)
val dataAsSource: Future[Source[ByteString, NotUsed]] =
  readEngine
    .getObjectDataAsSource2(layer, key, applyDecompression)
val dataAsBytes: Future[Array[Byte]] =
  readEngine
    .getObjectDataAsBytes2(layer, key, applyDecompression)
val dataAsSourceWithRange: Future[Source[ByteString, NotUsed]] =
  readEngine
    .getObjectDataAsSource2(layer, key, applyDecompression, ByteRange.fromRange(5, 10))
val dataAsBytesWithRange: Future[Array[Byte]] =
  readEngine
    .getObjectDataAsBytes2(layer, key, applyDecompression, ByteRange.fromRange(5, 10))
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);
CompletionStage<Source<ByteString, NotUsed>> dataAsSource =
    readEngine.getObjectDataAsSource2(layer, key, false, ByteRange.all());
CompletionStage<Source<ByteString, NotUsed>> dataAsSourceWithRange =
    readEngine.getObjectDataAsSource2(layer, key, false, ByteRange.fromRange(5, 10));
CompletionStage<byte[]> dataAsByteArray =
    readEngine.getObjectDataAsBytes2(layer, key, false, ByteRange.all());
CompletionStage<byte[]> dataAsByteArrayWithRange =
    readEngine.getObjectDataAsBytes2(layer, key, false, ByteRange.fromRange(5, 10));
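The Scala example derives applyDecompression from the object's gzip content encoding, while the Java calls pass false. Assuming an object was stored gzip-compressed and fetched without decompression, you can decompress the bytes yourself with the JDK's java.util.zip classes. The Gzip helper below is a hypothetical sketch that assumes the whole object fits in memory; its compress method exists only to produce test input.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical client-side gzip handling for object store payloads fetched without decompression.
final class Gzip {
    static byte[] decompress(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException("Payload is not valid gzip data", e);
        }
    }

    // Only used to produce test input; mirrors how a writer might store gzip content.
    static byte[] compress(byte[] plain) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(plain);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray(); // complete once the gzip stream has been closed
    }
}
```

A byte range taken from the middle of a gzip stream cannot be decompressed on its own, so this approach fits whole-object reads.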
Interactive Map Layer
The data in an interactive map layer is available as long as the specified catalog exists.
To get the data from an interactive map layer, use the following methods.
To return all the features found for the provided list of IDs in an interactive map layer, use the QueryApi.getFeatureCollectionByIds API call.
The method takes six arguments:
- layerId: The layer ID of the layer.
- ids: List of feature IDs to retrieve from the interactive map layer.
- selection: List of properties to return in the features result list.
- context: Interactive Map context (optional parameter); see below for the list of valid values.
- version: The version of the feature collection (optional parameter).
- author: The author of the feature collection (optional parameter).
The snippet below demonstrates the usage of the QueryApi.getFeatureCollectionByIds API:
val queryApi = DataClient().queryApi(catalogHrn, settings)
val ids = Seq("feature-1", "feature-2")
val context = InteractiveMapContext.EXTENSION
val version = 123L
val author = "<feature-collection-author>"
val response: Future[FeatureCollection] =
  queryApi.getFeatureCollectionByIds(
    layerId,
    ids,
    Set.empty,
    Some(context),
    Some(version),
    Some(author)
  )
val featureCollection = Await.result(response, timeout)
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
List<String> ids = Collections.singletonList("feature-1");
InteractiveMapContext context = InteractiveMapContext.EXTENSION;
long version = 123L;
String author = "<feature-collection-author>";
FeatureCollection featureCollection =
    queryApi
        .getFeatureCollectionByIds(
            layerId,
            ids,
            Collections.emptySet(),
            Optional.of(context),
            OptionalLong.of(version),
            Optional.of(author))
        .toCompletableFuture()
        .join();
To return the features inside a bounding box of an interactive map layer, use the QueryApi.getFeatureCollectionByBbox API call.
The method takes seven arguments:
- layerId: The layer ID of the layer.
- bbox: The bounding box in which to search for features.
- searchParam: List of additional feature filters resulting in a subset of features.
- selection: List of properties to return in the features result list.
- limit: The maximum number of features in the response (default 30K, maximum 100K).
- context: Interactive Map context (optional parameter); see below for the list of valid values.
- version: The version of the feature collection (optional parameter).
To return the quadbins inside a bounding box of an interactive map layer, you can use the QueryApi.getQuadBinsByTileId API call. For an explanation of quadbins, see Get clustered features in a tile in the Data Developer Guide. Note that the function returns the quadbins for the smallest quadkey that contains the bounding box.
The method takes eight arguments:
- layerId: The layer ID of the layer.
- bbox: The bounding box in which to search for features.
- searchParam: List of additional feature filters resulting in a subset of features.
- relativeResolution: The resolution of the quadbin. Valid values are [0, 4].
- noBuffer: Do not place a buffer around the quadbin.
- countMode: The count mode for the quadbin. See below for valid values.
- context: Interactive Map context (optional parameter); see below for the list of valid values.
- version: The version of the feature collection (optional parameter).
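To illustrate what "the smallest quadkey that contains the bounding box" means, the following plain-Java sketch computes quadkeys with the widely used Bing Maps tile scheme (Web Mercator projection, one base-4 digit per level) and takes the longest common prefix of the quadkeys of the bounding box's south-west and north-east corners. Whether the service uses exactly this scheme for its quadkey tile type is an assumption, the Quadkeys class and its methods are hypothetical, and bounding boxes crossing the antimeridian are not handled.

```java
// Hypothetical quadkey helpers (Bing Maps-style Web Mercator tiling, an assumption here).
final class Quadkeys {
    private static final double MAX_LAT = 85.05112878; // Web Mercator latitude limit

    // Quadkey of the tile containing (lat, lon) at the given level of detail (1..23).
    static String quadkey(double lat, double lon, int level) {
        lat = Math.max(-MAX_LAT, Math.min(MAX_LAT, lat));
        double x = (lon + 180.0) / 360.0;
        double sinLat = Math.sin(Math.toRadians(lat));
        double y = 0.5 - Math.log((1 + sinLat) / (1 - sinLat)) / (4 * Math.PI);
        int n = 1 << level;
        int tileX = (int) Math.min(n - 1, Math.max(0, Math.floor(x * n)));
        int tileY = (int) Math.min(n - 1, Math.max(0, Math.floor(y * n)));
        return fromTileXY(tileX, tileY, level);
    }

    // Interleaves the bits of the tile coordinates into base-4 digits, most significant first.
    static String fromTileXY(int tileX, int tileY, int level) {
        StringBuilder key = new StringBuilder();
        for (int i = level; i > 0; i--) {
            int mask = 1 << (i - 1);
            int digit = 0;
            if ((tileX & mask) != 0) digit += 1;
            if ((tileY & mask) != 0) digit += 2;
            key.append(digit);
        }
        return key.toString();
    }

    // Tiles are nested axis-aligned rectangles, so a box lies in a tile exactly when both
    // opposite corners do: the smallest such tile is the common prefix of the corner keys.
    static String smallestContaining(double south, double west, double north, double east,
                                     int maxLevel) {
        String sw = quadkey(south, west, maxLevel);
        String ne = quadkey(north, east, maxLevel);
        int i = 0;
        while (i < sw.length() && sw.charAt(i) == ne.charAt(i)) i++;
        return sw.substring(0, i);
    }
}
```

An empty result means that no tile smaller than the whole world contains the box, which is why large boxes produce coarse quadbins.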
To return the hexbins inside a bounding box of an interactive map layer, you can use the QueryApi.getHexBinsByTileId API call. For an explanation of hexbins, see Get clustered features in a tile in the Data Developer Guide. Note that the function returns the hexbins for the smallest quadkey that contains the bounding box.
The method takes eleven arguments:
- layerId: The layer ID of the layer.
- bbox: The bounding box in which to search for features.
- searchParam: List of additional feature filters resulting in a subset of features.
- absoluteResolution: The H3 hexagon resolution. Valid values are [0, 13].
- relativeResolution: Relative resolution added to the absolute one. Valid values are [-2, 2].
- property: A property of the original features for which to calculate statistics.
- pointMode: Returns the centroids of the hexagons as GeoJSON features. Default is false.
- singleCoord: Forces evaluation of only the first coordinate of each object. Default is false.
- sampling: Sampling ratio of the underlying dataset. See below for valid values.
- context: Interactive Map context (optional parameter); see below for the list of valid values.
- version: The version of the feature collection (optional parameter).
To return the features selected by tile type and tile ID from an interactive map layer, use the QueryApi.getFeatureCollectionByTile API call.
The method takes eight arguments:
- layerId: The layer ID of the layer.
- tileId: The tile ID to query.
- tileType: The type of tile identifier. Available values are quadkey, web, tms, and here.
- searchParam: List of additional feature filters resulting in a subset of features.
- selection: List of properties to return in the features result list.
- limit: The maximum number of features in the response (default 30K, maximum 100K).
- context: Interactive Map context (optional parameter); see below for the list of valid values.
- version: The version of the feature collection (optional parameter).
To return the quadbins inside a tile of a given tile type from an interactive map layer, you can use the QueryApi.getQuadBinsByTileId API call. For an explanation of quadbins, see Get clustered features in a tile in the Data Developer Guide. Note that if the tile is not of type quadkey, the function returns the quadbins for the smallest quadkey that contains the tile.
The method takes the following arguments:
- layerId: The layer ID of the layer.
- tileId: The tile ID to query.
- tileType: The type of tile identifier. Available values are quadkey, web, tms, and here.
- searchParam: List of additional feature filters resulting in a subset of features.
- relativeResolution: The resolution of the quadbin. Valid values are [0, 4].
- noBuffer: Do not place a buffer around the quadbin.
- countMode: The count mode for the quadbin. See below for valid values.
- context: Interactive Map context (optional parameter); see below for the list of valid values.
To return the hexbins inside a tile of a given tile type from an interactive map layer, you can use the QueryApi.getHexBinsByTileId API call. For an explanation of hexbins, see Get clustered features in a tile in the Data Developer Guide. Note that if the tile is not of type quadkey, the function returns the hexbins for the smallest quadkey that contains the tile.
The method takes the following arguments:
- layerId: The layer ID of the layer.
- tileId: The tile ID to query.
- tileType: The type of tile identifier. Available values are quadkey, web, tms, and here.
- searchParam: List of additional feature filters resulting in a subset of features.
- absoluteResolution: The H3 hexagon resolution. Valid values are [0, 13].
- relativeResolution: Relative resolution added to the absolute one. Valid values are [-2, 2].
- property: A property of the original features for which to calculate statistics.
- pointMode: Returns the centroids of the hexagons as GeoJSON features. Default is false.
- singleCoord: Forces evaluation of only the first coordinate of each object. Default is false.
- sampling: Sampling ratio of the underlying dataset. See below for valid values.
- context: Interactive Map context (optional parameter); see below for the list of valid values.
To return the features matching search parameters from an interactive map layer, use the QueryApi.getFeatureCollectionBySearchParam API call.
The method takes seven arguments:
- layerId: The layer ID of the layer.
- searchParam: List of additional feature filters resulting in a subset of features.
- selection: List of properties to return in the features result list.
- limit: The maximum number of features in the response (default 30K, maximum 100K).
- context: Interactive Map context (optional parameter); see below for the list of valid values.
- version: The version of the feature collection (optional parameter).
- author: The author of the feature collection (optional parameter).
The snippet below demonstrates the usage of the QueryApi.getFeatureCollectionBySearchParam API:
val queryApi = DataClient().queryApi(catalogHrn, settings)
val searchParams = Set(SearchParam("p.prop1", SearchOperator.EQUAL, "some-value1"),
                       SearchParam.fromString("p.prop2>=10"))
val selection = Set("p.prop1", "p.prop2")
val limit = 100
val context = InteractiveMapContext.EXTENSION
val version = 123L
val author = "<feature-collection-author>"
val response: Future[FeatureCollection] =
  queryApi.getFeatureCollectionBySearchParam(layerId,
                                             searchParams,
                                             selection,
                                             Some(limit),
                                             Some(context),
                                             Some(version),
                                             Some(author))
val featureCollection = Await.result(response, timeout)
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
SearchParam searchParam1 = new SearchParam("p.prop1", SearchOperator.EQUAL, "some-value");
SearchParam searchParam2 = SearchParam.fromString("p.prop2>=10");
Set<SearchParam> searchParams = new HashSet<>();
searchParams.add(searchParam1);
searchParams.add(searchParam2);
Set<String> selection = new HashSet<>(Arrays.asList("p.prop1", "p.prop2"));
int limit = 100;
InteractiveMapContext context = InteractiveMapContext.EXTENSION;
long version = 123L;
String author = "<feature-collection-author>";
FeatureCollection featureCollection =
    queryApi
        .getFeatureCollectionBySearchParam(
            layerId,
            searchParams,
            selection,
            OptionalInt.of(limit),
            Optional.of(context),
            OptionalLong.of(version),
            Optional.of(author))
        .toCompletableFuture()
        .join();
To return the features that lie inside a circle centered on a given latitude and longitude, use the QueryApi.getFeatureCollectionBySpatialSearchCircle API call.
The method takes nine arguments:
- layerId - The ID of the layer.
- latitude - The latitude of the center point, in WGS 84 decimal degrees (-90 to +90).
- longitude - The longitude of the center point, in WGS 84 decimal degrees (-180 to +180).
- radius - The radius of the circle, in meters.
- searchParam - Additional feature filters that narrow the result to a subset of features.
- selection - The properties to return for each feature in the result.
- limit - The maximum number of features in the response (default: 30,000; maximum: 100,000).
- context - The Interactive Map context (optional); see below for the list of valid values.
- version - The version of the feature collection (optional).
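The numeric ranges above can be checked on the client before a request is sent. The following Java sketch is a hypothetical helper (CircleSearchArgs is not part of the Data Client Library) that rejects out-of-range coordinates and resolves the effective limit from the documented default of 30,000 and maximum of 100,000. The requirement that radius and limit be positive is an assumption; the parameter list does not state a lower bound.

```java
import java.util.OptionalInt;

/** Hypothetical pre-flight check for circle-search arguments; not a Data Client Library type. */
final class CircleSearchArgs {
  static final int DEFAULT_LIMIT = 30_000; // documented default for limit
  static final int MAX_LIMIT = 100_000;    // documented maximum for limit

  /** Validates the arguments and returns the limit that would apply to the request. */
  static int checkAndResolveLimit(
      double latitude, double longitude, int radiusMeters, OptionalInt limit) {
    if (latitude < -90.0 || latitude > 90.0) {
      throw new IllegalArgumentException("latitude must be within [-90, +90]: " + latitude);
    }
    if (longitude < -180.0 || longitude > 180.0) {
      throw new IllegalArgumentException("longitude must be within [-180, +180]: " + longitude);
    }
    // Assumption: a circle needs a positive radius; no lower bound is documented.
    if (radiusMeters <= 0) {
      throw new IllegalArgumentException("radius must be positive: " + radiusMeters);
    }
    // An absent limit falls back to the documented default.
    int effective = limit.orElse(DEFAULT_LIMIT);
    // Assumption: limit must be at least 1; only the maximum is documented.
    if (effective < 1 || effective > MAX_LIMIT) {
      throw new IllegalArgumentException("limit must be within [1, 100000]: " + effective);
    }
    return effective;
  }
}
```

For example, checkAndResolveLimit(52.5, 13.4, 50, OptionalInt.empty()) returns 30000, while a latitude of 91 or a limit of 100,001 is rejected before any network call.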
To return the features that intersect the geometry of a reference feature, use the QueryApi.getFeatureCollectionBySpatialSearchFeature API call.
The method takes ten arguments:
- layerId - The ID of the layer to search.
- refCatalogHrn - The HRN of the catalog that contains the layer with the reference feature.
- refLayerId - The ID of the layer where the reference feature is stored.
- refFeatureId - The ID of the reference feature in that layer.
- radius - A buffer, in meters, added around the reference feature's geometry.
- searchParam - Additional feature filters that narrow the result to a subset of features.
- selection - The properties to return for each feature in the result.
- limit - The maximum number of features in the response (default: 30,000; maximum: 100,000).
- context - The Interactive Map context (optional); see below for the list of valid values.
- version - The version of the feature collection (optional).
To return the features that intersect a given geometry, use the QueryApi.getFeatureCollectionBySpatialSearchGeometry API call.
The method takes eight arguments:
- layerId - The ID of the layer.
- geometry - The geometry used as the origin of the search.
- radius - A buffer, in meters, added around the geometry.
- searchParam - Additional feature filters that narrow the result to a subset of features.
- selection - The properties to return for each feature in the result.
- limit - The maximum number of features in the response (default: 30,000; maximum: 100,000).
- context - The Interactive Map context (optional); see below for the list of valid values.
- version - The version of the feature collection (optional).
The snippet below demonstrates how to use the QueryApi.getFeatureCollectionBySpatialSearchGeometry API:
val queryApi = DataClient().queryApi(catalogHrn, settings)
val geometry = Point(coordinates = Some(immutable.Seq(10.0, 12.0)))
val radius = 50
val context = InteractiveMapContext.EXTENSION
val version = 123L
val response: Future[FeatureCollection] =
queryApi.getFeatureCollectionBySpatialSearchGeometry(
layerId,
geometry,
Some(radius),
Set.empty,
Set.empty,
None,
Some(context),
Some(version)
)
val featureCollection = Await.result(response, timeout)
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
Geometry geometry =
new Point.Builder().withCoordinates(new ArrayList<>(Arrays.asList(10.0, 12.0))).build();
int radius = 50;
InteractiveMapContext context = InteractiveMapContext.EXTENSION;
OptionalLong version = OptionalLong.of(123L);
FeatureCollection featureCollection =
queryApi
.getFeatureCollectionBySpatialSearchGeometry(
layerId,
geometry,
OptionalInt.of(radius),
Collections.emptySet(),
Collections.emptySet(),
OptionalInt.empty(),
Optional.of(context),
version)
.toCompletableFuture()
.join();
To iterate over all the features in an interactive map layer, use the QueryApi.getFeatureCollectionByIterate API call.
The method takes six arguments:
- layerId - The ID of the layer.
- pageToken - The page token from which the iteration continues.
- selection - The properties to return for each feature in the result.
- limit - The maximum number of features in the response (default: 30,000; maximum: 100,000).
- context - The Interactive Map context (optional); see below for the list of valid values.
- version - The version of the feature collection (optional).
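Iterating with a page token follows a common loop: start without a token, process the returned features, and call again with the token the previous response handed back until none is returned. The Java sketch below shows only that loop. Page and PageTokenIteration are hypothetical types, and the fetch function stands in for a call to getFeatureCollectionByIterate; the exact return type of that call and the way it exposes the next token are not shown in this guide, so treat the mapping as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical page of results: the items plus the token for the next page, if any. */
final class Page<T> {
  final List<T> items;
  final Optional<String> nextPageToken;

  Page(List<T> items, Optional<String> nextPageToken) {
    this.items = items;
    this.nextPageToken = nextPageToken;
  }
}

final class PageTokenIteration {
  /** Calls fetch repeatedly, passing the previous page's token, until no token is returned. */
  static <T> List<T> collectAll(Function<Optional<String>, Page<T>> fetch) {
    List<T> all = new ArrayList<>();
    Optional<String> token = Optional.empty(); // first call: no token, start at the beginning
    do {
      Page<T> page = fetch.apply(token);
      all.addAll(page.items);
      token = page.nextPageToken; // empty token ends the iteration
    } while (token.isPresent());
    return all;
  }
}
```

Collecting everything in memory keeps the sketch short; for large layers, processing each page as it arrives (or using readEngine.exportIMLFeatures, described below) avoids holding all features at once.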
To get all the tiles that contain features in a given layer, use the QueryApi.getTilesContainingFeatures API call.
The method takes five arguments:
- layerId - The ID of the layer.
- startingTiles - The higher-level tiles to iterate. The only supported tile type is quadkey. If empty, all tiles within Web Mercator bounds (latitude between -85.05° and +85.05°) are retrieved.
- targetZoomLevel - The zoom level of the tiles to return. All returned tiles are at this level.
- parallelization - The maximum number of parallel calls to the underlying service.
- context - The Interactive Map context (optional); see below for the list of valid values.
The snippet below demonstrates how to use the QueryApi.getTilesContainingFeatures API:
val queryApi = DataClient().queryApi(catalogHrn, settings)
val startingTiles = Seq(Tile("120", TileType.QUADKEY))
val targetZoomLevel = 8
val parallelization = 1
val context = InteractiveMapContext.DEFAULT
val tilesAsSource: Future[Source[Seq[Tile], NotUsed]] =
queryApi.getTilesContainingFeatures(layerId,
startingTiles,
targetZoomLevel,
parallelization,
Some(context))
// Drain the stream of tile batches and flatten it into a single list of tiles
val response: Future[Seq[Seq[Tile]]] =
Await.result(tilesAsSource, timeout).runWith(Sink.seq[Seq[Tile]])
val tiles: Seq[Tile] = Await.result(response, timeout).flatten
println(s"TileCount: ${tiles.size}")
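In a quadkey, each digit (0 to 3) selects one of the four children of the previous tile, so the length of a quadkey equals its zoom level. The starting tile "120" above is therefore a level-3 tile, and at targetZoomLevel 8 it covers 4^5 = 1,024 tiles, of which getTilesContainingFeatures is expected to return only those that contain features. The Java sketch below is a hypothetical helper (QuadkeyTiles is not part of the library) that enumerates those candidate tiles; it does not describe how the service itself traverses them.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: enumerates the quadkeys under a starting tile at a target zoom level. */
final class QuadkeyTiles {
  /** A quadkey's zoom level is its length: one digit per level. */
  static int level(String quadkey) {
    return quadkey.length();
  }

  static List<String> descendantsAtLevel(String quadkey, int targetLevel) {
    if (!quadkey.matches("[0-3]*")) {
      throw new IllegalArgumentException("not a quadkey: " + quadkey);
    }
    if (targetLevel < level(quadkey)) {
      throw new IllegalArgumentException("target level is above the starting tile's level");
    }
    List<String> current = new ArrayList<>();
    current.add(quadkey);
    // Each step down one level replaces every tile with its four children.
    for (int l = level(quadkey); l < targetLevel; l++) {
      List<String> next = new ArrayList<>(current.size() * 4);
      for (String key : current) {
        for (char digit = '0'; digit <= '3'; digit++) {
          next.add(key + digit);
        }
      }
      current = next;
    }
    return current;
  }
}
```

For example, descendantsAtLevel("120", 4) yields 1200, 1201, 1202, and 1203, and descendantsAtLevel("120", 8) yields 1,024 quadkeys, which is also the upper bound on the number of tiles the snippet above can print.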
To return statistics for an interactive map layer, use the QueryApi.getIMLStatistics API call.
The method takes one argument:
- layerId - The ID of the layer.
The snippet below demonstrates how to use the QueryApi.getIMLStatistics API:
val queryApi = DataClient().queryApi(catalogHrn, settings)
val response = queryApi.getIMLStatistics(layerId)
val statistics = Await.result(response, timeout)
println(s"FeatureCount: ${statistics.getCount.getValue}")
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
Statistics statistics = queryApi.getIMLStatistics(layerId).toCompletableFuture().join();
To retrieve the existing changesets from an interactive map layer, use the QueryApi.getFeatureChanges API call.
The method takes four parameters:
- layerId - The ID of the layer.
- startVersion - The start of the version range (inclusive: version >= startVersion).
- endVersion - The end of the version range (inclusive: version <= endVersion).
- limit - The maximum number of features in the response (default: 30,000; maximum: 100,000).
The snippet below demonstrates how to use the QueryApi.getFeatureChanges API:
val queryApi = DataClient().queryApi(catalogHrn, settings)
val startVersion = 0L
val endVersion = 1000L
val limit = 100
val response = queryApi.getFeatureChanges(
layerId,
startVersion,
endVersion,
Some(limit)
)
val source = Await.result(response, timeout)
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
long startVersion = 0L;
long endVersion = 1000L;
int limit = 100;
Source<Pair<Long, Changeset>, NotUsed> source =
queryApi
.getFeatureChanges(layerId, startVersion, endVersion, OptionalInt.of(limit))
.toCompletableFuture()
.join();
To retrieve a single Changeset, identified by its version, from an interactive map layer, use the QueryApi.getFeatureChangesByVersion API call.
The method takes three parameters:
- layerId - The ID of the layer.
- version - The version of the Changeset.
- limit - The maximum number of features in the response (default: 30,000; maximum: 100,000).
The snippet below demonstrates how to use the QueryApi.getFeatureChangesByVersion API:
val queryApi = DataClient().queryApi(catalogHrn, settings)
val version = 8L
val limit = 100
val response = queryApi.getFeatureChangesByVersion(
layerId,
version,
Some(limit)
)
val changeset = Await.result(response, timeout)
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
long version = 8L;
int limit = 100;
Changeset changeset =
queryApi
.getFeatureChangesByVersion(layerId, version, OptionalInt.of(limit))
.toCompletableFuture()
.join();
To get statistics about the changesets in an interactive map layer, use the QueryApi.getFeatureChangesStatistics API call.
The method takes one parameter:
- layerId - The ID of the layer.
The snippet below demonstrates how to use the QueryApi.getFeatureChangesStatistics API:
val queryApi = DataClient().queryApi(catalogHrn, settings)
val response = queryApi.getFeatureChangesStatistics(layerId)
val statistics = Await.result(response, timeout)
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
ChangesetStatistics statistics =
queryApi.getFeatureChangesStatistics(layerId).toCompletableFuture().join();
To export all the features from an interactive map layer, use the readEngine.exportIMLFeatures API call.
The method takes two arguments:
- layerId - The ID of the layer.
- batchSize - The batch size used when iterating over the interactive map layer.
The snippet below demonstrates how to use the readEngine.exportIMLFeatures API:
val readEngine = DataEngine().readEngine(catalogHrn, settings)
val batchSize = 100
val futureResponse = readEngine.exportIMLFeatures(layerId, Some(batchSize)).runWith(Sink.seq)
val response: Seq[Seq[Feature]] = Await.result(futureResponse, timeout)
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);
int batchSize = 100;
Source<List<Feature>, NotUsed> responseSource =
readEngine.exportIMLFeatures(layerId, OptionalInt.of(batchSize));
List<List<Feature>> response =
responseSource.runWith(Sink.seq(), myMaterializer).toCompletableFuture().join();
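Both snippets above collect the exported stream into a list of batches (Seq[Seq[Feature]] in Scala, List<List<Feature>> in Java). Assuming each emitted batch holds at most batchSize features and only the last one may be smaller, the Java sketch below reproduces that shape in memory with a hypothetical Batching helper, so the effect of batchSize is easy to see. It does not call the library and makes no claim about how exportIMLFeatures pages through the layer internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the batch shape; not part of the Data Engine API. */
final class Batching {
  /** Splits items into consecutive batches of at most batchSize elements, preserving order. */
  static <T> List<List<T>> inBatches(List<T> items, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
    }
    List<List<T>> batches = new ArrayList<>();
    for (int from = 0; from < items.size(); from += batchSize) {
      int to = Math.min(from + batchSize, items.size());
      // Copy the view so each batch is independent of the source list.
      batches.add(new ArrayList<>(items.subList(from, to)));
    }
    return batches;
  }
}
```

Under that assumption, a layer with 250 features and batchSize 100 would arrive as three batches of 100, 100, and 50 features.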
NOTE: You can use SearchParams as additional filters to narrow the result to a subset of features. The allowed prefixes for property search are:
- 'p.' - accesses values stored in the 'properties' property of the feature.
- 'f.' - accesses values that are added by default to every stored feature. The possible values are 'f.id', 'f.createdAt', and 'f.updatedAt'.
Examples:
p.property_1=property_value_1
f.id=feature_id_1
You can express search criteria in SearchParams with the following operators:
- "=" - equals
- "!=" - not equals
- ">=" or "=gte=" - greater than or equals
- "<=" or "=lte=" - less than or equals
- ">" or "=gt=" - greater than
- "<" or "=lt=" - less than
- "@>" or "=cs=" - contains
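The library parses expressions such as "p.prop2>=10" itself through SearchParam.fromString, as in the first snippet in this section. To make the prefix and operator rules concrete, the following Java sketch is a hypothetical standalone parser: SearchExpressionParser and ParsedSearch are illustrative names, not library types, and this is not how the library is implemented. It maps every documented spelling to its symbolic form and, when several operators match, picks the earliest one and, at the same position, the longest, so "=gte=" is not mistaken for "=" and ">=" is not mistaken for ">".

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parsed form of an expression such as "p.prop2>=10". */
final class ParsedSearch {
  final String property;
  final String operator; // normalized to the symbolic form, e.g. ">="
  final String value;

  ParsedSearch(String property, String operator, String value) {
    this.property = property;
    this.operator = operator;
    this.value = value;
  }
}

final class SearchExpressionParser {
  // Every documented operator spelling, mapped to its symbolic form.
  private static final Map<String, String> OPERATORS = new LinkedHashMap<>();

  static {
    OPERATORS.put("=gte=", ">=");
    OPERATORS.put("=lte=", "<=");
    OPERATORS.put("=gt=", ">");
    OPERATORS.put("=lt=", "<");
    OPERATORS.put("=cs=", "@>");
    OPERATORS.put("!=", "!=");
    OPERATORS.put(">=", ">=");
    OPERATORS.put("<=", "<=");
    OPERATORS.put("@>", "@>");
    OPERATORS.put(">", ">");
    OPERATORS.put("<", "<");
    OPERATORS.put("=", "=");
  }

  static ParsedSearch parse(String expression) {
    int bestPos = -1;
    String bestOp = null;
    // Earliest match wins; on a tie in position, the longer spelling wins.
    for (String op : OPERATORS.keySet()) {
      int pos = expression.indexOf(op);
      if (pos < 0) {
        continue;
      }
      if (bestOp == null || pos < bestPos || (pos == bestPos && op.length() > bestOp.length())) {
        bestPos = pos;
        bestOp = op;
      }
    }
    if (bestOp == null || bestPos == 0) {
      throw new IllegalArgumentException("no property/operator found in: " + expression);
    }
    String property = expression.substring(0, bestPos);
    if (!property.startsWith("p.") && !property.startsWith("f.")) {
      throw new IllegalArgumentException("property must start with 'p.' or 'f.': " + property);
    }
    String value = expression.substring(bestPos + bestOp.length());
    return new ParsedSearch(property, OPERATORS.get(bestOp), value);
  }
}
```

With this sketch, "p.prop2>=10" and "p.prop2=gte=10" both parse to property p.prop2, operator >=, and value 10.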
Valid values for Interactive Map requests
Context
- DEFAULT = The default value if none is given. For composite layers, the operation is performed according to the extension rules. For normal layers, this is the only valid context.
- EXTENSION = The operation is executed only in the extension; no operation is performed in the extended layer.
- SUPER = Applies only to read operations. The operation is executed only in the layer being extended (the super layer).
Valid values for quadbin/hexbin requests
CountMode
- real = Real feature counts. Best accuracy, but slow; not recommended for large result sets.
- estimated = Estimated feature counts. Low accuracy, but very fast; recommended for large result sets.
- mixed = (default) Estimated feature counts combined with real ones: if the estimate is low, a real count is applied. Fits most use cases.
- bool = Tests whether data exists in a tile without counting features. For non-empty tiles, the returned count property is set to 1.
Sampling
Sets the sampling ratio applied to the underlying dataset.
- off = (default) The complete dataset is sampled.
- low = 1/8 of the dataset is sampled.
- lowmed = 1/32 of the dataset is sampled.
- med = 1/128 of the dataset is sampled.
- medhigh = 1/1024 of the dataset is sampled.
- high = 1/4096 of the dataset is sampled.
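Each sampling value keeps 1/N of the dataset, so the effect of a setting is a simple division. The Java sketch below is a hypothetical enum (Sampling is an illustrative type, not the library's) that records the documented ratios and estimates how many features remain after sampling, assuming the ratio applies uniformly across the dataset.

```java
import java.util.Locale;

/** Hypothetical mapping of the documented sampling values to their ratios (1/denominator). */
enum Sampling {
  OFF(1), LOW(8), LOWMED(32), MED(128), MEDHIGH(1024), HIGH(4096);

  /** The dataset is sampled at 1/denominator. */
  final int denominator;

  Sampling(int denominator) {
    this.denominator = denominator;
  }

  /** Approximate number of features kept for a dataset of the given size (rounded down). */
  long approxSampledCount(long datasetSize) {
    return datasetSize / denominator;
  }

  /** Looks up a value by its documented lowercase name, for example "lowmed". */
  static Sampling fromName(String name) {
    return valueOf(name.toUpperCase(Locale.ROOT));
  }
}
```

For example, with med a dataset of 1,000,000 features is reduced to roughly 7,812 (1,000,000 / 128, rounded down), while off keeps all 1,000,000.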