Connecting Apache Flink with SQL
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
# Set up the DataStream execution environment and wrap it in a Table API
# environment so that SQL statements can be executed against it.
exec_env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(exec_env)

# DDL that registers `my_table` (columns: id INT, name STRING) backed by the
# JDBC connector, pointing at `my_database` on a MySQL server at
# localhost:3306. The username/password values look like placeholders and
# are presumably meant to be replaced before running.
# NOTE(review): 'com.mysql.jdbc.Driver' is the legacy Connector/J class name;
# newer Connector/J releases use 'com.mysql.cj.jdbc.Driver' — confirm against
# the driver jar that is actually on the Flink classpath.
create_source_sql = """
CREATE TABLE my_table (
id INT,
name STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/my_database',
'table-name' = 'my_table',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'your_username',
'password' = 'your_password'
)
"""
table_env.execute_sql(create_source_sql)

# Keep only the rows whose id is greater than 100.
filter_sql = "SELECT * FROM my_table WHERE id > 100"
filtered_rows = table_env.sql_query(filter_sql)

# Run the query and write the resulting rows to the console.
filtered_rows.execute().print()
Comments
Post a Comment