Skip to content

Python API Documentation

SnipeSig

A class to handle Sourmash signatures with additional functionalities such as customized set operations and abundance management.

Source code in src/snipe/api/snipe_sig.py
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
class SnipeSig:
    """
    A class to handle Sourmash signatures with additional functionalities
    such as customized set operations and abundance management.
    """

    def __init__(self, *,
                 sourmash_sig: Union[str, sourmash.signature.SourmashSignature, sourmash.signature.FrozenSourmashSignature],
                 sig_type=SigType.SAMPLE, enable_logging: bool = False, **kwargs):
        r"""
        Initialize the SnipeSig from a sourmash signature object, a JSON string, or a file path.

        A string input is first parsed as sourmash JSON; when that yields no
        signature it is loaded as a signature file. For `SigType.SAMPLE` and
        `SigType.AMPLICON` exactly one signature must be present. For
        `SigType.GENOME`, a multi-signature input must contain a signature whose
        name ends with `-snipegenome`; signatures whose names start with `sex-`
        or `autosome-` are wrapped as amplicon `SnipeSig` objects and stored in
        `self.chr_to_sig`, and any other signature is ignored.

        Hashes are stored sorted in ascending order, with abundances rearranged
        to match. Signatures that do not track abundance get an abundance of 1
        for every hash.

        Parameters:
            sourmash_sig (str or sourmash signature): A JSON string, a path to a
                signature file, or a (frozen) sourmash signature object.
            sig_type (SigType): Type of the signature.
            enable_logging (bool): Flag to enable detailed (DEBUG) logging.
            **kwargs: Additional keyword arguments; not used by this method.

        Raises:
            TypeError: If `sourmash_sig` is neither a string nor a sourmash signature object.
            ValueError: If no signature can be loaded, if a sample/amplicon input holds
                more than one signature, if a multi-signature genome input has no
                `-snipegenome` signature, or if `sig_type` is unknown.
        """
        # Initialize logging based on the flag
        self.logger = logging.getLogger(self.__class__.__name__)

        if enable_logging:
            self.logger.setLevel(logging.DEBUG)
            if not self.logger.hasHandlers():
                # Attach a console handler only once per logger
                ch = logging.StreamHandler()
                ch.setLevel(logging.DEBUG)
                formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
                ch.setFormatter(formatter)
                self.logger.addHandler(ch)
            self.logger.debug("Logging is enabled for SnipeSig.")
        else:
            self.logger.setLevel(logging.CRITICAL)

        # Initialize internal variables
        self.logger.debug("Initializing SnipeSig with sourmash_sig: %s", sourmash_sig)

        self._scale: int = None
        self._ksize: int = None
        self._md5sum: str = None
        self._hashes = np.array([], dtype=np.uint64)
        self._abundances = np.array([], dtype=np.uint32)
        self._type: SigType = sig_type
        self._name: str = None
        self._filename: str = None
        self._track_abundance: bool = True

        sourmash_sigs: Dict[str, sourmash.signature.SourmashSignature] = {}
        _sourmash_sig: Union[sourmash.signature.SourmashSignature, sourmash.signature.FrozenSourmashSignature] = None

        self.chr_to_sig: Dict[str, SnipeSig] = {}

        self.logger.debug("Proceeding with a sigtype of %s", sig_type)

        signature_types = (sourmash.signature.SourmashSignature, sourmash.signature.FrozenSourmashSignature)

        if not isinstance(sourmash_sig, (str,) + signature_types):
            self.logger.error("Invalid type for sourmash_sig: %s", type(sourmash_sig).__name__)
            raise TypeError(f"sourmash_sig must be a file path, sourmash.signature.SourmashSignature, or Frozensourmash_signature, got {type(sourmash_sig).__name__}")

        # Case 1: sourmash_sig is already a sourmash signature object
        if isinstance(sourmash_sig, signature_types):
            self.logger.debug("Loaded sourmash signature directly from object.")
            sourmash_sigs = {sourmash_sig.name: sourmash_sig}

        # Case 2: sourmash_sig is a string; try JSON first, then a file path
        else:
            self.logger.debug("Attempting to load sourmash signature from string input.")

            sourmash_sigs = self._try_load_from_json(sourmash_sig)
            if sourmash_sigs:
                # Only report success when JSON parsing actually produced signatures
                self.logger.debug("Loaded sourmash signature from JSON: %s", sourmash_sigs)
            else:
                sourmash_sigs = self._try_load_from_file(sourmash_sig)

            # If both attempts fail, raise an error
            if not sourmash_sigs:
                self.logger.error("Failed to load sourmash signature from the provided string.")
                raise ValueError("An unexpected error occurred while loading the sourmash signature.")

        if sig_type == SigType.SAMPLE or sig_type == SigType.AMPLICON:
            if len(sourmash_sigs) > 1:
                self.logger.debug("Multiple signatures found in the input. Expected a single sample signature.")
                # not supported at this time
                raise ValueError("Loading multiple sample signatures is not supported at this time.")
            elif len(sourmash_sigs) == 1:
                self.logger.debug("Found a single signature in the sample sig input; Will use this signature.")
                _sourmash_sig = list(sourmash_sigs.values())[0]
            else:
                self.logger.debug("No signature found in the input. Expected a single sample signature.")
                raise ValueError("No signature found in the input. Expected a single sample signature.")

        elif sig_type == SigType.GENOME:
            if len(sourmash_sigs) > 1:
                genome_suffix = "-snipegenome"
                for signame, sig in sourmash_sigs.items():
                    self.logger.debug("Iterating over signature: %s", signame)
                    if signame.endswith(genome_suffix):
                        sig = sig.to_mutable()
                        # Strip only the trailing suffix; `str.replace` would also
                        # remove occurrences inside the original name.
                        sig.name = sig.name[:-len(genome_suffix)]
                        self.logger.debug("Found a genome signature with the snipe suffix `-snipegenome`. Restoring original name `%s`.", sig.name)
                        _sourmash_sig = sig
                    elif signame.startswith("sex-"):
                        self.logger.debug("Found a sex chr signature %s", signame)
                        sig = sig.to_mutable()
                        self.chr_to_sig[sig.name] = SnipeSig(sourmash_sig=sig, sig_type=SigType.AMPLICON, enable_logging=enable_logging)
                    elif signame.startswith("autosome-"):
                        self.logger.debug("Found an autosome signature %s", signame)
                        sig = sig.to_mutable()
                        self.chr_to_sig[sig.name] = SnipeSig(sourmash_sig=sig, sig_type=SigType.AMPLICON, enable_logging=enable_logging)

                # Identity check: a signature with zero hashes has len() == 0 and
                # would be falsy, which must not be mistaken for "not found".
                if _sourmash_sig is None:
                    self.logger.debug("Found multiple signature per the genome file, but none with the snipe suffix `-snipegenome`.")
                    raise ValueError("Found multiple signature per the genome file, but none with the snipe suffix `-snipegenome`.")
            elif len(sourmash_sigs) == 1:
                self.logger.debug("Found a single signature in the genome sig input; Will use this signature.")
                _sourmash_sig = list(sourmash_sigs.values())[0]
            else:
                # Defensive: mirrors the sample branch so a missing signature
                # surfaces as a clear ValueError rather than a failure below.
                self.logger.debug("No signature found in the input. Expected a genome signature.")
                raise ValueError("No signature found in the input. Expected a genome signature.")
        else:
            self.logger.debug("Unknown sigtype: %s", sig_type)
            raise ValueError(f"Unknown sigtype: {sig_type}")

        self.logger.debug("Length of currently loaded signature: %d, with name: %s", len(_sourmash_sig), _sourmash_sig.name)

        # Extract properties from the loaded signature
        self._ksize = _sourmash_sig.minhash.ksize
        self._scale = _sourmash_sig.minhash.scaled
        self._md5sum = _sourmash_sig.md5sum()
        self._name = _sourmash_sig.name
        self._filename = _sourmash_sig.filename
        self._track_abundance = _sourmash_sig.minhash.track_abundance

        # Restore the original name by dropping a trailing snipe suffix only
        if self._name.endswith("-snipesample"):
            self._name = self._name[:-len("-snipesample")]
            self.logger.debug("Found a sample signature with the snipe suffix `-snipesample`. Restoring original name `%s`.", self._name)
        elif self._name.endswith("-snipeamplicon"):
            self._name = self._name[:-len("-snipeamplicon")]
            self.logger.debug("Found an amplicon signature with the snipe suffix `-snipeamplicon`. Restoring original name `%s`.", self._name)

        # Fetch the hash mapping once so keys and values come from the same object
        hash_map = _sourmash_sig.minhash.hashes

        # If the signature does not track abundance, assume abundance of 1 for all hashes
        if not self._track_abundance:
            self.logger.debug("Signature does not track abundance. Setting all abundances to 1.")
            self._abundances = np.ones(len(hash_map), dtype=np.uint32)
        else:
            self._abundances = np.array(list(hash_map.values()), dtype=np.uint32)

        self._hashes = np.array(list(hash_map.keys()), dtype=np.uint64)

        # Sort the hashes and rearrange abundances accordingly
        sorted_indices = np.argsort(self._hashes)
        self._hashes = self._hashes[sorted_indices]
        self._abundances = self._abundances[sorted_indices]

        self.logger.debug(
            "Loaded sourmash signature from file: %s, name: %s, md5sum: %s, ksize: %d, scale: %d, "
            "track_abundance: %s, type: %s, length: %d",
            self._filename, self._name, self._md5sum, self._ksize, self._scale,
            self._track_abundance, self._type, len(self._hashes)
        )
        self.logger.debug("Hashes sorted during initialization.")
        self.logger.debug("Sourmash signature loading completed successfully.")

    def _try_load_from_json(self, sourmash_sig: str) -> Union[List[sourmash.signature.SourmashSignature], None]:
        r"""
        Attempt to load sourmash signature from JSON string.

        Parameters:
            sourmash_sig (str): JSON string representing a sourmash signature.

        Returns:
            sourmash.signature.SourmashSignature or None if loading fails.
        """
        try:
            self.logger.debug("Trying to load sourmash signature from JSON.")
            list_of_sigs = list(sourmash.load_signatures_from_json(sourmash_sig))
            return {sig.name: sig for sig in list_of_sigs}
        except Exception as e:
            self.logger.debug("Loading from JSON failed. Proceeding to file loading.", exc_info=e)
            return None  # Return None to indicate failure

    def _try_load_from_file(self, sourmash_sig_path: str) -> Union[List[sourmash.signature.SourmashSignature], None]:
        r"""
        Attempt to load sourmash signature(s) from a file.

        Parameters:
            sourmash_sig_path (str): File path to a sourmash signature.

        Returns:
            sourmash.signature.SourmashßSignature, list of sourmash.signature.SourmashSignature, or None if loading fails.
        """
        self.logger.debug("Trying to load sourmash signature from file.")
        try:
            signatures = list(sourmash.load_file_as_signatures(sourmash_sig_path))
            self.logger.debug("Loaded %d sourmash signature(s) from file.", len(signatures))
            sigs_dict = {_sig.name: _sig for _sig in signatures}
            self.logger.debug("Loaded sourmash signatures into sigs_dict: %s", sigs_dict)
            return sigs_dict
        except Exception as e:
            self.logger.exception("Failed to load the sourmash signature from the file.", exc_info=e)
            raise ValueError("An unexpected error occurred while loading the sourmash signature.") from e

    # Setters and getters
    @property
    def hashes(self) -> np.ndarray:
        r"""Return a copy of the hashes array."""
        return self._hashes.view()

    @property
    def abundances(self) -> np.ndarray:
        r"""Return a copy of the abundances array."""
        return self._abundances.view()

    @property
    def md5sum(self) -> str:
        r"""Return the MD5 checksum of the signature."""
        return self._md5sum

    @property
    def ksize(self) -> int:
        r"""Return the k-mer size."""
        return self._ksize

    @property
    def scale(self) -> int:
        r"""Return the scale value."""
        return self._scale

    @property
    def name(self) -> str:
        r"""Return the name of the signature."""
        return self._name

    @property
    def filename(self) -> str:
        r"""Return the filename of the signature."""
        return self._filename

    @property
    def sigtype(self) -> SigType:
        r"""Return the type of the signature."""
        return self._type

    @property
    def track_abundance(self) -> bool:
        r"""Return whether the signature tracks abundance."""
        return self._track_abundance

    # Basic class methods
    def get_name(self) -> str:
        r"""Get the name of the signature."""
        return self._name

    # setter sigtype
    @sigtype.setter
    def sigtype(self, sigtype: SigType):
        r"""
        Set the type of the signature.
        """
        self._type = sigtype

    @track_abundance.setter
    def track_abundance(self, track_abundance: bool):
        r"""
        Set whether the signature tracks abundance.
        """
        self._track_abundance = track_abundance

    def get_info(self) -> dict:
        r"""
        Get information about the signature.

        Returns:
            dict: A dictionary containing signature information.
        """
        info = {
            "name": self._name,
            "filename": self._filename,
            "md5sum": self._md5sum,
            "ksize": self._ksize,
            "scale": self._scale,
            "track_abundance": self._track_abundance,
            "sigtype": self._type,
            "num_hashes": len(self._hashes)
        }
        return info

    def __len__(self) -> int:
        r"""Return the number of hashes in the signature."""
        return len(self._hashes)

    def __iter__(self) -> Iterator[tuple]:
        r"""
        Iterate over the hashes and their abundances.

        Yields:
            tuple: A tuple containing (hash, abundance).
        """
        for h, a in zip(self._hashes, self._abundances):
            yield (h, a)

    def __contains__(self, hash_value: int) -> bool:
        r"""
        Check if a hash is present in the signature.

        Parameters:
            hash_value (int): The hash value to check.

        Returns:
            bool: True if the hash is present, False otherwise.
        """
        # Utilize binary search since hashes are sorted
        index = np.searchsorted(self._hashes, hash_value)
        if index < len(self._hashes) and self._hashes[index] == hash_value:
            return True
        return False

    def __repr__(self) -> str:
        return (f"SnipeSig(name={self._name}, ksize={self._ksize}, scale={self._scale}, "
                f"type={self._type}, num_hashes={len(self._hashes)})")

    def __str__(self) -> str:
        return self.__repr__()

    def __verify_snipe_signature(self, other: 'SnipeSig'):
        r"""
        Verify that the other object is a SnipeSig instance.

        Parameters:
            other (SnipeSig): The other signature to verify.

        Raises:
            ValueError: If the other object is not a SnipeSig instance.
        """
        if not isinstance(other, SnipeSig):
            msg = f"Provided sig ({type(other).__name__}) is not a SnipeSig instance."
            self.logger.error(msg)
            raise ValueError(msg)

    def __verify_matching_ksize_scale(self, other: 'SnipeSig'):
        r"""
        Verify that the ksize and scale match between two signatures.

        Parameters:
            other (SnipeSig): The other signature to compare.

        Raises:
            ValueError: If ksize or scale do not match.
        """
        if self._ksize != other.ksize:
            _e_msg = f"K-mer size does not match between the two signatures: {self._ksize} vs {other.ksize}."
            self.logger.error(_e_msg)
            raise ValueError(_e_msg)
        if self._scale != other.scale:
            _e_msg = f"Scale value does not match between the two signatures: {self._scale} vs {other.scale}."
            self.logger.error(_e_msg)
            raise ValueError(_e_msg)

    def _validate_abundance_operation(self, value: Union[int, None], operation: str):
        r"""
        Validate that the signature tracks abundance and that the provided value is a non-negative integer.

        Parameters:
            value (int or None): The abundance value to validate. Can be None for operations that don't require a value.
            operation (str): Description of the operation for logging purposes.

        Raises:
            ValueError: If the signature does not track abundance or if the value is invalid.
        """
        if not self._track_abundance and self.sigtype == SigType.SAMPLE:
            self.logger.error("Cannot %s: signature does not track abundance.", operation)
            raise ValueError("Signature does not track abundance.")

        if value is not None:
            if not isinstance(value, int) or value < 0:
                self.logger.error("%s requires a non-negative integer value.", operation.capitalize())
                raise ValueError(f"{operation.capitalize()} requires a non-negative integer value.")

    # Mask application method
    def _apply_mask(self, mask: np.ndarray):
        r"""
        Apply a boolean mask to the hashes and abundances arrays.
        Ensures that the sorted order is preserved.

        Parameters:
            mask (np.ndarray): Boolean array indicating which elements to keep.
        """
        self._hashes = self._hashes[mask]
        self._abundances = self._abundances[mask]

        # Verify that the hashes remain sorted
        if self._hashes.size > 1:
            if not np.all(self._hashes[:-1] <= self._hashes[1:]):
                self.logger.error("Hashes are not sorted after applying mask.")
                raise RuntimeError("Hashes are not sorted after applying mask.")
        self.logger.debug("Applied mask. Hashes remain sorted.")

    # Set operation methods
    def union_sigs(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Combine this signature with another by summing abundances where hashes overlap.

        Given two signatures \( A \) and \( B \) with hash sets \( H_A \) and \( H_B \),
        and their corresponding abundance functions \( a_A \) and \( a_B \), the union
        signature \( C \) is defined as follows:

        - **Hash Set**:

        $$
        H_C = H_A \cup H_B
        $$

        - **Abundance Function**:

        $$
        a_C(h) =
        \begin{cases}
            a_A(h) + a_B(h), & \text{if } h \in H_A \cap H_B \\
            a_A(h), & \text{if } h \in H_A \setminus H_B \\
            a_B(h), & \text{if } h \in H_B \setminus H_A
        \end{cases}
        $$

        Summed abundances saturate at the largest 32-bit unsigned value instead
        of wrapping around. If `other` does not track abundance, each of its
        hashes contributes an abundance of 1.

        **Parameters**:
            - `other (SnipeSig)`: Another `SnipeSig` instance to combine with.

        **Returns**:
            - `SnipeSig`: A new `SnipeSig` instance named `<self>_union_<other>`.

        **Raises**:
            - `ValueError`: If `other` is not a `SnipeSig`, or if `ksize` or `scale`
              do not match between signatures.
        """
        self.__verify_snipe_signature(other)
        self.__verify_matching_ksize_scale(other)

        self.logger.debug("Unioning signatures (including all unique hashes).")

        # Access internal arrays directly
        self_hashes = self._hashes
        self_abundances = self._abundances
        other_hashes = other._hashes
        other_abundances = other._abundances

        # Handle the case where 'other' does not track abundance
        if not other.track_abundance:
            self.logger.debug("Other signature does not track abundance. Setting abundances to 1.")
            other_abundances = np.ones_like(other_abundances, dtype=np.uint32)

        # Combine hashes and abundances
        combined_hashes = np.concatenate((self_hashes, other_hashes))
        combined_abundances = np.concatenate((self_abundances, other_abundances))

        # Use numpy's unique function with return_inverse to sum abundances efficiently
        unique_hashes, inverse_indices = np.unique(combined_hashes, return_inverse=True)

        # Accumulate in uint64: summing directly into uint32 would wrap around on
        # overflow before any clamp could be applied.
        wide_sums = np.zeros(unique_hashes.shape, dtype=np.uint64)
        np.add.at(wide_sums, inverse_indices, combined_abundances)

        # Saturate at the uint32 maximum, then narrow back to the storage dtype
        uint32_max = np.uint64(np.iinfo(np.uint32).max)
        summed_abundances = np.minimum(wide_sums, uint32_max).astype(np.uint32)

        self.logger.debug("Union operation completed. Total hashes: %d", len(unique_hashes))

        # Create a new SnipeSig instance
        return self.create_from_hashes_abundances(
            hashes=unique_hashes,
            abundances=summed_abundances,
            ksize=self._ksize,
            scale=self._scale,
            name=f"{self._name}_union_{other._name}",
            filename=None,
            enable_logging=self.logger.level <= logging.DEBUG
        )

    def _convert_to_sourmash_signature(self):
        r"""
        Build a sourmash.signature.SourmashSignature from this SnipeSig.

        The resulting signature is stored on `self.sourmash_sig`; nothing is
        returned. Abundances are written only when this signature tracks
        abundance; otherwise only the hashes are added.
        """
        self.logger.debug("Converting SnipeSig to sourmash.signature.SourmashSignature.")

        tracks = self._track_abundance
        minhash = sourmash.minhash.MinHash(n=0, ksize=self._ksize, scaled=self._scale, track_abundance=tracks)
        if not tracks:
            minhash.add_many(self._hashes)
        else:
            hash_to_abundance = dict(zip(self._hashes, self._abundances))
            minhash.set_abundances(hash_to_abundance)

        self.sourmash_sig = sourmash.signature.SourmashSignature(minhash, name=self._name, filename=self._filename)
        self.logger.debug("Conversion to sourmash.signature.SourmashSignature completed.")

    def export(self, path, force=False) -> None:
        r"""
        Export the signature to a file.

        Parameters:
            path (str): The path to save the signature to.
            force (bool): Flag to overwrite the file if it already exists.
        """
        self._convert_to_sourmash_signature()
        if path.endswith(".sig"):
            self.logger.debug("Exporting signature to a .sig file.")
            with open(str(path), "wb") as fp:
                sourmash.signature.save_signatures_to_json([self.sourmash_sig], fp)
        # sourmash.save_load.SaveSignatures_SigFile

        elif path.endswith(".zip"):
            if os.path.exists(path): 
                raise FileExistsError("Output file already exists.")
            try:
                with sourmash.save_load.SaveSignatures_ZipFile(path) as save_sigs:
                    save_sigs.add(self.sourmash_sig)
            except Exception as e:
                self.logger.error("Failed to export signatures to zip: %s", e)
                raise Exception(f"Failed to export signatures to zip: {e}") from e
        else:
            raise ValueError("Output file must be either a .sig or .zip file.")



    def export_to_string(self):
        r"""
        Serialize the signature as JSON text.

        The signature is first converted to a sourmash signature, then saved
        to JSON and decoded as UTF-8.

        Returns:
            str: JSON string representation of the signature.
        """
        self._convert_to_sourmash_signature()
        payload = sourmash.signature.save_signatures_to_json([self.sourmash_sig])
        return payload.decode('utf-8')

    def intersection_sigs(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Build a new signature holding only the hashes shared with another signature.

        Abundances of the shared hashes are taken from this signature; the
        abundances of `other` are not used.

        **Mathematical Explanation**:

        Let \( A \) and \( B \) be two signatures with sets of hashes \( H_A \) and \( H_B \),
        and abundance functions \( a_A(h) \) and \( a_B(h) \), the intersection signature \( C \) has:

        - Hash set:
        $$
        H_C = H_A \cap H_B
        $$

        - Abundance function:
        $$
        a_C(h) = a_A(h), \quad \text{for } h \in H_C
        $$

        **Parameters**:
            - `other (SnipeSig)`: Another `SnipeSig` instance to intersect with.

        **Returns**:
            - `SnipeSig`: A new `SnipeSig` instance named `<self>_intersection_<other>`;
              it is empty when the two signatures share no hashes.

        **Raises**:
            - `ValueError`: If `other` is not a `SnipeSig`, or if `ksize` or `scale`
              do not match between signatures.
        """
        self.__verify_snipe_signature(other)
        self.__verify_matching_ksize_scale(other)

        self.logger.debug("Intersecting signatures.")

        # Hashes within each signature are treated as unique
        shared_hashes, positions_in_self, _ = np.intersect1d(
            self._hashes, other._hashes, assume_unique=True, return_indices=True
        )

        if shared_hashes.size == 0:
            self.logger.debug("No common hashes found. Returning an empty signature.")
            result_hashes = np.array([], dtype=np.uint64)
            result_abundances = np.array([], dtype=np.uint32)
        else:
            # Keep this signature's abundances for the shared hashes
            result_hashes = shared_hashes
            result_abundances = self._abundances[positions_in_self]
            self.logger.debug("Intersection operation completed. Total common hashes: %d", len(shared_hashes))

        return self.create_from_hashes_abundances(
            hashes=result_hashes,
            abundances=result_abundances,
            ksize=self._ksize,
            scale=self._scale,
            name=f"{self._name}_intersection_{other._name}",
            filename=None,
            enable_logging=self.logger.level <= logging.DEBUG
        )

    def difference_sigs(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Compute the difference of the current signature with another signature.

        Hashes that also occur in `other` are dropped from `self`; the remaining
        hashes keep the abundances they have in `self`.

        **Mathematical Explanation**:

        Let \( A \) and \( B \) be two signatures with sets of hashes \( H_A \) and \( H_B \),
        and abundance function \( a_A(h) \), the difference signature \( C \) has:

        - Hash set:
        $$
        H_C = H_A \setminus H_B
        $$

        - Abundance function:
        $$
        a_C(h) = a_A(h), \quad \text{for } h \in H_C
        $$

        **Parameters**:
            - `other (SnipeSig)`: Another `SnipeSig` instance to subtract from the current signature.

        **Returns**:
            - `SnipeSig`: A new `SnipeSig` instance representing the difference of the two signatures.
              When no hash remains, a warning is logged and an empty signature is
              still returned (no exception is raised).

        **Raises**:
            - `ValueError`: If `ksize` or `scale` do not match between signatures.
        """
        self.__verify_snipe_signature(other)
        self.__verify_matching_ksize_scale(other)

        self.logger.debug("Differencing signatures.")

        # One inverted membership test gives the rows of self to keep. This is
        # exactly what np.setdiff1d(..., assume_unique=True) computes internally,
        # so the same mask can select both hashes and abundances without a
        # second pass over the data.
        keep_mask = np.isin(self._hashes, other._hashes, assume_unique=True, invert=True)
        diff_hashes = self._hashes[keep_mask]
        diff_abundances = self._abundances[keep_mask]

        if diff_hashes.size == 0:
            # Deliberately a warning only: callers receive an empty signature.
            _e_msg = f"Difference operation resulted in zero hashes, which is not allowed for {self._name} and {other._name}."
            self.logger.warning(_e_msg)

        self.logger.debug("Difference operation completed. Remaining hashes: %d", len(diff_hashes))

        # Create a new SnipeSig instance
        return self.create_from_hashes_abundances(
            hashes=diff_hashes,
            abundances=diff_abundances,
            ksize=self._ksize,
            scale=self._scale,
            name=f"{self._name}_difference_{other._name}",
            filename=None,
            enable_logging=self.logger.level <= logging.DEBUG
        )

    def symmetric_difference_sigs(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Build a new signature from the hashes that occur in exactly one of the two signatures.

        Each retained hash keeps the abundance of the signature it came from. If
        `other` does not track abundance, the hashes contributed by `other` get
        abundance 1.

        **Mathematical Explanation**:

        For signatures \( A \) (self) and \( B \) (other) with hash sets \( H_A \), \( H_B \)
        and abundance functions \( a_A(h) \), \( a_B(h) \), the result \( C \) has:

        - Hash set:
        $$
        H_C = (H_A \setminus H_B) \cup (H_B \setminus H_A)
        $$

        - Abundance function:
        $$
        a_C(h) =
        \begin{cases}
        a_A(h), & \text{for } h \in H_A \setminus H_B \\
        a_B(h), & \text{for } h \in H_B \setminus H_A \\
        \end{cases}
        $$

        **Parameters**:
            - `other (SnipeSig)`: The signature to compare against.

        **Returns**:
            - `SnipeSig`: A new instance named `<self>_symmetric_difference_<other>`,
              with hashes sorted in ascending order.

        **Raises**:
            - `ValueError`: If `ksize` or `scale` do not match between signatures.
            - `RuntimeError`: If no hash remains after the symmetric difference.
        """
        self.__verify_snipe_signature(other)
        self.__verify_matching_ksize_scale(other)

        self.logger.debug("Computing symmetric difference of signatures.")

        # Hashes exclusive to each side.
        only_in_self = np.setdiff1d(self._hashes, other._hashes, assume_unique=True)
        only_in_other = np.setdiff1d(other._hashes, self._hashes, assume_unique=True)

        # Matching abundances, looked up in the owning signature.
        abund_self = self._abundances[np.isin(self._hashes, only_in_self, assume_unique=True)]
        abund_other = other._abundances[np.isin(other._hashes, only_in_other, assume_unique=True)]

        if not other.track_abundance:
            # Without abundance tracking every hash from 'other' counts once.
            self.logger.debug("Other signature does not track abundance. Setting abundances to 1.")
            abund_other = np.ones_like(abund_other, dtype=np.uint32)

        merged_hashes = np.concatenate((only_in_self, only_in_other))
        merged_abundances = np.concatenate((abund_self, abund_other))

        if merged_hashes.size == 0:
            _e_msg = "Symmetric difference operation resulted in zero hashes, which is not allowed."
            self.logger.error(_e_msg)
            raise RuntimeError(_e_msg)

        # The two halves are simply appended, so restore ascending hash order
        # and apply the same permutation to the abundances.
        order = np.argsort(merged_hashes)
        merged_hashes = merged_hashes[order]
        merged_abundances = merged_abundances[order]

        self.logger.debug("Symmetric difference operation completed. Total unique hashes: %d", len(merged_hashes))

        return self.create_from_hashes_abundances(
            hashes=merged_hashes,
            abundances=merged_abundances,
            ksize=self._ksize,
            scale=self._scale,
            name=f"{self._name}_symmetric_difference_{other._name}",
            filename=None,
            enable_logging=self.logger.level <= logging.DEBUG
        )

    # Operator overloads for set operations (union, difference, symmetric difference, intersection)
    def __add__(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Support `self + other` by delegating to `union_sigs`.

        Neither operand is modified; the combined signature is returned as a new object.

        Returns:
            SnipeSig: Union of self and other.
        """
        combined = self.union_sigs(other)
        return combined

    def __iadd__(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Support `self += other`.

        The union is computed with `union_sigs` and then copied onto `self` via
        `_update_from_union`, which also overwrites self's name, filename,
        md5sum and abundance-tracking flag with those of the union result.

        Returns:
            SnipeSig: `self`, after being updated in place.
        """
        self._update_from_union(self.union_sigs(other))
        return self

    def __or__(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Support `self | other` by delegating to `union_sigs`.

        Behaves exactly like `+`: a new signature is returned and neither
        operand is modified.

        Returns:
            SnipeSig: Union of self and other.
        """
        combined = self.union_sigs(other)
        return combined

    def __ior__(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Support `self |= other`.

        Behaves exactly like `+=`: the result of `union_sigs` is copied onto
        `self` via `_update_from_union` (including name, filename, md5sum and
        the abundance-tracking flag).

        Returns:
            SnipeSig: `self`, after being updated in place.
        """
        self._update_from_union(self.union_sigs(other))
        return self

    def __sub__(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Support `self - other` by delegating to `difference_sigs`.

        Hashes found in `other` are removed; remaining hashes keep self's
        abundances. Neither operand is modified.

        Returns:
            SnipeSig: Difference of self and other.
        """
        remainder = self.difference_sigs(other)
        return remainder

    def __isub__(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Support `self -= other`.

        The result of `difference_sigs` is copied onto `self` via
        `_update_from_union` (including name, filename, md5sum and the
        abundance-tracking flag).

        When no hash remains, `difference_sigs` only logs a warning, so `self`
        simply ends up empty; no exception is raised for that case.

        Returns:
            SnipeSig: `self`, after being updated in place.
        """
        self._update_from_union(self.difference_sigs(other))
        return self

    def __xor__(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Support `self ^ other` by delegating to `symmetric_difference_sigs`.

        Only hashes present in exactly one operand are kept, each with the
        abundance from its own signature. Neither operand is modified.

        Returns:
            SnipeSig: Symmetric difference of self and other.
        """
        exclusive = self.symmetric_difference_sigs(other)
        return exclusive

    def __ixor__(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Support `self ^= other`.

        The result of `symmetric_difference_sigs` is copied onto `self` via
        `_update_from_union` (including name, filename, md5sum and the
        abundance-tracking flag).

        Returns:
            SnipeSig: `self`, after being updated in place.

        Raises:
            RuntimeError: If zero hashes remain after symmetric difference
                (raised by `symmetric_difference_sigs`; `self` is left untouched).
        """
        self._update_from_union(self.symmetric_difference_sigs(other))
        return self

    def __and__(self, other: 'SnipeSig') -> 'SnipeSig':
        r"""
        Support `self & other` by delegating to `intersection_sigs`.

        Only hashes present in both operands are kept, with abundances taken
        from `self`. Neither operand is modified.

        Returns:
            SnipeSig: Intersection of self and other.
        """
        shared = self.intersection_sigs(other)
        return shared

    def _update_from_union(self, other: 'SnipeSig'):
        r"""
        Update self's hashes and abundances from another SnipeSig instance.

        Parameters:
            other (SnipeSig): The other SnipeSig instance to update from.
        """
        self._hashes = other.hashes
        self._abundances = other.abundances
        self._name = other.name
        self._filename = other.filename
        self._md5sum = other.md5sum
        self._track_abundance = other.track_abundance
        # No need to update ksize and scale since they are verified to match

    @classmethod
    def create_from_hashes_abundances(cls, hashes: np.ndarray, abundances: np.ndarray,
                                      ksize: int, scale: int, name: str = None,
                                      filename: str = None, enable_logging: bool = False, sig_type: SigType = SigType.SAMPLE) -> 'SnipeSig':
        """
        Build a SnipeSig from parallel arrays of hashes and abundances.

        A scaled sourmash MinHash with abundance tracking enabled is filled
        with the hash -> abundance pairs, wrapped in a SourmashSignature, and
        handed to the class constructor.

        Parameters:
            hashes (np.ndarray): Array of hash values.
            abundances (np.ndarray): Abundance of each hash, in the same order as `hashes`.
            ksize (int): K-mer size.
            scale (int): Scale value.
            name (str): Optional signature name; an empty string is used when falsy.
            filename (str): Optional signature filename; an empty string is used when falsy.
            enable_logging (bool): Flag to enable logging.
            sig_type (SigType): Type of the signature.

        Returns:
            SnipeSig: A new SnipeSig instance.
        """
        # n=0 together with scaled=... selects a scaled MinHash rather than a
        # fixed-size one.
        minhash = sourmash.minhash.MinHash(n=0, ksize=ksize, scaled=scale, track_abundance=True)
        abundance_by_hash = dict(zip(hashes, abundances))
        minhash.set_abundances(abundance_by_hash)

        signature = sourmash.signature.SourmashSignature(
            minhash,
            name=name if name else "",
            filename=filename if filename else "",
        )
        return cls(sourmash_sig=signature, sig_type=sig_type, enable_logging=enable_logging)

    # Aggregation Operations
    @classmethod
    def sum_signatures(cls, signatures: List['SnipeSig'], name: str = "summed_signature",
                       filename: str = None, enable_logging: bool = False) -> 'SnipeSig':

        r"""
        Sum multiple SnipeSig instances by including all unique hashes and summing their abundances where hashes overlap.
        This method uses a heap-based multi-way merge over the signatures' hash arrays.

        $$
        \text{Sum}(A_1, A_2, \dots, A_n) = \bigcup_{i=1}^{n} A_i
        $$

        For each hash \( h \), its total abundance is:
        $$
        \text{abundance}(h) = \min\left(\sum_{i=1}^{n} \text{abundance}_i(h),\; 2^{32} - 1\right)
        $$

        **Mathematical Explanation**:

        - **Union of Signatures**:
            The summation of signatures involves creating a union of all unique k-mers (hashes) present across the input signatures.

        - **Total Abundance Calculation**:
            For each unique hash \( h \), the total abundance is the sum of its abundances across all signatures where it appears.
            Totals are accumulated as Python integers and saturate at the largest `uint32` value instead of wrapping around.

        - **Algorithm**:
            A min-heap holds the current head of every signature, so hashes are emitted in ascending order.
            This relies on each signature's hashes being sorted ascending and free of duplicates.

        **Parameters**:
            - `signatures (List[SnipeSig])`: List of `SnipeSig` instances to sum.
            - `name (str)`: Optional name for the resulting signature.
            - `filename (str)`: Optional filename for the resulting signature.
            - `enable_logging (bool)`: Flag to enable detailed logging.

        **Returns**:
            - `SnipeSig`: A new `SnipeSig` instance representing the sum of the signatures.

        **Raises**:
            - `ValueError`: If the signatures list is empty, if every signature is empty,
              or if `ksize`/`scale` do not match across signatures.
        """
        if not signatures:
            raise ValueError("No signatures provided for summation.")

        # Every signature must agree with the first one on ksize and scale.
        first_sig = signatures[0]
        ksize = first_sig.ksize
        scale = first_sig.scale

        for sig in signatures[1:]:
            if sig.ksize != ksize or sig.scale != scale:
                raise ValueError("All signatures must have the same ksize and scale.")

        uint32_max = int(np.iinfo(np.uint32).max)

        # One (hash, abundance) stream per non-empty signature; the heap holds
        # the current head of each stream as (hash, abundance, stream index).
        # Values are converted to Python ints so that sums cannot wrap around
        # the way fixed-width NumPy scalars do (abundances are assumed to be
        # integral, as everywhere else in this class).
        streams = []
        heap = []
        for sig in signatures:
            stream = zip(sig.hashes, sig.abundances)
            head = next(stream, None)
            if head is None:
                continue  # Skip empty signatures
            heap.append((int(head[0]), int(head[1]), len(streams)))
            streams.append(stream)

        if not heap:
            raise ValueError("All provided signatures are empty.")
        heapq.heapify(heap)

        def _refill(stream_idx):
            # Push the next element of the given stream, if it has one left.
            following = next(streams[stream_idx], None)
            if following is not None:
                heapq.heappush(heap, (int(following[0]), int(following[1]), stream_idx))

        summed_hashes = []
        summed_abundances = []

        while heap:
            current_hash, total_abundance, idx = heapq.heappop(heap)

            # Absorb every other stream whose head carries the same hash.
            while heap and heap[0][0] == current_hash:
                _, abundance, same_idx = heapq.heappop(heap)
                total_abundance += abundance
                _refill(same_idx)

            summed_hashes.append(current_hash)
            # Clamp BEFORE the uint32 conversion; clamping afterwards is a no-op.
            summed_abundances.append(min(total_abundance, uint32_max))

            _refill(idx)

        # Convert the results to NumPy arrays for efficient storage and processing
        summed_hashes = np.array(summed_hashes, dtype=np.uint64)
        summed_abundances = np.array(summed_abundances, dtype=np.uint32)

        # Create a new SnipeSig instance from the summed hashes and abundances
        return cls.create_from_hashes_abundances(
            hashes=summed_hashes,
            abundances=summed_abundances,
            ksize=ksize,
            scale=scale,
            name=name,
            filename=filename,
            enable_logging=enable_logging
        )

    @staticmethod
    def get_unique_signatures(signatures: Dict[str, 'SnipeSig']) -> Dict[str, 'SnipeSig']:
        """
        Extract unique signatures from a dictionary of SnipeSig instances.

        For each input signature, the returned signature contains only the hashes
        that occur exactly once in the concatenation of all input hashes, i.e.
        hashes that no other signature shares. Abundances are carried over from
        the originating signature.

        Parameters:
            signatures (Dict[str, SnipeSig]): A dictionary mapping signature names to SnipeSig instances.

        Returns:
            Dict[str, SnipeSig]: A dictionary with the same keys, mapping each name to a new
            SnipeSig named "<name>_unique".

        Raises:
            ValueError: If the input dictionary is empty or if signatures have mismatched ksize/scale.
        """
        if not signatures:
            raise ValueError("The input signatures dictionary is empty.")

        # The first signature defines the ksize/scale every other one must match.
        first_sig = next(iter(signatures.values()))
        ksize = first_sig.ksize
        scale = first_sig.scale

        for name, sig in signatures.items():
            if sig.ksize != ksize or sig.scale != scale:
                raise ValueError(f"Signature '{name}' has mismatched ksize or scale.")

        # Count how often each hash occurs over all signatures combined.
        all_hashes = np.concatenate([sig.hashes for sig in signatures.values()])
        unique_hashes, counts = np.unique(all_hashes, return_counts=True)

        # Hashes seen exactly once belong to a single signature.
        unique_across_all = unique_hashes[counts == 1]

        unique_signatures = {}

        for name, sig in signatures.items():
            # np.isin works on the array directly; there is no need to rebuild a
            # Python set/list of the unique hashes for every signature.
            mask_unique = np.isin(sig.hashes, unique_across_all)

            unique_signatures[name] = SnipeSig.create_from_hashes_abundances(
                hashes=sig.hashes[mask_unique],
                abundances=sig.abundances[mask_unique],
                ksize=ksize,
                scale=scale,
                name=f"{name}_unique",
                filename=None,
                enable_logging=False,  # Set to True if you want logging for the new signatures
                sig_type=SigType.SAMPLE  # Adjust sig_type as needed
            )

        return unique_signatures


    @classmethod
    def common_hashes(cls, signatures: List['SnipeSig'], name: str = "common_hashes_signature",
                      filename: str = None, enable_logging: bool = False) -> 'SnipeSig':
        r"""
        Intersect any number of SnipeSig instances.

        The result contains only the hashes found in every input signature; the
        abundance of each such hash is the smallest abundance it has in any input.

        The computation is a heap-based multi-way merge over the signatures'
        hash arrays, which relies on each array being sorted in ascending order.

        **Mathematical Explanation**:

        Given signatures \( A_1, A_2, \dots, A_n \) with hash sets \( H_1, H_2, \dots, H_n \),
        the intersection signature \( C \) has:

        - Hash set:
        $$
        H_C = \bigcap_{i=1}^{n} H_i
        $$

        - Abundance function:
        $$
        a_C(h) = \min_{i=1}^{n} a_i(h), \quad \text{for } h \in H_C
        $$

        **Parameters**:
            - `signatures (List[SnipeSig])`: The signatures to intersect.
            - `name (str)`: Optional name for the resulting signature.
            - `filename (str)`: Optional filename for the resulting signature.
            - `enable_logging (bool)`: Flag to enable detailed logging.

        **Returns**:
            - `SnipeSig`: A new instance holding the shared hashes. It is empty when
              any input is empty or when nothing is shared by all inputs.

        **Raises**:
            - `ValueError`: If the signatures list is empty or if `ksize`/`scale` do not match across signatures.
        """
        if not signatures:
            raise ValueError("No signatures provided for intersection.")

        # The first signature is the reference for ksize/scale.
        reference = signatures[0]
        ksize = reference.ksize
        scale = reference.scale
        if any(sig.ksize != ksize or sig.scale != scale for sig in signatures[1:]):
            raise ValueError("All signatures must have the same ksize and scale.")

        num_signatures = len(signatures)

        # One (hash, abundance) stream per signature; the heap holds each
        # stream's current head as (hash, abundance, stream index).
        streams = []
        heap = []
        for position, sig in enumerate(signatures):
            stream = iter(zip(sig.hashes, sig.abundances))
            head = next(stream, None)
            if head is None:
                # An empty input makes the whole intersection empty.
                return cls.create_from_hashes_abundances(
                    hashes=np.array([], dtype=np.uint64),
                    abundances=np.array([], dtype=np.uint32),
                    ksize=ksize,
                    scale=scale,
                    name=name,
                    filename=filename,
                    enable_logging=enable_logging
                )
            streams.append(stream)
            heap.append((head[0], head[1], position))
        heapq.heapify(heap)

        shared_hashes = []
        shared_abundances = []

        while heap:
            # Gather every stream whose head equals the smallest hash.
            current_hash, first_abundance, first_source = heapq.heappop(heap)
            seen_abundances = [first_abundance]
            sources = [first_source]
            while heap and heap[0][0] == current_hash:
                _, abundance, source = heapq.heappop(heap)
                seen_abundances.append(abundance)
                sources.append(source)

            # A hash is common only if every signature contributed it.
            if len(sources) == num_signatures:
                shared_hashes.append(current_hash)
                shared_abundances.append(min(seen_abundances))

            # Advance exactly the streams that were consumed this round.
            for source in sources:
                following = next(streams[source], None)
                if following is not None:
                    heapq.heappush(heap, (following[0], following[1], source))

        # An empty list converts to the same empty, correctly typed array.
        return cls.create_from_hashes_abundances(
            hashes=np.array(shared_hashes, dtype=np.uint64),
            abundances=np.array(shared_abundances, dtype=np.uint32),
            ksize=ksize,
            scale=scale,
            name=name,
            filename=filename,
            enable_logging=enable_logging
        )

    def copy(self) -> 'SnipeSig':
        r"""
        Return an independent duplicate of this signature.

        The copy is produced by a round trip: this instance is exported to its
        JSON string form and a new SnipeSig is constructed from that string,
        with the same signature type. Logging is enabled on the copy when this
        instance's logger level is at or below DEBUG.

        Returns:
            SnipeSig: A new instance that is a copy of self.
        """
        serialized = self.export_to_string()
        kind = self.sigtype
        debug_enabled = self.logger.level <= logging.DEBUG
        return SnipeSig(sourmash_sig=serialized, sig_type=kind, enable_logging=debug_enabled)

    # Implement the __radd__ method to support sum()
    def __radd__(self, other: Union[int, 'SnipeSig']) -> 'SnipeSig':
        r"""
        Implements the right-hand + operator to support sum().

        Returns:
            SnipeSig: Union of self and other.
        """
        return self.__radd_sum__(other)

    # Helper behind __radd__ that makes the built-in sum() work (Python has no __sum__ hook)
    def __radd_sum__(self, other: Union[int, 'SnipeSig']) -> 'SnipeSig':
        r"""
        Internal helper method to support the sum() function.

        Parameters:
            other (int or SnipeSig): The other object to add. If other is 0, return self.

        Returns:
            SnipeSig: The result of the addition.
        """
        if other == 0:
            return self
        if not isinstance(other, SnipeSig):
            raise TypeError(f"Unsupported operand type(s) for +: 'SnipeSig' and '{type(other).__name__}'")
        return self.union_sigs(other)

    def reset_abundance(self, new_abundance: int = 1):
        r"""
        Overwrite the abundance of every hash with one value.

        The abundance array is modified in place and the signature is marked as
        tracking abundance afterwards.

        **Mathematical Explanation**:

        For each hash \( h \) in the signature:
        $$
        a(h) = \text{new\_abundance}
        $$

        **Parameters**:
            - `new_abundance (int)`: The value assigned to all hashes. Default is 1.

        **Raises**:
            - `ValueError`: If the signature does not track abundance or if `new_abundance`
              is invalid (checked by `_validate_abundance_operation`).
        """
        self._validate_abundance_operation(new_abundance, "reset abundance")

        # Ellipsis assignment writes into the existing array rather than rebinding it.
        self._abundances[...] = new_abundance
        self.track_abundance = True
        self.logger.debug("Reset all abundances to %d.", new_abundance)

    def keep_min_abundance(self, min_abundance: int):
        r"""
        Drop every hash whose abundance is below `min_abundance`.

        Hashes with abundance equal to the threshold are kept. The selection is
        applied through `_apply_mask`.

        **Mathematical Explanation**:

        The updated hash set \( H' \) is:
        $$
        H' = \{ h \in H \mid a(h) \geq \text{min\_abundance} \}
        $$

        **Parameters**:
            - `min_abundance (int)`: The minimum abundance threshold (inclusive).

        **Raises**:
            - `ValueError`: If the signature does not track abundance or if `min_abundance`
              is invalid (checked by `_validate_abundance_operation`).
        """
        self._validate_abundance_operation(min_abundance, "keep minimum abundance")

        self._apply_mask(self._abundances >= min_abundance)
        self.logger.debug("Kept hashes with abundance >= %d.", min_abundance)

    def keep_max_abundance(self, max_abundance: int):
        r"""
        Drop every hash whose abundance is above `max_abundance`.

        Hashes with abundance equal to the threshold are kept. The selection is
        applied through `_apply_mask`.

        **Mathematical Explanation**:

        The updated hash set \( H' \) is:
        $$
        H' = \{ h \in H \mid a(h) \leq \text{max\_abundance} \}
        $$

        **Parameters**:
            - `max_abundance (int)`: The maximum abundance threshold (inclusive).

        **Raises**:
            - `ValueError`: If the signature does not track abundance or if `max_abundance`
              is invalid (checked by `_validate_abundance_operation`).
        """
        self._validate_abundance_operation(max_abundance, "keep maximum abundance")

        self._apply_mask(self._abundances <= max_abundance)
        self.logger.debug("Kept hashes with abundance <= %d.", max_abundance)

    def trim_below_median(self):
        r"""
        Drop every hash whose abundance is below the median abundance.

        Hashes whose abundance equals the median are kept. Nothing happens when
        the signature holds no abundances.

        **Mathematical Explanation**:

        Let \( m \) be the median of \( \{ a(h) \mid h \in H \} \).
        The updated hash set \( H' \) is:

        $$
        H' = \{ h \in H \mid a(h) \geq m \}
        $$

        **Raises**:
            - `ValueError`: If the signature does not track abundance
              (checked by `_validate_abundance_operation`).
        """
        self._validate_abundance_operation(None, "trim below median")

        if len(self._abundances) == 0:
            self.logger.debug("No hashes to trim based on median abundance.")
            return

        median = np.median(self._abundances)
        self._apply_mask(self._abundances >= median)
        self.logger.debug("Trimmed hashes with abundance below median (%f).", median)

    def count_singletons(self) -> int:
        r"""
        Count the hashes whose abundance is exactly 1.

        The signature is not modified.

        Returns:
            int: Number of singletons.

        Raises:
            ValueError: If the signature does not track abundance
                (checked by `_validate_abundance_operation`).
        """
        self._validate_abundance_operation(None, "count singletons")

        is_singleton = self._abundances == 1
        singleton_count = np.sum(is_singleton)
        self.logger.debug("Number of singletons (abundance == 1): %d", singleton_count)
        return int(singleton_count)

    def trim_singletons(self):
        r"""
        Drop every hash whose abundance is exactly 1.

        The number of removed hashes and the remaining size are written to the
        debug log.

        **Mathematical Explanation**:

        The updated hash set \( H' \) is:
        $$
        H' = \{ h \in H \mid a(h) \neq 1 \}
        $$

        **Raises**:
            - `ValueError`: If the signature does not track abundance
              (checked by `_validate_abundance_operation`).
        """
        self._validate_abundance_operation(None, "trim singletons")

        is_singleton = self._abundances == 1
        self.logger.debug("Trimming %d hashes with abundance equal to 1.", np.sum(is_singleton))
        # Keep everything that is not a singleton.
        self._apply_mask(~is_singleton)
        self.logger.debug("Size after trimming singletons: %d", len(self._hashes))

    # Abundance summary properties (total, mean, median) and sample statistics

    @property
    def total_abundance(self) -> int:
        r"""
        Sum of the abundances of all hashes in the signature.

        Returns:
            int: The summed abundance as a built-in int.
        """
        self._validate_abundance_operation(None, "calculate total abundance")

        summed = np.sum(self._abundances)
        abundance_total = int(summed)
        self.logger.debug("Total abundance: %d", abundance_total)
        return abundance_total

    @property
    def mean_abundance(self) -> float:
        r"""
        Arithmetic mean of the abundances.

        Returns:
            float: The mean abundance, or 0.0 when the signature holds no abundances.
        """
        self._validate_abundance_operation(None, "calculate mean abundance")

        if len(self._abundances) > 0:
            average = float(np.mean(self._abundances))
            self.logger.debug("Mean abundance: %f", average)
            return average

        # An empty signature has no defined mean; report 0.0 instead.
        self.logger.debug("No abundances to calculate mean.")
        return 0.0

    @property
    def get_sample_stats(self) -> dict:
        r"""
        Collect summary statistics about the signature into a dictionary.

        The general fields are always filled in. The abundance-derived fields are
        only computed when the signature type is `SigType.SAMPLE`; for any other
        type they are present but set to `None`.

        **Returns**:
            - `dict`: A dictionary containing sample statistics:
                - `num_hashes`: Total number of hashes.
                - `ksize`: K-mer size.
                - `scale`: Scale value.
                - `name`: Name of the signature.
                - `filename`: Filename of the signature.
                - `total_abundance`: Sum of abundances (or `None`).
                - `mean_abundance`: Mean abundance (or `None`).
                - `median_abundance`: Median abundance (or `None`).
                - `num_singletons`: Number of hashes with abundance equal to 1 (or `None`).
        """
        stats = dict(
            num_hashes=len(self._hashes),
            ksize=self._ksize,
            scale=self._scale,
            name=self._name,
            filename=self._filename,
        )

        # Non-sample signatures do not report abundance statistics.
        if self.sigtype != SigType.SAMPLE:
            abundance_fields = (
                "total_abundance",
                "mean_abundance",
                "median_abundance",
                "num_singletons",
            )
            stats.update(dict.fromkeys(abundance_fields))
            return stats

        stats.update(
            total_abundance=self.total_abundance,
            mean_abundance=self.mean_abundance,
            median_abundance=self.median_abundance,
            num_singletons=self.count_singletons(),
        )
        return stats

    @property
    def median_abundance(self) -> float:
        r"""
        Median of the abundances.

        Returns:
            float: The median abundance, or 0.0 when the signature holds no abundances.

        Raises:
            ValueError: If the signature does not track abundance.
        """
        self._validate_abundance_operation(None, "calculate median abundance")

        if len(self._abundances) > 0:
            middle = float(np.median(self._abundances))
            self.logger.debug("Median abundance: %f", middle)
            return middle

        # An empty signature has no defined median; report 0.0 instead.
        self.logger.debug("No abundances to calculate median.")
        return 0.0

abundances: np.ndarray property

Return a copy of the abundances array.

filename: str property

Return the filename of the signature.

get_sample_stats: dict property

Retrieve statistical information about the signature.

This property computes and returns a dictionary containing various statistics of the signature, such as total abundance, mean and median abundances, number of singletons, and total number of hashes.

Returns:

- dict: A dictionary containing sample statistics:
    - total_abundance: Sum of abundances.
    - mean_abundance: Mean abundance.
    - median_abundance: Median abundance.
    - num_singletons: Number of hashes with abundance equal to 1.
    - num_hashes: Total number of hashes.
    - ksize: K-mer size.
    - scale: Scale value.
    - name: Name of the signature.
    - filename: Filename of the signature.

The four abundance fields (total_abundance, mean_abundance, median_abundance, num_singletons) are None when the signature type is not SAMPLE.

hashes: np.ndarray property

Return a copy of the hashes array.

ksize: int property

Return the k-mer size.

md5sum: str property

Return the MD5 checksum of the signature.

mean_abundance: float property

Return the mean (average) abundance.

Returns:

Name Type Description
float float

Mean abundance.

median_abundance: float property

Return the median abundance.

Returns:

Name Type Description
float float

Median abundance.

Raises:

Type Description
ValueError

If the signature does not track abundance.

name: str property

Return the name of the signature.

scale: int property

Return the scale value.

sigtype: SigType property writable

Return the type of the signature.

total_abundance: int property

Return the total abundance (sum of all abundances).

Returns:

Name Type Description
int int

Total abundance.

track_abundance: bool property writable

Return whether the signature tracks abundance.

__add__(other)

Implements the + operator. Includes all unique hashes from both signatures and sums their abundances where hashes overlap, returning a new signature.

Returns:

Name Type Description
SnipeSig SnipeSig

Union of self and other.

Source code in src/snipe/api/snipe_sig.py
def __add__(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Support ``self + other``.

    Delegates to ``union_sigs``: the result holds every unique hash from both
    signatures, with abundances summed where hashes overlap. A new signature
    is returned.

    Returns:
        SnipeSig: Union of self and other.
    """
    combined = self.union_sigs(other)
    return combined

__and__(other)

Implements the & operator. Keeps common hashes and retains abundances from self only, returning a new signature.

Returns:

Name Type Description
SnipeSig SnipeSig

Intersection of self and other.

Source code in src/snipe/api/snipe_sig.py
def __and__(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Support ``self & other``.

    Delegates to ``intersection_sigs``: only hashes common to both signatures
    are kept, with abundances taken from self. A new signature is returned.

    Returns:
        SnipeSig: Intersection of self and other.
    """
    shared = self.intersection_sigs(other)
    return shared

__contains__(hash_value)

Check if a hash is present in the signature.

Parameters:

Name Type Description Default
hash_value int

The hash value to check.

required

Returns:

Name Type Description
bool bool

True if the hash is present, False otherwise.

Source code in src/snipe/api/snipe_sig.py
def __contains__(self, hash_value: int) -> bool:
    r"""
    Check if a hash is present in the signature.

    Integer inputs are range-checked and converted to ``np.uint64`` before the
    lookup so that the search is carried out in the same dtype as the stored
    hashes. Passing a plain Python int straight to ``np.searchsorted`` can make
    NumPy promote the comparison to float64 (legacy promotion of int64 with
    uint64), which cannot represent hashes above 2**53 exactly and yields wrong
    answers for large hashes.

    Parameters:
        hash_value (int): The hash value to check.

    Returns:
        bool: True if the hash is present, False otherwise. Integers outside
        the unsigned 64-bit range are never present and return False.
    """
    if isinstance(hash_value, (int, np.integer)):
        as_int = int(hash_value)
        # A value that does not fit in uint64 cannot be a stored hash.
        if not 0 <= as_int <= 0xFFFFFFFFFFFFFFFF:
            return False
        hash_value = np.uint64(as_int)

    # Binary search; relies on the hashes being kept sorted.
    index = np.searchsorted(self._hashes, hash_value)
    return bool(index < len(self._hashes) and self._hashes[index] == hash_value)

__iadd__(other)

Implements the += operator. Includes all unique hashes from both signatures and sums their abundances where hashes overlap, modifying self in-place.

Returns:

Name Type Description
SnipeSig SnipeSig

Updated self after addition.

Source code in src/snipe/api/snipe_sig.py
def __iadd__(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Support ``self += other``.

    Computes the union with ``union_sigs`` (all unique hashes, abundances
    summed where hashes overlap) and loads the result back into self.

    Returns:
        SnipeSig: Updated self after addition.
    """
    self._update_from_union(self.union_sigs(other))
    return self

__init__(*, sourmash_sig, sig_type=SigType.SAMPLE, enable_logging=False, **kwargs)

Initialize the SnipeSig with a sourmash signature object or a path to a signature.

Parameters:

Name Type Description Default
sourmash_sig str or SourmashSignature

A path to a signature file or a signature object.

required
ksize int

K-mer size.

required
scale int

Scale value.

required
sig_type SigType

Type of the signature.

SAMPLE
enable_logging bool

Flag to enable detailed logging.

False
**kwargs

Additional keyword arguments.

{}
Source code in src/snipe/api/snipe_sig.py
def __init__(self, *, 
             sourmash_sig: Union[str, sourmash.signature.SourmashSignature, sourmash.signature.FrozenSourmashSignature], 
             sig_type=SigType.SAMPLE, enable_logging: bool = False, **kwargs):
    r"""
    Initialize the SnipeSig with a sourmash signature object or a string describing one.

    The k-mer size, scale, md5sum, name, filename and abundance tracking flag are
    all read from the loaded sourmash signature; they are not constructor arguments.

    Parameters:
        sourmash_sig (str or SourmashSignature or FrozenSourmashSignature): A signature object,
            or a string that is first tried as signature JSON and then as a path to a signature file.
        sig_type (SigType): Type of the signature (SAMPLE, AMPLICON or GENOME).
        enable_logging (bool): Flag to enable detailed logging.
        **kwargs: Additional keyword arguments (accepted but not used by this constructor).

    Raises:
        TypeError: If `sourmash_sig` is neither a string nor a sourmash signature object.
        ValueError: If no signature could be loaded from the string, if a sample/amplicon
            input holds zero or several signatures, if a multi-signature genome input
            has no `-snipegenome` signature, or if `sig_type` is unknown.
    """
    # Initialize logging based on the flag
    self.logger = logging.getLogger(self.__class__.__name__)

    # Configure the logger
    if enable_logging:
        self.logger.setLevel(logging.DEBUG)
        if not self.logger.hasHandlers():
            # Create console handler
            ch = logging.StreamHandler()
            ch.setLevel(logging.DEBUG)
            # Create formatter
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            # Add formatter to handler
            ch.setFormatter(formatter)
            # Add handler to logger
            self.logger.addHandler(ch)
        self.logger.debug("Logging is enabled for SnipeSig.")
    else:
        self.logger.setLevel(logging.CRITICAL)

    # Initialize internal variables
    self.logger.debug("Initializing SnipeSig with sourmash_sig: %s", sourmash_sig)

    self._scale: int = None
    self._ksize: int = None
    self._md5sum: str = None
    self._hashes = np.array([], dtype=np.uint64)
    self._abundances = np.array([], dtype=np.uint32)
    self._type: SigType = sig_type
    self._name: str = None
    self._filename: str = None
    self._track_abundance: bool = True

    sourmash_sigs: Dict[str, sourmash.signature.SourmashSignature] = {}
    _sourmash_sig: Union[sourmash.signature.SourmashSignature, sourmash.signature.FrozenSourmashSignature] = None

    # Per-chromosome signatures; only populated for multi-signature genome inputs.
    self.chr_to_sig: Dict[str, SnipeSig] = {}


    self.logger.debug("Proceeding with a sigtype of %s", sig_type)

    if not isinstance(sourmash_sig, (str, sourmash.signature.SourmashSignature, sourmash.signature.FrozenSourmashSignature)):
        # if the str is not a file path
        self.logger.error("Invalid type for sourmash_sig: %s", type(sourmash_sig).__name__)
        raise TypeError(f"sourmash_sig must be a file path, sourmash.signature.SourmashSignature, or Frozensourmash_signature, got {type(sourmash_sig).__name__}")

    # Case 1: If sourmash_sig is already a valid sourmash signature object
    if isinstance(sourmash_sig, (sourmash.signature.FrozenSourmashSignature, sourmash.signature.SourmashSignature)):
        self.logger.debug("Loaded sourmash signature directly from object.")
        sourmash_sigs = {sourmash_sig.name: sourmash_sig}

    # Case 2: If sourmash_sig is a string, try to load as JSON or a file
    elif isinstance(sourmash_sig, str):
        self.logger.debug("Attempting to load sourmash signature from string input.")

        # First, try loading from JSON
        sourmash_sigs = self._try_load_from_json(sourmash_sig)
        self.logger.debug("Loaded sourmash signature from JSON: %s", sourmash_sigs)

        # If JSON loading fails, try loading from file
        if not sourmash_sigs:
            sourmash_sigs = self._try_load_from_file(sourmash_sig)

        # If both attempts fail, raise an error
        if not sourmash_sigs:
            self.logger.error("Failed to load sourmash signature from the provided string.")
            raise ValueError("An unexpected error occurred while loading the sourmash signature.")

    if sig_type == SigType.SAMPLE or sig_type == SigType.AMPLICON:
        if len(sourmash_sigs) > 1:
            self.logger.debug("Multiple signatures found in the input. Expected a single sample signature.")
            # not supported at this time
            raise ValueError("Loading multiple sample signatures is not supported at this time.")
        elif len(sourmash_sigs) == 1:
            self.logger.debug("Found a single signature in the sample sig input; Will use this signature.")
            _sourmash_sig = list(sourmash_sigs.values())[0]
        else:
            self.logger.debug("No signature found in the input. Expected a single sample signature.")
            raise ValueError("No signature found in the input. Expected a single sample signature.")

    elif sig_type == SigType.GENOME:
        if len(sourmash_sigs) > 1:
            for signame, sig in sourmash_sigs.items():
                self.logger.debug("Iterating over signature: %s", signame)
                if signame.endswith("-snipegenome"):
                    # The whole-genome signature becomes the main signature of this instance.
                    sig = sig.to_mutable()
                    sig.name = sig.name.replace("-snipegenome", "")
                    self.logger.debug("Found a genome signature with the snipe suffix `-snipegenome`. Restoring original name `%s`.", sig.name)
                    _sourmash_sig = sig
                elif signame.startswith("sex-"):
                    self.logger.debug("Found a sex chr signature %s", signame)
                    sig = sig.to_mutable()
                    # The name is left untouched, so the chr_to_sig key keeps its `sex-` prefix.
                    self.chr_to_sig[sig.name] = SnipeSig(sourmash_sig=sig, sig_type=SigType.AMPLICON, enable_logging=enable_logging)
                elif signame.startswith("autosome-"):
                    self.logger.debug("Found an autosome signature %s", signame)
                    sig = sig.to_mutable()
                    # The name is left untouched, so the chr_to_sig key keeps its `autosome-` prefix.
                    self.chr_to_sig[sig.name] = SnipeSig(sourmash_sig=sig, sig_type=SigType.AMPLICON, enable_logging=enable_logging)
                else:
                    continue

            # Compare against None explicitly: a signature object supports len(), so a
            # truthiness test would wrongly treat an empty genome signature as missing.
            if _sourmash_sig is None:
                self.logger.debug("Found multiple signature per the genome file, but none with the snipe suffix `-snipegenome`.")
                raise ValueError("Found multiple signature per the genome file, but none with the snipe suffix `-snipegenome`.")
        elif len(sourmash_sigs) == 1:
            self.logger.debug("Found a single signature in the genome sig input; Will use this signature.")
            _sourmash_sig = list(sourmash_sigs.values())[0]
    else:
        self.logger.debug("Unknown sigtype: %s", sig_type)
        raise ValueError(f"Unknown sigtype: {sig_type}")

    self.logger.debug("Length of currently loaded signature: %d, with name: %s", len(_sourmash_sig), _sourmash_sig.name)

    # Extract properties from the loaded signature
    self._ksize = _sourmash_sig.minhash.ksize
    self._scale = _sourmash_sig.minhash.scaled
    self._md5sum = _sourmash_sig.md5sum()
    self._name = _sourmash_sig.name
    self._filename = _sourmash_sig.filename
    self._track_abundance = _sourmash_sig.minhash.track_abundance

    if self._name.endswith("-snipesample"):
        self._name = self._name.replace("-snipesample", "")
        self.logger.debug("Found a sample signature with the snipe suffix `-snipesample`. Restoring original name `%s`.", self._name)
    elif self._name.endswith("-snipeamplicon"):
        self._name = self._name.replace("-snipeamplicon", "")
        self.logger.debug("Found an amplicon signature with the snipe suffix `-snipeamplicon`. Restoring original name `%s`.", self._name)

    # If the signature does not track abundance, assume abundance of 1 for all hashes.
    # The _track_abundance flag itself is left as read from the signature.
    if not self._track_abundance:
        self.logger.debug("Signature does not track abundance. Setting all abundances to 1.")
        self._abundances = np.ones(len(_sourmash_sig.minhash.hashes), dtype=np.uint32)
    else:
        self._abundances = np.array(list(_sourmash_sig.minhash.hashes.values()), dtype=np.uint32)

    self._hashes = np.array(list(_sourmash_sig.minhash.hashes.keys()), dtype=np.uint64)

    # Sort the hashes and rearrange abundances accordingly
    sorted_indices = np.argsort(self._hashes)
    self._hashes = self._hashes[sorted_indices]
    self._abundances = self._abundances[sorted_indices]

    self.logger.debug(
        "Loaded sourmash signature from file: %s, name: %s, md5sum: %s, ksize: %d, scale: %d, "
        "track_abundance: %s, type: %s, length: %d",
        self._filename, self._name, self._md5sum, self._ksize, self._scale,
        self._track_abundance, self._type, len(self._hashes)
    )
    self.logger.debug("Hashes sorted during initialization.")
    self.logger.debug("Sourmash signature loading completed successfully.")

__ior__(other)

Implements the |= operator. Includes all unique hashes from both signatures and sums their abundances where hashes overlap, modifying self in-place.

Returns:

Name Type Description
SnipeSig SnipeSig

Updated self after union.

Source code in src/snipe/api/snipe_sig.py
def __ior__(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Support ``self |= other``.

    Computes the union with ``union_sigs`` (all unique hashes, abundances
    summed where hashes overlap) and loads the result back into self.

    Returns:
        SnipeSig: Updated self after union.
    """
    self._update_from_union(self.union_sigs(other))
    return self

__isub__(other)

Implements the -= operator. Removes hashes present in other from self, keeping abundances from self, modifying self in-place.

Returns:

Name Type Description
SnipeSig SnipeSig

Updated self after difference.

Raises:

Type Description
RuntimeError

If zero hashes remain after difference.

Source code in src/snipe/api/snipe_sig.py
def __isub__(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Support ``self -= other``.

    Computes the difference with ``difference_sigs`` (hashes present in other
    are removed, abundances come from self) and loads the result back into self.

    Returns:
        SnipeSig: Updated self after difference.

    Raises:
        RuntimeError: If zero hashes remain after difference.
    """
    # _update_from_union is reused to load any computed signature into self.
    self._update_from_union(self.difference_sigs(other))
    return self

__iter__()

Iterate over the hashes and their abundances.

Yields:

Name Type Description
tuple tuple

A tuple containing (hash, abundance).

Source code in src/snipe/api/snipe_sig.py
def __iter__(self) -> Iterator[tuple]:
    r"""
    Walk the signature, pairing each hash with its abundance.

    Yields:
        tuple: A tuple containing (hash, abundance).
    """
    for pair in zip(self._hashes, self._abundances):
        yield pair

__ixor__(other)

Implements the ^= operator. Keeps unique hashes from each signature with their respective abundances, modifying self in-place.

Returns:

Name Type Description
SnipeSig SnipeSig

Updated self after symmetric difference.

Raises:

Type Description
RuntimeError

If zero hashes remain after symmetric difference.

Source code in src/snipe/api/snipe_sig.py
def __ixor__(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Support ``self ^= other``.

    Computes the symmetric difference with ``symmetric_difference_sigs``
    (hashes unique to each signature, with their respective abundances) and
    loads the result back into self.

    Returns:
        SnipeSig: Updated self after symmetric difference.

    Raises:
        RuntimeError: If zero hashes remain after symmetric difference.
    """
    # _update_from_union is reused to load any computed signature into self.
    self._update_from_union(self.symmetric_difference_sigs(other))
    return self

__len__()

Return the number of hashes in the signature.

Source code in src/snipe/api/snipe_sig.py
def __len__(self) -> int:
    r"""Number of hashes held by the signature."""
    n_hashes = len(self._hashes)
    return n_hashes

__or__(other)

Implements the | operator. Includes all unique hashes from both signatures and sums their abundances where hashes overlap, returning a new signature.

Returns:

Name Type Description
SnipeSig SnipeSig

Union of self and other.

Source code in src/snipe/api/snipe_sig.py
def __or__(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Support ``self | other``.

    Delegates to ``union_sigs``: the result holds every unique hash from both
    signatures, with abundances summed where hashes overlap. A new signature
    is returned.

    Returns:
        SnipeSig: Union of self and other.
    """
    combined = self.union_sigs(other)
    return combined

__radd__(other)

Implements the right-hand + operator to support sum().

Returns:

Name Type Description
SnipeSig SnipeSig

Union of self and other.

Source code in src/snipe/api/snipe_sig.py
def __radd__(self, other: Union[int, 'SnipeSig']) -> 'SnipeSig':
    r"""
    Support ``other + self`` so that the built-in sum() works on signatures.

    The work is delegated to ``__radd_sum__``.

    Returns:
        SnipeSig: Union of self and other.
    """
    outcome = self.__radd_sum__(other)
    return outcome

__radd_sum__(other)

Internal helper method to support the sum() function.

Parameters:

Name Type Description Default
other int or SnipeSig

The other object to add. If other is 0, return self.

required

Returns:

Name Type Description
SnipeSig SnipeSig

The result of the addition.

Source code in src/snipe/api/snipe_sig.py
def __radd_sum__(self, other: Union[int, 'SnipeSig']) -> 'SnipeSig':
    r"""
    Helper behind right-hand addition, used to support the sum() function.

    Parameters:
        other (int or SnipeSig): The other object to add. If other is 0
            (the default start value of sum()), self is returned unchanged.

    Returns:
        SnipeSig: The result of the addition.

    Raises:
        TypeError: If other is neither 0 nor a SnipeSig instance.
    """
    if other == 0:
        return self
    if isinstance(other, SnipeSig):
        return self.union_sigs(other)
    raise TypeError(f"Unsupported operand type(s) for +: 'SnipeSig' and '{type(other).__name__}'")

__sub__(other)

Implements the - operator. Removes hashes present in other from self, keeping abundances from self, returning a new signature.

Returns:

Name Type Description
SnipeSig SnipeSig

Difference of self and other.

Source code in src/snipe/api/snipe_sig.py
def __sub__(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Support ``self - other``.

    Delegates to ``difference_sigs``: hashes present in other are removed from
    self, abundances come from self. A new signature is returned.

    Returns:
        SnipeSig: Difference of self and other.
    """
    remaining = self.difference_sigs(other)
    return remaining

__verify_matching_ksize_scale(other)

Verify that the ksize and scale match between two signatures.

Parameters:

Name Type Description Default
other SnipeSig

The other signature to compare.

required

Raises:

Type Description
ValueError

If ksize or scale do not match.

Source code in src/snipe/api/snipe_sig.py
def __verify_matching_ksize_scale(self, other: 'SnipeSig'):
    r"""
    Ensure that two signatures share the same ksize and scale.

    The k-mer size is checked first; the scale is only compared when the
    k-mer sizes agree.

    Parameters:
        other (SnipeSig): The other signature to compare.

    Raises:
        ValueError: If ksize or scale do not match.
    """
    problem = None
    if self._ksize != other.ksize:
        problem = f"K-mer size does not match between the two signatures: {self._ksize} vs {other.ksize}."
    elif self._scale != other.scale:
        problem = f"Scale value does not match between the two signatures: {self._scale} vs {other.scale}."

    if problem is not None:
        # Log the mismatch before surfacing it to the caller.
        self.logger.error(problem)
        raise ValueError(problem)

__verify_snipe_signature(other)

Verify that the other object is a SnipeSig instance.

Parameters:

Name Type Description Default
other SnipeSig

The other signature to verify.

required

Raises:

Type Description
ValueError

If the other object is not a SnipeSig instance.

Source code in src/snipe/api/snipe_sig.py
def __verify_snipe_signature(self, other: 'SnipeSig'):
    r"""
    Ensure that the given object is a SnipeSig instance.

    Parameters:
        other (SnipeSig): The other signature to verify.

    Raises:
        ValueError: If the other object is not a SnipeSig instance.
    """
    if isinstance(other, SnipeSig):
        return

    # Log the offending type before surfacing the error to the caller.
    complaint = f"Provided sig ({type(other).__name__}) is not a SnipeSig instance."
    self.logger.error(complaint)
    raise ValueError(complaint)

__xor__(other)

Implements the ^ operator. Keeps unique hashes from each signature with their respective abundances, returning a new signature.

Returns:

Name Type Description
SnipeSig SnipeSig

Symmetric difference of self and other.

Source code in src/snipe/api/snipe_sig.py
def __xor__(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Support ``self ^ other``.

    Delegates to ``symmetric_difference_sigs``: hashes unique to each signature
    are kept with their respective abundances. A new signature is returned.

    Returns:
        SnipeSig: Symmetric difference of self and other.
    """
    exclusive = self.symmetric_difference_sigs(other)
    return exclusive

common_hashes(signatures, name='common_hashes_signature', filename=None, enable_logging=False) classmethod

Compute the intersection of multiple SnipeSig instances, returning a new SnipeSig containing only the hashes present in all signatures, with abundances set to the minimum abundance across signatures.

This method uses a heap-based multi-way merge algorithm for efficient computation, especially when handling a large number of signatures with sorted hashes.

Mathematical Explanation:

Given signatures \( A_1, A_2, \dots, A_n \) with hash sets \( H_1, H_2, \dots, H_n \), the intersection signature \( C \) has:

  • Hash set: $$ H_C = \bigcap_{i=1}^{n} H_i $$

  • Abundance function: $$ a_C(h) = \min_{i=1}^{n} a_i(h), \quad \text{for } h \in H_C $$

Parameters: - signatures (List[SnipeSig]): List of SnipeSig instances to compute the intersection. - name (str): Optional name for the resulting signature. - filename (str): Optional filename for the resulting signature. - enable_logging (bool): Flag to enable detailed logging.

Returns: - SnipeSig: A new SnipeSig instance representing the intersection of the signatures.

Raises: - ValueError: If the signatures list is empty or if ksize/scale do not match across signatures.

Source code in src/snipe/api/snipe_sig.py
@classmethod
def common_hashes(cls, signatures: List['SnipeSig'], name: str = "common_hashes_signature",
                  filename: str = None, enable_logging: bool = False) -> 'SnipeSig':
    r"""
    Intersect several SnipeSig instances into one new SnipeSig.

    The result contains only the hashes present in every input signature, and
    each kept hash carries the minimum abundance observed across the inputs.

    The inputs are merged with a heap: the heap always holds the next unread
    (hash, abundance) pair of each signature, so hashes are visited in
    ascending order and a hash is common exactly when all signatures
    contribute a pair for it at the same time. This relies on each
    signature's hashes being sorted.

    **Mathematical Explanation**:

    Given signatures \( A_1, A_2, \dots, A_n \) with hash sets \( H_1, H_2, \dots, H_n \),
    the intersection signature \( C \) has:

    - Hash set:
    $$
    H_C = \bigcap_{i=1}^{n} H_i
    $$

    - Abundance function:
    $$
    a_C(h) = \min_{i=1}^{n} a_i(h), \quad \text{for } h \in H_C
    $$

    **Parameters**:
        - `signatures (List[SnipeSig])`: List of `SnipeSig` instances to compute the intersection.
        - `name (str)`: Optional name for the resulting signature.
        - `filename (str)`: Optional filename for the resulting signature.
        - `enable_logging (bool)`: Flag to enable detailed logging.

    **Returns**:
        - `SnipeSig`: A new `SnipeSig` instance representing the intersection of the signatures.

    **Raises**:
        - `ValueError`: If the signatures list is empty or if `ksize`/`scale` do not match across signatures.
    """
    if not signatures:
        raise ValueError("No signatures provided for intersection.")

    # Every signature must agree with the first one on ksize and scale.
    reference = signatures[0]
    ksize = reference.ksize
    scale = reference.scale
    for candidate in signatures[1:]:
        if candidate.ksize != ksize or candidate.scale != scale:
            raise ValueError("All signatures must have the same ksize and scale.")

    def _make_signature(hash_values, abundance_values):
        # Wrap plain sequences into typed arrays and build the result signature.
        return cls.create_from_hashes_abundances(
            hashes=np.array(hash_values, dtype=np.uint64),
            abundances=np.array(abundance_values, dtype=np.uint32),
            ksize=ksize,
            scale=scale,
            name=name,
            filename=filename,
            enable_logging=enable_logging
        )

    # One (hash, abundance) stream per signature, plus the first pair of each
    # stream as the initial merge frontier.
    streams = []
    frontier = []
    for position, sig in enumerate(signatures):
        stream = zip(sig.hashes, sig.abundances)
        head = next(stream, None)
        if head is None:
            # One of the signatures is empty; intersection is empty
            return _make_signature([], [])
        streams.append(stream)
        frontier.append((head[0], head[1], position))
    heapq.heapify(frontier)

    n_inputs = len(signatures)
    shared_hashes = []
    shared_abundances = []

    while frontier:
        # Drain every frontier entry that carries the current smallest hash.
        smallest = frontier[0][0]
        group = []
        while frontier and frontier[0][0] == smallest:
            group.append(heapq.heappop(frontier))

        # The hash is common only if every signature contributed an entry.
        if len(group) == n_inputs:
            shared_hashes.append(smallest)
            shared_abundances.append(min(abundance for _, abundance, _ in group))

        # Advance each stream that was just consumed.
        for _, _, position in group:
            upcoming = next(streams[position], None)
            if upcoming is not None:
                heapq.heappush(frontier, (upcoming[0], upcoming[1], position))

    return _make_signature(shared_hashes, shared_abundances)

copy()

Create a copy of the current SnipeSig instance.

Returns:

Name Type Description
SnipeSig SnipeSig

A new instance that is a copy of self.

Source code in src/snipe/api/snipe_sig.py
def copy(self) -> 'SnipeSig':
    r"""
    Build an independent duplicate of this SnipeSig.

    The signature is serialized with ``export_to_string`` and a new SnipeSig
    is constructed from that string, keeping the same signature type. Logging
    is enabled on the copy when this instance's logger level is DEBUG or lower.

    Returns:
        SnipeSig: A new instance that is a copy of self.
    """
    return SnipeSig(
        sourmash_sig=self.export_to_string(),
        sig_type=self.sigtype,
        enable_logging=self.logger.level <= logging.DEBUG,
    )

count_singletons()

Return the number of hashes with abundance equal to 1.

Returns:

Name Type Description
int int

Number of singletons.

Raises:

Type Description
ValueError

If the signature does not track abundance.

Source code in src/snipe/api/snipe_sig.py
def count_singletons(self) -> int:
    r"""
    Count the hashes whose abundance is exactly 1.

    Returns:
        int: How many entries of the abundance array equal 1.

    Raises:
        ValueError: Propagated from `_validate_abundance_operation`, which is
            documented to reject signatures that do not track abundance.
    """
    self._validate_abundance_operation(None, "count singletons")

    # Counting the True entries of the comparison is the same as summing them.
    singleton_total = int(np.count_nonzero(self._abundances == 1))
    self.logger.debug("Number of singletons (abundance == 1): %d", singleton_total)
    return singleton_total

create_from_hashes_abundances(hashes, abundances, ksize, scale, name=None, filename=None, enable_logging=False, sig_type=SigType.SAMPLE) classmethod

Internal method to create a SnipeSig instance from hashes and abundances.

Parameters:

Name Type Description Default
hashes ndarray

Array of hash values.

required
abundances ndarray

Array of abundance values corresponding to the hashes.

required
ksize int

K-mer size.

required
scale int

Scale value.

required
name str

Optional name for the signature.

None
filename str

Optional filename for the signature.

None
sig_type SigType

Type of the signature.

SAMPLE
enable_logging bool

Flag to enable logging.

False

Returns:

Name Type Description
SnipeSig SnipeSig

A new SnipeSig instance.

Source code in src/snipe/api/snipe_sig.py
@classmethod
def create_from_hashes_abundances(cls, hashes: np.ndarray, abundances: np.ndarray,
                                  ksize: int, scale: int, name: str = None,
                                  filename: str = None, enable_logging: bool = False, sig_type: SigType = SigType.SAMPLE) -> 'SnipeSig':
    """
    Build a SnipeSig from parallel arrays of hashes and abundances.

    An abundance-tracking scaled MinHash is created, filled with the
    hash -> abundance pairs, wrapped in a sourmash signature and passed to the
    class constructor.

    Parameters:
        hashes (np.ndarray): Array of hash values.
        abundances (np.ndarray): Abundance for each hash, in the same order as `hashes`.
        ksize (int): K-mer size.
        scale (int): Scale value (passed to MinHash as `scaled`).
        name (str): Optional signature name; a falsy value becomes "".
        filename (str): Optional signature filename; a falsy value becomes "".
        enable_logging (bool): Flag to enable logging.
        sig_type (SigType): Type of the signature.

    Returns:
        SnipeSig: A new SnipeSig instance.
    """
    minhash = sourmash.minhash.MinHash(n=0, ksize=ksize, scaled=scale, track_abundance=True)
    # Pairs are formed positionally; zip stops at the shorter of the two arrays.
    abundance_by_hash = dict(zip(hashes, abundances))
    minhash.set_abundances(abundance_by_hash)

    sig_name = name or ""
    sig_filename = filename or ""
    wrapped = sourmash.signature.SourmashSignature(minhash, name=sig_name, filename=sig_filename)
    return cls(sourmash_sig=wrapped, sig_type=sig_type, enable_logging=enable_logging)

difference_sigs(other)

Compute the difference of the current signature with another signature.

This method removes hashes that are present in the other signature from self, keeping the abundances from self.

Mathematical Explanation:

Let \( A \) and \( B \) be two signatures with sets of hashes \( H_A \) and \( H_B \), and abundance function \( a_A(h) \), the difference signature \( C \) has:

  • Hash set: $$ H_C = H_A \setminus H_B $$

  • Abundance function: $$ a_C(h) = a_A(h), \quad \text{for } h \in H_C $$

Parameters: - other (SnipeSig): Another SnipeSig instance to subtract from the current signature.

Returns: - SnipeSig: A new SnipeSig instance representing the difference of the two signatures.

Raises: - ValueError: If ksize or scale do not match between signatures. Note: when zero hashes remain after the difference, the implementation logs a warning and returns an empty signature; it does not raise RuntimeError.

Source code in src/snipe/api/snipe_sig.py
def difference_sigs(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Compute the difference of the current signature with another signature.

    This method removes hashes that are present in the other signature from self,
    keeping the abundances from self.

    **Mathematical Explanation**:

    Let \( A \) and \( B \) be two signatures with sets of hashes \( H_A \) and \( H_B \),
    and abundance function \( a_A(h) \), the difference signature \( C \) has:

    - Hash set:
    $$
    H_C = H_A \setminus H_B
    $$

    - Abundance function:
    $$
    a_C(h) = a_A(h), \quad \text{for } h \in H_C
    $$

    **Parameters**:
        - `other (SnipeSig)`: Another `SnipeSig` instance to subtract from the current signature.

    **Returns**:
        - `SnipeSig`: A new `SnipeSig` instance representing the difference of the two signatures.
          It is named `"<self name>_difference_<other name>"` and may be empty.

    **Raises**:
        - `ValueError`: If `ksize` or `scale` do not match between signatures
          (raised by the verification helpers called first).

    **Notes**:
        - An empty result does not raise: a warning is logged and an empty
          signature is returned.
    """
    self.__verify_snipe_signature(other)
    self.__verify_matching_ksize_scale(other)

    self.logger.debug("Differencing signatures.")

    # A single membership pass yields the mask for both hashes and abundances,
    # so the two result arrays are aligned by construction.
    keep_mask = np.isin(self._hashes, other._hashes, assume_unique=True, invert=True)
    diff_hashes = self._hashes[keep_mask]
    diff_abundances = self._abundances[keep_mask]

    if diff_hashes.size == 0:
        # Deliberately a warning only: callers receive an empty signature.
        _e_msg = f"Difference operation resulted in zero hashes, which is not allowed for {self._name} and {other._name}."
        self.logger.warning(_e_msg)

    self.logger.debug("Difference operation completed. Remaining hashes: %d", len(diff_hashes))

    # Create a new SnipeSig instance
    return self.create_from_hashes_abundances(
        hashes=diff_hashes,
        abundances=diff_abundances,
        ksize=self._ksize,
        scale=self._scale,
        name=f"{self._name}_difference_{other._name}",
        filename=None,
        enable_logging=self.logger.level <= logging.DEBUG
    )

export(path, force=False)

Export the signature to a file.

Parameters:

Name Type Description Default
path str

The path to save the signature to.

required
force bool

Flag to overwrite the file if it already exists.

False
Source code in src/snipe/api/snipe_sig.py
def export(self, path, force=False) -> None:
    r"""
    Export the signature to a file.

    The output format is chosen from the file extension: `.sig` writes the
    JSON signature, `.zip` writes a sourmash zip archive.

    Parameters:
        path (str | os.PathLike): The path to save the signature to.
        force (bool): For `.zip` output, replace the file if it already exists
            instead of raising. `.sig` output is opened in write mode and
            therefore overwrites an existing file regardless of this flag.

    Raises:
        FileExistsError: If a `.zip` target already exists and `force` is False.
        ValueError: If the path ends with neither `.sig` nor `.zip`.
        Exception: If writing the zip archive fails (the original error is chained).
    """
    self._convert_to_sourmash_signature()
    # Normalise once so pathlib.Path works for the suffix checks as well.
    path = os.fspath(path)
    if path.endswith(".sig"):
        self.logger.debug("Exporting signature to a .sig file.")
        with open(path, "wb") as fp:
            sourmash.signature.save_signatures_to_json([self.sourmash_sig], fp)

    elif path.endswith(".zip"):
        if os.path.exists(path):
            if not force:
                raise FileExistsError("Output file already exists.")
            # Caller asked to overwrite: drop the old file so a fresh archive is written.
            os.remove(path)
        try:
            with sourmash.save_load.SaveSignatures_ZipFile(path) as save_sigs:
                save_sigs.add(self.sourmash_sig)
        except Exception as e:
            self.logger.error("Failed to export signatures to zip: %s", e)
            raise Exception(f"Failed to export signatures to zip: {e}") from e
    else:
        raise ValueError("Output file must be either a .sig or .zip file.")

export_to_string()

Export the signature to a JSON string.

Returns:

Name Type Description
str

JSON string representation of the signature.

Source code in src/snipe/api/snipe_sig.py
def export_to_string(self):
    r"""
    Serialize the signature to JSON text.

    The internal state is first synchronised into `self.sourmash_sig`, which is
    then serialized by sourmash and decoded from UTF-8 bytes.

    Returns:
        str: JSON string representation of the signature.
    """
    self._convert_to_sourmash_signature()
    encoded = sourmash.signature.save_signatures_to_json([self.sourmash_sig])
    return encoded.decode('utf-8')

get_info()

Get information about the signature.

Returns:

Name Type Description
dict dict

A dictionary containing signature information.

Source code in src/snipe/api/snipe_sig.py
def get_info(self) -> dict:
    r"""
    Summarise the signature's metadata.

    Returns:
        dict: Keys `name`, `filename`, `md5sum`, `ksize`, `scale`,
        `track_abundance`, `sigtype` and `num_hashes` (the number of stored
        hashes), in that order.
    """
    return dict(
        name=self._name,
        filename=self._filename,
        md5sum=self._md5sum,
        ksize=self._ksize,
        scale=self._scale,
        track_abundance=self._track_abundance,
        sigtype=self._type,
        num_hashes=len(self._hashes),
    )

get_name()

Get the name of the signature.

Source code in src/snipe/api/snipe_sig.py
def get_name(self) -> str:
    r"""Return the stored signature name (`self._name`)."""
    signature_name = self._name
    return signature_name

get_unique_signatures(signatures) staticmethod

Extract unique signatures from a dictionary of SnipeSig instances.

For each signature, the unique_sig contains only the hashes that do not overlap with any other signature.

Parameters:

Name Type Description Default
signatures Dict[str, SnipeSig]

A dictionary mapping signature names to SnipeSig instances.

required

Returns:

Type Description
Dict[str, SnipeSig]

Dict[str, SnipeSig]: A dictionary mapping signature names to their unique SnipeSig instances.

Raises:

Type Description
ValueError

If the input dictionary is empty or if signatures have mismatched ksize/scale.

Source code in src/snipe/api/snipe_sig.py
@staticmethod
def get_unique_signatures(signatures: Dict[str, 'SnipeSig']) -> Dict[str, 'SnipeSig']:
    """
    Extract unique signatures from a dictionary of SnipeSig instances.

    For each signature, the unique_sig contains only the hashes that do not overlap with any other signature.
    A hash counts as unique when it occurs exactly once in the concatenation of all input hash arrays.

    Parameters:
        signatures (Dict[str, SnipeSig]): A dictionary mapping signature names to SnipeSig instances.

    Returns:
        Dict[str, SnipeSig]: A dictionary mapping each input key to a new SnipeSig named
        "<key>_unique". The new signatures are created with sig_type SAMPLE and logging disabled.

    Raises:
        ValueError: If the input dictionary is empty or if signatures have mismatched ksize/scale.
    """
    if not signatures:
        raise ValueError("The input signatures dictionary is empty.")

    # The first signature supplies the reference ksize and scale.
    reference_sig = next(iter(signatures.values()))
    ksize = reference_sig.ksize
    scale = reference_sig.scale

    # Verify that all signatures have the same ksize and scale
    for name, sig in signatures.items():
        if sig.ksize != ksize or sig.scale != scale:
            raise ValueError(f"Signature '{name}' has mismatched ksize or scale.")

    # Count how often each hash occurs across every signature.
    all_hashes = np.concatenate([sig.hashes for sig in signatures.values()])
    unique_hashes, counts = np.unique(all_hashes, return_counts=True)

    # Hashes seen exactly once belong to a single signature only.
    unique_across_all = unique_hashes[counts == 1]

    unique_signatures = {}

    for name, sig in signatures.items():
        # Test membership against the NumPy array directly; no per-iteration
        # conversion of every hash into Python objects is needed.
        mask_unique = np.isin(sig.hashes, unique_across_all)

        unique_sig = SnipeSig.create_from_hashes_abundances(
            hashes=sig.hashes[mask_unique],
            abundances=sig.abundances[mask_unique],
            ksize=ksize,
            scale=scale,
            name=f"{name}_unique",
            filename=None,
            enable_logging=False,  # Set to True if you want logging for the new signatures
            sig_type=SigType.SAMPLE  # Adjust sig_type as needed
        )

        unique_signatures[name] = unique_sig

    return unique_signatures

intersection_sigs(other)

Compute the intersection of the current signature with another signature.

This method keeps only the hashes that are common to both signatures, and retains the abundances from self.

Mathematical Explanation:

Let \( A \) and \( B \) be two signatures with sets of hashes \( H_A \) and \( H_B \), and abundance functions \( a_A(h) \) and \( a_B(h) \), the intersection signature \( C \) has:

  • Hash set: $$ H_C = H_A \cap H_B $$

  • Abundance function: $$ a_C(h) = a_A(h), \quad \text{for } h \in H_C $$

Parameters: - other (SnipeSig): Another SnipeSig instance to intersect with.

Returns: - SnipeSig: A new SnipeSig instance representing the intersection of the two signatures.

Raises: - ValueError: If ksize or scale do not match between signatures.

Source code in src/snipe/api/snipe_sig.py
def intersection_sigs(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Intersect this signature with another one.

    Only hashes present in both signatures are kept; each kept hash carries the
    abundance it has in `self`.

    **Mathematical Explanation**:

    For signatures \( A \) (self) and \( B \) (other) with hash sets \( H_A \), \( H_B \)
    and abundance function \( a_A(h) \), the result \( C \) has:

    - Hash set:
    $$
    H_C = H_A \cap H_B
    $$

    - Abundance function:
    $$
    a_C(h) = a_A(h), \quad \text{for } h \in H_C
    $$

    **Parameters**:
        - `other (SnipeSig)`: The `SnipeSig` instance to intersect with.

    **Returns**:
        - `SnipeSig`: A new `SnipeSig` named `"<self name>_intersection_<other name>"`.
          When nothing is shared, it is built from empty `uint64`/`uint32` arrays.

    **Raises**:
        - `ValueError`: If `ksize` or `scale` do not match between signatures
          (raised by the verification helpers called first).
    """
    self.__verify_snipe_signature(other)
    self.__verify_matching_ksize_scale(other)

    self.logger.debug("Intersecting signatures.")

    # positions_in_self indexes self._hashes, so it also selects self's abundances.
    shared_hashes, positions_in_self, _ = np.intersect1d(
        self._hashes, other._hashes, assume_unique=True, return_indices=True
    )

    if shared_hashes.size == 0:
        self.logger.debug("No common hashes found. Returning an empty signature.")
        result_hashes = np.array([], dtype=np.uint64)
        result_abundances = np.array([], dtype=np.uint32)
    else:
        result_hashes = shared_hashes
        result_abundances = self._abundances[positions_in_self]
        self.logger.debug("Intersection operation completed. Total common hashes: %d", len(shared_hashes))

    # Both outcomes share the same metadata for the new signature.
    return self.create_from_hashes_abundances(
        hashes=result_hashes,
        abundances=result_abundances,
        ksize=self._ksize,
        scale=self._scale,
        name=f"{self._name}_intersection_{other._name}",
        filename=None,
        enable_logging=self.logger.level <= logging.DEBUG
    )

keep_max_abundance(max_abundance)

Keep only hashes with abundances less than or equal to a maximum threshold.

This method removes hashes whose abundances are greater than the specified max_abundance.

Mathematical Explanation:

The updated hash set \( H' \) is: $$ H' = \{ h \in H \mid a(h) \leq \text{max\_abundance} \} $$

Parameters: - max_abundance (int): The maximum abundance threshold.

Raises: - ValueError: If the signature does not track abundance or if max_abundance is invalid.

Source code in src/snipe/api/snipe_sig.py
def keep_max_abundance(self, max_abundance: int):
    r"""
    Drop every hash whose abundance exceeds `max_abundance`.

    Hashes with abundance less than or equal to the threshold are kept; the
    selection is applied in place through `_apply_mask`.

    **Mathematical Explanation**:

    The updated hash set \( H' \) is:
    $$
    H' = \{ h \in H \mid a(h) \leq \text{max\_abundance} \}
    $$

    **Parameters**:
        - `max_abundance (int)`: The maximum abundance threshold (inclusive).

    **Raises**:
        - `ValueError`: Propagated from `_validate_abundance_operation`, documented to reject
          signatures that do not track abundance or an invalid `max_abundance`.
    """
    self._validate_abundance_operation(max_abundance, "keep maximum abundance")

    # True marks the entries that survive the filter.
    self._apply_mask(self._abundances <= max_abundance)
    self.logger.debug("Kept hashes with abundance <= %d.", max_abundance)

keep_min_abundance(min_abundance)

Keep only hashes with abundances greater than or equal to a minimum threshold.

This method removes hashes whose abundances are less than the specified min_abundance.

Mathematical Explanation:

The updated hash set \( H' \) is: $$ H' = \{ h \in H \mid a(h) \geq \text{min\_abundance} \} $$

Parameters: - min_abundance (int): The minimum abundance threshold.

Raises: - ValueError: If the signature does not track abundance or if min_abundance is invalid.

Source code in src/snipe/api/snipe_sig.py
def keep_min_abundance(self, min_abundance: int):
    r"""
    Drop every hash whose abundance is below `min_abundance`.

    Hashes with abundance greater than or equal to the threshold are kept; the
    selection is applied in place through `_apply_mask`.

    **Mathematical Explanation**:

    The updated hash set \( H' \) is:
    $$
    H' = \{ h \in H \mid a(h) \geq \text{min\_abundance} \}
    $$

    **Parameters**:
        - `min_abundance (int)`: The minimum abundance threshold (inclusive).

    **Raises**:
        - `ValueError`: Propagated from `_validate_abundance_operation`, documented to reject
          signatures that do not track abundance or an invalid `min_abundance`.
    """
    self._validate_abundance_operation(min_abundance, "keep minimum abundance")

    # True marks the entries that survive the filter.
    self._apply_mask(self._abundances >= min_abundance)
    self.logger.debug("Kept hashes with abundance >= %d.", min_abundance)

reset_abundance(new_abundance=1)

Reset all abundances to a specified value.

This method sets the abundance of every hash in the signature to the specified new_abundance value.

Mathematical Explanation:

For each hash \( h \) in the signature, the abundance function is updated to: $$ a(h) = \text{new\_abundance} $$

Parameters: - new_abundance (int): The new abundance value to set for all hashes. Default is 1.

Raises: - ValueError: If the signature does not track abundance or if new_abundance is invalid.

Source code in src/snipe/api/snipe_sig.py
def reset_abundance(self, new_abundance: int = 1):
    r"""
    Overwrite the abundance of every hash with one value.

    The existing abundance array is filled in place with `new_abundance`, and
    the instance's `track_abundance` attribute is set to True.

    **Mathematical Explanation**:

    For each hash \( h \) in the signature, the abundance function becomes:
    $$
    a(h) = \text{new\_abundance}
    $$

    **Parameters**:
        - `new_abundance (int)`: The abundance assigned to all hashes. Default is 1.

    **Raises**:
        - `ValueError`: Propagated from `_validate_abundance_operation`, documented to reject
          signatures that do not track abundance or an invalid `new_abundance`.
    """
    target_value = new_abundance
    self._validate_abundance_operation(target_value, "reset abundance")

    # Slice assignment keeps the same array object and only rewrites its contents.
    self._abundances[:] = target_value
    # NOTE(review): other methods read `self._track_abundance`; confirm that assigning
    # the public `track_abundance` name here is the intended way to update that flag.
    self.track_abundance = True
    self.logger.debug("Reset all abundances to %d.", target_value)

sum_signatures(signatures, name='summed_signature', filename=None, enable_logging=False) classmethod

Sum multiple SnipeSig instances by including all unique hashes and summing their abundances where hashes overlap. This method utilizes a heap-based multi-way merge algorithm for enhanced efficiency when handling thousands of signatures.

\[ \text{Sum}(A_1, A_2, \dots, A_n) = \bigcup_{i=1}^{n} A_i \]

For each hash \( h \), its total abundance is: $$ \text{abundance}(h) = \sum_{i=1}^{n} \text{abundance}_i(h) $$

Mathematical Explanation:

  • Union of Signatures: The summation of signatures involves creating a union of all unique k-mers (hashes) present across the input signatures.

  • Total Abundance Calculation: For each unique hash \( h \), the total abundance is the sum of its abundances across all signatures where it appears.

  • Algorithm Efficiency: By using a min-heap to perform a multi-way merge of sorted hash arrays, the method ensures that each hash is processed in ascending order without the need to store all hashes in memory simultaneously.

Parameters: - signatures (List[SnipeSig]): List of SnipeSig instances to sum. - name (str): Optional name for the resulting signature. - filename (str): Optional filename for the resulting signature. - enable_logging (bool): Flag to enable detailed logging.

Returns: - SnipeSig: A new SnipeSig instance representing the sum of the signatures.

Raises: - ValueError: If the signatures list is empty or if ksize/scale do not match across signatures. - RuntimeError: If an error occurs during the summation process.

Source code in src/snipe/api/snipe_sig.py
@classmethod
def sum_signatures(cls, signatures: List['SnipeSig'], name: str = "summed_signature",
                   filename: str = None, enable_logging: bool = False) -> 'SnipeSig':

    r"""
    Sum multiple SnipeSig instances by including all unique hashes and summing their abundances where hashes overlap.
    This method uses a heap-based multi-way merge over the per-signature hash arrays.

    $$
    \text{Sum}(A_1, A_2, \dots, A_n) = \bigcup_{i=1}^{n} A_i
    $$

    For each hash \( h \), its total abundance is:
    $$
    \text{abundance}(h) = \min\left(\sum_{i=1}^{n} \text{abundance}_i(h),\; 2^{32} - 1\right)
    $$

    **Mathematical Explanation**:

    - **Union of Signatures**:
        The summation of signatures involves creating a union of all unique k-mers (hashes) present across the input signatures.

    - **Total Abundance Calculation**:
        For each unique hash \( h \), the total abundance is the sum of its abundances across all signatures where it appears.
        Sums are accumulated as Python integers and capped at the `uint32` maximum, so large totals saturate instead of wrapping around.

    - **Algorithm**:
        A min-heap holds the current head of every non-empty signature; equal hashes at the top of the heap are merged.
        The merge relies on each signature's hashes being in ascending order.

    **Parameters**:
        - `signatures (List[SnipeSig])`: List of `SnipeSig` instances to sum.
        - `name (str)`: Optional name for the resulting signature.
        - `filename (str)`: Optional filename for the resulting signature.
        - `enable_logging (bool)`: Flag to enable detailed logging.

    **Returns**:
        - `SnipeSig`: A new `SnipeSig` instance representing the sum of the signatures.

    **Raises**:
        - `ValueError`: If the signatures list is empty, if every signature is empty,
          or if `ksize`/`scale` do not match across signatures.
    """
    if not signatures:
        raise ValueError("No signatures provided for summation.")

    # The first signature supplies the reference ksize and scale.
    first_sig = signatures[0]
    ksize = first_sig.ksize
    scale = first_sig.scale

    for sig in signatures[1:]:
        if sig.ksize != ksize or sig.scale != scale:
            raise ValueError("All signatures must have the same ksize and scale.")

    # One (hash, abundance) stream per non-empty signature; the heap is seeded
    # with each stream's first entry, tagged with the stream's index.
    streams = []
    heap = []
    for sig in signatures:
        stream = iter(zip(sig.hashes, sig.abundances))
        head = next(stream, None)
        if head is None:
            continue  # Skip empty signatures
        # Plain ints keep the arithmetic below free of fixed-width wrap-around.
        heap.append((int(head[0]), int(head[1]), len(streams)))
        streams.append(stream)

    if not streams:
        raise ValueError("All provided signatures are empty.")

    heapq.heapify(heap)

    def _advance(stream_idx):
        # Push the next entry of the given stream, if it has one.
        entry = next(streams[stream_idx], None)
        if entry is not None:
            heapq.heappush(heap, (int(entry[0]), int(entry[1]), stream_idx))

    uint32_max = int(np.iinfo(np.uint32).max)
    summed_hashes = []
    summed_abundances = []

    while heap:
        current_hash, total_abundance, idx = heapq.heappop(heap)

        # Merge every other heap entry that carries the same hash.
        while heap and heap[0][0] == current_hash:
            _, abundance, same_idx = heapq.heappop(heap)
            total_abundance += abundance
            _advance(same_idx)

        summed_hashes.append(current_hash)
        # Saturate before the uint32 conversion so overflow cannot wrap.
        summed_abundances.append(min(total_abundance, uint32_max))

        _advance(idx)

    # Convert the results to NumPy arrays for efficient storage and processing
    summed_hashes = np.array(summed_hashes, dtype=np.uint64)
    summed_abundances = np.array(summed_abundances, dtype=np.uint32)

    # Create a new SnipeSig instance from the summed hashes and abundances
    summed_signature = cls.create_from_hashes_abundances(
        hashes=summed_hashes,
        abundances=summed_abundances,
        ksize=ksize,
        scale=scale,
        name=name,
        filename=filename,
        enable_logging=enable_logging
    )

    return summed_signature

symmetric_difference_sigs(other)

Compute the symmetric difference of the current signature with another signature.

This method retains hashes that are unique to each signature, with their respective abundances.

Mathematical Explanation:

Let \( A \) and \( B \) be two signatures with sets of hashes \( H_A \) and \( H_B \), and abundance functions \( a_A(h) \) and \( a_B(h) \), the symmetric difference signature \( C \) has:

  • Hash set: $$ H_C = (H_A \setminus H_B) \cup (H_B \setminus H_A) $$

  • Abundance function: $$ a_C(h) = \begin{cases} a_A(h), & \text{for } h \in H_A \setminus H_B \\ a_B(h), & \text{for } h \in H_B \setminus H_A \end{cases} $$

Parameters: - other (SnipeSig): Another SnipeSig instance to compute the symmetric difference with.

Returns: - SnipeSig: A new SnipeSig instance representing the symmetric difference of the two signatures.

Raises: - ValueError: If ksize or scale do not match between signatures. - RuntimeError: If zero hashes remain after symmetric difference.

Source code in src/snipe/api/snipe_sig.py
def symmetric_difference_sigs(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Build the signature of hashes that occur in exactly one of the two signatures.

    Hashes only in `self` keep the abundance from `self`; hashes only in `other`
    keep the abundance from `other`, or 1 when `other` does not track abundance.
    The result is ordered by ascending hash.

    **Mathematical Explanation**:

    For signatures \( A \) and \( B \) with hash sets \( H_A \), \( H_B \) and abundance
    functions \( a_A(h) \), \( a_B(h) \), the result \( C \) has:

    - Hash set:
    $$
    H_C = (H_A \setminus H_B) \cup (H_B \setminus H_A)
    $$

    - Abundance function:
    $$
    a_C(h) =
    \begin{cases}
    a_A(h), & \text{for } h \in H_A \setminus H_B \\
    a_B(h), & \text{for } h \in H_B \setminus H_A \\
    \end{cases}
    $$

    **Parameters**:
        - `other (SnipeSig)`: The `SnipeSig` instance to compare against.

    **Returns**:
        - `SnipeSig`: A new `SnipeSig` named `"<self name>_symmetric_difference_<other name>"`.

    **Raises**:
        - `ValueError`: If `ksize` or `scale` do not match between signatures
          (raised by the verification helpers called first).
        - `RuntimeError`: If zero hashes remain after symmetric difference.
    """
    self.__verify_snipe_signature(other)
    self.__verify_matching_ksize_scale(other)

    self.logger.debug("Computing symmetric difference of signatures.")

    own_hashes = self._hashes
    foreign_hashes = other._hashes

    # Hashes exclusive to each side.
    only_in_self = np.setdiff1d(own_hashes, foreign_hashes, assume_unique=True)
    only_in_other = np.setdiff1d(foreign_hashes, own_hashes, assume_unique=True)

    # Pick the abundances that belong to those exclusive hashes.
    abundances_self = self._abundances[np.isin(own_hashes, only_in_self, assume_unique=True)]
    abundances_other = other._abundances[np.isin(foreign_hashes, only_in_other, assume_unique=True)]

    if not other.track_abundance:
        # Without abundance tracking every hash from `other` counts once.
        self.logger.debug("Other signature does not track abundance. Setting abundances to 1.")
        abundances_other = np.ones_like(abundances_other, dtype=np.uint32)

    merged_hashes = np.concatenate((only_in_self, only_in_other))
    merged_abundances = np.concatenate((abundances_self, abundances_other))

    if merged_hashes.size == 0:
        _e_msg = "Symmetric difference operation resulted in zero hashes, which is not allowed."
        self.logger.error(_e_msg)
        raise RuntimeError(_e_msg)

    # Reorder both arrays together so hashes ascend and abundances stay aligned.
    order = np.argsort(merged_hashes)
    merged_hashes = merged_hashes[order]
    merged_abundances = merged_abundances[order]

    self.logger.debug("Symmetric difference operation completed. Total unique hashes: %d", len(merged_hashes))

    return self.create_from_hashes_abundances(
        hashes=merged_hashes,
        abundances=merged_abundances,
        ksize=self._ksize,
        scale=self._scale,
        name=f"{self._name}_symmetric_difference_{other._name}",
        filename=None,
        enable_logging=self.logger.level <= logging.DEBUG
    )

trim_below_median()

Trim hashes with abundances below the median abundance.

This method removes all hashes whose abundances are less than the median abundance of the signature.

Mathematical Explanation:

Let \( m \) be the median of \( \{ a(h) \mid h \in H \} \). The updated hash set \( H' \) is:

\[ H' = \{ h \in H \mid a(h) \geq m \} \]

Raises: - ValueError: If the signature does not track abundance.

Source code in src/snipe/api/snipe_sig.py
def trim_below_median(self):
    r"""
    Remove hashes whose abundance is lower than the median abundance.

    Hashes with abundance equal to or above the median are kept. When the
    signature holds no abundances, nothing is changed.

    **Mathematical Explanation**:

    Let \( m \) be the median of \( \{ a(h) \mid h \in H \} \).
    The updated hash set \( H' \) is:

    $$
    H' = \{ h \in H \mid a(h) \geq m \}
    $$

    **Raises**:
        - `ValueError`: Propagated from `_validate_abundance_operation`, documented to reject
          signatures that do not track abundance.
    """

    self._validate_abundance_operation(None, "trim below median")

    abundances = self._abundances
    if len(abundances) == 0:
        # Nothing to compute a median from.
        self.logger.debug("No hashes to trim based on median abundance.")
        return

    cutoff = np.median(abundances)
    # Entries at or above the median survive.
    self._apply_mask(abundances >= cutoff)
    self.logger.debug("Trimmed hashes with abundance below median (%f).", cutoff)

trim_singletons()

Remove hashes with abundance equal to 1.

This method removes all hashes that are singletons (abundance equals 1).

Mathematical Explanation:

The updated hash set \( H' \) is: $$ H' = \{ h \in H \mid a(h) \neq 1 \} $$

Raises: - ValueError: If the signature does not track abundance.

Source code in src/snipe/api/snipe_sig.py
def trim_singletons(self):
    r"""
    Drop every hash whose abundance is exactly 1.

    The selection is applied in place through `_apply_mask`; the number of
    removed hashes and the resulting size are written to the debug log.

    **Mathematical Explanation**:

    The updated hash set \( H' \) is:
    $$
    H' = \{ h \in H \mid a(h) \neq 1 \}
    $$

    **Raises**:
        - `ValueError`: Propagated from `_validate_abundance_operation`, documented to reject
          signatures that do not track abundance.
    """
    self._validate_abundance_operation(None, "trim singletons")

    keep = self._abundances != 1
    # The inverted mask marks the singletons that are about to be removed.
    removed_total = np.sum(~keep)
    self.logger.debug("Trimming %d hashes with abundance equal to 1.", removed_total)
    self._apply_mask(keep)
    self.logger.debug("Size after trimming singletons: %d", len(self._hashes))

union_sigs(other)

Combine this signature with another by summing abundances where hashes overlap.

Given two signatures \( A \) and \( B \) with hash sets \( H_A \) and \( H_B \), and their corresponding abundance functions \( a_A \) and \( a_B \), the union signature \( C \) is defined as follows:

  • Hash Set:
\[ H_C = H_A \cup H_B \]
  • Abundance Function:
\[ a_C(h) = \begin{cases} a_A(h) + a_B(h), & \text{if } h \in H_A \cap H_B \\ a_A(h), & \text{if } h \in H_A \setminus H_B \\ a_B(h), & \text{if } h \in H_B \setminus H_A \end{cases} \]
Source code in src/snipe/api/snipe_sig.py
def union_sigs(self, other: 'SnipeSig') -> 'SnipeSig':
    r"""
    Combine this signature with another by summing abundances where hashes overlap.

    Given two signatures \( A \) and \( B \) with hash sets \( H_A \) and \( H_B \),
    and their corresponding abundance functions \( a_A \) and \( a_B \), the union
    signature \( C \) is defined as follows:

    - **Hash Set**: 

    $$
    H_C = H_A \cup H_B
    $$

    - **Abundance Function**:

    $$
    a_C(h) =
    \begin{cases} 
        a_A(h) + a_B(h), & \text{if } h \in H_A \cap H_B \\
        a_A(h), & \text{if } h \in H_A \setminus H_B \\
        a_B(h), & \text{if } h \in H_B \setminus H_A
    \end{cases}
    $$

    Summed abundances saturate at the largest `uint32` value instead of
    wrapping around. If `other` does not track abundance, each of its hashes
    contributes an abundance of 1.

    **Parameters**:
        - `other` (`SnipeSig`): The signature to merge with this one. It is
          passed through the private signature and ksize/scale verification
          helpers before any data is combined.

    **Returns**:
        - `SnipeSig`: The result of `create_from_hashes_abundances`, built from
          the sorted unique hashes and their `uint32` summed abundances, with
          this signature's ksize and scale and the name
          `<self name>_union_<other name>`. Neither input is modified.
    """
    self.__verify_snipe_signature(other)
    self.__verify_matching_ksize_scale(other)

    self.logger.debug("Unioning signatures (including all unique hashes).")

    # Handle the case where 'other' does not track abundance
    other_abundances = other._abundances
    if not other.track_abundance:
        self.logger.debug("Other signature does not track abundance. Setting abundances to 1.")
        other_abundances = np.ones_like(other_abundances, dtype=np.uint32)

    combined_hashes = np.concatenate((self._hashes, other._hashes))
    # Widen to 64 bits before summing: a uint32 accumulator would silently
    # wrap around on overflow, leaving nothing for the clamp below to catch.
    combined_abundances = np.concatenate(
        (self._abundances, other_abundances)
    ).astype(np.uint64)

    # np.unique returns the sorted unique hashes plus, for every input entry,
    # the index of its unique hash; add.at accumulates repeated indices.
    unique_hashes, inverse_indices = np.unique(combined_hashes, return_inverse=True)
    summed_abundances = np.zeros(unique_hashes.shape, dtype=np.uint64)
    np.add.at(summed_abundances, inverse_indices, combined_abundances)

    # Saturate at the uint32 maximum, then return to the uint32 storage dtype.
    uint32_max = np.uint64(np.iinfo(np.uint32).max)
    summed_abundances = np.minimum(summed_abundances, uint32_max).astype(np.uint32)

    self.logger.debug("Union operation completed. Total hashes: %d", len(unique_hashes))

    # Create a new SnipeSig instance
    return self.create_from_hashes_abundances(
        hashes=unique_hashes,
        abundances=summed_abundances,
        ksize=self._ksize,
        scale=self._scale,
        name=f"{self._name}_union_{other._name}",
        filename=None,
        enable_logging=self.logger.level <= logging.DEBUG
    )