Быстро извлекать уникальные целые числа из массива PySpark DataFrame списков целых чисел?


Предположим, что у вас есть фрейм данных Pyspark, df:

DataFrame[set_sid_index: array<int>]

Это выглядит так:

+--------------------+
|       set_sid_index|
+--------------------+
|           [8, 0, 1]|
|              [8, 1]|
|                 [9]|
|                 [0]|
|                 [2]|
|           [0, 1, 3]|
|           [8, 0, 1]|
|[22, 2, 6, 0, 1, 21]|
|  [2, 0, 1, 4, 5, 3]|
|              [0, 1]|
|           [0, 1, 3]|
|              [0, 1]|
|                 [9]|
|      [2, 105, 4, 3]|
+--------------------+

И еще один фрейм данных PySpark, df2:

DataFrame[set_sid_index: array<int>]

+--------------------+
|       set_sid_index|
+--------------------+
|           [8, 0, 1]|
+--------------------+

Как бы вы преобразовали элементы списков в массиве df таким образом, что любой элемент, который не является {0, 1, 8} (уникальные элементы df2), преобразуется в "0", или "1", или "8"?

--- разъяснение вышеуказанного пункта - - -

Для моего конкретного случая использования я должен был бы найти uniq, который был бы набором уникальные элементы из массива списков целых чисел. Поэтому, чтобы быть точным, в приведенном выше примере df2 был только один список с уникальными значениями (0, 1, 8). В действительности, df2 будет иметь несколько списков с перекрывающимися значениями. Мне бы понадобилось uniq = unique(df2values). Как мне это сделать?

1 2

1 ответ:

Я немного озадачен вашим "преобразованием либо в 0, либо в 1, либо в 8"; поэтому давайте уточним:

Если элемент 1-го df не находится в массиве [0, 1, 8], мы преобразуем его в 0

Учитывая эту квалификацию, давайте начнем.

У нас есть:

from pyspark.sql.functions import udf
from pyspark.sql.types import *
uniq = [8, 0, 1]
sdf.show()
+--------------------+
|       set_sid_index|
+--------------------+
|           [8, 0, 1]|
|              [8, 1]|
|                 [9]|
|                 [0]|
|                 [2]|
|           [0, 1, 3]|
|           [8, 0, 1]|
|[22, 2, 6, 0, 1, 21]|
|  [2, 0, 1, 4, 5, 3]|
|              [0, 1]|
|           [0, 1, 3]|
|              [0, 1]|
|                 [9]|
|      [2, 105, 4, 3]|
+--------------------+


sdf.printSchema()
root
 |-- set_sid_index: array (nullable = true)
 |    |-- element: long (containsNull = true)
Теперь давайте определим простую udf и применим ее:
convertToZero = udf(lambda x: [0 if i not in uniq else i for i in x], ArrayType(IntegerType()))
sdf.withColumn('set_sid_index', convertToZero(sdf['set_sid_index'])).show(truncate=False)
+------------------+
|set_sid_index     |
+------------------+
|[8, 0, 1]         |
|[8, 1]            |
|[0]               |
|[0]               |
|[0]               |
|[0, 1, 0]         |
|[8, 0, 1]         |
|[0, 0, 0, 0, 1, 0]|
|[0, 0, 1, 0, 0, 0]|
|[0, 1]            |
|[0, 1, 0]         |
|[0, 1]            |
|[0]               |
|[0, 0, 0, 0]      |
+------------------+

Обновить

Предположим, что у вас нет легкодоступного массива uniq.

Затем:

sdf2.show()
+--------------------+
|       set_sid_index|
+--------------------+
|[22, 2, 6, 0, 1, 21]|
|  [2, 0, 1, 4, 5, 3]|
|              [0, 1]|
+--------------------+

x = sdf2.withColumn('set_sid_index', explode(sdf2['set_sid_index'])).drop_duplicates().collect()
uniq = [i[0] for i in x]
uniq
[0, 22, 6, 5, 1, 3, 2, 4, 21]