На текущий момент не так много примеров тестов для приложений на основе Spark Structured Streaming. Поэтому в данной статье приводятся базовые примеры тестов с подробным описанием.
Все примеры используют: Apache Spark 3.0.1.
Необходимо установить:
Необходимо настроить переменные среды. Здесь приведен пример для локального запуска.
SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7
PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip;
При написании тестов необходимо разделять код таким образом, чтобы можно было изолировать логику и реальное применение конечного API. Хороший пример изоляции: DataFrame-pandas, DataFrame-spark.
Для написания тестов будет использоваться следующий пример: LinearRegression.
Итак, пусть код для тестирования использует следующий “шаблон” для Python:
class XService:
def __init__(self):
# Инициализация
def train(self, ds):
# Обучение
def predict(self, ds):
# Предсказание и вывод результатов
Для Scala шаблон выглядит соответственно.
Полный пример:
from sklearn import linear_model
class LocalService:
def __init__(self):
self.model = linear_model.LinearRegression()
def train(self, ds):
X, y = ds
self.model.fit(X, y)
def predict(self, ds):
r = self.model.predict(ds)
print(r)
Тест.
Импорт:
import unittest
import numpy as np
Основной класс:
class RunTest(unittest.TestCase):
Запуск тестов:
if __name__ == "__main__":
unittest.main()
Подготовка данных:
X = np.array([
[1, 1], # 6
[1, 2], # 8
[2, 2], # 9
[2, 3] # 11
])
y = np.dot(X, np.array([1, 2])) + 3 # [ 6 8 9 11], y = 1 * x_0 + 2 * x_1 + 3
Создание модели и обучение:
service = local_service.LocalService()
service.train((X, y))
Получение результатов:
service.predict(np.array([[3, 5]]))
service.predict(np.array([[4, 6]]))
Ответ:
[16.]
[19.]
Все вместе:
import unittest
import numpy as np
from spark_streaming_pp import local_service
class RunTest(unittest.TestCase):
def test_run(self):
# Prepare data.
X = np.array([
[1, 1], # 6
[1, 2], # 8
[2, 2], # 9
[2, 3] # 11
])
y = np.dot(X, np.array([1, 2])) + 3 # [ 6 8 9 11], y = 1 * x_0 + 2 * x_1 + 3
# Create model and train.
service = local_service.LocalService()
service.train((X, y))
# Predict and results.
service.predict(np.array([[3, 5]]))
service.predict(np.array([[4, 6]]))
# [16.]
# [19.]
if __name__ == "__main__":
unittest.main()
Будет использован аналогичный алгоритм – LinearRegression. Нужно отметить, что Structured Streaming основан на тех же DataFrame-х, которые используются и в Spark Sql. Но как обычно есть нюансы.
Инициализация:
self.service = LinearRegression(maxIter=10, regParam=0.01)
self.model = None
Обучение:
self.model = self.service.fit(ds)
Получение результатов:
transformed_ds = self.model.transform(ds)
q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
return q
Все вместе:
from pyspark.ml.regression import LinearRegression
class StructuredStreamingService:
def __init__(self):
self.service = LinearRegression(maxIter=10, regParam=0.01)
self.model = None
def train(self, ds):
self.model = self.service.fit(ds)
def predict(self, ds):
transformed_ds = self.model.transform(ds)
q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
return q
Сам тест.
Обычно в тестах можно использовать данные, которые создаются прямо в тестах.
train_ds = spark.createDataFrame([
(6.0, Vectors.dense([1.0, 1.0])),
(8.0, Vectors.dense([1.0, 2.0])),
(9.0, Vectors.dense([2.0, 2.0])),
(11.0, Vectors.dense([2.0, 3.0]))
],
["label", "features"]
)
Это очень удобно и код получается компактным.
Но подобный код, к сожалению, не будет работать в Structured Streaming, т.к. созданный DataFrame не будет обладать нужными свойствами, хотя и будет соответствовать контракту DataFrame.
На текущий момент для создания источников для тестов можно использовать такой же подход, что и в тестах для Spark.
def test_stream_read_options_overwrite(self):
bad_schema = StructType([StructField("test", IntegerType(), False)])
schema = StructType([StructField("data", StringType(), False)])
df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake')
.schema(bad_schema)
.load(path='python/test_support/sql/streaming', schema=schema, format='text')
self.assertTrue(df.isStreaming)
self.assertEqual(df.schema.simpleString(), "struct<data:string>")
И так.
Создается контекст для работы:
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
Подготовка данных для обучения (можно сделать обычным способом):
train_ds = spark.createDataFrame([
(6.0, Vectors.dense([1.0, 1.0])),
(8.0, Vectors.dense([1.0, 2.0])),
(9.0, Vectors.dense([2.0, 2.0])),
(11.0, Vectors.dense([2.0, 3.0]))
],
["label", "features"]
)
Обучение:
service = structure_streaming_service.StructuredStreamingService()
service.train(train_ds)
Получение результатов. Для начала считываем данные из файла и выделяем: признаки и идентификатор для объектов. После запускаем предсказание с ожиданием в 3 секунды.
def extract_features(x):
values = x.split(",")
features_ = []
for i in values[1:]:
features_.append(float(i))
features = Vectors.dense(features_)
return features
extract_features_udf = udf(extract_features, VectorUDT())
def extract_label(x):
values = x.split(",")
label = float(values[0])
return label
extract_label_udf = udf(extract_label, FloatType())
predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load()
.withColumn("features", extract_features_udf(col("value")))
.withColumn("label", extract_label_udf(col("value")))
service.predict(predict_ds).awaitTermination(3)
Ответ:
15.96699
18.96138
Все вместе:
import unittest
import warnings
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
from pyspark.ml.linalg import Vectors, VectorUDT
from spark_streaming_pp import structure_streaming_service
class RunTest(unittest.TestCase):
def test_run(self):
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Prepare data.
train_ds = spark.createDataFrame([
(6.0, Vectors.dense([1.0, 1.0])),
(8.0, Vectors.dense([1.0, 2.0])),
(9.0, Vectors.dense([2.0, 2.0])),
(11.0, Vectors.dense([2.0, 3.0]))
],
["label", "features"]
)
# Create model and train.
service = structure_streaming_service.StructuredStreamingService()
service.train(train_ds)
# Predict and results.
def extract_features(x):
values = x.split(",")
features_ = []
for i in values[1:]:
features_.append(float(i))
features = Vectors.dense(features_)
return features
extract_features_udf = udf(extract_features, VectorUDT())
def extract_label(x):
values = x.split(",")
label = float(values[0])
return label
extract_label_udf = udf(extract_label, FloatType())
predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load()
.withColumn("features", extract_features_udf(col("value")))
.withColumn("label", extract_label_udf(col("value")))
service.predict(predict_ds).awaitTermination(3)
# +-----+------------------+
# |label| prediction|
# +-----+------------------+
# | 1.0|15.966990887541273|
# | 2.0|18.961384020443553|
# +-----+------------------+
def setUp(self):
warnings.filterwarnings("ignore", category=ResourceWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)
if __name__ == "__main__":
unittest.main()
Нужно отметить, что для Scala можно воспользоваться созданием потока в памяти.
Это может выглядеть вот так:
implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val source = MemoryStream[Record]
source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
val predictDs = source.toDF()
service.predict(predictDs).awaitTermination(2000)
Полный пример на Scala (здесь, для разнообразия, не используется sql):
package aaa.abc.dd.spark_streaming_pr.cluster
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.streaming.StreamingQuery
class StructuredStreamingService {
var service: LinearRegression = _
var model: LinearRegressionModel = _
def train(ds: DataFrame): Unit = {
service = new LinearRegression().setMaxIter(10).setRegParam(0.01)
model = service.fit(ds)
}
def predict(ds: DataFrame): StreamingQuery = {
val m = ds.sparkSession.sparkContext.broadcast(model)
def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = {
m.value.predict(features)
}
val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun
val toUpperUdf = udf(transform)
val predictionDs = ds.withColumn("prediction", toUpperUdf(ds("features")))
predictionDs
.writeStream
.foreachBatch((r: DataFrame, i: Long) => {
r.show()
// scalastyle:off println
println(s"$i")
// scalastyle:on println
})
.start()
}
}
Тест:
package aaa.abc.dd.spark_streaming_pr.cluster
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.scalatest.{Matchers, Outcome, fixture}
class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers {
test("run") { spark =>
// Prepare data.
val trainDs = spark.createDataFrame(Seq(
(6.0, Vectors.dense(1.0, 1.0)),
(8.0, Vectors.dense(1.0, 2.0)),
(9.0, Vectors.dense(2.0, 2.0)),
(11.0, Vectors.dense(2.0, 3.0))
)).toDF("label", "features")
// Create model and train.
val service = new StructuredStreamingService()
service.train(trainDs)
// Predict and results.
implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val source = MemoryStream[Record]
source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
val predictDs = source.toDF()
service.predict(predictDs).awaitTermination(2000)
// +-----+---------+------------------+
// |label| features| prediction|
// +-----+---------+------------------+
// | 1.0|[3.0,5.0]|15.966990887541273|
// | 2.0|[4.0,6.0]|18.961384020443553|
// +-----+---------+------------------+
}
override protected def withFixture(test: OneArgTest): Outcome = {
val spark = SparkSession.builder().master("local[2]").getOrCreate()
try withFixture(test.toNoArgTest(spark))
finally spark.stop()
}
override type FixtureParam = SparkSession
case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector)
}
При написании тестов необходимо разделять код таким образом, чтобы разделять логику и применение конкретных вызовов API. Можно использоваться любые доступные источники. В том числе и kafka.
Такие абстракции как “DataFrame” позволяют это сделать легко и просто.
При использовании Python данные придется хранить в файлах.
Apple возобновила переговоры с OpenAI о возможности внедрения ИИ-технологий в iOS 18, на основе данной операционной системы будут работать новые…
Конкурсный управляющий российской «дочки» Google подготовил 23 иска к участникам рекламного рынка. Общая сумма исков составляет 16 млрд рублей –…
Google завершил обновление основного алгоритма March 2024 Core Update. Раскатка обновлений была завершена 19 апреля, но сообщил об этом поисковик…
У частных продавцов на Авито появилась возможность составлять текст объявлений с помощью нейросети. Новый функционал доступен в категории «Обувь, одежда,…
24 апреля 2024 года в Москве состоялась церемония вручения наград международного конкурса Workspace Digital Awards. В этом году участниками стали…
27 июня Яндекс проведет гик-фестиваль Young Con для студентов и молодых специалистов, которые интересуются технологиями и хотят работать в IT.…