Vehicle Location using SpringBoot 3 with gRPC, OpenTelemetry and Micrometer Tracing — Part 2

Gabriel Martins
5 min readNov 21, 2023

Now let’s focus on developing Java applications.

First of all, let’s create the proto files and define the methods.

The first one is the common.proto file

syntax = "proto3";

package br.gasmartins.grpc.common;

option java_package = "br.gasmartins.grpc.common";
option java_outer_classname="CommonProto";
option java_multiple_files = true;

message ErrorDetail {
string errorCode = 1;
string message = 2;
map<string, string> metadata = 3;
}

The second is the pagination.proto file

syntax = "proto3";

package br.gasmartins.grpc.common;

option java_package = "br.gasmartins.grpc.common";
option java_outer_classname="PaginationProto";
option java_multiple_files = true;

message Pageable {
uint32 page = 1;
uint32 page_size = 2;
}

message Page {
uint32 page = 2;
uint32 page_size = 3;
uint32 total_pages = 4;
uint64 total_elements = 5;
bool first = 6;
bool last = 7;
bool has_previous = 8;
bool has_next = 9;
}

Now let’s create the sensor_service.proto file

syntax = "proto3";

package br.gasmartins.grpc.sensors;

option java_package = "br.gasmartins.grpc.sensors";
option java_outer_classname="SensorProto";
option java_multiple_files = true;

import "google/api/annotations.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
import "br/gasmartins/grpc/common/pagination.proto";

service SensorService {

rpc store(stream SensorData) returns (stream SensorData) {
option (google.api.http) = {
post : "/v1/sensors"
body: "*"
};
}

rpc findBySensorId(google.protobuf.StringValue) returns (SensorData) {
option (google.api.http) = {
get: "/v1/sensors/{sensor_id}"
};
}

rpc findByVehicleIdAndOccurredOnBetween(stream SearchSensorDataByVehicleIdParam) returns (stream SensorDataPage) {
option (google.api.http) = {
get: "/v1/sensors?vehicle_id={vehicle_id}&start_occurred_on={start_occurred_on}&end_occurred_on={end_occurred_on}"
};
}

}

message SensorData {

string sensor_id = 1;
string vehicle_id = 2;
VehicleState vehicle_state = 3;
Coordinates coordinates = 4;
float speed = 5;
google.protobuf.Timestamp occurred_on = 6;
optional Location location = 7;

}

enum VehicleState {

MOVING = 0;
STOPPING = 1;
STOPPED = 2;

}

message Coordinates {
double latitude = 1;
double longitude = 2;
}

message Location {

string country = 1;
string state = 2;
string city = 3;
string district = 4;
string address = 5;
string zipCode = 6;

}

message SearchSensorDataByVehicleIdParam {
google.protobuf.StringValue vehicle_id = 1;
google.protobuf.Timestamp start_occurred_on = 2;
google.protobuf.Timestamp end_occurred_on = 3;
br.gasmartins.grpc.common.Pageable pageable = 4;
}

message SensorDataPage {
repeated SensorData data = 1;
br.gasmartins.grpc.common.Page page = 4;
}

Finally, let’s configure the application.yml file of the sensor-service

server:
port: 8080 #we need this to the actuator

spring:
application:
name: sensor-service
main:
allow-bean-definition-overriding: true
data:
elasticsearch:
repositories:
enabled: true

grpc:
server:
port: 8087 #this could be any port
client:
location-client:
address: static://localhost:8085
enable-keep-alive: false
negotiation-type: PLAINTEXT

The implementation can be something like this

import br.gasmartins.grpc.sensors.SearchSensorDataByVehicleIdParam;
import br.gasmartins.grpc.sensors.SensorData;
import br.gasmartins.grpc.sensors.SensorDataPage;
import br.gasmartins.grpc.sensors.SensorServiceGrpc;
import br.gasmartins.sensors.interfaces.grpc.mapper.SensorGrpcMapper;
import br.gasmartins.sensors.interfaces.grpc.observer.SearchSensorDataByVehicleIdParamStreamObserver;
import br.gasmartins.sensors.interfaces.grpc.observer.SensorDataStreamObserver;
import br.gasmartins.sensors.application.service.SensorService;
import com.google.protobuf.StringValue;
import io.grpc.stub.StreamObserver;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

import java.util.UUID;

import static net.logstash.logback.marker.Markers.append;

@GrpcService
@RequiredArgsConstructor
@Slf4j
public class SensorGrpcController extends SensorServiceGrpc.SensorServiceImplBase {

private final SensorService service;

@Timed(value = "sensor-data-store.time", description = "Time taken to store sensor data")
@Counted(value = "sensor-data-store.count", description = "Number of requests to store sensor data")
@Override
public StreamObserver<SensorData> store(StreamObserver<SensorData> responseObserver) {
log.info(append("data", responseObserver), "Storing sensor data");
return new SensorDataStreamObserver(this.service, responseObserver);
}

@Timed(value = "sensor-data-by-sensor-id.time", description = "Time taken to return sensor data by sensor id")
@Counted(value = "sensor-data-by-sensor-id.count", description = "Number of requests to search sensor data by sensor id")
@Override
public void findBySensorId(StringValue sensorId, StreamObserver<SensorData> responseObserver) {
log.info(append("sensor_id", sensorId), "Searching sensor by id");
var sensorData = this.service.findById(UUID.fromString(sensorId.getValue()));
log.info(append("data", sensorData), "Sensor data was found successfully");

log.info(append("sensor", sensorData), "Mapping sensor data");
var sensorDataDto = SensorGrpcMapper.mapToDto(sensorData);
log.info(append("sensor", sensorDataDto), "Sensor was mapped successfully");

responseObserver.onNext(sensorDataDto);
responseObserver.onCompleted();
}

@Timed(value = "sensor-data-by-vehicle-id.time", description = "Time taken to return sensor data by vehicle id")
@Counted(value = "sensor-data-by-vehicle-id.count", description = "Number of requests to search sensor data by vehicle id")
@Override
public StreamObserver<SearchSensorDataByVehicleIdParam> findByVehicleIdAndOccurredOnBetween(StreamObserver<SensorDataPage> responseObserver) {
log.info("Searching sensor data");
return new SearchSensorDataByVehicleIdParamStreamObserver(this.service, responseObserver);
}

}

And the observers can be something like this

package br.gasmartins.sensors.interfaces.grpc.observer;

import br.gasmartins.grpc.sensors.SensorData;
import br.gasmartins.sensors.interfaces.grpc.mapper.SensorGrpcMapper;
import br.gasmartins.sensors.application.service.SensorService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import static net.logstash.logback.marker.Markers.append;

;

@RequiredArgsConstructor
@Slf4j
public class SensorDataStreamObserver implements StreamObserver<SensorData> {

private final SensorService service;

private final StreamObserver<SensorData> responseObserver;

private br.gasmartins.sensors.domain.SensorData storedSensorData;

@Override
public void onNext(SensorData sensorDataDto) {
log.info(append("sensor", sensorDataDto), "Mapping sensor");
var sensorData = SensorGrpcMapper.mapToDomain(sensorDataDto);
log.info(append("sensor", sensorData), "Sensor was mapped successfully");

log.info(append("sensor", sensorData), "Storing sensor data");
this.storedSensorData = this.service.store(sensorData);
log.info(append("sensor", sensorData), "Sensor data was stored successfully");

log.info(append("sensor", this.storedSensorData), "Mapping stored sensor");
var storedSensorDataDto = SensorGrpcMapper.mapToDto(this.storedSensorData);
log.info(append("sensor", storedSensorDataDto), "Stored sensor was mapped successfully");
this.responseObserver.onNext(storedSensorDataDto);
}

@Override
public void onError(Throwable t) {
log.error("Error processing request", t);
this.responseObserver.onError(t);
}

@Override
public void onCompleted() {
log.info("All sensor data was found");
this.responseObserver.onCompleted();
}

}
import br.gasmartins.grpc.sensors.SearchSensorDataByVehicleIdParam;
import br.gasmartins.grpc.sensors.SensorDataPage;
import br.gasmartins.sensors.interfaces.grpc.mapper.SensorGrpcMapper;
import br.gasmartins.sensors.application.service.SensorService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.PageRequest;

import java.util.UUID;

import static br.gasmartins.sensors.infra.persistence.adapter.mapper.TimestampGrpcMapper.toLocalDatetime;
import static net.logstash.logback.marker.Markers.append;

@RequiredArgsConstructor
@Slf4j
public class SearchSensorDataByVehicleIdParamStreamObserver implements StreamObserver<SearchSensorDataByVehicleIdParam> {

private final SensorService service;
private final StreamObserver<SensorDataPage> responseObserver;

@Override
public void onNext(SearchSensorDataByVehicleIdParam request) {
log.info(append("request", request), "Searching sensor data");
var vehicleId = UUID.fromString(request.getVehicleId().getValue());
var startOccurredOn = toLocalDatetime(request.getStartOccurredOn());
var endOccurredOn = toLocalDatetime(request.getEndOccurredOn());
var pageable = request.getPageable();
var pageRequest = PageRequest.of(pageable.getPage(), pageable.getPageSize());
var page = this.service.findByVehicleIdAndOccurredOnBetween(vehicleId, startOccurredOn, endOccurredOn, pageRequest);
log.info(append("page", page), "Sensor was found successfully");

log.info(append("page", page), "Mapping page");
var pageDto = SensorGrpcMapper.mapToDto(page);
log.info(append("page", pageDto), "Page was mapped successfully");
this.responseObserver.onNext(pageDto);
}

@Override
public void onError(Throwable t) {
log.error("Error searching sensor data", t);
this.responseObserver.onError(t);
}

@Override
public void onCompleted() {
log.info("All request data is received");
this.responseObserver.onCompleted();
}

}

If we want to add trace to some method manually, we can do this:


@NewSpan("Search By Sensor Id")
public Optional<SensorDataEntity> findById(@SpanTag("id") UUID id) {
return this.repository.findById(id);
}

The implementation of the location service can be something like this

import br.gasmartins.grpc.locations.Coordinates;
import br.gasmartins.grpc.locations.Location;
import br.gasmartins.grpc.locations.LocationServiceGrpc;
import br.gasmartins.locations.interfaces.grpc.mapper.CoordinatesControllerMapper;
import br.gasmartins.locations.application.service.LocationService;
import br.gasmartins.locations.interfaces.grpc.mapper.LocationControllerMapper;
import io.grpc.stub.StreamObserver;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

import static net.logstash.logback.marker.Markers.append;

@GrpcService
@RequiredArgsConstructor
@Slf4j
public class LocationGrpcController extends LocationServiceGrpc.LocationServiceImplBase {

private final LocationService service;

@Timed(value = "location-by-coordinates.time", description = "Time taken to get location by coordinates")
@Counted(value = "location-by-coordinates.count", description = "Number of requests to get location by coordinates")
@Override
public void getLocationByCoordinates(Coordinates coordinatesDto, StreamObserver<Location> responseObserver) {
log.info(append("request", coordinatesDto), "Receiving requests");

log.info(append("coordinates", coordinatesDto), "Mapping coordinates");
var coordinates = CoordinatesControllerMapper.mapToDomain(coordinatesDto);
log.info(append("coordinates", coordinates), "Coordinates were mapped successfully");

log.info(append("coordinates", coordinates), "Searching location by coordinates");
var location = this.service.findByCoordinates(coordinates);
log.info(append("location", location), "Location was found successfully");

log.info(append("location", location), "Mapping location");
var locationDto = LocationControllerMapper.mapToDto(location);
log.info(append("location", locationDto), "Location was mapped successfully");

responseObserver.onNext(locationDto);
responseObserver.onCompleted();
}

}

The application.yml file of the location service will be like this

server:
port: 8082

grpc:
server:
port: 8085

spring:
application:
name: location-service
main:
allow-bean-definition-overriding: true
cloud:
openfeign:
micrometer:
enabled: true
okhttp:
enabled: true

management:
tracing:
enabled: true
sampling:
probability: 1.0
otlp:
tracing:
endpoint: http://localhost:4317
endpoints:
web:
exposure:
include: '*'
metrics:
distribution:
percentiles-histogram:
http:
server:
requests: true

The Feign Client interface to use the PositionStack API is declared this way

@FeignClient(name = "location-client", url = "${webservice.position-stack.url}", configuration = FeignConfiguration.class)
public interface LocationFeignClient {


@CollectionFormat(feign.CollectionFormat.CSV)
@GetMapping("/reverse")
ResponseEntity<LocationDataDto> findByCoordinates(@RequestParam("access_key") String accessKey, @RequestParam("query") List<String> coordinates, @RequestParam("limit") Long limit);


}

Don’t forget to generate the API Key on the site https://positionstack.com/

In order to make feign client export traces we need to add the Capability bean

import feign.Capability;
import feign.micrometer.MicrometerCapability;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FeignConfiguration {

@Bean
public Capability capability(final MeterRegistry registry) {
return new MicrometerCapability(registry);
}
}

The part 3 brings the application usage and unit tests

Sign up to discover human stories that deepen your understanding of the world.

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

Gabriel Martins
Gabriel Martins

Written by Gabriel Martins

Just a brazilian guy who loves technology and wants to share the experiences he has learned

No responses yet