Commit 360b9dc7 authored by Bergamote Orange's avatar Bergamote Orange
Browse files

Fix the identification and prefix-computation steps

parent d80b6849
......@@ -60,7 +60,6 @@ public class BatchConfig {
@Bean
public Job identificationJob() {
// @formatter:off
return this.jobBuilderFactory.get("identificationJob")
.incrementer(new RunIdIncrementer())
.start(clusterIdentification())
......@@ -75,7 +74,7 @@ public class BatchConfig {
JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
reader.setVerifyCursorPosition(false);
reader.setDataSource(dataSource);
reader.setSql("select distinct " + LTID_COLUMN + " from " + EXPOSED_VISITS_TABLE + " order by " + LTID_COLUMN);
reader.setSql("select distinct " + LTID_COL + " from " + EXPOSED_VISITS_TABLE + " order by " + LTID_COL);
reader.setRowMapper((rs, i) -> rs.getString(1));
return reader;
}
......@@ -86,12 +85,11 @@ public class BatchConfig {
* @return list of ltids as strings
*/
@Bean
@StepScope
public ItemReader<List<String>> ltidListDBReader() {
    // Streams the distinct ltids already present in the cluster-periods table;
    // ListItemReader then accumulates them into a single list item for the next step.
    JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    // Cursor position checks are disabled because the row mapper only reads column 1.
    reader.setVerifyCursorPosition(false);
    // Single corrected query — the stale duplicate (missing space before ORDER BY) is removed.
    reader.setSql("select distinct " + LTID_COL + " from " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE + " ORDER BY " + LTID_COL);
    reader.setRowMapper((rs, i) -> rs.getString(1));
    return new ListItemReader(reader);
}
......@@ -146,16 +144,18 @@ public class BatchConfig {
return new SimpleAsyncTaskExecutor("batch-ident");
}
@Bean
@StepScope
public ItemProcessor<String, SinglePlaceExposedVisits> exposedVisitBuilder() {
    // Loads all exposed visits for one ltid (step-scoped so each partition gets its own
    // instance). Annotation order aligned with the sibling processor beans below.
    return new SinglePlaceExposedVisitsBuilder(dataSource);
}
@Bean
@StepScope
public ItemProcessor<SinglePlaceExposedVisits, SinglePlaceCluster> singleClusterPlaceBuilder() {
    // Turns the exposed visits of a single place into its cluster representation.
    final SinglePlaceExposedVisitsProcessor processor =
            new SinglePlaceExposedVisitsProcessor(properties, riskConfigurationService);
    return processor;
}
@Bean
@StepScope
public ItemProcessor<SinglePlaceCluster, List<SinglePlaceClusterPeriod>> singlePlaceClusterPeriodListBuilder() {
    // Maps one cluster to the list of its per-period rows via the MapStruct-style mapper.
    final ClusterToPeriodsProcessor clusterToPeriods = new ClusterToPeriodsProcessor(mapper);
    return clusterToPeriods;
}
......
......@@ -8,10 +8,21 @@ public class BatchConstants {
// Table names used by the batch SQL statements.
public static final String EXPOSED_VISITS_TABLE = "exposed_visits";
public static final String SINGLE_PLACE_CLUSTER_PERIOD_TABLE = "cluster_periods";
// NOTE(review): same value as SINGLE_PLACE_CLUSTER_PERIOD_TABLE — looks like a
// copy/paste slip; confirm the intended cluster table name.
public static final String SINGLE_PLACE_CLUSTER_TABLE = "cluster_periods";
// NOTE(review): legacy *_COLUMN constants kept alongside the new *_COL ones below;
// candidates for removal once all call sites migrate to *_COL.
public static final String LTID_COLUMN = "ltid";
public static final String PERIOD_COLUMN = "period_start";
public static final String TIMESLOT_COLUMN = "timeslot";
// Named SQL parameter for the ltid value.
public static final String LTID_PARAM = "ltid";
// Key under which the cluster map is stored in the job execution context.
public static final String CLUSTERMAP_JOB_CONTEXT_KEY = "clusterMap";
// Column names of the cluster_periods table.
public static final String LTID_COL = "ltid";
public static final String VENUE_TYPE_COL = "venue_type";
public static final String VENUE_CAT1_COL = "venue_category1";
public static final String VENUE_CAT2_COL = "venue_category2";
public static final String PERIOD_START_COL = "period_start";
public static final String FIRST_TIMESLOT_COL = "first_timeslot";
public static final String LAST_TIMESLOT_COL = "last_timeslot";
public static final String CLUSTER_START_COL = "cluster_start";
public static final String CLUSTER_DURATION_COL = "cluster_duration_in_seconds";
public static final String RISK_LEVEL_COL = "risk_level";
}
......@@ -26,7 +26,7 @@ public class ExposedVisitPartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
final String selectDistinctLtidRequest = "SELECT DISTINCT " + LTID_COLUMN + " FROM " + EXPOSED_VISITS_TABLE;
final String selectDistinctLtidRequest = "SELECT DISTINCT " + LTID_COL + " FROM " + EXPOSED_VISITS_TABLE;
final List<String> visitedPlaces = jdbcTemplate.queryForList(selectDistinctLtidRequest, String.class);
Map<String, ExecutionContext> result = new HashMap<>();
......
package fr.gouv.clea.identification;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import fr.gouv.clea.dto.SinglePlaceClusterPeriod;
import fr.gouv.clea.mapper.SinglePlaceClusterPeriodMapper;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import fr.gouv.clea.dto.SinglePlaceCluster;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import javax.sql.DataSource;
import java.util.List;
import static fr.gouv.clea.config.BatchConstants.SINGLE_PLACE_CLUSTER_PERIOD_TABLE;
import static fr.gouv.clea.config.BatchConstants.*;
@Slf4j
public class SinglePlaceClusterPeriodListWriter extends JdbcBatchItemWriter<List<SinglePlaceClusterPeriod>> {
private final SinglePlaceClusterPeriodMapper mapper;
private final JdbcOperations jdbcTemplate;
private final NamedParameterJdbcOperations jdbcTemplate;
public SinglePlaceClusterPeriodListWriter(SinglePlaceClusterPeriodMapper mapper, DataSource datasource) {
this.mapper = mapper;
this.jdbcTemplate = new JdbcTemplate(datasource);
this.jdbcTemplate = new NamedParameterJdbcTemplate(datasource);
}
@Override
public void write(List<? extends List<SinglePlaceClusterPeriod>> list) {
list.get(0).forEach(singlePlaceClusterPeriod -> {
final BeanPropertySqlParameterSource parameterSource = new BeanPropertySqlParameterSource(singlePlaceClusterPeriod);
jdbcTemplate.update(getInsertSql(), parameterSource);
});
final BeanPropertySqlParameterSource parameterSource = new BeanPropertySqlParameterSource(singlePlaceClusterPeriod);
jdbcTemplate.update(getInsertSql(), parameterSource);
});
}
private String getInsertSql() {
return "insert into " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE +
// column names
" (" +
"ltid, " +
"venue_type, " +
"venue_category1, " +
"venue_category2, " +
"period_start, " +
"first_time_slot, " +
"last_time_slot, " +
"cluster_start, " +
"cluster_duration_in_seconds, " +
"risk_level" +
String.join(", ", LTID_COL,
VENUE_TYPE_COL,
VENUE_CAT1_COL,
VENUE_CAT2_COL,
PERIOD_START_COL,
FIRST_TIMESLOT_COL,
LAST_TIMESLOT_COL,
CLUSTER_START_COL,
CLUSTER_DURATION_COL,
RISK_LEVEL_COL) +
")" +
" values " +
// values as parameters from SinglePlaceClusterPeriod attributes
"(" +
":locationTemporaryPublicId, " +
":venueType, " +
":venueCategory1, " +
":venueCategory2, " +
":periodStart, " +
":firstTimeSlot, " +
":lastTimeSlot, " +
":clusterStart, " +
":clusterDurationInSeconds, " +
":riskLevel)";
String.join(", ",
":locationTemporaryPublicId",
":venueType", ":venueCategory1",
":venueCategory2",
":periodStart",
":firstTimeSlot",
":lastTimeSlot",
":clusterStart",
":clusterDurationInSeconds",
":riskLevel")
+ ")";
}
}
......@@ -16,7 +16,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static fr.gouv.clea.config.BatchConstants.LTID_COLUMN;
import static fr.gouv.clea.config.BatchConstants.LTID_COL;
import static fr.gouv.clea.config.BatchConstants.SINGLE_PLACE_CLUSTER_PERIOD_TABLE;
@Slf4j
......@@ -35,7 +35,7 @@ public class SinglePlaceClusterBuilder implements ItemProcessor<String, ClusterF
@Override
public ClusterFileItem process(final String ltid) {
final List<SinglePlaceClusterPeriod> clusterPeriodList = jdbcTemplate.query("select * from " + SINGLE_PLACE_CLUSTER_PERIOD_TABLE
+ " WHERE ltid= ? ORDER BY " + LTID_COLUMN,
+ " WHERE ltid= ? ORDER BY " + LTID_COL,
new SinglePlaceClusterPeriodRowMapper(), UUID.fromString(ltid));
SinglePlaceClusterPeriod singlePlaceClusterPeriod = clusterPeriodList.stream().findFirst().orElse(null);
if (null != singlePlaceClusterPeriod) {
......
package fr.gouv.clea.prefixes;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
......@@ -10,6 +11,7 @@ import org.springframework.batch.item.database.JdbcCursorItemReader;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@RequiredArgsConstructor
public class ListItemReader implements ItemReader<List<String>>, ItemStream {
......@@ -23,7 +25,8 @@ public class ListItemReader implements ItemReader<List<String>>, ItemStream {
for (String clusterLtid; (clusterLtid = this.delegate.read()) != null; ) {
ltidList.add(clusterLtid);
}
return ltidList;
//null return means all input data has been read, and forwards execution to processor
return !ltidList.isEmpty() ? ltidList : null;
}
/**
......
......@@ -16,7 +16,7 @@ import java.util.List;
import java.util.Map;
import static fr.gouv.clea.config.BatchConstants.EXPOSED_VISITS_TABLE;
import static fr.gouv.clea.config.BatchConstants.LTID_COLUMN;
import static fr.gouv.clea.config.BatchConstants.LTID_COL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
......@@ -42,7 +42,7 @@ class ExposedVisitPartitionerTest {
@Test
void partition_returns_map_with_as_much_execution_contexts_as_LTids_returned_by_db() {
final String expectedRequest = "SELECT DISTINCT " + LTID_COLUMN + " FROM " + EXPOSED_VISITS_TABLE;
final String expectedRequest = "SELECT DISTINCT " + LTID_COL + " FROM " + EXPOSED_VISITS_TABLE;
when(jdbcTemplate.queryForList(eq(expectedRequest), eq(String.class))).thenReturn(List.of("lTid1", "lTid1", "lTid2", "lTid3", "lTid3", "lTid3"));
final Map<String, ExecutionContext> result = partitioner.partition(4);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment