remove threadpool executor

parent 4a38d32b
......@@ -4,9 +4,6 @@ Mutate the original AIS with some minimal pre-computed informations.
"""
import argparse
import asyncio
import functools
import concurrent.futures
import glob
import logging
import pickle
......@@ -43,13 +40,8 @@ if __name__ == "__main__":
logging.info("Loaded %s geometries" % len(geometries))
loop = asyncio.get_event_loop()
pool = concurrent.futures.ThreadPoolExecutor()
app = faust.App(
APP_NAME % args.cell_size,
loop=loop,
broker="kafka://{}".format(",".join(args.bootstrap_servers))
)
......@@ -67,18 +59,12 @@ if __name__ == "__main__":
x = msg.y
y = msg.x
for geometry in geometries:
d = await loop.run_in_executor(pool,
functools.partial(distance,
x,
y,
geometry))
d = geometry.distance(x, y)
if d <= 0.0:
print(msg)
await out_topic.send(key=str(msg.mmsi), value=msg)
app.finalize()
worker = faust.Worker(app,
loglevel="INFO",
loop=loop)
loglevel="INFO")
worker.execute_from_commandline()
pool.shutdown()
......@@ -93,6 +93,8 @@ services:
command: --bootstrap_servers kafka:9092 --cell_size 0.25
container_name: ca1-015-csquares
restart: always
ports:
- 6067:6066
build:
context: ca1-015-csquares
args:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment