Vehicle Location using Spring Boot 3 with gRPC, OpenTelemetry and Micrometer Tracing — Part 2

Now let’s focus on developing the Java applications.
First of all, let’s create the proto files and define the methods.
The first one is the common.proto file
// common.proto: file-level settings for the shared "common" package.
syntax = "proto3";

package br.gasmartins.grpc.common;

// Java code generation: one top-level class per message, generated into
// br.gasmartins.grpc.common, with CommonProto as the outer class name.
option java_multiple_files = true;
option java_outer_classname = "CommonProto";
option java_package = "br.gasmartins.grpc.common";
// Structured description of an error.
message ErrorDetail {
  // Code identifying the error. Named in snake_case like the other fields in
  // these proto files; the generated Java accessor (getErrorCode) and the JSON
  // name (errorCode) are the same as for the previous spelling.
  string error_code = 1;
  // Error message text.
  string message = 2;
  // Additional string key/value pairs attached to the error.
  map<string, string> metadata = 3;
}
The second is the pagination.proto file
// pagination.proto: file-level settings; shares the "common" package.
syntax = "proto3";

package br.gasmartins.grpc.common;

// Java code generation: one top-level class per message, generated into
// br.gasmartins.grpc.common, with PaginationProto as the outer class name.
option java_multiple_files = true;
option java_outer_classname = "PaginationProto";
option java_package = "br.gasmartins.grpc.common";
// Paging parameters sent by the caller of a paged query.
message Pageable {
  uint32 page = 1;       // index of the requested page
  uint32 page_size = 2;  // number of elements per page
}
// Paging metadata describing one page of results.
// Field numbers start at 2; number 1 is not assigned.
message Page {
  uint32 page = 2;            // index of this page
  uint32 page_size = 3;       // page size
  uint32 total_pages = 4;     // total number of pages
  uint64 total_elements = 5;  // total number of elements
  bool first = 6;             // whether this is the first page
  bool last = 7;              // whether this is the last page
  bool has_previous = 8;      // whether a previous page exists
  bool has_next = 9;          // whether a next page exists
}
Now let’s create the sensor_service.proto file
// sensor_service.proto: file-level settings and imports for the sensor API.
syntax = "proto3";

package br.gasmartins.grpc.sensors;

// Paging messages from the common package, the HTTP annotation extension,
// and the well-known Timestamp / wrapper types.
import "br/gasmartins/grpc/common/pagination.proto";
import "google/api/annotations.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

// Java code generation: one top-level class per message, generated into
// br.gasmartins.grpc.sensors, with SensorProto as the outer class name.
option java_multiple_files = true;
option java_outer_classname = "SensorProto";
option java_package = "br.gasmartins.grpc.sensors";
// gRPC API for storing and querying sensor data.
service SensorService {
  // Bidirectional stream of SensorData used to store readings.
  rpc store(stream SensorData) returns (stream SensorData) {
    option (google.api.http) = {
      post: "/v1/sensors"
      body: "*"
    };
  }

  // Returns the SensorData for one sensor id.
  // The request message is a bare StringValue, so the path variable must name
  // its only field, `value`; `sensor_id` is not a field of the request.
  rpc findBySensorId(google.protobuf.StringValue) returns (SensorData) {
    option (google.api.http) = {
      get: "/v1/sensors/{value}"
    };
  }

  // Bidirectional stream: search parameters in, pages of sensor data out.
  // A path template cannot carry a query string. For a GET without a body,
  // request fields that are not bound by the path are mapped to query
  // parameters by the HTTP rule itself, so they are not listed here.
  rpc findByVehicleIdAndOccurredOnBetween(stream SearchSensorDataByVehicleIdParam) returns (stream SensorDataPage) {
    option (google.api.http) = {
      get: "/v1/sensors"
    };
  }
}
// One sensor reading reported for a vehicle.
message SensorData {
  string sensor_id = 1;                       // id of the sensor data record
  string vehicle_id = 2;                      // id of the vehicle
  VehicleState vehicle_state = 3;             // movement state of the vehicle
  Coordinates coordinates = 4;                // latitude/longitude of the reading
  float speed = 5;                            // speed value; unit is not specified here
  google.protobuf.Timestamp occurred_on = 6;  // when the reading occurred
  optional Location location = 7;             // optional address information
}
// Movement state of a vehicle. MOVING is the zero value and therefore the
// proto3 default when the field is not set.
enum VehicleState {
  MOVING = 0;
  STOPPING = 1;
  STOPPED = 2;
}
// A latitude/longitude pair.
message Coordinates {
  double latitude = 1;
  double longitude = 2;
}
// Address information for a position.
message Location {
  string country = 1;
  string state = 2;
  string city = 3;
  string district = 4;
  string address = 5;
  // Named in snake_case like the other fields in these proto files; the
  // generated Java accessor (getZipCode) and the JSON name (zipCode) are the
  // same as for the previous spelling.
  string zip_code = 6;
}
// Search criteria: a vehicle id, an occurred-on time range and paging parameters.
message SearchSensorDataByVehicleIdParam {
  google.protobuf.StringValue vehicle_id = 1;       // vehicle to search for
  google.protobuf.Timestamp start_occurred_on = 2;  // start of the time range
  google.protobuf.Timestamp end_occurred_on = 3;    // end of the time range
  br.gasmartins.grpc.common.Pageable pageable = 4;  // requested page and page size
}
// One page of sensor data together with its paging metadata.
// Field numbers 2 and 3 are not assigned.
message SensorDataPage {
  repeated SensorData data = 1;             // elements of this page
  br.gasmartins.grpc.common.Page page = 4;  // paging metadata
}
Finally, let’s configure the application.yml file of the sensor-service
# sensor-service configuration.
# YAML nesting is expressed through indentation; without it the top-level keys
# (server, port, ...) repeat and no nested property path is formed.
server:
  port: 8080 # HTTP port; needed for the actuator
spring:
  application:
    name: sensor-service
  main:
    allow-bean-definition-overriding: true
  data:
    elasticsearch:
      repositories:
        enabled: true
grpc:
  server:
    port: 8087 # gRPC server port; this could be any port
  client:
    # Channel named "location-client"; 8085 is the gRPC port configured for
    # the location service.
    location-client:
      address: static://localhost:8085
      enable-keep-alive: false
      negotiation-type: PLAINTEXT
The implementation can be something like this
import br.gasmartins.grpc.sensors.SearchSensorDataByVehicleIdParam;
import br.gasmartins.grpc.sensors.SensorData;
import br.gasmartins.grpc.sensors.SensorDataPage;
import br.gasmartins.grpc.sensors.SensorServiceGrpc;
import br.gasmartins.sensors.interfaces.grpc.mapper.SensorGrpcMapper;
import br.gasmartins.sensors.interfaces.grpc.observer.SearchSensorDataByVehicleIdParamStreamObserver;
import br.gasmartins.sensors.interfaces.grpc.observer.SensorDataStreamObserver;
import br.gasmartins.sensors.application.service.SensorService;
import com.google.protobuf.StringValue;
import io.grpc.stub.StreamObserver;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
import java.util.UUID;
import static net.logstash.logback.marker.Markers.append;
/**
 * gRPC endpoint for sensor data, registered through {@code @GrpcService}.
 *
 * <p>The two streaming calls return a dedicated request observer that receives the caller's
 * messages; the unary lookup is answered inline. Every operation goes through
 * {@link SensorService}. Each method is annotated with {@code @Timed} and {@code @Counted} using
 * the metric names given in the annotations.
 */
@Slf4j
@GrpcService
@RequiredArgsConstructor
public class SensorGrpcController extends SensorServiceGrpc.SensorServiceImplBase {

    private final SensorService service;

    /**
     * Opens the stream used to store sensor data.
     *
     * @param responseObserver stream on which responses are sent to the caller
     * @return observer that receives the sensor data sent by the caller
     */
    @Timed(value = "sensor-data-store.time", description = "Time taken to store sensor data")
    @Counted(value = "sensor-data-store.count", description = "Number of requests to store sensor data")
    @Override
    public StreamObserver<SensorData> store(StreamObserver<SensorData> responseObserver) {
        log.info(append("data", responseObserver), "Storing sensor data");
        final StreamObserver<SensorData> requestObserver =
                new SensorDataStreamObserver(service, responseObserver);
        return requestObserver;
    }

    /**
     * Looks up sensor data by id and sends the mapped result as the single response.
     *
     * @param sensorId wrapper whose value is parsed with {@link UUID#fromString(String)}
     * @param responseObserver receives the mapped sensor data followed by completion
     */
    @Timed(value = "sensor-data-by-sensor-id.time", description = "Time taken to return sensor data by sensor id")
    @Counted(value = "sensor-data-by-sensor-id.count", description = "Number of requests to search sensor data by sensor id")
    @Override
    public void findBySensorId(StringValue sensorId, StreamObserver<SensorData> responseObserver) {
        log.info(append("sensor_id", sensorId), "Searching sensor by id");
        final UUID id = UUID.fromString(sensorId.getValue());
        var found = service.findById(id);
        log.info(append("data", found), "Sensor data was found successfully");

        log.info(append("sensor", found), "Mapping sensor data");
        final SensorData reply = SensorGrpcMapper.mapToDto(found);
        log.info(append("sensor", reply), "Sensor was mapped successfully");

        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    /**
     * Opens the stream used to search sensor data by vehicle id and time range.
     *
     * @param responseObserver stream on which result pages are sent to the caller
     * @return observer that receives the search parameters sent by the caller
     */
    @Timed(value = "sensor-data-by-vehicle-id.time", description = "Time taken to return sensor data by vehicle id")
    @Counted(value = "sensor-data-by-vehicle-id.count", description = "Number of requests to search sensor data by vehicle id")
    @Override
    public StreamObserver<SearchSensorDataByVehicleIdParam> findByVehicleIdAndOccurredOnBetween(StreamObserver<SensorDataPage> responseObserver) {
        log.info("Searching sensor data");
        final StreamObserver<SearchSensorDataByVehicleIdParam> requestObserver =
                new SearchSensorDataByVehicleIdParamStreamObserver(service, responseObserver);
        return requestObserver;
    }
}
And the observers can be something like this
package br.gasmartins.sensors.interfaces.grpc.observer;
import br.gasmartins.grpc.sensors.SensorData;
import br.gasmartins.sensors.interfaces.grpc.mapper.SensorGrpcMapper;
import br.gasmartins.sensors.application.service.SensorService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import static net.logstash.logback.marker.Markers.append;
;
/**
 * Request-side observer of the {@code store} stream.
 *
 * <p>Each received message is mapped to the domain model, stored through {@link SensorService},
 * mapped back and sent on the response stream. Errors and completion signalled by the caller are
 * logged and forwarded to the response stream.
 */
@RequiredArgsConstructor
@Slf4j
public class SensorDataStreamObserver implements StreamObserver<SensorData> {

    private final SensorService service;
    private final StreamObserver<SensorData> responseObserver;

    /**
     * Stores one sensor reading and answers with its stored representation.
     *
     * @param sensorDataDto sensor data received from the caller
     */
    @Override
    public void onNext(SensorData sensorDataDto) {
        log.info(append("sensor", sensorDataDto), "Mapping sensor");
        var sensorData = SensorGrpcMapper.mapToDomain(sensorDataDto);
        log.info(append("sensor", sensorData), "Sensor was mapped successfully");

        log.info(append("sensor", sensorData), "Storing sensor data");
        // The stored result is only needed while handling this message, so it is a local
        // variable rather than mutable state on the observer.
        br.gasmartins.sensors.domain.SensorData storedSensorData = this.service.store(sensorData);
        log.info(append("sensor", sensorData), "Sensor data was stored successfully");

        log.info(append("sensor", storedSensorData), "Mapping stored sensor");
        var storedSensorDataDto = SensorGrpcMapper.mapToDto(storedSensorData);
        log.info(append("sensor", storedSensorDataDto), "Stored sensor was mapped successfully");

        this.responseObserver.onNext(storedSensorDataDto);
    }

    /**
     * Logs the error reported on the request stream and forwards it to the response stream.
     *
     * @param t error reported on the request stream
     */
    @Override
    public void onError(Throwable t) {
        log.error("Error processing request", t);
        this.responseObserver.onError(t);
    }

    /** Completes the response stream once the caller has finished sending. */
    @Override
    public void onCompleted() {
        // This observer stores data; it does not search for it.
        log.info("All sensor data was stored");
        this.responseObserver.onCompleted();
    }
}
import br.gasmartins.grpc.sensors.SearchSensorDataByVehicleIdParam;
import br.gasmartins.grpc.sensors.SensorDataPage;
import br.gasmartins.sensors.interfaces.grpc.mapper.SensorGrpcMapper;
import br.gasmartins.sensors.application.service.SensorService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.PageRequest;
import java.util.UUID;
import static br.gasmartins.sensors.infra.persistence.adapter.mapper.TimestampGrpcMapper.toLocalDatetime;
import static net.logstash.logback.marker.Markers.append;
/**
 * Request-side observer of the {@code findByVehicleIdAndOccurredOnBetween} stream.
 *
 * <p>Every received search request is executed through {@link SensorService} and the resulting
 * page is mapped and sent on the response stream. Errors and completion signalled by the caller
 * are logged and forwarded to the response stream.
 */
@Slf4j
@RequiredArgsConstructor
public class SearchSensorDataByVehicleIdParamStreamObserver implements StreamObserver<SearchSensorDataByVehicleIdParam> {

    private final SensorService service;
    private final StreamObserver<SensorDataPage> responseObserver;

    /**
     * Runs one search and answers with the mapped page.
     *
     * @param request vehicle id, time range and paging parameters of the search
     */
    @Override
    public void onNext(SearchSensorDataByVehicleIdParam request) {
        log.info(append("request", request), "Searching sensor data");

        final UUID vehicleId = UUID.fromString(request.getVehicleId().getValue());
        var from = toLocalDatetime(request.getStartOccurredOn());
        var until = toLocalDatetime(request.getEndOccurredOn());
        var paging = request.getPageable();
        final PageRequest pageRequest = PageRequest.of(paging.getPage(), paging.getPageSize());

        var result = service.findByVehicleIdAndOccurredOnBetween(vehicleId, from, until, pageRequest);
        log.info(append("page", result), "Sensor was found successfully");

        log.info(append("page", result), "Mapping page");
        final SensorDataPage reply = SensorGrpcMapper.mapToDto(result);
        log.info(append("page", reply), "Page was mapped successfully");

        responseObserver.onNext(reply);
    }

    /**
     * Logs the error reported on the request stream and forwards it to the response stream.
     *
     * @param t error reported on the request stream
     */
    @Override
    public void onError(Throwable t) {
        log.error("Error searching sensor data", t);
        responseObserver.onError(t);
    }

    /** Completes the response stream once the caller has finished sending requests. */
    @Override
    public void onCompleted() {
        log.info("All request data is received");
        responseObserver.onCompleted();
    }
}
If we want to add tracing to a specific method manually, we can do this:
/**
 * Looks up a sensor data entity by id in the underlying repository.
 *
 * <p>{@code @NewSpan} requests a span named "Search By Sensor Id" around the call, and
 * {@code @SpanTag} adds the {@code id} argument to that span under the key {@code id}.
 *
 * @param id identifier to search for
 * @return whatever the repository returns for {@code id}
 */
@NewSpan("Search By Sensor Id")
public Optional<SensorDataEntity> findById(@SpanTag("id") UUID id) {
    final Optional<SensorDataEntity> match = this.repository.findById(id);
    return match;
}
The implementation of the location service can be something like this
import br.gasmartins.grpc.locations.Coordinates;
import br.gasmartins.grpc.locations.Location;
import br.gasmartins.grpc.locations.LocationServiceGrpc;
import br.gasmartins.locations.interfaces.grpc.mapper.CoordinatesControllerMapper;
import br.gasmartins.locations.application.service.LocationService;
import br.gasmartins.locations.interfaces.grpc.mapper.LocationControllerMapper;
import io.grpc.stub.StreamObserver;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
import static net.logstash.logback.marker.Markers.append;
/**
 * gRPC endpoint of the location service, registered through {@code @GrpcService}.
 *
 * <p>Resolves a location from coordinates by delegating to {@link LocationService}.
 */
@Slf4j
@GrpcService
@RequiredArgsConstructor
public class LocationGrpcController extends LocationServiceGrpc.LocationServiceImplBase {

    private final LocationService service;

    /**
     * Maps the incoming coordinates to the domain model, looks up the location and sends the
     * mapped result as the single response.
     *
     * @param coordinatesDto coordinates received from the caller
     * @param responseObserver receives the mapped location followed by completion
     */
    @Timed(value = "location-by-coordinates.time", description = "Time taken to get location by coordinates")
    @Counted(value = "location-by-coordinates.count", description = "Number of requests to get location by coordinates")
    @Override
    public void getLocationByCoordinates(Coordinates coordinatesDto, StreamObserver<Location> responseObserver) {
        log.info(append("request", coordinatesDto), "Receiving requests");

        log.info(append("coordinates", coordinatesDto), "Mapping coordinates");
        var domainCoordinates = CoordinatesControllerMapper.mapToDomain(coordinatesDto);
        log.info(append("coordinates", domainCoordinates), "Coordinates were mapped successfully");

        log.info(append("coordinates", domainCoordinates), "Searching location by coordinates");
        var foundLocation = service.findByCoordinates(domainCoordinates);
        log.info(append("location", foundLocation), "Location was found successfully");

        log.info(append("location", foundLocation), "Mapping location");
        final Location reply = LocationControllerMapper.mapToDto(foundLocation);
        log.info(append("location", reply), "Location was mapped successfully");

        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }
}
The application.yml file of the location service will be like this
# location-service configuration.
# YAML nesting is expressed through indentation; without it the top-level keys
# (server, port, enabled, ...) repeat and no nested property path is formed.
server:
  port: 8082
grpc:
  server:
    port: 8085
spring:
  application:
    name: location-service
  main:
    allow-bean-definition-overriding: true
  cloud:
    openfeign:
      micrometer:
        enabled: true
      okhttp:
        enabled: true
management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0 # sample every request
  otlp:
    tracing:
      endpoint: http://localhost:4317
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    distribution:
      percentiles-histogram:
        http:
          server:
            requests: true
The Feign Client interface to use the PositionStack API is declared this way
/**
 * Feign client named {@code location-client}. Its base URL is read from the
 * {@code webservice.position-stack.url} property and it is configured by
 * {@link FeignConfiguration}.
 */
@FeignClient(
        name = "location-client",
        url = "${webservice.position-stack.url}",
        configuration = FeignConfiguration.class)
public interface LocationFeignClient {

    /**
     * Issues {@code GET /reverse} with the given query parameters.
     *
     * @param accessKey value of the {@code access_key} parameter
     * @param coordinates values of the {@code query} parameter, sent in CSV collection format
     * @param limit value of the {@code limit} parameter
     * @return the HTTP response with a {@link LocationDataDto} body
     */
    @CollectionFormat(feign.CollectionFormat.CSV)
    @GetMapping("/reverse")
    ResponseEntity<LocationDataDto> findByCoordinates(
            @RequestParam("access_key") String accessKey,
            @RequestParam("query") List<String> coordinates,
            @RequestParam("limit") Long limit);
}
Don’t forget to generate an API key on the PositionStack site: https://positionstack.com/
To make the Feign client export traces, we need to add the Capability bean:
import feign.Capability;
import feign.micrometer.MicrometerCapability;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Feign configuration referenced by {@code LocationFeignClient}; contributes a Micrometer-backed
 * {@link Capability} bean.
 */
@Configuration
public class FeignConfiguration {

    /**
     * Builds the Micrometer capability for Feign.
     *
     * @param registry meter registry handed to the capability
     * @return a {@link MicrometerCapability} created from {@code registry}
     */
    @Bean
    public Capability capability(final MeterRegistry registry) {
        final Capability micrometer = new MicrometerCapability(registry);
        return micrometer;
    }
}
Part 3 covers the application usage and unit tests.