update

parent 5ae20bc8
......@@ -5,6 +5,8 @@ LABEL maintainer="matthieu.simonin@inria.fr"
RUN mkdir -p /opt/prog
WORKDIR /opt/prog
RUN apt install -y git
COPY . .
RUN chmod +x main
RUN pip3 install -r requirements.txt
......
......@@ -4,9 +4,9 @@
```
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic ais.dynamic
cat messages.json | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ais.dynamic --property parse.key=true --property key.separator=\|
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ais.dynamic --property print.key=true --from-beginning
python ~/workspace/repos/platform/libs/alert01/messages.py 5677 | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ais.dynamic --property parse.key=true --property key.separator=\|
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ais.dynamic --property print.key=true --from-beginning
docker run --network host -ti msimonin/sesame_alert01 --bootstrap_servers localhost:9092 5677
```
......@@ -41,7 +41,7 @@ if __name__ == "__main__":
args = parser.parse_args()
print(args)
geometry = load_geometry("data/{}.xml".format(args.mrgid))
geometry = load_geometry("data/{}.xml".format(args.mrgid), only_exteriors=True)
geom = geometry["geometry"]
app = faust.App(
......@@ -49,21 +49,21 @@ if __name__ == "__main__":
broker = "kafka://{}".format(",".join(args.bootstrap_servers))
)
topic = app.topic("ais.dynamic", value_type=DynamicMessage)
out = app.topic("ais.alert")
out_topic = app.topic("ais.alert")
async def debug(msg):
print(msg)
@app.agent(topic)
@app.agent(topic, sink=[out_topic, debug])
async def france(msgs):
async for msg in msgs:
d = distance(msg.y, msg.x, geom)
if d <= 0.0:
print("Inside the eez")
print(msg.y, msg.x)
_msg = msg.__dict__
_msg.update({"app_name": APP_NAME, "desc": args.mrgid})
await out.send(key=args.mrgid, value=_msg)
else:
pass
yield _msg
app.finalize()
worker = faust.Worker(app, loglevel="INFO")
worker.execute_from_commandline()
123|{"mmsi": 123, "tagblock_timestamp":123, "true_heading": 123, "sog": 123, "x":21.0, "y":21.0}
123|{"mmsi": 123, "tagblock_timestamp":123, "true_heading": 123, "sog": 123, "x":47.4, "y":-5.71}
123|{"mmsi": 123, "tagblock_timestamp": 123, "timestamp": 123, "true_heading": 123, "sog": 123, "y":21.0, "x":21.0}
123|{"mmsi": 123, "tagblock_timestamp": 123, "timestamp": 123, "true_heading": 123, "sog": 123, "y":47.4, "x":-5.71}
#!/usr/bin/env python
from utils import load_geometry
import json
import numpy as np
import os
import time
import sys
HERE = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
if __name__ == "__main__":
file_path = os.path.join(HERE, "data/{}.xml".format(sys.argv[1]))
# generate messages that can be consumed by kafka producer
# e.g:
geometry = load_geometry(file_path)
geom = geometry["geometry"]
x1, x2, y1, y2 = geom.GetEnvelope()
# Make data.
X = np.linspace(x1, x2, 100)
Y = np.linspace(y1, y2, 100)
current = 0.0
timestamp = 1488326400
current = time.time()
for idx, x in enumerate(X):
for y in Y:
msg = {"mmsi": str(current),
# Ais message have this weirdness to invert x and y
"x": y,
"y": x,
"tagblock_timestamp": timestamp,
"timestamp": timestamp * 1000,
"sog": current,
"true_heading": current,
}
current = current + 1
print("{}|{}".format(int(current), json.dumps(msg)))
time.sleep(0)
faust==1.0.30
-e git+https://github.com/robinhood/faust#egg=master
......@@ -39,7 +39,7 @@ def update_not_none(geometry, key, value):
geometry.update({key: value.text})
def load_geometry(gml_path):
def load_geometry(gml_path, only_exteriors=False):
"""
Load the Geometry from a gml file.
......@@ -54,7 +54,7 @@ def load_geometry(gml_path):
tree = ET.parse(gml_path)
root = tree.getroot()
multisurface = root.findall(".//{http://www.opengis.net/gml}MultiSurface")
name = root.find(".//{geo.vliz.be/MarineRegions}geoname")
mrgid = root.find(".//{geo.vliz.be/MarineRegions}mrgid")
......@@ -63,14 +63,31 @@ def load_geometry(gml_path):
update_not_none(geometry, "name", name)
update_not_none(geometry, "mrgid", mrgid)
multisurface = root.findall(".//{http://www.opengis.net/gml}MultiSurface")
if len(multisurface) != 1:
logging.error("[%s] We found %s multisurface" % (gml_path, len(multisurface)))
return None
geometry.update({"type": "full"})
if only_exteriors:
# we remove all the interiors polygons first
polygons = root.findall(".//{http://www.opengis.net/gml}Polygon")
for p in polygons:
interiors = p.findall(".//{http://www.opengis.net/gml}interior")
for interior in interiors:
p.remove(interior)
geometry.update({"type": "only_exteriors"})
geom = ogr.CreateGeometryFromGML(ET.tostring(multisurface[0], encoding="unicode"))
geometry.update({
"geometry": geom
})
# NOTE(msimonin): we treat the first levels differently as it contains all
# the exteriors.
for idx in range(geom.GetGeometryCount()):
......@@ -81,10 +98,9 @@ def load_geometry(gml_path):
def plot_zone(geometry):
import matplotlib.pyplot as plt
""" Draw lines corresponding to the zone. The exterior lines are plotted
first, the number of interior lines are limited to LIMIT_INTERIOR """
import matplotlib.pyplot as plt
geom = geometry["geometry"]
for lines in geometry["points"]:
......
......@@ -32,9 +32,12 @@ PROPERTIES = {
"x": {"type": "double"},
"y": {"type": "double"},
"tagblock_timestamp": {"type": "text"},
# Set by the alert op
"timestamp": {"type": "date"},
"app_name": {"type": "keyword"},
"desc": {"type": "keyword"},
# generated ones
"location": {"type": "geo_point"},
"timestamp": {"type": "date"},
}
BODY = {
......
......@@ -41,11 +41,15 @@ if __name__ == "__main__":
help="Kafka bootstrap servers",
action="append",
required=True)
parser.add_argument("-p", "--pause", help="Limit throughput by pausing between two transmissions",
type=float, default=0.0)
parser.add_argument("glob", type=str,
help="input files to parse in a glob format")
args = parser.parse_args()
print(args)
print(args)
bootstrap_servers = args.bootstrap_servers
source_glob = args.glob
......@@ -76,14 +80,18 @@ if __name__ == "__main__":
"timestamp": 1000 * _msg["tagblock_timestamp"]
})
producer.send(TOPIC_DYNAMIC, value=_msg, key=str(msg.get("mmsi", -1)))
if args.pause:
time.sleep(args.pause)
# We ignore all the other message types
count += 1
# in seconds
current_time = time.time()
elapsed = current_time - start_time
total = len(files) * elapsed / file_count
print("Total records : %s" % count)
print("Estimated total time : %s" % total)
print("Estimated remaining time : %s" % (total - elapsed))
print("Total records: %s" % count)
print("Estimated total time: %s" % total)
print("Estimated remaining time: %s" % (total - elapsed))
print("Average Message rate: %s" % (float(count)/elapsed))
producer.flush()
# make sure to flush the buffered message on the wire
# before proceeding
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment