Commit ab754bda authored by Bergamote Orange

packages reorg + empty intermediate database at batch beginning

comment on empty db tasklet

work in progress - debugging multiple ltids in pipeline end files

tmp version - partitioning per prefix
parent 6f4cad89
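
The job now starts by wiping the intermediate database. The EmptyIntermediateDBTasklet it delegates to is referenced below but not included in this diff; what follows is a minimal sketch of such a tasklet, assuming it only needs to truncate the intermediate cluster-period table (SINGLE_PLACE_CLUSTER_PERIOD_TABLE is the BatchConstants constant used elsewhere in the diff; the exact set of tables wiped here is an assumption):

package fr.gouv.clea.init;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;

import static fr.gouv.clea.config.BatchConstants.SINGLE_PLACE_CLUSTER_PERIOD_TABLE;

public class EmptyIntermediateDBTasklet implements Tasklet {

    private final JdbcTemplate jdbcTemplate;

    public EmptyIntermediateDBTasklet(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // wipe intermediate results of any previous run before a new identification pass
        jdbcTemplate.execute("TRUNCATE TABLE " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE);
        return RepeatStatus.FINISHED;
    }
}
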
@@ -3,40 +3,51 @@ package fr.gouv.clea.config;
import fr.gouv.clea.dto.SinglePlaceCluster;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import fr.gouv.clea.dto.SinglePlaceExposedVisits;
import fr.gouv.clea.identification.*;
import fr.gouv.clea.identification.ExposedVisitRowMapper;
import fr.gouv.clea.identification.RiskConfigurationService;
import fr.gouv.clea.identification.processor.ClusterToPeriodsProcessor;
import fr.gouv.clea.identification.processor.SinglePlaceExposedVisitsBuilder;
import fr.gouv.clea.identification.processor.SinglePlaceExposedVisitsProcessor;
import fr.gouv.clea.identification.writer.SinglePlaceClusterPeriodListWriter;
import fr.gouv.clea.indexation.IndexationPartitioner;
import fr.gouv.clea.indexation.model.output.ClusterFile;
import fr.gouv.clea.indexation.processors.SinglePlaceClusterBuilder;
import fr.gouv.clea.indexation.readers.IteratorItemReader;
import fr.gouv.clea.indexation.readers.MemoryMapItemReader;
import fr.gouv.clea.indexation.readers.StepExecutionContextReader;
import fr.gouv.clea.indexation.writers.IndexationWriter;
import fr.gouv.clea.init.EmptyIntermediateDBTasklet;
import fr.gouv.clea.mapper.SinglePlaceClusterPeriodMapper;
import fr.gouv.clea.prefixes.ListItemReader;
import fr.gouv.clea.prefixes.PrefixesMemoryWriter;
import fr.gouv.clea.prefixes.PrefixesStorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import javax.sql.DataSource;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import static fr.gouv.clea.config.BatchConstants.*;
@Slf4j
@Configuration
public class BatchConfig {
@@ -65,17 +76,17 @@ public class BatchConfig {
public Job identificationJob() {
return this.jobBuilderFactory.get("identificationJob")
.incrementer(new RunIdIncrementer())
.start(clusterIdentification())
.start(emptyIntermediateDBstep())
.next(clusterIdentification())
.next(prefixesComputing())
.next(clusterIndexation())
.next(masterStepIndexation())
.build();
}
@Bean
@StepScope
public JdbcCursorItemReader<String> ltidDBReader() {
JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
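// cursor position verification is disabled: the reader is wrapped in a SynchronizedItemStreamReader below and read from several threads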
reader.setVerifyCursorPosition(false);
reader.setDataSource(dataSource);
reader.setSql("select distinct " + LTID_COL + " from " + EXPOSED_VISITS_TABLE + " order by " + LTID_COL);
reader.setRowMapper((rs, i) -> rs.getString(1));
@@ -104,20 +115,29 @@ public class BatchConfig {
@Bean
public Step clusterIdentification() {
CompositeItemProcessor<String, List<SinglePlaceClusterPeriod>> compositeProcessor = new CompositeItemProcessor<>();
final CompositeItemProcessor<String, List<SinglePlaceClusterPeriod>> compositeProcessor = new CompositeItemProcessor<>();
compositeProcessor.setDelegates(List.of(
exposedVisitBuilder(), // from String (ltid) to SinglePlaceExposedVisits
singleClusterPlaceBuilder(), // from SinglePlaceExposedVisits to SinglePlaceCluster
singlePlaceClusterPeriodListBuilder()) // from SinglePlaceCluster to List<SinglePlaceClusterPeriod>
);
final SynchronizedItemStreamReader<String> reader = new SynchronizedItemStreamReader<>();
reader.setDelegate(ltidDBReader());
return stepBuilderFactory.get("identification")
.<String, List<SinglePlaceClusterPeriod>>chunk(1000)
.reader(ltidDBReader())
.reader(reader)
.processor(compositeProcessor)
.writer(new SinglePlaceClusterPeriodListWriter(mapper, dataSource))
.writer(new SinglePlaceClusterPeriodListWriter(dataSource))
.taskExecutor(taskExecutor())
.throttleLimit(10)
.throttleLimit(20)
.build();
}
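
Design note: the JDBC cursor reader is stateful and not thread-safe, hence the SynchronizedItemStreamReader wrapper; throttleLimit(20) caps how many of the SimpleAsyncTaskExecutor threads process chunks concurrently.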
@Bean
public Step emptyIntermediateDBstep() {
return stepBuilderFactory.get("emptyIntermediateDB")
.tasklet(emptyIntermediateDB())
.build();
}
@@ -127,41 +147,83 @@ public class BatchConfig {
.<List<String>, List<String>>chunk(1000)
.reader(ltidListDBReader())
.writer(new PrefixesMemoryWriter(properties, prefixesStorageService))
.taskExecutor(taskExecutor())
.throttleLimit(10)
.build();
}
@Bean
public Step clusterIndexation() {
MemoryMapItemReader reader = new MemoryMapItemReader((prefixesStorageService.getPrefixWithAssociatedLtidsMap().entrySet())::iterator);
// MemoryMapItemReader reader = new MemoryMapItemReader((prefixesStorageService.getPrefixWithAssociatedLtidsMap().entrySet())::iterator);
return stepBuilderFactory.get("indexation")
//FIXME: set config for chunk size
.<Map.Entry<String, List<String>>, ClusterFile>chunk(1)
.reader(reader)
.processor(new SinglePlaceClusterBuilder(dataSource, mapper, properties)) // build a Map of ClusterFile at once
.writer(new IndexationWriter(properties, prefixesStorageService)) // build Files and index
.reader(memoryMapItemReader(null, null))
.processor(singlePlaceClusterBuilder()) // build a Map of ClusterFile at once
.writer(indexationWriter()) // build Files and index
.build();
}
@Bean
public IndexationWriter indexationWriter() {
return new IndexationWriter(properties, prefixesStorageService);
}
@Bean
public ItemProcessor<Map.Entry<String, List<String>>, ClusterFile> singlePlaceClusterBuilder() {
return new SinglePlaceClusterBuilder(dataSource, mapper, properties);
}
@Bean
@StepScope
public ItemReader<Map.Entry<String, List<String>>> memoryMapItemReader(@Value("#{stepExecutionContext['prefix']}") String prefix,
@Value("#{stepExecutionContext['ltids']}") List<String> ltids) {
return new StepExecutionContextReader(prefix, ltids);
}
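
StepExecutionContextReader is not part of this diff either; one plausible minimal shape, assuming each partition carries exactly one (prefix, ltids) pair that should be emitted once before signalling end of input (the real class may instead extend the package's IteratorItemReader):

package fr.gouv.clea.indexation.readers;

import org.springframework.batch.item.ItemReader;

import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class StepExecutionContextReader implements ItemReader<Map.Entry<String, List<String>>> {

    private final String prefix;
    private final List<String> ltids;
    private final AtomicBoolean consumed = new AtomicBoolean(false);

    public StepExecutionContextReader(String prefix, List<String> ltids) {
        this.prefix = prefix;
        this.ltids = ltids;
    }

    @Override
    public Map.Entry<String, List<String>> read() {
        // a partition carries exactly one prefix: emit it once, then return null to end the step
        if (consumed.compareAndSet(false, true)) {
            return new AbstractMap.SimpleEntry<>(prefix, ltids);
        }
        return null;
    }
}
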
@Bean
public TaskExecutorPartitionHandler partitionHandler() {
final TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
//FIXME: set config for gridSize
partitionHandler.setGridSize(5);
partitionHandler.setStep(clusterIndexation());
partitionHandler.setTaskExecutor(new SimpleAsyncTaskExecutor());
return partitionHandler;
}
@Bean
public Step masterStepIndexation() {
return this.stepBuilderFactory.get("indexationMaster")
.partitioner("partitioner", prefixPartitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public Partitioner prefixPartitioner() {
log.info("callToPartitioner");
return new IndexationPartitioner(prefixesStorageService);
}
@Bean
TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("batch-ident");
}
@Bean
@StepScope
public Tasklet emptyIntermediateDB() {
return new EmptyIntermediateDBTasklet(dataSource);
}
@Bean
public ItemProcessor<String, SinglePlaceExposedVisits> exposedVisitBuilder() {
return new SinglePlaceExposedVisitsBuilder(dataSource);
return new SinglePlaceExposedVisitsBuilder(dataSource, new ExposedVisitRowMapper());
}
@Bean
@StepScope
public ItemProcessor<SinglePlaceExposedVisits, SinglePlaceCluster> singleClusterPlaceBuilder() {
return new SinglePlaceExposedVisitsProcessor(properties, riskConfigurationService);
}
@Bean
@StepScope
public ItemProcessor<SinglePlaceCluster, List<SinglePlaceClusterPeriod>> singlePlaceClusterPeriodListBuilder() {
return new ClusterToPeriodsProcessor(mapper);
}
......
@@ -10,7 +10,7 @@ public class BatchConstants {
public static final String PERIOD_COLUMN = "period_start";
public static final String TIMESLOT_COLUMN = "timeslot";
public static final String LTID_PARAM = "ltid";
public static final String LTIDS_PARAM = "ltids";
public static final String LTID_COL = "ltid";
public static final String VENUE_TYPE_COL = "venue_type";
......
package fr.gouv.clea.config;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
@Component
public class NoPersistenceBatchConfigurer extends DefaultBatchConfigurer {
@Override
public void setDataSource(DataSource dataSource) {
// prevents Spring Batch from persisting the JobRepository in the database; with no DataSource set, DefaultBatchConfigurer falls back to its in-memory map-based repository
}
}
package fr.gouv.clea.identification;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static fr.gouv.clea.config.BatchConstants.*;
@Slf4j
public class ExposedVisitPartitioner implements Partitioner {
private final JdbcOperations jdbcTemplate;
private int partitionsNumber = 0;
public ExposedVisitPartitioner(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
final String selectDistinctLtidRequest = "SELECT DISTINCT " + LTID_COL + " FROM " + EXPOSED_VISITS_TABLE;
final List<String> visitedPlaces = jdbcTemplate.queryForList(selectDistinctLtidRequest, String.class);
Map<String, ExecutionContext> result = new HashMap<>();
if (visitedPlaces.isEmpty()) {
return result;
}
String lastEvaluatedLtid = visitedPlaces.get(0);
for (String currentLtid : visitedPlaces) {
/*
first item: create an execution context, register the first ltid in it,
and store the context; the next distinct ltid will get a fresh context
*/
if (result.isEmpty()) {
lastEvaluatedLtid = createNewPartitionFromLtid(result, currentLtid);
}
/*
a different currentLtid value means a new ltid: register it in a fresh
execution context and store that context as a new partition
*/
if (!currentLtid.equals(lastEvaluatedLtid)) {
lastEvaluatedLtid = createNewPartitionFromLtid(result, currentLtid);
}
}
log.debug("Detected ltids: {}", result);
return result;
}
private String createNewPartitionFromLtid(final Map<String, ExecutionContext> result, final String currentLtid) {
ExecutionContext ctx = new ExecutionContext();
ctx.put(LTID_PARAM, currentLtid);
createPartitionFromContext(result, ctx);
return currentLtid;
}
private void createPartitionFromContext(final Map<String, ExecutionContext> result, final ExecutionContext tmpExecutionContext) {
result.put("partition" + partitionsNumber, tmpExecutionContext);
partitionsNumber = result.size();
}
}
package fr.gouv.clea.identification;
package fr.gouv.clea.identification.processor;
import fr.gouv.clea.dto.SinglePlaceCluster;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import fr.gouv.clea.mapper.SinglePlaceClusterPeriodMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import java.util.List;
......
package fr.gouv.clea.identification;
package fr.gouv.clea.identification.processor;
import fr.gouv.clea.dto.SinglePlaceExposedVisits;
import fr.gouv.clea.entity.ExposedVisit;
import fr.gouv.clea.identification.ExposedVisitRowMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.annotation.AfterStep;
@@ -24,28 +25,35 @@ import static fr.gouv.clea.config.BatchConstants.*;
@StepScope
public class SinglePlaceExposedVisitsBuilder implements ItemProcessor<String, SinglePlaceExposedVisits> {
JdbcTemplate jdbcTemplate;
AtomicLong counter = new AtomicLong();
private final JdbcTemplate jdbcTemplate;
public SinglePlaceExposedVisitsBuilder(DataSource dataSource) {
private final AtomicLong counter = new AtomicLong();
private final ExposedVisitRowMapper rowMapper;
public SinglePlaceExposedVisitsBuilder(DataSource dataSource, ExposedVisitRowMapper rowMapper) {
jdbcTemplate = new JdbcTemplate(dataSource);
this.rowMapper = rowMapper;
}
@Override
public SinglePlaceExposedVisits process(final String ltid) {
final List<ExposedVisit> list = jdbcTemplate.query("select * from " + EXPOSED_VISITS_TABLE
+ " WHERE ltid= ? ORDER BY " + PERIOD_COLUMN + ", " + TIMESLOT_COLUMN,
new ExposedVisitRowMapper(), UUID.fromString(ltid));
rowMapper, UUID.fromString(ltid));
ExposedVisit v = list.stream().findFirst().orElse(null);
if (null != v) {
long ln = counter.incrementAndGet();
if (0 == ln % 1000) {
log.info("Loaded {} visits, current LTId={} ", ln, ltid);
}
return SinglePlaceExposedVisits.builder()
.locationTemporaryPublicId(v.getLocationTemporaryPublicId())
.venueType(v.getVenueType()).venueCategory1(v.getVenueCategory1())
.venueCategory2(v.getVenueCategory2()).visits(list).build();
}
return null;
}
......
package fr.gouv.clea.identification;
package fr.gouv.clea.identification.processor;
import fr.gouv.clea.config.BatchProperties;
import fr.gouv.clea.dto.ClusterPeriod;
import fr.gouv.clea.dto.SinglePlaceCluster;
import fr.gouv.clea.dto.SinglePlaceExposedVisits;
import fr.gouv.clea.entity.ExposedVisit;
import fr.gouv.clea.identification.RiskConfigurationService;
import fr.gouv.clea.identification.RiskLevelConfig;
import fr.gouv.clea.utils.ExposedVisitComparator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
......
package fr.gouv.clea.identification;
package fr.gouv.clea.identification.writer;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import fr.gouv.clea.mapper.SinglePlaceClusterPeriodMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.*;
import javax.sql.DataSource;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static fr.gouv.clea.config.BatchConstants.*;
@Slf4j
public class SinglePlaceClusterPeriodListWriter extends JdbcBatchItemWriter<List<SinglePlaceClusterPeriod>> {
private final SinglePlaceClusterPeriodMapper mapper;
public class SinglePlaceClusterPeriodListWriter implements ItemWriter<List<SinglePlaceClusterPeriod>> {
private final NamedParameterJdbcOperations jdbcTemplate;
public SinglePlaceClusterPeriodListWriter(SinglePlaceClusterPeriodMapper mapper, DataSource datasource) {
this.mapper = mapper;
public SinglePlaceClusterPeriodListWriter(DataSource datasource) {
this.jdbcTemplate = new NamedParameterJdbcTemplate(datasource);
}
@Override
public void write(List<? extends List<SinglePlaceClusterPeriod>> lists) {
lists.forEach(list ->
list.forEach(singlePlaceClusterPeriod -> {
final BeanPropertySqlParameterSource parameterSource = new BeanPropertySqlParameterSource(singlePlaceClusterPeriod);
jdbcTemplate.update(getInsertSql(), parameterSource);
}));
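// flatten the chunk's lists and insert all periods in one JDBC batch instead of one statement per row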
final List<SinglePlaceClusterPeriod> flatList = lists.stream().flatMap(List::stream).collect(Collectors.toList());
final SqlParameterSource[] parameters = SqlParameterSourceUtils.createBatch(flatList);
jdbcTemplate.batchUpdate(getInsertSql(), parameters);
}
private String getInsertSql() {
......
package fr.gouv.clea.indexation;
import fr.gouv.clea.prefixes.PrefixesStorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static fr.gouv.clea.config.BatchConstants.LTIDS_PARAM;
@Slf4j
public class IndexationPartitioner implements Partitioner {
private final PrefixesStorageService prefixesStorageService;
public IndexationPartitioner(PrefixesStorageService prefixesStorageService) {
this.prefixesStorageService = prefixesStorageService;
}
/**
* Create a set of distinct {@link ExecutionContext} instances together with
* a unique identifier for each one. The identifiers should be short,
* mnemonic values, and only have to be unique within the return value (e.g.
* use an incrementer).
*
* @param gridSize the size of the map to return
* @return a map from identifier to input parameters
*/
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
final Map<String, List<String>> map = prefixesStorageService.getPrefixWithAssociatedLtidsMap();
final Map<String, ExecutionContext> result = new HashMap<>();
for (Map.Entry<String, List<String>> stringListEntry : map.entrySet()) {
final ExecutionContext value = new ExecutionContext();
value.put(LTIDS_PARAM, stringListEntry.getValue());
value.put("prefix", stringListEntry.getKey());
result.put("partition-" + stringListEntry.getKey(), value);
}
return result;
}
}
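
As a quick illustration of what this partitioner hands to the master step (hypothetical snippet, assuming Mockito is available and the service is mockable):

// hypothetical usage sketch, not part of the commit
PrefixesStorageService storage = org.mockito.Mockito.mock(PrefixesStorageService.class);
org.mockito.Mockito.when(storage.getPrefixWithAssociatedLtidsMap())
        .thenReturn(Map.of("ab", List.of("ltid-1", "ltid-2")));
Map<String, ExecutionContext> partitions = new IndexationPartitioner(storage).partition(5);
// one partition per prefix: "partition-ab" -> {ltids=[ltid-1, ltid-2], prefix=ab}
// gridSize is effectively ignored: the partition count is driven by the prefix map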
@@ -22,6 +22,8 @@ import java.util.stream.Collectors;
import static fr.gouv.clea.config.BatchConstants.LTID_COL;
import static fr.gouv.clea.config.BatchConstants.SINGLE_PLACE_CLUSTER_PERIOD_TABLE;
import static fr.gouv.clea.prefixes.PrefixesStorageService.ltidsList;
import static fr.gouv.clea.prefixes.PrefixesStorageService.multipleLtidCount;
@Slf4j
public class SinglePlaceClusterBuilder implements ItemProcessor<Map.Entry<String, List<String>>, ClusterFile> {
@@ -30,8 +32,6 @@ public class SinglePlaceClusterBuilder implements ItemProcessor<Map.Entry<String
private final SinglePlaceClusterPeriodMapper mapper;
private final BatchProperties properties;
AtomicLong counter = new AtomicLong();
public SinglePlaceClusterBuilder(
final DataSource dataSource,
final SinglePlaceClusterPeriodMapper mapper,
@@ -45,19 +45,25 @@ public class SinglePlaceClusterBuilder implements ItemProcessor<Map.Entry<String
@Override
public ClusterFile process(final Map.Entry<String, List<String>> ltids) {
jdbcTemplate.setQueryTimeout(120); // setQueryTimeout expects seconds; 120000 would be roughly 33 hours
ClusterFile clusterFile = new ClusterFile();
clusterFile.setName(Prefix.of(ltids.getValue().get(0), properties.prefixLength));
ltids.getValue().forEach(ltid -> {
if (ltidsList.contains(ltid)) {
log.info("ltid already exists: {}", ltid);
multipleLtidCount++;
} else {
ltidsList.add(ltid);
}
if (multipleLtidCount % 200 == 0 && multipleLtidCount != 0) {
log.info("multipleLtidCount: {}", multipleLtidCount);
}
final List<SinglePlaceClusterPeriod> clusterPeriodList = jdbcTemplate.query("select * from " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE
+ " WHERE ltid= ? ORDER BY " + LTID_COL,
new SinglePlaceClusterPeriodRowMapper(), UUID.fromString(ltid));
SinglePlaceClusterPeriod singlePlaceClusterPeriod = clusterPeriodList.stream().findFirst().orElse(null);
if (null != singlePlaceClusterPeriod) {
long ln = counter.incrementAndGet();
if (0 == ln % 1000) {
log.info("Loaded {} singlePlaceClusterPeriod, current LTId={} ", ln, ltid);
}
List<ClusterPeriod> clusterPeriods = clusterPeriodList.stream().map(mapper::map).collect(Collectors.toList());
clusterFile.addItem(ClusterFileItem.ofCluster(SinglePlaceCluster.builder()
.locationTemporaryPublicId(singlePlaceClusterPeriod.getLocationTemporaryPublicId())
@@ -69,10 +75,6 @@ public class SinglePlaceClusterBuilder implements ItemProcessor<Map.Entry<String
}
});
log.info("Created cluterFile object containing {} items with prefix : {} ",
ltids.getValue().size(),
clusterFile.getName());
return clusterFile;
}
}
package fr.gouv.clea.indexation.readers;
import org.springframework.batch.core.annotation.BeforeRead;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
public class IteratorItemReader<T> implements ItemReader<T> {
......
package fr.gouv.clea.indexation.readers;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
public class MemoryMapItemReader extends IteratorItemReader<Map.Entry<String, List<String>>> {
public MemoryMapItemReader(Callable<Iterator<Map.Entry<String, List<String>>>> iterator) {
super(iterator);
}
}
package fr.gouv.clea.indexation.readers;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;