update to sesamelib 0.0.22 (common decode)

parent e52c5099
......@@ -150,7 +150,7 @@ def splitTrajectories(max_seconds_gap, mmsi_msgs):
def removeDuplicate(mmsi_msgs):
"""Remove consecutive duplicated message if the position is the same.
>>> import pprint as pp
>>> pp.pprint(removeDuplicate((1, [{"x": 1, "y":1}])))
(1, [{'x': 1, 'y': 1}])
......
This diff is collapsed.
......@@ -61,7 +61,7 @@ if __name__ == "__main__":
for geometry in geometries:
d = geometry.distance(x, y)
if d <= 0.0:
print(msg)
print("%s[%s] -> %s" % (geometry.name, geometry.mrgid, msg))
await out_topic.send(key=str(msg.mmsi), value=msg)
app.finalize()
......
......@@ -31,7 +31,8 @@ services:
ingestion:
image: $NAMESPACE/ingestion:$TARGET_REF
command: --bootstrap_servers kafka:9092 "/bigdata/groups/sesame/orbcomm/aivdm/2017/03/01/*.nm4"
#command: --bootstrap_servers kafka:9092 "/bigdata/groups/sesame/orbcomm/aivdm/2017/03/01/*.nm4"
command: --bootstrap_servers kafka:9092 "/bigdata/groups/sesame/ais_britany/raw/2011/07/08/*" brittany
container_name: ingestion
# don':t restart otherwise messages will be indexed several times...
restart: on-failure
......
......@@ -5,13 +5,16 @@ Ingest data to Kafka.
Input: files made of binary ais
Output: ais in json serialized form in ais topic
"""
import ais.stream
import argparse
import json
import glob
from kafka import KafkaProducer
import logging
import time
from kafka import KafkaProducer
import sesamelib.ais_utils as ais_utils
TOPIC_STATIC = "ais.static"
TOPIC_DYNAMIC = "ais.dynamic"
......@@ -47,11 +50,18 @@ if __name__ == "__main__":
parser.add_argument("glob", type=str,
help="input files to parse in a glob format")
parser.add_argument("ais_type", type=str,
choices=[ais_utils.GLOBAL, ais_utils.BRITTANY],
default=ais_utils.GLOBAL,
help="ais type")
args = parser.parse_args()
print(args)
bootstrap_servers = args.bootstrap_servers
source_glob = args.glob
ais_type = args.ais_type
producer = KafkaProducer(
key_serializer=lambda k: str(k).encode("utf-8"),
......@@ -68,7 +78,11 @@ if __name__ == "__main__":
msgs = []
with open(g) as f:
print("Parsing %s", g)
for msg in ais.stream.decode(f):
for line in f:
msg = ais_utils.decode(line, ais_type=ais_type)
if not msg:
logging.error("msg can't be decoded: %s" % line)
continue
t = ""
if msg.get("destination"):
_msg = clean(msg, STATIC_KEY)
......@@ -76,6 +90,7 @@ if __name__ == "__main__":
elif msg.get("x") and msg.get("y"):
_msg = clean(msg, DYNAMIC_KEY)
_msg.update(fix_pos(msg))
print("sending %s" % _msg)
producer.send(TOPIC_DYNAMIC, value=_msg, key=str(msg.get("mmsi", "null")))
if args.pause:
time.sleep(args.pause)
......
libais
kafka-python
sesamelib >= 0.0.22
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment