当前位置: 首页 > news >正文

高校网站建设 调查网页界面设计首页

高校网站建设 调查,网页界面设计首页,沪深300指数基金,电商热门关键词实验5 Spark Structured Streaming编程实践 实验内容和要求 0.结构化流练习任务 0.1 讲义文件源-json数据任务。按照讲义中json数据的生成及分析,复现实验,并适当分析。 (1)创建程序生成JSON格式的File源测试数据 import osimp…

实验5 Spark Structured Streaming编程实践

实验内容和要求

0.结构化流练习任务

0.1 讲义文件源-json数据任务。按照讲义中json数据的生成及分析,复现实验,并适当分析。

  • (1)创建程序生成JSON格式的File源测试数据
import osimport shutilimport randomimport time
TEST_DATA_TEMP_DIR = '/tmp/'
TEST_DATA_DIR = '/tmp/testdata/'ACTION_DEF = ['login', 'logout', 'purchase']
DISTRICT_DEF = ['fujian', 'beijing', 'shanghai', 'guangzhou']
JSON_LINE_PATTERN = '{{"eventTime": {}, "action": "{}", "district": "{}"}}\n‘# 测试的环境搭建,判断文件夹是否存在,如果存在则删除旧数据,并建立文件夹
def test_setUp():if os.path.exists(TEST_DATA_DIR):shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)os.mkdir(TEST_DATA_DIR) 
# 测试环境的恢复,对文件夹进行清理
def test_tearDown():if os.path.exists(TEST_DATA_DIR):shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)# 生成测试文件
def write_and_move(filename, data):with open(TEST_DATA_TEMP_DIR + filename,"wt", encoding="utf-8") as f:f.write(data)shutil.move(TEST_DATA_TEMP_DIR + filename,TEST_DATA_DIR + filename)if __name__ == "__main__":test_setUp()# 这里生成200个文件for i in range(200):filename = 'e-mall-{}.json'.format(i)content = ''rndcount = list(range(100))random.shuffle(rndcount)for _ in rndcount:content += JSON_LINE_PATTERN.format(str(int(time.time())),random.choice(ACTION_DEF),random	.choice(DISTRICT_DEF))write_and_move(filename, content)time.sleep(1)test_tearDown()
  • (2)创建程序对数据进行统计
# 导入需要用到的模块
import os
import shutil
from pprint import pprintfrom pyspark.sql import SparkSession
from pyspark.sql.functions import window, asc
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType
# 定义JSON文件的路径常量(此为本地路径)
TEST_DATA_DIR_SPARK = '/tmp/testdata/'
if __name__ == "__main__":# 定义模式,为时间戳类型的eventTime、字符串类型的操作和省份组成schema = StructType([StructField("eventTime", TimestampType(), True),StructField("action", StringType(), True),StructField("district", StringType(), True)])spark = SparkSession \.builder \.appName("StructuredEMallPurchaseCount") \.getOrCreate()spark.sparkContext.setLogLevel('WARN')lines = spark \.readStream \.format("json") \.schema(schema) \.option("maxFilesPerTrigger", 100) \.load(TEST_DATA_DIR_SPARK)# 定义窗口windowDuration = '1 minutes'windowedCounts = lines \.filter("action = 'purchase'") \.groupBy('district', window('eventTime', windowDuration)) \.count() \.sort(asc('window')) query = windowedCounts \.writeStream \.outputMode("complete") \.format("console") \.option('truncate', 'false') \.trigger(processingTime="10 seconds") \.start()query.awaitTermination()
  • (3)测试运行程序

0.2 讲义kafka源,2字母单词分析任务按照讲义要求,复现kafka源实验。

  • 1.启动Kafka
    • 在Linux系统中新建一个终端(记作“Zookeeper终端”),输入下面命令启动Zookeeper服务:
      • cd /usr/local/kafka
      • bin/zookeeper-server-start.sh config/zookeeper.properties
    • 新建第二个终端(记作“Kafka终端”),然后输入下面命令启动Kafka服务:
      • cd /usr/local/kafka
      • bin/kafka-server-start.sh config/server.properties
    • 新建第三个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:
      • cd /usr/local/kafka
      • bin/kafka-console-consumer.sh > --bootstrap-server localhost:9092 --topic wordcount-topic
    • 新建第四个终端(记作“监控输出终端”),执行如下命令监控输出的结果文本:
      • cd /usr/local/kafka
      • bin/kafka-console-consumer.sh > --bootstrap-server localhost:9092 --topic wordcount-result-topic
  • 2.编写生产者(Producer)程序
# spark_ss_kafka_producer.pyimport string
import random
import timefrom kafka import KafkaProducerif __name__ == "__main__":producer = KafkaProducer(bootstrap_servers=['localhost:9092'])while True:s2 = (random.choice(string.ascii_lowercase) for _ in range(2))word = ''.join(s2)value = bytearray(word, 'utf-8')producer.send('wordcount-topic', value=value) \.get(timeout=10)time.sleep(0.1)
  • 3.编写消费者(Consumer)程序
# spark_ss_kafka_consumer.pyfrom pyspark.sql import SparkSessionif __name__ == "__main__":spark = SparkSession \.builder \.appName("StructuredKafkaWordCount") \.getOrCreate()spark.sparkContext.setLogLevel('WARN') lines = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", 'wordcount-topic') \.load() \.selectExpr("CAST(value AS STRING)")wordCounts = lines.groupBy("value").count()query = wordCounts \.selectExpr("CAST(value AS STRING) as key", "CONCAT(CAST(value AS STRING), ':', CAST(count AS STRING)) as value") \.writeStream \.outputMode("complete") \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("topic", "wordcount-result-topic") \.option("checkpointLocation", "file:///tmp/kafka-sink-cp") \.trigger(processingTime="8 seconds") \.start()query.awaitTermination()
  • 在终端中执行如下命令运行消费者程序:

0.3 讲义socket源,结构化流实现词频统计。按照讲义要求,复现socket源实验。

  • 代码文件spark_ss_rate.py
# spark_ss_rate.pyfrom pyspark.sql import SparkSessionif __name__ == "__main__":spark = SparkSession \.builder \.appName("TestRateStreamSource") \.getOrCreate()spark.sparkContext.setLogLevel('WARN')lines = spark \.readStream \.format("rate") \.option('rowsPerSecond', 5) \.load()print(lines.schema)query = lines \.writeStream \.outputMode("update") \.format("console") \.option('truncate', 'false') \.start()query.awaitTermination()
  • 在Linux终端中执行spark_ss_rate.py

0.4(不选)使用rate源,评估系统性能。

1.日志分析任务

1.1通过Socket传送Syslog到Spark日志分析是一个大数据分析中较为常见的场景。

  • 实验原理:
    • 在Unix类操作系统里,Syslog广泛被应用于系统或者应用的日志记录中。
    • Syslog通常被记录在本地文件内,比如Ubuntu内为/var/log/syslog文件名,也可以被发送给远程Syslog服务器。
    • Syslog日志内一般包括产生日志的时间、主机名、程序模块、进程名、进程ID、严重性和日志内容。
    • 日志一般会通过Kafka等有容错保障的源发送,本实验为了简化,直接将Syslog通过Socket源发送。
  • 实验过程:
    • 新建一个终端,执行如下命令:
    • tail -n+1 -f /var/log/syslog | nc -lk 9988“tail -n+1 -f /var/log/syslog”
      • 表示从第一行开始打印文件syslog的内容
      • “-f”表示如果文件有增加则持续输出最新的内容。
    • 然后,通过管道把文件内容发送到nc程序(nc程序可以进一步把数据发送给Spark)。
    • 如果/var/log/syslog内的内容增长速度较慢,可以再新开一个终端(计作“手动发送日志终端”),手动在终端输入如下内容来增加日志信息到/var/log/syslog内:
    • logger ‘I am a test error log message.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext# 创建SparkContext和StreamingContext
sc = SparkContext(appName="SyslogAnalysis")
ssc = StreamingContext(sc, 1)# 创建一个DStream,接收来自Socket的数据流
lines = ssc.socketTextStream("localhost", 9988)# 在数据流上应用转换和操作
word_counts = lines.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.reduceByKey(lambda x, y: x + y)# 输出结果到控制台
word_counts.pprint()# 启动StreamingContext
ssc.start()
ssc.awaitTermination()

1.2对Syslog进行查询

  • 由Spark接收nc程序发送过来的日志信息,然后完成以下任务:
    • 统计CRON这个进程每小时生成的日志数,并以时间顺序排列,水印设置为1分钟。
    • 统计每小时的每个进程或者服务分别产生的日志总数,水印设置为1分钟。
    • 输出所有日志内容带error的日志。
from pyspark.sql.functions import window
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType# 创建SparkSession
spark = SparkSession.builder \.appName("LogAnalysis") \.getOrCreate()# 定义日志数据的模式
schema = StructType([StructField("timestamp", TimestampType(), True),StructField("message", StringType(), True)
])# 从socket接收日志数据流
logs = spark.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9988) \.load()# 将接收到的日志数据流应用模式
logs = logs.selectExpr("CAST(value AS STRING)") \.selectExpr("to_timestamp(value, 'yyyy-MM-dd HH:mm:ss') AS timestamp", "value AS message") \.select(col("timestamp"), col("message").alias("log_message"))# 统计CRON进程每小时生成的日志数,并按时间顺序排列
cron_logs = logs.filter(col("log_message").contains("CRON")) \.groupBy(window("timestamp", "1 hour")) \.count() \.orderBy("window")# 统计每小时每个进程或服务产生的日志总数
service_logs = logs.groupBy(window("timestamp", "1 hour"), "log_message") \.count() \.orderBy("window")# 输出所有带有"error"的日志内容
error_logs = logs.filter(col("log_message").contains("error"))# 设置水印为1分钟
cron_logs = cron_logs.withWatermark("window", "1 minute")
service_logs = service_logs.withWatermark("window", "1 minute")
error_logs = error_logs.withWatermark("timestamp", "1 minute")# 启动流式处理并输出结果
query_cron_logs = cron_logs.writeStream \.outputMode("complete") \.format("console") \.start()query_service_logs = service_logs.writeStream \.outputMode("complete") \.format("console") \.start()query_error_logs = error_logs.writeStream \.outputMode("append") \.format("console") \.start()# 等待流式处理完成
query_cron_logs.awaitTermination()
query_service_logs.awaitTermination()
query_error_logs.awaitTermination()

2.股市分析任务(进阶任务)

  • 数据集采用dj30数据集,见教学平台。
  • 实验说明:
    • 本实验将使用两个移动均线策略,短期移动均线为10天,长期移动均线为40天。
    • 当短期移动均线越过长期移动均线时,这是一个买入信号,因为它表明趋势正在向上移动。这就是所谓的黄金交叉。
    • 同时,当短期移动均线穿过长期移动均线下方时,这是一个卖出信号,因为它表明趋势正在向下移动。这就是所谓的死亡交叉。
    • 两种叉形如下图所示:dj30.csv包含了道琼斯工业平均指数25年的价格历史。
  • 实验要求:
    • 1.设置流以将数据输入structed streaming。
    • 2.使用structed streaming窗口累计 dj30sum和dj30ct,分别为价格的总和和计数。
    • 3.将这两个structed streaming (dj30sum和dj30ct)分开产生dj30avg,从而创建10天MA和40天MA的移动平均值。
    • 4.比较两个移动平均线(短期移动平均线和长期移动平均线)来指示买入和卖出信号。
      • 您的输出[dj30-feeder只有一个符号的数据:DJI,这是隐含的。
      • 这个问题的输出将是[(<日期>买入DJI),(<日期>卖出DJI),等等]。
      • 应该是[(<日期>买入<符号>),(<日期>卖出<符号>),等等]的形式。

1.设置流以将数据输入structed streaming。

from pyspark.sql import SparkSession
from pyspark.sql.functions import *# 创建一个SparkSession对象:
spark = SparkSession.builder \.appName("StructuredStreamingExample") \.getOrCreate()
inputPath = "path_to_dj30.csv"# 读取dj30.csv文件并创建一个输入流:
df = spark.readStream \.format("csv") \.option("header", "true") \.load(inputPath)# 对数据进行处理和转换:
df = df.withColumn("timestamp", to_timestamp(col("date"), "yyyy-MM-dd"))# 定义输出操作:
agg_df = df.groupBy(window("timestamp", "1 hour")).agg(sum("price").alias("dj30sum"), count("price").alias("dj30ct"))# 启动流式处理:
query = agg_df.writeStream \.outputMode("complete") \.format("console") \.start()# 等待流式处理完成:
query.awaitTermination()
from pyspark.sql import SparkSession
from pyspark.sql.functions import colspark = SparkSession.builder \.appName("DJ30 Structured Streaming") \.getOrCreate()dj30_data = spark.read.csv("path/to/dj30.csv", header=True)streaming_data = dj30_data.select(col("Long Date").alias("date"), col("Close").cast("float").alias("close"))streaming_data.createOrReplaceTempView("dj30_stream")streaming_df = spark.sql("SELECT * FROM dj30_stream")

2.使用structed streaming窗口累计 dj30sum和dj30ct,分别为价格的总和和计数
3.将这两个structed streaming (dj30sum和dj30ct)分开产生dj30avg,从而创建10天MA和40天MA的移动平均值
4.比较两个移动平均线(短期移动平均线和长期移动平均线)来指示买入和卖出信号。

http://www.yayakq.cn/news/649230/

相关文章:

  • 用ps切片做网站能不能完成泰安百度推广公司
  • 手机网站怎么搜索引擎咸阳营销型网站开发
  • 做思维导图的资源网站企业网站规划书
  • 一款蛋糕食品类企业手机网站源码个人网站的内容
  • 圣辉友联做网站公司建立网站wordpress
  • 公司在线网站制作系统提供网站建设商家
  • 模块化网站建设wordpress后台缺少菜单
  • 烟台市建设工程质量监督站网站遥控器外壳设计网站推荐
  • 励志网站源码单片机项目外包网站
  • 重庆网站建设红旗河沟沧州讯呗网络科技有限公司
  • 国外做的好点电商网站内网网站搭建设
  • 网站建设软著网站建设私单
  • 专业网页设计模板抖音seo怎么做的
  • 网站案例代码石景山网站制作案例
  • 网站规划怎么做学计算机哪个培训机构好
  • 建一个自己用的网站要多少钱承德名城建设集团网站
  • 关于电子商务网站建设的现状无锡企业网站制作哪家比较好
  • 免费的网站搭建前端课程网站
  • php mysql做网站登录建设网站的预期收益
  • 网站推广效益怎么分析网站建设的成本分析
  • 企业做定制网站的好处怎么做外国网站流量
  • 电影网站如何做seo江苏核酸检测机构
  • 自己做网站要买什么网络规划设计师考试资料百度云
  • 网站没有ftp 怎么推广php+mysql 2012也买酒商城网站源码
  • 沁阳建网站wordpress做x站主题
  • 长沙建设网站哪家好网站空间商 权限
  • 西部中大建设集团有限公司网站如何开公众号微信公众平台
  • 蕲春做网站重庆网站建设jccit
  • 网站建设的开发方式知乎公司网站要怎么做
  • 学做网站制作邮政招c1驾驶员8000元