На текущий момент не так много примеров тестов для приложений на основе Spark Structured Streaming. Поэтому в данной статье приводятся базовые примеры тестов с подробным описанием.

Все примеры используют: Apache Spark 3.0.1.

Необходимо установить:

  • Apache Spark 3.0.x
  • Python 3.7 и виртуальное окружение для него
  • Conda 4.y
  • scikit-learn 0.22.z
  • Maven 3.v
  • В примерах для Scala используется версия 2.12.10.

  1. Загрузить Apache Spark
  2. Распаковать: tar -xvzf ./spark-3.0.1-bin-hadoop2.7.tgz
  3. Создать окружение, к примеру, с помощью conda: conda create -n sp python=3.7

Необходимо настроить переменные среды. Здесь приведен пример для локального запуска.

SPARK_HOME=/Users/$USER/Documents/spark/spark-3.0.1-bin-hadoop2.7
PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip;

Пример с scikit-learn

При написании тестов необходимо разделять код таким образом, чтобы можно было изолировать логику и реальное применение конечного API. Хороший пример изоляции: DataFrame-pandas, DataFrame-spark.

Для написания тестов будет использоваться следующий пример: LinearRegression.

Итак, пусть код для тестирования использует следующий “шаблон” для Python:

class XService:

    def __init__(self):
        # Инициализация

    def train(self, ds):
        # Обучение

    def predict(self, ds):
        # Предсказание и вывод результатов

Для Scala шаблон выглядит соответственно.

Полный пример:

from sklearn import linear_model

class LocalService:

    def __init__(self):
        self.model = linear_model.LinearRegression()

    def train(self, ds):
        X, y = ds
        self.model.fit(X, y)

    def predict(self, ds):
        r = self.model.predict(ds)
        print(r)

Тест.

Импорт:

import unittest
import numpy as np

Основной класс:

class RunTest(unittest.TestCase):

Запуск тестов:

if __name__ == "__main__":
    unittest.main()

Подготовка данных:

X = np.array([
    [1, 1],  # 6
    [1, 2],  # 8
    [2, 2],  # 9
    [2, 3]  # 11
])
y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3

Создание модели и обучение:

service = local_service.LocalService()
service.train((X, y))

Получение результатов:

service.predict(np.array([[3, 5]]))
service.predict(np.array([[4, 6]]))

Ответ:

[16.]
[19.]

Все вместе:

import unittest
import numpy as np

from spark_streaming_pp import local_service

class RunTest(unittest.TestCase):
    def test_run(self):
        # Prepare data.
        X = np.array([
            [1, 1],  # 6
            [1, 2],  # 8
            [2, 2],  # 9
            [2, 3]  # 11
        ])
        y = np.dot(X, np.array([1, 2])) + 3  # [ 6  8  9 11], y = 1 * x_0 + 2 * x_1 + 3
        # Create model and train.
        service = local_service.LocalService()
        service.train((X, y))
        # Predict and results.
        service.predict(np.array([[3, 5]]))
        service.predict(np.array([[4, 6]]))
        # [16.]
        # [19.]

if __name__ == "__main__":
    unittest.main()

Пример с Spark и Python

Будет использован аналогичный алгоритм – LinearRegression. Нужно отметить, что Structured Streaming основан на тех же DataFrame-х, которые используются и в Spark Sql. Но как обычно есть нюансы.

Инициализация:

self.service = LinearRegression(maxIter=10, regParam=0.01)
self.model = None

Обучение:

self.model = self.service.fit(ds)

Получение результатов:

transformed_ds = self.model.transform(ds)
q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
return q

Все вместе:

from pyspark.ml.regression import LinearRegression

class StructuredStreamingService:
    def __init__(self):
        self.service = LinearRegression(maxIter=10, regParam=0.01)
        self.model = None

    def train(self, ds):
        self.model = self.service.fit(ds)

    def predict(self, ds):
        transformed_ds = self.model.transform(ds)
        q = transformed_ds.select("label", "prediction").writeStream.format("console").start()
        return q

Сам тест.

Обычно в тестах можно использовать данные, которые создаются прямо в тестах.

train_ds = spark.createDataFrame([
    (6.0, Vectors.dense([1.0, 1.0])),
    (8.0, Vectors.dense([1.0, 2.0])),
    (9.0, Vectors.dense([2.0, 2.0])),
    (11.0, Vectors.dense([2.0, 3.0]))
],
    ["label", "features"]
)

Это очень удобно и код получается компактным.

Но подобный код, к сожалению, не будет работать в Structured Streaming, т.к. созданный DataFrame не будет обладать нужными свойствами, хотя и будет соответствовать контракту DataFrame.
На текущий момент для создания источников для тестов можно использовать такой же подход, что и в тестах для Spark.

def test_stream_read_options_overwrite(self):
    bad_schema = StructType([StructField("test", IntegerType(), False)])
    schema = StructType([StructField("data", StringType(), False)])
    df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') 
        .schema(bad_schema)
        .load(path='python/test_support/sql/streaming', schema=schema, format='text')
    self.assertTrue(df.isStreaming)
    self.assertEqual(df.schema.simpleString(), "struct<data:string>")

И так.

Создается контекст для работы:

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Подготовка данных для обучения (можно сделать обычным способом):

train_ds = spark.createDataFrame([
    (6.0, Vectors.dense([1.0, 1.0])),
    (8.0, Vectors.dense([1.0, 2.0])),
    (9.0, Vectors.dense([2.0, 2.0])),
    (11.0, Vectors.dense([2.0, 3.0]))
],
    ["label", "features"]
)

Обучение:

service = structure_streaming_service.StructuredStreamingService()
service.train(train_ds)

Получение результатов. Для начала считываем данные из файла и выделяем: признаки и идентификатор для объектов. После запускаем предсказание с ожиданием в 3 секунды.

def extract_features(x):
    values = x.split(",")
    features_ = []
    for i in values[1:]:
        features_.append(float(i))
    features = Vectors.dense(features_)
    return features

extract_features_udf = udf(extract_features, VectorUDT())

def extract_label(x):
    values = x.split(",")
    label = float(values[0])
    return label

extract_label_udf = udf(extract_label, FloatType())

predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() 
    .withColumn("features", extract_features_udf(col("value"))) 
    .withColumn("label", extract_label_udf(col("value")))

service.predict(predict_ds).awaitTermination(3)

Ответ:

15.96699
18.96138

Все вместе:

import unittest
import warnings

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
from pyspark.ml.linalg import Vectors, VectorUDT

from spark_streaming_pp import structure_streaming_service

class RunTest(unittest.TestCase):
    def test_run(self):
        spark = SparkSession.builder.enableHiveSupport().getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        # Prepare data.
        train_ds = spark.createDataFrame([
            (6.0, Vectors.dense([1.0, 1.0])),
            (8.0, Vectors.dense([1.0, 2.0])),
            (9.0, Vectors.dense([2.0, 2.0])),
            (11.0, Vectors.dense([2.0, 3.0]))
        ],
            ["label", "features"]
        )
        # Create model and train.
        service = structure_streaming_service.StructuredStreamingService()
        service.train(train_ds)

        # Predict and results.

        def extract_features(x):
            values = x.split(",")
            features_ = []
            for i in values[1:]:
                features_.append(float(i))
            features = Vectors.dense(features_)
            return features

        extract_features_udf = udf(extract_features, VectorUDT())

        def extract_label(x):
            values = x.split(",")
            label = float(values[0])
            return label

        extract_label_udf = udf(extract_label, FloatType())

        predict_ds = spark.readStream.format("text").option("path", "data/structured_streaming").load() 
            .withColumn("features", extract_features_udf(col("value"))) 
            .withColumn("label", extract_label_udf(col("value")))

        service.predict(predict_ds).awaitTermination(3)

        # +-----+------------------+
        # |label|        prediction|
        # +-----+------------------+
        # |  1.0|15.966990887541273|
        # |  2.0|18.961384020443553|
        # +-----+------------------+

    def setUp(self):
        warnings.filterwarnings("ignore", category=ResourceWarning)
        warnings.filterwarnings("ignore", category=DeprecationWarning)

if __name__ == "__main__":
    unittest.main()

Нужно отметить, что для Scala можно воспользоваться созданием потока в памяти.
Это может выглядеть вот так:

implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val source = MemoryStream[Record]
source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
val predictDs = source.toDF()
service.predict(predictDs).awaitTermination(2000)

Полный пример на Scala (здесь, для разнообразия, не используется sql):

package aaa.abc.dd.spark_streaming_pr.cluster

import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.streaming.StreamingQuery

class StructuredStreamingService {
  var service: LinearRegression = _
  var model: LinearRegressionModel = _

  def train(ds: DataFrame): Unit = {
    service = new LinearRegression().setMaxIter(10).setRegParam(0.01)
    model = service.fit(ds)
  }

  def predict(ds: DataFrame): StreamingQuery = {

    val m = ds.sparkSession.sparkContext.broadcast(model)

    def transformFun(features: org.apache.spark.ml.linalg.Vector): Double = {
      m.value.predict(features)
    }

    val transform: org.apache.spark.ml.linalg.Vector => Double = transformFun

    val toUpperUdf = udf(transform)

    val predictionDs = ds.withColumn("prediction", toUpperUdf(ds("features")))

    predictionDs
      .writeStream
      .foreachBatch((r: DataFrame, i: Long) => {
        r.show()
        // scalastyle:off println
        println(s"$i")
        // scalastyle:on println
      })
      .start()
  }
}

Тест:

package aaa.abc.dd.spark_streaming_pr.cluster

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.scalatest.{Matchers, Outcome, fixture}

class StructuredStreamingServiceSuite extends fixture.FunSuite with Matchers {
  test("run") { spark =>
    // Prepare data.
    val trainDs = spark.createDataFrame(Seq(
      (6.0, Vectors.dense(1.0, 1.0)),
      (8.0, Vectors.dense(1.0, 2.0)),
      (9.0, Vectors.dense(2.0, 2.0)),
      (11.0, Vectors.dense(2.0, 3.0))
    )).toDF("label", "features")
    // Create model and train.
    val service = new StructuredStreamingService()
    service.train(trainDs)
    // Predict and results.
    implicit val sqlCtx = spark.sqlContext
    import spark.implicits._
    val source = MemoryStream[Record]
    source.addData(Record(1.0, Vectors.dense(3.0, 5.0)))
    source.addData(Record(2.0, Vectors.dense(4.0, 6.0)))
    val predictDs = source.toDF()
    service.predict(predictDs).awaitTermination(2000)
    // +-----+---------+------------------+
    // |label| features|        prediction|
    // +-----+---------+------------------+
    // |  1.0|[3.0,5.0]|15.966990887541273|
    // |  2.0|[4.0,6.0]|18.961384020443553|
    // +-----+---------+------------------+
  }

  override protected def withFixture(test: OneArgTest): Outcome = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()

    try withFixture(test.toNoArgTest(spark))

    finally spark.stop()
  }

  override type FixtureParam = SparkSession

  case class Record(label: Double, features: org.apache.spark.ml.linalg.Vector)

}

При написании тестов необходимо разделять код таким образом, чтобы разделять логику и применение конкретных вызовов API. Можно использоваться любые доступные источники. В том числе и kafka.

Такие абстракции как “DataFrame” позволяют это сделать легко и просто.

При использовании Python данные придется хранить в файлах.

Let’s block ads! (Why?)

Read More

Recent Posts

Apple возобновила переговоры с OpenAI и Google для интеграции ИИ в iPhone

Apple возобновила переговоры с OpenAI о возможности внедрения ИИ-технологий в iOS 18, на основе данной операционной системы будут работать новые…

1 день ago

Российская «дочка» Google подготовила 23 иска к крупнейшим игрокам рекламного рынка

Конкурсный управляющий российской «дочки» Google подготовил 23 иска к участникам рекламного рынка. Общая сумма исков составляет 16 млрд рублей –…

1 день ago

Google завершил обновление основного алгоритма March 2024 Core Update

Google завершил обновление основного алгоритма March 2024 Core Update. Раскатка обновлений была завершена 19 апреля, но сообщил об этом поисковик…

1 день ago

Нейросети будут писать тексты объявления за продавцов на Авито

У частных продавцов на Авито появилась возможность составлять текст объявлений с помощью нейросети. Новый функционал доступен в категории «Обувь, одежда,…

1 день ago

Объявлены победители международной премии Workspace Digital Awards-2024

24 апреля 2024 года в Москве состоялась церемония вручения наград международного конкурса Workspace Digital Awards. В этом году участниками стали…

2 дня ago

Яндекс проведет гик-фестиваль Young Con

27 июня Яндекс проведет гик-фестиваль Young Con для студентов и молодых специалистов, которые интересуются технологиями и хотят работать в IT.…

2 дня ago