hello

发布时间 2023-12-17 00:07:58作者: 牧之丨
package com.controller;

import cn.dev33.satoken.annotation.SaIgnore;
import com.QueueUtils;
import com.StreamUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.sql.*;
import java.util.*;
import java.util.function.Consumer;

@SaIgnore
@RestController("/hello")
@Slf4j
public class HelloController {
    @Data
    @ToString
    @AllArgsConstructor
    public static final class Person{
        private String username;
        private String password;
    }

    @Autowired
    private StreamUtils streamUtils;

    //phoenix驱动
    private String phoenixDriver = "org.apache.phoenix.jdbc.PhoenixDriver";
    //zookeeper地址
    private String phoenixURL = "jdbc:phoenix:192.168.50.106,192.168.50.107,192.168.50.108:2181";

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 生产消息内容
     *
     * @param msg
     * @return
     */
    public void produceMsg(String msg) {
        RStream<String, String> stream = redissonClient.getStream("stream_user");
        stream.add("user_id", "1");
        stream.add("user_name", "brand");
        stream.add("age", "18");
    }

    /**
     * 消费消息内容
     */
    public void consumeMessage() {
        // 根据队列名称获取消息队列
        redissonClient.getBoundedBlockingQueue("stream_user");
        RStream<String, String> stream = redissonClient.getStream("stream_user");
        // 创建消费者小组
//        stream.createGroup("consumer_group1", StreamMessageId.ALL);
        // 消费者读取消息
        Map<StreamMessageId, Map<String, String>> msgs
                = stream.readGroup("consumer_group1", "consumer1");
        for (Map.Entry<StreamMessageId, Map<String, String>> entry : msgs.entrySet()) {
            Map<String, String> msg = entry.getValue();
            log.info("{}", msg);
            // todo:处理消息的业务逻辑代码
            stream.ack("consumer_group1", entry.getKey());
        }
    }

    @GetMapping("/h1")
    public String h1() throws Exception {
        produceMsg("hello");
        consumeMessage();
        QueueUtils.subscribeBlockingQueue("hello", (Consumer<Person>) s -> log.info("xxxxx{}",s));
        Thread.sleep(9999999);
        new HelloController().test();
        return "h1";
    }

    @GetMapping("/h2")
    public String h2() throws Exception {
        boolean hello = QueueUtils.trySetBoundedQueueCapacity("hello", 3);
        for (int i = 0; i < 5; i++) {
            QueueUtils.addBoundedQueueObject("hello", new Person("hello"+1, "world" + 1));
        }
        Thread.sleep(9999999);
        return "h1";
    }

    public void test() throws Exception {
        // 创建表
        System.out.println("\n--- 开始创建 student 表 ---");
        createTable();
 
        // 获取Phoenix中的表(系统表除外)
        System.out.println("\n--- 获取Phoenix中的表(系统表除外) ---");
        List<String> tables = getTables();
        System.out.println(tables);
 
        // 插入数据
        System.out.println("\n--- 开始插入数据 ---");
        insertData();
 
        // 删除数据
        System.out.println("\n--- 开始删除数据 ---");
        deleteData();
 
        // 查询数据
        System.out.println("\n--- 开始查询数据 ---");
        List<Map<String, String>> list = getData("\"student\"");
        System.out.println(list);
 
        //删除表
        System.out.println("\n--- 开始删除 student 表 ---");
        dropTable();
    }
 
    // 获取连接
    public Connection getConnection() throws Exception {
//        Class.forName(phoenixDriver);
        Properties properties = new Properties();
        return DriverManager.getConnection(phoenixURL, properties);
    }
 
    // 创建表
    public void createTable() throws Exception {
            //获取连接
            Connection connection = getConnection();
            // 创建Statement对象
            String sql = "CREATE TABLE IF NOT EXISTS \"student\"(" +
                    "id VARCHAR primary key," +
                    "name VARCHAR," +
                    "age VARCHAR)";
            PreparedStatement statement = connection.prepareStatement(sql);
            // 执行sql操作
            statement.execute();
            // 关闭
            statement.close();
            connection.close();
    }
 
    // 获取Phoenix中的表(系统表除外)
    public List<String> getTables() throws Exception {
        //获取连接
        Connection connection = getConnection();
        List<String> tables = new ArrayList<>();
        DatabaseMetaData metaData = connection.getMetaData();
        String[] types = {"TABLE"}; //"SYSTEM TABLE"
        ResultSet resultSet = metaData.getTables(null, null, null, types);
        while (resultSet.next()) {
            tables.add(resultSet.getString("TABLE_NAME"));
        }
        return tables;
    }
 
    // 删除表
    public void dropTable() throws Exception {
        //获取连接
        Connection connection = getConnection();
        // 创建Statement对象
        String sql = "DROP TABLE \"student\"";
        PreparedStatement statement = connection.prepareStatement(sql);
        // 执行sql操作
        statement.execute();
        // 关闭
        statement.close();
        connection.close();
    }
 
    // 插入数据
    public void insertData() throws Exception {
        //获取连接
        Connection connection = getConnection();
 
        //获取Statement对象,并进行数据插入
        Statement statement = connection.createStatement();
        statement.executeUpdate("upsert into \"student\" values('1001','大刘','20')");
        statement.executeUpdate("upsert into \"student\" values('1002','小星','22')");
        connection.commit();
        statement.close();
 
        //获取PreparedStatement对象,并进行数据插入
        PreparedStatement preparedStatement = connection.prepareStatement(
                "upsert into \"student\" values(?,?,?)");
        //给参数赋值
        preparedStatement.setString(1,"1003");
        preparedStatement.setString(2,"hangge");
        preparedStatement.setString(3,"1000");
        //执行插入
        preparedStatement.execute();
        connection.commit();
        preparedStatement.close();
 
        connection.close();
    }
 
    // 删除数据
    public void deleteData() throws Exception {
        //获取连接
        Connection connection = getConnection();
 
        //获取Statement对象,并进行数据删除
        Statement statement = connection.createStatement();
        statement.execute("delete from \"student\" where id = '1002'");
        connection.commit();
        statement.close();
        connection.close();
    }
 
    // 查询数据(获取表中的所有数据)
    public List<Map<String, String>> getData(String tableName) throws Exception {
        //获取连接
        Connection connection = getConnection();
        String sql = "SELECT * FROM " + tableName;
        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        ResultSet resultSet = preparedStatement.executeQuery();
        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
        List<Map<String, String>> resultList = new ArrayList<>();
        while (resultSet.next()) {
            Map<String, String> result = new HashMap<>();
            for (int i = 1, len = resultSetMetaData.getColumnCount(); i <= len; i++) {
                result.put(resultSetMetaData.getColumnName(i), resultSet.getString(i));
            }
            resultList.add(result);
        }
        return resultList;
    }
}

 

package com.nvxclouds.baize.tally.computing.controller;

import cn.dev33.satoken.annotation.SaIgnore;
import com.nvxclouds.baize.common.redis.utils.QueueUtils;
import com.nvxclouds.baize.common.redis.utils.StreamUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.sql.*;
import java.util.*;
import java.util.function.Consumer;

@SaIgnore
@RestController("/hello")
@Slf4j
public class HelloController {
@Data
@ToString
@AllArgsConstructor
public static final class Person{
private String username;
private String password;
}

@Autowired
private StreamUtils streamUtils;

//phoenix驱动
private String phoenixDriver = "org.apache.phoenix.jdbc.PhoenixDriver";
//zookeeper地址
private String phoenixURL = "jdbc:phoenix:192.168.50.106,192.168.50.107,192.168.50.108:2181";

@Autowired
private RedissonClient redissonClient;

/**
* 生产消息内容
*
* @param msg
* @return
*/
public void produceMsg(String msg) {
RStream<String, String> stream = redissonClient.getStream("stream_user");
stream.add("user_id", "1");
stream.add("user_name", "brand");
stream.add("age", "18");
}

/**
* 消费消息内容
*/
public void consumeMessage() {
// 根据队列名称获取消息队列
redissonClient.getBoundedBlockingQueue("stream_user");
RStream<String, String> stream = redissonClient.getStream("stream_user");
// 创建消费者小组
// stream.createGroup("consumer_group1", StreamMessageId.ALL);
// 消费者读取消息
Map<StreamMessageId, Map<String, String>> msgs
= stream.readGroup("consumer_group1", "consumer1");
for (Map.Entry<StreamMessageId, Map<String, String>> entry : msgs.entrySet()) {
Map<String, String> msg = entry.getValue();
log.info("{}", msg);
// todo:处理消息的业务逻辑代码
stream.ack("consumer_group1", entry.getKey());
}
}

@GetMapping("/h1")
public String h1() throws Exception {
produceMsg("hello");
consumeMessage();
QueueUtils.subscribeBlockingQueue("hello", (Consumer<Person>) s -> log.info("xxxxx{}",s));
Thread.sleep(9999999);
new HelloController().test();
return "h1";
}

@GetMapping("/h2")
public String h2() throws Exception {
boolean hello = QueueUtils.trySetBoundedQueueCapacity("hello", 3);
for (int i = 0; i < 5; i++) {
QueueUtils.addBoundedQueueObject("hello", new Person("hello"+1, "world" + 1));
}
Thread.sleep(9999999);
return "h1";
}

public void test() throws Exception {
// 创建表
System.out.println("\n--- 开始创建 student ---");
createTable();

// 获取Phoenix中的表(系统表除外)
System.out.println("\n--- 获取Phoenix中的表(系统表除外) ---");
List<String> tables = getTables();
System.out.println(tables);

// 插入数据
System.out.println("\n--- 开始插入数据 ---");
insertData();

// 删除数据
System.out.println("\n--- 开始删除数据 ---");
deleteData();

// 查询数据
System.out.println("\n--- 开始查询数据 ---");
List<Map<String, String>> list = getData("\"student\"");
System.out.println(list);

//删除表
System.out.println("\n--- 开始删除 student ---");
dropTable();
}

// 获取连接
public Connection getConnection() throws Exception {
// Class.forName(phoenixDriver);
Properties properties = new Properties();
return DriverManager.getConnection(phoenixURL, properties);
}

// 创建表
public void createTable() throws Exception {
//获取连接
Connection connection = getConnection();
// 创建Statement对象
String sql = "CREATE TABLE IF NOT EXISTS \"student\"(" +
"id VARCHAR primary key," +
"name VARCHAR," +
"age VARCHAR)";
PreparedStatement statement = connection.prepareStatement(sql);
// 执行sql操作
statement.execute();
// 关闭
statement.close();
connection.close();
}

// 获取Phoenix中的表(系统表除外)
public List<String> getTables() throws Exception {
//获取连接
Connection connection = getConnection();
List<String> tables = new ArrayList<>();
DatabaseMetaData metaData = connection.getMetaData();
String[] types = {"TABLE"}; //"SYSTEM TABLE"
ResultSet resultSet = metaData.getTables(null, null, null, types);
while (resultSet.next()) {
tables.add(resultSet.getString("TABLE_NAME"));
}
return tables;
}

// 删除表
public void dropTable() throws Exception {
//获取连接
Connection connection = getConnection();
// 创建Statement对象
String sql = "DROP TABLE \"student\"";
PreparedStatement statement = connection.prepareStatement(sql);
// 执行sql操作
statement.execute();
// 关闭
statement.close();
connection.close();
}

// 插入数据
public void insertData() throws Exception {
//获取连接
Connection connection = getConnection();

//获取Statement对象,并进行数据插入
Statement statement = connection.createStatement();
statement.executeUpdate("upsert into \"student\" values('1001','大刘','20')");
statement.executeUpdate("upsert into \"student\" values('1002','小星','22')");
connection.commit();
statement.close();

//获取PreparedStatement对象,并进行数据插入
PreparedStatement preparedStatement = connection.prepareStatement(
"upsert into \"student\" values(?,?,?)");
//给参数赋值
preparedStatement.setString(1,"1003");
preparedStatement.setString(2,"hangge");
preparedStatement.setString(3,"1000");
//执行插入
preparedStatement.execute();
connection.commit();
preparedStatement.close();

connection.close();
}

// 删除数据
public void deleteData() throws Exception {
//获取连接
Connection connection = getConnection();

//获取Statement对象,并进行数据删除
Statement statement = connection.createStatement();
statement.execute("delete from \"student\" where id = '1002'");
connection.commit();
statement.close();
connection.close();
}

// 查询数据(获取表中的所有数据)
public List<Map<String, String>> getData(String tableName) throws Exception {
//获取连接
Connection connection = getConnection();
String sql = "SELECT * FROM " + tableName;
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
List<Map<String, String>> resultList = new ArrayList<>();
while (resultSet.next()) {
Map<String, String> result = new HashMap<>();
for (int i = 1, len = resultSetMetaData.getColumnCount(); i <= len; i++) {
result.put(resultSetMetaData.getColumnName(i), resultSet.getString(i));
}
resultList.add(result);
}
return resultList;
}
}