Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

apache kafka - How to find if two objects have been close for a certain amount of time

I'm new to ksqlDB and trying to get my head around the differences between the window types. I'm currently trying to populate a table with the objects that meet the following condition:

If two objects have been closer than 1 km, having a speed of less than 2, for longer than one hour.

I have two streams, vessels and reefers. Both these streams are continuously updated with new positional data.

Currently my query looks like this:

SELECT VESSELS.ID, LATEST_BY_OFFSET(VESSELS.NAME)
  FROM vessels
  INNER JOIN REEFERS
  WITHIN 3 HOURS
  ON reefers.id_2 = vessels.id_2
  WINDOW HOPPING (size 1 HOURS, advance by 15 MINUTES)
  WHERE GEO_DISTANCE(reefers.LAT, reefers.LON, vessels.LAT, vessels.LON, 'KM') < 1 AND vessels.speed < 2.0 AND vessels.id != reefers.id
  GROUP BY VESSELS.ID

Source data for vessels:

|ID           |NAME        |LAT      |LON       |SPEED  |ID_2   |
|257184340    |KRYSSHOLM   |70.6807  |21.6932   |0.8    |1      |

The reefers stream has the same structure.

As the final result, I'm only interested in a list of vessels that fulfil the criteria. Right now this query gives me a long list with every data point (positional update).

Regards Lars

question from:https://stackoverflow.com/questions/65933728/how-to-find-if-two-objects-have-been-close-for-a-certain-amount-of-time


1 Answer

-- Source streams & topics
CREATE OR REPLACE STREAM reefers_v00 (ID BIGINT, NAME VARCHAR, LAT DOUBLE, LON DOUBLE, SPEED double, DUMMY INT) WITH (KAFKA_TOPIC='reefers_v00',partitions=4,replicas=1,value_format='protobuf');
CREATE OR REPLACE STREAM vessels_v00 (ID BIGINT, NAME VARCHAR, LAT DOUBLE, LON DOUBLE, SPEED double, DUMMY INT) WITH (KAFKA_TOPIC='vessels_v00',partitions=4,replicas=1,value_format='protobuf');
-- Test data
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-29 12:03:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),28,'TEST1',-56.72016, 83.43720, 0.8,1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-29 12:03:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),21,'TEST2',-56.72016, 83.44720, 1.8,1);
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-29 12:33:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),28,'TEST1',-56.72016, 83.43720, 0.8,1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-29 12:33:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),21,'TEST2',-56.72016, 83.44720, 1.8,1);
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-29 13:01:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),28,'TEST1',-56.72016, 83.43720, 0.8,1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-29 13:01:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),21,'TEST2',-56.72016, 83.44720, 1.8,1);

INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 12:03:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),9,'wibble',-56.72016, 83.43720, 0.8,1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 13:03:05', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),11,'foo',-56.62016, 83.43720, 0.8,1);
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 13:04:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),9,'wibble',-56.61016, 83.45720, 0.8,1);
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 14:02:03', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),9,'wibble',-56.61016, 83.45720, 0.8,1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 14:02:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),11,'foo',-56.62016, 83.43720, 0.8,1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 14:05:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),11,'foo',-56.62016, 83.44720, 0.8,1);
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 14:05:10', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),9,'wibble',-56.64016, 83.44720, 0.8,1);
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 14:09:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),9,'wibble',-57.64016, 83.44720, 0.8,1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 14:10:00', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),11,'foo',-56.62016, 83.44720, 0.8,1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 15:15:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),11,'foo',-56.62016, 83.45720, 0.8,1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 19:03:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),22,'bar',-56.68016, 83.44720, 0.8,1);
-- Join the streams
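-- Assumes that movement reports within a minute of each other can be counted as the same timestamp
-- DUMMY holds the same value on every row, so the join pairs each vessel report with each reefer report
-- Creates a new field to indicate if the two objects are within the criteria (range & speed)
-- Note: the range threshold below is 3 km, whereas the question asks for 1 km; adjust as needed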
SET 'auto.offset.reset' = 'earliest';

CREATE STREAM MOVEMENTS_v00 AS
SELECT TIMESTAMPTOSTRING(v.ROWTIME, 'yyyy-MM-dd HH:mm:ss', 'Europe/London') AS TS_v,
  v.*,
  TIMESTAMPTOSTRING(r.ROWTIME, 'yyyy-MM-dd HH:mm:ss', 'Europe/London') AS TS_r,
  r.*,
  geo_distance(v.lat, v.lon, r.lat, r.lon, 'KM') as distance,
  case
    when geo_distance(v.lat, v.lon, r.lat, r.lon, 'KM') < 3 
     and (r.speed <2 and v.speed < 2) then 1
    else 0
  end as in_range_and_speed
FROM vessels_v00 v
  inner join reefers_v00 r 
  WITHIN 1 minute on R.DUMMY = V.DUMMY
  ;
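
To make the logic of the join above easier to follow outside ksqlDB, here is a minimal Python sketch of the same idea: pair reports whose timestamps are at most 60 seconds apart, compute the distance, and set the flag. It is an illustration under assumptions, not ksqlDB's implementation. The haversine formula, the Earth radius constant, the dictionary layout, the function names, and the choice of the later timestamp as the row timestamp are all assumptions made for this sketch.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # assumption: mean Earth radius, may differ from ksqlDB's constant


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in decimal degrees."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = phi2 - phi1
    dlmb = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))


def join_movements(vessels, reefers, within_s=60, max_km=3.0, max_speed=2.0):
    """Pair every vessel report with every reefer report at most `within_s` apart.

    Each report is a dict with keys ts (epoch seconds), id, lat, lon, speed.
    This layout is hypothetical and only mirrors the columns used in the SQL.
    """
    rows = []
    for v in vessels:
        for r in reefers:
            if abs(v["ts"] - r["ts"]) > within_s:
                continue  # outside the join window
            distance = haversine_km(v["lat"], v["lon"], r["lat"], r["lon"])
            in_range = (
                distance < max_km
                and v["speed"] < max_speed
                and r["speed"] < max_speed
            )
            rows.append({
                "ts": max(v["ts"], r["ts"]),  # assumption: later of the two timestamps
                "v_id": v["id"],
                "r_id": r["id"],
                "distance": distance,
                "in_range_and_speed": 1 if in_range else 0,
            })
    return rows
```

With the TEST1 and TEST2 positions from the test data, the two objects sit on the same latitude and 0.01 degrees of longitude apart, which this sketch places well under 1 km, so the flag is set.
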
-- Query the data for debug purposes
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS TS, V_ID, R_ID, V_SPEED, R_SPEED, DISTANCE, IN_RANGE_AND_SPEED 
  FROM MOVEMENTS_v00 
  EMIT CHANGES;
-- Select object pairs using a session window based on the criteria flag
-- and a predicate on the duration of the session
SELECT V_ID,
  V_NAME,
  R_ID,
  R_NAME,
  MIN(DISTANCE) AS CLOSEST_DISTANCE_KM,
  MAX(DISTANCE) AS FURTHEST_DISTANCE_KM,
  COLLECT_LIST(DISTANCE) AS DISTANCE_POINTS_KM,
  TIMESTAMPTOSTRING( MIN(ROWTIME), 'yyyy-MM-dd HH:mm:ss', 'Europe/London' ) AS FIRST_TS,
  TIMESTAMPTOSTRING( MAX(ROWTIME), 'yyyy-MM-dd HH:mm:ss', 'Europe/London' ) AS LAST_TS,
  (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 AS DIFF_SEC
FROM MOVEMENTS_v00 WINDOW SESSION (1 HOUR)
where IN_RANGE_AND_SPEED = 1
GROUP BY V_ID,
  V_NAME,
  R_ID,
  R_NAME,
  IN_RANGE_AND_SPEED 
HAVING (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 > 3600  
  EMIT CHANGES;
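
The session-window step above can be sketched in plain Python to show why it answers the "longer than one hour" part of the question. For one vessel/reefer pair, the qualifying rows are grouped into sessions that end after an hour without a qualifying row, and only sessions spanning more than an hour are kept. This is a simplified illustration: the function names are hypothetical, timestamps are plain epoch seconds, and treating a gap of exactly one hour as still belonging to the same session is an assumption of the sketch.

```python
def sessions(timestamps, gap_s=3600):
    """Group timestamps (epoch seconds) into sessions split by inactivity gaps."""
    grouped = []
    for ts in sorted(timestamps):
        if grouped and ts - grouped[-1][-1] <= gap_s:
            grouped[-1].append(ts)  # still within the inactivity gap
        else:
            grouped.append([ts])  # gap exceeded, start a new session
    return grouped


def long_sessions(timestamps, gap_s=3600, min_duration_s=3600):
    """Return (first_ts, last_ts) for sessions lasting longer than min_duration_s."""
    return [
        (s[0], s[-1])
        for s in sessions(timestamps, gap_s)
        if s[-1] - s[0] > min_duration_s
    ]
```

For example, qualifying rows at 0, 1800, 3480 and 5400 seconds form one session lasting 5400 seconds, so the pair is reported. Rows at 0, 1800 and 3480 seconds alone span only 58 minutes and are filtered out, which mirrors the HAVING clause.
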
