Commit 8cd174c7 authored by calocedre TAC's avatar calocedre TAC
Browse files

Merge remote-tracking branch 'origin/develop' into fix/clea/ws-rest-duplicate2

parents b6eb320c 3361c597
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_DIR" value="${CLEA_BATCH_LOG_FILE_PATH:-/logs}" />
<property name="LOG_FILENAME" value="${CLEA_BATCH_LOG_FILE_NAME:-clea-batch}" />
<property name="ERROR_LOG_FILENAME" value="${CLEA_BATCH_ERROR_LOG_FILE_NAME:-clea-batch}.error" />
<appender name="RollingFile"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_DIR}/${LOG_FILENAME}.log</file>
<encoder
class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>%d %p %C{1.} [%t][%file:%line] %m%n</Pattern>
</encoder>
<rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!-- rollover daily and when the file reaches 10 MegaBytes -->
<fileNamePattern>${LOG_DIR}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz
</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
</rollingPolicy>
</appender>
<appender name="RollingErrorFile"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_DIR}/${ERROR_LOG_FILENAME}.log</file>
<encoder
class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>%d %p %C{1.} [%t][%file:%line] %m%n</Pattern>
</encoder>
<rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!-- rollover daily and when the file reaches 10 MegaBytes -->
<fileNamePattern>${LOG_DIR}/${ERROR_LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz
</fileNamePattern>
<maxFileSize>10MB</maxFileSize>
</rollingPolicy>
</appender>
<springProfile name="dev">
<include resource="org/springframework/boot/logging/logback/base.xml" />
<logger name="org.springframework" level="OFF"/>
</springProfile>
<springProfile name="!dev">
<!-- LOG everything at INFO level -->
<root level="info">
<appender-ref ref="RollingFile" />
</root>
<!-- at TRACE level -->
<logger name="trace" level="trace" additivity="false">
<appender-ref ref="RollingFile" />
</logger>
<!-- at WARN level -->
<logger name="warn" level="warn" additivity="false">
<appender-ref ref="RollingFile" />
</logger>
<!-- at ERROR level -->
<logger name="error" level="error" additivity="false">
<appender-ref ref="RollingErrorFile" />
</logger>
</springProfile>
</configuration>
......@@ -3,6 +3,13 @@
PROGNAM=$(basename $0)
die() { echo "[$PROGNAM] $*" 1>&2 ; exit 1; }
# redirect logs to log file
test -f "${CLEA_BATCH_LOG_FILE_PATH}/${CLEA_BATCH_LOG_FILE_NAME}.log" || die "log file: ${CLEA_BATCH_LOG_FILE_PATH}/${CLEA_BATCH_LOG_FILE_NAME}.log does not exist"
test -f "${CLEA_BATCH_LOG_FILE_PATH}/${CLEA_BATCH_LOG_FILE_NAME}.error.log" || die "error log file: ${CLEA_BATCH_LOG_FILE_PATH}/${CLEA_BATCH_LOG_FILE_NAME}.error.log does not exist"
exec >> "${CLEA_BATCH_LOG_FILE_PATH}/${CLEA_BATCH_LOG_FILE_NAME}.log"
exec 2>> "${CLEA_BATCH_LOG_FILE_PATH}/${CLEA_BATCH_LOG_FILE_NAME}.error.log"
WORKDIR=${CLEA_BATCH_CLUSTER_OUTPUT_PATH:-/tmp/v1}
BUCKET=${BUCKET:-}
......@@ -20,7 +27,7 @@ if ! java -jar clea-batch.jar $@ ; then
fi
echo "[$PROGNAM] Coying files...."
echo "[$PROGNAM] Copying files...."
# Test that output folder exists, computing NBFILES fails if folder doesn't exist
......
package fr.gouv.clea.consumer.configuration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
@RefreshScope
@ConditionalOnProperty(value = "clea.conf.scheduling.purge.enabled", havingValue = "true")
@Configuration
@EnableScheduling
......
package fr.gouv.clea.consumer.configuration;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.Positive;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.annotation.Validated;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
@Validated
@RefreshScope
@Configuration
@ConfigurationProperties(prefix = "clea.conf")
public class VenueConsumerConfiguration {
@Min(value = 600)
private long durationUnitInSeconds;
@Min(value = 1800)
private long statSlotDurationInSeconds;
@Positive
private int driftBetweenDeviceAndOfficialTimeInSecs;
@Positive
private int cleaClockDriftInSecs;
@Min(value = 10)
@Max(value = 30)
private int retentionDurationInDays;
}
package fr.gouv.clea.consumer.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Column;
import javax.persistence.EmbeddedId;
import javax.persistence.Entity;
import javax.persistence.Table;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "STAT_LOCATION")
public class StatLocation {
@EmbeddedId
private StatLocationKey statLocationKey;
@Column(name = "backward_visits")
private long backwardVisits;
@Column(name = "forward_visits")
private long forwardVisits;
}
package fr.gouv.clea.consumer.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Column;
import javax.persistence.Embeddable;
import java.io.Serializable;
import java.time.Instant;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Embeddable
public class StatLocationKey implements Serializable {
private static final long serialVersionUID = 1L;
@Column(name = "period")
private Instant period;
@Column(name = "venue_type")
private int venueType;
@Column(name = "venue_category1")
private int venueCategory1;
@Column(name = "venue_category2")
private int venueCategory2;
}
package fr.gouv.clea.consumer.repository;
import fr.gouv.clea.consumer.model.StatLocation;
import fr.gouv.clea.consumer.model.StatLocationKey;
import org.springframework.data.jpa.repository.JpaRepository;
public interface IStatLocationRepository extends JpaRepository<StatLocation, StatLocationKey> {
}
......@@ -3,5 +3,4 @@ package fr.gouv.clea.consumer.service;
public interface IExposedVisitEntityService {
void deleteOutdatedExposedVisits();
}
package fr.gouv.clea.consumer.service;
import fr.gouv.clea.consumer.model.Visit;
public interface IStatService {
void logStats(Visit visit);
}
package fr.gouv.clea.consumer.service.impl;
import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import fr.gouv.clea.consumer.model.DecodedVisit;
import fr.gouv.clea.consumer.model.Visit;
import fr.gouv.clea.consumer.service.IConsumerService;
......@@ -7,13 +14,9 @@ import fr.gouv.clea.consumer.service.IDecodedVisitService;
import fr.gouv.clea.consumer.service.IVisitExpositionAggregatorService;
import fr.gouv.clea.consumer.utils.MessageFormatter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
@RefreshScope
@Slf4j
public class ConsumerService implements IConsumerService {
......
package fr.gouv.clea.consumer.service.impl;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import fr.gouv.clea.consumer.configuration.VenueConsumerConfiguration;
import fr.gouv.clea.consumer.model.DecodedVisit;
import fr.gouv.clea.consumer.model.Visit;
import fr.gouv.clea.consumer.service.IDecodedVisitService;
......@@ -9,13 +17,6 @@ import fr.inria.clea.lsp.LocationSpecificPart;
import fr.inria.clea.lsp.LocationSpecificPartDecoder;
import fr.inria.clea.lsp.exception.CleaEncryptionException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
@Component
@Slf4j
......@@ -25,20 +26,16 @@ public class DecodedVisitService implements IDecodedVisitService {
private final CleaEciesEncoder cleaEciesEncoder;
private final int driftBetweenDeviceAndOfficialTimeInSecs;
private final int cleaClockDriftInSecs;
private final VenueConsumerConfiguration config;
@Autowired
public DecodedVisitService(
LocationSpecificPartDecoder decoder,
CleaEciesEncoder cleaEciesEncoder,
@Value("${clea.conf.driftBetweenDeviceAndOfficialTimeInSecs}") int driftBetweenDeviceAndOfficialTimeInSecs,
@Value("${clea.conf.cleaClockDriftInSecs}") int cleaClockDriftInSecs) {
VenueConsumerConfiguration config) {
this.decoder = decoder;
this.cleaEciesEncoder = cleaEciesEncoder;
this.driftBetweenDeviceAndOfficialTimeInSecs = driftBetweenDeviceAndOfficialTimeInSecs;
this.cleaClockDriftInSecs = cleaClockDriftInSecs;
this.config = config;
}
@Override
......@@ -80,7 +77,11 @@ public class DecodedVisitService implements IDecodedVisitService {
if (qrCodeRenewalInterval == 0) {
return false;
}
return Duration.between(visit.getQrCodeScanTime(), visit.getQrCodeValidityStartTime()).abs().toSeconds()
> (qrCodeRenewalInterval + driftBetweenDeviceAndOfficialTimeInSecs + cleaClockDriftInSecs);
boolean isDrifting = Duration.between(visit.getQrCodeScanTime(), visit.getQrCodeValidityStartTime()).abs().toSeconds()
> (qrCodeRenewalInterval + config.getDriftBetweenDeviceAndOfficialTimeInSecs() + config.getCleaClockDriftInSecs());
if (!isDrifting && visit.getQrCodeScanTime().isBefore(visit.getQrCodeValidityStartTime())) {
visit.setQrCodeScanTime(visit.getQrCodeValidityStartTime());
}
return isDrifting;
}
}
package fr.gouv.clea.consumer.service.impl;
import fr.gouv.clea.consumer.repository.IExposedVisitRepository;
import fr.gouv.clea.consumer.service.IExposedVisitEntityService;
import fr.inria.clea.lsp.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import javax.transaction.Transactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.transaction.Transactional;
import fr.gouv.clea.consumer.configuration.VenueConsumerConfiguration;
import fr.gouv.clea.consumer.repository.IExposedVisitRepository;
import fr.gouv.clea.consumer.service.IExposedVisitEntityService;
import fr.inria.clea.lsp.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
@Component
@RefreshScope
@Slf4j
public class ExposedVisitEntityService implements IExposedVisitEntityService {
private final IExposedVisitRepository repository;
private final int retentionDurationInDays;
private final int durationUnitInSeconds;
private final VenueConsumerConfiguration config;
@Autowired
public ExposedVisitEntityService(
IExposedVisitRepository repository,
@Value("${clea.conf.retentionDurationInDays}") int retentionDurationInDays,
@Value("${clea.conf.durationUnitInSeconds}") int durationUnitInSeconds
) {
VenueConsumerConfiguration config) {
this.repository = repository;
this.retentionDurationInDays = retentionDurationInDays;
this.durationUnitInSeconds = durationUnitInSeconds;
this.config = config;
}
@Override
......@@ -38,7 +36,7 @@ public class ExposedVisitEntityService implements IExposedVisitEntityService {
public void deleteOutdatedExposedVisits() {
try {
long start = System.currentTimeMillis();
int count = this.repository.purge(TimeUtils.currentNtpTime(), durationUnitInSeconds, TimeUtils.NB_SECONDS_PER_HOUR * 24, retentionDurationInDays);
int count = this.repository.purge(TimeUtils.currentNtpTime(), (int) config.getDurationUnitInSeconds(), TimeUtils.NB_SECONDS_PER_HOUR * 24, config.getRetentionDurationInDays());
long end = System.currentTimeMillis();
log.info("successfully purged {} entries from DB in {} seconds", count, (end - start) / 1000);
} catch (Exception e) {
......
package fr.gouv.clea.consumer.service.impl;
import fr.gouv.clea.consumer.configuration.VenueConsumerConfiguration;
import fr.gouv.clea.consumer.model.StatLocation;
import fr.gouv.clea.consumer.model.StatLocationKey;
import fr.gouv.clea.consumer.model.Visit;
import fr.gouv.clea.consumer.repository.IStatLocationRepository;
import fr.gouv.clea.consumer.service.IStatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
@Component
@Slf4j
public class StatService implements IStatService {
private final IStatLocationRepository repository;
private final VenueConsumerConfiguration config;
@Autowired
public StatService(
IStatLocationRepository repository,
VenueConsumerConfiguration configuration) {
this.repository = repository;
this.config = configuration;
}
@Override
public void logStats(Visit visit) {
StatLocationKey statLocationKey = StatLocationKey.builder()
.period(this.getStatPeriod(visit))
.venueType(visit.getVenueType())
.venueCategory1(visit.getVenueCategory1())
.venueCategory2(visit.getVenueCategory2())
.build();
Optional<StatLocation> optional = repository.findById(statLocationKey);
StatLocation statLocation;
if (optional.isEmpty()) {
statLocation = newStatLocation(statLocationKey, visit);
} else {
statLocation = updateStatLocation(optional.get(), visit);
}
repository.save(statLocation);
log.info("saved stat period: {}, venueType: {} venueCategory1: {}, venueCategory2: {}, backwardVisits: {}, forwardVisits: {}",
statLocation.getStatLocationKey().getPeriod(),
statLocation.getStatLocationKey().getVenueType(),
statLocation.getStatLocationKey().getVenueCategory1(),
statLocation.getStatLocationKey().getVenueCategory2(),
statLocation.getBackwardVisits(),
statLocation.getForwardVisits()
);
}
protected StatLocation newStatLocation(StatLocationKey statLocationKey, Visit visit) {
return StatLocation.builder()
.statLocationKey(statLocationKey)
.backwardVisits(visit.isBackward() ? 1 : 0)
.forwardVisits(visit.isBackward() ? 0 : 1)
.build();
}
protected StatLocation updateStatLocation(StatLocation statLocation, Visit visit) {
if (visit.isBackward()) {
statLocation.setBackwardVisits(statLocation.getBackwardVisits() + 1);
} else {
statLocation.setForwardVisits(statLocation.getForwardVisits() + 1);
}
return statLocation;
}
protected Instant getStatPeriod(Visit visit) {
long secondsToRemove = visit.getQrCodeScanTime().getEpochSecond() % config.getStatSlotDurationInSeconds();
return visit.getQrCodeScanTime().minus(secondsToRemove, ChronoUnit.SECONDS);
}
}
package fr.gouv.clea.consumer.service.impl;
import fr.gouv.clea.consumer.configuration.VenueConsumerConfiguration;
import fr.gouv.clea.consumer.model.ExposedVisitEntity;
import fr.gouv.clea.consumer.model.Visit;
import fr.gouv.clea.consumer.repository.IExposedVisitRepository;
import fr.gouv.clea.consumer.service.IStatService;
import fr.gouv.clea.consumer.service.IVisitExpositionAggregatorService;
import fr.inria.clea.lsp.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
......@@ -9,17 +20,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import fr.gouv.clea.consumer.configuration.VenueConsumerConfiguration;
import fr.gouv.clea.consumer.model.ExposedVisitEntity;
import fr.gouv.clea.consumer.model.Visit;
import fr.gouv.clea.consumer.repository.IExposedVisitRepository;
import fr.gouv.clea.consumer.service.IVisitExpositionAggregatorService;
import fr.inria.clea.lsp.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class VisitExpositionAggregatorService implements IVisitExpositionAggregatorService {
......@@ -28,12 +28,16 @@ public class VisitExpositionAggregatorService implements IVisitExpositionAggrega
private final VenueConsumerConfiguration configuration;
private final IStatService statService;
@Autowired
public VisitExpositionAggregatorService(
IExposedVisitRepository repository,
VenueConsumerConfiguration configuration) {
VenueConsumerConfiguration configuration,
IStatService statService) {
this.repository = repository;
this.configuration = configuration;
this.statService = statService;
}
@Override
......@@ -48,7 +52,7 @@ public class VisitExpositionAggregatorService implements IVisitExpositionAggrega
int firstExposedSlot = Math.max(0, (int) scanTimeSlot - exposureTime + 1);
int lastExposedSlot = Math.min(this.getPeriodMaxSlot(visit.getPeriodDuration()), (int) scanTimeSlot + exposureTime - 1);
List<ExposedVisitEntity> exposedVisits = repository.findAllByLocationTemporaryPublicIdAndPeriodStart(visit.getLocationTemporaryPublicId(), this.periodStartFromCompressedPeriodStart(visit.getCompressedPeriodStartTime()));
List<ExposedVisitEntity> exposedVisits = repository.findAllByLocationTemporaryPublicIdAndPeriodStart(visit.getLocationTemporaryPublicId(), periodStartFromCompressedPeriodStart(visit.getCompressedPeriodStartTime()));
List<ExposedVisitEntity> toUpdate = new ArrayList<>();
List<ExposedVisitEntity> toPersist = new ArrayList<>();
......@@ -71,6 +75,8 @@ public class VisitExpositionAggregatorService implements IVisitExpositionAggrega
repository.saveAll(merged);
log.info("Persisting {} new visits!", toPersist.size());
log.info("Updating {} existing visits!", toUpdate.size());
statService.logStats(visit);
} else {
log.info("LTId: {}, qrScanTime: {} - No visit to persist / update", visit.getLocationTemporaryPublicId(), visit.getQrCodeScanTime());
}
......@@ -82,7 +88,7 @@ public class VisitExpositionAggregatorService implements IVisitExpositionAggrega
protected int getPeriodMaxSlot(int periodDuration) {
// This check should go in venue consumer configuration validation
if (Duration.ofHours(1).toSeconds() % periodDuration == 0) {
log.error("durationUnitInSeconds does not have a valid value: {}. 1 hour / durationUnitInSeconds has a reminder!");
log.error("durationUnitInSeconds does not have a valid value: {}. 3600(secs) / durationUnitInSeconds has a reminder!", periodDuration);
}
if (periodDuration == 255) {
return Integer.MAX_VALUE;
......@@ -110,7 +116,7 @@ public class VisitExpositionAggregatorService implements IVisitExpositionAggrega
protected ExposedVisitEntity newExposedVisit(Visit visit, int slotIndex) {
// TODO: visit.getPeriodStart returning an Instant
long periodStart = this.periodStartFromCompressedPeriodStart(visit.getCompressedPeriodStartTime());
long periodStart = periodStartFromCompressedPeriodStart(visit.getCompressedPeriodStartTime());
return ExposedVisitEntity.builder()
.locationTemporaryPublicId(visit.getLocationTemporaryPublicId())
.venueType(visit.getVenueType())
......
......@@ -4,6 +4,7 @@ clea:
driftBetweenDeviceAndOfficialTimeInSecs: 300
retentionDurationInDays: 14
durationUnitInSeconds: 1800
statSlotDurationInSeconds: 3600
scheduling:
purge:
cron: "0 0 1 * * *"
......
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_DIR" value="${CLEA_VENUE_CONSUMER_LOG_FILE_PATH:-/logs}" />
<property name="LOG_FILENAME" value="${CLEA_VENUE_CONSUMER_LOG_FILE_NAME:-clea-venue-consumer}" />
<property name="ERROR_LOG_FILENAME" value="${CLEA_VENUE_CONSUMER_ERROR_LOG_FILE_NAME:-clea-venue-consumer}.error" />
<appender name="RollingFile"