@@ -20,7 +20,11 @@ import {
20
20
MemberSegmentAffiliation ,
21
21
MemberSegmentAffiliationJoined ,
22
22
} from '../../types/memberSegmentAffiliationTypes'
23
- import { SegmentData } from '../../types/segmentTypes'
23
+ import {
24
+ SegmentData ,
25
+ SegmentProjectGroupNestedData ,
26
+ SegmentProjectNestedData ,
27
+ } from '../../types/segmentTypes'
24
28
import { AttributeData } from '../attributes/attribute'
25
29
import SequelizeFilterUtils from '../utils/sequelizeFilterUtils'
26
30
import { IRepositoryOptions } from './IRepositoryOptions'
@@ -1036,6 +1040,281 @@ class MemberRepository {
1036
1040
} )
1037
1041
}
1038
1042
1043
+ static async findAndCountActiveOpensearch (
1044
+ filter : IActiveMemberFilter ,
1045
+ limit : number ,
1046
+ offset : number ,
1047
+ orderBy : string ,
1048
+ options : IRepositoryOptions ,
1049
+ attributesSettings = [ ] as AttributeData [ ] ,
1050
+ segments : string [ ] = [ ] ,
1051
+ ) : Promise < PageData < IActiveMemberData > > {
1052
+ const segmentsEnabled = await isFeatureEnabled ( FeatureFlag . SEGMENTS , options )
1053
+
1054
+ let originalSegment
1055
+
1056
+ if ( segmentsEnabled ) {
1057
+ if ( segments . length !== 1 ) {
1058
+ throw new Error400 (
1059
+ `This operation can have exactly one segment. Found ${ segments . length } segments.` ,
1060
+ )
1061
+ }
1062
+ originalSegment = segments [ 0 ]
1063
+
1064
+ const segmentRepository = new SegmentRepository ( options )
1065
+
1066
+ const segment = await segmentRepository . findById ( originalSegment )
1067
+
1068
+ if ( segment === null ) {
1069
+ return {
1070
+ rows : [ ] ,
1071
+ count : 0 ,
1072
+ limit,
1073
+ offset,
1074
+ }
1075
+ }
1076
+
1077
+ if ( SegmentRepository . isProjectGroup ( segment ) ) {
1078
+ segments = ( segment as SegmentProjectGroupNestedData ) . projects . reduce ( ( acc , p ) => {
1079
+ acc . push ( ...p . subprojects . map ( ( sp ) => sp . id ) )
1080
+ return acc
1081
+ } , [ ] )
1082
+ } else if ( SegmentRepository . isProject ( segment ) ) {
1083
+ segments = ( segment as SegmentProjectNestedData ) . subprojects . map ( ( sp ) => sp . id )
1084
+ } else {
1085
+ segments = [ originalSegment ]
1086
+ }
1087
+ } else {
1088
+ originalSegment = ( await new SegmentRepository ( options ) . getDefaultSegment ( ) ) . id
1089
+ }
1090
+
1091
+ const activityPageSize = 100
1092
+ let activityOffset = 0
1093
+
1094
+ const activityQuery = {
1095
+ query : {
1096
+ bool : {
1097
+ must : [
1098
+ {
1099
+ range : {
1100
+ date_timestamp : {
1101
+ gte : filter . activityTimestampFrom ,
1102
+ lte : filter . activityTimestampTo ,
1103
+ } ,
1104
+ } ,
1105
+ } ,
1106
+ ] ,
1107
+ } ,
1108
+ } ,
1109
+ aggs : {
1110
+ group_by_member : {
1111
+ terms : {
1112
+ field : 'uuid_memberId' ,
1113
+ size : 10000000 ,
1114
+ } ,
1115
+ aggs : {
1116
+ activity_count : {
1117
+ value_count : {
1118
+ field : 'uuid_id' ,
1119
+ } ,
1120
+ } ,
1121
+ active_days_count : {
1122
+ cardinality : {
1123
+ field : 'date_timestamp' ,
1124
+ script : {
1125
+ source : "doc['date_timestamp'].value.toInstant().toEpochMilli()/86400000" ,
1126
+ } ,
1127
+ } ,
1128
+ } ,
1129
+ active_members_bucket_sort : {
1130
+ bucket_sort : {
1131
+ sort : [ { activity_count : { order : 'desc' } } ] ,
1132
+ size : activityPageSize ,
1133
+ from : activityOffset ,
1134
+ } ,
1135
+ } ,
1136
+ } ,
1137
+ } ,
1138
+ } ,
1139
+ size : 0 ,
1140
+ } as any
1141
+
1142
+ if ( filter . platforms ) {
1143
+ const subQueries = filter . platforms . map ( ( p ) => ( { match_phrase : { keyword_platform : p } } ) )
1144
+
1145
+ activityQuery . query . bool . must . push ( {
1146
+ bool : {
1147
+ should : subQueries ,
1148
+ } ,
1149
+ } )
1150
+ }
1151
+
1152
+ if ( filter . activityIsContribution === true ) {
1153
+ activityQuery . query . bool . must . push ( {
1154
+ term : {
1155
+ bool_isContribution : true ,
1156
+ } ,
1157
+ } )
1158
+ }
1159
+
1160
+ if ( segmentsEnabled ) {
1161
+ const subQueries = segments . map ( ( s ) => ( { term : { uuid_segmentId : s } } ) )
1162
+
1163
+ activityQuery . query . bool . must . push ( {
1164
+ bool : {
1165
+ should : subQueries ,
1166
+ } ,
1167
+ } )
1168
+ }
1169
+
1170
+ const direction = orderBy . split ( '_' ) [ 1 ] . toLowerCase ( ) === 'desc' ? 'desc' : 'asc'
1171
+ if ( orderBy . startsWith ( 'activityCount' ) ) {
1172
+ activityQuery . aggs . group_by_member . aggs . active_members_bucket_sort . bucket_sort . sort = [
1173
+ { activity_count : { order : direction } } ,
1174
+ ]
1175
+ } else if ( orderBy . startsWith ( 'activeDaysCount' ) ) {
1176
+ activityQuery . aggs . group_by_member . aggs . active_members_bucket_sort . bucket_sort . sort = [
1177
+ { active_days_count : { order : direction } } ,
1178
+ ]
1179
+ } else {
1180
+ throw new Error ( `Invalid order by: ${ orderBy } ` )
1181
+ }
1182
+
1183
+ const memberIds = [ ]
1184
+ let memberMap = { }
1185
+ let activities
1186
+
1187
+ do {
1188
+ activities = await options . opensearch . search ( {
1189
+ index : OpenSearchIndex . ACTIVITIES ,
1190
+ body : activityQuery ,
1191
+ } )
1192
+
1193
+ memberIds . push ( ...activities . body . aggregations . group_by_member . buckets . map ( ( b ) => b . key ) )
1194
+
1195
+ memberMap = {
1196
+ ...memberMap ,
1197
+ ...activities . body . aggregations . group_by_member . buckets . reduce ( ( acc , b ) => {
1198
+ acc [ b . key ] = {
1199
+ activityCount : b . activity_count ,
1200
+ activeDaysCount : b . active_days_count ,
1201
+ }
1202
+
1203
+ return acc
1204
+ } , { } ) ,
1205
+ }
1206
+
1207
+ activityOffset += activityPageSize
1208
+
1209
+ // update page
1210
+ activityQuery . aggs . group_by_member . aggs . active_members_bucket_sort . bucket_sort . from =
1211
+ activityOffset
1212
+ } while ( activities . body . aggregations . group_by_member . buckets . length === activityPageSize )
1213
+
1214
+ if ( memberIds . length === 0 ) {
1215
+ return {
1216
+ rows : [ ] ,
1217
+ count : 0 ,
1218
+ limit,
1219
+ offset,
1220
+ }
1221
+ }
1222
+
1223
+ const memberQueryPayload = {
1224
+ and : [
1225
+ {
1226
+ id : {
1227
+ in : memberIds ,
1228
+ } ,
1229
+ } ,
1230
+ ] ,
1231
+ } as any
1232
+
1233
+ if ( filter . isBot === true ) {
1234
+ memberQueryPayload . and . push ( {
1235
+ isBot : {
1236
+ eq : true ,
1237
+ } ,
1238
+ } )
1239
+ } else if ( filter . isBot === false ) {
1240
+ memberQueryPayload . and . push ( {
1241
+ isBot : {
1242
+ not : true ,
1243
+ } ,
1244
+ } )
1245
+ }
1246
+
1247
+ if ( filter . isTeamMember === true ) {
1248
+ memberQueryPayload . and . push ( {
1249
+ isTeamMember : {
1250
+ eq : true ,
1251
+ } ,
1252
+ } )
1253
+ } else if ( filter . isTeamMember === false ) {
1254
+ memberQueryPayload . and . push ( {
1255
+ isTeamMember : {
1256
+ not : true ,
1257
+ } ,
1258
+ } )
1259
+ }
1260
+
1261
+ if ( filter . isOrganization === true ) {
1262
+ memberQueryPayload . and . push ( {
1263
+ isOrganization : {
1264
+ eq : true ,
1265
+ } ,
1266
+ } )
1267
+ } else if ( filter . isOrganization === false ) {
1268
+ memberQueryPayload . and . push ( {
1269
+ isOrganization : {
1270
+ not : true ,
1271
+ } ,
1272
+ } )
1273
+ }
1274
+
1275
+ // to retain the sort came from activity query
1276
+ const customSortFunction = {
1277
+ _script : {
1278
+ type : 'number' ,
1279
+ script : {
1280
+ lang : 'painless' ,
1281
+ source : `
1282
+ def memberId = doc['uuid_memberId'].value;
1283
+ return params.memberIds.indexOf(memberId);
1284
+ ` ,
1285
+ params : {
1286
+ memberIds : memberIds . map ( ( i ) => `${ i } ` ) ,
1287
+ } ,
1288
+ } ,
1289
+ order : 'asc' ,
1290
+ } ,
1291
+ }
1292
+
1293
+ const members = await this . findAndCountAllOpensearch (
1294
+ {
1295
+ filter : memberQueryPayload ,
1296
+ attributesSettings,
1297
+ segments : [ originalSegment ] ,
1298
+ countOnly : false ,
1299
+ limit,
1300
+ offset,
1301
+ customSortFunction,
1302
+ } ,
1303
+ options ,
1304
+ )
1305
+
1306
+ return {
1307
+ rows : members . rows . map ( ( m ) => {
1308
+ m . activityCount = memberMap [ m . id ] . activityCount . value
1309
+ m . activeDaysCount = memberMap [ m . id ] . activeDaysCount . value
1310
+ return m
1311
+ } ) ,
1312
+ count : members . count ,
1313
+ offset,
1314
+ limit,
1315
+ }
1316
+ }
1317
+
1039
1318
static async findAndCountActive (
1040
1319
filter : IActiveMemberFilter ,
1041
1320
limit : number ,
@@ -1623,16 +1902,13 @@ class MemberRepository {
1623
1902
countOnly = false ,
1624
1903
attributesSettings = [ ] as AttributeData [ ] ,
1625
1904
segments = [ ] as string [ ] ,
1905
+ customSortFunction = undefined ,
1626
1906
} ,
1627
1907
options : IRepositoryOptions ,
1628
1908
) : Promise < PageData < any > > {
1629
1909
const tenant = SequelizeRepository . getCurrentTenant ( options )
1630
1910
1631
- if ( segments . length !== 1 ) {
1632
- throw new Error400 (
1633
- `This operation can have exactly one segment. Found ${ segments . length } segments.` ,
1634
- )
1635
- }
1911
+ const segmentsEnabled = await isFeatureEnabled ( FeatureFlag . SEGMENTS , options )
1636
1912
1637
1913
const segment = segments [ 0 ]
1638
1914
@@ -1663,7 +1939,7 @@ class MemberRepository {
1663
1939
} ,
1664
1940
} )
1665
1941
1666
- if ( await isFeatureEnabled ( FeatureFlag . SEGMENTS , options ) ) {
1942
+ if ( segmentsEnabled ) {
1667
1943
// add segment filter
1668
1944
parsed . query . bool . must . push ( {
1669
1945
term : {
@@ -1672,6 +1948,10 @@ class MemberRepository {
1672
1948
} )
1673
1949
}
1674
1950
1951
+ if ( customSortFunction ) {
1952
+ parsed . sort = customSortFunction
1953
+ }
1954
+
1675
1955
const countResponse = await options . opensearch . count ( {
1676
1956
index : OpenSearchIndex . MEMBERS ,
1677
1957
body : { query : parsed . query } ,
@@ -1691,7 +1971,6 @@ class MemberRepository {
1691
1971
body : parsed ,
1692
1972
} )
1693
1973
1694
- // const translated = response.body.hits.hits[0]._source
1695
1974
const translatedRows = response . body . hits . hits . map ( ( o ) =>
1696
1975
translator . translateObjectToCrowd ( o . _source ) ,
1697
1976
)
0 commit comments