Commit a70f2bfa authored by Framboise Orange's avatar Framboise Orange
Browse files

Merge branch 'feature/improve_clea_venue_consumer_it_step3' into 'develop'

refactor(test): clea-venue-consumer : black-box testing

See merge request !43
parents 112d0734 3421ccfe
Pipeline #326902 canceled with stages
in 39 minutes and 55 seconds
package fr.gouv.clea.consumer;
import fr.gouv.clea.consumer.model.ReportStat;
import fr.gouv.clea.consumer.test.IntegrationTest;
import fr.gouv.clea.consumer.test.KafkaManager;
import fr.inria.clea.lsp.utils.TimeUtils;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import static fr.gouv.clea.consumer.test.ElasticManager.assertThatAllDocumentsFromElastic;
import static org.awaitility.Awaitility.await;
@IntegrationTest
public class ConsumeReportStatTest {
@Test
void should_send_report_stats_to_elastic() {
final var instant = Instant.parse("2019-07-22T09:37:42.251Z");
final var reportStat = ReportStat.builder()
.reported(10)
.rejected(2)
.backwards(5)
.forwards(3)
.close(4)
.timestamp(TimeUtils.ntpTimestampFromInstant(instant))
.build();
KafkaManager.whenSendReportStat(reportStat);
await().atMost(Duration.ofSeconds(10)).untilAsserted(
() -> assertThatAllDocumentsFromElastic()
.containsExactlyInAnyOrder(
Map.of(
"@timestamp", "2019-07-22T09:37:42.000Z",
"reported", 10,
"rejected", 2,
"backwards", 5,
"forwards", 3,
"close", 4
)
)
);
}
}
package fr.gouv.clea.consumer;
import fr.gouv.clea.consumer.model.DecodedVisit;
import fr.gouv.clea.consumer.repository.visits.ExposedVisitRepository;
import fr.gouv.clea.consumer.test.IntegrationTest;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import static fr.gouv.clea.consumer.test.ElasticManager.assertThatAllDocumentsFromElastic;
import static fr.gouv.clea.consumer.test.KafkaManager.whenSendDecodedVisit;
import static fr.gouv.clea.consumer.test.ReferenceData.givenBackwardDecodedVisitAt;
import static java.time.ZoneOffset.UTC;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
@IntegrationTest
public class ConsumeVisitTest {
private static final Instant TODAY_AT_MIDNIGHT = Instant.now().truncatedTo(ChronoUnit.DAYS);
private static final Instant TODAY_AT_8AM = TODAY_AT_MIDNIGHT.plus(8, ChronoUnit.HOURS);
@Autowired
private ExposedVisitRepository repository;
@Test
void testConsumeVisit() {
DecodedVisit decodedVisit = givenBackwardDecodedVisitAt(TODAY_AT_8AM);
whenSendDecodedVisit(decodedVisit);
// location statistics are stored in elastic
await().atMost(10, SECONDS).untilAsserted(
() -> assertThatAllDocumentsFromElastic().containsExactlyInAnyOrder(
Map.of(
"id", TODAY_AT_8AM + "-vt:1-vc1:2-vc2:3",
"@timestamp", elasticDefaultStringRepresentation(TODAY_AT_8AM),
"venueType", 1,
"venueCategory1", 2,
"venueCategory2", 3,
"forwardVisits", 0,
"backwardVisits", 1
)
)
);
// exposed visit are stored in postgre
var exposedVisitList = repository.findAll();
assertThat(exposedVisitList).hasSize(7);
assertThat(exposedVisitList)
.allMatch(
exposedVisitEntity -> exposedVisitEntity.getTimeSlot() >= 1
&& exposedVisitEntity.getTimeSlot() <= 7
)
.allMatch(exposedVisitEntity -> exposedVisitEntity.getVenueType() == 1)
.allMatch(exposedVisitEntity -> exposedVisitEntity.getVenueCategory1() == 2)
.allMatch(exposedVisitEntity -> exposedVisitEntity.getVenueCategory2() == 3)
.allMatch(exposedVisitEntity -> exposedVisitEntity.getBackwardVisits() == 1)
.allMatch(exposedVisitEntity -> exposedVisitEntity.getForwardVisits() == 0);
}
private Object elasticDefaultStringRepresentation(Instant instant) {
return instant.atOffset(UTC)
.format(DateTimeFormatter.ofPattern(DateFormat.date_time.getPattern()));
}
}
package fr.gouv.clea.consumer.service;
import fr.gouv.clea.consumer.configuration.CleaKafkaProperties;
import fr.gouv.clea.consumer.model.DecodedVisit;
import fr.gouv.clea.consumer.model.ReportStat;
import fr.gouv.clea.consumer.test.IntegrationTest;
import fr.gouv.clea.consumer.test.KafkaManager;
import fr.gouv.clea.consumer.test.QrCode;
import fr.gouv.clea.consumer.utils.KafkaVisitSerializer;
import fr.inria.clea.lsp.LocationSpecificPartDecoder;
import fr.inria.clea.lsp.exception.CleaEncodingException;
import fr.inria.clea.lsp.utils.TimeUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import java.time.Instant;
import java.util.Base64;
import java.util.Map;
import static fr.gouv.clea.consumer.test.ElasticManager.assertThatAllDocumentsFromElastic;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
@IntegrationTest
class ConsumerServiceTest {
@Autowired
private CleaKafkaProperties cleaKafkaProperties;
@Test
@DisplayName("test that consumeVisit listener triggers when something is sent to visit queue")
void testConsumeVisit() throws CleaEncodingException {
Map<String, Object> configs = KafkaTestUtils.producerProps(KafkaManager.getBootstrapServers());
Producer<String, DecodedVisit> producer = new DefaultKafkaProducerFactory<>(
configs,
new StringSerializer(),
new KafkaVisitSerializer()
).createProducer();
final var binaryLocationSpecificPart = Base64.getUrlDecoder().decode(QrCode.LOCATION_1_URL.getRef());
DecodedVisit decodedVisit = new DecodedVisit(
Instant.now(),
new LocationSpecificPartDecoder().decodeHeader(binaryLocationSpecificPart),
RandomUtils.nextBoolean()
);
producer.send(new ProducerRecord<>(cleaKafkaProperties.getQrCodesTopic(), decodedVisit));
producer.flush();
producer.close();
// kafka listener has been called when something appears in elastic
await().atMost(10, SECONDS)
.untilAsserted(
() -> assertThatAllDocumentsFromElastic().hasSize(1)
);
}
@Test
@DisplayName("test that consumeStat listener triggers when something is sent to stat queue")
void testConsumeStat() {
Map<String, Object> configs = KafkaTestUtils.producerProps(KafkaManager.getBootstrapServers());
Producer<String, ReportStat> producer = new DefaultKafkaProducerFactory<>(
configs,
new StringSerializer(),
new JsonSerializer<ReportStat>()
).createProducer();
long timestamp = TimeUtils.currentNtpTime();
ReportStat reportStat = ReportStat.builder()
.reported(10)
.rejected(2)
.backwards(5)
.forwards(3)
.close(4)
.timestamp(timestamp)
.build();
producer.send(new ProducerRecord<>(cleaKafkaProperties.getReportStatsTopic(), reportStat));
producer.flush();
producer.close();
// kafka listener has been called when something appears in elastic
await().atMost(10, SECONDS)
.untilAsserted(
() -> assertThatAllDocumentsFromElastic().hasSize(1)
);
}
}
package fr.gouv.clea.consumer.service;
import fr.gouv.clea.consumer.test.IntegrationTest;
import fr.gouv.clea.consumer.test.QrCode;
import fr.gouv.clea.consumer.test.ReferenceData;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -26,7 +26,7 @@ class StatisticsServiceRetryTest {
givenElasticIndexIsFrozen("health-clealocations-2019.07.22");
// when a visit is sent
final var visit = QrCode.defaultVisit()
final var visit = ReferenceData.defaultVisit()
.qrCodeScanTime(Instant.parse("2019-07-22T08:13:00Z"))
.venueType(4)
.venueCategory1(1)
......@@ -48,7 +48,7 @@ class StatisticsServiceRetryTest {
@Test
void should_send_to_kafka_after_some_failed_update_attempts() {
// given a visit has been written in elastic for period "2019-07-22T08:00:00Z"
final var visit = QrCode.defaultVisit()
final var visit = ReferenceData.defaultVisit()
.qrCodeScanTime(Instant.parse("2019-07-22T08:13:00Z"))
.venueType(4)
.venueCategory1(1)
......
package fr.gouv.clea.consumer.service;
import fr.gouv.clea.consumer.model.ReportStat;
import fr.gouv.clea.consumer.model.Visit;
import fr.gouv.clea.consumer.test.IntegrationTest;
import fr.inria.clea.lsp.utils.TimeUtils;
......@@ -303,33 +302,6 @@ class StatisticsServiceTest {
);
}
@Test
void should_send_report_stats_to_elastic() {
final var instant = Instant.parse("2019-07-22T09:37:42.251Z");
final var reportStat = ReportStat.builder()
.reported(10)
.rejected(2)
.backwards(5)
.forwards(3)
.close(4)
.timestamp(TimeUtils.ntpTimestampFromInstant(instant))
.build();
statisticsService.logStats(reportStat);
assertThatAllDocumentsFromElastic()
.containsExactlyInAnyOrder(
Map.of(
"@timestamp", "2019-07-22T09:37:42.000Z",
"reported", 10,
"rejected", 2,
"backwards", 5,
"forwards", 3,
"close", 4
)
);
}
private Object elasticDefaultStringRepresentation(Instant instant) {
return instant.atOffset(UTC)
.format(DateTimeFormatter.ofPattern(DateFormat.date_time.getPattern()));
......
package fr.gouv.clea.consumer.test;
import com.fasterxml.jackson.databind.JsonNode;
import fr.gouv.clea.consumer.model.DecodedVisit;
import fr.gouv.clea.consumer.model.ReportStat;
import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.TestContext;
......@@ -151,4 +158,31 @@ public class KafkaManager implements TestExecutionListener {
.collect(Collectors.toList());
}
}
public static void whenSendReportStat(ReportStat reportStat) {
Map<String, Object> configs = KafkaTestUtils.producerProps(KafkaManager.getBootstrapServers());
Producer<String, ReportStat> producer = new DefaultKafkaProducerFactory<>(
configs,
new StringSerializer(),
new JsonSerializer<ReportStat>()
).createProducer();
producer.send(new ProducerRecord<>("test.clea.fct.report-stat", reportStat));
producer.flush();
producer.close();
}
public static void whenSendDecodedVisit(DecodedVisit decodedVisit) {
Map<String, Object> configs = KafkaTestUtils.producerProps(KafkaManager.getBootstrapServers());
Producer<String, DecodedVisit> producer = new DefaultKafkaProducerFactory<>(
configs,
new StringSerializer(),
new KafkaVisitSerializer()
).createProducer();
producer.send(new ProducerRecord<>("test.clea.fct.qr-code-scan", decodedVisit));
producer.flush();
producer.close();
}
}
package fr.gouv.clea.consumer.utils;
package fr.gouv.clea.consumer.test;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
......
package fr.gouv.clea.consumer.test;
import fr.gouv.clea.consumer.model.DecodedVisit;
import fr.gouv.clea.consumer.model.Visit;
import fr.inria.clea.lsp.Location;
import fr.inria.clea.lsp.LocationContact;
......@@ -16,7 +17,7 @@ import java.util.UUID;
import static java.time.temporal.ChronoUnit.DAYS;
import static java.time.temporal.ChronoUnit.HOURS;
public class QrCode {
public class ReferenceData {
// server authority public key
private static final String PK_SA = "02c3a58bf668fa3fe2fc206152abd6d8d55102adfee68c8b227676d1fe763f5a06";
......@@ -51,6 +52,35 @@ public class QrCode {
}
}
public static DecodedVisit givenBackwardDecodedVisitAt(Instant qrCodeScanTime) {
return givenDecodedVisitAt(qrCodeScanTime, true);
}
private static DecodedVisit givenDecodedVisitAt(Instant qrCodeScanTime, boolean isBackward) {
try {
final var instant = Instant.now()
.minus(365, DAYS)
.truncatedTo(HOURS);
final var location = createRandomLocation(instant);
final var locationUrl = new URL(location.newDeepLink(qrCodeScanTime.minus(2, HOURS)));
final var binaryLocationSpecificPart = Base64.getUrlDecoder().decode(locationUrl.getRef());
return DecodedVisit.builder()
.qrCodeScanTime(qrCodeScanTime)
.encryptedLocationSpecificPart(
new LocationSpecificPartDecoder().decodeHeader(binaryLocationSpecificPart)
)
.isBackward(isBackward)
.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static Location createRandomLocation(Instant instant) {
return Location.builder()
.manualContactTracingAuthorityPublicKey(PK_MCTA)
......@@ -74,8 +104,8 @@ public class QrCode {
.periodDuration(255 /* unlimited */)
.qrCodeRenewalIntervalExponentCompact(0x1F /* no renewal */)
.venueType(1)
.venueCategory1(1)
.venueCategory2(1)
.venueCategory1(2)
.venueCategory2(3)
.build();
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment