在Python中实现可观察集合的推荐方法是什么?

18 浏览
0 Comments

在Python中实现可观察集合的推荐方法是什么?

我想在Python中使用一些可观察的集合/序列,允许我监听变化事件,比如添加新项或更新项:

list = ObservableList(['a','b','c'])
list.addChangeListener(lambda new_value: print(new_value))
list.append('a') # => 应该触发附加的变化监听器
data_frame = ObservableDataFrame({'x': [1,2,3], 'y':[10,20,30]})
data_frame.addChangeListener(update_dependent_table_cells) # => 只允许更新依赖的单元格而不是整个表格

A.

我找到了一个提供可观察集合实现的项目,看起来很有前景:

https://github.com/dimsf/Python-observable-collections

它可以满足我的需求:

from observablelist import ObservableList
def listHandler(event):
    if event.action == 'itemsUpdated':
        print event.action + ', old items: ' + str(event.oldItems) + ' new items: ' + str(event.newItems) + ' at index: ' + str(event.index)
    elif event.action == 'itemsAdded' or event.action == 'itemsRemoved':
        print(event.action + ', items: ' + str(event.items) + ' at index: ' + str(event.index))
myList = ObservableList()
myList.attach(listHandler)
#进行一些突变操作,就像普通列表一样。
myList.append(10)
myList.insert(3, 0)

然而,最后一次更改是6年前的事情,我想知道是否有一些更现代或内置的Python替代方案?

B. 我还找到了RxPy:https://github.com/ReactiveX/RxPY

import rx
list = ["Alpha", "Beta", "Gamma"]
source = rx.from_(list)
source.subscribe(
   lambda value: print(value),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
) 

是否有可能保持订阅开启,以便我在订阅后能够向列表中追加新值?虚拟代码:

source.subscribe(..., keep_open = True)
source.append("Delta")  # <= 不起作用;没有append方法
source.close()

换句话说:我能/应该将RxPy源作为可观察集合使用吗?

C. 在Python中有许多不同的方法来处理事件和实现观察者模式:

Event system in Python

Python Observer Pattern: Examples, Tips?

alternate ways to implement observer pattern in python

Using decorators to implement Observer Pattern in Python3

=> 在Python中实现可观察集合的推荐/Pythonic方法是什么?我应该使用(过时的?)A.还是B.的改编形式(似乎有不同的用途?)还是来自C.的其他策略?

=> 有计划以某种方式标准化这些可能性,并直接在Python中包含默认的可观察集合吗?

相关问题,特定于数据框:

How to make tables/spreadsheets (e.g. pandas DataFrame) observable, use triggers or change events?

0
0 Comments

建议在Python中实现可观察集合的方法是使用RxPy库。然而,最后一次更新是在2018年,似乎还不支持RxPY 3.x版本。这可能导致一些问题。有一个基于RxPy的实现可观察集合的项目,但它可能需要一些修改才能与最新版本的RxPY兼容。有关该项目的更多信息可以在以下链接中找到:https://github.com/shyam-s00/ObservableCollections。这个项目提供了ObservableList、ObservableDict和ObservableSet三个类。如果你想了解更多关于这个项目的问题和讨论,可以查看以下链接:https://github.com/shyam-s00/ObservableCollections/issues/1。以下是一个使用ObservableList的示例代码:

from reactive.ObservableList import ObservableList
ol = ObservableList([1, 2, 3, 4])
ol.when_collection_changes() \
    .map(lambda x: x.Items) \
    .subscribe(print, print)
ol.append(5)

此外,还可以查看以下链接了解更多关于使用RxPY实现可观察集合的问题:https://github.com/ReactiveX/RxPY/issues/553

0
0 Comments

推荐在Python中实现可观察集合的方法

在Python中实现可观察集合的推荐方法是使用RxPy库,它是一个非常接近JavaScript/TypeScript rx模式的实现。

首先,您需要一个可观察对象,您可以使用它来推送数据和观察数据。这就是一个subject,可能是一个行为subject或重放subject。创建subject,然后使用on_next()操作符将新值推送到其中。

对于第二个问题,您似乎想要将多个可观察对象合并为一个可观察对象。有多种方法可以实现这一点,但最有可能的是,您要寻找的是CombineLatest或Concat。请查看操作符的文档。

如果按照您的第二个示例,代码将如下所示:

from rx.subject.subject import Subject
list = ["Alpha", "Beta", "Gamma"]
# assuming that you want each item to be emitted one after the other
subject = Subject()
subject.subscribe(
    lambda value: print(value),
    on_error = lambda e: print("Error : {0}".format(e)),
    on_completed = lambda: print("Job Done!")
)
subject.on_next('Alpha')
subject.on_next('Beta')
subject.on_next('Gamma')
subject.on_next('Delta')

如果您使用的是BehaviorSubject,您将能够提供一个初始值,并且当一个新的观察者订阅时,它将接收到最后一个发出的值。

如果您使用的是ReplaySubject,您可以提供值,然后订阅,观察者将接收到主题在此之前发出的所有值。

0