Commit c49fcea5 authored by Jujube Orange

refactor(ws-rest): safer way to reset kafka records between tests

parent 799978c2
@@ -22,7 +22,7 @@ public class KafkaConfiguration {
     @Bean
     public ProducerFactory<String, DecodedVisit> cleaQrCodesTopicFactory() {
-        final var configProps = kafkaProperties.buildConsumerProperties();
+        final var configProps = kafkaProperties.buildProducerProperties();
         configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaVisitSerializer.class);
         return new DefaultKafkaProducerFactory<>(configProps);
@@ -35,7 +35,7 @@ public class KafkaConfiguration {
     @Bean
     public ProducerFactory<String, ReportStat> cleaStatsTopicFactory() {
-        final var configProps = kafkaProperties.buildConsumerProperties();
+        final var configProps = kafkaProperties.buildProducerProperties();
         configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
         configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
......
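
The two factories above were previously seeded with `buildConsumerProperties()`, so the producers started from consumer-side keys instead of producer defaults. A minimal self-contained sketch of the corrected pattern, assuming Spring Boot's `KafkaProperties`; `JsonSerializer` stands in here for the project's `KafkaVisitSerializer`, and the class name is hypothetical:

```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

class ProducerFactorySketch {

    ProducerFactory<String, Object> topicFactory(KafkaProperties kafkaProperties) {
        // buildProducerProperties() seeds producer-side settings (acks, retries, ...);
        // buildConsumerProperties() would instead inject consumer-only keys such as
        // key.deserializer and group.id, which a producer ignores with warnings.
        final var configProps = kafkaProperties.buildProducerProperties();
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
}
```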
@@ -18,6 +18,7 @@ import org.springframework.web.context.request.WebRequest;
 import javax.validation.ConstraintViolation;
 import javax.validation.Validator;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -42,10 +43,12 @@ public class CleaController implements CleaApi {
     @Override
     public ResponseEntity<ReportResponse> reportUsingPOST(fr.gouv.clea.ws.api.model.ReportRequest reportRequest) {
-        final var reportRequestVo = new ReportRequest(
-                reportRequest.getVisits().stream()
+        final var visits = reportRequest.getVisits() == null ? Collections.<Visit>emptyList()
+                : reportRequest.getVisits().stream()
                         .map(visit -> new Visit(visit.getQrCode(), visit.getQrCodeScanTime()))
-                        .collect(toList()),
+                        .collect(toList());
+        final var reportRequestVo = new ReportRequest(
+                visits,
                 reportRequest.getPivotDate()
         );
         ReportRequest filtered = this.filterReports(reportRequestVo, webRequest);
......
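
The reshuffled mapping above routes the visits through a null-guarded local variable. A standalone sketch of the same guard; the record types are hypothetical stand-ins for the generated API model and the internal VO:

```java
import java.util.Collections;
import java.util.List;
import static java.util.stream.Collectors.toList;

class VisitMappingSketch {

    // Hypothetical stand-ins for the generated API visit and the internal VO.
    record ApiVisit(String qrCode, long qrCodeScanTime) {}
    record Visit(String qrCode, long qrCodeScanTime) {}

    // A missing "visits" array in the JSON payload becomes an empty list instead
    // of a NullPointerException, so bean validation can reject it cleanly with
    // "must not be empty" rather than the request dying in the controller.
    static List<Visit> toVisits(List<ApiVisit> apiVisits) {
        return apiVisits == null
                ? Collections.<Visit>emptyList()
                : apiVisits.stream()
                        .map(v -> new Visit(v.qrCode(), v.qrCodeScanTime()))
                        .collect(toList());
    }
}
```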
@@ -86,7 +86,9 @@ public class CleaRestExceptionHandler extends ResponseEntityExceptionHandler {
     @ExceptionHandler(Exception.class)
     public ResponseEntity<ErrorResponse> handleOtherException(Exception ex, WebRequest webRequest) {
         final HttpStatus status = getHttpStatus(ex);
-        log.error(String.format(ERROR_MESSAGE_TEMPLATE, ex.getLocalizedMessage(), webRequest.getDescription(false)));
+        log.error(
+                String.format(ERROR_MESSAGE_TEMPLATE, ex.getLocalizedMessage(), webRequest.getDescription(false)), ex
+        );
         return this.jsonResponseEntity(this.exceptionToApiError(ex, status));
     }
......
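
The logging change above passes the caught exception as the last argument, which is what makes SLF4J append the stack trace; formatting the message alone discards it. A minimal illustration, with a hypothetical logger and message:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ExceptionLoggingSketch {

    private static final Logger log = LoggerFactory.getLogger(ExceptionLoggingSketch.class);

    void handle(Exception ex) {
        // Before: only the message is logged; the stack trace is lost.
        log.error(String.format("request failed: %s", ex.getLocalizedMessage()));

        // After: SLF4J treats a trailing Throwable argument specially and
        // appends its full stack trace to the log entry.
        log.error(String.format("request failed: %s", ex.getLocalizedMessage()), ex);
    }
}
```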
@@ -52,9 +52,6 @@ class CleaControllerTest {
                 .then()
                 .statusCode(UNSUPPORTED_MEDIA_TYPE.value());
-
-        assertThat(KafkaManager.getRecords())
-                .isEmpty();
     }

     @Test
@@ -67,11 +64,6 @@ class CleaControllerTest {
                 .then()
                 .statusCode(BAD_REQUEST.value());
-
-        final var records = KafkaManager.getRecords()
-                .records("dev.clea.fct.visit-scan");
-        assertThat(records)
-                .isEmpty();
     }

     @Test
@@ -83,11 +75,6 @@ class CleaControllerTest {
                 .then()
                 .statusCode(BAD_REQUEST.value());
-
-        final var records = KafkaManager.getRecords()
-                .records("dev.clea.fct.visit-scan");
-        assertThat(records)
-                .isEmpty();
     }

     @Test
@@ -110,9 +97,6 @@ class CleaControllerTest {
                 .body("validationErrors[0].rejectedValue", equalTo(null))
                 .body("validationErrors[0].message", containsString("null"))
                 .body("validationErrors", hasSize(1));
-
-        assertThat(KafkaManager.getRecords())
-                .isEmpty();
     }

     @Test
@@ -155,10 +139,10 @@ class CleaControllerTest {
                 .body("httpStatus", equalTo(BAD_REQUEST.value()))
                 .body("timestamp", isStringDateBetweenNowAndTenSecondsAgo())
                 .body("message", equalTo("Invalid request"))
-                .body("validationErrors[0].object", equalTo("reportRequest"))
+                .body("validationErrors[0].object", equalTo("ReportRequest"))
                 .body("validationErrors[0].field", equalTo("visits"))
-                .body("validationErrors[0].rejectedValue", equalTo(null))
-                .body("validationErrors[0].message", equalTo("must not be null"))
+                .body("validationErrors[0].rejectedValue", hasSize(0))
+                .body("validationErrors[0].message", equalTo("must not be empty"))
                 .body("validationErrors", hasSize(1));
     }
@@ -176,10 +160,10 @@ class CleaControllerTest {
                 .body("httpStatus", equalTo(BAD_REQUEST.value()))
                 .body("timestamp", isStringDateBetweenNowAndTenSecondsAgo())
                 .body("message", equalTo("Invalid request"))
-                .body("validationErrors[0].object", equalTo("reportRequest"))
+                .body("validationErrors[0].object", equalTo("ReportRequest"))
                 .body("validationErrors[0].field", equalTo("visits"))
                 .body("validationErrors[0].rejectedValue", hasSize(0))
-                .body("validationErrors[0].message", startsWith("size must be between 1 and"))
+                .body("validationErrors[0].message", equalTo("must not be empty"))
                 .body("validationErrors", hasSize(1));
     }
@@ -203,8 +187,7 @@ class CleaControllerTest {
                 .body("success", is(true))
                 .body("message", is("1 reports processed, 1 rejected"));

-        final var records = KafkaManager.getRecords()
-                .records("dev.clea.fct.visit-scan");
+        final var records = KafkaManager.getRecords(1, "dev.clea.fct.visit-scan");
         assertThat(records)
                 .extracting(ConsumerRecord::value)
@@ -241,8 +224,7 @@ class CleaControllerTest {
                 .body("success", is(true))
                 .body("message", is("1 reports processed, 1 rejected"));

-        final var records = KafkaManager.getRecords()
-                .records("dev.clea.fct.visit-scan");
+        final var records = KafkaManager.getRecords(1, "dev.clea.fct.visit-scan");
         assertThat(records)
                 .extracting(ConsumerRecord::value)
@@ -279,8 +261,7 @@ class CleaControllerTest {
                 .body("success", is(true))
                 .body("message", is("1 reports processed, 1 rejected"));

-        final var records = KafkaManager.getRecords()
-                .records("dev.clea.fct.visit-scan");
+        final var records = KafkaManager.getRecords(1, "dev.clea.fct.visit-scan");
         assertThat(records)
                 .extracting(ConsumerRecord::value)
@@ -317,8 +298,7 @@ class CleaControllerTest {
                 .body("success", is(true))
                 .body("message", is("1 reports processed, 1 rejected"));

-        final var records = KafkaManager.getRecords()
-                .records("dev.clea.fct.visit-scan");
+        final var records = KafkaManager.getRecords(1, "dev.clea.fct.visit-scan");
         assertThat(records)
                 .extracting(ConsumerRecord::value)
......
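
The four happy-path tests above switch from draining whatever is on the topic to asking for an exact record count, which maps to the `KafkaTestUtils.getRecords(consumer, timeout, minRecords)` overload used in `KafkaManager` below: it returns as soon as the expected number of records arrives instead of always waiting out the poll timeout. A hypothetical test using the new helper:

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

import fr.gouv.clea.ws.test.KafkaManager;

class VisitTopicFetchSketch {

    @Test
    void fetches_exactly_the_expected_record() {
        // ... exercise the endpoint so that one visit lands on the topic ...

        // Returns as soon as 1 record is available (or after the internal
        // 5s timeout), rather than polling the topic just to prove emptiness.
        final var records = KafkaManager.getRecords(1, "dev.clea.fct.visit-scan");

        assertThat(records).hasSize(1);
    }
}
```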
@@ -19,7 +19,8 @@ import java.util.Base64;
 import java.util.List;
 import java.util.UUID;

-import static fr.gouv.clea.ws.test.KafkaRecordAssert.assertThat;
+import static fr.gouv.clea.ws.test.KafkaManager.assertThatKafkaRecordInTopic;
+import static java.time.temporal.ChronoUnit.DAYS;
 import static org.assertj.core.api.Assertions.tuple;

 @IntegrationTest
@@ -46,31 +47,27 @@ class ProducerServiceTest {
     @DisplayName("test that produceVisits can send decoded lsps to kafka and that we can read them back")
     void can_send_decrypted_lsps_to_kafka() {
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
-        UUID uuid3 = UUID.randomUUID();
+        UUID uuid1 = UUID.fromString("11111111-1111-1111-1111-111111111111");
+        UUID uuid2 = UUID.fromString("22222222-2222-2222-2222-222222222222");
+        UUID uuid3 = UUID.fromString("33333333-3333-3333-3333-333333333333");
         byte[] encryptedLocationMessage1 = RandomUtils.nextBytes(21);
         byte[] encryptedLocationMessage2 = RandomUtils.nextBytes(22);
         byte[] encryptedLocationMessage3 = RandomUtils.nextBytes(23);
-        boolean isBackward1 = RandomUtils.nextBoolean();
-        boolean isBackward2 = RandomUtils.nextBoolean();
-        boolean isBackward3 = RandomUtils.nextBoolean();
-        Instant qrCodeScanTime1 = newRandomInstant();
-        Instant qrCodeScanTime2 = newRandomInstant();
-        Instant qrCodeScanTime3 = newRandomInstant();
+        Instant qrCodeScanTime1 = Instant.now().minus(1, DAYS);
+        Instant qrCodeScanTime2 = Instant.now().minus(2, DAYS);
+        Instant qrCodeScanTime3 = Instant.now().minus(3, DAYS);
         List<DecodedVisit> decoded = List.of(
-                createSerializableDecodedVisit(qrCodeScanTime1, isBackward1, uuid1, encryptedLocationMessage1),
-                createSerializableDecodedVisit(qrCodeScanTime2, isBackward2, uuid2, encryptedLocationMessage2),
-                createSerializableDecodedVisit(qrCodeScanTime3, isBackward3, uuid3, encryptedLocationMessage3)
+                createSerializableDecodedVisit(qrCodeScanTime1, true, uuid1, encryptedLocationMessage1),
+                createSerializableDecodedVisit(qrCodeScanTime2, true, uuid2, encryptedLocationMessage2),
+                createSerializableDecodedVisit(qrCodeScanTime3, false, uuid3, encryptedLocationMessage3)
         );

         producerService.produceVisits(decoded);

-        final var records = KafkaManager.getRecords();
+        final var records = KafkaManager.getRecords(3, "dev.clea.fct.visit-scan");
         Assertions.assertThat(records)
                 .extracting(ConsumerRecord::value)
                 .extracting(
@@ -83,16 +80,19 @@ class ProducerServiceTest {
                 )
                 .containsExactly(
                         tuple(
-                                uuid1.toString(), Base64.getEncoder().encodeToString(encryptedLocationMessage1),
-                                qrCodeScanTime1.getEpochSecond(), isBackward1
+                                "11111111-1111-1111-1111-111111111111",
+                                Base64.getEncoder().encodeToString(encryptedLocationMessage1),
+                                qrCodeScanTime1.getEpochSecond(), true
                         ),
                         tuple(
-                                uuid2.toString(), Base64.getEncoder().encodeToString(encryptedLocationMessage2),
-                                qrCodeScanTime2.getEpochSecond(), isBackward2
+                                "22222222-2222-2222-2222-222222222222",
+                                Base64.getEncoder().encodeToString(encryptedLocationMessage2),
+                                qrCodeScanTime2.getEpochSecond(), true
                         ),
                         tuple(
-                                uuid3.toString(), Base64.getEncoder().encodeToString(encryptedLocationMessage3),
-                                qrCodeScanTime3.getEpochSecond(), isBackward3
+                                "33333333-3333-3333-3333-333333333333",
+                                Base64.getEncoder().encodeToString(encryptedLocationMessage3),
+                                qrCodeScanTime3.getEpochSecond(), false
                         )
                 );
     }
@@ -113,8 +113,7 @@ class ProducerServiceTest {
         producerService.produceStat(reportStat);

-        final var record = KafkaManager.getSingleRecord("dev.clea.fct.report-stats");
-        assertThat(record)
+        assertThatKafkaRecordInTopic("dev.clea.fct.report-stats")
                 .hasNoKey()
                 .hasNoHeader("__TypeId__")
                 .hasJsonValue("reported", 10)
@@ -124,8 +123,4 @@ class ProducerServiceTest {
                 .hasJsonValue("close", 4)
                 .hasJsonValue("timestamp", timestamp);
     }
-
-    private Instant newRandomInstant() {
-        return Instant.ofEpochSecond(RandomUtils.nextLong(0, Instant.now().getEpochSecond()));
-    }
 }
 package fr.gouv.clea.ws.test;

 import com.fasterxml.jackson.databind.JsonNode;
-import fr.gouv.clea.ws.configuration.CleaKafkaProperties;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.connect.json.JsonDeserializer;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
 import org.springframework.test.context.TestContext;
 import org.springframework.test.context.TestExecutionListener;
@@ -16,59 +15,75 @@ import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.utility.DockerImageName;

 import java.util.List;
+import java.util.Map;

+import static java.util.stream.Collectors.toUnmodifiableMap;
 /**
  * A {@link TestExecutionListener} to start a Kafka container to be used as a
  * dependency for SpringBootTests.
  * <p>
- * It starts a Karfka container statically and export required system properties
+ * It starts a Kafka container statically and export required system properties
  * to override Spring application context configuration.
  * <p>
- * It starts / closes a consumer before and after each test method.
+ * It starts / closes a consumer before / after each test method.
  * <p>
- * Static method {@link KafkaManager#getSingleRecord(String)} can be used to
+ * Static method {@link KafkaManager#getRecords(int, String)} can be used to
  * fetch messages from Kafka.
  */
 @Slf4j
 public class KafkaManager implements TestExecutionListener {

     private static final KafkaContainer KAFKA = new KafkaContainer(
             DockerImageName.parse("confluentinc/cp-kafka:5.4.4")
     );

+    private static final List<String> TOPICS = List.of(
+            "dev.clea.fct.visit-scan",
+            "dev.clea.fct.report-stats"
+    );
+
+    private static final Map<String, Consumer<String, JsonNode>> CONSUMERS;

     static {
         KAFKA.start();
         System.setProperty("spring.kafka.bootstrap-servers", KAFKA.getBootstrapServers());
-    }
-
-    private static Consumer<String, JsonNode> consumer;
-
-    @Override
-    @SneakyThrows
-    public void beforeTestMethod(TestContext testContext) {
-        final var cleaKafkaProperties = testContext.getApplicationContext().getBean(CleaKafkaProperties.class);
-        final var topics = List.of(
-                cleaKafkaProperties.getQrCodesTopic(),
-                cleaKafkaProperties.getStatsTopic()
-        );
-        final var config = KafkaTestUtils.consumerProps(KAFKA.getBootstrapServers(), "test-consumer", "false");
-        consumer = new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer())
-                .createConsumer();
-        consumer.subscribe(topics);
+
+        // open a consumer to fetch records for each topic
+        final var consumerConfig = KafkaTestUtils.consumerProps(KAFKA.getBootstrapServers(), "test-consumer", "false");
+        CONSUMERS = TOPICS.stream()
+                .collect(
+                        toUnmodifiableMap(
+                                topic -> topic,
+                                topic -> new DefaultKafkaConsumerFactory<>(
+                                        consumerConfig, new StringDeserializer(),
+                                        new JsonDeserializer()
+                                )
+                                        .createConsumer()
+                        )
+                );
+        CONSUMERS.forEach((topic, consumer) -> consumer.subscribe(List.of(topic)));
     }
     @Override
-    public void afterTestMethod(TestContext testContext) {
-        consumer.commitSync();
-        KafkaTestUtils.getRecords(consumer, 1000);
-        consumer.close();
+    public void beforeTestMethod(TestContext testContext) {
+        // flush all kafka templates then fetch records to clear topics
+        final var kafkaTemplates = testContext.getApplicationContext()
+                .getBeansOfType(KafkaTemplate.class)
+                .values();
+        kafkaTemplates.forEach(KafkaTemplate::flush);
+        CONSUMERS.values()
+                .forEach(consumer -> KafkaTestUtils.getRecords(consumer, 100));
     }
-    public static ConsumerRecords<String, JsonNode> getRecords() {
-        return KafkaTestUtils.getRecords(consumer, 100);
+    public static ConsumerRecords<String, JsonNode> getRecords(int expectedRecords, String topic) {
+        final var consumer = CONSUMERS.get(topic);
+        return KafkaTestUtils.getRecords(consumer, 5000, expectedRecords);
     }
-    public static ConsumerRecord<String, JsonNode> getSingleRecord(String topic) {
-        return KafkaTestUtils.getSingleRecord(consumer, topic);
+    public static KafkaRecordAssert assertThatKafkaRecordInTopic(String topic) {
+        final var consumer = CONSUMERS.get(topic);
+        final var record = KafkaTestUtils.getSingleRecord(consumer, topic);
+        return KafkaRecordAssert.assertThat(record);
     }
 }
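
For context, a sketch of how a `TestExecutionListener` like this gets attached to a test class, assuming the `@IntegrationTest` annotation seen in `ProducerServiceTest` composes it through Spring's `@TestExecutionListeners`; the test class name here is hypothetical:

```java
import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.TestExecutionListeners.MergeMode;

import fr.gouv.clea.ws.test.KafkaManager;

// Merging keeps Spring's default listeners (dependency injection,
// transactions, ...) while adding the Kafka topic lifecycle handling.
@TestExecutionListeners(listeners = KafkaManager.class, mergeMode = MergeMode.MERGE_WITH_DEFAULTS)
class SomeKafkaIntegrationTest {

    // Before each test method, KafkaManager.beforeTestMethod() flushes every
    // KafkaTemplate bean and drains both topics, so records produced by an
    // earlier test can no longer leak into this test's assertions.
}
```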
 clea.conf:
   duplicateScanThresholdInSeconds: 10800
+logging.level.org.apache.kafka: WARN