7.3. Common Quarkus¶
This module contains some class to help handling InputStream within Quarkus efficiently.
Using Uni was not possible for InputStream since Quarkus does not support yet correctly InputStream. A patch is submitted to enable this (see https://github.com/quarkusio/quarkus/pull/37308) “@Blocking” mode must be declared imperatively, which means that a new thread is used.
Two cases occur:
Sending an InputStream to a remote REST API
Receiving an InputStream from a remote REST API
7.3.1. Client and Server Abstract implementation for InputStream¶
In order to make it easier to integrate the InputStream management with back-pressure in all APIs, an abstract implementation is provide both for Client ans Server.
The full example is located in the test part of the ccs-common-quarkus-server.
io.clonecloudstore.common.quarkus.example.model
contains the definition of the model of data (In and Out).io.clonecloudstore.common.quarkus.example.client
contains the ApiClient and its factory and the extension of different abstract needed for the client.
The abstract ClientAbstract defines some abstract methods that must be specified within the final client implementation, in order to include business implementation.
7.3.1.1. Client sending InputStream¶
Note that if several API are intended for this client to send InputStream (various usages), one shall specialized the answer of those abstract methods through more general BusinessIn and BusinessOut types (for instance, using multiple sub elements or using instanceOf check).
/**
* @param context 1 for sending InputStream, -1 for receiving InputStream, or anything else
* @return the headers map
*/
protected abstract Map<String, String> getHeadersFor(I businessIn, int context);
/**
* @return the BusinessOut from the response content and/or headers
*/
protected abstract O getApiBusinessOutFromResponse(final Response response);
7.3.1.2. Client receiving InputStream¶
Note that if several API are intended for this client to receive InputStream (various usages), one shall specialized the answer of those abstract methods through more general BusinessIn and BusinessOut types (for instance, using multiple sub elements or using instanceOf check).
/**
* @param context 1 for sending InputStream, -1 for receiving InputStream, or anything else
* @return the headers map
*/
protected abstract Map<String, String> getHeadersFor(I businessIn, int context);
/**
* @return the BusinessOut from the response content and/or headers
*/
protected abstract O getApiBusinessOutFromResponse(final Response response);
7.3.1.3. Client definition of Service¶
Note that ApiServiceInterface is the API of the server, with specific attention on InputStream,
using a different Java Interface than the server’s one. This is due to the need to access to low level
injected values such as HttpServerRequest
and Closer
.
Note: these declarations are not useful since the client service will never be used for those end points.
@Path(ApiConstants.API_COLLECTIONS)
@POST
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
@Produces(MediaType.APPLICATION_JSON)
Uni<Response> createObject(InputStream content,
@DefaultValue("name") @RestHeader(ApiConstants.X_NAME) String name,
@DefaultValue("0") @RestHeader(ApiConstants.X_LEN) long len);
@Path(ApiConstants.API_COLLECTIONS + "/{business}")
@GET
@Produces(MediaType.APPLICATION_OCTET_STREAM)
Uni<InputStream> readObject(@RestPath String business);
7.3.1.4. Server definition of Service¶
Be careful that API using InputStream (push or pull) are defined with the annotation @Blocking
on server side.
@Path(API_COLLECTIONS)
@POST
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
@Produces(MediaType.APPLICATION_JSON)
@Blocking
public Uni<Response> createObject(HttpServerRequest request, @Context final Closer closer,
final InputStream inputStream,
@DefaultValue("name") @RestHeader(X_NAME) String name,
@DefaultValue("0") @RestHeader(X_LEN) long len) {
ApiBusinessIn businessIn = new ApiBusinessIn();
businessIn.name = name;
businessIn.len = len;
return createObject(request, closer, businessIn, businessIn.len, null, keepCompressed, inputStream);
}
@Path(API_COLLECTIONS + "/{business}")
@GET
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Blocking
public Uni<Response> readObject(@RestPath final String business,
final HttpServerRequest request, @Context final Closer closer) {
ApiBusinessIn businessIn = new ApiBusinessIn();
businessIn.name = business;
String xlen = request.getHeader(X_LEN);
long len = LEN;
if (ParametersChecker.isNotEmpty(xlen)) {
len = Long.parse(xlen);
}
businessIn.len = len;
return readObject(request, closer, businessIn, futureAlreadyCompressed);
}
keepInputStreamCompressed
specifies for each end point if the InputStream shall be kept compressed if already compressed, or uncompressed if compressed.
The Client Factory should be used as ``@ApplicationScoped`` in order to ensure it is always the unique one.
7.3.1.5. Client implementation¶
public ApiBusinessOut postInputStream(final String name, final InputStream content,
final long len, final boolean shallCompress, final boolean alreadyCompressed) throws CcsWithStatusException {
ApiBusinessIn businessIn = new ApiBusinessIn();
businessIn.name = name;
businessIn.len = len;
final var inputStream = prepareInputStreamToSend(content, shallCompress, alreadyCompressed, businessIn);
final var uni = getService().createObject(name, len, inputStream);
return getResultFromPostInputStreamUni(uni, inputStream);
}
public InputStreamBusinessOut<ApiBusinessOut> getInputStream(final String name, final long len,
final boolean acceptCompressed, final boolean shallDecompress)
throws CcsWithStatusException {
ApiBusinessIn businessIn = new ApiBusinessIn();
businessIn.name = name;
businessIn.len = len;
prepareInputStreamToReceive(acceptCompressed, businessIn);
final var uni = getService().readObject(name);
return getInputStreamBusinessOutFromUni(acceptCompressed, shallDecompress, uni);
}
shallCompress
andacceptCompressed
specify if the InputStream must be compressed (either in POST or GET).alreadyCompressed
specifies if the InputStream is already compressed or not in POST.shallDecompress
specifies if the InputStream shall be decompressed if received compressed.
7.3.1.6. Client implementation using Quarkus Service¶
It is possible to use native Quarkus client. (service is the injected ApiService with correct
URL from quarkus.rest-client."org.acme.rest.client.ExtensionsService".url=yourUrl
).
public class ApiClient extends ClientAbstract<ApiBusinessIn, ApiBusinessOut, ApiServiceInterface> {
public boolean checkName(final String name) {
final Uni<Response> uni = getService().checkName(name);
ApiBusinessIn businessIn = new ApiBusinessIn();
businessIn.name = name;
try (final Response response = exceptionMapper.handleUniResponse(uni)) {
return name.equals(response.getHeaderString(X_NAME));
} catch (final CcsClientGenericException | CcsServerGenericException | CcsWithStatusException e) {
return false;
}
}
...
}
Some helpers are created to make it easier to handle the return status.
// Response format
final Uni<Response> uni = getService().checkName(name);
try (final Response response = exceptionMapper.handleUniResponse(uni)) {
// OK
} catch (final CcsClientGenericException | CcsServerGenericException | CcsWithStatusException e) {
// Handle exception
}
// DTO format
final var uni = getService().getObjectMetadata(name);
return (ApiBusinessOut) exceptionMapper.handleUniObject(this, uni);
Note that if a Factory is going to be used for several targets, the factory is then not correctly initialized with the right URI. Therefore the following example shall be followed:
// Still get the Factory using @Inject
@Inject
ApiClientFactory factory;
// Then in method where the client is needed for a particular URI
try (final ApiClient apiClient = factory.newClient(uri)) {
// This method is synchronized on Factory to prevent wrong setup
// (getUri() will return the right URI at construction but not guaranteed later on)
}
7.3.1.7. Server implementation¶
io.clonecloudstore.common.quarkus.server
contains the StreamHandlerAbstract, the StreamServiceAbstract and some filters implementations for the server.
With those abstracts, the code needed is shortest and allow to be extended to any API and usages.
The abstract StreamServiceAbstract defines abstract methods, as StreamHandlerAbstract, that must be specified within the final client implementation, in order to include business implementation.
/**
* @return True if the digest is to be computed on the fly
*/
protected abstract boolean checkDigestToCumpute(I businessIn);
/**
* Check if the request for POST is valid, and if so, adapt the given MultipleActionsInputStream that will
* be used to consume the original InputStream.
* The implementation shall use the business logic to check the validity for this InputStream reception
* (from client to server) and, if valid, use the MultipleActionsInputStream, either as is or as a standard InputStream.
* (example: check through Object Storage that object does not exist yet, and if so
* add the consumption of the stream for the Object Storage object creation).
* Note that the stream might be kept compressed if keepInputStreamCompressed was specified at construction.
*/
protected abstract void checkPushAble(I businessIn, MultipleActionsInputStream inputStream)
throws CcsClientGenericException, CcsServerGenericException;
/**
* Returns a BusinessOut in case of POST (receiving InputStream on server side).
* The implementation shall use the business logic to get the right
* BusinessOut object to return.
* (example: getting the StorageObject object, including the computed or given Hash)
*
* @param businessIn businessIn as passed in constructor
* @param finalHash the final Hash if computed on the fly, or the original given one
* @param size the real size read (from received stream, could be compressed size if decompression is off at
* construction)
*/
protected abstract O getAnswerPushInputStream(I businessIn, String finalHash, long size)
throws CcsClientGenericException, CcsServerGenericException;
/**
* Returns a Map for Headers response in case of POST (receiving InputStream on server side).
* (example: headers for object name, object size, ...)
*
* @param businessIn businessIn as passed in constructor
* @param finalHash the final Hash if computed on the fly, or the original given one
* @param size the real size read
* @param businessOut previously constructed from getAnswerPushInputStream
*/
protected abstract Map<String, String> getHeaderPushInputStream(I businessIn, String finalHash, long size,
O businessOut)
throws CcsClientGenericException, CcsServerGenericException;
![digraph dependencies {
"servicePost" -> "nativeStreamHandler" [label="Creation"];
"nativeStreamHandler" -> "headersCheck" [label="Check compression"];
"headersCheck" -> "checkPushAble" [label="Check if push is allowed"];
"checkPushAble" -> "asyncInputStreamConsumer" [label="InputStream consumption through new thread and create a semaphore"];
"checkPushAble" -> "asyncInputStreamProducer" [label="InputStream producing from Network input"];
"asyncInputStreamConsumer" -> "externalUsage" [label="InputStream consumption through new thread"];
"externalUsage" -> "semaphoreRelease" [label="Release semaphore when consumption ending"];
"asyncInputStreamProducer" -> "inputStreamReception" [label="Feeding InputStream from Network"];
"inputStreamReception" -> "endProducing" [label="Finalize producing InputStream"];
{"semaphoreRelease" "endProducing"} -> "getAnswerPushInputStream" [label="Once production and consumption ending"];
"getAnswerPushInputStream" -> "finalizeConsumer" [label="Ending consumer once everything is done"];
"finalizeConsumer" -> "getHeaderPushInputStream" [label="Get final headers for response"];
"getHeaderPushInputStream" -> "finalAnswer" [label="Final answer"];
}](../_images/graphviz-9a9fdb942d615f36cef424026434d51ced6b9f4f.png)
Illustration of network steps in receiving InputStream within server¶
/**
* The implementation must check using business object that get inputStream request (server sending InputStream as
* result) is valid according to the businessIn from te Rest API and the headers.
* (example: ObjectStorage check of existence of object)
*
* @return True if the read action is valid for this businessIn object and headers
*/
protected abstract boolean checkPullAble(I businessIn, MultiMap headers)
throws CcsClientGenericException, CcsServerGenericException;
/**
* Returns the InputStream required for GET (server is sending the InputStream back to the client).
* The implementation shall use the business logic and controls to get the InputStream to return.
* (example: getting the Object Storage object stream)
*
* @param businessIn businessIn as passed in constructor
*/
protected abstract InputStream getPullInputStream(I businessIn)
throws CcsClientGenericException, CcsServerGenericException;
/**
* Returns a Map for Headers response in case of GET, added to InputStream get above (server is sending the
* InputStream back to the client)
* (example: headers for object name, object size...)
*
* @param businessIn businessIn as passed in constructor
*/
protected abstract Map<String, String> getHeaderPullInputStream(I businessIn)
throws CcsClientGenericException, CcsServerGenericException;
![digraph dependencies {
"serviceGet" -> "nativeStreamHandler" [label="Creation"];
"nativeStreamHandler" -> "headersCheck" [label="Check if push is allowed"];
"headersCheck" -> "checkPullAble" [label="Check if pull is allowed"];
"checkPullAble" -> "getPullInputStream" [label="InputStream and Metadata creation using async operation for InputStream"];
"getPullInputStream" -> "asyncInputStreamProducer" [label="InputStream producing from external resource"];
"getPullInputStream" -> "getHeaderPullInputStream" [label="Get final headers for response from Metadata"];
"getHeaderPullInputStream" -> "headerSend" [label="Send headers as answer"];
"asyncInputStreamProducer" -> "inputStreamEmission" [label="Async emission as response from InputStream"];
"inputStreamEmission" -> "endProducing" [label="End of InputStream sending"];
{"headerSend" "endProducing"} -> "finalAnswer" [label="Final answer"];
}](../_images/graphviz-2bdd64211d16f16b0c841a2834b94c1d68d1a637.png)
Illustration of network steps in sending InputStream within server¶
/**
* Return headers for error message.
* (example: get headers in case of error as Object name, Bucket name...)
*/
protected abstract Map<String, String> getHeaderError(I businessIn, int status);
Note that if several API are intended for this server to send or receive InputStream (various usages), one shall specialized the answer of those abstract methods through more general BusinessIn and BusinessOut types (for instance, using multiple sub elements or using instanceOf check).
@ApplicationScoped
@Path(API_ROOT)
public class ApiService
extends StreamServiceAbstract<ApiBusinessIn, ApiBusinessOut, NativeStreamHandler> {
The interaction with a Driver is done through the extension of StreamHandlerAbstract.
@RequestScoped
public class NativeStreamHandler
extends StreamHandlerAbstract<ApiBusinessIn, ApiBusinessOut> {
public NativeStreamHandler() {
}
// Implement abstract methods
}
7.3.2. TrafficShaping¶
Limiting traffic on network (or any other resource) could be difficult natively. This aims to propose a simple solution.
Since Quarkus implements natively trafficShaping, the project will use this default one.
7.3.3. JsonUtil¶
Since ObjectMapper from Jackson library is often needed for manual integration, this helper returns an ObjectMapper:
If Quarkus has initialized it, the one from Quarkus
If not, a default one, almost equivalent