introduce per zone c-square indexing

parent 2ea96804
ARG NAMESPACE
ARG TARGET_REF
FROM ${NAMESPACE}/base_gdal:${TARGET_REF}
LABEL maintainer="matthieu.simonin@inria.fr"
RUN mkdir -p /opt/prog
WORKDIR /opt/prog
COPY . .
RUN chmod +x main
RUN pip3 install -r requirements.txt
RUN pip3 install robinhood-aiokafka==0.4.19
ENTRYPOINT ["/entrypoint.sh", "./main"]
# Useful commands
-- adapt them to your environment
```
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic ais.dynamic
python ~/workspace/repos/platform/libs/alert01/messages.py 5677 | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ais.dynamic --property parse.key=true --property key.separator=\|
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ais.dynamic --property print.key=true --from-beginning
docker run --network host -ti msimonin/sesame_alert01-all --bootstrap_servers localhost:9092
```
This diff is collapsed.
#!/usr/bin/env bash
python3 main.py "$@"
#!/usr/bin/env python
"""
Mutate the original AIS with some minimal pre-computed informations.
"""
import argparse
import asyncio
import functools
import concurrent.futures
import glob
import logging
import math
import pickle
import sys
from sesamelib.geometry import IndexedGeometry, distance
from sesamelib.faust import BaseDynamicMessage
from utils import _mutate
import faust
APP_NAME = "mutation"
if __name__ == "__main__":
parser = argparse.ArgumentParser("Mutate the ais dynamic signal")
parser.add_argument("--bootstrap_servers", "-b",
help="Kafka bootstrap servers",
action="append",
required=True)
args = parser.parse_args()
files = glob.glob("data/*.pickle")
# will hold all the indexed geometries
geometries = []
for f in files:
with open(f, "br") as f:
geometry = pickle.load(f)
geometries.append(geometry)
loop = asyncio.get_event_loop()
pool = concurrent.futures.ThreadPoolExecutor()
app = faust.App(
APP_NAME,
loop=loop,
broker="kafka://{}".format(",".join(args.bootstrap_servers))
)
topic = app.topic("ais.dynamic",
key_type=str,
value_type=BaseDynamicMessage)
pmsgs = {}
@app.agent(topic)
async def ca1_15(msgs):
async for msg in msgs:
# because of AIS, or, me, maybe
x = msg.y
y = msg.x
for geometry in geometries:
d = await loop.run_in_executor(pool,
functools.partial(distance,
x,
y,
geometry))
if d <= 0.0:
# Let's do something simple for now: outputing nowhere
print("ca1_15 : in zone %s / %s" % (geometry.mrgid, msg))
app.finalize()
worker = faust.Worker(app,
loglevel="INFO",
loop=loop)
worker.execute_from_commandline()
pool.shutdown()
123|{"mmsi": 123, "tagblock_timestamp": 123, "timestamp": 123, "true_heading": 123, "sog": 123, "y":21.0, "x":21.0}
123|{"mmsi": 123, "tagblock_timestamp": 123, "timestamp": 123, "true_heading": 123, "sog": 123, "y":47.4, "x":-5.71}
#!/usr/bin/env python
from sesamelib.geometry import SesameGeometry, ENVELOPE_SINGLE
import json
import numpy as np
import os
import time
import sys
HERE = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
if __name__ == "__main__":
file_path = os.path.join(HERE, "data/{}.xml".format(sys.argv[1]))
# generate messages that can be consumed by kafka producer
# e.g:
geometry = SesameGeometry.from_gml_file(file_path, envelope=ENVELOPE_SINGLE)
x1, x2, y1, y2 = geometry.get_envelope()
# Make data.
X = np.linspace(x1, x2, 100)
Y = np.linspace(y1, y2, 100)
current = 0.0
timestamp = 1488326400
for idx, x in enumerate(X):
for y in Y:
current = time.time()
msg = {"mmsi": "1234",
# Ais message have this weirdness to invert x and y
"x": y,
"y": x,
"tagblock_timestamp": current,
"sog": current,
"true_heading": current,
}
current = current + 1
print("{}|{}".format(int(current), json.dumps(msg)))
time.sleep(0)
faust >= 1.4.2, <1.5
sesamelib
......@@ -59,6 +59,8 @@ services:
NAMESPACE: $NAMESPACE
depends_on:
- base_gdal
- kafka
- zookeeper
indexion:
image: $NAMESPACE/indexion:$TARGET_REF
......@@ -86,19 +88,18 @@ services:
depends_on:
- base_gdal
ca3-015:
image: $NAMESPACE/ca3-015:$TARGET_REF
ca1-015-csquare:
image: $NAMESPACE/ca1-015:$TARGET_REF
command: --bootstrap_servers kafka:9092
container_name: ca3-015
container_name: ca1-015-csquare
restart: always
build:
context: ca3-015
context: ca1-015-csquare
args:
TARGET_REF: $TARGET_REF
NAMESPACE: $NAMESPACE
depends_on:
- base_gdal
############################
# Third party
############################
......
......@@ -5,9 +5,6 @@ Mutate the original AIS with some minimal pre-computed informations.
import argparse
import asyncio
import collections
import copy
from datetime import datetime
import functools
import concurrent.futures
import logging
......@@ -17,7 +14,8 @@ import sys
from sesamelib.geometry import distance
from sesamelib.faust import MutatedDynamicMessage
from .util import _mutate
from utils import _mutate
import faust
......@@ -91,8 +89,8 @@ if __name__ == "__main__":
zone_distance = zone_distance
zone_distance = d
mtm = _mutate(msg, pmsgs, zone_mrgid, zone_distance)
mtm = _mutate(msg=msg, pmsgs=pmsgs, grid_x=x1, grid_y=x2,
zone_mrgid=zone_mrgid, zone_distance=zone_distance)
# finally emit the mutated message
# NOTE(msimonin): Surprinsingly we need to force the type to str
# here
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment