-- Source streams & topics
-- Reefer position reports, backed by the Protobuf topic 'reefers_v00'.
-- DUMMY is the column MOVEMENTS_v00 uses as its join key.
CREATE OR REPLACE STREAM reefers_v00 (
    ID    BIGINT,
    NAME  VARCHAR,
    LAT   DOUBLE,
    LON   DOUBLE,
    SPEED DOUBLE,
    DUMMY INT
) WITH (
    KAFKA_TOPIC  = 'reefers_v00',
    PARTITIONS   = 4,
    REPLICAS     = 1,
    VALUE_FORMAT = 'protobuf'
);
-- Vessel position reports, backed by the Protobuf topic 'vessels_v00'.
-- DUMMY is the column MOVEMENTS_v00 uses as its join key.
CREATE OR REPLACE STREAM vessels_v00 (
    ID    BIGINT,
    NAME  VARCHAR,
    LAT   DOUBLE,
    LON   DOUBLE,
    SPEED DOUBLE,
    DUMMY INT
) WITH (
    KAFKA_TOPIC  = 'vessels_v00',
    PARTITIONS   = 4,
    REPLICAS     = 1,
    VALUE_FORMAT = 'protobuf'
);
-- Test data
-- Every row sets ROWTIME explicitly (parsed as Europe/London local time) and DUMMY = 1.

-- Vessel 28 ('TEST1') and reefer 21 ('TEST2'): three reports each on 2021-01-29,
-- at identical timestamps (12:03:04, 12:33:04, 13:01:04) and unchanged positions.
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-29 12:03:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 28, 'TEST1', -56.72016, 83.43720, 0.8, 1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-29 12:03:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 21, 'TEST2', -56.72016, 83.44720, 1.8, 1);
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-29 12:33:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 28, 'TEST1', -56.72016, 83.43720, 0.8, 1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-29 12:33:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 21, 'TEST2', -56.72016, 83.44720, 1.8, 1);
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-29 13:01:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 28, 'TEST1', -56.72016, 83.43720, 0.8, 1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-29 13:01:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 21, 'TEST2', -56.72016, 83.44720, 1.8, 1);

-- Vessel 9 ('wibble') and reefer 11 ('foo'): reports on 2021-01-28 with
-- timestamps and positions that only sometimes line up.
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-28 12:03:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 9, 'wibble', -56.72016, 83.43720, 0.8, 1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-28 13:03:05', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 11, 'foo', -56.62016, 83.43720, 0.8, 1);
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-28 13:04:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 9, 'wibble', -56.61016, 83.45720, 0.8, 1);
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-28 14:02:03', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 9, 'wibble', -56.61016, 83.45720, 0.8, 1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-28 14:02:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 11, 'foo', -56.62016, 83.43720, 0.8, 1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-28 14:05:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 11, 'foo', -56.62016, 83.44720, 0.8, 1);
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-28 14:05:10', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 9, 'wibble', -56.64016, 83.44720, 0.8, 1);
INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-28 14:09:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 9, 'wibble', -57.64016, 83.44720, 0.8, 1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-28 14:10:00', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 11, 'foo', -56.62016, 83.44720, 0.8, 1);
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-28 15:15:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 11, 'foo', -56.62016, 83.45720, 0.8, 1);

-- Reefer 22 ('bar'): a single report with no vessel row within a minute of it.
INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON, SPEED, DUMMY)
    VALUES (STRINGTOTIMESTAMP('2021-01-28 19:03:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'), 22, 'bar', -56.68016, 83.44720, 0.8, 1);
-- Join the streams
-- Assumes that movement reports within a minute of each other can be counted as the same timestamp
-- Creates a new field to indicate if the two objects are within the criteria (range & speed)
-- Have queries started from here on consume their source topics from the earliest offset.
SET 'auto.offset.reset' = 'earliest';

-- Pairs every vessel report with every reefer report whose ROWTIME is within one minute
-- of it; the equality on DUMMY is the join key. All columns of both sides are kept
-- (the queries further down read them as V_* and R_*), plus the formatted timestamps,
-- the distance in KM, and IN_RANGE_AND_SPEED, which is 1 when the distance is below 3 KM
-- and both speeds are below 2, otherwise 0.
CREATE STREAM MOVEMENTS_v00 AS
    SELECT
        TIMESTAMPTOSTRING(v.ROWTIME, 'yyyy-MM-dd HH:mm:ss', 'Europe/London') AS TS_v,
        v.*,
        TIMESTAMPTOSTRING(r.ROWTIME, 'yyyy-MM-dd HH:mm:ss', 'Europe/London') AS TS_r,
        r.*,
        GEO_DISTANCE(v.lat, v.lon, r.lat, r.lon, 'KM') AS distance,
        CASE
            WHEN GEO_DISTANCE(v.lat, v.lon, r.lat, r.lon, 'KM') < 3
                 AND (r.speed < 2 AND v.speed < 2)
                THEN 1
            ELSE 0
        END AS in_range_and_speed
    FROM vessels_v00 v
        INNER JOIN reefers_v00 r
            WITHIN 1 MINUTE
            ON R.DUMMY = V.DUMMY
;
-- Query the data for debug purposes
-- Push query: streams each joined row with its ids, speeds, distance and flag.
SELECT
    TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss', 'Europe/London') AS TS,
    V_ID,
    R_ID,
    V_SPEED,
    R_SPEED,
    DISTANCE,
    IN_RANGE_AND_SPEED
FROM MOVEMENTS_v00
EMIT CHANGES;
-- Select object pairs using a session window based on the criteria flag
-- and a predicate on the duration of the session
-- Only flagged rows (IN_RANGE_AND_SPEED = 1) are aggregated, per vessel/reefer pair,
-- in session windows with a 1 hour gap. A session is reported once the span between
-- its first and last row exceeds 3600 seconds (DIFF_SEC).
-- IN_RANGE_AND_SPEED stays in the GROUP BY although the WHERE clause already pins it to 1.
SELECT
    V_ID,
    V_NAME,
    R_ID,
    R_NAME,
    MIN(DISTANCE)          AS CLOSEST_DISTANCE_KM,
    MAX(DISTANCE)          AS FURTHEST_DISTANCE_KM,
    COLLECT_LIST(DISTANCE) AS DISTANCE_POINTS_KM,
    TIMESTAMPTOSTRING(MIN(ROWTIME), 'yyyy-MM-dd HH:mm:ss', 'Europe/London') AS FIRST_TS,
    TIMESTAMPTOSTRING(MAX(ROWTIME), 'yyyy-MM-dd HH:mm:ss', 'Europe/London') AS LAST_TS,
    (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 AS DIFF_SEC
FROM MOVEMENTS_v00
    WINDOW SESSION (1 HOUR)
WHERE IN_RANGE_AND_SPEED = 1
GROUP BY
    V_ID,
    V_NAME,
    R_ID,
    R_NAME,
    IN_RANGE_AND_SPEED
HAVING (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 > 3600
EMIT CHANGES;