Commit 8b552216 authored by Bergamote Orange's avatar Bergamote Orange
Browse files

more unit tests & config for prefixes computing chunk size

parent e8295559
......@@ -25,4 +25,6 @@ public class BatchConstants {
public static final String PREFIXES_PARTITION_KEY = "prefixes";
public static final String LTIDS_LIST_PARTITION_KEY = "ltids";
public static final String SQL_SELECT_BY_LTID_IN_SINGLEPLACECLUSTERPERIOD = "select * from " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE + " WHERE ltid= ?";
}
......@@ -27,4 +27,6 @@ public class BatchProperties {
private int identificationStepChunkSize;
private int indexationStepChunkSize;
private int prefixesComputingStepChunkSize;
}
......@@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.util.List;
......@@ -47,10 +48,10 @@ public class IndexationStepBatchConfig {
private ClusterPeriodModelsMapper mapper;
@Bean
public Step clustersIndexation(final ObjectMapper objectMapper) {
public Step clustersIndexation(final ObjectMapper objectMapper, final JdbcTemplate jdbcTemplate) {
return this.stepBuilderFactory.get("clustersIndexation")
.partitioner("partitioner", prefixPartitioner())
.partitionHandler(partitionHandler(objectMapper))
.partitionHandler(partitionHandler(objectMapper, jdbcTemplate))
.build();
}
......@@ -61,20 +62,20 @@ public class IndexationStepBatchConfig {
}
@Bean
public TaskExecutorPartitionHandler partitionHandler(final ObjectMapper objectMapper) {
public TaskExecutorPartitionHandler partitionHandler(final ObjectMapper objectMapper, final JdbcTemplate jdbcTemplate) {
final TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
partitionHandler.setGridSize(properties.getGridSize());
partitionHandler.setStep(partitionedClustersIndexation(objectMapper));
partitionHandler.setStep(partitionedClustersIndexation(objectMapper, jdbcTemplate));
partitionHandler.setTaskExecutor(indexationTaskExecutor());
return partitionHandler;
}
@Bean
public Step partitionedClustersIndexation(final ObjectMapper objectMapper) {
public Step partitionedClustersIndexation(final ObjectMapper objectMapper, final JdbcTemplate jdbcTemplate) {
return stepBuilderFactory.get("partitionedClustersIndexation")
.<Map.Entry<String, List<String>>, ClusterFile>chunk(properties.getIndexationStepChunkSize())
.reader(memoryMapItemReader(null, null))
.processor(singlePlaceClusterBuilder()) // build a Map of ClusterFile at once
.processor(singlePlaceClusterBuilder(jdbcTemplate)) // build a Map of ClusterFile at once
.writer(indexationWriter(objectMapper)) // build Files and index
.build();
}
......@@ -89,8 +90,8 @@ public class IndexationStepBatchConfig {
}
@Bean
public ItemProcessor<Map.Entry<String, List<String>>, ClusterFile> singlePlaceClusterBuilder() {
return new SinglePlaceClusterBuilder(dataSource, mapper, properties);
public ItemProcessor<Map.Entry<String, List<String>>, ClusterFile> singlePlaceClusterBuilder(final JdbcTemplate jdbcTemplate) {
return new SinglePlaceClusterBuilder(jdbcTemplate, mapper);
}
@Bean
......
......@@ -35,7 +35,7 @@ public class PrefixesStepBatchConfig {
@Bean
public Step prefixesComputing() {
return stepBuilderFactory.get("prefixesComputing")
.<List<String>, List<String>>chunk(1000)
.<List<String>, List<String>>chunk(properties.getPrefixesComputingStepChunkSize())
.reader(ltidListDBReader())
.writer(new PrefixesMemoryWriter(properties, prefixesStorageService))
.build();
......
......@@ -46,7 +46,7 @@ public class GenerateClusterIndexTasklet implements Tasklet {
return RepeatStatus.FINISHED;
}
private void generateClusterIndex(final Long jobId, final Set<String> prefixes) throws IOException {
void generateClusterIndex(final Long jobId, final Set<String> prefixes) throws IOException {
ClusterFileIndex clusterFileIndex = ClusterFileIndex.builder()
.iteration(jobId.intValue())
......@@ -55,7 +55,7 @@ public class GenerateClusterIndexTasklet implements Tasklet {
log.info("Generating cluster index : " + outputPath + File.separator + CLUSTER_INDEX_FILENAME);
Path jsonPath = Paths.get(outputPath + File.separator + CLUSTER_INDEX_FILENAME);
Path jsonPath = Path.of(outputPath, CLUSTER_INDEX_FILENAME);
File jsonIndex = jsonPath.toFile();
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
objectMapper.writeValue(jsonIndex, clusterFileIndex);
......
......@@ -9,6 +9,7 @@ import fr.gouv.clea.indexation.model.output.ClusterFile;
import fr.gouv.clea.indexation.model.output.ClusterFileItem;
import fr.gouv.clea.indexation.model.output.Prefix;
import fr.gouv.clea.mapper.ClusterPeriodModelsMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.jdbc.core.JdbcTemplate;
......@@ -21,30 +22,22 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import static fr.gouv.clea.config.BatchConstants.SINGLE_PLACE_CLUSTER_PERIOD_TABLE;
import static fr.gouv.clea.config.BatchConstants.SQL_SELECT_BY_LTID_IN_SINGLEPLACECLUSTERPERIOD;
@Slf4j
@RequiredArgsConstructor
public class SinglePlaceClusterBuilder implements ItemProcessor<Map.Entry<String, List<String>>, ClusterFile> {
private final JdbcTemplate jdbcTemplate;
private final ClusterPeriodModelsMapper mapper;
private final BatchProperties properties;
public SinglePlaceClusterBuilder(
final DataSource dataSource,
final ClusterPeriodModelsMapper mapper,
final BatchProperties properties) {
jdbcTemplate = new JdbcTemplate(dataSource);
this.mapper = mapper;
this.properties = properties;
}
@Override
public ClusterFile process(final Map.Entry<String, List<String>> prefixLtidsEntry) {
log.debug("Processing prefix {} files...", prefixLtidsEntry.getKey());
ClusterFile clusterFile = new ClusterFile();
clusterFile.setName(Prefix.of(prefixLtidsEntry.getValue().get(0), properties.getStaticPrefixLength()));
clusterFile.setName(prefixLtidsEntry.getKey());
prefixLtidsEntry.getValue().forEach(createClusterFile(clusterFile));
return clusterFile;
......@@ -61,7 +54,7 @@ public class SinglePlaceClusterBuilder implements ItemProcessor<Map.Entry<String
};
}
private ClusterFileItem createClusterFileItem(SinglePlaceClusterPeriod firstPeriod, List<ClusterPeriod> clusterPeriods) {
ClusterFileItem createClusterFileItem(SinglePlaceClusterPeriod firstPeriod, List<ClusterPeriod> clusterPeriods) {
return ClusterFileItem.ofCluster(SinglePlaceCluster.builder()
.locationTemporaryPublicId(firstPeriod.getLocationTemporaryPublicId())
.venueCategory1(firstPeriod.getVenueCategory1())
......@@ -71,9 +64,8 @@ public class SinglePlaceClusterBuilder implements ItemProcessor<Map.Entry<String
.build());
}
private List<SinglePlaceClusterPeriod> getSinglePlaceClusterPeriods(final String ltid) {
return jdbcTemplate.query("select * from " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE + " WHERE ltid= ?",
new SinglePlaceClusterPeriodRowMapper(), UUID.fromString(ltid));
List<SinglePlaceClusterPeriod> getSinglePlaceClusterPeriods(final String ltid) {
return jdbcTemplate.query(SQL_SELECT_BY_LTID_IN_SINGLEPLACECLUSTERPERIOD, new SinglePlaceClusterPeriodRowMapper(), UUID.fromString(ltid));
}
private List<ClusterPeriod> buildClusterPeriods(final List<SinglePlaceClusterPeriod> clusterPeriodList) {
......
......@@ -24,7 +24,7 @@ clea:
duration-unit-in-seconds: 1800
static-prefix-length: ${CLEA_BATCH_PREFIX_LENGTH:2}
files-output-path: ${CLEA_BATCH_OUTPUT_PATH:/tmp/v1}
# TODO: grid-size and chunk-size values below are arbitrary placeholders; tune them against
# realistic data volumes before production use
grid-size: 6
identification-step-chunk-size: 1000
indexation-step-chunk-size: 1000
\ No newline at end of file
indexation-step-chunk-size: 1000
prefixes-computing-step-chunk-size: 1000
\ No newline at end of file
package fr.gouv.clea.indexation.index;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import fr.gouv.clea.config.BatchProperties;
import fr.gouv.clea.indexation.model.output.ClusterFileIndex;
import fr.gouv.clea.service.PrefixesStorageService;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.DisplayNameGenerator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Set;
import static fr.gouv.clea.config.BatchConstants.CLUSTER_INDEX_FILENAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
public class GenerateClusterIndexTaskletTest {
class GenerateClusterIndexTaskletTest {
@Captor
private ArgumentCaptor<ClusterFileIndex> clusterFileIndexCaptor;
@Mock
private ObjectMapper objectMapper;
@Mock
private BatchProperties batchProperties;
@Mock
private PrefixesStorageService prefixesStorageService;
@Test
void generateClusterIndex_uses_objectMapper_to_create_clusterFileIndex_from_provided_prefixes() throws IOException {
String outputPath = "outputPath";
when(batchProperties.getFilesOutputPath()).thenReturn(outputPath);
final GenerateClusterIndexTasklet tasklet = new GenerateClusterIndexTasklet(batchProperties, prefixesStorageService, objectMapper);
final Long jobId = 1L;
Set<String> prefixes = Set.of("prefix1", "prefix2", "prefix3");
File jsonIndex = Path.of(outputPath, CLUSTER_INDEX_FILENAME).toFile();
tasklet.generateClusterIndex(jobId, prefixes);
verify(objectMapper, times(1)).enable(SerializationFeature.INDENT_OUTPUT);
verify(objectMapper, times(1)).writeValue(eq(jsonIndex), clusterFileIndexCaptor.capture());
assertThat(clusterFileIndexCaptor.getValue().getPrefixes()).containsExactlyElementsOf(prefixes);
assertThat(clusterFileIndexCaptor.getValue().getIteration()).isEqualTo(jobId.intValue());
}
}
package fr.gouv.clea.indexation.processor;
import fr.gouv.clea.dto.ClusterPeriod;
import fr.gouv.clea.dto.SinglePlaceCluster;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import fr.gouv.clea.indexation.SinglePlaceClusterPeriodRowMapper;
import fr.gouv.clea.indexation.model.output.ClusterFile;
import fr.gouv.clea.indexation.model.output.ClusterFileItem;
import fr.gouv.clea.indexation.model.output.ExposureRow;
import fr.gouv.clea.mapper.ClusterPeriodModelsMapper;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.DisplayNameGenerator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.jdbc.core.JdbcTemplate;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static fr.gouv.clea.config.BatchConstants.SQL_SELECT_BY_LTID_IN_SINGLEPLACECLUSTERPERIOD;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
class SinglePlaceClusterBuilderTest {

    @Captor
    private ArgumentCaptor<UUID> uuidArgumentCaptor;

    @Captor
    private ArgumentCaptor<String> sqlStringArgumentCaptor;

    @Mock
    private JdbcTemplate jdbcTemplate;

    @Mock
    private ClusterPeriodModelsMapper mapper;

    @InjectMocks
    private SinglePlaceClusterBuilder processor;

    @Test
    void getSinglePlaceClusterPeriods_queries_jdbcTemplate_with_provided_ltid() {
        // fix: method name typo ("PLace" -> "Place") so the generated display name reads correctly
        final String ltid = "729e373a-7ba3-4571-a5fc-c4591d6202cc";

        processor.getSinglePlaceClusterPeriods(ltid);

        // the processor must use the shared SQL constant and pass the ltid as a UUID parameter
        verify(jdbcTemplate).query(sqlStringArgumentCaptor.capture(), any(SinglePlaceClusterPeriodRowMapper.class), uuidArgumentCaptor.capture());
        assertThat(uuidArgumentCaptor.getValue()).isEqualTo(UUID.fromString(ltid));
        assertThat(sqlStringArgumentCaptor.getValue()).isEqualTo(SQL_SELECT_BY_LTID_IN_SINGLEPLACECLUSTERPERIOD);
    }

    @Test
    void process_returns_clusterFile_with_prefixes_and_ltids_from_provided_list() {
        final String prefix = "f6";
        final UUID ltid1 = UUID.fromString("f67b2973-a1d5-4105-99d2-623262ced561");
        final UUID ltid2 = UUID.fromString("f67b2973-a1d5-4105-99d2-623262ced562");
        final List<String> ltidsList = List.of(ltid1.toString(), ltid2.toString());
        final Map.Entry<String, List<String>> input = new AbstractMap.SimpleEntry<>(prefix, ltidsList);
        final int venueCat1 = 0;
        final int venueCat2 = 1;
        final int venueType = 2;

        // fixtures for the first ltid
        final ClusterPeriod clusterPeriod1 = ClusterPeriod.builder().build();
        final SinglePlaceClusterPeriod period1 = buildSinglePlaceClusterPeriod(ltid1, venueCat1, venueCat2, venueType);
        // fixtures for the second ltid
        final ClusterPeriod clusterPeriod2 = ClusterPeriod.builder().build();
        final SinglePlaceClusterPeriod period2 = buildSinglePlaceClusterPeriod(ltid2, venueCat1, venueCat2, venueType);

        when(jdbcTemplate.query(eq(SQL_SELECT_BY_LTID_IN_SINGLEPLACECLUSTERPERIOD), any(SinglePlaceClusterPeriodRowMapper.class), eq(ltid1))).thenReturn(List.of(period1));
        when(jdbcTemplate.query(eq(SQL_SELECT_BY_LTID_IN_SINGLEPLACECLUSTERPERIOD), any(SinglePlaceClusterPeriodRowMapper.class), eq(ltid2))).thenReturn(List.of(period2));
        when(mapper.map(period1)).thenReturn(clusterPeriod1);
        when(mapper.map(period2)).thenReturn(clusterPeriod2);

        final ClusterFile result = processor.process(input);

        // the file is named after the map-entry key (the prefix) and contains one item per ltid, in order
        assertThat(result).isNotNull();
        assertThat(result.getName()).isEqualTo(prefix);
        final ClusterFileItem expectedItem1 = buildClusterFileItem(ltid1, venueCat1, venueCat2, venueType, List.of(clusterPeriod1));
        final ClusterFileItem expectedItem2 = buildClusterFileItem(ltid2, venueCat1, venueCat2, venueType, List.of(clusterPeriod2));
        assertThat(result.getItems()).containsExactly(expectedItem1, expectedItem2);
    }

    @Test
    void createClusterFileItem_returns_ClusterFileItem_from_input_periods() {
        final UUID ltid1 = UUID.fromString("f67b2973-a1d5-4105-99d2-623262ced561");
        final int venueCat1 = 0;
        final int venueCat2 = 1;
        final int venueType = 2;
        final int startTimestamp1 = 3;
        final int startTimestamp2 = 4;
        final long duration1 = 0L;
        final long duration2 = 1L;
        final float riskLevel1 = 0;
        final float riskLevel2 = 1;
        final ClusterPeriod clusterPeriod1 = ClusterPeriod.builder()
                .clusterStart(startTimestamp1)
                .clusterDurationInSeconds((int) duration1)
                .riskLevel(riskLevel1)
                .build();
        final ClusterPeriod clusterPeriod2 = ClusterPeriod.builder()
                .clusterStart(startTimestamp2)
                .clusterDurationInSeconds((int) duration2)
                .riskLevel(riskLevel2)
                .build();
        final SinglePlaceClusterPeriod period1 = buildSinglePlaceClusterPeriod(ltid1, venueCat1, venueCat2, venueType);

        final ClusterFileItem result = processor.createClusterFileItem(period1, List.of(clusterPeriod1, clusterPeriod2));

        // item carries the ltid of the first period and one exposure row per cluster period, in order
        assertThat(result.getTemporaryLocationId()).isEqualTo(ltid1.toString());
        final var expectedExposureRow1 = buildExposureRow(startTimestamp1, duration1, riskLevel1);
        final var expectedExposureRow2 = buildExposureRow(startTimestamp2, duration2, riskLevel2);
        assertThat(result.getExposures()).containsExactly(expectedExposureRow1, expectedExposureRow2);
    }

    // Builds the expected ExposureRow for a single cluster period.
    private ExposureRow buildExposureRow(int startTimestamp, long durationInSeconds, float riskLevel) {
        return ExposureRow.builder()
                .startTimestamp(startTimestamp)
                .durationInSeconds(durationInSeconds)
                .riskLevel(riskLevel)
                .build();
    }

    // Builds a minimal DB-row fixture for the given ltid and venue attributes.
    private SinglePlaceClusterPeriod buildSinglePlaceClusterPeriod(UUID ltid, int venueCat1, int venueCat2, int venueType) {
        return SinglePlaceClusterPeriod.builder()
                .locationTemporaryPublicId(ltid)
                .venueCategory1(venueCat1)
                .venueCategory2(venueCat2)
                .venueType(venueType)
                .build();
    }

    // Builds the expected ClusterFileItem the processor should produce for one ltid.
    private ClusterFileItem buildClusterFileItem(UUID ltid, int venueCat1, int venueCat2, int venueType, List<ClusterPeriod> clusterPeriods) {
        return ClusterFileItem.ofCluster(SinglePlaceCluster.builder()
                .locationTemporaryPublicId(ltid)
                .venueCategory1(venueCat1)
                .venueCategory2(venueCat2)
                .venueType(venueType)
                .periods(clusterPeriods)
                .build());
    }
}
package fr.gouv.clea.prefixes;
import fr.gouv.clea.indexation.model.output.Prefix;
import fr.gouv.clea.service.PrefixesStorageService;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.DisplayNameGenerator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
@ExtendWith(MockitoExtension.class)
@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
class PrefixesMemoryWriterTest {

    @Mock
    private PrefixesStorageService prefixesStorageService;

    private final int prefixesLength = 2;

    @Test
    void write_adds_prefix_if_absent_and_adds_ltid_to_same_prefix() {
        // two ltids sharing the same 2-character prefix
        String prefix = "c1";
        final String uuid1 = "c10e44c1-4aa9-46c5-b7cd-c6521088202a";
        final String uuid2 = "c10e44c1-4aa9-46c5-b7cd-c6521088202b";
        // the writer interface contract hands chunks as a list of lists
        List<String> ltids = List.of(uuid1, uuid2);
        final List<? extends List<String>> chunk = List.of(ltids);

        // NOTE(review): argument order (prefixesStorageService, prefixesLength) differs from the
        // (properties, prefixesStorageService) order used in PrefixesStepBatchConfig — confirm
        // which PrefixesMemoryWriter constructor is current.
        final PrefixesMemoryWriter writer = new PrefixesMemoryWriter(prefixesStorageService, prefixesLength);
        writer.write(chunk);

        // one prefix registration attempt and one ltid registration per ltid in the chunk
        Mockito.verify(prefixesStorageService, times(ltids.size())).addPrefixIfAbsent(ArgumentMatchers.matches(prefix));
        Mockito.verify(prefixesStorageService, times(ltids.size())).addLtidToPrefix(eq(prefix), anyString());
        // each ltid lands under the shared prefix exactly once
        Mockito.verify(prefixesStorageService, times(1)).addLtidToPrefix(prefix, uuid1);
        Mockito.verify(prefixesStorageService, times(1)).addLtidToPrefix(prefix, uuid2);
    }
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment