Commit d80b6849 authored by Bergamote Orange

wip - batch opti init

batch structure with intermediate data table - compiles but not working

refactoring - missing prefix memory reader
parent 0785e4fe
......@@ -13,6 +13,7 @@
<properties>
<java.version>11</java.version>
<org.mapstruct.version>1.4.2.Final</org.mapstruct.version>
</properties>
<dependencies>
......@@ -73,6 +74,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>${org.mapstruct.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
......@@ -143,6 +150,32 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>11</source>
<target>11</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</path>
<path>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>${org.mapstruct.version}</version>
</path>
</annotationProcessorPaths>
<compilerArgs>
<compilerArg>
-Amapstruct.defaultComponentModel=spring
</compilerArg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
......
package fr.gouv.clea.config;
import fr.gouv.clea.dto.SinglePlaceCluster;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import fr.gouv.clea.dto.SinglePlaceExposedVisits;
import fr.gouv.clea.identification.ExposedVisitPartitioner;
import fr.gouv.clea.identification.RiskConfigurationService;
import fr.gouv.clea.identification.SinglePlaceExposedVisitsItemWriter;
import fr.gouv.clea.identification.SinglePlaceExposedVisitsProcessor;
import fr.gouv.clea.identification.reader.ExposedVisitItemReader;
import fr.gouv.clea.identification.reader.SinglePlaceExposedVisitItemReader;
import fr.gouv.clea.indexation.model.output.ClusterFile;
import fr.gouv.clea.indexation.processors.IndexationProcessor;
import fr.gouv.clea.indexation.readers.IndexationReader;
import fr.gouv.clea.identification.*;
import fr.gouv.clea.indexation.model.output.ClusterFileItem;
import fr.gouv.clea.indexation.processors.SinglePlaceClusterBuilder;
import fr.gouv.clea.indexation.readers.PrefixesMemoryReader;
import fr.gouv.clea.indexation.writers.IndexationWriter;
import fr.gouv.clea.mapper.SinglePlaceClusterPeriodMapper;
import fr.gouv.clea.prefixes.ListItemReader;
import fr.gouv.clea.prefixes.PrefixesComputingProcessor;
import fr.gouv.clea.prefixes.PrefixesMemoryWriter;
import fr.gouv.clea.prefixes.PrefixesStorageService;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static fr.gouv.clea.config.BatchConstants.*;
@Configuration
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private RiskConfigurationService riskConfigurationService;
@Autowired
private BatchProperties properties;
@Autowired
private DataSource dataSource;
@Bean
public Job identificationPartitionedJob(Step identificationPartitionedMasterStep) {
return this.jobBuilderFactory.get("partitionedJob")
.incrementer(new RunIdIncrementer())
.start(identificationPartitionedMasterStep)
.next(clusterIndexation())
.build();
}
@Bean
public Step identificationPartitionedMasterStep(PartitionHandler partitionHandler) {
return this.stepBuilderFactory.get("identification-partitioned-step-master")
.partitioner("partitioner", partitioner())
.partitionHandler(partitionHandler)
.build();
}
@Bean
public Step clusterIndexation() {
return stepBuilderFactory.get("clusterIndexation")
.<List<SinglePlaceCluster>, HashMap<String, ClusterFile>> chunk(1)
.reader(new IndexationReader())
.processor(new IndexationProcessor(properties))
.writer(new IndexationWriter(properties))
.build();
}
@Bean
public Step identificationStepWorker(ExposedVisitItemReader exposedVisitItemReader) {
final SinglePlaceExposedVisitItemReader reader = new SinglePlaceExposedVisitItemReader();
reader.setDelegate(exposedVisitItemReader);
return stepBuilderFactory.get("identification-step-worker")
.listener(promotionListener())
.<SinglePlaceExposedVisits, SinglePlaceCluster>chunk(properties.getChunkSize())
.reader(reader)
.processor(new SinglePlaceExposedVisitsProcessor(properties, riskConfigurationService))
.writer(new SinglePlaceExposedVisitsItemWriter())
.build();
}
@Bean
public TaskExecutorPartitionHandler partitionHandler(TaskExecutor taskExecutor, Step identificationStepWorker) {
TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
partitionHandler.setGridSize(properties.getGridSize());
partitionHandler.setStep(identificationStepWorker);
partitionHandler.setTaskExecutor(taskExecutor);
return partitionHandler;
}
@Bean
public ExposedVisitPartitioner partitioner() {
return new ExposedVisitPartitioner(this.dataSource);
}
@Bean
@StepScope
public ExposedVisitItemReader exposedVisitItemReader(PagingQueryProvider pagingQueryProvider,
@Value("#{stepExecutionContext['ltid']}") UUID ltid) {
return new ExposedVisitItemReader(this.dataSource, pagingQueryProvider, Map.of("ltid", ltid));
}
@Bean
@StepScope
public SqlPagingQueryProviderFactoryBean pagingQueryProvider(@Value("#{stepExecutionContext['ltid']}") UUID ltid) {
SqlPagingQueryProviderFactoryBean factoryBean = new SqlPagingQueryProviderFactoryBean();
factoryBean.setDataSource(dataSource);
factoryBean.setSelectClause("*");
factoryBean.setFromClause(EXPOSED_VISITS_TABLE);
factoryBean.setWhereClause(LTID_COLUMN + " = :ltid");
factoryBean.setSortKeys(Map.of(PERIOD_COLUMN, Order.ASCENDING, TIMESLOT_COLUMN, Order.ASCENDING));
return factoryBean;
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] { CLUSTERMAP_JOB_CONTEXT_KEY });
return listener;
}
@Bean
TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("batch-ident");
}
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private RiskConfigurationService riskConfigurationService;
@Autowired
private PrefixesStorageService prefixesStorageService;
@Autowired
private BatchProperties properties;
@Autowired
private DataSource dataSource;
@Autowired
private SinglePlaceClusterPeriodMapper mapper;
@Bean
public Job identificationJob() {
// @formatter:off
return this.jobBuilderFactory.get("identificationJob")
.incrementer(new RunIdIncrementer())
.start(clusterIdentification())
.next(prefixesComputing())
.next(clusterIndexation())
.build();
}
@Bean
@StepScope
public JdbcCursorItemReader<String> ltidDBReader() {
JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
reader.setVerifyCursorPosition(false);
reader.setDataSource(dataSource);
reader.setSql("select distinct " + LTID_COLUMN + " from " + EXPOSED_VISITS_TABLE + " order by " + LTID_COLUMN);
reader.setRowMapper((rs, i) -> rs.getString(1));
return reader;
}
/**
* Aggregates the elements read from the delegate reader into a single list and returns them all at once
* (a sketch of such a delegating ListItemReader follows this configuration class).
*
* @return list of ltids as strings
*/
@Bean
@StepScope
public ItemReader<List<String>> ltidListDBReader() {
JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setVerifyCursorPosition(false);
reader.setSql("select distinct " + LTID_COLUMN + " from " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE + "ORDER BY " + LTID_COLUMN);
reader.setRowMapper((rs, i) -> rs.getString(1));
return new ListItemReader(reader);
}
// =================
// Cluster identification & indexation steps
// =================
@Bean
public Step clusterIdentification() {
CompositeItemProcessor<String, List<SinglePlaceClusterPeriod>> compositeProcessor = new CompositeItemProcessor<>();
compositeProcessor.setDelegates(List.of(
exposedVisitBuilder(), // from String to ExposedVisit
singleClusterPlaceBuilder(), // from ExposedVisit to SingleClusterPlace
singlePlaceClusterPeriodListBuilder()) // from SingleClusterPlace to List<SinglePlaceClusterPeriods>
);
return stepBuilderFactory.get("identification")
.<String, List<SinglePlaceClusterPeriod>>chunk(1000)
.reader(ltidDBReader())
.processor(compositeProcessor)
.writer(new SinglePlaceClusterPeriodListWriter(mapper, dataSource))
.taskExecutor(taskExecutor())
.throttleLimit(10)
.build();
}
@Bean
public Step prefixesComputing() {
return stepBuilderFactory.get("prefixes")
.<List<String>, List<String>>chunk(1)
.reader(ltidListDBReader())
.processor(new PrefixesComputingProcessor(properties))
.writer(new PrefixesMemoryWriter(prefixesStorageService))
.taskExecutor(taskExecutor())
.throttleLimit(10)
.build();
}
@Bean
public Step clusterIndexation() {
return stepBuilderFactory.get("indexation")
.<String, ClusterFileItem>chunk(1)
.reader(new PrefixesMemoryReader())
.processor(new SinglePlaceClusterBuilder(dataSource, mapper)) // build a Map of ClusterFile at once
.writer(new IndexationWriter(properties)) // build Files and index
.build();
}
@Bean
TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("batch-ident");
}
@Bean
@StepScope
public ItemProcessor<String, SinglePlaceExposedVisits> exposedVisitBuilder() {
return new SinglePlaceExposedVisitsBuilder(dataSource);
}
@Bean
public ItemProcessor<SinglePlaceExposedVisits, SinglePlaceCluster> singleClusterPlaceBuilder() {
return new SinglePlaceExposedVisitsProcessor(properties, riskConfigurationService);
}
@Bean
public ItemProcessor<SinglePlaceCluster, List<SinglePlaceClusterPeriod>> singlePlaceClusterPeriodListBuilder() {
return new ClusterToPeriodsProcessor(mapper);
}
}
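
Note: fr.gouv.clea.prefixes.ListItemReader, which ltidListDBReader wraps around its JDBC cursor reader, is not part of this diff. A minimal sketch of such a delegating reader, assuming it simply drains the delegate into one list and then signals end of input (the class shape, generics and stream handling are assumptions, not the committed code):

package fr.gouv.clea.prefixes;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;

import java.util.ArrayList;
import java.util.List;

public class ListItemReader implements ItemStreamReader<List<String>> {

    private final ItemStreamReader<String> delegate;
    private boolean exhausted = false;

    public ListItemReader(ItemStreamReader<String> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // the JDBC cursor delegate must be opened before it can be read
        delegate.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        delegate.update(executionContext);
    }

    @Override
    public void close() throws ItemStreamException {
        delegate.close();
    }

    @Override
    public List<String> read() throws Exception {
        // drain the delegate once, return everything as a single item, then end the step input with null
        if (exhausted) {
            return null;
        }
        final List<String> items = new ArrayList<>();
        String item;
        while ((item = delegate.read()) != null) {
            items.add(item);
        }
        exhausted = true;
        return items.isEmpty() ? null : items;
    }
}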
......@@ -6,6 +6,8 @@ import lombok.experimental.UtilityClass;
public class BatchConstants {
public static final String EXPOSED_VISITS_TABLE = "exposed_visits";
public static final String SINGLE_PLACE_CLUSTER_PERIOD_TABLE = "cluster_periods";
public static final String SINGLE_PLACE_CLUSTER_TABLE = "cluster_periods";
public static final String LTID_COLUMN = "ltid";
public static final String PERIOD_COLUMN = "period_start";
public static final String TIMESLOT_COLUMN = "timeslot";
......
......@@ -19,14 +19,10 @@ public class ClusterPeriod {
private int firstTimeSlot;
private int lastTimeSlot;
@JsonProperty("s")
private long clusterStart;
@JsonProperty("d")
private int clusterDurationInSeconds;
@JsonProperty("r")
private float riskLevel;
public void adjustLimit(final ExposedVisit v) {
......
......@@ -22,7 +22,7 @@ public class SinglePlaceCluster {
@Builder.Default
private List<ClusterPeriod> periods = new ArrayList<>();
public static SinglePlaceCluster initialise(SinglePlaceExposedVisits record) {
public static SinglePlaceCluster initialize(SinglePlaceExposedVisits record) {
return SinglePlaceCluster.builder()
.locationTemporaryPublicId(record.getLocationTemporaryPublicId())
.venueType(record.getVenueType())
......
package fr.gouv.clea.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.UUID;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class SinglePlaceClusterPeriod {
private UUID locationTemporaryPublicId;
private int venueType;
private int venueCategory1;
private int venueCategory2;
private long periodStart;
private int firstTimeSlot;
private int lastTimeSlot;
private long clusterStart;
private int clusterDurationInSeconds;
private float riskLevel;
}
package fr.gouv.clea.identification;
import fr.gouv.clea.dto.SinglePlaceCluster;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import fr.gouv.clea.mapper.SinglePlaceClusterPeriodMapper;
import org.springframework.batch.item.ItemProcessor;
import java.util.List;
import java.util.stream.Collectors;
public class ClusterToPeriodsProcessor implements ItemProcessor<SinglePlaceCluster, List<SinglePlaceClusterPeriod>> {
private final SinglePlaceClusterPeriodMapper mapper;
public ClusterToPeriodsProcessor(SinglePlaceClusterPeriodMapper mapper) {
this.mapper = mapper;
}
@Override
public List<SinglePlaceClusterPeriod> process(SinglePlaceCluster cluster) {
return cluster.getPeriods().stream()
.map(period -> mapper.map(cluster, period))
.collect(Collectors.toUnmodifiableList());
}
}
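
ClusterToPeriodsProcessor depends on SinglePlaceClusterPeriodMapper, which is not included in this diff. Given the MapStruct dependency and annotation processor added to the pom (with -Amapstruct.defaultComponentModel=spring), the interface is presumably declared along these lines; the package of ClusterPeriod and the exact method name are assumptions based on the call mapper.map(cluster, period):

package fr.gouv.clea.mapper;

import fr.gouv.clea.dto.ClusterPeriod;
import fr.gouv.clea.dto.SinglePlaceCluster;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import org.mapstruct.Mapper;

// componentModel=spring is supplied globally through -Amapstruct.defaultComponentModel=spring in the pom
@Mapper
public interface SinglePlaceClusterPeriodMapper {

    // flattens the cluster-level identification fields and one period into a single table row
    SinglePlaceClusterPeriod map(SinglePlaceCluster cluster, ClusterPeriod period);
}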
package fr.gouv.clea.identification;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import fr.gouv.clea.mapper.SinglePlaceClusterPeriodMapper;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import fr.gouv.clea.dto.SinglePlaceCluster;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemWriter;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import javax.sql.DataSource;
import static fr.gouv.clea.config.BatchConstants.SINGLE_PLACE_CLUSTER_PERIOD_TABLE;
@Slf4j
public class SinglePlaceClusterPeriodListWriter implements ItemWriter<List<SinglePlaceClusterPeriod>> {
private final SinglePlaceClusterPeriodMapper mapper;
// named-parameter template, because getInsertSql() binds values through ":name" placeholders
private final NamedParameterJdbcOperations jdbcTemplate;
public SinglePlaceClusterPeriodListWriter(SinglePlaceClusterPeriodMapper mapper, DataSource datasource) {
this.mapper = mapper;
this.jdbcTemplate = new NamedParameterJdbcTemplate(datasource);
}
@Override
public void write(List<? extends List<SinglePlaceClusterPeriod>> lists) {
// a chunk contains several lists of periods: write every period of the chunk, not only the first list
lists.stream()
.flatMap(List::stream)
.forEach(singlePlaceClusterPeriod -> {
final BeanPropertySqlParameterSource parameterSource = new BeanPropertySqlParameterSource(singlePlaceClusterPeriod);
jdbcTemplate.update(getInsertSql(), parameterSource);
});
}
private String getInsertSql() {
return "insert into " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE +
// column names
" (" +
"ltid, " +
"venue_type, " +
"venue_category1, " +
"venue_category2, " +
"period_start, " +
"first_time_slot, " +
"last_time_slot, " +
"cluster_start, " +
"cluster_duration_in_seconds, " +
"risk_level" +
")" +
" values " +
// values as parameters from SinglePlaceClusterPeriod attributes
"(" +
":locationTemporaryPublicId, " +
":venueType, " +
":venueCategory1, " +
":venueCategory2, " +
":periodStart, " +
":firstTimeSlot, " +
":lastTimeSlot, " +
":clusterStart, " +
":clusterDurationInSeconds, " +
":riskLevel)";
}
}
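
Since the identification step is chunked (chunk size 1000), each call to write() can carry many period lists, and inserting row by row costs one round trip per period. A possible batched variant, sketched under the assumption that the writer keeps the insert SQL above and a NamedParameterJdbcTemplate; it is not the committed code:

// sketch of a batched write(); also needs java.util.stream.Collectors,
// org.springframework.jdbc.core.namedparam.SqlParameterSource and
// org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils
@Override
public void write(List<? extends List<SinglePlaceClusterPeriod>> lists) {
    final List<SinglePlaceClusterPeriod> flattened = lists.stream()
            .flatMap(List::stream)
            .collect(Collectors.toList());
    if (flattened.isEmpty()) {
        return;
    }
    // one JDBC batch per chunk instead of one statement per row
    final SqlParameterSource[] batch = SqlParameterSourceUtils.createBatch(flattened.toArray());
    jdbcTemplate.batchUpdate(getInsertSql(), batch);
}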
package fr.gouv.clea.identification;
import fr.gouv.clea.dto.SinglePlaceExposedVisits;
import fr.gouv.clea.entity.ExposedVisit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import static fr.gouv.clea.config.BatchConstants.*;
/**
* This processor is executed concurrently by multiple worker threads, so shared state must be thread-safe (hence the AtomicLong counter)
*/
@Slf4j
@StepScope
public class SinglePlaceExposedVisitsBuilder implements ItemProcessor<String, SinglePlaceExposedVisits> {
private final JdbcTemplate jdbcTemplate;
private final AtomicLong counter = new AtomicLong();
public SinglePlaceExposedVisitsBuilder(DataSource dataSource) {
jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public SinglePlaceExposedVisits process(final String ltid) {
final List<ExposedVisit> list = jdbcTemplate.query("select * from " + EXPOSED_VISITS_TABLE
+ " WHERE ltid= ? ORDER BY " + PERIOD_COLUMN + ", " + TIMESLOT_COLUMN,
new ExposedVisitRowMapper(), UUID.fromString(ltid));
ExposedVisit v = list.stream().findFirst().orElse(null);
if (null != v) {
long ln = counter.incrementAndGet();
if (0 == ln % 1000) {
log.info("Loaded {} visits, current LTId={} ", ln, ltid);
}
return SinglePlaceExposedVisits.builder()
.locationTemporaryPublicId(v.getLocationTemporaryPublicId())
.venueType(v.getVenueType()).venueCategory1(v.getVenueCategory1())
.venueCategory2(v.getVenueCategory2()).visits(list).build();
}
return null;
}
@AfterStep
public ExitStatus afterStep(StepExecution stepExecution) {
log.info("building {} SinglePlaceExposedVisits", counter.get());
return ExitStatus.COMPLETED;
}
}
package fr.gouv.clea.identification;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ItemWriter;