Commit 605c6402 authored by Deniro StopCovid's avatar Deniro StopCovid Committed by Redford StopCovid

feat: Use the property var robert.server.request-time-delta-tolerance mapped...

feat: Use the property var robert.server.request-time-delta-tolerance mapped on the env var ROBERT_SERVER_REQUEST_TIME_DELTA_TOLERANCE
parent c6a0a564
......@@ -19,7 +19,7 @@
<groupId>fr.gouv.stopc</groupId>
<artifactId>robert-server</artifactId>
<version>1.1.1-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
<name>robert-server</name>
<packaging>pom</packaging>
<description>Projet principal</description>
......
......@@ -13,7 +13,7 @@
<parent>
<groupId>fr.gouv.stopc</groupId>
<artifactId>robert-server</artifactId>
<version>1.1.1-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
</parent>
<artifactId>robert-crypto-grpc-server-messaging</artifactId>
......
......@@ -13,7 +13,7 @@
<parent>
<groupId>fr.gouv.stopc</groupId>
<artifactId>robert-server</artifactId>
<version>1.1.1-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
</parent>
......
......@@ -13,7 +13,7 @@
<parent>
<groupId>fr.gouv.stopc</groupId>
<artifactId>robert-server</artifactId>
<version>1.1.1-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
</parent>
<artifactId>robert-crypto-grpc-server</artifactId>
......
......@@ -13,7 +13,7 @@
<parent>
<groupId>fr.gouv.stopc</groupId>
<artifactId>robert-server</artifactId>
<version>1.1.1-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
</parent>
<artifactId>robert-server-batch</artifactId>
......
......@@ -5,6 +5,9 @@ import java.util.Map;
import javax.inject.Inject;
import fr.gouv.stopc.robert.server.batch.processor.RegistrationProcessor;
import fr.gouv.stopc.robertserver.database.model.Registration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
......@@ -29,6 +32,7 @@ import fr.gouv.stopc.robertserver.database.model.Contact;
import fr.gouv.stopc.robertserver.database.service.ContactService;
import fr.gouv.stopc.robertserver.database.service.IRegistrationService;
@Slf4j
@Configuration
@EnableBatchProcessing
public class ContactsProcessingConfiguration {
......@@ -47,13 +51,21 @@ public class ContactsProcessingConfiguration {
private final PropertyLoader propertyLoader;
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Inject
public ContactsProcessingConfiguration(final IServerConfigurationService serverConfigurationService,
final IRegistrationService registrationService,
final ContactService contactService,
final ICryptoServerGrpcClient cryptoServerClient,
final ScoringStrategyService scoringStrategyService,
final PropertyLoader propertyLoader
final PropertyLoader propertyLoader,
final JobBuilderFactory jobBuilderFactory,
final StepBuilderFactory stepBuilderFactory
) {
this.serverConfigurationService = serverConfigurationService;
......@@ -62,23 +74,52 @@ public class ContactsProcessingConfiguration {
this.cryptoServerClient = cryptoServerClient;
this.scoringStrategyService = scoringStrategyService;
this.propertyLoader = propertyLoader;
this.stepBuilderFactory = stepBuilderFactory;
this.jobBuilderFactory = jobBuilderFactory;
}
@Bean
public Job scoreAndProcessRisks(Step stepContact, Step stepRegistration) {
BatchMode batchMode;
try {
batchMode = BatchMode.valueOf(this.propertyLoader.getBatchMode());
} catch (IllegalArgumentException e) {
log.error("Unrecognized batch mode {}", this.propertyLoader.getBatchMode());
batchMode = BatchMode.NONE;
}
if (batchMode == BatchMode.FULL_REGISTRATION_SCAN_COMPUTE_RISK) {
log.info("Launching registration batch (No contact scoring, risk computation)");
return this.jobBuilderFactory.get("processRegistration").flow(stepRegistration).end().build();
} else if (batchMode == BatchMode.SCORE_CONTACTS_AND_COMPUTE_RISK) {
log.info("Launching contact batch (Contact scoring, Risk computation)");
return this.jobBuilderFactory.get("processContacts").flow(stepContact).end().build();
}
return null;
}
@Bean
public Job readReport(JobBuilderFactory jobBuilderFactory, Step step) {
return jobBuilderFactory.get("processContacts").flow(step).end().build();
public Step stepContact(
MongoItemReader<Contact> mongoContactItemReader,
MongoItemWriter<Contact> mongoContactItemWriter) {
return this.stepBuilderFactory.get("readContacts").<Contact, Contact>chunk(CHUNK_SIZE).reader(mongoContactItemReader)
.processor(contactsProcessor()).writer(mongoContactItemWriter).build();
}
@Bean
public Step step(StepBuilderFactory stepBuilderFactory, MongoItemReader<Contact> mongoItemReader,
MongoItemWriter<Contact> mongoItemWriter, IServerConfigurationService serverConfigurationService) {
return stepBuilderFactory.get("read").<Contact, Contact>chunk(CHUNK_SIZE).reader(mongoItemReader)
.processor(contactsProcessor()).writer(mongoItemWriter).build();
public Step stepRegistration(
MongoItemReader<Registration> mongoRegistrationItemReader,
MongoItemWriter<Registration> mongoRegistrationItemWriter) {
return this.stepBuilderFactory.get("readRegistrations").<Registration, Registration>chunk(CHUNK_SIZE).reader(mongoRegistrationItemReader)
.processor(registrationsProcessor()).writer(mongoRegistrationItemWriter).build();
}
@Bean
public MongoItemReader<Contact> mongoItemReader(MongoTemplate mongoTemplate) {
public MongoItemReader<Contact> mongoContactItemReader(MongoTemplate mongoTemplate) {
MongoItemReader<Contact> reader = new MongoItemReader<>();
......@@ -97,7 +138,26 @@ public class ContactsProcessingConfiguration {
}
@Bean
public MongoItemWriter<Contact> mongoItemWriter(MongoTemplate mongoTemplate) {
public MongoItemReader<Registration> mongoRegistrationItemReader(MongoTemplate mongoTemplate) {
MongoItemReader<Registration> reader = new MongoItemReader<>();
reader.setTemplate(mongoTemplate);
reader.setSort(new HashMap<String, Sort.Direction>() {{
put("_id", Direction.DESC);
}});
reader.setTargetType(Registration.class);
reader.setQuery("{}");
return reader;
}
@Bean
public MongoItemWriter<Contact> mongoContactItemWriter(MongoTemplate mongoTemplate) {
Map<String, Direction> sortDirection = new HashMap<>();
sortDirection.put("timeInsertion", Direction.DESC);
MongoItemWriter<Contact> writer = new MongoItemWriterBuilder<Contact>().template(mongoTemplate)
......@@ -105,6 +165,13 @@ public class ContactsProcessingConfiguration {
return writer;
}
@Bean
public MongoItemWriter<Registration> mongoRegistrationItemWriter(MongoTemplate mongoTemplate) {
MongoItemWriter<Registration> writer = new MongoItemWriterBuilder<Registration>().template(mongoTemplate)
.collection("Registration").build();
return writer;
}
@Bean
public ItemProcessor<Contact, Contact> contactsProcessor() {
return new ContactProcessor(
......@@ -116,4 +183,20 @@ public class ContactsProcessingConfiguration {
this.propertyLoader) {
};
}
@Bean
public ItemProcessor<Registration, Registration> registrationsProcessor() {
return new RegistrationProcessor(
this.serverConfigurationService,
this.registrationService,
this.scoringStrategyService,
this.propertyLoader) {
};
}
private enum BatchMode {
NONE,
FULL_REGISTRATION_SCAN_COMPUTE_RISK,
SCORE_CONTACTS_AND_COMPUTE_RISK;
}
}
......@@ -32,4 +32,7 @@ public class ScoringAlgorithmConfiguration {
// Constant for risk averaging
private double softMaxB;
// Tolerance for timestamp that may exceed the epoch duration
private int epochTolerance;
}
......@@ -5,10 +5,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import fr.gouv.stopc.robert.crypto.grpc.server.messaging.*;
import fr.gouv.stopc.robert.server.batch.service.impl.ScoringStrategyV2ServiceImpl;
import fr.gouv.stopc.robert.server.batch.utils.ScoringUtils;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.util.CollectionUtils;
......@@ -130,30 +129,14 @@ public class ContactProcessor implements ItemProcessor<Contact, Contact> {
}
List<EpochExposition> epochsToKeep = step9ScoreAndAddContactInListOfExposedEpochs(contact, epoch, registration);
int latestRiskEpoch = registration.getLatestRiskEpoch();
// Only consider epochs that are after the last notification for scoring
List<EpochExposition> scoresSinceLastNotif = CollectionUtils.isEmpty(epochsToKeep) ?
new ArrayList<>()
: epochsToKeep.stream()
.filter(ep -> ep.getEpochId() > latestRiskEpoch)
.collect(Collectors.toList());
// TODO: delay to end of batch for all registrations and epochs that have been affected
// If at risk detection is delayed to end of batch, no aggregate scoring here
// If not, scoring must be done here. If at risk trigger on single exposed epoch,
// then remove loop and get epochExposition[epoch] and launch aggregate but protect setAtRisk if set to true
int numberOfAtRiskExposedEpochs = 0;
for (EpochExposition epochExposition : scoresSinceLastNotif) {
double finalRiskForEpoch = this.scoringStrategy.aggregate(epochExposition.getExpositionScores());
if (finalRiskForEpoch > this.propertyLoader.getRiskThreshold()) {
log.info("Risk detected. Scored aggregate risk for epoch {}: {}", epochExposition.getEpochId(), finalRiskForEpoch);
numberOfAtRiskExposedEpochs++;
break;
}
}
registration.setAtRisk(numberOfAtRiskExposedEpochs >= this.scoringStrategy.getNbEpochsScoredAtRiskThreshold());
ScoringUtils.updateRegistrationIfRisk(
registration,
epochsToKeep,
this.serverConfigurationService.getServiceTimeStart(),
this.propertyLoader.getRiskThreshold(),
this.scoringStrategy
);
this.registrationService.saveRegistration(registration);
this.contactService.delete(contact);
......@@ -227,29 +210,16 @@ public class ContactProcessor implements ItemProcessor<Contact, Contact> {
.build());
}
List<EpochExposition> epochsToKeep = getExposedEpochsWithoutEpochsOlderThanContagiousPeriod(exposedEpochs);
int currentEpochId = TimeUtils.getCurrentEpochFrom(this.serverConfigurationService.getServiceTimeStart());
List<EpochExposition> epochsToKeep = ScoringUtils.getExposedEpochsWithoutEpochsOlderThanContagiousPeriod(
exposedEpochs,
currentEpochId,
this.propertyLoader.getContagiousPeriod(),
this.serverConfigurationService.getEpochDurationSecs());
registrationRecord.setExposedEpochs(epochsToKeep);
return epochsToKeep;
}
/**
* Keep epochs within the contagious period
* @param exposedEpochs
* @return
*/
private List<EpochExposition> getExposedEpochsWithoutEpochsOlderThanContagiousPeriod(List<EpochExposition> exposedEpochs) {
int currentEpochId = TimeUtils.getCurrentEpochFrom(this.serverConfigurationService.getServiceTimeStart());
// Purge exposed epochs list from epochs older than contagious period (C_T)
return CollectionUtils.isEmpty(exposedEpochs) ?
new ArrayList<>()
: exposedEpochs.stream().filter(epoch -> {
int nbOfEpochsToKeep = (this.propertyLoader.getContagiousPeriod() * 24 * 3600)
/ this.serverConfigurationService.getEpochDurationSecs();
return (currentEpochId - epoch.getEpochId()) <= nbOfEpochsToKeep;
}).collect(Collectors.toList());
}
private long castIntegerToLong(int x, int nbOfSignificantBytes) {
int shift = nbOfSignificantBytes * 8;
return Integer.toUnsignedLong(x << shift >>> shift);
......
package fr.gouv.stopc.robert.server.batch.processor;
import fr.gouv.stopc.robert.server.batch.service.ScoringStrategyService;
import fr.gouv.stopc.robert.server.batch.utils.PropertyLoader;
import fr.gouv.stopc.robert.server.batch.utils.ScoringUtils;
import fr.gouv.stopc.robert.server.common.service.IServerConfigurationService;
import fr.gouv.stopc.robert.server.common.utils.TimeUtils;
import fr.gouv.stopc.robertserver.database.model.EpochExposition;
import fr.gouv.stopc.robertserver.database.model.Registration;
import fr.gouv.stopc.robertserver.database.service.IRegistrationService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import java.util.List;
@Slf4j
@AllArgsConstructor
public class RegistrationProcessor implements ItemProcessor<Registration, Registration> {
private IServerConfigurationService serverConfigurationService;
private IRegistrationService registrationService;
private ScoringStrategyService scoringStrategy;
private PropertyLoader propertyLoader;
@Override
public Registration process(Registration registration) {
int currentEpochId = TimeUtils.getCurrentEpochFrom(this.serverConfigurationService.getServiceTimeStart());
List<EpochExposition> epochsToKeep = ScoringUtils.getExposedEpochsWithoutEpochsOlderThanContagiousPeriod(
registration.getExposedEpochs(),
currentEpochId,
this.propertyLoader.getContagiousPeriod(),
this.serverConfigurationService.getEpochDurationSecs());
log.info("Epoch to keep: {} ", epochsToKeep);
ScoringUtils.updateRegistrationIfRisk(
registration,
epochsToKeep,
this.serverConfigurationService.getServiceTimeStart(),
this.propertyLoader.getRiskThreshold(),
this.scoringStrategy
);
this.registrationService.saveRegistration(registration);
return null;
}
}
......@@ -86,7 +86,7 @@ public class ScoringStrategyV2ServiceImpl implements ScoringStrategyService {
@Override
public ScoringResult execute(Contact contact) throws RobertScoringException {
final int epochDurationInMinutes = serverConfigurationService.getEpochDurationSecs() / 60;
final int epochDurationInMinutes = this.serverConfigurationService.getEpochDurationSecs() / 60;
// Variables
final List<Number>[] listRSSI = new ArrayList[epochDurationInMinutes];
......@@ -109,10 +109,30 @@ public class ScoringStrategyV2ServiceImpl implements ScoringStrategyService {
double t0 = messageDetails.get(0).getTimeCollectedOnDevice();
int vectorSize = messageDetails.size();
double tf = messageDetails.get(vectorSize - 1).getTimeCollectedOnDevice();
/* Over run verification */
if ( (tf - t0) > (epochDurationInMinutes * 60 + configuration.getEpochTolerance()) )
{
String errorMessage = String.format(
"Skip contact because some hello messages are coming too late: %s sec after first message",
tf - t0);
log.error(errorMessage);
// Initializing values to 0.0 will ignore this problematic contact in the overall summation
return ScoringResult.builder()
.rssiScore(0.0)
.duration(0)
.nbContacts(0)
.build();
}
for (int k = 0; k < vectorSize; k++) {
HelloMessageDetail messageDetail = messageDetails.get(k);
double timestampDelta = messageDetail.getTimeCollectedOnDevice() - t0;
int index = (int) Math.floor(timestampDelta / 60.0);
index = index > epochDurationInMinutes ? epochDurationInMinutes - 1 : index;
if ((index >= 0) && (index < epochDurationInMinutes)) {
// Drop error Hello messages with too big RSSI (corresponding to errors in iOS
// and Android)
......@@ -122,12 +142,7 @@ public class ScoringStrategyV2ServiceImpl implements ScoringStrategyService {
int rssi = Math.min(messageDetail.getRssiCalibrated(), configuration.getRssiMax());
listRSSI[index].add(rssi);
}
} else {
String errorMessage = String.format("Epoch in minutes too big {}", index);
log.error(errorMessage);
throw new RobertScoringException(errorMessage);
}
}
// Phase 2: Average RSSI
......
......@@ -28,5 +28,8 @@ public class PropertyLoader {
private Integer contagiousPeriod;
@Value("${robert.scoring.scoring-algo-r0}")
private Double r0ScoringAlgorithm;
private Double r0ScoringAlgorithm;
@Value("${robert.scoring.batch-mode}")
private String batchMode;
}
package fr.gouv.stopc.robert.server.batch.utils;
import fr.gouv.stopc.robert.server.batch.model.ScoringResult;
import fr.gouv.stopc.robert.server.batch.service.ScoringStrategyService;
import fr.gouv.stopc.robert.server.common.utils.TimeUtils;
import fr.gouv.stopc.robertserver.database.model.EpochExposition;
import fr.gouv.stopc.robertserver.database.model.Registration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
public final class ScoringUtils {
private ScoringUtils() {
throw new AssertionError();
}
/**
* Keep epochs within the contagious period
* @param exposedEpochs
* @return
*/
public static List<EpochExposition> getExposedEpochsWithoutEpochsOlderThanContagiousPeriod(
List<EpochExposition> exposedEpochs,
int currentEpochId,
int contagiousPeriod,
int epochDuration) {
// Purge exposed epochs list from epochs older than contagious period (C_T)
return CollectionUtils.isEmpty(exposedEpochs) ?
new ArrayList<>()
: exposedEpochs.stream().filter(epoch -> {
int nbOfEpochsToKeep = (contagiousPeriod * 24 * 3600) / epochDuration;
log.info("currentEpochId ({})- epoch.getEpochId() ({})) <= nbOfEpochsToKeep({}); {} ",
currentEpochId,
epoch.getEpochId(),
nbOfEpochsToKeep,
((currentEpochId - epoch.getEpochId()) <= nbOfEpochsToKeep));
return (currentEpochId - epoch.getEpochId()) <= nbOfEpochsToKeep;
}).collect(Collectors.toList());
}
public static void updateRegistrationIfRisk(Registration registration,
List<EpochExposition> epochExpositions,
long timeStart,
double riskThreshold,
ScoringStrategyService scoringStrategy) {
int latestRiskEpoch = registration.getLatestRiskEpoch();
// Only consider epochs that are after the last notification for scoring
List<EpochExposition> scoresSinceLastNotif = CollectionUtils.isEmpty(epochExpositions) ?
new ArrayList<>()
: epochExpositions.stream()
.filter(ep -> ep.getEpochId() > latestRiskEpoch)
.collect(Collectors.toList());
// Create a single list with all contact scores from all epochs
List<Double> allScoresFromAllEpochs = scoresSinceLastNotif.stream()
.map(EpochExposition::getExpositionScores)
.map(item -> item.stream().mapToDouble(Double::doubleValue).sum())
.collect(Collectors.toList());
Double totalRisk = scoringStrategy.aggregate(allScoresFromAllEpochs);
if (totalRisk >= riskThreshold) {
log.info("Risk detected. Aggregated risk since {}: {} greater than threshold {}",
latestRiskEpoch,
totalRisk,
riskThreshold);
// A risk has been detected, move time marker to now so that further risks are only posterior to this one
int newLatestRiskEpoch = TimeUtils.getCurrentEpochFrom(timeStart);
registration.setLatestRiskEpoch(newLatestRiskEpoch);
log.info("Updating latest risk epoch {}", newLatestRiskEpoch);
registration.setAtRisk(true);
}
}
}
......@@ -34,6 +34,9 @@ robert.scoring.soft-max-a=${ROBERT_SCORING_SOFT_MAX_A:4.342}
robert.scoring.soft-max-b=${ROBERT_SCORING_SOFT_MAX_B:0.2}
robert.scoring.algo-version=${ROBERT_SCORING_ALGO_VERSION:2}
robert.scoring.scoring-algo-r0=${ROBERT_SCORING_R0:0.007}
robert.scoring.epoch-tolerance=${ROBERT_SCORING_EPOCH_TOLERANCE:60}
robert.scoring.batch-mode=${ROBERT_SCORING_BATCH_MODE:SCORE_CONTACTS_AND_COMPUTE_RISK}
robert.protocol.hello-message-timestamp-tolerance=${ROBERT_PROTOCOL_HELLO_TOLERANCE:180}
......
......@@ -6,10 +6,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import java.security.Key;
import java.security.SecureRandom;
......@@ -60,6 +57,7 @@ import fr.gouv.stopc.robertserver.database.model.Registration;
import fr.gouv.stopc.robertserver.database.service.ContactService;
import fr.gouv.stopc.robertserver.database.service.IRegistrationService;
import lombok.extern.slf4j.Slf4j;
import test.fr.gouv.stopc.robertserver.batch.utils.ProcessorTestUtils;
import javax.crypto.spec.SecretKeySpec;
......@@ -131,14 +129,14 @@ public class ContactProcessorTest {
@Test
public void testProcessContactWithABadEncryptedCountryCodeFails() {
this.registration = this.registrationService.createRegistration(generateIdA());
this.registration = this.registrationService.createRegistration(ProcessorTestUtils.generateIdA());
assertTrue(this.registration.isPresent());
try {
final long tpstStart = this.serverConfigurationService.getServiceTimeStart();
final long currentTime = TimeUtils.convertUnixMillistoNtpSeconds(new Date().getTime());
final int currentEpochId = TimeUtils.getNumberOfEpochsBetween(tpstStart, currentTime);
byte[] ebid = this.cryptoService.generateEBID(new CryptoSkinny64(serverKey), currentEpochId, this.generateIdA());
byte[] ebid = this.cryptoService.generateEBID(new CryptoSkinny64(serverKey), currentEpochId, ProcessorTestUtils.generateIdA());
// Create a fake Encrypted Country Code (ECC)
byte[] encryptedCountryCode = new byte[] { (byte) 0xff };
......@@ -167,14 +165,14 @@ public class ContactProcessorTest {
@Test
public void testProcessContactWitNoMessagesFails() {