diff --git a/java/Subscribe.java b/java/Subscribe.java new file mode 100644 index 0000000..be3b15c --- /dev/null +++ b/java/Subscribe.java @@ -0,0 +1,153 @@ +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.*; +import java.sql.ResultSet; +import java.sql.Statement; + +public class Subscribe { + + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require"; + private final String user = "MATERIALIZE_USERNAME"; + private final String password = "MATERIALIZE_PASSWORD"; + + /** + * Connect to Materialize + * + * @return a Connection object + */ + public Connection connect() throws SQLException { + Properties props = new Properties(); + props.setProperty("user", user); + props.setProperty("password", password); + + return DriverManager.getConnection(url, props); + } + + public void subscribe() { + try (Connection conn = connect()) { + + Statement stmt = conn.createStatement(); + stmt.execute("BEGIN"); + stmt.execute("DECLARE c CURSOR FOR SUBSCRIBE (SELECT sum FROM counter_sum) WITH (PROGRESS);"); + State> state = new State<>(false); + List>> buffer = new ArrayList<>(); + + while (true) { + ResultSet rs = stmt.executeQuery("FETCH ALL c"); + boolean updated = false; + while (rs.next()) { + // Map row fields + long ts = rs.getLong("mz_timestamp"); + boolean progress = rs.getBoolean("mz_progressed"); + int diff = rs.getInt("mz_diff"); + + HashMap rowData = new HashMap<>(); + rowData.put("sum", rs.getBigDecimal("sum")); + + // When a progress is detected, get the last values + if (progress) { + if (updated) { + updated = false; + + // Update the state with the last data + state.update(buffer, ts); + buffer.clear(); + + System.out.println(state.getState()); + } + } else { + updated = true; + buffer.add(new Update<>(rowData, diff)); + } + } + } + } catch (SQLException ex) { + System.out.println(ex.getMessage()); + } + } + + public static void main(String[] args) { + Subscribe app = new Subscribe(); + app.subscribe(); + } +} + +/* + * State class to handle updates from a subscription. + */ +record Update(T value, int diff) {}; + +class State { + private final HashMap state; + private long timestamp; + private boolean valid; + private List> history; + + public State(boolean collectHistory) { + state = new HashMap<>(); + timestamp = 0; + valid = true; + if (collectHistory) { + history = new ArrayList<>(); + } + } + + public List getState() { + List list = new ArrayList<>(); + + for (Map.Entry entry : state.entrySet()) { + T value = entry.getKey(); + int count = entry.getValue(); + + for (int i = 0; i < count; i++) { + list.add(value); + } + } + + return list; + } + + public List> getHistory() { + return history; + } + + private void validate(long timestamp) { + if (!valid) { + throw new RuntimeException("Invalid state."); + } else if (timestamp < this.timestamp) { + System.err.println("Invalid timestamp."); + valid = false; + throw new RuntimeException(String.format( + "Update with timestamp (%d) is lower than the last timestamp (%d). Invalid state.", timestamp, this.timestamp)); + } + } + + private void process(Update update) { + T value = update.value(); + int diff = update.diff(); + + int count = state.containsKey(value) ? state.get(value) + diff : diff; + + if (count <= 0) { + state.remove(value); + } else { + state.put(value, count); + } + + if (history != null) { + history.add(update); + } + } + + public void update(List> updates, long timestamp) { + if (!updates.isEmpty()) { + validate(timestamp); + this.timestamp = timestamp; + updates.forEach(this::process); + } + } +} + diff --git a/java/connection.java b/java/connection.java index 464d477..8b5fb59 100644 --- a/java/connection.java +++ b/java/connection.java @@ -1,11 +1,10 @@ -import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.Properties; -public class App { +public class Connection { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -14,12 +13,11 @@ public class App { * * @return a Connection object */ - public Connection connect() { + public java.sql.Connection connect() { Properties props = new Properties(); props.setProperty("user", user); props.setProperty("password", password); - props.setProperty("ssl","true"); - Connection conn = null; + java.sql.Connection conn = null; try { conn = DriverManager.getConnection(url, props); System.out.println("Connected to Materialize successfully!"); @@ -31,7 +29,7 @@ public Connection connect() { } public static void main(String[] args) { - App app = new App(); + Connection app = new Connection(); app.connect(); } } \ No newline at end of file diff --git a/java/insert.java b/java/insert.java index 9816587..0eff108 100644 --- a/java/insert.java +++ b/java/insert.java @@ -6,9 +6,9 @@ import java.sql.Statement; import java.sql.PreparedStatement; -public class App { +public class Insert { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -21,7 +21,6 @@ public Connection connect() throws SQLException { Properties props = new Properties(); props.setProperty("user", user); props.setProperty("password", password); - props.setProperty("ssl","true"); return DriverManager.getConnection(url, props); @@ -44,7 +43,7 @@ public void insert() { } public static void main(String[] args) { - App app = new App(); + Insert app = new Insert(); app.insert(); } } \ No newline at end of file diff --git a/java/query.java b/java/query.java index 8a5d4c4..0f65643 100644 --- a/java/query.java +++ b/java/query.java @@ -5,9 +5,9 @@ import java.sql.ResultSet; import java.sql.Statement; -public class App { +public class Query { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -20,7 +20,6 @@ public Connection connect() throws SQLException { Properties props = new Properties(); props.setProperty("user", user); props.setProperty("password", password); - props.setProperty("ssl","true"); return DriverManager.getConnection(url, props); @@ -42,7 +41,7 @@ public void query() { } public static void main(String[] args) { - App app = new App(); + Query app = new Query(); app.query(); } } \ No newline at end of file diff --git a/java/source.java b/java/source.java index f390cae..f2d62a7 100644 --- a/java/source.java +++ b/java/source.java @@ -6,9 +6,9 @@ import java.sql.Statement; import java.sql.PreparedStatement; -public class App { +public class Source { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -21,16 +21,16 @@ public Connection connect() throws SQLException { Properties props = new Properties(); props.setProperty("user", user); props.setProperty("password", password); - props.setProperty("ssl","true"); return DriverManager.getConnection(url, props); } public void source() { - - String SQL = "CREATE SOURCE counter FROM " - + "LOAD GENERATOR COUNTER"; + String SQL = "CREATE SOURCE IF NOT EXISTS counter" + + "FROM LOAD GENERATOR COUNTER" + + "(TICK INTERVAL '500ms')" + + "WITH (SIZE = '3xsmall');"; try (Connection conn = connect()) { Statement st = conn.createStatement(); @@ -43,7 +43,7 @@ public void source() { } public static void main(String[] args) { - App app = new App(); + Source app = new Source(); app.source(); } } \ No newline at end of file diff --git a/java/subscribe.java b/java/subscribe.java deleted file mode 100644 index 5a56b96..0000000 --- a/java/subscribe.java +++ /dev/null @@ -1,50 +0,0 @@ -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.Properties; -import java.sql.ResultSet; -import java.sql.Statement; - -public class App { - - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize"; - private final String user = "MATERIALIZE_USERNAME"; - private final String password = "MATERIALIZE_PASSWORD"; - - /** - * Connect to Materialize - * - * @return a Connection object - */ - public Connection connect() throws SQLException { - Properties props = new Properties(); - props.setProperty("user", user); - props.setProperty("password", password); - props.setProperty("ssl","true"); - - return DriverManager.getConnection(url, props); - - } - - public void subscribe() { - try (Connection conn = connect()) { - - Statement stmt = conn.createStatement(); - stmt.execute("BEGIN"); - stmt.execute("DECLARE c CURSOR FOR SUBSCRIBE my_view"); - while (true) { - ResultSet rs = stmt.executeQuery("FETCH ALL c"); - if(rs.next()) { - System.out.println(rs.getString(1) + " " + rs.getString(2) + " " + rs.getString(3)); - } - } - } catch (SQLException ex) { - System.out.println(ex.getMessage()); - } - } - - public static void main(String[] args) { - App app = new App(); - app.subscribe(); - } -} \ No newline at end of file diff --git a/java/view.java b/java/view.java index cda21df..530fa41 100644 --- a/java/view.java +++ b/java/view.java @@ -6,9 +6,9 @@ import java.sql.Statement; import java.sql.PreparedStatement; -public class App { +public class View { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -21,18 +21,15 @@ public Connection connect() throws SQLException { Properties props = new Properties(); props.setProperty("user", user); props.setProperty("password", password); - props.setProperty("ssl","true"); return DriverManager.getConnection(url, props); } public void view() { - String SQL = "CREATE VIEW market_orders_2 AS " - + "SELECT " - + " val->>'symbol' AS symbol, " - + " (val->'bid_price')::float AS bid_price " - + "FROM (SELECT text::jsonb AS val FROM market_orders_raw_2)"; + String SQL = "CREATE MATERIALIZED VIEW IF NOT EXISTS counter_sum AS" + + "SELECT sum(counter)" + + "FROM counter;"; try (Connection conn = connect()) { Statement st = conn.createStatement(); @@ -45,7 +42,7 @@ public void view() { } public static void main(String[] args) { - App app = new App(); + View app = new View(); app.view(); } } \ No newline at end of file